|
|
|
@ -1,3 +1,4 @@ |
|
|
|
|
import math |
|
|
|
|
import os |
|
|
|
|
from enum import IntEnum |
|
|
|
|
from typing import Dict, Union, Callable, List, Optional |
|
|
|
@ -141,8 +142,10 @@ class Alert: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NoEntryAlert(Alert): |
|
|
|
|
def __init__(self, alert_text_2: str, visual_alert: car.CarControl.HUDControl.VisualAlert=VisualAlert.none): |
|
|
|
|
super().__init__("openpilot Unavailable", alert_text_2, AlertStatus.normal, |
|
|
|
|
def __init__(self, alert_text_2: str, |
|
|
|
|
alert_text_1: str = "openpilot Unavailable", |
|
|
|
|
visual_alert: car.CarControl.HUDControl.VisualAlert=VisualAlert.none): |
|
|
|
|
super().__init__(alert_text_1, alert_text_2, AlertStatus.normal, |
|
|
|
|
AlertSize.mid, Priority.LOW, visual_alert, |
|
|
|
|
AudibleAlert.refuse, 3.) |
|
|
|
|
|
|
|
|
@ -201,35 +204,35 @@ def get_display_speed(speed_ms: float, metric: bool) -> str: |
|
|
|
|
|
|
|
|
|
# ********** alert callback functions ********** |
|
|
|
|
|
|
|
|
|
AlertCallbackType = Callable[[car.CarParams, messaging.SubMaster, bool, int], Alert] |
|
|
|
|
AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int], Alert] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def soft_disable_alert(alert_text_2: str) -> AlertCallbackType: |
|
|
|
|
def func(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
if soft_disable_time < int(0.5 / DT_CTRL): |
|
|
|
|
return ImmediateDisableAlert(alert_text_2) |
|
|
|
|
return SoftDisableAlert(alert_text_2) |
|
|
|
|
return func |
|
|
|
|
|
|
|
|
|
def user_soft_disable_alert(alert_text_2: str) -> AlertCallbackType: |
|
|
|
|
def func(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
if soft_disable_time < int(0.5 / DT_CTRL): |
|
|
|
|
return ImmediateDisableAlert(alert_text_2) |
|
|
|
|
return UserSoftDisableAlert(alert_text_2) |
|
|
|
|
return func |
|
|
|
|
|
|
|
|
|
def startup_master_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
branch = get_short_branch("") |
|
|
|
|
if "REPLAY" in os.environ: |
|
|
|
|
branch = "replay" |
|
|
|
|
|
|
|
|
|
return StartupAlert("WARNING: This branch is not tested", branch, alert_status=AlertStatus.userPrompt) |
|
|
|
|
|
|
|
|
|
def below_engage_speed_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
return NoEntryAlert(f"Speed Below {get_display_speed(CP.minEnableSpeed, metric)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def below_steer_speed_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
return Alert( |
|
|
|
|
f"Steer Unavailable Below {get_display_speed(CP.minSteerSpeed, metric)}", |
|
|
|
|
"", |
|
|
|
@ -237,7 +240,7 @@ def below_steer_speed_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: |
|
|
|
|
Priority.MID, VisualAlert.steerRequired, AudibleAlert.prompt, 0.4) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calibration_incomplete_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
return Alert( |
|
|
|
|
"Calibration in Progress: %d%%" % sm['liveCalibration'].calPerc, |
|
|
|
|
f"Drive Above {get_display_speed(MIN_SPEED_FILTER, metric)}", |
|
|
|
@ -245,7 +248,7 @@ def calibration_incomplete_alert(CP: car.CarParams, sm: messaging.SubMaster, met |
|
|
|
|
Priority.LOWEST, VisualAlert.none, AudibleAlert.none, .2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def no_gps_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def no_gps_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
gps_integrated = sm['peripheralState'].pandaType in (log.PandaState.PandaType.uno, log.PandaState.PandaType.dos) |
|
|
|
|
return Alert( |
|
|
|
|
"Poor GPS reception", |
|
|
|
@ -255,39 +258,66 @@ def no_gps_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_ |
|
|
|
|
|
|
|
|
|
# *** debug alerts *** |
|
|
|
|
|
|
|
|
|
def out_of_space_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
full_perc = round(100. - sm['deviceState'].freeSpacePercent) |
|
|
|
|
return NormalPermanentAlert("Out of Storage", f"{full_perc}% full") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def overheat_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
mdl = sm['modelV2'].velocity.x[0] if len(sm['modelV2'].velocity.x) else math.nan |
|
|
|
|
err = CS.vEgo - mdl |
|
|
|
|
msg = f"Speed Error: {err:.1f} m/s" |
|
|
|
|
return NoEntryAlert(msg, alert_text_1="Posenet Speed Invalid") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning] |
|
|
|
|
msg = ', '.join(not_running) |
|
|
|
|
return NoEntryAlert(msg, alert_text_1="Process Not Running") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
bs = [s for s in sm.data.keys() if not sm.all_checks([s, ])] |
|
|
|
|
msg = ', '.join(bs[:4]) # can't fit too many on one line |
|
|
|
|
return NoEntryAlert(msg, alert_text_1="Communication Issue Between Processes") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
rpy = sm['liveCalibration'].rpyCalib |
|
|
|
|
yaw = math.degrees(rpy[2] if len(rpy) == 3 else math.nan) |
|
|
|
|
pitch = math.degrees(rpy[1] if len(rpy) == 3 else math.nan) |
|
|
|
|
angles = f"Pitch: {pitch:.1f}°, Yaw: {yaw:.1f}°" |
|
|
|
|
return NormalPermanentAlert("Calibration Invalid", angles) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
cpu = max(sm['deviceState'].cpuTempC, default=0.) |
|
|
|
|
gpu = max(sm['deviceState'].gpuTempC, default=0.) |
|
|
|
|
temp = max((cpu, gpu, sm['deviceState'].memoryTempC)) |
|
|
|
|
return NormalPermanentAlert("System Overheated", f"{temp:.0f} °C") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def low_memory_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
return NormalPermanentAlert("Low Memory", f"{sm['deviceState'].memoryUsagePercent}% used") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def high_cpu_usage_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
x = max(sm['deviceState'].cpuUsagePercent, default=0.) |
|
|
|
|
return NormalPermanentAlert("High CPU Usage", f"{x}% used") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def modeld_lagging_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
return NormalPermanentAlert("Driving model lagging", f"{sm['modelV2'].frameDropPerc:.1f}% frames dropped") |
|
|
|
|
def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
return NormalPermanentAlert("Driving Model Lagging", f"{sm['modelV2'].frameDropPerc:.1f}% frames dropped") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def wrong_car_mode_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
text = "Cruise Mode Disabled" |
|
|
|
|
if CP.carName == "honda": |
|
|
|
|
text = "Main Switch Off" |
|
|
|
|
return NoEntryAlert(text) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def joystick_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: |
|
|
|
|
axes = sm['testJoystick'].axes |
|
|
|
|
gb, steer = list(axes)[:2] if len(axes) else (0., 0.) |
|
|
|
|
vals = f"Gas: {round(gb * 100.)}%, Steer: {round(steer * 100.)}%" |
|
|
|
@ -653,7 +683,7 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { |
|
|
|
|
# and attaching while making sure the device is pointed straight forward and is level. |
|
|
|
|
# See https://comma.ai/setup for more information |
|
|
|
|
EventName.calibrationInvalid: { |
|
|
|
|
ET.PERMANENT: NormalPermanentAlert("Calibration Invalid", "Remount Device and Recalibrate"), |
|
|
|
|
ET.PERMANENT: calibration_invalid_alert, |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Calibration Invalid: Remount Device & Recalibrate"), |
|
|
|
|
ET.NO_ENTRY: NoEntryAlert("Calibration Invalid: Remount Device & Recalibrate"), |
|
|
|
|
}, |
|
|
|
@ -690,7 +720,7 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { |
|
|
|
|
# ten times the regular interval, or the average interval is more than 10% too high. |
|
|
|
|
EventName.commIssue: { |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Communication Issue between Processes"), |
|
|
|
|
ET.NO_ENTRY: NoEntryAlert("Communication Issue between Processes"), |
|
|
|
|
ET.NO_ENTRY: comm_issue_alert, |
|
|
|
|
}, |
|
|
|
|
EventName.commIssueAvgFreq: { |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Low Communication Rate between Processes"), |
|
|
|
@ -704,7 +734,7 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { |
|
|
|
|
|
|
|
|
|
# Thrown when manager detects a service exited unexpectedly while driving |
|
|
|
|
EventName.processNotRunning: { |
|
|
|
|
ET.NO_ENTRY: NoEntryAlert("System Malfunction: Reboot Your Device"), |
|
|
|
|
ET.NO_ENTRY: process_not_running_alert, |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
EventName.radarFault: { |
|
|
|
@ -716,8 +746,8 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { |
|
|
|
|
# is not processing frames fast enough they have to be dropped. This alert is |
|
|
|
|
# thrown when over 20% of frames are dropped. |
|
|
|
|
EventName.modeldLagging: { |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Driving model lagging"), |
|
|
|
|
ET.NO_ENTRY: NoEntryAlert("Driving model lagging"), |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Driving Model Lagging"), |
|
|
|
|
ET.NO_ENTRY: NoEntryAlert("Driving Model Lagging"), |
|
|
|
|
ET.PERMANENT: modeld_lagging_alert, |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
@ -727,8 +757,8 @@ EVENTS: Dict[int, Dict[str, Union[Alert, AlertCallbackType]]] = { |
|
|
|
|
# usually means the model has trouble understanding the scene. This is used |
|
|
|
|
# as a heuristic to warn the driver. |
|
|
|
|
EventName.posenetInvalid: { |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Model Output Uncertain"), |
|
|
|
|
ET.NO_ENTRY: NoEntryAlert("Model Output Uncertain"), |
|
|
|
|
ET.SOFT_DISABLE: soft_disable_alert("Posenet Speed Invalid"), |
|
|
|
|
ET.NO_ENTRY: posenet_invalid_alert, |
|
|
|
|
}, |
|
|
|
|
|
|
|
|
|
# When the localizer detects an acceleration of more than 40 m/s^2 (~4G) we |
|
|
|
|