draft to fix replay

pull/26850/head
Kurt Nistelberger 3 years ago
parent 238bccd251
commit 023d55d10b
  1. 47
      selfdrive/test/process_replay/process_replay.py

@ -25,14 +25,18 @@ from selfdrive.manager.process_config import managed_processes
NUMPY_TOLERANCE = 1e-7 NUMPY_TOLERANCE = 1e-7
CI = "CI" in os.environ CI = "CI" in os.environ
TIMEOUT = 15 TIMEOUT = 15
# laikad may not return on a gnss message, shorter timeout
TIMEOUT_LAIKAD_RESPONSE = 1
# laikad needs longer on first startup due to orbit downloads
TIMEOUT_LAIKAD_UPDATE = 40
PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__)) PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/") FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance', 'fake_pubsubmaster', 'submaster_config', 'environ', 'subtest_name', "field_tolerances"], defaults=({}, {}, "", {})) ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance', 'fake_pubsubmaster', 'submaster_config', 'environ', 'subtest_name', "field_tolerances", "allow_no_response"], defaults=({}, {}, "", {}, False))
def wait_for_event(evt): def wait_for_event(evt, timeout=TIMEOUT):
if not evt.wait(TIMEOUT): if not evt.wait(timeout):
if threading.currentThread().getName() == "MainThread": if threading.currentThread().getName() == "MainThread":
# tested process likely died. don't let test just hang # tested process likely died. don't let test just hang
raise Exception(f"Timeout reached. Tested process {os.environ['PROC_NAME']} likely crashed.") raise Exception(f"Timeout reached. Tested process {os.environ['PROC_NAME']} likely crashed.")
@ -111,8 +115,8 @@ class FakeSubMaster(messaging.SubMaster):
wait_for_event(self.update_ready) wait_for_event(self.update_ready)
self.update_ready.clear() self.update_ready.clear()
def update_msgs(self, cur_time, msgs): def update_msgs(self, cur_time, msgs, timeout=TIMEOUT):
wait_for_event(self.update_called) wait_for_event(self.update_called, timeout)
self.update_called.clear() self.update_called.clear()
super().update_msgs(cur_time, msgs) super().update_msgs(cur_time, msgs)
self.update_ready.set() self.update_ready.set()
@ -146,8 +150,8 @@ class FakePubMaster(messaging.PubMaster):
wait_for_event(self.get_called) wait_for_event(self.get_called)
self.get_called.clear() self.get_called.clear()
def wait_for_msg(self): def wait_for_msg(self, timeout=TIMEOUT):
wait_for_event(self.send_called) wait_for_event(self.send_called, timeout)
self.send_called.clear() self.send_called.clear()
dat = self.data[self.last_updated] dat = self.data[self.last_updated]
self.get_called.set() self.get_called.set()
@ -360,20 +364,6 @@ CONFIGS = [
tolerance=None, tolerance=None,
fake_pubsubmaster=False, fake_pubsubmaster=False,
), ),
ProcessConfig(
proc_name="laikad",
subtest_name="Offline",
pub_sub={
"ubloxGnss": ["gnssMeasurements"],
"clocks": []
},
ignore=["logMonoTime"],
init_callback=get_car_params,
should_recv_callback=laika_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
environ={"LAIKAD_NO_INTERNET": "1"},
),
ProcessConfig( ProcessConfig(
proc_name="laikad", proc_name="laikad",
pub_sub={ pub_sub={
@ -385,6 +375,7 @@ CONFIGS = [
should_recv_callback=laika_rcv_callback, should_recv_callback=laika_rcv_callback,
tolerance=NUMPY_TOLERANCE, tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True, fake_pubsubmaster=True,
allow_no_response=True,
), ),
ProcessConfig( ProcessConfig(
proc_name="torqued", proc_name="torqued",
@ -530,12 +521,22 @@ def python_replay_process(cfg, lr, fingerprint=None):
msg_queue.append(msg.as_builder()) msg_queue.append(msg.as_builder())
if should_recv: if should_recv:
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue) timeout = TIMEOUT_LAIKAD_UPDATE if cfg.allow_no_response else TIMEOUT
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue, timeout)
msg_queue = [] msg_queue = []
recv_cnt = len(recv_socks) recv_cnt = len(recv_socks)
while recv_cnt > 0: while recv_cnt > 0:
m = fpm.wait_for_msg().as_builder()
if cfg.allow_no_response:
try:
m = fpm.wait_for_msg(TIMEOUT_LAIKAD_RESPONSE).as_builder()
except:
recv_cnt = 0
continue
else:
m = fpm.wait_for_msg().as_builder()
m.logMonoTime = msg.logMonoTime m.logMonoTime = msg.logMonoTime
m = m.as_reader() m = m.as_reader()

Loading…
Cancel
Save