import copy
import os
import json
from collections import defaultdict
from dataclasses import dataclass
from typing import List , Dict , Optional
from cereal import car , log
from common . basedir import BASEDIR
from common . params import Params
from common . realtime import DT_CTRL
from selfdrive . controls . lib . events import Alert
with open ( os . path . join ( BASEDIR , " selfdrive/controls/lib/alerts_offroad.json " ) ) as f :
OFFROAD_ALERTS = json . load ( f )
def set_offroad_alert ( alert : str , show_alert : bool , extra_text : Optional [ str ] = None ) - > None :
if show_alert :
a = OFFROAD_ALERTS [ alert ]
if extra_text is not None :
a = copy . copy ( OFFROAD_ALERTS [ alert ] )
a [ ' text ' ] + = extra_text
Params ( ) . put ( alert , json . dumps ( a ) )
else :
Params ( ) . delete ( alert )
@dataclass
class AlertEntry :
alert : Optional [ Alert ] = None
start_frame : int = - 1
end_frame : int = - 1
class AlertManager :
def __init__ ( self ) :
self . reset ( )
self . activealerts : Dict [ str , AlertEntry ] = defaultdict ( AlertEntry )
def reset ( self ) - > None :
self . alert_type : str = " "
self . alert_text_1 : str = " "
self . alert_text_2 : str = " "
self . alert_status = log . ControlsState . AlertStatus . normal
self . alert_size = log . ControlsState . AlertSize . none
self . visual_alert = car . CarControl . HUDControl . VisualAlert . none
self . audible_alert = car . CarControl . HUDControl . AudibleAlert . none
self . alert_rate : float = 0.
def add_many ( self , frame : int , alerts : List [ Alert ] , enabled : bool = True ) - > None :
for alert in alerts :
alert_duration = max ( alert . duration_sound , alert . duration_hud_alert , alert . duration_text )
self . activealerts [ alert . alert_type ] . alert = alert
self . activealerts [ alert . alert_type ] . start_frame = frame
self . activealerts [ alert . alert_type ] . end_frame = frame + int ( alert_duration / DT_CTRL )
def process_alerts ( self , frame : int , clear_event_type = None ) - > None :
current_alert = AlertEntry ( )
for k , v in self . activealerts . items ( ) :
if v . alert is None :
continue
if v . alert . event_type == clear_event_type :
self . activealerts [ k ] . end_frame = - 1
# sort by priority first and then by start_frame
active = self . activealerts [ k ] . end_frame > frame
greater = current_alert . alert is None or ( v . alert . priority , v . start_frame ) > ( current_alert . alert . priority , current_alert . start_frame )
if active and greater :
current_alert = v
# clear current alert
self . reset ( )
a = current_alert . alert
if a is not None :
self . alert_type = a . alert_type
self . audible_alert = a . audible_alert
self . visual_alert = a . visual_alert
self . alert_text_1 = a . alert_text_1
self . alert_text_2 = a . alert_text_2
self . alert_status = a . alert_status
self . alert_size = a . alert_size
self . alert_rate = a . alert_rate