# functions common among cars
from collections import namedtuple
from dataclasses import dataclass
from enum import IntFlag , ReprEnum , EnumType
from dataclasses import replace
import capnp
from cereal import car
from openpilot . common . numpy_fast import clip , interp
from openpilot . common . utils import Freezable
from openpilot . selfdrive . car . docs_definitions import CarDocs
# kg of standard extra cargo to count for drive, gas, etc...
STD_CARGO_KG = 136.
ButtonType = car . CarState . ButtonEvent . Type
EventName = car . CarEvent . EventName
AngleRateLimit = namedtuple ( ' AngleRateLimit ' , [ ' speed_bp ' , ' angle_v ' ] )
def apply_hysteresis ( val : float , val_steady : float , hyst_gap : float ) - > float :
if val > val_steady + hyst_gap :
val_steady = val - hyst_gap
elif val < val_steady - hyst_gap :
val_steady = val + hyst_gap
return val_steady
def create_button_events ( cur_btn : int , prev_btn : int , buttons_dict : dict [ int , capnp . lib . capnp . _EnumModule ] ,
unpressed_btn : int = 0 ) - > list [ capnp . lib . capnp . _DynamicStructBuilder ] :
events : list [ capnp . lib . capnp . _DynamicStructBuilder ] = [ ]
if cur_btn == prev_btn :
return events
# Add events for button presses, multiple when a button switches without going to unpressed
for pressed , btn in ( ( False , prev_btn ) , ( True , cur_btn ) ) :
if btn != unpressed_btn :
events . append ( car . CarState . ButtonEvent ( pressed = pressed ,
type = buttons_dict . get ( btn , ButtonType . unknown ) ) )
return events
def gen_empty_fingerprint ( ) :
return { i : { } for i in range ( 8 ) }
# these params were derived for the Civic and used to calculate params for other cars
class VehicleDynamicsParams :
MASS = 1326. + STD_CARGO_KG
WHEELBASE = 2.70
CENTER_TO_FRONT = WHEELBASE * 0.4
CENTER_TO_REAR = WHEELBASE - CENTER_TO_FRONT
ROTATIONAL_INERTIA = 2500
TIRE_STIFFNESS_FRONT = 192150
TIRE_STIFFNESS_REAR = 202500
# TODO: get actual value, for now starting with reasonable value for
# civic and scaling by mass and wheelbase
def scale_rot_inertia ( mass , wheelbase ) :
return VehicleDynamicsParams . ROTATIONAL_INERTIA * mass * wheelbase * * 2 / ( VehicleDynamicsParams . MASS * VehicleDynamicsParams . WHEELBASE * * 2 )
# TODO: start from empirically derived lateral slip stiffness for the civic and scale by
# mass and CG position, so all cars will have approximately similar dyn behaviors
def scale_tire_stiffness ( mass , wheelbase , center_to_front , tire_stiffness_factor ) :
center_to_rear = wheelbase - center_to_front
tire_stiffness_front = ( VehicleDynamicsParams . TIRE_STIFFNESS_FRONT * tire_stiffness_factor ) * mass / VehicleDynamicsParams . MASS * \
( center_to_rear / wheelbase ) / ( VehicleDynamicsParams . CENTER_TO_REAR / VehicleDynamicsParams . WHEELBASE )
tire_stiffness_rear = ( VehicleDynamicsParams . TIRE_STIFFNESS_REAR * tire_stiffness_factor ) * mass / VehicleDynamicsParams . MASS * \
( center_to_front / wheelbase ) / ( VehicleDynamicsParams . CENTER_TO_FRONT / VehicleDynamicsParams . WHEELBASE )
return tire_stiffness_front , tire_stiffness_rear
DbcDict = dict [ str , str ]
def dbc_dict ( pt_dbc , radar_dbc , chassis_dbc = None , body_dbc = None ) - > DbcDict :
return { ' pt ' : pt_dbc , ' radar ' : radar_dbc , ' chassis ' : chassis_dbc , ' body ' : body_dbc }
def apply_driver_steer_torque_limits ( apply_torque , apply_torque_last , driver_torque , LIMITS ) :
# limits due to driver torque
driver_max_torque = LIMITS . STEER_MAX + ( LIMITS . STEER_DRIVER_ALLOWANCE + driver_torque * LIMITS . STEER_DRIVER_FACTOR ) * LIMITS . STEER_DRIVER_MULTIPLIER
driver_min_torque = - LIMITS . STEER_MAX + ( - LIMITS . STEER_DRIVER_ALLOWANCE + driver_torque * LIMITS . STEER_DRIVER_FACTOR ) * LIMITS . STEER_DRIVER_MULTIPLIER
max_steer_allowed = max ( min ( LIMITS . STEER_MAX , driver_max_torque ) , 0 )
min_steer_allowed = min ( max ( - LIMITS . STEER_MAX , driver_min_torque ) , 0 )
apply_torque = clip ( apply_torque , min_steer_allowed , max_steer_allowed )
# slow rate if steer torque increases in magnitude
if apply_torque_last > 0 :
apply_torque = clip ( apply_torque , max ( apply_torque_last - LIMITS . STEER_DELTA_DOWN , - LIMITS . STEER_DELTA_UP ) ,
apply_torque_last + LIMITS . STEER_DELTA_UP )
else :
apply_torque = clip ( apply_torque , apply_torque_last - LIMITS . STEER_DELTA_UP ,
min ( apply_torque_last + LIMITS . STEER_DELTA_DOWN , LIMITS . STEER_DELTA_UP ) )
return int ( round ( float ( apply_torque ) ) )
def apply_dist_to_meas_limits ( val , val_last , val_meas ,
STEER_DELTA_UP , STEER_DELTA_DOWN ,
STEER_ERROR_MAX , STEER_MAX ) :
# limits due to comparison of commanded val VS measured val (torque/angle/curvature)
max_lim = min ( max ( val_meas + STEER_ERROR_MAX , STEER_ERROR_MAX ) , STEER_MAX )
min_lim = max ( min ( val_meas - STEER_ERROR_MAX , - STEER_ERROR_MAX ) , - STEER_MAX )
val = clip ( val , min_lim , max_lim )
# slow rate if val increases in magnitude
if val_last > 0 :
val = clip ( val ,
max ( val_last - STEER_DELTA_DOWN , - STEER_DELTA_UP ) ,
val_last + STEER_DELTA_UP )
else :
val = clip ( val ,
val_last - STEER_DELTA_UP ,
min ( val_last + STEER_DELTA_DOWN , STEER_DELTA_UP ) )
return float ( val )
def apply_meas_steer_torque_limits ( apply_torque , apply_torque_last , motor_torque , LIMITS ) :
return int ( round ( apply_dist_to_meas_limits ( apply_torque , apply_torque_last , motor_torque ,
LIMITS . STEER_DELTA_UP , LIMITS . STEER_DELTA_DOWN ,
LIMITS . STEER_ERROR_MAX , LIMITS . STEER_MAX ) ) )
def apply_std_steer_angle_limits ( apply_angle , apply_angle_last , v_ego , LIMITS ) :
# pick angle rate limits based on wind up/down
steer_up = apply_angle_last * apply_angle > = 0. and abs ( apply_angle ) > abs ( apply_angle_last )
rate_limits = LIMITS . ANGLE_RATE_LIMIT_UP if steer_up else LIMITS . ANGLE_RATE_LIMIT_DOWN
angle_rate_lim = interp ( v_ego , rate_limits . speed_bp , rate_limits . angle_v )
return clip ( apply_angle , apply_angle_last - angle_rate_lim , apply_angle_last + angle_rate_lim )
def common_fault_avoidance ( fault_condition : bool , request : bool , above_limit_frames : int ,
max_above_limit_frames : int , max_mismatching_frames : int = 1 ) :
"""
Several cars have the ability to work around their EPS limits by cutting the
request bit of their LKAS message after a certain number of frames above the limit .
"""
# Count up to max_above_limit_frames, at which point we need to cut the request for above_limit_frames to avoid a fault
if request and fault_condition :
above_limit_frames + = 1
else :
above_limit_frames = 0
# Once we cut the request bit, count additionally to max_mismatching_frames before setting the request bit high again.
# Some brands do not respect our workaround without multiple messages on the bus, for example
if above_limit_frames > max_above_limit_frames :
request = False
if above_limit_frames > = max_above_limit_frames + max_mismatching_frames :
above_limit_frames = 0
return above_limit_frames , request
def make_can_msg ( addr , dat , bus ) :
return [ addr , 0 , dat , bus ]
def get_safety_config ( safety_model , safety_param = None ) :
ret = car . CarParams . SafetyConfig . new_message ( )
ret . safetyModel = safety_model
if safety_param is not None :
ret . safetyParam = safety_param
return ret
class CanBusBase :
offset : int
def __init__ ( self , CP , fingerprint : dict [ int , dict [ int , int ] ] | None ) - > None :
if CP is None :
assert fingerprint is not None
num = max ( [ k for k , v in fingerprint . items ( ) if len ( v ) ] , default = 0 ) / / 4 + 1
else :
num = len ( CP . safetyConfigs )
self . offset = 4 * ( num - 1 )
class CanSignalRateCalculator :
"""
Calculates the instantaneous rate of a CAN signal by using the counter
variable and the known frequency of the CAN message that contains it .
"""
def __init__ ( self , frequency ) :
self . frequency = frequency
self . previous_counter = 0
self . previous_value = 0
self . rate = 0
def update ( self , current_value , current_counter ) :
if current_counter != self . previous_counter :
self . rate = ( current_value - self . previous_value ) * self . frequency
self . previous_counter = current_counter
self . previous_value = current_value
return self . rate
@dataclass ( frozen = True , kw_only = True )
class CarSpecs :
mass : float # kg, curb weight
wheelbase : float # meters
steerRatio : float
centerToFrontRatio : float = 0.5
minSteerSpeed : float = 0.0 # m/s
minEnableSpeed : float = - 1.0 # m/s
tireStiffnessFactor : float = 1.0
def override ( self , * * kwargs ) :
return replace ( self , * * kwargs )
@dataclass ( order = True )
class PlatformConfig ( Freezable ) :
car_docs : list [ CarDocs ]
specs : CarSpecs
dbc_dict : DbcDict
flags : int = 0
platform_str : str | None = None
def __hash__ ( self ) - > int :
return hash ( self . platform_str )
def override ( self , * * kwargs ) :
return replace ( self , * * kwargs )
def init ( self ) :
pass
def __post_init__ ( self ) :
self . init ( )
class PlatformsType ( EnumType ) :
def __new__ ( metacls , cls , bases , classdict , * , boundary = None , _simple = False , * * kwds ) :
for key in classdict . _member_names . keys ( ) :
cfg : PlatformConfig = classdict [ key ]
cfg . platform_str = key
cfg . freeze ( )
return super ( ) . __new__ ( metacls , cls , bases , classdict , boundary = boundary , _simple = _simple , * * kwds )
class Platforms ( str , ReprEnum , metaclass = PlatformsType ) :
config : PlatformConfig
def __new__ ( cls , platform_config : PlatformConfig ) :
member = str . __new__ ( cls , platform_config . platform_str )
member . config = platform_config
member . _value_ = platform_config . platform_str
return member
def __repr__ ( self ) :
return f " < { self . __class__ . __name__ } . { self . name } > "
@classmethod
def create_dbc_map ( cls ) - > dict [ str , DbcDict ] :
return { p : p . config . dbc_dict for p in cls }
@classmethod
def with_flags ( cls , flags : IntFlag ) - > set [ ' Platforms ' ] :
return { p for p in cls if p . config . flags & flags }