@ -205,35 +205,35 @@ def get_display_speed(speed_ms: float, metric: bool) -> str:
# ********** alert callback functions **********
AlertCallbackType = Callable [ [ car . CarParams , car . CarState , messaging . SubMaster , bool , int ] , Alert ]
AlertCallbackType = Callable [ [ car . CarParams , car . CarState , messaging . SubMaster , bool , int , log . ControlsState ] , Alert ]
def soft_disable_alert ( alert_text_2 : str ) - > AlertCallbackType :
def func ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def func ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
if soft_disable_time < int ( 0.5 / DT_CTRL ) :
return ImmediateDisableAlert ( alert_text_2 )
return SoftDisableAlert ( alert_text_2 )
return func
def user_soft_disable_alert ( alert_text_2 : str ) - > AlertCallbackType :
def func ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def func ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
if soft_disable_time < int ( 0.5 / DT_CTRL ) :
return ImmediateDisableAlert ( alert_text_2 )
return UserSoftDisableAlert ( alert_text_2 )
return func
def startup_master_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def startup_master_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
branch = get_short_branch ( ) # Ensure get_short_branch is cached to avoid lags on startup
if " REPLAY " in os . environ :
branch = " replay "
return StartupAlert ( " WARNING: This branch is not tested " , branch , alert_status = AlertStatus . userPrompt )
def below_engage_speed_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def below_engage_speed_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
return NoEntryAlert ( f " Drive above { get_display_speed ( CP . minEnableSpeed , metric ) } to engage " )
def below_steer_speed_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def below_steer_speed_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
return Alert (
f " Steer Unavailable Below { get_display_speed ( CP . minSteerSpeed , metric ) } " ,
" " ,
@ -241,7 +241,7 @@ def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.S
Priority . LOW , VisualAlert . steerRequired , AudibleAlert . prompt , 0.4 )
def calibration_incomplete_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def calibration_incomplete_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
first_word = ' Recalibration ' if sm [ ' liveCalibration ' ] . calStatus == log . LiveCalibrationData . Status . recalibrating else ' Calibration '
return Alert (
f " { first_word } in Progress: { sm [ ' liveCalibration ' ] . calPerc : .0f } % " ,
@ -252,37 +252,37 @@ def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messag
# *** debug alerts ***
def out_of_space_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def out_of_space_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
full_perc = round ( 100. - sm [ ' deviceState ' ] . freeSpacePercent )
return NormalPermanentAlert ( " Out of Storage " , f " { full_perc } % full " )
def posenet_invalid_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def posenet_invalid_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
mdl = sm [ ' modelV2 ' ] . velocity . x [ 0 ] if len ( sm [ ' modelV2 ' ] . velocity . x ) else math . nan
err = CS . vEgo - mdl
msg = f " Speed Error: { err : .1f } m/s "
return NoEntryAlert ( msg , alert_text_1 = " Posenet Speed Invalid " )
def process_not_running_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def process_not_running_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
not_running = [ p . name for p in sm [ ' managerState ' ] . processes if not p . running and p . shouldBeRunning ]
msg = ' , ' . join ( not_running )
return NoEntryAlert ( msg , alert_text_1 = " Process Not Running " )
def comm_issue_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def comm_issue_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
bs = [ s for s in sm . data . keys ( ) if not sm . all_checks ( [ s , ] ) ]
msg = ' , ' . join ( bs [ : 4 ] ) # can't fit too many on one line
return NoEntryAlert ( msg , alert_text_1 = " Communication Issue Between Processes " )
def camera_malfunction_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def camera_malfunction_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
all_cams = ( ' roadCameraState ' , ' driverCameraState ' , ' wideRoadCameraState ' )
bad_cams = [ s . replace ( ' State ' , ' ' ) for s in all_cams if s in sm . data . keys ( ) and not sm . all_checks ( [ s , ] ) ]
return NormalPermanentAlert ( " Camera Malfunction " , ' , ' . join ( bad_cams ) )
def calibration_invalid_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def calibration_invalid_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
rpy = sm [ ' liveCalibration ' ] . rpyCalib
yaw = math . degrees ( rpy [ 2 ] if len ( rpy ) == 3 else math . nan )
pitch = math . degrees ( rpy [ 1 ] if len ( rpy ) == 3 else math . nan )
@ -290,39 +290,43 @@ def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging
return NormalPermanentAlert ( " Calibration Invalid " , angles )
def overheat_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def overheat_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
cpu = max ( sm [ ' deviceState ' ] . cpuTempC , default = 0. )
gpu = max ( sm [ ' deviceState ' ] . gpuTempC , default = 0. )
temp = max ( ( cpu , gpu , sm [ ' deviceState ' ] . memoryTempC ) )
return NormalPermanentAlert ( " System Overheated " , f " { temp : .0f } °C " )
def low_memory_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def low_memory_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
return NormalPermanentAlert ( " Low Memory " , f " { sm [ ' deviceState ' ] . memoryUsagePercent } % used " )
def high_cpu_usage_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def high_cpu_usage_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
x = max ( sm [ ' deviceState ' ] . cpuUsagePercent , default = 0. )
return NormalPermanentAlert ( " High CPU Usage " , f " { x } % used " )
def modeld_lagging_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def modeld_lagging_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
return NormalPermanentAlert ( " Driving Model Lagging " , f " { sm [ ' modelV2 ' ] . frameDropPerc : .1f } % frames dropped " )
def wrong_car_mode_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def wrong_car_mode_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
text = " Enable Adaptive Cruise to Engage "
if CP . carName == " honda " :
text = " Enable Main Switch to Engage "
return NoEntryAlert ( text )
def joystick_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int ) - > Alert :
def joystick_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
axes = sm [ ' testJoystick ' ] . axes
gb , steer = list ( axes ) [ : 2 ] if len ( axes ) else ( 0. , 0. )
vals = f " Gas: { round ( gb * 100. ) } %, Steer: { round ( steer * 100. ) } % "
return NormalPermanentAlert ( " Joystick Mode " , vals )
def personality_changed_alert ( CP : car . CarParams , CS : car . CarState , sm : messaging . SubMaster , metric : bool , soft_disable_time : int , personality ) - > Alert :
personality = str ( personality ) . title ( )
return NormalPermanentAlert ( f " Driving Personality: { personality } " , duration = 1.5 )
EVENTS : dict [ int , dict [ str , Alert | AlertCallbackType ] ] = {
@ -955,6 +959,10 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = {
ET . NO_ENTRY : NoEntryAlert ( " Vehicle Sensors Calibrating " ) ,
} ,
EventName . personalityChanged : {
ET . WARNING : personality_changed_alert ,
} ,
}
@ -973,7 +981,7 @@ if __name__ == '__main__':
for i , alerts in EVENTS . items ( ) :
for et , alert in alerts . items ( ) :
if callable ( alert ) :
alert = alert ( CP , CS , sm , False , 1 )
alert = alert ( CP , CS , sm , False , 1 , log . LongitudinalPersonality . standard )
alerts_by_type [ et ] [ alert . priority ] . append ( event_names [ i ] )
all_alerts : dict [ str , list [ tuple [ Priority , list [ str ] ] ] ] = { }