|
|
@ -1,14 +1,15 @@ |
|
|
|
import os |
|
|
|
|
|
|
|
import copy |
|
|
|
import copy |
|
|
|
|
|
|
|
import os |
|
|
|
import json |
|
|
|
import json |
|
|
|
from typing import List, Optional |
|
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
from dataclasses import dataclass |
|
|
|
|
|
|
|
from typing import List, Dict, Optional |
|
|
|
|
|
|
|
|
|
|
|
from cereal import car, log |
|
|
|
from cereal import car, log |
|
|
|
from common.basedir import BASEDIR |
|
|
|
from common.basedir import BASEDIR |
|
|
|
from common.params import Params |
|
|
|
from common.params import Params |
|
|
|
from common.realtime import DT_CTRL |
|
|
|
from common.realtime import DT_CTRL |
|
|
|
from selfdrive.controls.lib.events import Alert |
|
|
|
from selfdrive.controls.lib.events import Alert |
|
|
|
from selfdrive.swaglog import cloudlog |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f: |
|
|
|
with open(os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")) as f: |
|
|
@ -26,13 +27,20 @@ def set_offroad_alert(alert: str, show_alert: bool, extra_text: Optional[str] = |
|
|
|
Params().delete(alert) |
|
|
|
Params().delete(alert) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
|
|
|
|
|
class AlertEntry: |
|
|
|
|
|
|
|
alert: Optional[Alert] = None |
|
|
|
|
|
|
|
start_frame: int = -1 |
|
|
|
|
|
|
|
end_frame: int = -1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AlertManager: |
|
|
|
class AlertManager: |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
def __init__(self): |
|
|
|
self.activealerts: List[Alert] = [] |
|
|
|
self.reset() |
|
|
|
self.clear_current_alert() |
|
|
|
self.activealerts: Dict[str, AlertEntry] = defaultdict(AlertEntry) |
|
|
|
|
|
|
|
|
|
|
|
def clear_current_alert(self) -> None: |
|
|
|
def reset(self) -> None: |
|
|
|
self.alert_type: str = "" |
|
|
|
self.alert_type: str = "" |
|
|
|
self.alert_text_1: str = "" |
|
|
|
self.alert_text_1: str = "" |
|
|
|
self.alert_text_2: str = "" |
|
|
|
self.alert_text_2: str = "" |
|
|
@ -44,42 +52,36 @@ class AlertManager: |
|
|
|
|
|
|
|
|
|
|
|
def add_many(self, frame: int, alerts: List[Alert], enabled: bool = True) -> None: |
|
|
|
def add_many(self, frame: int, alerts: List[Alert], enabled: bool = True) -> None: |
|
|
|
for alert in alerts: |
|
|
|
for alert in alerts: |
|
|
|
added_alert = copy.copy(alert) |
|
|
|
alert_duration = max(alert.duration_sound, alert.duration_hud_alert, alert.duration_text) |
|
|
|
added_alert.start_time = frame * DT_CTRL |
|
|
|
self.activealerts[alert.alert_type].alert = alert |
|
|
|
|
|
|
|
self.activealerts[alert.alert_type].start_frame = frame |
|
|
|
# if new alert is higher priority, log it |
|
|
|
self.activealerts[alert.alert_type].end_frame = frame + int(alert_duration / DT_CTRL) |
|
|
|
if not len(self.activealerts) or added_alert.alert_priority > self.activealerts[0].alert_priority: |
|
|
|
|
|
|
|
cloudlog.event('alert_add', alert_type=added_alert.alert_type, enabled=enabled) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.activealerts.append(added_alert) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_alerts(self, frame: int, clear_event_type=None) -> None: |
|
|
|
def process_alerts(self, frame: int, clear_event_type=None) -> None: |
|
|
|
cur_time = frame * DT_CTRL |
|
|
|
current_alert = AlertEntry() |
|
|
|
|
|
|
|
for k, v in self.activealerts.items(): |
|
|
|
# first get rid of all the expired alerts |
|
|
|
if v.alert is None: |
|
|
|
self.activealerts = [a for a in self.activealerts if a.event_type != clear_event_type and |
|
|
|
continue |
|
|
|
a.start_time + max(a.duration_sound, a.duration_hud_alert, a.duration_text) > cur_time] |
|
|
|
|
|
|
|
|
|
|
|
if v.alert.event_type == clear_event_type: |
|
|
|
# sort by priority first and then by start_time |
|
|
|
self.activealerts[k].end_frame = -1 |
|
|
|
self.activealerts.sort(key=lambda k: (k.alert_priority, k.start_time), reverse=True) |
|
|
|
|
|
|
|
|
|
|
|
# sort by priority first and then by start_frame |
|
|
|
# start with assuming no alerts |
|
|
|
active = self.activealerts[k].end_frame > frame |
|
|
|
self.clear_current_alert() |
|
|
|
greater = current_alert.alert is None or (v.alert.priority, v.start_frame) > (current_alert.alert.priority, current_alert.start_frame) |
|
|
|
|
|
|
|
if active and greater: |
|
|
|
if len(self.activealerts): |
|
|
|
current_alert = v |
|
|
|
current_alert = self.activealerts[0] |
|
|
|
|
|
|
|
|
|
|
|
# clear current alert |
|
|
|
self.alert_type = current_alert.alert_type |
|
|
|
self.reset() |
|
|
|
|
|
|
|
|
|
|
|
if current_alert.start_time + current_alert.duration_sound > cur_time: |
|
|
|
a = current_alert.alert |
|
|
|
self.audible_alert = current_alert.audible_alert |
|
|
|
if a is not None: |
|
|
|
|
|
|
|
self.alert_type = a.alert_type |
|
|
|
if current_alert.start_time + current_alert.duration_hud_alert > cur_time: |
|
|
|
self.audible_alert = a.audible_alert |
|
|
|
self.visual_alert = current_alert.visual_alert |
|
|
|
self.visual_alert = a.visual_alert |
|
|
|
|
|
|
|
self.alert_text_1 = a.alert_text_1 |
|
|
|
if current_alert.start_time + current_alert.duration_text > cur_time: |
|
|
|
self.alert_text_2 = a.alert_text_2 |
|
|
|
self.alert_text_1 = current_alert.alert_text_1 |
|
|
|
self.alert_status = a.alert_status |
|
|
|
self.alert_text_2 = current_alert.alert_text_2 |
|
|
|
self.alert_size = a.alert_size |
|
|
|
self.alert_status = current_alert.alert_status |
|
|
|
self.alert_rate = a.alert_rate |
|
|
|
self.alert_size = current_alert.alert_size |
|
|
|
|
|
|
|
self.alert_rate = current_alert.alert_rate |
|
|
|
|
|
|
|