diff --git a/cereal/car.capnp b/cereal/car.capnp index cbfb330c22..d945fc468e 100644 --- a/cereal/car.capnp +++ b/cereal/car.capnp @@ -117,6 +117,7 @@ struct CarEvent @0x9b1657f34caf3ad3 { paramsdPermanentError @119; actuatorsApiUnavailable @120; espActive @121; + personalityChanged @122; radarCanErrorDEPRECATED @15; communityFeatureDisallowedDEPRECATED @62; diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index cd957c731a..f5af9573d5 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -655,6 +655,7 @@ class Controls: if any(not be.pressed and be.type == ButtonType.gapAdjustCruise for be in CS.buttonEvents): self.personality = (self.personality - 1) % 3 self.params.put_nonblocking('LongitudinalPersonality', str(self.personality)) + self.events.add(EventName.personalityChanged) return CC, lac_log @@ -714,7 +715,8 @@ class Controls: if self.enabled: clear_event_types.add(ET.NO_ENTRY) - alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer]) + pers = {v: k for k, v in log.LongitudinalPersonality.schema.enumerants.items()}[self.personality] + alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer, pers]) self.AM.add_many(self.sm.frame, alerts) current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types) if current_alert: diff --git a/selfdrive/controls/lib/events.py b/selfdrive/controls/lib/events.py index 2ef4774e87..3f4f15024d 100755 --- a/selfdrive/controls/lib/events.py +++ b/selfdrive/controls/lib/events.py @@ -205,35 +205,35 @@ def get_display_speed(speed_ms: float, metric: bool) -> str: # ********** alert callback functions ********** -AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int], Alert] +AlertCallbackType = Callable[[car.CarParams, car.CarState, messaging.SubMaster, bool, int, log.ControlsState], Alert] def soft_disable_alert(alert_text_2: str) -> AlertCallbackType: - def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: + def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: if soft_disable_time < int(0.5 / DT_CTRL): return ImmediateDisableAlert(alert_text_2) return SoftDisableAlert(alert_text_2) return func def user_soft_disable_alert(alert_text_2: str) -> AlertCallbackType: - def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: + def func(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: if soft_disable_time < int(0.5 / DT_CTRL): return ImmediateDisableAlert(alert_text_2) return UserSoftDisableAlert(alert_text_2) return func -def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def startup_master_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: branch = get_short_branch() # Ensure get_short_branch is cached to avoid lags on startup if "REPLAY" in os.environ: branch = "replay" return StartupAlert("WARNING: This branch is not tested", branch, alert_status=AlertStatus.userPrompt) -def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def below_engage_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: return NoEntryAlert(f"Drive above {get_display_speed(CP.minEnableSpeed, metric)} to engage") -def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: return Alert( f"Steer Unavailable Below {get_display_speed(CP.minSteerSpeed, metric)}", "", @@ -241,7 +241,7 @@ def below_steer_speed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.S Priority.LOW, VisualAlert.steerRequired, AudibleAlert.prompt, 0.4) -def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: first_word = 'Recalibration' if sm['liveCalibration'].calStatus == log.LiveCalibrationData.Status.recalibrating else 'Calibration' return Alert( f"{first_word} in Progress: {sm['liveCalibration'].calPerc:.0f}%", @@ -252,37 +252,37 @@ def calibration_incomplete_alert(CP: car.CarParams, CS: car.CarState, sm: messag # *** debug alerts *** -def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def out_of_space_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: full_perc = round(100. - sm['deviceState'].freeSpacePercent) return NormalPermanentAlert("Out of Storage", f"{full_perc}% full") -def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def posenet_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: mdl = sm['modelV2'].velocity.x[0] if len(sm['modelV2'].velocity.x) else math.nan err = CS.vEgo - mdl msg = f"Speed Error: {err:.1f} m/s" return NoEntryAlert(msg, alert_text_1="Posenet Speed Invalid") -def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def process_not_running_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: not_running = [p.name for p in sm['managerState'].processes if not p.running and p.shouldBeRunning] msg = ', '.join(not_running) return NoEntryAlert(msg, alert_text_1="Process Not Running") -def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def comm_issue_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: bs = [s for s in sm.data.keys() if not sm.all_checks([s, ])] msg = ', '.join(bs[:4]) # can't fit too many on one line return NoEntryAlert(msg, alert_text_1="Communication Issue Between Processes") -def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def camera_malfunction_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: all_cams = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState') bad_cams = [s.replace('State', '') for s in all_cams if s in sm.data.keys() and not sm.all_checks([s, ])] return NormalPermanentAlert("Camera Malfunction", ', '.join(bad_cams)) -def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: rpy = sm['liveCalibration'].rpyCalib yaw = math.degrees(rpy[2] if len(rpy) == 3 else math.nan) pitch = math.degrees(rpy[1] if len(rpy) == 3 else math.nan) @@ -290,39 +290,43 @@ def calibration_invalid_alert(CP: car.CarParams, CS: car.CarState, sm: messaging return NormalPermanentAlert("Calibration Invalid", angles) -def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def overheat_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: cpu = max(sm['deviceState'].cpuTempC, default=0.) gpu = max(sm['deviceState'].gpuTempC, default=0.) temp = max((cpu, gpu, sm['deviceState'].memoryTempC)) return NormalPermanentAlert("System Overheated", f"{temp:.0f} °C") -def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def low_memory_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: return NormalPermanentAlert("Low Memory", f"{sm['deviceState'].memoryUsagePercent}% used") -def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def high_cpu_usage_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: x = max(sm['deviceState'].cpuUsagePercent, default=0.) return NormalPermanentAlert("High CPU Usage", f"{x}% used") -def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def modeld_lagging_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: return NormalPermanentAlert("Driving Model Lagging", f"{sm['modelV2'].frameDropPerc:.1f}% frames dropped") -def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def wrong_car_mode_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: text = "Enable Adaptive Cruise to Engage" if CP.carName == "honda": text = "Enable Main Switch to Engage" return NoEntryAlert(text) -def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int) -> Alert: +def joystick_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: axes = sm['testJoystick'].axes gb, steer = list(axes)[:2] if len(axes) else (0., 0.) vals = f"Gas: {round(gb * 100.)}%, Steer: {round(steer * 100.)}%" return NormalPermanentAlert("Joystick Mode", vals) +def personality_changed_alert(CP: car.CarParams, CS: car.CarState, sm: messaging.SubMaster, metric: bool, soft_disable_time: int, personality) -> Alert: + personality = str(personality).title() + return NormalPermanentAlert(f"Driving Personality: {personality}", duration=1.5) + EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = { @@ -955,6 +959,10 @@ EVENTS: dict[int, dict[str, Alert | AlertCallbackType]] = { ET.NO_ENTRY: NoEntryAlert("Vehicle Sensors Calibrating"), }, + EventName.personalityChanged: { + ET.WARNING: personality_changed_alert, + }, + } @@ -973,7 +981,7 @@ if __name__ == '__main__': for i, alerts in EVENTS.items(): for et, alert in alerts.items(): if callable(alert): - alert = alert(CP, CS, sm, False, 1) + alert = alert(CP, CS, sm, False, 1, log.LongitudinalPersonality.standard) alerts_by_type[et][alert.priority].append(event_names[i]) all_alerts: dict[str, list[tuple[Priority, list[str]]]] = {}