#!/usr/bin/env python3
import os
from cereal import car , log
from common . numpy_fast import clip
from common . realtime import sec_since_boot , config_realtime_process , Priority , Ratekeeper , DT_CTRL
from common . profiler import Profiler
from common . params import Params , put_nonblocking
import cereal . messaging as messaging
from selfdrive . config import Conversions as CV
from selfdrive . swaglog import cloudlog
from selfdrive . boardd . boardd import can_list_to_can_capnp
from selfdrive . car . car_helpers import get_car , get_startup_event , get_one_can
from selfdrive . controls . lib . lane_planner import CAMERA_OFFSET
from selfdrive . controls . lib . drive_helpers import update_v_cruise , initialize_v_cruise
from selfdrive . controls . lib . longcontrol import LongControl , STARTING_TARGET_SPEED
from selfdrive . controls . lib . latcontrol_pid import LatControlPID
from selfdrive . controls . lib . latcontrol_indi import LatControlINDI
from selfdrive . controls . lib . latcontrol_lqr import LatControlLQR
from selfdrive . controls . lib . events import Events , ET
from selfdrive . controls . lib . alertmanager import AlertManager
from selfdrive . controls . lib . vehicle_model import VehicleModel
from selfdrive . controls . lib . longitudinal_planner import LON_MPC_STEP
from selfdrive . locationd . calibrationd import Calibration
from selfdrive . hardware import HARDWARE , TICI
LDW_MIN_SPEED = 31 * CV . MPH_TO_MS
LANE_DEPARTURE_THRESHOLD = 0.1
STEER_ANGLE_SATURATION_TIMEOUT = 1.0 / DT_CTRL
STEER_ANGLE_SATURATION_THRESHOLD = 2.5 # Degrees
SIMULATION = " SIMULATION " in os . environ
NOSENSOR = " NOSENSOR " in os . environ
IGNORE_PROCESSES = set ( [ " rtshield " , " uploader " , " deleter " , " loggerd " , " logmessaged " , " tombstoned " , " logcatd " , " proclogd " , " clocksd " , " updated " , " timezoned " ] )
ThermalStatus = log . DeviceState . ThermalStatus
State = log . ControlsState . OpenpilotState
PandaType = log . PandaState . PandaType
LongitudinalPlanSource = log . LongitudinalPlan . LongitudinalPlanSource
Desire = log . LateralPlan . Desire
LaneChangeState = log . LateralPlan . LaneChangeState
LaneChangeDirection = log . LateralPlan . LaneChangeDirection
EventName = car . CarEvent . EventName
class Controls :
def __init__ ( self , sm = None , pm = None , can_sock = None ) :
config_realtime_process ( 3 , Priority . CTRL_HIGH )
# Setup sockets
self . pm = pm
if self . pm is None :
self . pm = messaging . PubMaster ( [ ' sendcan ' , ' controlsState ' , ' carState ' ,
' carControl ' , ' carEvents ' , ' carParams ' ] )
self . sm = sm
if self . sm is None :
ignore = [ ' ubloxRaw ' , ' driverCameraState ' , ' managerState ' ] if SIMULATION else None
self . sm = messaging . SubMaster ( [ ' deviceState ' , ' pandaState ' , ' modelV2 ' , ' liveCalibration ' , ' ubloxRaw ' ,
' driverMonitoringState ' , ' longitudinalPlan ' , ' lateralPlan ' , ' liveLocationKalman ' ,
' roadCameraState ' , ' driverCameraState ' , ' managerState ' , ' liveParameters ' , ' radarState ' ] , ignore_alive = ignore )
self . can_sock = can_sock
if can_sock is None :
can_timeout = None if os . environ . get ( ' NO_CAN_TIMEOUT ' , False ) else 100
self . can_sock = messaging . sub_sock ( ' can ' , timeout = can_timeout )
# wait for one pandaState and one CAN packet
print ( " Waiting for CAN messages... " )
get_one_can ( self . can_sock )
self . CI , self . CP = get_car ( self . can_sock , self . pm . sock [ ' sendcan ' ] )
# read params
params = Params ( )
self . is_metric = params . get ( " IsMetric " , encoding = ' utf8 ' ) == " 1 "
self . is_ldw_enabled = params . get ( " IsLdwEnabled " , encoding = ' utf8 ' ) == " 1 "
community_feature_toggle = params . get ( " CommunityFeaturesToggle " , encoding = ' utf8 ' ) == " 1 "
openpilot_enabled_toggle = params . get ( " OpenpilotEnabledToggle " , encoding = ' utf8 ' ) == " 1 "
passive = params . get ( " Passive " , encoding = ' utf8 ' ) == " 1 " or not openpilot_enabled_toggle
# detect sound card presence and ensure successful init
sounds_available = HARDWARE . get_sound_card_online ( )
car_recognized = self . CP . carName != ' mock '
# If stock camera is disconnected, we loaded car controls and it's not dashcam mode
controller_available = self . CP . enableCamera and self . CI . CC is not None and not passive and not self . CP . dashcamOnly
community_feature_disallowed = self . CP . communityFeature and not community_feature_toggle
self . read_only = not car_recognized or not controller_available or \
self . CP . dashcamOnly or community_feature_disallowed
if self . read_only :
self . CP . safetyModel = car . CarParams . SafetyModel . noOutput
# Write CarParams for radard and boardd safety mode
cp_bytes = self . CP . to_bytes ( )
params . put ( " CarParams " , cp_bytes )
put_nonblocking ( " CarParamsCache " , cp_bytes )
self . CC = car . CarControl . new_message ( )
self . AM = AlertManager ( )
self . events = Events ( )
self . LoC = LongControl ( self . CP , self . CI . compute_gb )
self . VM = VehicleModel ( self . CP )
if self . CP . lateralTuning . which ( ) == ' pid ' :
self . LaC = LatControlPID ( self . CP )
elif self . CP . lateralTuning . which ( ) == ' indi ' :
self . LaC = LatControlINDI ( self . CP )
elif self . CP . lateralTuning . which ( ) == ' lqr ' :
self . LaC = LatControlLQR ( self . CP )
self . state = State . disabled
self . enabled = False
self . active = False
self . can_rcv_error = False
self . soft_disable_timer = 0
self . v_cruise_kph = 255
self . v_cruise_kph_last = 0
self . mismatch_counter = 0
self . can_error_counter = 0
self . last_blinker_frame = 0
self . saturated_count = 0
self . distance_traveled = 0
self . last_functional_fan_frame = 0
self . events_prev = [ ]
self . current_alert_types = [ ET . PERMANENT ]
self . logged_comm_issue = False
self . sm [ ' liveCalibration ' ] . calStatus = Calibration . CALIBRATED
self . sm [ ' deviceState ' ] . freeSpacePercent = 100
self . sm [ ' driverMonitoringState ' ] . events = [ ]
self . sm [ ' driverMonitoringState ' ] . awarenessStatus = 1.
self . sm [ ' driverMonitoringState ' ] . faceDetected = False
self . startup_event = get_startup_event ( car_recognized , controller_available )
if not sounds_available :
self . events . add ( EventName . soundsUnavailable , static = True )
if community_feature_disallowed :
self . events . add ( EventName . communityFeatureDisallowed , static = True )
if not car_recognized :
self . events . add ( EventName . carUnrecognized , static = True )
# controlsd is driven by can recv, expected at 100Hz
self . rk = Ratekeeper ( 100 , print_delay_threshold = None )
self . prof = Profiler ( False ) # off by default
def update_events ( self , CS ) :
""" Compute carEvents from carState """
self . events . clear ( )
self . events . add_from_msg ( CS . events )
self . events . add_from_msg ( self . sm [ ' driverMonitoringState ' ] . events )
# Handle startup event
if self . startup_event is not None :
self . events . add ( self . startup_event )
self . startup_event = None
# Create events for battery, temperature, disk space, and memory
if self . sm [ ' deviceState ' ] . batteryPercent < 1 and self . sm [ ' deviceState ' ] . chargingError :
# at zero percent battery, while discharging, OP should not allowed
self . events . add ( EventName . lowBattery )
if self . sm [ ' deviceState ' ] . thermalStatus > = ThermalStatus . red :
self . events . add ( EventName . overheat )
if self . sm [ ' deviceState ' ] . freeSpacePercent < 7 :
# under 7% of space free no enable allowed
self . events . add ( EventName . outOfSpace )
if self . sm [ ' deviceState ' ] . memoryUsagePercent > 90 :
self . events . add ( EventName . lowMemory )
# Alert if fan isn't spinning for 5 seconds
if self . sm [ ' pandaState ' ] . pandaType in [ PandaType . uno , PandaType . dos ] :
if self . sm [ ' pandaState ' ] . fanSpeedRpm == 0 and self . sm [ ' deviceState ' ] . fanSpeedPercentDesired > 50 :
if ( self . sm . frame - self . last_functional_fan_frame ) * DT_CTRL > 5.0 :
self . events . add ( EventName . fanMalfunction )
else :
self . last_functional_fan_frame = self . sm . frame
# Handle calibration status
cal_status = self . sm [ ' liveCalibration ' ] . calStatus
if cal_status != Calibration . CALIBRATED :
if cal_status == Calibration . UNCALIBRATED :
self . events . add ( EventName . calibrationIncomplete )
else :
self . events . add ( EventName . calibrationInvalid )
# Handle lane change
if self . sm [ ' lateralPlan ' ] . laneChangeState == LaneChangeState . preLaneChange :
direction = self . sm [ ' lateralPlan ' ] . laneChangeDirection
if ( CS . leftBlindspot and direction == LaneChangeDirection . left ) or \
( CS . rightBlindspot and direction == LaneChangeDirection . right ) :
self . events . add ( EventName . laneChangeBlocked )
else :
if direction == LaneChangeDirection . left :
self . events . add ( EventName . preLaneChangeLeft )
else :
self . events . add ( EventName . preLaneChangeRight )
elif self . sm [ ' lateralPlan ' ] . laneChangeState in [ LaneChangeState . laneChangeStarting ,
LaneChangeState . laneChangeFinishing ] :
self . events . add ( EventName . laneChange )
if self . can_rcv_error or ( not CS . canValid and self . sm . frame > 5 / DT_CTRL ) :
self . events . add ( EventName . canError )
if ( self . sm [ ' pandaState ' ] . safetyModel != self . CP . safetyModel and self . sm . frame > 2 / DT_CTRL ) or \
self . mismatch_counter > = 200 :
self . events . add ( EventName . controlsMismatch )
if len ( self . sm [ ' radarState ' ] . radarErrors ) :
self . events . add ( EventName . radarFault )
elif not self . sm . valid [ ' liveParameters ' ] :
self . events . add ( EventName . vehicleModelInvalid )
elif not self . sm . all_alive_and_valid ( ) :
self . events . add ( EventName . commIssue )
if not self . logged_comm_issue :
cloudlog . error ( f " commIssue - valid: { self . sm . valid } - alive: { self . sm . alive } " )
self . logged_comm_issue = True
else :
self . logged_comm_issue = False
if not self . sm [ ' lateralPlan ' ] . mpcSolutionValid :
self . events . add ( EventName . plannerError )
if not self . sm [ ' liveLocationKalman ' ] . sensorsOK and not NOSENSOR :
if self . sm . frame > 5 / DT_CTRL : # Give locationd some time to receive all the inputs
self . events . add ( EventName . sensorDataInvalid )
if not self . sm [ ' liveLocationKalman ' ] . posenetOK :
self . events . add ( EventName . posenetInvalid )
if not self . sm [ ' liveLocationKalman ' ] . deviceStable :
self . events . add ( EventName . deviceFalling )
if log . PandaState . FaultType . relayMalfunction in self . sm [ ' pandaState ' ] . faults :
self . events . add ( EventName . relayMalfunction )
if self . sm [ ' longitudinalPlan ' ] . fcw :
self . events . add ( EventName . fcw )
# TODO: fix simulator
if not SIMULATION :
if not NOSENSOR :
if not self . sm . alive [ ' ubloxRaw ' ] and ( self . sm . frame > 10. / DT_CTRL ) :
self . events . add ( EventName . gpsMalfunction )
elif not self . sm [ ' liveLocationKalman ' ] . gpsOK and ( self . distance_traveled > 1000 ) and not TICI :
# Not show in first 1 km to allow for driving out of garage. This event shows after 5 minutes
self . events . add ( EventName . noGps )
if not self . sm . all_alive ( [ ' roadCameraState ' , ' driverCameraState ' ] ) and ( self . sm . frame > 5 / DT_CTRL ) :
self . events . add ( EventName . cameraMalfunction )
if self . sm [ ' modelV2 ' ] . frameDropPerc > 20 :
self . events . add ( EventName . modeldLagging )
# Check if all manager processes are running
not_running = set ( p . name for p in self . sm [ ' managerState ' ] . processes if not p . running )
if self . sm . rcv_frame [ ' managerState ' ] and ( not_running - IGNORE_PROCESSES ) :
self . events . add ( EventName . processNotRunning )
# Only allow engagement with brake pressed when stopped behind another stopped car
if CS . brakePressed and self . sm [ ' longitudinalPlan ' ] . vTargetFuture > = STARTING_TARGET_SPEED \
and self . CP . openpilotLongitudinalControl and CS . vEgo < 0.3 :
self . events . add ( EventName . noTarget )
def data_sample ( self ) :
""" Receive data from sockets and update carState """
# Update carState from CAN
can_strs = messaging . drain_sock_raw ( self . can_sock , wait_for_one = True )
CS = self . CI . update ( self . CC , can_strs )
self . sm . update ( 0 )
# Check for CAN timeout
if not can_strs :
self . can_error_counter + = 1
self . can_rcv_error = True
else :
self . can_rcv_error = False
# When the panda and controlsd do not agree on controls_allowed
# we want to disengage openpilot. However the status from the panda goes through
# another socket other than the CAN messages and one can arrive earlier than the other.
# Therefore we allow a mismatch for two samples, then we trigger the disengagement.
if not self . enabled :
self . mismatch_counter = 0
if not self . sm [ ' pandaState ' ] . controlsAllowed and self . enabled :
self . mismatch_counter + = 1
self . distance_traveled + = CS . vEgo * DT_CTRL
return CS
def state_transition ( self , CS ) :
""" Compute conditional state transitions and execute actions on state transitions """
self . v_cruise_kph_last = self . v_cruise_kph
# if stock cruise is completely disabled, then we can use our own set speed logic
if not self . CP . enableCruise :
self . v_cruise_kph = update_v_cruise ( self . v_cruise_kph , CS . buttonEvents , self . enabled )
elif self . CP . enableCruise and CS . cruiseState . enabled :
self . v_cruise_kph = CS . cruiseState . speed * CV . MS_TO_KPH
# decrease the soft disable timer at every step, as it's reset on
# entrance in SOFT_DISABLING state
self . soft_disable_timer = max ( 0 , self . soft_disable_timer - 1 )
self . current_alert_types = [ ET . PERMANENT ]
# ENABLED, PRE ENABLING, SOFT DISABLING
if self . state != State . disabled :
# user and immediate disable always have priority in a non-disabled state
if self . events . any ( ET . USER_DISABLE ) :
self . state = State . disabled
self . current_alert_types . append ( ET . USER_DISABLE )
elif self . events . any ( ET . IMMEDIATE_DISABLE ) :
self . state = State . disabled
self . current_alert_types . append ( ET . IMMEDIATE_DISABLE )
else :
# ENABLED
if self . state == State . enabled :
if self . events . any ( ET . SOFT_DISABLE ) :
self . state = State . softDisabling
self . soft_disable_timer = 300 # 3s
self . current_alert_types . append ( ET . SOFT_DISABLE )
# SOFT DISABLING
elif self . state == State . softDisabling :
if not self . events . any ( ET . SOFT_DISABLE ) :
# no more soft disabling condition, so go back to ENABLED
self . state = State . enabled
elif self . events . any ( ET . SOFT_DISABLE ) and self . soft_disable_timer > 0 :
self . current_alert_types . append ( ET . SOFT_DISABLE )
elif self . soft_disable_timer < = 0 :
self . state = State . disabled
# PRE ENABLING
elif self . state == State . preEnabled :
if not self . events . any ( ET . PRE_ENABLE ) :
self . state = State . enabled
else :
self . current_alert_types . append ( ET . PRE_ENABLE )
# DISABLED
elif self . state == State . disabled :
if self . events . any ( ET . ENABLE ) :
if self . events . any ( ET . NO_ENTRY ) :
self . current_alert_types . append ( ET . NO_ENTRY )
else :
if self . events . any ( ET . PRE_ENABLE ) :
self . state = State . preEnabled
else :
self . state = State . enabled
self . current_alert_types . append ( ET . ENABLE )
self . v_cruise_kph = initialize_v_cruise ( CS . vEgo , CS . buttonEvents , self . v_cruise_kph_last )
# Check if actuators are enabled
self . active = self . state == State . enabled or self . state == State . softDisabling
if self . active :
self . current_alert_types . append ( ET . WARNING )
# Check if openpilot is engaged
self . enabled = self . active or self . state == State . preEnabled
def state_control ( self , CS ) :
""" Given the state, this function returns an actuators packet """
lat_plan = self . sm [ ' lateralPlan ' ]
long_plan = self . sm [ ' longitudinalPlan ' ]
actuators = car . CarControl . Actuators . new_message ( )
if CS . leftBlinker or CS . rightBlinker :
self . last_blinker_frame = self . sm . frame
# State specific actions
if not self . active :
self . LaC . reset ( )
self . LoC . reset ( v_pid = CS . vEgo )
long_plan_age = DT_CTRL * ( self . sm . frame - self . sm . rcv_frame [ ' longitudinalPlan ' ] )
# no greater than dt mpc + dt, to prevent too high extraps
dt = min ( long_plan_age , LON_MPC_STEP + DT_CTRL ) + DT_CTRL
a_acc_sol = long_plan . aStart + ( dt / LON_MPC_STEP ) * ( long_plan . aTarget - long_plan . aStart )
v_acc_sol = long_plan . vStart + dt * ( a_acc_sol + long_plan . aStart ) / 2.0
# Gas/Brake PID loop
actuators . gas , actuators . brake = self . LoC . update ( self . active , CS , v_acc_sol , long_plan . vTargetFuture , a_acc_sol , self . CP )
# Steering PID loop and lateral MPC
actuators . steer , actuators . steeringAngleDeg , lac_log = self . LaC . update ( self . active , CS , self . CP , lat_plan )
# Check for difference between desired angle and angle for angle based control
angle_control_saturated = self . CP . steerControlType == car . CarParams . SteerControlType . angle and \
abs ( actuators . steeringAngleDeg - CS . steeringAngleDeg ) > STEER_ANGLE_SATURATION_THRESHOLD
if angle_control_saturated and not CS . steeringPressed and self . active :
self . saturated_count + = 1
else :
self . saturated_count = 0
# Send a "steering required alert" if saturation count has reached the limit
if ( lac_log . saturated and not CS . steeringPressed ) or \
( self . saturated_count > STEER_ANGLE_SATURATION_TIMEOUT ) :
# Check if we deviated from the path
left_deviation = actuators . steer > 0 and lat_plan . dPathPoints [ 0 ] < - 0.1
right_deviation = actuators . steer < 0 and lat_plan . dPathPoints [ 0 ] > 0.1
if left_deviation or right_deviation :
self . events . add ( EventName . steerSaturated )
return actuators , v_acc_sol , a_acc_sol , lac_log
def publish_logs ( self , CS , start_time , actuators , v_acc , a_acc , lac_log ) :
""" Send actuators and hud commands to the car, send controlsstate and MPC logging """
CC = car . CarControl . new_message ( )
CC . enabled = self . enabled
CC . actuators = actuators
CC . cruiseControl . override = True
CC . cruiseControl . cancel = not self . CP . enableCruise or ( not self . enabled and CS . cruiseState . enabled )
# Some override values for Honda
# brake discount removes a sharp nonlinearity
brake_discount = ( 1.0 - clip ( actuators . brake * 3. , 0.0 , 1.0 ) )
speed_override = max ( 0.0 , ( self . LoC . v_pid + CS . cruiseState . speedOffset ) * brake_discount )
CC . cruiseControl . speedOverride = float ( speed_override if self . CP . enableCruise else 0.0 )
CC . cruiseControl . accelOverride = self . CI . calc_accel_override ( CS . aEgo , self . sm [ ' longitudinalPlan ' ] . aTarget , CS . vEgo , self . sm [ ' longitudinalPlan ' ] . vTarget )
CC . hudControl . setSpeed = float ( self . v_cruise_kph * CV . KPH_TO_MS )
CC . hudControl . speedVisible = self . enabled
CC . hudControl . lanesVisible = self . enabled
CC . hudControl . leadVisible = self . sm [ ' longitudinalPlan ' ] . hasLead
right_lane_visible = self . sm [ ' lateralPlan ' ] . rProb > 0.5
left_lane_visible = self . sm [ ' lateralPlan ' ] . lProb > 0.5
CC . hudControl . rightLaneVisible = bool ( right_lane_visible )
CC . hudControl . leftLaneVisible = bool ( left_lane_visible )
recent_blinker = ( self . sm . frame - self . last_blinker_frame ) * DT_CTRL < 5.0 # 5s blinker cooldown
ldw_allowed = self . is_ldw_enabled and CS . vEgo > LDW_MIN_SPEED and not recent_blinker \
and not self . active and self . sm [ ' liveCalibration ' ] . calStatus == Calibration . CALIBRATED
meta = self . sm [ ' modelV2 ' ] . meta
if len ( meta . desirePrediction ) and ldw_allowed :
l_lane_change_prob = meta . desirePrediction [ Desire . laneChangeLeft - 1 ]
r_lane_change_prob = meta . desirePrediction [ Desire . laneChangeRight - 1 ]
l_lane_close = left_lane_visible and ( self . sm [ ' modelV2 ' ] . laneLines [ 1 ] . y [ 0 ] > - ( 1.08 + CAMERA_OFFSET ) )
r_lane_close = right_lane_visible and ( self . sm [ ' modelV2 ' ] . laneLines [ 2 ] . y [ 0 ] < ( 1.08 - CAMERA_OFFSET ) )
CC . hudControl . leftLaneDepart = bool ( l_lane_change_prob > LANE_DEPARTURE_THRESHOLD and l_lane_close )
CC . hudControl . rightLaneDepart = bool ( r_lane_change_prob > LANE_DEPARTURE_THRESHOLD and r_lane_close )
if CC . hudControl . rightLaneDepart or CC . hudControl . leftLaneDepart :
self . events . add ( EventName . ldw )
clear_event = ET . WARNING if ET . WARNING not in self . current_alert_types else None
alerts = self . events . create_alerts ( self . current_alert_types , [ self . CP , self . sm , self . is_metric ] )
self . AM . add_many ( self . sm . frame , alerts , self . enabled )
self . AM . process_alerts ( self . sm . frame , clear_event )
CC . hudControl . visualAlert = self . AM . visual_alert
if not self . read_only :
# send car controls over can
can_sends = self . CI . apply ( CC )
self . pm . send ( ' sendcan ' , can_list_to_can_capnp ( can_sends , msgtype = ' sendcan ' , valid = CS . canValid ) )
force_decel = ( self . sm [ ' driverMonitoringState ' ] . awarenessStatus < 0. ) or \
( self . state == State . softDisabling )
steer_angle_rad = ( CS . steeringAngleDeg - self . sm [ ' lateralPlan ' ] . angleOffsetDeg ) * CV . DEG_TO_RAD
# controlsState
dat = messaging . new_message ( ' controlsState ' )
dat . valid = CS . canValid
controlsState = dat . controlsState
controlsState . alertText1 = self . AM . alert_text_1
controlsState . alertText2 = self . AM . alert_text_2
controlsState . alertSize = self . AM . alert_size
controlsState . alertStatus = self . AM . alert_status
controlsState . alertBlinkingRate = self . AM . alert_rate
controlsState . alertType = self . AM . alert_type
controlsState . alertSound = self . AM . audible_alert
controlsState . canMonoTimes = list ( CS . canMonoTimes )
controlsState . longitudinalPlanMonoTime = self . sm . logMonoTime [ ' longitudinalPlan ' ]
controlsState . lateralPlanMonoTime = self . sm . logMonoTime [ ' lateralPlan ' ]
controlsState . enabled = self . enabled
controlsState . active = self . active
controlsState . curvature = self . VM . calc_curvature ( steer_angle_rad , CS . vEgo )
controlsState . state = self . state
controlsState . engageable = not self . events . any ( ET . NO_ENTRY )
controlsState . longControlState = self . LoC . long_control_state
controlsState . vPid = float ( self . LoC . v_pid )
controlsState . vCruise = float ( self . v_cruise_kph )
controlsState . upAccelCmd = float ( self . LoC . pid . p )
controlsState . uiAccelCmd = float ( self . LoC . pid . i )
controlsState . ufAccelCmd = float ( self . LoC . pid . f )
controlsState . steeringAngleDesiredDeg = float ( self . LaC . angle_steers_des )
controlsState . vTargetLead = float ( v_acc )
controlsState . aTarget = float ( a_acc )
controlsState . cumLagMs = - self . rk . remaining * 1000.
controlsState . startMonoTime = int ( start_time * 1e9 )
controlsState . forceDecel = bool ( force_decel )
controlsState . canErrorCounter = self . can_error_counter
if self . CP . lateralTuning . which ( ) == ' pid ' :
controlsState . lateralControlState . pidState = lac_log
elif self . CP . lateralTuning . which ( ) == ' lqr ' :
controlsState . lateralControlState . lqrState = lac_log
elif self . CP . lateralTuning . which ( ) == ' indi ' :
controlsState . lateralControlState . indiState = lac_log
self . pm . send ( ' controlsState ' , dat )
# carState
car_events = self . events . to_msg ( )
cs_send = messaging . new_message ( ' carState ' )
cs_send . valid = CS . canValid
cs_send . carState = CS
cs_send . carState . events = car_events
self . pm . send ( ' carState ' , cs_send )
# carEvents - logged every second or on change
if ( self . sm . frame % int ( 1. / DT_CTRL ) == 0 ) or ( self . events . names != self . events_prev ) :
ce_send = messaging . new_message ( ' carEvents ' , len ( self . events ) )
ce_send . carEvents = car_events
self . pm . send ( ' carEvents ' , ce_send )
self . events_prev = self . events . names . copy ( )
# carParams - logged every 50 seconds (> 1 per segment)
if ( self . sm . frame % int ( 50. / DT_CTRL ) == 0 ) :
cp_send = messaging . new_message ( ' carParams ' )
cp_send . carParams = self . CP
self . pm . send ( ' carParams ' , cp_send )
# carControl
cc_send = messaging . new_message ( ' carControl ' )
cc_send . valid = CS . canValid
cc_send . carControl = CC
self . pm . send ( ' carControl ' , cc_send )
# copy CarControl to pass to CarInterface on the next iteration
self . CC = CC
def step ( self ) :
start_time = sec_since_boot ( )
self . prof . checkpoint ( " Ratekeeper " , ignore = True )
# Sample data from sockets and get a carState
CS = self . data_sample ( )
self . prof . checkpoint ( " Sample " )
self . update_events ( CS )
if not self . read_only :
# Update control state
self . state_transition ( CS )
self . prof . checkpoint ( " State transition " )
# Compute actuators (runs PID loops and lateral MPC)
actuators , v_acc , a_acc , lac_log = self . state_control ( CS )
self . prof . checkpoint ( " State Control " )
# Publish data
self . publish_logs ( CS , start_time , actuators , v_acc , a_acc , lac_log )
self . prof . checkpoint ( " Sent " )
def controlsd_thread ( self ) :
while True :
self . step ( )
self . rk . monitor_time ( )
self . prof . display ( )
def main ( sm = None , pm = None , logcan = None ) :
controls = Controls ( sm , pm , logcan )
controls . controlsd_thread ( )
if __name__ == " __main__ " :
main ( )