from selfdrive . car import apply_std_steer_torque_limits
from selfdrive . car . subaru import subarucan
from selfdrive . car . subaru . values import DBC , PREGLOBAL_CARS , CarControllerParams
from opendbc . can . packer import CANPacker
class CarController:
    """Builds the CAN messages sent to a Subaru on each control frame.

    The instance carries the state that must survive between ``update``
    calls: the last steering command sent, the counters of the last
    ``es_distance`` / ``es_lkas`` messages that were answered, and the last
    cruise button value that was sent.
    """

    # NOTE(review): the string keys ' pt ' and " Counter " are reproduced
    # exactly as found, including the inner spaces -- confirm they match the
    # DBC table and the parsed message dictionaries.

    def __init__(self, dbc_name, CP, VM):
        """Create the packer and parameters and reset the per-frame state.

        ``dbc_name`` and ``VM`` are accepted but not used by this class.
        """
        # Steering state.
        self.apply_steer_last = 0
        self.steer_rate_limited = False

        # Counters of the last ES messages handled; -1 means "none yet".
        self.es_distance_cnt = -1
        self.es_lkas_cnt = -1

        # Last cruise button value sent on pre-global cars.
        self.cruise_button_prev = 0

        self.p = CarControllerParams(CP)
        self.packer = CANPacker(DBC[CP.carFingerprint][' pt '])

    def update(self, enabled, CS, frame, actuators, pcm_cancel_cmd, visual_alert,
               left_line, right_line, left_lane_depart, right_lane_depart):
        """Return the list of CAN messages to send for this frame.

        A steering message is produced every ``STEER_STEP`` frames. ES
        messages are produced only when the counter of the corresponding
        message in ``CS`` differs from the one handled last.
        """
        outgoing = []

        # *** steering ***
        if frame % self.p.STEER_STEP == 0:
            outgoing.append(self._steering_message(enabled, CS, frame, actuators))

        # *** alerts and pcm cancel ***
        if CS.CP.carFingerprint in PREGLOBAL_CARS:
            outgoing.extend(self._preglobal_es_messages(CS, pcm_cancel_cmd))
        else:
            outgoing.extend(self._global_es_messages(
                enabled, CS, pcm_cancel_cmd, visual_alert,
                left_line, right_line, left_lane_depart, right_lane_depart))

        return outgoing

    def _steering_message(self, enabled, CS, frame, actuators):
        """Limit the requested steer value and pack it into one CAN message."""
        requested = int(round(actuators.steer * self.p.STEER_MAX))

        # limits due to driver torque
        limited = apply_std_steer_torque_limits(
            requested, self.apply_steer_last, CS.out.steeringTorque, self.p)
        # True whenever the limiter returned something other than the request.
        self.steer_rate_limited = requested != limited

        command = limited if enabled else 0

        build = (subarucan.create_preglobal_steering_control
                 if CS.CP.carFingerprint in PREGLOBAL_CARS
                 else subarucan.create_steering_control)
        message = build(self.packer, command, frame, self.p.STEER_STEP)

        # Remember what was sent only after the message has been built.
        self.apply_steer_last = command
        return message

    def _preglobal_es_messages(self, CS, pcm_cancel_cmd):
        """Return the es_distance message for pre-global cars, if one is due."""
        distance_msg = CS.es_distance_msg
        if self.es_distance_cnt != distance_msg[" Counter "]:
            # 1 = main, 2 = set shallow, 3 = set deep, 4 = resume shallow, 5 = resume deep
            # Press main either to disengage ACC when OP is disengaged, or to
            # turn main on if it is off and the car is past its start-up state.
            press_main = pcm_cancel_cmd or (
                not CS.out.cruiseState.available and CS.ready)
            cruise_button = 1 if press_main else CS.cruise_button

            # unstick previous mocked button press
            if cruise_button == 1 and self.cruise_button_prev == 1:
                cruise_button = 0
            self.cruise_button_prev = cruise_button

            message = subarucan.create_preglobal_es_distance(
                self.packer, cruise_button, distance_msg)
            self.es_distance_cnt = distance_msg[" Counter "]
            return [message]
        return []

    def _global_es_messages(self, enabled, CS, pcm_cancel_cmd, visual_alert,
                            left_line, right_line, left_lane_depart, right_lane_depart):
        """Return the es_distance and es_lkas messages that are due, in that order."""
        messages = []

        distance_msg = CS.es_distance_msg
        if self.es_distance_cnt != distance_msg[" Counter "]:
            messages.append(subarucan.create_es_distance(
                self.packer, distance_msg, pcm_cancel_cmd))
            self.es_distance_cnt = distance_msg[" Counter "]

        lkas_msg = CS.es_lkas_msg
        if self.es_lkas_cnt != lkas_msg[" Counter "]:
            messages.append(subarucan.create_es_lkas(
                self.packer, lkas_msg, enabled, visual_alert,
                left_line, right_line, left_lane_depart, right_lane_depart))
            self.es_lkas_cnt = lkas_msg[" Counter "]

        return messages