openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

272 lines
9.6 KiB

5 years ago
import os
import time
from typing import Dict
5 years ago
from cereal import car
from common.kalman.simple_kalman import KF1D
from common.realtime import DT_CTRL
5 years ago
from selfdrive.car import gen_empty_fingerprint
from selfdrive.config import Conversions as CV
from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX
from selfdrive.controls.lib.events import Events
from selfdrive.controls.lib.vehicle_model import VehicleModel
5 years ago
GearShifter = car.CarState.GearShifter
EventName = car.CarEvent.EventName
MAX_CTRL_SPEED = (V_CRUISE_MAX + 4) * CV.KPH_TO_MS
ACCEL_MAX = 2.0
ACCEL_MIN = -3.5
5 years ago
# generic car and radar interfaces
5 years ago
class CarInterfaceBase():
def __init__(self, CP, CarController, CarState):
self.CP = CP
self.VM = VehicleModel(CP)
self.frame = 0
self.steering_unpressed = 0
self.low_speed_alert = False
self.silent_steer_warning = True
if CarState is not None:
self.CS = CarState(CP)
self.cp = self.CS.get_can_parser(CP)
self.cp_cam = self.CS.get_cam_can_parser(CP)
self.cp_body = self.CS.get_body_can_parser(CP)
self.cp_loopback = self.CS.get_loopback_can_parser(CP)
self.CC = None
if CarController is not None:
self.CC = CarController(self.cp.dbc_name, CP, self.VM)
5 years ago
@staticmethod
def get_pid_accel_limits(CP, current_speed, cruise_speed):
return ACCEL_MIN, ACCEL_MAX
5 years ago
@staticmethod
def get_params(candidate, fingerprint=gen_empty_fingerprint(), car_fw=None):
5 years ago
raise NotImplementedError
@staticmethod
def init(CP, logcan, sendcan):
pass
@staticmethod
def get_steer_feedforward_default(desired_angle, v_ego):
# Proportional to realigning tire momentum: lateral acceleration.
# TODO: something with lateralPlan.curvatureRates
return desired_angle * (v_ego**2)
@classmethod
def get_steer_feedforward_function(cls):
return cls.get_steer_feedforward_default
# returns a set of default params to avoid repetition in car specific params
@staticmethod
def get_std_params(candidate, fingerprint):
ret = car.CarParams.new_message()
ret.carFingerprint = candidate
# standard ALC params
ret.steerControlType = car.CarParams.SteerControlType.torque
ret.steerMaxBP = [0.]
ret.steerMaxV = [1.]
ret.minSteerSpeed = 0.
ret.wheelSpeedFactor = 1.0
ret.pcmCruise = True # openpilot's state is tied to the PCM's cruise state on most cars
ret.minEnableSpeed = -1. # enable is done by stock ACC, so ignore this
ret.steerRatioRear = 0. # no rear steering, at least on the listed cars aboveA
ret.openpilotLongitudinalControl = False
ret.startAccel = -0.8
ret.stopAccel = -2.0
ret.startingAccelRate = 3.2 # brake_travel/s while releasing on restart
ret.stoppingDecelRate = 0.8 # brake_travel/s while trying to stop
ret.vEgoStopping = 0.5
ret.vEgoStarting = 0.5
ret.stoppingControl = True
ret.longitudinalTuning.deadzoneBP = [0.]
ret.longitudinalTuning.deadzoneV = [0.]
ret.longitudinalTuning.kpBP = [0.]
ret.longitudinalTuning.kpV = [1.]
ret.longitudinalTuning.kiBP = [0.]
ret.longitudinalTuning.kiV = [1.]
ret.longitudinalActuatorDelayLowerBound = 0.15
ret.longitudinalActuatorDelayUpperBound = 0.15
return ret
5 years ago
# returns a car.CarState, pass in car.CarControl
def update(self, c, can_strings):
raise NotImplementedError
# return sendcan, pass in a car.CarControl
def apply(self, c):
raise NotImplementedError
def create_common_events(self, cs_out, extra_gears=None, gas_resume_speed=-1, pcm_enable=True):
events = Events()
if cs_out.doorOpen:
events.add(EventName.doorOpen)
if cs_out.seatbeltUnlatched:
events.add(EventName.seatbeltNotLatched)
if cs_out.gearShifter != GearShifter.drive and (extra_gears is None or
cs_out.gearShifter not in extra_gears):
events.add(EventName.wrongGear)
if cs_out.gearShifter == GearShifter.reverse:
events.add(EventName.reverseGear)
if not cs_out.cruiseState.available:
events.add(EventName.wrongCarMode)
if cs_out.espDisabled:
events.add(EventName.espDisabled)
if cs_out.gasPressed:
events.add(EventName.gasPressed)
if cs_out.stockFcw:
events.add(EventName.stockFcw)
if cs_out.stockAeb:
events.add(EventName.stockAeb)
if cs_out.vEgo > MAX_CTRL_SPEED:
events.add(EventName.speedTooHigh)
if cs_out.cruiseState.nonAdaptive:
events.add(EventName.wrongCruiseMode)
if cs_out.brakeHoldActive and self.CP.openpilotLongitudinalControl:
events.add(EventName.brakeHold)
# Handle permanent and temporary steering faults
self.steering_unpressed = 0 if cs_out.steeringPressed else self.steering_unpressed + 1
if cs_out.steerWarning:
# if the user overrode recently, show a less harsh alert
if self.silent_steer_warning or cs_out.standstill or self.steering_unpressed < int(1.5 / DT_CTRL):
self.silent_steer_warning = True
events.add(EventName.steerTempUnavailableSilent)
else:
events.add(EventName.steerTempUnavailable)
else:
self.silent_steer_warning = False
if cs_out.steerError:
events.add(EventName.steerUnavailable)
# Disable on rising edge of gas or brake. Also disable on brake when speed > 0.
# Optionally allow to press gas at zero speed to resume.
# e.g. Chrysler does not spam the resume button yet, so resuming with gas is handy. FIXME!
if (cs_out.gasPressed and (not self.CS.out.gasPressed) and cs_out.vEgo > gas_resume_speed) or \
(cs_out.brakePressed and (not self.CS.out.brakePressed or not cs_out.standstill)):
events.add(EventName.pedalPressed)
# we engage when pcm is active (rising edge)
if pcm_enable:
if cs_out.cruiseState.enabled and not self.CS.out.cruiseState.enabled:
events.add(EventName.pcmEnable)
elif not cs_out.cruiseState.enabled:
events.add(EventName.pcmDisable)
return events
5 years ago
class RadarInterfaceBase():
def __init__(self, CP):
self.pts = {}
self.delay = 0
self.radar_ts = CP.radarTimeStep
self.no_radar_sleep = 'NO_RADAR_SLEEP' in os.environ
5 years ago
def update(self, can_strings):
ret = car.RadarData.new_message()
if not self.no_radar_sleep:
5 years ago
time.sleep(self.radar_ts) # radard runs on RI updates
return ret
class CarStateBase:
def __init__(self, CP):
self.CP = CP
self.car_fingerprint = CP.carFingerprint
self.out = car.CarState.new_message()
self.cruise_buttons = 0
self.left_blinker_cnt = 0
self.right_blinker_cnt = 0
self.left_blinker_prev = False
self.right_blinker_prev = False
# Q = np.matrix([[10.0, 0.0], [0.0, 100.0]])
# R = 1e3
self.v_ego_kf = KF1D(x0=[[0.0], [0.0]],
A=[[1.0, DT_CTRL], [0.0, 1.0]],
C=[1.0, 0.0],
K=[[0.12287673], [0.29666309]])
def update_speed_kf(self, v_ego_raw):
if abs(v_ego_raw - self.v_ego_kf.x[0][0]) > 2.0: # Prevent large accelerations when car starts at non zero speed
self.v_ego_kf.x = [[v_ego_raw], [0.0]]
v_ego_x = self.v_ego_kf.update(v_ego_raw)
return float(v_ego_x[0]), float(v_ego_x[1])
def get_wheel_speeds(self, fl, fr, rl, rr, unit=CV.KPH_TO_MS):
factor = unit * self.CP.wheelSpeedFactor
wheelSpeeds = car.CarState.WheelSpeeds.new_message()
wheelSpeeds.fl = fl * factor
wheelSpeeds.fr = fr * factor
wheelSpeeds.rl = rl * factor
wheelSpeeds.rr = rr * factor
return wheelSpeeds
def update_blinker_from_lamp(self, blinker_time: int, left_blinker_lamp: bool, right_blinker_lamp: bool):
"""Update blinkers from lights. Enable output when light was seen within the last `blinker_time`
iterations"""
# TODO: Handle case when switching direction. Now both blinkers can be on at the same time
self.left_blinker_cnt = blinker_time if left_blinker_lamp else max(self.left_blinker_cnt - 1, 0)
self.right_blinker_cnt = blinker_time if right_blinker_lamp else max(self.right_blinker_cnt - 1, 0)
return self.left_blinker_cnt > 0, self.right_blinker_cnt > 0
def update_blinker_from_stalk(self, blinker_time: int, left_blinker_stalk: bool, right_blinker_stalk: bool):
"""Update blinkers from stalk position. When stalk is seen the blinker will be on for at least blinker_time,
or until the stalk is turned off, whichever is longer. If the opposite stalk direction is seen the blinker
is forced to the other side. On a rising edge of the stalk the timeout is reset."""
if left_blinker_stalk:
self.right_blinker_cnt = 0
if not self.left_blinker_prev:
self.left_blinker_cnt = blinker_time
if right_blinker_stalk:
self.left_blinker_cnt = 0
if not self.right_blinker_prev:
self.right_blinker_cnt = blinker_time
self.left_blinker_cnt = max(self.left_blinker_cnt - 1, 0)
self.right_blinker_cnt = max(self.right_blinker_cnt - 1, 0)
self.left_blinker_prev = left_blinker_stalk
self.right_blinker_prev = right_blinker_stalk
return bool(left_blinker_stalk or self.left_blinker_cnt > 0), bool(right_blinker_stalk or self.right_blinker_cnt > 0)
@staticmethod
def parse_gear_shifter(gear: str) -> car.CarState.GearShifter:
d: Dict[str, car.CarState.GearShifter] = {
'P': GearShifter.park, 'R': GearShifter.reverse, 'N': GearShifter.neutral,
'E': GearShifter.eco, 'T': GearShifter.manumatic, 'D': GearShifter.drive,
'S': GearShifter.sport, 'L': GearShifter.low, 'B': GearShifter.brake
}
return d.get(gear, GearShifter.unknown)
@staticmethod
def get_cam_can_parser(CP):
return None
@staticmethod
def get_body_can_parser(CP):
return None
@staticmethod
def get_loopback_can_parser(CP):
return None