You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							202 lines
						
					
					
						
							6.8 KiB
						
					
					
				
			
		
		
	
	
							202 lines
						
					
					
						
							6.8 KiB
						
					
					
				import os
 | 
						|
import time
 | 
						|
from typing import Dict
 | 
						|
 | 
						|
from cereal import car
 | 
						|
from common.kalman.simple_kalman import KF1D
 | 
						|
from common.realtime import DT_CTRL
 | 
						|
from selfdrive.car import gen_empty_fingerprint
 | 
						|
from selfdrive.config import Conversions as CV
 | 
						|
from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX
 | 
						|
from selfdrive.controls.lib.events import Events
 | 
						|
from selfdrive.controls.lib.vehicle_model import VehicleModel
 | 
						|
 | 
						|
GearShifter = car.CarState.GearShifter
 | 
						|
EventName = car.CarEvent.EventName
 | 
						|
MAX_CTRL_SPEED = (V_CRUISE_MAX + 4) * CV.KPH_TO_MS  # 135 + 4 = 86 mph
 | 
						|
 | 
						|
# generic car and radar interfaces
 | 
						|
 | 
						|
 | 
						|
class CarInterfaceBase():
 | 
						|
  def __init__(self, CP, CarController, CarState):
 | 
						|
    self.CP = CP
 | 
						|
    self.VM = VehicleModel(CP)
 | 
						|
 | 
						|
    self.frame = 0
 | 
						|
    self.low_speed_alert = False
 | 
						|
 | 
						|
    if CarState is not None:
 | 
						|
      self.CS = CarState(CP)
 | 
						|
      self.cp = self.CS.get_can_parser(CP)
 | 
						|
      self.cp_cam = self.CS.get_cam_can_parser(CP)
 | 
						|
      self.cp_body = self.CS.get_body_can_parser(CP)
 | 
						|
 | 
						|
    self.CC = None
 | 
						|
    if CarController is not None:
 | 
						|
      self.CC = CarController(self.cp.dbc_name, CP, self.VM)
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def calc_accel_override(a_ego, a_target, v_ego, v_target):
 | 
						|
    return 1.
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def compute_gb(accel, speed):
 | 
						|
    raise NotImplementedError
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def get_params(candidate, fingerprint=gen_empty_fingerprint(), car_fw=None):
 | 
						|
    raise NotImplementedError
 | 
						|
 | 
						|
  # returns a set of default params to avoid repetition in car specific params
 | 
						|
  @staticmethod
 | 
						|
  def get_std_params(candidate, fingerprint):
 | 
						|
    ret = car.CarParams.new_message()
 | 
						|
    ret.carFingerprint = candidate
 | 
						|
 | 
						|
    # standard ALC params
 | 
						|
    ret.steerControlType = car.CarParams.SteerControlType.torque
 | 
						|
    ret.steerMaxBP = [0.]
 | 
						|
    ret.steerMaxV = [1.]
 | 
						|
    ret.minSteerSpeed = 0.
 | 
						|
 | 
						|
    # stock ACC by default
 | 
						|
    ret.enableCruise = True
 | 
						|
    ret.minEnableSpeed = -1.  # enable is done by stock ACC, so ignore this
 | 
						|
    ret.steerRatioRear = 0.  # no rear steering, at least on the listed cars aboveA
 | 
						|
    ret.gasMaxBP = [0.]
 | 
						|
    ret.gasMaxV = [.5]  # half max brake
 | 
						|
    ret.brakeMaxBP = [0.]
 | 
						|
    ret.brakeMaxV = [1.]
 | 
						|
    ret.openpilotLongitudinalControl = False
 | 
						|
    ret.startAccel = 0.0
 | 
						|
    ret.minSpeedCan = 0.3
 | 
						|
    ret.stoppingBrakeRate = 0.2 # brake_travel/s while trying to stop
 | 
						|
    ret.startingBrakeRate = 0.8 # brake_travel/s while releasing on restart
 | 
						|
    ret.stoppingControl = False
 | 
						|
    ret.longitudinalTuning.deadzoneBP = [0.]
 | 
						|
    ret.longitudinalTuning.deadzoneV = [0.]
 | 
						|
    ret.longitudinalTuning.kpBP = [0.]
 | 
						|
    ret.longitudinalTuning.kpV = [1.]
 | 
						|
    ret.longitudinalTuning.kiBP = [0.]
 | 
						|
    ret.longitudinalTuning.kiV = [1.]
 | 
						|
    return ret
 | 
						|
 | 
						|
  # returns a car.CarState, pass in car.CarControl
 | 
						|
  def update(self, c, can_strings):
 | 
						|
    raise NotImplementedError
 | 
						|
 | 
						|
  # return sendcan, pass in a car.CarControl
 | 
						|
  def apply(self, c):
 | 
						|
    raise NotImplementedError
 | 
						|
 | 
						|
  def create_common_events(self, cs_out, extra_gears=[], gas_resume_speed=-1, pcm_enable=True):  # pylint: disable=dangerous-default-value
 | 
						|
    events = Events()
 | 
						|
 | 
						|
    if cs_out.doorOpen:
 | 
						|
      events.add(EventName.doorOpen)
 | 
						|
    if cs_out.seatbeltUnlatched:
 | 
						|
      events.add(EventName.seatbeltNotLatched)
 | 
						|
    if cs_out.gearShifter != GearShifter.drive and cs_out.gearShifter not in extra_gears:
 | 
						|
      events.add(EventName.wrongGear)
 | 
						|
    if cs_out.gearShifter == GearShifter.reverse:
 | 
						|
      events.add(EventName.reverseGear)
 | 
						|
    if not cs_out.cruiseState.available:
 | 
						|
      events.add(EventName.wrongCarMode)
 | 
						|
    if cs_out.espDisabled:
 | 
						|
      events.add(EventName.espDisabled)
 | 
						|
    if cs_out.gasPressed:
 | 
						|
      events.add(EventName.gasPressed)
 | 
						|
    if cs_out.stockFcw:
 | 
						|
      events.add(EventName.stockFcw)
 | 
						|
    if cs_out.stockAeb:
 | 
						|
      events.add(EventName.stockAeb)
 | 
						|
    if cs_out.vEgo > MAX_CTRL_SPEED:
 | 
						|
      events.add(EventName.speedTooHigh)
 | 
						|
    if cs_out.cruiseState.nonAdaptive:
 | 
						|
      events.add(EventName.wrongCruiseMode)
 | 
						|
 | 
						|
    if cs_out.steerError:
 | 
						|
      events.add(EventName.steerUnavailable)
 | 
						|
    elif cs_out.steerWarning:
 | 
						|
      if cs_out.steeringPressed:
 | 
						|
        events.add(EventName.steerTempUnavailableUserOverride)
 | 
						|
      else:
 | 
						|
        events.add(EventName.steerTempUnavailable)
 | 
						|
 | 
						|
    # Disable on rising edge of gas or brake. Also disable on brake when speed > 0.
 | 
						|
    # Optionally allow to press gas at zero speed to resume.
 | 
						|
    # e.g. Chrysler does not spam the resume button yet, so resuming with gas is handy. FIXME!
 | 
						|
    if (cs_out.gasPressed and (not self.CS.out.gasPressed) and cs_out.vEgo > gas_resume_speed) or \
 | 
						|
       (cs_out.brakePressed and (not self.CS.out.brakePressed or not cs_out.standstill)):
 | 
						|
      events.add(EventName.pedalPressed)
 | 
						|
 | 
						|
    # we engage when pcm is active (rising edge)
 | 
						|
    if pcm_enable:
 | 
						|
      if cs_out.cruiseState.enabled and not self.CS.out.cruiseState.enabled:
 | 
						|
        events.add(EventName.pcmEnable)
 | 
						|
      elif not cs_out.cruiseState.enabled:
 | 
						|
        events.add(EventName.pcmDisable)
 | 
						|
 | 
						|
    return events
 | 
						|
 | 
						|
 | 
						|
class RadarInterfaceBase():
 | 
						|
  def __init__(self, CP):
 | 
						|
    self.pts = {}
 | 
						|
    self.delay = 0
 | 
						|
    self.radar_ts = CP.radarTimeStep
 | 
						|
    self.no_radar_sleep = 'NO_RADAR_SLEEP' in os.environ
 | 
						|
 | 
						|
  def update(self, can_strings):
 | 
						|
    ret = car.RadarData.new_message()
 | 
						|
    if not self.no_radar_sleep:
 | 
						|
      time.sleep(self.radar_ts)  # radard runs on RI updates
 | 
						|
    return ret
 | 
						|
 | 
						|
 | 
						|
class CarStateBase:
 | 
						|
  def __init__(self, CP):
 | 
						|
    self.CP = CP
 | 
						|
    self.car_fingerprint = CP.carFingerprint
 | 
						|
    self.out = car.CarState.new_message()
 | 
						|
 | 
						|
    self.cruise_buttons = 0
 | 
						|
    self.left_blinker_cnt = 0
 | 
						|
    self.right_blinker_cnt = 0
 | 
						|
 | 
						|
    # Q = np.matrix([[10.0, 0.0], [0.0, 100.0]])
 | 
						|
    # R = 1e3
 | 
						|
    self.v_ego_kf = KF1D(x0=[[0.0], [0.0]],
 | 
						|
                         A=[[1.0, DT_CTRL], [0.0, 1.0]],
 | 
						|
                         C=[1.0, 0.0],
 | 
						|
                         K=[[0.12287673], [0.29666309]])
 | 
						|
 | 
						|
  def update_speed_kf(self, v_ego_raw):
 | 
						|
    if abs(v_ego_raw - self.v_ego_kf.x[0][0]) > 2.0:  # Prevent large accelerations when car starts at non zero speed
 | 
						|
      self.v_ego_kf.x = [[v_ego_raw], [0.0]]
 | 
						|
 | 
						|
    v_ego_x = self.v_ego_kf.update(v_ego_raw)
 | 
						|
    return float(v_ego_x[0]), float(v_ego_x[1])
 | 
						|
 | 
						|
  def update_blinker(self, blinker_time: int, left_blinker_lamp: bool, right_blinker_lamp: bool):
 | 
						|
    self.left_blinker_cnt = blinker_time if left_blinker_lamp else max(self.left_blinker_cnt - 1, 0)
 | 
						|
    self.right_blinker_cnt = blinker_time if right_blinker_lamp else max(self.right_blinker_cnt - 1, 0)
 | 
						|
    return self.left_blinker_cnt > 0, self.right_blinker_cnt > 0
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def parse_gear_shifter(gear: str) -> car.CarState.GearShifter:
 | 
						|
    d: Dict[str, car.CarState.GearShifter] = {
 | 
						|
        'P': GearShifter.park, 'R': GearShifter.reverse, 'N': GearShifter.neutral,
 | 
						|
        'E': GearShifter.eco, 'T': GearShifter.manumatic, 'D': GearShifter.drive,
 | 
						|
        'S': GearShifter.sport, 'L': GearShifter.low, 'B': GearShifter.brake
 | 
						|
    }
 | 
						|
    return d.get(gear, GearShifter.unknown)
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def get_cam_can_parser(CP):
 | 
						|
    return None
 | 
						|
 | 
						|
  @staticmethod
 | 
						|
  def get_body_can_parser(CP):
 | 
						|
    return None
 | 
						|
 |