#!/usr/bin/env python3
import importlib
import os
import sys
import threading
import time
import signal
from collections import namedtuple
import capnp
import cereal.messaging as messaging
from cereal import car, log
from cereal.services import service_list
from common.params import Params
from common.timeout import Timeout
from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.car_helpers import get_car, interfaces
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
from selfdrive.manager.process import PythonProcess
from selfdrive.manager.process_config import managed_processes
# Numpy gives different results based on CPU features after version 1.19
NUMPY_TOLERANCE = 1e-7
CI = "CI" in os.environ
TIMEOUT = 15
PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance', 'fake_pubsubmaster', 'submaster_config'], defaults=({},))
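# ProcessConfig fields:
#   proc_name: name of the managed process to replay
#   pub_sub: maps each logged input socket to the output sockets expected in response
#   ignore: message fields ignored when comparing outputs against reference logs
#   init_callback: run before replay starts (e.g. fingerprinting, CarParams setup)
#   should_recv_callback: decides, per input message, which outputs to wait for
#   tolerance: numeric tolerance for floating point comparisons
#   fake_pubsubmaster: True to run the Python process in-thread with fake sockets
#   submaster_config: extra kwargs passed to FakeSubMaster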
def wait_for_event(evt):
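  """Block on a threading.Event; on timeout, fail the test from the main thread or exit a worker thread."""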
if not evt.wait(TIMEOUT):
if threading.current_thread().name == "MainThread":
# tested process likely died. don't let test just hang
raise Exception("Timeout reached. Tested process likely crashed.")
else:
# done testing this process, let it die
sys.exit(0)
class FakeSocket:
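  """Fake messaging socket that hands queued data to the process under test, using Events to stay in lockstep with it."""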
def __init__(self, wait=True):
self.data = []
self.wait = wait
self.recv_called = threading.Event()
self.recv_ready = threading.Event()
def receive(self, non_blocking=False):
if non_blocking:
return None
if self.wait:
self.recv_called.set()
wait_for_event(self.recv_ready)
self.recv_ready.clear()
return self.data.pop()
def send(self, data):
if self.wait:
wait_for_event(self.recv_called)
self.recv_called.clear()
self.data.append(data)
if self.wait:
self.recv_ready.set()
def wait_for_recv(self):
wait_for_event(self.recv_called)
class DumbSocket:
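  """Socket stub that returns the same empty message of the given type on every receive and drops everything sent."""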
def __init__(self, s=None):
if s is not None:
try:
dat = messaging.new_message(s)
except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member
# lists
dat = messaging.new_message(s, 0)
self.data = dat.to_bytes()
def receive(self, non_blocking=False):
return self.data
def send(self, dat):
pass
class FakeSubMaster(messaging.SubMaster):
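  """SubMaster whose update() blocks until the test feeds it messages via update_msgs(), keeping replay deterministic."""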
def __init__(self, services, ignore_alive=None, ignore_avg_freq=None):
super().__init__(services, ignore_alive=ignore_alive, ignore_avg_freq=ignore_avg_freq, addr=None)
self.sock = {s: DumbSocket(s) for s in services}
self.update_called = threading.Event()
self.update_ready = threading.Event()
self.wait_on_getitem = False
def __getitem__(self, s):
# hack to know when fingerprinting is done
if self.wait_on_getitem:
self.update_called.set()
wait_for_event(self.update_ready)
self.update_ready.clear()
return self.data[s]
def update(self, timeout=-1):
self.update_called.set()
wait_for_event(self.update_ready)
self.update_ready.clear()
def update_msgs(self, cur_time, msgs):
wait_for_event(self.update_called)
self.update_called.clear()
super().update_msgs(cur_time, msgs)
self.update_ready.set()
def wait_for_update(self):
wait_for_event(self.update_called)
class FakePubMaster(messaging.PubMaster):
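  """PubMaster that captures everything the process publishes so the test can collect it with wait_for_msg()."""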
def __init__(self, services): # pylint: disable=super-init-not-called
self.data = {}
self.sock = {}
self.last_updated = None
for s in services:
try:
data = messaging.new_message(s)
except capnp.lib.capnp.KjException:
data = messaging.new_message(s, 0)
self.data[s] = data.as_reader()
self.sock[s] = DumbSocket()
self.send_called = threading.Event()
self.get_called = threading.Event()
def send(self, s, dat):
self.last_updated = s
if isinstance(dat, bytes):
self.data[s] = log.Event.from_bytes(dat)
else:
self.data[s] = dat.as_reader()
self.send_called.set()
wait_for_event(self.get_called)
self.get_called.clear()
def wait_for_msg(self):
wait_for_event(self.send_called)
self.send_called.clear()
dat = self.data[self.last_updated]
self.get_called.set()
return dat
def fingerprint(msgs, fsm, can_sock, fingerprint):
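  """Feed the first 300 logged CAN messages through the fake can socket so controlsd can fingerprint the car, then restore blocking behavior."""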
print("start fingerprinting")
fsm.wait_on_getitem = True
# populate fake socket with data for fingerprinting
canmsgs = [msg for msg in msgs if msg.which() == "can"]
wait_for_event(can_sock.recv_called)
can_sock.recv_called.clear()
can_sock.data = [msg.as_builder().to_bytes() for msg in canmsgs[:300]]
can_sock.recv_ready.set()
can_sock.wait = False
# we know fingerprinting is done when controlsd sets sm['lateralPlan'].sensorValid
wait_for_event(fsm.update_called)
fsm.update_called.clear()
fsm.wait_on_getitem = False
can_sock.wait = True
can_sock.data = []
fsm.update_ready.set()
def get_car_params(msgs, fsm, can_sock, fingerprint):
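  """Build CarParams, either directly from a forced fingerprint or by fingerprinting the logged CAN messages, and store them in Params."""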
if fingerprint:
CarInterface, _, _ = interfaces[fingerprint]
CP = CarInterface.get_params(fingerprint)
else:
can = FakeSocket(wait=False)
sendcan = FakeSocket(wait=False)
canmsgs = [msg for msg in msgs if msg.which() == 'can']
for m in canmsgs[:300]:
can.send(m.as_builder().to_bytes())
_, CP = get_car(can, sendcan)
Params().put("CarParams", CP.to_bytes())
def controlsd_rcv_callback(msg, CP, cfg, fsm):
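  """Expect the output sockets whose publish rate lines up with the current frame."""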
# no sendcan until controlsd is initialized
socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % int(service_list[msg.which()].frequency / service_list[s].frequency) == 0]
if "sendcan" in socks and fsm.frame < 2000:
socks.remove("sendcan")
return socks, len(socks) > 0
def radar_rcv_callback(msg, CP, cfg, fsm):
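  """radard only publishes in response to CAN messages carrying a radar frame for the car's brand (or any CAN message if the radar is off-CAN)."""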
if msg.which() != "can":
return [], False
elif CP.radarOffCan:
return ["radarState", "liveTracks"], True
radar_msgs = {"honda": [0x445], "toyota": [0x19f, 0x22f], "gm": [0x474],
"chrysler": [0x2d4]}.get(CP.carName, None)
if radar_msgs is None:
raise NotImplementedError
for m in msg.can:
if m.src == 1 and m.address in radar_msgs:
return ["radarState", "liveTracks"], True
return [], False
def calibration_rcv_callback(msg, CP, cfg, fsm):
# calibrationd publishes 1 calibrationData every 5 cameraOdometry packets.
# should_recv always true to increment frame
recv_socks = []
frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster
if frame == 0 or (msg.which() == 'cameraOdometry' and (frame % 5) == 0):
recv_socks = ["liveCalibration"]
return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry'
def ublox_rcv_callback(msg):
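  """Map the raw u-blox message class/id to the sockets ubloxd is expected to publish."""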
msg_class, msg_id = msg.ubloxRaw[2:4]
if (msg_class, msg_id) in {(1, 7 * 16)}:
return ["gpsLocationExternal"]
elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}:
return ["ubloxGnss"]
else:
return []
CONFIGS = [
ProcessConfig(
proc_name="controlsd",
pub_sub={
"can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
"deviceState": [], "pandaStates": [], "peripheralState": [], "liveCalibration": [], "driverMonitoringState": [], "longitudinalPlan": [], "lateralPlan": [], "liveLocationKalman": [], "liveParameters": [], "radarState": [],
"modelV2": [], "driverCameraState": [], "roadCameraState": [], "managerState": [], "testJoystick": [],
},
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"],
init_callback=fingerprint,
should_recv_callback=controlsd_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
submaster_config={'ignore_avg_freq': ['radarState', 'longitudinalPlan']}
),
ProcessConfig(
proc_name="radard",
pub_sub={
"can": ["radarState", "liveTracks"],
"liveParameters": [], "carState": [], "modelV2": [],
},
ignore=["logMonoTime", "valid", "radarState.cumLagMs"],
init_callback=get_car_params,
should_recv_callback=radar_rcv_callback,
tolerance=None,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="plannerd",
pub_sub={
"modelV2": ["lateralPlan", "longitudinalPlan"],
"carState": [], "controlsState": [], "radarState": [],
},
ignore=["logMonoTime", "valid", "longitudinalPlan.processingDelay", "longitudinalPlan.solverExecutionTime", "lateralPlan.solverExecutionTime"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="calibrationd",
pub_sub={
"carState": ["liveCalibration"],
"cameraOdometry": []
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=calibration_rcv_callback,
tolerance=None,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="dmonitoringd",
pub_sub={
"driverState": ["driverMonitoringState"],
"liveCalibration": [], "carState": [], "modelV2": [], "controlsState": [],
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="locationd",
pub_sub={
"cameraOdometry": ["liveLocationKalman"],
"sensorEvents": [], "gpsLocationExternal": [], "liveCalibration": [], "carState": [],
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=False,
),
ProcessConfig(
proc_name="paramsd",
pub_sub={
"liveLocationKalman": ["liveParameters"],
"carState": []
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="ubloxd",
pub_sub={
"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
},
ignore=["logMonoTime"],
init_callback=None,
should_recv_callback=ublox_rcv_callback,
tolerance=None,
fake_pubsubmaster=False,
),
]
def replay_process(cfg, lr, fingerprint=None):
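  """Replay the logged messages through the configured process and return the messages it produced.

  Illustrative usage from a test harness (LogReader import path assumed):
    from tools.lib.logreader import LogReader
    lr = list(LogReader(segment_log_url))
    log_msgs = replay_process(CONFIGS[0], lr)
  """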
with OpenpilotPrefix():
if cfg.fake_pubsubmaster:
return python_replay_process(cfg, lr, fingerprint)
else:
return cpp_replay_process(cfg, lr, fingerprint)
def setup_env(simulation=False):
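  """Reset Params and set the params and environment variables that make the replayed process behave deterministically."""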
params = Params()
params.clear_all()
params.put_bool("OpenpilotEnabledToggle", True)
params.put_bool("Passive", False)
params.put_bool("DisengageOnAccelerator", True)
params.put_bool("WideCameraOnly", False)
params.put_bool("DisableLogging", False)
os.environ["NO_RADAR_SLEEP"] = "1"
os.environ["REPLAY"] = "1"
if simulation:
os.environ["SIMULATION"] = "1"
elif "SIMULATION" in os.environ:
del os.environ["SIMULATION"]
def python_replay_process(cfg, lr, fingerprint=None):
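  """Run a Python process in a daemon thread with fake sockets, feed it the logged inputs in order, and collect its outputs."""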
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can']
fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config)
fpm = FakePubMaster(sub_sockets)
args = (fsm, fpm)
if 'can' in list(cfg.pub_sub.keys()):
can_sock = FakeSocket()
args = (fsm, fpm, can_sock)
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
setup_env()
# TODO: remove after getting new route for civic & accord
migration = {
"HONDA CIVIC 2016 TOURING": "HONDA CIVIC 2016",
"HONDA ACCORD 2018 SPORT 2T": "HONDA ACCORD 2018",
"HONDA ACCORD 2T 2018": "HONDA ACCORD 2018",
"Mazda CX-9 2021": "MAZDA CX-9 2021",
}
if fingerprint is not None:
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['FINGERPRINT'] = fingerprint
else:
os.environ['SKIP_FW_QUERY'] = ""
os.environ['FINGERPRINT'] = ""
for msg in lr:
if msg.which() == 'carParams':
car_fingerprint = migration.get(msg.carParams.carFingerprint, msg.carParams.carFingerprint)
if msg.carParams.fingerprintSource == "fw" and (car_fingerprint in FW_VERSIONS):
Params().put("CarParamsCache", msg.carParams.as_builder().to_bytes())
else:
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['FINGERPRINT'] = car_fingerprint
assert(type(managed_processes[cfg.proc_name]) is PythonProcess)
managed_processes[cfg.proc_name].prepare()
mod = importlib.import_module(managed_processes[cfg.proc_name].module)
thread = threading.Thread(target=mod.main, args=args)
thread.daemon = True
thread.start()
if cfg.init_callback is not None:
if 'can' not in list(cfg.pub_sub.keys()):
can_sock = None
cfg.init_callback(all_msgs, fsm, can_sock, fingerprint)
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
# wait for started process to be ready
if 'can' in list(cfg.pub_sub.keys()):
can_sock.wait_for_recv()
else:
fsm.wait_for_update()
log_msgs, msg_queue = [], []
for msg in pub_msgs:
if cfg.should_recv_callback is not None:
recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
else:
recv_socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % int(service_list[msg.which()].frequency / service_list[s].frequency) == 0]
should_recv = bool(len(recv_socks))
if msg.which() == 'can':
can_sock.send(msg.as_builder().to_bytes())
else:
msg_queue.append(msg.as_builder())
if should_recv:
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue)
msg_queue = []
recv_cnt = len(recv_socks)
while recv_cnt > 0:
m = fpm.wait_for_msg().as_builder()
m.logMonoTime = msg.logMonoTime
m = m.as_reader()
log_msgs.append(m)
recv_cnt -= m.which() in recv_socks
return log_msgs
def cpp_replay_process(cfg, lr, fingerprint=None):
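  """Start the actual managed process and replay logged inputs over real messaging sockets, collecting its responses."""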
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here
pm = messaging.PubMaster(cfg.pub_sub.keys())
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
log_msgs = []
# We need to fake SubMaster alive since we can't inject a fake clock
setup_env(simulation=True)
managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start()
try:
with Timeout(TIMEOUT):
while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()):
time.sleep(0)
# Make sure all subscribers are connected
sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets}
for s in sub_sockets:
messaging.recv_one_or_none(sockets[s])
for i, msg in enumerate(pub_msgs):
pm.send(msg.which(), msg.as_builder())
resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg)
for s in resp_sockets:
response = messaging.recv_one(sockets[s])
if response is None:
print(f"Warning, no response received {i}")
else:
response = response.as_builder()
response.logMonoTime = msg.logMonoTime
response = response.as_reader()
log_msgs.append(response)
if not len(resp_sockets): # We only need to wait if we didn't already wait for a response
while not pm.all_readers_updated(msg.which()):
time.sleep(0)
finally:
managed_processes[cfg.proc_name].signal(signal.SIGKILL)
managed_processes[cfg.proc_name].stop()
return log_msgs
def check_enabled(msgs):
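  """Return True if openpilot was ever active in the route (or if the platform is not a car)."""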
for msg in msgs:
if msg.which() == "carParams":
if msg.carParams.notCar:
return True
elif msg.which() == "controlsState":
if msg.controlsState.active:
return True
return False