|
|
|
@ -5,7 +5,6 @@ from collections import defaultdict |
|
|
|
|
from dataclasses import dataclass |
|
|
|
|
from typing import List, Dict, Optional |
|
|
|
|
|
|
|
|
|
from cereal import car, log |
|
|
|
|
from common.basedir import BASEDIR |
|
|
|
|
from common.params import Params |
|
|
|
|
from selfdrive.controls.lib.events import Alert |
|
|
|
@ -36,22 +35,9 @@ class AlertEntry: |
|
|
|
|
return frame <= self.end_frame |
|
|
|
|
|
|
|
|
|
class AlertManager: |
|
|
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
|
self.reset() |
|
|
|
|
self.alerts: Dict[str, AlertEntry] = defaultdict(AlertEntry) |
|
|
|
|
|
|
|
|
|
def reset(self) -> None: |
|
|
|
|
self.alert: Optional[Alert] = None |
|
|
|
|
self.alert_type: str = "" |
|
|
|
|
self.alert_text_1: str = "" |
|
|
|
|
self.alert_text_2: str = "" |
|
|
|
|
self.alert_status = log.ControlsState.AlertStatus.normal |
|
|
|
|
self.alert_size = log.ControlsState.AlertSize.none |
|
|
|
|
self.visual_alert = car.CarControl.HUDControl.VisualAlert.none |
|
|
|
|
self.audible_alert = car.CarControl.HUDControl.AudibleAlert.none |
|
|
|
|
self.alert_rate: float = 0. |
|
|
|
|
|
|
|
|
|
def add_many(self, frame: int, alerts: List[Alert]) -> None: |
|
|
|
|
for alert in alerts: |
|
|
|
|
key = alert.alert_type |
|
|
|
@ -61,30 +47,18 @@ class AlertManager: |
|
|
|
|
min_end_frame = self.alerts[key].start_frame + alert.duration |
|
|
|
|
self.alerts[key].end_frame = max(frame + 1, min_end_frame) |
|
|
|
|
|
|
|
|
|
def process_alerts(self, frame: int, clear_event_type=None) -> None: |
|
|
|
|
def process_alerts(self, frame: int, clear_event_type=None) -> Optional[Alert]: |
|
|
|
|
current_alert = AlertEntry() |
|
|
|
|
for k, v in self.alerts.items(): |
|
|
|
|
if v.alert is None: |
|
|
|
|
for v in self.alerts.values(): |
|
|
|
|
if not v.alert: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
if clear_event_type is not None and v.alert.event_type == clear_event_type: |
|
|
|
|
self.alerts[k].end_frame = -1 |
|
|
|
|
if clear_event_type and v.alert.event_type == clear_event_type: |
|
|
|
|
v.end_frame = -1 |
|
|
|
|
|
|
|
|
|
# sort by priority first and then by start_frame |
|
|
|
|
greater = current_alert.alert is None or (v.alert.priority, v.start_frame) > (current_alert.alert.priority, current_alert.start_frame) |
|
|
|
|
if v.active(frame) and greater: |
|
|
|
|
current_alert = v |
|
|
|
|
|
|
|
|
|
# clear current alert |
|
|
|
|
self.reset() |
|
|
|
|
|
|
|
|
|
self.alert = current_alert.alert |
|
|
|
|
if self.alert is not None: |
|
|
|
|
self.alert_type = self.alert.alert_type |
|
|
|
|
self.audible_alert = self.alert.audible_alert |
|
|
|
|
self.visual_alert = self.alert.visual_alert |
|
|
|
|
self.alert_text_1 = self.alert.alert_text_1 |
|
|
|
|
self.alert_text_2 = self.alert.alert_text_2 |
|
|
|
|
self.alert_status = self.alert.alert_status |
|
|
|
|
self.alert_size = self.alert.alert_size |
|
|
|
|
self.alert_rate = self.alert.alert_rate |
|
|
|
|
return current_alert.alert |
|
|
|
|