|  |  |  | import os
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | from abc import abstractmethod, ABC
 | 
					
						
							|  |  |  | from typing import Dict, Tuple, List
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from cereal import car
 | 
					
						
							|  |  |  | from common.kalman.simple_kalman import KF1D
 | 
					
						
							|  |  |  | from common.realtime import DT_CTRL
 | 
					
						
							|  |  |  | from selfdrive.car import gen_empty_fingerprint
 | 
					
						
							|  |  |  | from selfdrive.config import Conversions as CV
 | 
					
						
							|  |  |  | from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX
 | 
					
						
							|  |  |  | from selfdrive.controls.lib.events import Events
 | 
					
						
							|  |  |  | from selfdrive.controls.lib.vehicle_model import VehicleModel
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | GearShifter = car.CarState.GearShifter
 | 
					
						
							|  |  |  | EventName = car.CarEvent.EventName
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | MAX_CTRL_SPEED = (V_CRUISE_MAX + 4) * CV.KPH_TO_MS
 | 
					
						
							|  |  |  | ACCEL_MAX = 2.0
 | 
					
						
							|  |  |  | ACCEL_MIN = -3.5
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # generic car and radar interfaces
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CarInterfaceBase(ABC):
 | 
					
						
							|  |  |  |   def __init__(self, CP, CarController, CarState):
 | 
					
						
							|  |  |  |     self.CP = CP
 | 
					
						
							|  |  |  |     self.VM = VehicleModel(CP)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.frame = 0
 | 
					
						
							|  |  |  |     self.steering_unpressed = 0
 | 
					
						
							|  |  |  |     self.low_speed_alert = False
 | 
					
						
							|  |  |  |     self.silent_steer_warning = True
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if CarState is not None:
 | 
					
						
							|  |  |  |       self.CS = CarState(CP)
 | 
					
						
							|  |  |  |       self.cp = self.CS.get_can_parser(CP)
 | 
					
						
							|  |  |  |       self.cp_cam = self.CS.get_cam_can_parser(CP)
 | 
					
						
							|  |  |  |       self.cp_body = self.CS.get_body_can_parser(CP)
 | 
					
						
							|  |  |  |       self.cp_loopback = self.CS.get_loopback_can_parser(CP)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.CC = None
 | 
					
						
							|  |  |  |     if CarController is not None:
 | 
					
						
							|  |  |  |       self.CC = CarController(self.cp.dbc_name, CP, self.VM)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def get_pid_accel_limits(CP, current_speed, cruise_speed):
 | 
					
						
							|  |  |  |     return ACCEL_MIN, ACCEL_MAX
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   @abstractmethod
 | 
					
						
							|  |  |  |   def get_params(candidate, fingerprint=gen_empty_fingerprint(), car_fw=None):
 | 
					
						
							|  |  |  |     pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def init(CP, logcan, sendcan):
 | 
					
						
							|  |  |  |     pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def get_steer_feedforward_default(desired_angle, v_ego):
 | 
					
						
							|  |  |  |     # Proportional to realigning tire momentum: lateral acceleration.
 | 
					
						
							|  |  |  |     # TODO: something with lateralPlan.curvatureRates
 | 
					
						
							|  |  |  |     return desired_angle * (v_ego**2)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @classmethod
 | 
					
						
							|  |  |  |   def get_steer_feedforward_function(cls):
 | 
					
						
							|  |  |  |     return cls.get_steer_feedforward_default
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # returns a set of default params to avoid repetition in car specific params
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def get_std_params(candidate, fingerprint):
 | 
					
						
							|  |  |  |     ret = car.CarParams.new_message()
 | 
					
						
							|  |  |  |     ret.carFingerprint = candidate
 | 
					
						
							|  |  |  |     ret.unsafeMode = 0  # see panda/board/safety_declarations.h for allowed values
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # standard ALC params
 | 
					
						
							|  |  |  |     ret.steerControlType = car.CarParams.SteerControlType.torque
 | 
					
						
							|  |  |  |     ret.steerMaxBP = [0.]
 | 
					
						
							|  |  |  |     ret.steerMaxV = [1.]
 | 
					
						
							|  |  |  |     ret.minSteerSpeed = 0.
 | 
					
						
							|  |  |  |     ret.wheelSpeedFactor = 1.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     ret.pcmCruise = True     # openpilot's state is tied to the PCM's cruise state on most cars
 | 
					
						
							|  |  |  |     ret.minEnableSpeed = -1. # enable is done by stock ACC, so ignore this
 | 
					
						
							|  |  |  |     ret.steerRatioRear = 0.  # no rear steering, at least on the listed cars aboveA
 | 
					
						
							|  |  |  |     ret.openpilotLongitudinalControl = False
 | 
					
						
							|  |  |  |     ret.stopAccel = -2.0
 | 
					
						
							|  |  |  |     ret.stoppingDecelRate = 0.8 # brake_travel/s while trying to stop
 | 
					
						
							|  |  |  |     ret.vEgoStopping = 0.5
 | 
					
						
							|  |  |  |     ret.vEgoStarting = 0.5  # needs to be >= vEgoStopping to avoid state transition oscillation
 | 
					
						
							|  |  |  |     ret.stoppingControl = True
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.deadzoneBP = [0.]
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.deadzoneV = [0.]
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.kf = 1.
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.kpBP = [0.]
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.kpV = [1.]
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.kiBP = [0.]
 | 
					
						
							|  |  |  |     ret.longitudinalTuning.kiV = [1.]
 | 
					
						
							|  |  |  |     # TODO estimate car specific lag, use .15s for now
 | 
					
						
							|  |  |  |     ret.longitudinalActuatorDelayLowerBound = 0.15
 | 
					
						
							|  |  |  |     ret.longitudinalActuatorDelayUpperBound = 0.15
 | 
					
						
							|  |  |  |     ret.steerLimitTimer = 1.0
 | 
					
						
							|  |  |  |     return ret
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @abstractmethod
 | 
					
						
							|  |  |  |   def update(self, c: car.CarControl, can_strings: List[bytes]) -> car.CarState:
 | 
					
						
							|  |  |  |     pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @abstractmethod
 | 
					
						
							|  |  |  |   def apply(self, c: car.CarControl) -> Tuple[car.CarControl.Actuators, List[bytes]]:
 | 
					
						
							|  |  |  |     pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def create_common_events(self, cs_out, extra_gears=None, pcm_enable=True):
 | 
					
						
							|  |  |  |     events = Events()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if cs_out.doorOpen:
 | 
					
						
							|  |  |  |       events.add(EventName.doorOpen)
 | 
					
						
							|  |  |  |     if cs_out.seatbeltUnlatched:
 | 
					
						
							|  |  |  |       events.add(EventName.seatbeltNotLatched)
 | 
					
						
							|  |  |  |     if cs_out.gearShifter != GearShifter.drive and (extra_gears is None or
 | 
					
						
							|  |  |  |        cs_out.gearShifter not in extra_gears):
 | 
					
						
							|  |  |  |       events.add(EventName.wrongGear)
 | 
					
						
							|  |  |  |     if cs_out.gearShifter == GearShifter.reverse:
 | 
					
						
							|  |  |  |       events.add(EventName.reverseGear)
 | 
					
						
							|  |  |  |     if not cs_out.cruiseState.available:
 | 
					
						
							|  |  |  |       events.add(EventName.wrongCarMode)
 | 
					
						
							|  |  |  |     if cs_out.espDisabled:
 | 
					
						
							|  |  |  |       events.add(EventName.espDisabled)
 | 
					
						
							|  |  |  |     if cs_out.gasPressed:
 | 
					
						
							|  |  |  |       events.add(EventName.gasPressed)
 | 
					
						
							|  |  |  |     if cs_out.stockFcw:
 | 
					
						
							|  |  |  |       events.add(EventName.stockFcw)
 | 
					
						
							|  |  |  |     if cs_out.stockAeb:
 | 
					
						
							|  |  |  |       events.add(EventName.stockAeb)
 | 
					
						
							|  |  |  |     if cs_out.vEgo > MAX_CTRL_SPEED:
 | 
					
						
							|  |  |  |       events.add(EventName.speedTooHigh)
 | 
					
						
							|  |  |  |     if cs_out.cruiseState.nonAdaptive:
 | 
					
						
							|  |  |  |       events.add(EventName.wrongCruiseMode)
 | 
					
						
							|  |  |  |     if cs_out.brakeHoldActive and self.CP.openpilotLongitudinalControl:
 | 
					
						
							|  |  |  |       events.add(EventName.brakeHold)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Handle permanent and temporary steering faults
 | 
					
						
							|  |  |  |     self.steering_unpressed = 0 if cs_out.steeringPressed else self.steering_unpressed + 1
 | 
					
						
							|  |  |  |     if cs_out.steerWarning:
 | 
					
						
							|  |  |  |       # if the user overrode recently, show a less harsh alert
 | 
					
						
							|  |  |  |       if self.silent_steer_warning or cs_out.standstill or self.steering_unpressed < int(1.5 / DT_CTRL):
 | 
					
						
							|  |  |  |         self.silent_steer_warning = True
 | 
					
						
							|  |  |  |         events.add(EventName.steerTempUnavailableSilent)
 | 
					
						
							|  |  |  |       else:
 | 
					
						
							|  |  |  |         events.add(EventName.steerTempUnavailable)
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.silent_steer_warning = False
 | 
					
						
							|  |  |  |     if cs_out.steerError:
 | 
					
						
							|  |  |  |       events.add(EventName.steerUnavailable)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Disable on rising edge of gas or brake. Also disable on brake when speed > 0.
 | 
					
						
							|  |  |  |     if (cs_out.gasPressed and not self.CS.out.gasPressed) or \
 | 
					
						
							|  |  |  |        (cs_out.brakePressed and (not self.CS.out.brakePressed or not cs_out.standstill)):
 | 
					
						
							|  |  |  |       events.add(EventName.pedalPressed)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # we engage when pcm is active (rising edge)
 | 
					
						
							|  |  |  |     if pcm_enable:
 | 
					
						
							|  |  |  |       if cs_out.cruiseState.enabled and not self.CS.out.cruiseState.enabled:
 | 
					
						
							|  |  |  |         events.add(EventName.pcmEnable)
 | 
					
						
							|  |  |  |       elif not cs_out.cruiseState.enabled:
 | 
					
						
							|  |  |  |         events.add(EventName.pcmDisable)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return events
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class RadarInterfaceBase(ABC):
 | 
					
						
							|  |  |  |   def __init__(self, CP):
 | 
					
						
							|  |  |  |     self.pts = {}
 | 
					
						
							|  |  |  |     self.delay = 0
 | 
					
						
							|  |  |  |     self.radar_ts = CP.radarTimeStep
 | 
					
						
							|  |  |  |     self.no_radar_sleep = 'NO_RADAR_SLEEP' in os.environ
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update(self, can_strings):
 | 
					
						
							|  |  |  |     ret = car.RadarData.new_message()
 | 
					
						
							|  |  |  |     if not self.no_radar_sleep:
 | 
					
						
							|  |  |  |       time.sleep(self.radar_ts)  # radard runs on RI updates
 | 
					
						
							|  |  |  |     return ret
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CarStateBase(ABC):
 | 
					
						
							|  |  |  |   def __init__(self, CP):
 | 
					
						
							|  |  |  |     self.CP = CP
 | 
					
						
							|  |  |  |     self.car_fingerprint = CP.carFingerprint
 | 
					
						
							|  |  |  |     self.out = car.CarState.new_message()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.cruise_buttons = 0
 | 
					
						
							|  |  |  |     self.left_blinker_cnt = 0
 | 
					
						
							|  |  |  |     self.right_blinker_cnt = 0
 | 
					
						
							|  |  |  |     self.left_blinker_prev = False
 | 
					
						
							|  |  |  |     self.right_blinker_prev = False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Q = np.matrix([[10.0, 0.0], [0.0, 100.0]])
 | 
					
						
							|  |  |  |     # R = 1e3
 | 
					
						
							|  |  |  |     self.v_ego_kf = KF1D(x0=[[0.0], [0.0]],
 | 
					
						
							|  |  |  |                          A=[[1.0, DT_CTRL], [0.0, 1.0]],
 | 
					
						
							|  |  |  |                          C=[1.0, 0.0],
 | 
					
						
							|  |  |  |                          K=[[0.12287673], [0.29666309]])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update_speed_kf(self, v_ego_raw):
 | 
					
						
							|  |  |  |     if abs(v_ego_raw - self.v_ego_kf.x[0][0]) > 2.0:  # Prevent large accelerations when car starts at non zero speed
 | 
					
						
							|  |  |  |       self.v_ego_kf.x = [[v_ego_raw], [0.0]]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     v_ego_x = self.v_ego_kf.update(v_ego_raw)
 | 
					
						
							|  |  |  |     return float(v_ego_x[0]), float(v_ego_x[1])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_wheel_speeds(self, fl, fr, rl, rr, unit=CV.KPH_TO_MS):
 | 
					
						
							|  |  |  |     factor = unit * self.CP.wheelSpeedFactor
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     wheelSpeeds = car.CarState.WheelSpeeds.new_message()
 | 
					
						
							|  |  |  |     wheelSpeeds.fl = fl * factor
 | 
					
						
							|  |  |  |     wheelSpeeds.fr = fr * factor
 | 
					
						
							|  |  |  |     wheelSpeeds.rl = rl * factor
 | 
					
						
							|  |  |  |     wheelSpeeds.rr = rr * factor
 | 
					
						
							|  |  |  |     return wheelSpeeds
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update_blinker_from_lamp(self, blinker_time: int, left_blinker_lamp: bool, right_blinker_lamp: bool):
 | 
					
						
							|  |  |  |     """Update blinkers from lights. Enable output when light was seen within the last `blinker_time`
 | 
					
						
							|  |  |  |     iterations"""
 | 
					
						
							|  |  |  |     # TODO: Handle case when switching direction. Now both blinkers can be on at the same time
 | 
					
						
							|  |  |  |     self.left_blinker_cnt = blinker_time if left_blinker_lamp else max(self.left_blinker_cnt - 1, 0)
 | 
					
						
							|  |  |  |     self.right_blinker_cnt = blinker_time if right_blinker_lamp else max(self.right_blinker_cnt - 1, 0)
 | 
					
						
							|  |  |  |     return self.left_blinker_cnt > 0, self.right_blinker_cnt > 0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update_blinker_from_stalk(self, blinker_time: int, left_blinker_stalk: bool, right_blinker_stalk: bool):
 | 
					
						
							|  |  |  |     """Update blinkers from stalk position. When stalk is seen the blinker will be on for at least blinker_time,
 | 
					
						
							|  |  |  |     or until the stalk is turned off, whichever is longer. If the opposite stalk direction is seen the blinker
 | 
					
						
							|  |  |  |     is forced to the other side. On a rising edge of the stalk the timeout is reset."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if left_blinker_stalk:
 | 
					
						
							|  |  |  |       self.right_blinker_cnt = 0
 | 
					
						
							|  |  |  |       if not self.left_blinker_prev:
 | 
					
						
							|  |  |  |         self.left_blinker_cnt = blinker_time
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if right_blinker_stalk:
 | 
					
						
							|  |  |  |       self.left_blinker_cnt = 0
 | 
					
						
							|  |  |  |       if not self.right_blinker_prev:
 | 
					
						
							|  |  |  |         self.right_blinker_cnt = blinker_time
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.left_blinker_cnt = max(self.left_blinker_cnt - 1, 0)
 | 
					
						
							|  |  |  |     self.right_blinker_cnt = max(self.right_blinker_cnt - 1, 0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.left_blinker_prev = left_blinker_stalk
 | 
					
						
							|  |  |  |     self.right_blinker_prev = right_blinker_stalk
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return bool(left_blinker_stalk or self.left_blinker_cnt > 0), bool(right_blinker_stalk or self.right_blinker_cnt > 0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def parse_gear_shifter(gear: str) -> car.CarState.GearShifter:
 | 
					
						
							|  |  |  |     d: Dict[str, car.CarState.GearShifter] = {
 | 
					
						
							|  |  |  |         'P': GearShifter.park, 'R': GearShifter.reverse, 'N': GearShifter.neutral,
 | 
					
						
							|  |  |  |         'E': GearShifter.eco, 'T': GearShifter.manumatic, 'D': GearShifter.drive,
 | 
					
						
							|  |  |  |         'S': GearShifter.sport, 'L': GearShifter.low, 'B': GearShifter.brake
 | 
					
						
							|  |  |  |     }
 | 
					
						
							|  |  |  |     return d.get(gear, GearShifter.unknown)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def get_cam_can_parser(CP):
 | 
					
						
							|  |  |  |     return None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def get_body_can_parser(CP):
 | 
					
						
							|  |  |  |     return None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @staticmethod
 | 
					
						
							|  |  |  |   def get_loopback_can_parser(CP):
 | 
					
						
							|  |  |  |     return None
 |