#!/usr/bin/env python3
import time
import random
from cereal import car , log
import cereal . messaging as messaging
from opendbc . car . honda . interface import CarInterface
from openpilot . common . realtime import DT_CTRL
from openpilot . selfdrive . controls . lib . events import ET , Events
from openpilot . selfdrive . controls . lib . alertmanager import AlertManager
from openpilot . system . manager . process_config import managed_processes
EventName = car . OnroadEvent . EventName
def randperc ( ) - > float :
return 100. * random . random ( )
def cycle_alerts ( duration = 200 , is_metric = False ) :
# all alerts
#alerts = list(EVENTS.keys())
# this plays each type of audible alert
alerts = [
( EventName . buttonEnable , ET . ENABLE ) ,
( EventName . buttonCancel , ET . USER_DISABLE ) ,
( EventName . wrongGear , ET . NO_ENTRY ) ,
( EventName . locationdTemporaryError , ET . SOFT_DISABLE ) ,
( EventName . paramsdTemporaryError , ET . SOFT_DISABLE ) ,
( EventName . accFaulted , ET . IMMEDIATE_DISABLE ) ,
# DM sequence
( EventName . preDriverDistracted , ET . WARNING ) ,
( EventName . promptDriverDistracted , ET . WARNING ) ,
( EventName . driverDistracted , ET . WARNING ) ,
]
# debug alerts
alerts = [
#(EventName.highCpuUsage, ET.NO_ENTRY),
#(EventName.lowMemory, ET.PERMANENT),
#(EventName.overheat, ET.PERMANENT),
#(EventName.outOfSpace, ET.PERMANENT),
#(EventName.modeldLagging, ET.PERMANENT),
#(EventName.processNotRunning, ET.NO_ENTRY),
#(EventName.commIssue, ET.NO_ENTRY),
#(EventName.calibrationInvalid, ET.PERMANENT),
( EventName . cameraMalfunction , ET . PERMANENT ) ,
( EventName . cameraFrameRate , ET . PERMANENT ) ,
]
cameras = [ ' roadCameraState ' , ' wideRoadCameraState ' , ' driverCameraState ' ]
CS = car . CarState . new_message ( )
CP = CarInterface . get_non_essential_params ( " HONDA_CIVIC " )
sm = messaging . SubMaster ( [ ' deviceState ' , ' pandaStates ' , ' roadCameraState ' , ' modelV2 ' , ' liveCalibration ' ,
' driverMonitoringState ' , ' longitudinalPlan ' , ' livePose ' ,
' managerState ' ] + cameras )
pm = messaging . PubMaster ( [ ' selfdriveState ' , ' pandaStates ' , ' deviceState ' ] )
events = Events ( )
AM = AlertManager ( )
frame = 0
while True :
for alert , et in alerts :
events . clear ( )
events . add ( alert )
sm [ ' deviceState ' ] . freeSpacePercent = randperc ( )
sm [ ' deviceState ' ] . memoryUsagePercent = int ( randperc ( ) )
sm [ ' deviceState ' ] . cpuTempC = [ randperc ( ) for _ in range ( 3 ) ]
sm [ ' deviceState ' ] . gpuTempC = [ randperc ( ) for _ in range ( 3 ) ]
sm [ ' deviceState ' ] . cpuUsagePercent = [ int ( randperc ( ) ) for _ in range ( 8 ) ]
sm [ ' modelV2 ' ] . frameDropPerc = randperc ( )
if random . random ( ) > 0.25 :
sm [ ' modelV2 ' ] . velocity . x = [ random . random ( ) , ]
if random . random ( ) > 0.25 :
CS . vEgo = random . random ( )
procs = [ p . get_process_state_msg ( ) for p in managed_processes . values ( ) ]
random . shuffle ( procs )
for i in range ( random . randint ( 0 , 10 ) ) :
procs [ i ] . shouldBeRunning = True
sm [ ' managerState ' ] . processes = procs
sm [ ' liveCalibration ' ] . rpyCalib = [ - 1 * random . random ( ) for _ in range ( random . randint ( 0 , 3 ) ) ]
for s in sm . data . keys ( ) :
prob = 0.3 if s in cameras else 0.08
sm . alive [ s ] = random . random ( ) > prob
sm . valid [ s ] = random . random ( ) > prob
sm . freq_ok [ s ] = random . random ( ) > prob
a = events . create_alerts ( [ et , ] , [ CP , CS , sm , is_metric , 0 ] )
AM . add_many ( frame , a )
alert = AM . process_alerts ( frame , [ ] )
print ( alert )
for _ in range ( duration ) :
dat = messaging . new_message ( )
dat . init ( ' selfdriveState ' )
dat . selfdriveState . enabled = False
if alert :
dat . selfdriveState . alertText1 = alert . alert_text_1
dat . selfdriveState . alertText2 = alert . alert_text_2
dat . selfdriveState . alertSize = alert . alert_size
dat . selfdriveState . alertStatus = alert . alert_status
dat . selfdriveState . alertType = alert . alert_type
dat . selfdriveState . alertSound = alert . audible_alert
pm . send ( ' selfdriveState ' , dat )
dat = messaging . new_message ( )
dat . init ( ' deviceState ' )
dat . deviceState . started = True
pm . send ( ' deviceState ' , dat )
dat = messaging . new_message ( ' pandaStates ' , 1 )
dat . pandaStates [ 0 ] . ignitionLine = True
dat . pandaStates [ 0 ] . pandaType = log . PandaState . PandaType . uno
pm . send ( ' pandaStates ' , dat )
frame + = 1
time . sleep ( DT_CTRL )
if __name__ == ' __main__ ' :
cycle_alerts ( )