You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
62 lines
1.7 KiB
62 lines
1.7 KiB
import copy
|
|
import os
|
|
import json
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
|
|
from openpilot.common.basedir import BASEDIR
|
|
from openpilot.common.params import Params
|
|
from openpilot.selfdrive.selfdrived.events import Alert
|
|
|
|
|
|
with open(os.path.join(BASEDIR, "selfdrive/selfdrived/alerts_offroad.json")) as f:
|
|
OFFROAD_ALERTS = json.load(f)
|
|
|
|
|
|
def set_offroad_alert(alert: str, show_alert: bool, extra_text: str = None) -> None:
|
|
if show_alert:
|
|
a = copy.copy(OFFROAD_ALERTS[alert])
|
|
a['extra'] = extra_text or ''
|
|
Params().put(alert, json.dumps(a))
|
|
else:
|
|
Params().remove(alert)
|
|
|
|
|
|
@dataclass
|
|
class AlertEntry:
|
|
alert: Alert | None = None
|
|
start_frame: int = -1
|
|
end_frame: int = -1
|
|
|
|
def active(self, frame: int) -> bool:
|
|
return frame <= self.end_frame
|
|
|
|
class AlertManager:
|
|
def __init__(self):
|
|
self.alerts: dict[str, AlertEntry] = defaultdict(AlertEntry)
|
|
self.current_alert: Alert | None = None
|
|
|
|
def add_many(self, frame: int, alerts: list[Alert]) -> None:
|
|
for alert in alerts:
|
|
entry = self.alerts[alert.alert_type]
|
|
entry.alert = alert
|
|
if not entry.active(frame):
|
|
entry.start_frame = frame
|
|
min_end_frame = entry.start_frame + alert.duration
|
|
entry.end_frame = max(frame + 1, min_end_frame)
|
|
|
|
def process_alerts(self, frame: int, clear_event_types: set):
|
|
ae = AlertEntry()
|
|
for v in self.alerts.values():
|
|
if not v.alert:
|
|
continue
|
|
|
|
if v.alert.event_type in clear_event_types:
|
|
v.end_frame = -1
|
|
|
|
# sort by priority first and then by start_frame
|
|
greater = ae.alert is None or (v.alert.priority, v.start_frame) > (ae.alert.priority, ae.start_frame)
|
|
if v.active(frame) and greater:
|
|
ae = v
|
|
|
|
self.current_alert = ae.alert
|
|
|