openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

125 lines
4.2 KiB

#!/usr/bin/env python3
import time
import random
from cereal import car, log
import cereal.messaging as messaging
from common.realtime import DT_CTRL
from selfdrive.car.honda.interface import CarInterface
from selfdrive.controls.lib.events import ET, Events
from selfdrive.controls.lib.alertmanager import AlertManager
from selfdrive.manager.process_config import managed_processes
# Short alias for car.CarEvent.EventName; used below to name the alerts to cycle.
EventName = car.CarEvent.EventName
def randperc() -> float:
    """Return a random float in the half-open range [0.0, 100.0)."""
    return 100. * random.random()
def _randomize_inputs(sm, CS):
    """Randomize the message data and car state handed to alert creation.

    Mutates ``sm`` (message fields and the per-service alive/valid/freq_ok
    flags) and ``CS`` in place; returns nothing.
    """
    sm['deviceState'].freeSpacePercent = randperc()
    sm['deviceState'].memoryUsagePercent = int(randperc())
    sm['deviceState'].cpuTempC = [randperc() for _ in range(3)]
    sm['deviceState'].gpuTempC = [randperc() for _ in range(3)]
    sm['deviceState'].cpuUsagePercent = [int(randperc()) for _ in range(8)]
    sm['modelV2'].frameDropPerc = randperc()

    # Each of these is only refreshed about three times out of four, so the
    # previously assigned value is sometimes carried over to the next alert.
    if random.random() > 0.25:
        sm['modelV2'].velocity.x = [random.random(), ]
    if random.random() > 0.25:
        CS.vEgo = random.random()

    procs = [p.get_process_state_msg() for p in managed_processes.values()]
    random.shuffle(procs)
    # Slice instead of indexing: with fewer than 10 managed processes,
    # procs[i] would raise IndexError.
    for proc in procs[:random.randint(0, 10)]:
        proc.shouldBeRunning = True
    sm['managerState'].processes = procs

    # Between zero and three entries, each a non-positive random value.
    sm['liveCalibration'].rpyCalib = [-1 * random.random() for _ in range(random.randint(0, 3))]

    # Each status flag independently comes out False roughly 8% of the time.
    for service in sm.data:
        sm.alive[service] = random.random() > 0.08
        sm.valid[service] = random.random() > 0.08
        sm.freq_ok[service] = random.random() > 0.08


def _publish_frame(pm, alert):
    """Publish one controlsState, deviceState and pandaStates message.

    ``alert`` is the value returned by ``AlertManager.process_alerts``; when
    it is falsy, the alert fields of controlsState are not set.
    """
    dat = messaging.new_message()
    dat.init('controlsState')
    dat.controlsState.enabled = False
    if alert:
        dat.controlsState.alertText1 = alert.alert_text_1
        dat.controlsState.alertText2 = alert.alert_text_2
        dat.controlsState.alertSize = alert.alert_size
        dat.controlsState.alertStatus = alert.alert_status
        dat.controlsState.alertBlinkingRate = alert.alert_rate
        dat.controlsState.alertType = alert.alert_type
        dat.controlsState.alertSound = alert.audible_alert
    pm.send('controlsState', dat)

    dat = messaging.new_message()
    dat.init('deviceState')
    dat.deviceState.started = True
    pm.send('deviceState', dat)

    dat = messaging.new_message('pandaStates', 1)
    dat.pandaStates[0].ignitionLine = True
    dat.pandaStates[0].pandaType = log.PandaState.PandaType.uno
    pm.send('pandaStates', dat)


def cycle_alerts(duration=200, is_metric=False):
    """Cycle through a hard-coded list of alerts forever, publishing each one.

    For every (event, event type) pair the inputs are re-randomized, the
    alert is created and processed, printed, and then published ``duration``
    times with a ``DT_CTRL``-second sleep after each publish.

    Args:
        duration: number of publish iterations per alert.
        is_metric: forwarded to ``Events.create_alerts`` in its callback
            argument list.

    This function never returns; stop it by interrupting the process.
    """
    # all alerts
    #alerts = list(EVENTS.keys())

    # this plays each type of audible alert
    alerts = [
        (EventName.buttonEnable, ET.ENABLE),
        (EventName.buttonCancel, ET.USER_DISABLE),
        (EventName.wrongGear, ET.NO_ENTRY),
        (EventName.vehicleModelInvalid, ET.SOFT_DISABLE),
        (EventName.accFaulted, ET.IMMEDIATE_DISABLE),

        # DM sequence
        (EventName.preDriverDistracted, ET.WARNING),
        (EventName.promptDriverDistracted, ET.WARNING),
        (EventName.driverDistracted, ET.WARNING),
    ]

    # debug alerts
    # NOTE: this reassignment replaces the audible list above; comment this
    # list out to cycle the audible alerts instead.
    alerts = [
        #(EventName.highCpuUsage, ET.NO_ENTRY),
        #(EventName.lowMemory, ET.PERMANENT),
        #(EventName.overheat, ET.PERMANENT),
        #(EventName.outOfSpace, ET.PERMANENT),
        #(EventName.modeldLagging, ET.PERMANENT),
        #(EventName.processNotRunning, ET.NO_ENTRY),
        (EventName.commIssue, ET.NO_ENTRY),
        (EventName.calibrationInvalid, ET.PERMANENT),
        (EventName.posenetInvalid, ET.NO_ENTRY),
    ]

    CS = car.CarState.new_message()
    CP = CarInterface.get_params("HONDA CIVIC 2016")
    sm = messaging.SubMaster(['deviceState', 'pandaStates', 'roadCameraState', 'modelV2', 'liveCalibration',
                              'driverMonitoringState', 'longitudinalPlan', 'lateralPlan', 'liveLocationKalman', 'managerState'])
    pm = messaging.PubMaster(['controlsState', 'pandaStates', 'deviceState'])

    events = Events()
    AM = AlertManager()

    frame = 0
    while True:
        for event_name, et in alerts:
            events.clear()
            events.add(event_name)

            _randomize_inputs(sm, CS)

            created = events.create_alerts([et, ], [CP, CS, sm, is_metric, 0])
            AM.add_many(frame, created)
            # Kept under a separate name so the loop's event name is not
            # overwritten by the processed alert.
            current_alert = AM.process_alerts(frame, [])
            print(current_alert)

            for _ in range(duration):
                _publish_frame(pm, current_alert)
                frame += 1
                time.sleep(DT_CTRL)
# Run the alert cycle with its default arguments when executed as a script.
if __name__ == '__main__':
    cycle_alerts()