process_replay: drain sockets instead recv_one (#28161)

* Drain the sockets until the next cycle in python process replay

* Add waiting mechanism to native process replay, which mimics the syncing

* Sort logs when comparing them. Drain all the sockets instead of only recv_socks.

* Reverse the retrieval order of FakeSocket

* Change pub order for torqued

* Update ref commit
old-commit-hash: 12b3ffcc49
beeps
Kacper Rączy 2 years ago committed by GitHub
parent 4e1ff2b4b9
commit dd70d2bdf3
  1. 1
      selfdrive/test/process_replay/compare_logs.py
  2. 121
      selfdrive/test/process_replay/process_replay.py
  3. 2
      selfdrive/test/process_replay/ref_commit

@ -83,7 +83,6 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non
diff = []
for msg1, msg2 in zip(log1, log2):
if msg1.which() != msg2.which():
print(msg1.which(), msg2.which())
raise Exception("msgs not aligned between logs")
msg1 = remove_ignored_fields(msg1, ignore_fields)

@ -5,6 +5,7 @@ import sys
import threading
import time
import signal
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
@ -43,6 +44,8 @@ class ProcessConfig:
subtest_name: str = ""
field_tolerances: Dict[str, float] = field(default_factory=dict)
timeout: int = 30
iter_wait_time: float = 0.0
drain_sockets: bool = False
def wait_for_event(evt):
@ -70,7 +73,7 @@ class FakeSocket:
self.recv_called.set()
wait_for_event(self.recv_ready)
self.recv_ready.clear()
return self.data.pop()
return self.data.pop(0)
def send(self, data):
if self.wait:
@ -137,35 +140,24 @@ class FakeSubMaster(messaging.SubMaster):
class FakePubMaster(messaging.PubMaster):
def __init__(self, services): # pylint: disable=super-init-not-called
self.data = {}
self.data = defaultdict(list)
self.sock = {}
self.last_updated = None
for s in services:
try:
data = messaging.new_message(s)
except capnp.lib.capnp.KjException:
data = messaging.new_message(s, 0)
self.data[s] = data.as_reader()
self.sock[s] = DumbSocket()
self.send_called = threading.Event()
self.get_called = threading.Event()
def send(self, s, dat):
self.last_updated = s
if isinstance(dat, bytes):
self.data[s] = log.Event.from_bytes(dat)
self.data[s].append(log.Event.from_bytes(dat))
else:
self.data[s] = dat.as_reader()
self.send_called.set()
wait_for_event(self.get_called)
self.get_called.clear()
self.data[s].append(dat.as_reader())
def drain(self, s):
msgs = self.data[s]
self.data[s] = []
def wait_for_msg(self):
wait_for_event(self.send_called)
self.send_called.clear()
dat = self.data[self.last_updated]
self.get_called.set()
return dat
return msgs
def fingerprint(msgs, fsm, can_sock, fingerprint):
@ -252,14 +244,12 @@ def torqued_rcv_callback(msg, CP, cfg, fsm):
return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman'
def ublox_rcv_callback(msg, CP, cfg, fsm):
msg_class, msg_id = msg.ubloxRaw[2:4]
if (msg_class, msg_id) in {(1, 7 * 16)}:
return ["gpsLocationExternal"], True
elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}:
return ["ubloxGnss"], True
else:
return [], False
def locationd_rcv_callback(msg, CP, cfg, fsm):
trigger_msg = "accelerometer" if CP is not None and CP.notCar else "cameraOdometry"
if msg.which() == trigger_msg:
return ["liveLocationKalman"], True
return [], False
CONFIGS = [
@ -286,7 +276,7 @@ CONFIGS = [
proc_name="radard",
pub_sub={
"can": ["radarState", "liveTracks"],
"liveParameters": [], "carState": [], "modelV2": [],
"carState": [], "modelV2": [],
},
ignore=["logMonoTime", "valid", "radarState.cumLagMs"],
init_callback=get_car_params,
@ -309,8 +299,8 @@ CONFIGS = [
ProcessConfig(
proc_name="calibrationd",
pub_sub={
"carState": ["liveCalibration"],
"cameraOdometry": [],
"carState": [],
"cameraOdometry": ["liveCalibration"],
"carParams": [],
},
ignore=["logMonoTime", "valid"],
@ -335,12 +325,13 @@ CONFIGS = [
proc_name="locationd",
pub_sub={
"cameraOdometry": ["liveLocationKalman"],
"accelerometer": [], "gyroscope": [],
"gpsLocationExternal": [], "liveCalibration": [], "carState": [], "gpsLocation": [],
"accelerometer": ["liveLocationKalman"], "gyroscope": [],
"gpsLocationExternal": [], "liveCalibration": [],
"carParams": [], "carState": [], "gpsLocation": [],
},
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
should_recv_callback=locationd_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=False,
),
@ -363,9 +354,11 @@ CONFIGS = [
},
ignore=["logMonoTime"],
init_callback=None,
should_recv_callback=ublox_rcv_callback,
should_recv_callback=None,
tolerance=None,
fake_pubsubmaster=False,
iter_wait_time=0.01,
drain_sockets=True
),
ProcessConfig(
proc_name="laikad",
@ -383,8 +376,8 @@ CONFIGS = [
ProcessConfig(
proc_name="torqued",
pub_sub={
"carControl": [], "carState": [],
"liveLocationKalman": ["liveTorqueParameters"],
"carState": [], "controlsState": [],
},
ignore=["logMonoTime"],
init_callback=get_car_params,
@ -463,6 +456,7 @@ def python_replay_process(cfg, lr, fingerprint=None):
fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config)
fpm = FakePubMaster(sub_sockets)
can_sock = None
args = (fsm, fpm)
if 'can' in list(cfg.pub_sub.keys()):
can_sock = FakeSocket()
@ -514,12 +508,13 @@ def python_replay_process(cfg, lr, fingerprint=None):
log_msgs, msg_queue = [], []
for msg in pub_msgs:
recv_socks = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None:
recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
_, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
else:
recv_socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0]
should_recv = bool(len(recv_socks))
socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0]
should_recv = bool(len(socks))
if msg.which() == 'can':
can_sock.send(msg.as_builder().to_bytes())
@ -530,14 +525,18 @@ def python_replay_process(cfg, lr, fingerprint=None):
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue)
msg_queue = []
recv_cnt = len(recv_socks)
while recv_cnt > 0:
m = fpm.wait_for_msg().as_builder()
m.logMonoTime = msg.logMonoTime
m = m.as_reader()
if can_sock is not None:
can_sock.recv_called.wait()
else:
fsm.update_called.wait()
for s in recv_socks:
ms = fpm.drain(s)
for m in ms:
m = m.as_builder()
m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader())
log_msgs.append(m)
recv_cnt -= m.which() in recv_socks
return log_msgs
@ -550,7 +549,8 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
# We need to fake SubMaster alive since we can't inject a fake clock
setup_env(simulation=True, cfg=cfg, lr=lr)
CP = [m for m in lr if m.which() == 'carParams'][0].carParams
setup_env(simulation=True, CP=CP, cfg=cfg, lr=lr)
if cfg.proc_name == "laikad":
ublox = Params().get_bool("UbloxAvailable")
@ -560,6 +560,10 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start()
if cfg.init_callback is not None:
cfg.init_callback(all_msgs, None, None, fingerprint)
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
log_msgs = []
try:
# Wait for process to startup
@ -572,11 +576,15 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
# Do the replay
cnt = 0
curr_CP = None
for msg in pub_msgs:
with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"):
if msg.which() == 'carParams':
curr_CP = msg.carParams
resp_sockets = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None:
resp_sockets, _ = cfg.should_recv_callback(msg, None, None, None)
resp_sockets, _ = cfg.should_recv_callback(msg, curr_CP, cfg, None)
# Make sure all subscribers are connected
if len(log_msgs) == 0 and len(resp_sockets) > 0:
@ -587,11 +595,18 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
while not pm.all_readers_updated(msg.which()):
time.sleep(0)
time.sleep(cfg.iter_wait_time)
for s in resp_sockets:
m = messaging.recv_one_retry(sockets[s])
m = m.as_builder()
m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader())
msgs = []
if cfg.drain_sockets:
msgs.extend(messaging.drain_sock(sockets[s], wait_for_one=True))
else:
msgs = [messaging.recv_one_retry(sockets[s])]
for m in msgs:
m = m.as_builder()
m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader())
cnt += 1
finally:
managed_processes[cfg.proc_name].signal(signal.SIGKILL)

@ -1 +1 @@
c67a0959201829ef07a7d68d5fb59603b3983587
89ead2916988a97a938f5e2e10855c37fcbf5d65

Loading…
Cancel
Save