import copy
import os
import json
from collections import defaultdict
from dataclasses import dataclass

from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
# NOTE(review): AlertSize is assumed to be exported by the events module
# alongside Alert/EmptyAlert; it was referenced below without being imported.
from openpilot.selfdrive.selfdrived.events import Alert, AlertSize, EmptyAlert


# Offroad alert definitions, loaded once at import time.
with open(os.path.join(BASEDIR, "selfdrive/selfdrived/alerts_offroad.json")) as f:
    OFFROAD_ALERTS = json.load(f)


def set_offroad_alert(alert: str, show_alert: bool, extra_text: str | None = None) -> None:
    """Store or remove an offroad alert in Params.

    Args:
        alert: Key used both to look up the definition in OFFROAD_ALERTS and
            as the Params key.
        show_alert: When true, the alert definition is written to Params as
            JSON; otherwise the Params entry is removed.
        extra_text: Optional text stored under the 'extra' key of the alert;
            an empty string is stored when it is None or empty.
    """
    if show_alert:
        # Shallow copy so the shared OFFROAD_ALERTS definition is not mutated.
        a = copy.copy(OFFROAD_ALERTS[alert])
        a['extra'] = extra_text or ''
        Params().put(alert, json.dumps(a))
    else:
        Params().remove(alert)


@dataclass
class AlertEntry:
    """Bookkeeping for one alert type: the alert plus its frame window."""

    alert: Alert | None = None
    start_frame: int = -1   # frame at which the current run of this alert began
    end_frame: int = -1     # last frame (inclusive) on which the entry is active
    added_frame: int = -1   # frame of the most recent add_many() call for this entry

    def active(self, frame: int) -> bool:
        """Return True while `frame` has not passed `end_frame`."""
        return frame <= self.end_frame

    def just_added(self, frame: int) -> bool:
        """Return True if the entry is active and was added on the previous frame."""
        return self.active(frame) and frame == (self.added_frame + 1)


class AlertManager:
    """Tracks alert entries per alert type and selects the current alert(s)."""

    def __init__(self):
        # One entry per alert_type; entries are created lazily on first access.
        self.alerts: dict[str, AlertEntry] = defaultdict(AlertEntry)
        self.current_alert = EmptyAlert
        self.current_audible_alert = EmptyAlert

    def add_many(self, frame: int, alerts: list[Alert]) -> None:
        """Register `alerts` as raised on `frame`.

        An alert that was also added on the previous frame keeps its original
        start_frame; otherwise its start_frame is reset to `frame`.
        """
        for alert in alerts:
            entry = self.alerts[alert.alert_type]
            entry.alert = alert
            if not entry.just_added(frame):
                entry.start_frame = frame
            # Stay active for at least `duration` frames from the start, and
            # always at least through the next frame.
            min_end_frame = entry.start_frame + alert.duration
            entry.end_frame = max(frame + 1, min_end_frame)
            entry.added_frame = frame

    def process_alerts(self, frame: int, clear_event_types: set) -> None:
        """Expire cleared alerts and update the current alert selections.

        Entries whose alert.event_type is in `clear_event_types` are expired
        (end_frame set to -1). The active entry that ranks highest by
        (priority, start_frame) becomes `current_alert`; the audible selection
        is stored in `current_audible_alert`. EmptyAlert is used when nothing
        is selected.
        """
        ae = AlertEntry()
        ae_audible = AlertEntry()

        for v in self.alerts.values():
            if not v.alert:
                continue

            if v.alert.event_type in clear_event_types:
                v.end_frame = -1

            # sort by priority first and then by start_frame
            greater = ae.alert is None or (v.alert.priority, v.start_frame) > (ae.alert.priority, ae.start_frame)
            if v.active(frame) and greater:
                ae = v
                ae_audible = v

        # Clearing was already applied to every entry in the pass above, so it
        # is not repeated here.
        # NOTE(review): ae_audible is seeded with the top-ranked active entry,
        # so this pass appears unable to pick a different one — confirm the
        # intended seeding of the audible selection.
        for v in self.alerts.values():
            if not v.alert:
                continue

            # sort by priority first and then by start_frame
            greater = (
                ae_audible.alert is None
                or (v.alert.priority, v.start_frame) > (ae_audible.alert.priority, ae_audible.start_frame)
            )
            if v.active(frame) and v.alert.alert_size == AlertSize.none and greater:
                ae_audible = v

        self.current_alert = ae.alert if ae.alert is not None else EmptyAlert
        # Previously this overwrote current_alert, leaving the audible alert
        # permanently at its initial EmptyAlert value.
        self.current_audible_alert = ae_audible.alert if ae_audible.alert is not None else EmptyAlert