@ -14,9 +14,12 @@ from openpilot.common.realtime import DT_CTRL
from openpilot . selfdrive . boardd . boardd import can_list_to_can_capnp
from openpilot . selfdrive . car . car_helpers import get_car , get_one_can
from openpilot . selfdrive . car . interfaces import CarInterfaceBase
from openpilot . selfdrive . controls . lib . events import Events
REPLAY = " REPLAY " in os . environ
EventName = car . CarEvent . EventName
class CarD :
CI : CarInterfaceBase
@ -47,9 +50,9 @@ class CarD:
self . CI , self . CP = CI , CI . CP
# set alternative experiences from parameters
disengage_on_accelerator = self . params . get_bool ( " DisengageOnAccelerator " )
self . disengage_on_accelerator = self . params . get_bool ( " DisengageOnAccelerator " )
self . CP . alternativeExperience = 0
if not disengage_on_accelerator :
if not self . disengage_on_accelerator :
self . CP . alternativeExperience | = ALTERNATIVE_EXPERIENCE . DISABLE_DISENGAGE_ON_GAS
openpilot_enabled_toggle = self . params . get_bool ( " OpenpilotEnabledToggle " )
@ -73,6 +76,9 @@ class CarD:
self . params . put_nonblocking ( " CarParamsCache " , cp_bytes )
self . params . put_nonblocking ( " CarParamsPersistent " , cp_bytes )
self . CS_prev = car . CarState . new_message ( )
self . events = Events ( )
def initialize ( self ) :
""" Initialize CarInterface, once controls are ready """
self . CI . init ( self . CP , self . can_sock , self . pm . sock [ ' sendcan ' ] )
@ -100,10 +106,26 @@ class CarD:
if can_rcv_valid and REPLAY :
self . can_log_mono_time = messaging . log_from_bytes ( can_strs [ 0 ] ) . logMonoTime
self . update_events ( CS )
self . state_publish ( CS )
CS = CS . as_reader ( )
self . CS_prev = CS
return CS
def update_events ( self , CS : car . CarState ) - > car . CarState :
self . events . clear ( )
self . events . add_from_msg ( CS . events )
# Disable on rising edge of accelerator or brake. Also disable on brake when speed > 0
if ( CS . gasPressed and not self . CS_prev . gasPressed and self . disengage_on_accelerator ) or \
( CS . brakePressed and ( not self . CS_prev . brakePressed or not CS . standstill ) ) or \
( CS . regenBraking and ( not self . CS_prev . regenBraking or not CS . standstill ) ) :
self . events . add ( EventName . pedalPressed )
CS . events = self . events . to_msg ( )
def state_publish ( self , CS : car . CarState ) :
""" carState and carParams publish loop """