Abstract CarInterface init (#1131)

* abstract CarInterface init

* unused

* subaru

* gm cleanup

* vw

* typo

* carcontroller

* more cleanup

* fix vw
pull/1134/head
Adeeb 5 years ago committed by GitHub
parent ab54a9e2c9
commit 13f60eae91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      selfdrive/car/chrysler/carcontroller.py
  2. 2
      selfdrive/car/chrysler/carstate.py
  3. 16
      selfdrive/car/chrysler/interface.py
  4. 6
      selfdrive/car/ford/carcontroller.py
  5. 18
      selfdrive/car/ford/interface.py
  6. 39
      selfdrive/car/gm/carcontroller.py
  7. 6
      selfdrive/car/gm/carstate.py
  8. 10
      selfdrive/car/gm/gmcan.py
  9. 41
      selfdrive/car/gm/interface.py
  10. 11
      selfdrive/car/gm/radar_interface.py
  11. 6
      selfdrive/car/gm/values.py
  12. 2
      selfdrive/car/honda/carcontroller.py
  13. 17
      selfdrive/car/honda/interface.py
  14. 4
      selfdrive/car/hyundai/carcontroller.py
  15. 18
      selfdrive/car/hyundai/interface.py
  16. 20
      selfdrive/car/interfaces.py
  17. 8
      selfdrive/car/subaru/carcontroller.py
  18. 24
      selfdrive/car/subaru/interface.py
  19. 12
      selfdrive/car/toyota/carcontroller.py
  20. 19
      selfdrive/car/toyota/interface.py
  21. 16
      selfdrive/car/volkswagen/carcontroller.py
  22. 12
      selfdrive/car/volkswagen/carstate.py
  23. 43
      selfdrive/car/volkswagen/interface.py
  24. 4
      selfdrive/car/volkswagen/values.py

@ -1,27 +1,21 @@
from selfdrive.car import apply_toyota_steer_torque_limits
from selfdrive.car.chrysler.chryslercan import create_lkas_hud, create_lkas_command, \
create_wheel_buttons
from selfdrive.car.chrysler.values import Ecu, CAR, SteerLimitParams
from selfdrive.car.chrysler.values import CAR, SteerLimitParams
from opendbc.can.packer import CANPacker
class CarController():
def __init__(self, dbc_name, car_fingerprint, enable_camera):
def __init__(self, dbc_name, CP, VM):
self.braking = False
# redundant safety check with the board
self.controls_allowed = True
self.apply_steer_last = 0
self.ccframe = 0
self.prev_frame = -1
self.hud_count = 0
self.car_fingerprint = car_fingerprint
self.car_fingerprint = CP.carFingerprint
self.alert_active = False
self.gone_fast_yet = False
self.steer_rate_limited = False
self.fake_ecus = set()
if enable_camera:
self.fake_ecus.add(Ecu.fwdCamera)
self.packer = CANPacker(dbc_name)

@ -34,7 +34,6 @@ def get_can_parser(CP):
("COUNTER", "EPS_STATUS", -1),
("TRACTION_OFF", "TRACTION_BUTTON", 0),
("SEATBELT_DRIVER_UNLATCHED", "SEATBELT_STATUS", 0),
("COUNTER", "WHEEL_BUTTONS", -1), # incrementing counter for 23b
]
# It's considered invalid if it is not received for 10x the expected period (1/f).
@ -73,7 +72,6 @@ class CarState(CarStateBase):
ret = car.CarState.new_message()
self.frame_23b = int(cp.vl["WHEEL_BUTTONS"]['COUNTER'])
self.frame = int(cp.vl["EPS_STATUS"]['COUNTER'])
ret.doorOpen = any([cp.vl["DOORS"]['DOOR_OPEN_FL'],

@ -2,7 +2,6 @@
from cereal import car
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import EventTypes as ET, create_event
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.chrysler.carstate import CarState, get_can_parser, get_camera_parser
from selfdrive.car.chrysler.values import Ecu, ECU_FINGERPRINT, CAR, FINGERPRINTS
from selfdrive.car import STD_CARGO_KG, scale_rot_inertia, scale_tire_stiffness, is_ecu_disconnected, gen_empty_fingerprint
@ -13,23 +12,10 @@ ButtonType = car.CarState.ButtonEvent.Type
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
self.VM = VehicleModel(CP)
super().__init__(CP, CarController, CarState, get_can_parser, get_cam_can_parser=get_camera_parser)
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.cruise_enabled_prev = False
self.low_speed_alert = False
# *** init the major players ***
self.CS = CarState(CP)
self.cp = get_can_parser(CP)
self.cp_cam = get_camera_parser(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP.carFingerprint, CP.enableCamera)
@staticmethod
def compute_gb(accel, speed):
return float(accel) / 3.0

@ -8,12 +8,12 @@ MAX_STEER_DELTA = 1
TOGGLE_DEBUG = False
class CarController():
def __init__(self, dbc_name, enable_camera, vehicle_model):
def __init__(self, dbc_name, CP, VM):
self.packer = CANPacker(dbc_name)
self.enable_camera = enable_camera
self.enable_camera = CP.enableCamera
self.enabled_last = False
self.main_on_last = False
self.vehicle_model = vehicle_model
self.vehicle_model = VM
self.generic_toggle_last = 0
self.steer_alert_last = False
self.lkas_action = 0

@ -3,7 +3,6 @@ from cereal import car
from selfdrive.swaglog import cloudlog
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import EventTypes as ET, create_event
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.ford.carstate import CarState, get_can_parser
from selfdrive.car.ford.values import MAX_ANGLE, Ecu, ECU_FINGERPRINT, FINGERPRINTS
from selfdrive.car import STD_CARGO_KG, scale_rot_inertia, scale_tire_stiffness, is_ecu_disconnected, gen_empty_fingerprint
@ -12,22 +11,7 @@ from selfdrive.car.interfaces import CarInterfaceBase
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
self.VM = VehicleModel(CP)
self.frame = 0
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.cruise_enabled_prev = False
# *** init the major players ***
self.CS = CarState(CP)
self.cp = get_can_parser(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP.enableCamera, self.VM)
super().__init__(CP, CarController, CarState, get_can_parser)
@staticmethod
def compute_gb(accel, speed):

@ -4,7 +4,7 @@ from common.numpy_fast import interp
from selfdrive.config import Conversions as CV
from selfdrive.car import apply_std_steer_torque_limits
from selfdrive.car.gm import gmcan
from selfdrive.car.gm.values import DBC, SUPERCRUISE_CARS
from selfdrive.car.gm.values import DBC, SUPERCRUISE_CARS, CanBus
from opendbc.can.packer import CANPacker
VisualAlert = car.CarControl.HUDControl.VisualAlert
@ -69,22 +69,18 @@ def process_hud_alert(hud_alert):
return steer
class CarController():
def __init__(self, canbus, car_fingerprint):
def __init__(self, dbc_name, CP, VM):
self.pedal_steady = 0.
self.start_time = 0.
self.steer_idx = 0
self.apply_steer_last = 0
self.car_fingerprint = car_fingerprint
self.car_fingerprint = CP.carFingerprint
self.lka_icon_status_last = (False, False)
self.steer_rate_limited = False
# Setup detection helper. Routes commands to
# an appropriate CAN bus number.
self.canbus = canbus
self.params = CarControllerParams(car_fingerprint)
self.params = CarControllerParams(CP.carFingerprint)
self.packer_pt = CANPacker(DBC[car_fingerprint]['pt'])
self.packer_ch = CANPacker(DBC[car_fingerprint]['chassis'])
self.packer_pt = CANPacker(DBC[CP.carFingerprint]['pt'])
self.packer_ch = CANPacker(DBC[CP.carFingerprint]['chassis'])
def update(self, enabled, CS, frame, actuators, \
hud_v_cruise, hud_show_lanes, hud_show_car, hud_alert):
@ -93,7 +89,6 @@ class CarController():
# Send CAN commands.
can_sends = []
canbus = self.canbus
alert_out = process_hud_alert(hud_alert)
steer = alert_out
@ -114,10 +109,10 @@ class CarController():
if self.car_fingerprint in SUPERCRUISE_CARS:
can_sends += gmcan.create_steering_control_ct6(self.packer_pt,
canbus, apply_steer, CS.out.vEgo, idx, lkas_enabled)
CanBus, apply_steer, CS.out.vEgo, idx, lkas_enabled)
else:
can_sends.append(gmcan.create_steering_control(self.packer_pt,
canbus.powertrain, apply_steer, idx, lkas_enabled))
CanBus.POWERTRAIN, apply_steer, idx, lkas_enabled))
### GAS/BRAKE ###
@ -144,14 +139,14 @@ class CarController():
at_full_stop = enabled and CS.out.standstill
near_stop = enabled and (CS.out.vEgo < P.NEAR_STOP_BRAKE_PHASE)
can_sends.append(gmcan.create_friction_brake_command(self.packer_ch, canbus.chassis, apply_brake, idx, near_stop, at_full_stop))
can_sends.append(gmcan.create_friction_brake_command(self.packer_ch, CanBus.CHASSIS, apply_brake, idx, near_stop, at_full_stop))
at_full_stop = enabled and CS.out.standstill
can_sends.append(gmcan.create_gas_regen_command(self.packer_pt, canbus.powertrain, apply_gas, idx, enabled, at_full_stop))
can_sends.append(gmcan.create_gas_regen_command(self.packer_pt, CanBus.POWERTRAIN, apply_gas, idx, enabled, at_full_stop))
# Send dashboard UI commands (ACC status), 25hz
if (frame % 4) == 0:
can_sends.append(gmcan.create_acc_dashboard_command(self.packer_pt, canbus.powertrain, enabled, hud_v_cruise * CV.MS_TO_KPH, hud_show_car))
can_sends.append(gmcan.create_acc_dashboard_command(self.packer_pt, CanBus.POWERTRAIN, enabled, hud_v_cruise * CV.MS_TO_KPH, hud_show_car))
# Radar needs to know current speed and yaw rate (50hz),
# and that ADAS is alive (10hz)
@ -160,17 +155,17 @@ class CarController():
if frame % time_and_headlights_step == 0:
idx = (frame // time_and_headlights_step) % 4
can_sends.append(gmcan.create_adas_time_status(canbus.obstacle, int((tt - self.start_time) * 60), idx))
can_sends.append(gmcan.create_adas_headlights_status(canbus.obstacle))
can_sends.append(gmcan.create_adas_time_status(CanBus.OBSTACLE, int((tt - self.start_time) * 60), idx))
can_sends.append(gmcan.create_adas_headlights_status(CanBus.OBSTACLE))
speed_and_accelerometer_step = 2
if frame % speed_and_accelerometer_step == 0:
idx = (frame // speed_and_accelerometer_step) % 4
can_sends.append(gmcan.create_adas_steering_status(canbus.obstacle, idx))
can_sends.append(gmcan.create_adas_accelerometer_speed_status(canbus.obstacle, CS.out.vEgo, idx))
can_sends.append(gmcan.create_adas_steering_status(CanBus.OBSTACLE, idx))
can_sends.append(gmcan.create_adas_accelerometer_speed_status(CanBus.OBSTACLE, CS.out.vEgo, idx))
if frame % P.ADAS_KEEPALIVE_STEP == 0:
can_sends += gmcan.create_adas_keepalive(canbus.powertrain)
can_sends += gmcan.create_adas_keepalive(CanBus.POWERTRAIN)
# Show green icon when LKA torque is applied, and
# alarming orange icon when approaching torque limit.
@ -181,7 +176,7 @@ class CarController():
lka_icon_status = (lka_active, lka_critical)
if frame % P.CAMERA_KEEPALIVE_STEP == 0 \
or lka_icon_status != self.lka_icon_status_last:
can_sends.append(gmcan.create_lka_icon_command(canbus.sw_gmlan, lka_active, lka_critical, steer))
can_sends.append(gmcan.create_lka_icon_command(CanBus.SW_GMLAN, lka_active, lka_critical, steer))
self.lka_icon_status_last = lka_icon_status
return can_sends

@ -4,11 +4,11 @@ from selfdrive.config import Conversions as CV
from opendbc.can.can_define import CANDefine
from opendbc.can.parser import CANParser
from selfdrive.car.interfaces import CarStateBase
from selfdrive.car.gm.values import DBC, CAR, AccState, \
from selfdrive.car.gm.values import DBC, CAR, AccState, CanBus, \
CruiseButtons, is_eps_status_ok, \
STEER_THRESHOLD, SUPERCRUISE_CARS
def get_powertrain_can_parser(CP, canbus):
def get_powertrain_can_parser(CP):
# this function generates lists for signal, messages and initial values
signals = [
# sig_name, sig_address, default
@ -48,7 +48,7 @@ def get_powertrain_can_parser(CP, canbus):
("CruiseState", "AcceleratorPedal2", 0),
]
return CANParser(DBC[CP.carFingerprint]['pt'], signals, [], canbus.powertrain)
return CANParser(DBC[CP.carFingerprint]['pt'], signals, [], CanBus.POWERTRAIN)
class CarState(CarStateBase):

@ -11,7 +11,7 @@ def create_steering_control(packer, bus, apply_steer, idx, lkas_active):
return packer.make_can_msg("ASCMLKASteeringCmd", bus, values)
def create_steering_control_ct6(packer, canbus, apply_steer, v_ego, idx, enabled):
def create_steering_control_ct6(packer, CanBus, apply_steer, v_ego, idx, enabled):
values = {
"LKASteeringCmdActive": 1 if enabled else 0,
@ -31,10 +31,10 @@ def create_steering_control_ct6(packer, canbus, apply_steer, v_ego, idx, enabled
# pack again with checksum
dat = packer.make_can_msg("ASCMLKASteeringCmd", 0, values)[2]
return [0x152, 0, dat, canbus.powertrain], \
[0x154, 0, dat, canbus.powertrain], \
[0x151, 0, dat, canbus.chassis], \
[0x153, 0, dat, canbus.chassis]
return [0x152, 0, dat, CanBus.POWERTRAIN], \
[0x154, 0, dat, CanBus.POWERTRAIN], \
[0x151, 0, dat, CanBus.CHASSIS], \
[0x153, 0, dat, CanBus.CHASSIS]
def create_adas_keepalive(bus):

@ -2,8 +2,7 @@
from cereal import car
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import create_event, EventTypes as ET
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.gm.values import DBC, CAR, Ecu, ECU_FINGERPRINT, \
from selfdrive.car.gm.values import CAR, Ecu, ECU_FINGERPRINT, \
SUPERCRUISE_CARS, AccState, FINGERPRINTS
from selfdrive.car.gm.carstate import CarState, CruiseButtons, get_powertrain_can_parser
from selfdrive.car import STD_CARGO_KG, scale_rot_inertia, scale_tire_stiffness, is_ecu_disconnected, gen_empty_fingerprint
@ -11,33 +10,12 @@ from selfdrive.car.interfaces import CarInterfaceBase
ButtonType = car.CarState.ButtonEvent.Type
class CanBus(CarInterfaceBase):
def __init__(self):
self.powertrain = 0
self.obstacle = 1
self.chassis = 2
self.sw_gmlan = 3
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
super().__init__(CP, CarController, CarState, get_powertrain_can_parser)
self.frame = 0
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.acc_active_prev = 0
# *** init the major players ***
canbus = CanBus()
self.CS = CarState(CP)
self.VM = VehicleModel(CP)
self.pt_cp = get_powertrain_can_parser(CP, canbus)
self.ch_cp_dbc_name = DBC[CP.carFingerprint]['chassis']
self.CC = None
if CarController is not None:
self.CC = CarController(canbus, CP.carFingerprint)
@staticmethod
def compute_gb(accel, speed):
return float(accel) / 4.0
@ -71,11 +49,13 @@ class CarInterface(CarInterfaceBase):
ret.steerRateCost = 1.0
ret.steerActuatorDelay = 0.1 # Default delay, not measured yet
# default to gm
ret.safetyModel = car.CarParams.SafetyModel.gm
if candidate == CAR.VOLT:
# supports stop and go, but initial engage must be above 18mph (which include conservatism)
ret.minEnableSpeed = 18 * CV.MPH_TO_MS
ret.mass = 1607. + STD_CARGO_KG
ret.safetyModel = car.CarParams.SafetyModel.gm
ret.wheelbase = 2.69
ret.steerRatio = 15.7
ret.steerRatioRear = 0.
@ -85,7 +65,6 @@ class CarInterface(CarInterfaceBase):
# supports stop and go, but initial engage must be above 18mph (which include conservatism)
ret.minEnableSpeed = 18 * CV.MPH_TO_MS
ret.mass = 1496. + STD_CARGO_KG
ret.safetyModel = car.CarParams.SafetyModel.gm
ret.wheelbase = 2.83
ret.steerRatio = 15.8
ret.steerRatioRear = 0.
@ -97,14 +76,12 @@ class CarInterface(CarInterfaceBase):
# Remaining parameters copied from Volt for now
ret.centerToFront = ret.wheelbase * 0.4
ret.minEnableSpeed = 18 * CV.MPH_TO_MS
ret.safetyModel = car.CarParams.SafetyModel.gm
ret.steerRatio = 15.7
ret.steerRatioRear = 0.
elif candidate == CAR.ACADIA:
ret.minEnableSpeed = -1. # engage speed is decided by pcm
ret.mass = 4353. * CV.LB_TO_KG + STD_CARGO_KG
ret.safetyModel = car.CarParams.SafetyModel.gm
ret.wheelbase = 2.86
ret.steerRatio = 14.4 #end to end is 13.46
ret.steerRatioRear = 0.
@ -113,7 +90,6 @@ class CarInterface(CarInterfaceBase):
elif candidate == CAR.BUICK_REGAL:
ret.minEnableSpeed = 18 * CV.MPH_TO_MS
ret.mass = 3779. * CV.LB_TO_KG + STD_CARGO_KG # (3849+3708)/2
ret.safetyModel = car.CarParams.SafetyModel.gm
ret.wheelbase = 2.83 #111.4 inches in meters
ret.steerRatio = 14.4 # guess for tourx
ret.steerRatioRear = 0.
@ -122,7 +98,6 @@ class CarInterface(CarInterfaceBase):
elif candidate == CAR.CADILLAC_ATS:
ret.minEnableSpeed = 18 * CV.MPH_TO_MS
ret.mass = 1601. + STD_CARGO_KG
ret.safetyModel = car.CarParams.SafetyModel.gm
ret.wheelbase = 2.78
ret.steerRatio = 15.3
ret.steerRatioRear = 0.
@ -173,11 +148,11 @@ class CarInterface(CarInterfaceBase):
# returns a car.CarState
def update(self, c, can_strings):
self.pt_cp.update_strings(can_strings)
self.cp.update_strings(can_strings)
ret = self.CS.update(self.pt_cp)
ret = self.CS.update(self.cp)
ret.canValid = self.pt_cp.can_valid
ret.canValid = self.cp.can_valid
ret.yawRate = self.VM.yaw_rate(ret.steeringAngle * CV.DEG_TO_RAD, ret.vEgo)
ret.steeringRateLimited = self.CC.steer_rate_limited if self.CC is not None else False

@ -4,8 +4,7 @@ import math
import time
from cereal import car
from opendbc.can.parser import CANParser
from selfdrive.car.gm.interface import CanBus
from selfdrive.car.gm.values import DBC, CAR
from selfdrive.car.gm.values import DBC, CAR, CanBus
from selfdrive.config import Conversions as CV
from selfdrive.car.interfaces import RadarInterfaceBase
@ -17,7 +16,7 @@ NUM_SLOTS = 20
# messages that are present in DBC
LAST_RADAR_MSG = RADAR_HEADER_MSG + NUM_SLOTS
def create_radar_can_parser(canbus, car_fingerprint):
def create_radar_can_parser(car_fingerprint):
dbc_f = DBC[car_fingerprint]['radar']
if car_fingerprint in (CAR.VOLT, CAR.MALIBU, CAR.HOLDEN_ASTRA, CAR.ACADIA, CAR.CADILLAC_ATS):
@ -38,7 +37,7 @@ def create_radar_can_parser(canbus, car_fingerprint):
checks = []
return CANParser(dbc_f, signals, checks, canbus.obstacle)
return CANParser(dbc_f, signals, checks, CanBus.OBSTACLE)
else:
return None
@ -49,9 +48,7 @@ class RadarInterface(RadarInterfaceBase):
self.delay = 0 # Delay of radar
canbus = CanBus()
print("Using %d as obstacle CAN bus ID" % canbus.obstacle)
self.rcp = create_radar_can_parser(canbus, CP.carFingerprint)
self.rcp = create_radar_can_parser(CP.carFingerprint)
self.trigger_msg = LAST_RADAR_MSG
self.updated_messages = set()

@ -27,6 +27,12 @@ class AccState:
FAULTED = 3
STANDSTILL = 4
class CanBus:
POWERTRAIN = 0
OBSTACLE = 1
CHASSIS = 2
SW_GMLAN = 3
def is_eps_status_ok(eps_status, car_fingerprint):
valid_eps_status = []
if car_fingerprint in SUPERCRUISE_CARS:

@ -85,7 +85,7 @@ class CarControllerParams():
self.STEER_LOOKUP_V = [v * -1 for v in CP.lateralParams.torqueV][1:][::-1] + list(CP.lateralParams.torqueV)
class CarController():
def __init__(self, dbc_name, CP):
def __init__(self, dbc_name, CP, VM):
self.braking = False
self.brake_steady = 0.
self.brake_last = 0.

@ -6,7 +6,6 @@ from common.realtime import DT_CTRL
from selfdrive.swaglog import cloudlog
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import create_event, EventTypes as ET, get_events
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.honda.carstate import CarState, get_can_parser, get_cam_can_parser
from selfdrive.car.honda.values import CruiseButtons, CAR, HONDA_BOSCH, Ecu, ECU_FINGERPRINT, FINGERPRINTS
from selfdrive.car import STD_CARGO_KG, CivicParams, scale_rot_inertia, scale_tire_stiffness, is_ecu_disconnected, gen_empty_fingerprint
@ -74,24 +73,10 @@ def get_compute_gb_acura():
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
super().__init__(CP, CarController, CarState, get_can_parser, get_cam_can_parser=get_cam_can_parser)
self.frame = 0
self.last_enable_pressed = 0
self.last_enable_sent = 0
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.cp = get_can_parser(CP)
self.cp_cam = get_cam_can_parser(CP)
# *** init the major players ***
self.CS = CarState(CP)
self.VM = VehicleModel(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP)
if self.CS.CP.carFingerprint == CAR.ACURA_ILX:
self.compute_gb = get_compute_gb_acura()

@ -5,9 +5,9 @@ from opendbc.can.packer import CANPacker
class CarController():
def __init__(self, dbc_name, car_fingerprint):
def __init__(self, dbc_name, CP, VM):
self.apply_steer_last = 0
self.car_fingerprint = car_fingerprint
self.car_fingerprint = CP.carFingerprint
self.lkas11_cnt = 0
self.cnt = 0
self.last_resume_cnt = 0

@ -2,7 +2,6 @@
from cereal import car
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import EventTypes as ET, create_event
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.hyundai.carstate import CarState, get_can_parser, get_camera_parser
from selfdrive.car.hyundai.values import Ecu, ECU_FINGERPRINT, CAR, get_hud_alerts, FINGERPRINTS
from selfdrive.car import STD_CARGO_KG, scale_rot_inertia, scale_tire_stiffness, is_ecu_disconnected, gen_empty_fingerprint
@ -13,26 +12,13 @@ ButtonType = car.CarState.ButtonEvent.Type
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
self.VM = VehicleModel(CP)
super().__init__(CP, CarController, CarState, get_can_parser, get_cam_can_parser=get_camera_parser)
self.idx = 0
self.lanes = 0
self.lkas_request = 0
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.cruise_enabled_prev = False
self.low_speed_alert = False
# *** init the major players ***
self.CS = CarState(CP)
self.cp = get_can_parser(CP)
self.cp_cam = get_camera_parser(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP.carFingerprint)
@staticmethod
def compute_gb(accel, speed):
return float(accel) / 3.0

@ -4,14 +4,30 @@ from cereal import car
from common.kalman.simple_kalman import KF1D
from common.realtime import DT_CTRL
from selfdrive.car import gen_empty_fingerprint
from selfdrive.controls.lib.vehicle_model import VehicleModel
GearShifter = car.CarState.GearShifter
# generic car and radar interfaces
class CarInterfaceBase():
def __init__(self, CP, CarController):
pass
def __init__(self, CP, CarController, CarState, get_can_parser, get_cam_can_parser=None):
self.CP = CP
self.VM = VehicleModel(CP)
self.frame = 0
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.cruise_enabled_prev = False
self.CS = CarState(CP)
self.cp = get_can_parser(CP)
if get_cam_can_parser is not None:
self.cp_cam = get_cam_can_parser(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP, self.VM)
@staticmethod
def calc_accel_override(a_ego, a_target, v_ego, v_target):

@ -18,19 +18,17 @@ class CarControllerParams():
class CarController():
def __init__(self, car_fingerprint):
def __init__(self, dbc_name, CP, VM):
self.lkas_active = False
self.steer_idx = 0
self.apply_steer_last = 0
self.car_fingerprint = car_fingerprint
self.es_distance_cnt = -1
self.es_lkas_cnt = -1
self.steer_rate_limited = False
# Setup detection helper. Routes commands to
# an appropriate CAN bus number.
self.params = CarControllerParams(car_fingerprint)
self.packer = CANPacker(DBC[car_fingerprint]['pt'])
self.params = CarControllerParams(CP.carFingerprint)
self.packer = CANPacker(DBC[CP.carFingerprint]['pt'])
def update(self, enabled, CS, frame, actuators, pcm_cancel_cmd, visual_alert, left_line, right_line):
""" Controls thread """

@ -2,7 +2,6 @@
from cereal import car
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import create_event, EventTypes as ET
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.subaru.values import CAR
from selfdrive.car.subaru.carstate import CarState, get_powertrain_can_parser, get_camera_can_parser
from selfdrive.car import STD_CARGO_KG, scale_rot_inertia, scale_tire_stiffness, gen_empty_fingerprint
@ -12,23 +11,10 @@ ButtonType = car.CarState.ButtonEvent.Type
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
super().__init__(CP, CarController, CarState, get_powertrain_can_parser, get_cam_can_parser=get_camera_can_parser)
self.CP = CP
self.frame = 0
self.enabled_prev = 0
self.gas_pressed_prev = False
# *** init the major players ***
self.CS = CarState(CP)
self.VM = VehicleModel(CP)
self.pt_cp = get_powertrain_can_parser(CP)
self.cam_cp = get_camera_can_parser(CP)
self.gas_pressed_prev = False
self.CC = None
if CarController is not None:
self.CC = CarController(CP.carFingerprint)
@staticmethod
def compute_gb(accel, speed):
@ -95,12 +81,12 @@ class CarInterface(CarInterfaceBase):
# returns a car.CarState
def update(self, c, can_strings):
self.pt_cp.update_strings(can_strings)
self.cam_cp.update_strings(can_strings)
self.cp.update_strings(can_strings)
self.cp_cam.update_strings(can_strings)
ret = self.CS.update(self.pt_cp, self.cam_cp)
ret = self.CS.update(self.cp, self.cp_cam)
ret.canValid = self.pt_cp.can_valid and self.cam_cp.can_valid
ret.canValid = self.cp.can_valid and self.cp_cam.can_valid
ret.steeringRateLimited = self.CC.steer_rate_limited if self.CC is not None else False
ret.yawRate = self.VM.yaw_rate(ret.steeringAngle * CV.DEG_TO_RAD, ret.vEgo)

@ -76,14 +76,12 @@ def ipas_state_transition(steer_angle_enabled, enabled, ipas_active, ipas_reset_
class CarController():
def __init__(self, dbc_name, car_fingerprint, enable_camera, enable_dsu, enable_apg):
def __init__(self, dbc_name, CP, VM):
self.braking = False
# redundant safety check with the board
self.controls_allowed = True
self.last_steer = 0
self.last_angle = 0
self.accel_steady = 0.
self.car_fingerprint = car_fingerprint
self.car_fingerprint = CP.carFingerprint
self.alert_active = False
self.last_standstill = False
self.standstill_req = False
@ -95,9 +93,9 @@ class CarController():
self.steer_rate_limited = False
self.fake_ecus = set()
if enable_camera: self.fake_ecus.add(Ecu.fwdCamera)
if enable_dsu: self.fake_ecus.add(Ecu.dsu)
if enable_apg: self.fake_ecus.add(Ecu.apgs)
if CP.enableCamera: self.fake_ecus.add(Ecu.fwdCamera)
if CP.enableDsu: self.fake_ecus.add(Ecu.dsu)
if CP.enableApgs: self.fake_ecus.add(Ecu.apgs)
self.packer = CANPacker(dbc_name)

@ -2,7 +2,6 @@
from cereal import car
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import EventTypes as ET, create_event
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.toyota.carstate import CarState, get_can_parser, get_cam_can_parser
from selfdrive.car.toyota.values import Ecu, ECU_FINGERPRINT, CAR, TSS2_CAR, FINGERPRINTS
from selfdrive.car import STD_CARGO_KG, scale_rot_inertia, scale_tire_stiffness, is_ecu_disconnected, gen_empty_fingerprint
@ -14,23 +13,7 @@ GearShifter = car.CarState.GearShifter
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
self.VM = VehicleModel(CP)
self.frame = 0
self.gas_pressed_prev = False
self.brake_pressed_prev = False
self.cruise_enabled_prev = False
# *** init the major players ***
self.CS = CarState(CP)
self.cp = get_can_parser(CP)
self.cp_cam = get_cam_can_parser(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP.carFingerprint, CP.enableCamera, CP.enableDsu, CP.enableApgs)
super().__init__(CP, CarController, CarState, get_can_parser, get_cam_can_parser=get_cam_can_parser)
@staticmethod
def compute_gb(accel, speed):

@ -1,20 +1,17 @@
from cereal import car
from selfdrive.car import apply_std_steer_torque_limits
from selfdrive.car.volkswagen import volkswagencan
from selfdrive.car.volkswagen.values import DBC, MQB_LDW_MESSAGES, BUTTON_STATES, CarControllerParams
from selfdrive.car.volkswagen.values import DBC, CANBUS, MQB_LDW_MESSAGES, BUTTON_STATES, CarControllerParams
from opendbc.can.packer import CANPacker
VisualAlert = car.CarControl.HUDControl.VisualAlert
class CarController():
def __init__(self, canbus, car_fingerprint):
def __init__(self, dbc_name, CP, VM):
self.apply_steer_last = 0
self.car_fingerprint = car_fingerprint
# Setup detection helper. Routes commands to an appropriate CAN bus number.
self.canbus = canbus
self.packer_pt = CANPacker(DBC[car_fingerprint]['pt'])
self.packer_pt = CANPacker(DBC[CP.carFingerprint]['pt'])
self.hcaSameTorqueCount = 0
self.hcaEnabledFrameCount = 0
@ -32,7 +29,6 @@ class CarController():
# Send CAN commands.
can_sends = []
canbus = self.canbus
#--------------------------------------------------------------------------
# #
@ -102,7 +98,7 @@ class CarController():
self.apply_steer_last = apply_steer
idx = (frame / P.HCA_STEP) % 16
can_sends.append(volkswagencan.create_mqb_steering_control(self.packer_pt, canbus.pt, apply_steer,
can_sends.append(volkswagencan.create_mqb_steering_control(self.packer_pt, CANBUS.pt, apply_steer,
idx, hcaEnabled))
#--------------------------------------------------------------------------
@ -123,7 +119,7 @@ class CarController():
else:
hud_alert = MQB_LDW_MESSAGES["none"]
can_sends.append(volkswagencan.create_mqb_hud_control(self.packer_pt, canbus.pt, hcaEnabled,
can_sends.append(volkswagencan.create_mqb_hud_control(self.packer_pt, CANBUS.pt, hcaEnabled,
CS.out.steeringPressed, hud_alert, leftLaneVisible,
rightLaneVisible))
@ -182,7 +178,7 @@ class CarController():
if self.graMsgSentCount == 0:
self.graMsgStartFramePrev = frame
idx = (CS.graMsgBusCounter + 1) % 16
can_sends.append(volkswagencan.create_mqb_acc_buttons_control(self.packer_pt, canbus.pt, self.graButtonStatesToSend, CS, idx))
can_sends.append(volkswagencan.create_mqb_acc_buttons_control(self.packer_pt, CANBUS.pt, self.graButtonStatesToSend, CS, idx))
self.graMsgSentCount += 1
if self.graMsgSentCount >= P.GRA_VBP_COUNT:
self.graButtonStatesToSend = None

@ -4,9 +4,9 @@ from selfdrive.config import Conversions as CV
from selfdrive.car.interfaces import CarStateBase
from opendbc.can.parser import CANParser
from opendbc.can.can_define import CANDefine
from selfdrive.car.volkswagen.values import DBC, BUTTON_STATES, CarControllerParams
from selfdrive.car.volkswagen.values import DBC, CANBUS, BUTTON_STATES, CarControllerParams
def get_mqb_pt_can_parser(CP, canbus):
def get_mqb_pt_can_parser(CP):
# this function generates lists for signal, messages and initial values
signals = [
# sig_name, sig_address, default
@ -78,12 +78,12 @@ def get_mqb_pt_can_parser(CP, canbus):
("Einheiten_01", 1), # From J??? not known if gateway, cluster, or BCM
]
return CANParser(DBC[CP.carFingerprint]['pt'], signals, checks, canbus.pt)
return CANParser(DBC[CP.carFingerprint]['pt'], signals, checks, CANBUS.pt)
# A single signal is monitored from the camera CAN bus, and then ignored,
# so the presence of CAN traffic can be verified with cam_cp.valid.
def get_mqb_cam_can_parser(CP, canbus):
def get_mqb_cam_can_parser(CP):
signals = [
# sig_name, sig_address, default
@ -95,11 +95,11 @@ def get_mqb_cam_can_parser(CP, canbus):
("LDW_02", 10) # From R242 Driver assistance camera
]
return CANParser(DBC[CP.carFingerprint]['pt'], signals, checks, canbus.cam)
return CANParser(DBC[CP.carFingerprint]['pt'], signals, checks, CANBUS.cam)
class CarState(CarStateBase):
def __init__(self, CP, canbus):
def __init__(self, CP):
super().__init__(CP)
can_define = CANDefine(DBC[CP.carFingerprint]['pt'])
self.shifter_values = can_define.dv["Getriebe_11"]['GE_Fahrstufe']

@ -1,7 +1,6 @@
from cereal import car
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import create_event, EventTypes as ET
from selfdrive.controls.lib.vehicle_model import VehicleModel
from selfdrive.car.volkswagen.values import CAR, BUTTON_STATES
from selfdrive.car.volkswagen.carstate import CarState, get_mqb_pt_can_parser, get_mqb_cam_can_parser
from common.params import Params
@ -10,33 +9,13 @@ from selfdrive.car.interfaces import CarInterfaceBase
GEAR = car.CarState.GearShifter
class CANBUS:
pt = 0
cam = 2
class CarInterface(CarInterfaceBase):
def __init__(self, CP, CarController):
self.CP = CP
self.CC = None
self.frame = 0
super().__init__(CP, CarController, CarState, get_mqb_pt_can_parser, get_mqb_cam_can_parser)
self.gasPressedPrev = False
self.brakePressedPrev = False
self.cruiseStateEnabledPrev = False
self.displayMetricUnitsPrev = None
self.buttonStatesPrev = BUTTON_STATES.copy()
# *** init the major players ***
self.CS = CarState(CP, CANBUS)
self.VM = VehicleModel(CP)
self.pt_cp = get_mqb_pt_can_parser(CP, CANBUS)
self.cam_cp = get_mqb_cam_can_parser(CP, CANBUS)
# sending if read only is False
if CarController is not None:
self.CC = CarController(CANBUS, CP.carFingerprint)
@staticmethod
def compute_gb(accel, speed):
return float(accel) / 4.0
@ -124,11 +103,11 @@ class CarInterface(CarInterfaceBase):
# Process the most recent CAN message traffic, and check for validity
# The camera CAN has no signals we use at this time, but we process it
# anyway so we can test connectivity with can_valid
self.pt_cp.update_strings(can_strings)
self.cam_cp.update_strings(can_strings)
self.cp.update_strings(can_strings)
self.cp_cam.update_strings(can_strings)
ret = self.CS.update(self.pt_cp)
ret.canValid = self.pt_cp.can_valid and self.cam_cp.can_valid
ret = self.CS.update(self.cp)
ret.canValid = self.cp.can_valid and self.cp_cam.can_valid
ret.steeringRateLimited = self.CC.steer_rate_limited if self.CC is not None else False
# Update the EON metric configuration to match the car at first startup,
@ -167,8 +146,8 @@ class CarInterface(CarInterfaceBase):
# Per the Comma safety model, disable on pedals rising edge or when brake
# is pressed and speed isn't zero.
if (ret.gasPressed and not self.gasPressedPrev) or \
(ret.brakePressed and (not self.brakePressedPrev or not ret.standstill)):
if (ret.gasPressed and not self.gas_pressed_prev) or \
(ret.brakePressed and (not self.brake_pressed_prev or not ret.standstill)):
events.append(create_event('pedalPressed', [ET.NO_ENTRY, ET.USER_DISABLE]))
if ret.gasPressed:
events.append(create_event('pedalPressed', [ET.PRE_ENABLE]))
@ -178,7 +157,7 @@ class CarInterface(CarInterfaceBase):
if not ret.cruiseState.enabled:
events.append(create_event('pcmDisable', [ET.USER_DISABLE]))
# Attempt OP engagement only on rising edge of stock ACC engagement.
elif not self.cruiseStateEnabledPrev:
elif not self.cruise_enabled_prev:
events.append(create_event('pcmEnable', [ET.ENABLE]))
ret.events = events
@ -186,9 +165,9 @@ class CarInterface(CarInterfaceBase):
ret.canMonoTimes = canMonoTimes
# update previous car states
self.gasPressedPrev = ret.gasPressed
self.brakePressedPrev = ret.brakePressed
self.cruiseStateEnabledPrev = ret.cruiseState.enabled
self.gas_pressed_prev = ret.gasPressed
self.brake_pressed_prev = ret.brakePressed
self.cruise_enabled_prev = ret.cruiseState.enabled
self.displayMetricUnitsPrev = self.CS.displayMetricUnits
self.buttonStatesPrev = self.CS.buttonStates.copy()

@ -18,6 +18,10 @@ class CarControllerParams:
STEER_DRIVER_MULTIPLIER = 3 # weight driver torque heavily
STEER_DRIVER_FACTOR = 1 # from dbc
class CANBUS:
pt = 0
cam = 2
BUTTON_STATES = {
"accelCruise": False,
"decelCruise": False,

Loading…
Cancel
Save