"""Generic base classes and helpers shared by the per-brand car interfaces.

Contents:
  * ``get_torque_params``   - look up lateral torque parameters from YAML files
  * ``CarInterfaceBase``    - abstract base for car interfaces
  * ``RadarInterfaceBase``  - base radar interface that returns empty data
  * ``CarStateBase``        - shared CarState helpers (speed filter, blinkers, gear)
  * ``get_interface_attr``  - collect an attribute from every brand's ``values`` module
"""
import yaml
import os
import time
from abc import abstractmethod, ABC
from typing import Any, Dict, Optional, Tuple, List

from cereal import car
from common.basedir import BASEDIR
from common.conversions import Conversions as CV
from common.kalman.simple_kalman import KF1D
from common.realtime import DT_CTRL
from selfdrive.car import gen_empty_fingerprint
from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX
from selfdrive.controls.lib.events import Events
from selfdrive.controls.lib.vehicle_model import VehicleModel

GearShifter = car.CarState.GearShifter
EventName = car.CarEvent.EventName

# Speed above which create_common_events() raises speedTooHigh.
MAX_CTRL_SPEED = (V_CRUISE_MAX + 4) * CV.KPH_TO_MS
# Default accel limits returned by CarInterfaceBase.get_pid_accel_limits().
ACCEL_MAX = 2.0
ACCEL_MIN = -3.5

TORQUE_PARAMS_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/params.yaml')
TORQUE_OVERRIDE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/override.yaml')
TORQUE_SUBSTITUTE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/substitute.yaml')


def get_torque_params(candidate):
    """Return the torque parameters for ``candidate`` as a ``{legend_key: value}`` dict.

    The candidate is first mapped through the substitute file (if listed there),
    then looked up in the override file and, failing that, in the params file.
    The keys of the returned dict come from the ``'legend'`` entry of the params
    file; the values are the positionally matching items of the found entry.

    Raises:
        RuntimeError: the (possibly substituted) candidate appears in more than
            one of the three files.
        NotImplementedError: the candidate is in neither override nor params.
    """
    with open(TORQUE_SUBSTITUTE_PATH) as f:
        sub = yaml.load(f, Loader=yaml.CSafeLoader)
    if candidate in sub:
        candidate = sub[candidate]

    with open(TORQUE_PARAMS_PATH) as f:
        params = yaml.load(f, Loader=yaml.CSafeLoader)
    with open(TORQUE_OVERRIDE_PATH) as f:
        override = yaml.load(f, Loader=yaml.CSafeLoader)

    # Ensure no overlap
    if sum(candidate in table for table in (sub, params, override)) > 1:
        raise RuntimeError(f'{candidate} is defined twice in torque config')

    if candidate in override:
        out = override[candidate]
    elif candidate in params:
        out = params[candidate]
    else:
        raise NotImplementedError(f"Did not find torque params for {candidate}")
    return {key: out[i] for i, key in enumerate(params['legend'])}


# generic car and radar interfaces

class CarInterfaceBase(ABC):
    """Abstract base class for car interfaces.

    Subclasses must implement ``get_params``, ``_update`` and ``apply``.
    """

    def __init__(self, CP, CarController, CarState):
        """Build the vehicle model and, when given, the CarState/CarController objects.

        Args:
            CP: car params; passed to ``VehicleModel``, ``CarState`` and ``CarController``.
            CarController: controller class or ``None``.
            CarState: state class or ``None``. When provided, its five
                ``get_*_can_parser`` methods are called to populate ``can_parsers``.
        """
        self.CP = CP
        self.VM = VehicleModel(CP)

        self.frame = 0
        # Consecutive create_common_events() calls without steeringPressed.
        self.steering_unpressed = 0
        self.low_speed_alert = False
        self.silent_steer_warning = True

        self.CS = None
        self.can_parsers = []
        if CarState is not None:
            self.CS = CarState(CP)

            self.cp = self.CS.get_can_parser(CP)
            self.cp_cam = self.CS.get_cam_can_parser(CP)
            self.cp_adas = self.CS.get_adas_can_parser(CP)
            self.cp_body = self.CS.get_body_can_parser(CP)
            self.cp_loopback = self.CS.get_loopback_can_parser(CP)
            self.can_parsers = [self.cp, self.cp_cam, self.cp_adas, self.cp_body, self.cp_loopback]

        self.CC = None
        if CarController is not None:
            # NOTE(review): self.cp only exists when CarState was provided; passing a
            # CarController without a CarState raises AttributeError here.
            self.CC = CarController(self.cp.dbc_name, CP, self.VM)

    @staticmethod
    def get_pid_accel_limits(CP, current_speed, cruise_speed):
        """Return ``(ACCEL_MIN, ACCEL_MAX)``; the arguments are unused in this default."""
        return ACCEL_MIN, ACCEL_MAX

    # NOTE(review): the fingerprint default is evaluated once at definition time; if
    # gen_empty_fingerprint() returns a mutable object it is shared between calls -
    # confirm implementations never mutate it.
    @staticmethod
    @abstractmethod
    def get_params(candidate, fingerprint=gen_empty_fingerprint(), car_fw=None, disable_radar=False):
        """Abstract: must be implemented by each car interface."""
        pass

    @staticmethod
    def init(CP, logcan, sendcan):
        """Initialization hook; does nothing in the base class."""
        pass

    @staticmethod
    def get_steer_feedforward_default(desired_angle, v_ego):
        """Default steering feedforward: ``desired_angle * v_ego**2``."""
        # Proportional to realigning tire momentum: lateral acceleration.
        # TODO: something with lateralPlan.curvatureRates
        return desired_angle * (v_ego**2)

    def get_steer_feedforward_function(self):
        """Return the feedforward callable; the base class returns the default one."""
        return self.get_steer_feedforward_default

    # returns a set of default params to avoid repetition in car specific params
    @staticmethod
    def get_std_params(candidate, fingerprint):
        """Return a new ``car.CarParams`` message filled with the defaults set below.

        ``fingerprint`` is accepted but not used here. ``maxLateralAccel`` is read
        from ``get_torque_params(candidate)``, so this propagates its exceptions.
        """
        ret = car.CarParams.new_message()
        ret.carFingerprint = candidate

        # standard ALC params
        ret.steerControlType = car.CarParams.SteerControlType.torque
        ret.minSteerSpeed = 0.
        ret.wheelSpeedFactor = 1.0
        ret.maxLateralAccel = get_torque_params(candidate)['MAX_LAT_ACCEL_MEASURED']

        ret.pcmCruise = True     # openpilot's state is tied to the PCM's cruise state on most cars
        ret.minEnableSpeed = -1.  # enable is done by stock ACC, so ignore this
        ret.steerRatioRear = 0.  # no rear steering, at least on the listed cars above
        ret.openpilotLongitudinalControl = False
        ret.stopAccel = -2.0
        ret.stoppingDecelRate = 0.8  # brake_travel/s while trying to stop
        ret.vEgoStopping = 0.5
        ret.vEgoStarting = 0.5
        ret.stoppingControl = True
        ret.longitudinalTuning.deadzoneBP = [0.]
        ret.longitudinalTuning.deadzoneV = [0.]
        ret.longitudinalTuning.kf = 1.
        ret.longitudinalTuning.kpBP = [0.]
        ret.longitudinalTuning.kpV = [1.]
        ret.longitudinalTuning.kiBP = [0.]
        ret.longitudinalTuning.kiV = [1.]
        # TODO estimate car specific lag, use .15s for now
        ret.longitudinalActuatorDelayLowerBound = 0.15
        ret.longitudinalActuatorDelayUpperBound = 0.15
        ret.steerLimitTimer = 1.0
        return ret

    @staticmethod
    def configure_torque_tune(candidate, tune, steering_angle_deadzone_deg=0.0, use_steering_angle=True):
        """Initialise ``tune`` as a torque tune using ``get_torque_params(candidate)``.

        kp and kf are ``1 / LAT_ACCEL_FACTOR``, ki is ``0.1 / LAT_ACCEL_FACTOR`` and
        friction is the ``FRICTION`` entry. ``tune`` is modified in place.
        """
        params = get_torque_params(candidate)

        tune.init('torque')
        tune.torque.useSteeringAngle = use_steering_angle
        tune.torque.kp = 1.0 / params['LAT_ACCEL_FACTOR']
        tune.torque.kf = 1.0 / params['LAT_ACCEL_FACTOR']
        tune.torque.ki = 0.1 / params['LAT_ACCEL_FACTOR']
        tune.torque.friction = params['FRICTION']
        tune.torque.steeringAngleDeadzoneDeg = steering_angle_deadzone_deg

    @abstractmethod
    def _update(self, c: car.CarControl) -> car.CarState:
        """Abstract: produce the car state for this iteration (called by ``update``)."""
        pass

    def update(self, c: car.CarControl, can_strings: List[bytes]) -> car.CarState:
        """Feed ``can_strings`` to every parser, run ``_update`` and return a reader.

        Sets ``canValid``/``canTimeout`` from the non-``None`` parsers, fills the
        cluster speed fields from their non-cluster counterparts when they are zero,
        and stores the resulting reader on ``self.CS.out`` when a CarState exists.
        """
        # parse can
        for cp in self.can_parsers:
            if cp is not None:
                cp.update_strings(can_strings)

        # get CarState
        ret = self._update(c)

        ret.canValid = all(cp.can_valid for cp in self.can_parsers if cp is not None)
        ret.canTimeout = any(cp.bus_timeout for cp in self.can_parsers if cp is not None)

        if ret.vEgoCluster == 0.0:
            ret.vEgoCluster = ret.vEgo
        if ret.cruiseState.speedCluster == 0:
            ret.cruiseState.speedCluster = ret.cruiseState.speed

        # copy back for next iteration
        reader = ret.as_reader()
        if self.CS is not None:
            self.CS.out = reader

        return reader

    @abstractmethod
    def apply(self, c: car.CarControl) -> Tuple[car.CarControl.Actuators, List[bytes]]:
        """Abstract: must be implemented by each car interface."""
        pass

    def create_common_events(self, cs_out, extra_gears=None, pcm_enable=True, allow_enable=True):
        """Build an ``Events`` object from the fields of ``cs_out``.

        Args:
            cs_out: the car state to inspect.
            extra_gears: gears other than ``drive`` that do not raise ``wrongGear``.
            pcm_enable: when true, add ``pcmEnable``/``pcmDisable`` events based on
                ``cruiseState.enabled``.
            allow_enable: when false, the ``pcmEnable`` event is suppressed.

        Side effects: updates ``self.steering_unpressed`` and
        ``self.silent_steer_warning``. Reads ``self.CS.out`` when ``pcm_enable`` is
        true and cruise is enabled.
        """
        events = Events()

        if cs_out.doorOpen:
            events.add(EventName.doorOpen)
        if cs_out.seatbeltUnlatched:
            events.add(EventName.seatbeltNotLatched)
        if cs_out.gearShifter != GearShifter.drive and (extra_gears is None or
                                                        cs_out.gearShifter not in extra_gears):
            events.add(EventName.wrongGear)
        if cs_out.gearShifter == GearShifter.reverse:
            events.add(EventName.reverseGear)
        if not cs_out.cruiseState.available:
            events.add(EventName.wrongCarMode)
        if cs_out.espDisabled:
            events.add(EventName.espDisabled)
        if cs_out.stockFcw:
            events.add(EventName.stockFcw)
        if cs_out.stockAeb:
            events.add(EventName.stockAeb)
        if cs_out.vEgo > MAX_CTRL_SPEED:
            events.add(EventName.speedTooHigh)
        if cs_out.cruiseState.nonAdaptive:
            events.add(EventName.wrongCruiseMode)
        if cs_out.brakeHoldActive and self.CP.openpilotLongitudinalControl:
            events.add(EventName.brakeHold)
        if cs_out.parkingBrake:
            events.add(EventName.parkBrake)
        if cs_out.accFaulted:
            events.add(EventName.accFaulted)

        # Handle permanent and temporary steering faults
        self.steering_unpressed = 0 if cs_out.steeringPressed else self.steering_unpressed + 1
        if cs_out.steerFaultTemporary:
            # if the user overrode recently, show a less harsh alert
            if self.silent_steer_warning or cs_out.standstill or self.steering_unpressed < int(1.5 / DT_CTRL):
                # Latch: once silent, stay silent until the temporary fault clears.
                self.silent_steer_warning = True
                events.add(EventName.steerTempUnavailableSilent)
            else:
                events.add(EventName.steerTempUnavailable)
        else:
            self.silent_steer_warning = False
        if cs_out.steerFaultPermanent:
            events.add(EventName.steerUnavailable)

        # we engage when pcm is active (rising edge)
        # enabling can optionally be blocked by the car interface
        if pcm_enable:
            if cs_out.cruiseState.enabled and not self.CS.out.cruiseState.enabled and allow_enable:
                events.add(EventName.pcmEnable)
            elif not cs_out.cruiseState.enabled:
                events.add(EventName.pcmDisable)

        return events


class RadarInterfaceBase(ABC):
    """Base radar interface whose ``update`` returns an empty ``RadarData`` message."""

    def __init__(self, CP):
        self.rcp = None
        self.pts = {}
        self.delay = 0
        self.radar_ts = CP.radarTimeStep
        # Read once at construction: presence of NO_RADAR_SLEEP disables the sleep in update().
        self.no_radar_sleep = 'NO_RADAR_SLEEP' in os.environ

    def update(self, can_strings):
        """Return a new empty ``RadarData``; sleeps ``radar_ts`` seconds first unless disabled.

        ``can_strings`` is not used by this base implementation.
        """
        ret = car.RadarData.new_message()
        if not self.no_radar_sleep:
            time.sleep(self.radar_ts)  # radard runs on RI updates
        return ret


class CarStateBase(ABC):
    """Shared CarState helpers: speed filter, wheel speeds, blinker timers, gear parsing."""

    def __init__(self, CP):
        self.CP = CP
        self.car_fingerprint = CP.carFingerprint
        self.out = car.CarState.new_message()

        self.cruise_buttons = 0
        # Remaining iterations each blinker output stays on (see update_blinker_*).
        self.left_blinker_cnt = 0
        self.right_blinker_cnt = 0
        self.left_blinker_prev = False
        self.right_blinker_prev = False

        # Q = np.matrix([[10.0, 0.0], [0.0, 100.0]])
        # R = 1e3
        self.v_ego_kf = KF1D(x0=[[0.0], [0.0]],
                             A=[[1.0, DT_CTRL], [0.0, 1.0]],
                             C=[1.0, 0.0],
                             K=[[0.12287673], [0.29666309]])

    def update_speed_kf(self, v_ego_raw):
        """Update the speed filter with ``v_ego_raw`` and return its two states as floats.

        If the raw value differs from the filter's first state by more than 2.0,
        the filter state is reset to ``[[v_ego_raw], [0.0]]`` before updating.
        """
        if abs(v_ego_raw - self.v_ego_kf.x[0][0]) > 2.0:  # Prevent large accelerations when car starts at non zero speed
            self.v_ego_kf.x = [[v_ego_raw], [0.0]]

        v_ego_x = self.v_ego_kf.update(v_ego_raw)
        return float(v_ego_x[0]), float(v_ego_x[1])

    def get_wheel_speeds(self, fl, fr, rl, rr, unit=CV.KPH_TO_MS):
        """Return a ``WheelSpeeds`` message with each input scaled by ``unit * CP.wheelSpeedFactor``."""
        factor = unit * self.CP.wheelSpeedFactor

        wheelSpeeds = car.CarState.WheelSpeeds.new_message()
        wheelSpeeds.fl = fl * factor
        wheelSpeeds.fr = fr * factor
        wheelSpeeds.rl = rl * factor
        wheelSpeeds.rr = rr * factor
        return wheelSpeeds

    def update_blinker_from_lamp(self, blinker_time: int, left_blinker_lamp: bool, right_blinker_lamp: bool):
        """Update blinkers from lights. Enable output when light was seen within the last
        `blinker_time` iterations.

        Returns a ``(left_on, right_on)`` tuple of bools.
        """
        # TODO: Handle case when switching direction. Now both blinkers can be on at the same time
        self.left_blinker_cnt = blinker_time if left_blinker_lamp else max(self.left_blinker_cnt - 1, 0)
        self.right_blinker_cnt = blinker_time if right_blinker_lamp else max(self.right_blinker_cnt - 1, 0)
        return self.left_blinker_cnt > 0, self.right_blinker_cnt > 0

    def update_blinker_from_stalk(self, blinker_time: int, left_blinker_stalk: bool, right_blinker_stalk: bool):
        """Update blinkers from stalk position. When stalk is seen the blinker will be on for at least
        blinker_time, or until the stalk is turned off, whichever is longer. If the opposite stalk direction
        is seen the blinker is forced to the other side. On a rising edge of the stalk the timeout
        is reset.

        Returns a ``(left_on, right_on)`` tuple of bools.
        """
        if left_blinker_stalk:
            self.right_blinker_cnt = 0
            if not self.left_blinker_prev:
                self.left_blinker_cnt = blinker_time

        if right_blinker_stalk:
            self.left_blinker_cnt = 0
            if not self.right_blinker_prev:
                self.right_blinker_cnt = blinker_time

        self.left_blinker_cnt = max(self.left_blinker_cnt - 1, 0)
        self.right_blinker_cnt = max(self.right_blinker_cnt - 1, 0)

        self.left_blinker_prev = left_blinker_stalk
        self.right_blinker_prev = right_blinker_stalk

        return bool(left_blinker_stalk or self.left_blinker_cnt > 0), bool(right_blinker_stalk or self.right_blinker_cnt > 0)

    @staticmethod
    def parse_gear_shifter(gear: Optional[str]) -> car.CarState.GearShifter:
        """Map a gear string (case-insensitive) to a ``GearShifter`` value.

        Returns ``GearShifter.unknown`` for ``None`` or any string not in the table.
        """
        if gear is None:
            return GearShifter.unknown

        d: Dict[str, car.CarState.GearShifter] = {
            'P': GearShifter.park, 'PARK': GearShifter.park,
            'R': GearShifter.reverse, 'REVERSE': GearShifter.reverse,
            'N': GearShifter.neutral, 'NEUTRAL': GearShifter.neutral,
            'E': GearShifter.eco, 'ECO': GearShifter.eco,
            'T': GearShifter.manumatic, 'MANUAL': GearShifter.manumatic,
            'D': GearShifter.drive, 'DRIVE': GearShifter.drive,
            'S': GearShifter.sport, 'SPORT': GearShifter.sport,
            'L': GearShifter.low, 'LOW': GearShifter.low,
            'B': GearShifter.brake, 'BRAKE': GearShifter.brake,
        }
        return d.get(gear.upper(), GearShifter.unknown)

    @staticmethod
    def get_cam_can_parser(CP):
        """Default: no camera CAN parser."""
        return None

    @staticmethod
    def get_adas_can_parser(CP):
        """Default: no ADAS CAN parser."""
        return None

    @staticmethod
    def get_body_can_parser(CP):
        """Default: no body CAN parser."""
        return None

    @staticmethod
    def get_loopback_can_parser(CP):
        """Default: no loopback CAN parser."""
        return None


# interface-specific helpers

def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> Dict[str, Any]:
    """Collect ``attr`` from the ``values`` module of every folder under selfdrive/car.

    Args:
        attr: attribute name to read from each ``selfdrive.car.<brand>.values`` module.
        combine_brands: when true, merge every dict-valued attribute into one flat
            dict (non-dict values are skipped); otherwise key the result by brand
            folder name.
        ignore_none: when true, skip brands whose module lacks ``attr``; otherwise
            such brands contribute ``None``.

    Returns:
        A dict keyed by car model (``combine_brands``) or by brand name.
    """
    # read all the folders in selfdrive/car and return a dict where:
    # - keys are all the car models or brand names
    # - values are attr values from all car folders
    result: Dict[str, Any] = {}
    car_root = os.path.join(BASEDIR, 'selfdrive/car')
    for car_folder in sorted(dirpath for dirpath, _, _ in os.walk(car_root)):
        brand_name = os.path.basename(car_folder)
        try:
            brand_values = __import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr])
            if hasattr(brand_values, attr) or not ignore_none:
                attr_data = getattr(brand_values, attr, None)
            else:
                continue

            if combine_brands:
                if isinstance(attr_data, dict):
                    result.update(attr_data)
            else:
                result[brand_name] = attr_data
        except (ImportError, OSError):
            # Deliberate best-effort: every walked folder is tried, and those
            # whose values module cannot be imported are skipped.
            pass

    return result