process_replay: drain sockets instead recv_one (#28161)

* Drain the sockets until the next cycle in python process replay

* Add waiting mechanism to native process replay, which mimics the syncing

* Sort logs when comparing them. Drain all the sockets instead of only recv_socks.

* Reverse the retrieval order of FakeSocket

* Change pub order for torqued

* Update ref commit
pull/28274/head
Kacper Rączy 2 years ago committed by GitHub
parent bf8e6d0d29
commit 12b3ffcc49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      selfdrive/test/process_replay/compare_logs.py
  2. 109
      selfdrive/test/process_replay/process_replay.py
  3. 2
      selfdrive/test/process_replay/ref_commit

@ -83,7 +83,6 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non
diff = [] diff = []
for msg1, msg2 in zip(log1, log2): for msg1, msg2 in zip(log1, log2):
if msg1.which() != msg2.which(): if msg1.which() != msg2.which():
print(msg1.which(), msg2.which())
raise Exception("msgs not aligned between logs") raise Exception("msgs not aligned between logs")
msg1 = remove_ignored_fields(msg1, ignore_fields) msg1 = remove_ignored_fields(msg1, ignore_fields)

@ -5,6 +5,7 @@ import sys
import threading import threading
import time import time
import signal import signal
from collections import defaultdict
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable from typing import Dict, List, Optional, Callable
@ -43,6 +44,8 @@ class ProcessConfig:
subtest_name: str = "" subtest_name: str = ""
field_tolerances: Dict[str, float] = field(default_factory=dict) field_tolerances: Dict[str, float] = field(default_factory=dict)
timeout: int = 30 timeout: int = 30
iter_wait_time: float = 0.0
drain_sockets: bool = False
def wait_for_event(evt): def wait_for_event(evt):
@ -70,7 +73,7 @@ class FakeSocket:
self.recv_called.set() self.recv_called.set()
wait_for_event(self.recv_ready) wait_for_event(self.recv_ready)
self.recv_ready.clear() self.recv_ready.clear()
return self.data.pop() return self.data.pop(0)
def send(self, data): def send(self, data):
if self.wait: if self.wait:
@ -137,35 +140,24 @@ class FakeSubMaster(messaging.SubMaster):
class FakePubMaster(messaging.PubMaster): class FakePubMaster(messaging.PubMaster):
def __init__(self, services): # pylint: disable=super-init-not-called def __init__(self, services): # pylint: disable=super-init-not-called
self.data = {} self.data = defaultdict(list)
self.sock = {} self.sock = {}
self.last_updated = None self.last_updated = None
for s in services: for s in services:
try:
data = messaging.new_message(s)
except capnp.lib.capnp.KjException:
data = messaging.new_message(s, 0)
self.data[s] = data.as_reader()
self.sock[s] = DumbSocket() self.sock[s] = DumbSocket()
self.send_called = threading.Event()
self.get_called = threading.Event()
def send(self, s, dat): def send(self, s, dat):
self.last_updated = s self.last_updated = s
if isinstance(dat, bytes): if isinstance(dat, bytes):
self.data[s] = log.Event.from_bytes(dat) self.data[s].append(log.Event.from_bytes(dat))
else: else:
self.data[s] = dat.as_reader() self.data[s].append(dat.as_reader())
self.send_called.set()
wait_for_event(self.get_called) def drain(self, s):
self.get_called.clear() msgs = self.data[s]
self.data[s] = []
def wait_for_msg(self): return msgs
wait_for_event(self.send_called)
self.send_called.clear()
dat = self.data[self.last_updated]
self.get_called.set()
return dat
def fingerprint(msgs, fsm, can_sock, fingerprint): def fingerprint(msgs, fsm, can_sock, fingerprint):
@ -252,13 +244,11 @@ def torqued_rcv_callback(msg, CP, cfg, fsm):
return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman' return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman'
def ublox_rcv_callback(msg, CP, cfg, fsm): def locationd_rcv_callback(msg, CP, cfg, fsm):
msg_class, msg_id = msg.ubloxRaw[2:4] trigger_msg = "accelerometer" if CP is not None and CP.notCar else "cameraOdometry"
if (msg_class, msg_id) in {(1, 7 * 16)}: if msg.which() == trigger_msg:
return ["gpsLocationExternal"], True return ["liveLocationKalman"], True
elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}:
return ["ubloxGnss"], True
else:
return [], False return [], False
@ -286,7 +276,7 @@ CONFIGS = [
proc_name="radard", proc_name="radard",
pub_sub={ pub_sub={
"can": ["radarState", "liveTracks"], "can": ["radarState", "liveTracks"],
"liveParameters": [], "carState": [], "modelV2": [], "carState": [], "modelV2": [],
}, },
ignore=["logMonoTime", "valid", "radarState.cumLagMs"], ignore=["logMonoTime", "valid", "radarState.cumLagMs"],
init_callback=get_car_params, init_callback=get_car_params,
@ -309,8 +299,8 @@ CONFIGS = [
ProcessConfig( ProcessConfig(
proc_name="calibrationd", proc_name="calibrationd",
pub_sub={ pub_sub={
"carState": ["liveCalibration"], "carState": [],
"cameraOdometry": [], "cameraOdometry": ["liveCalibration"],
"carParams": [], "carParams": [],
}, },
ignore=["logMonoTime", "valid"], ignore=["logMonoTime", "valid"],
@ -335,12 +325,13 @@ CONFIGS = [
proc_name="locationd", proc_name="locationd",
pub_sub={ pub_sub={
"cameraOdometry": ["liveLocationKalman"], "cameraOdometry": ["liveLocationKalman"],
"accelerometer": [], "gyroscope": [], "accelerometer": ["liveLocationKalman"], "gyroscope": [],
"gpsLocationExternal": [], "liveCalibration": [], "carState": [], "gpsLocation": [], "gpsLocationExternal": [], "liveCalibration": [],
"carParams": [], "carState": [], "gpsLocation": [],
}, },
ignore=["logMonoTime", "valid"], ignore=["logMonoTime", "valid"],
init_callback=get_car_params, init_callback=get_car_params,
should_recv_callback=None, should_recv_callback=locationd_rcv_callback,
tolerance=NUMPY_TOLERANCE, tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=False, fake_pubsubmaster=False,
), ),
@ -363,9 +354,11 @@ CONFIGS = [
}, },
ignore=["logMonoTime"], ignore=["logMonoTime"],
init_callback=None, init_callback=None,
should_recv_callback=ublox_rcv_callback, should_recv_callback=None,
tolerance=None, tolerance=None,
fake_pubsubmaster=False, fake_pubsubmaster=False,
iter_wait_time=0.01,
drain_sockets=True
), ),
ProcessConfig( ProcessConfig(
proc_name="laikad", proc_name="laikad",
@ -383,8 +376,8 @@ CONFIGS = [
ProcessConfig( ProcessConfig(
proc_name="torqued", proc_name="torqued",
pub_sub={ pub_sub={
"carControl": [], "carState": [],
"liveLocationKalman": ["liveTorqueParameters"], "liveLocationKalman": ["liveTorqueParameters"],
"carState": [], "controlsState": [],
}, },
ignore=["logMonoTime"], ignore=["logMonoTime"],
init_callback=get_car_params, init_callback=get_car_params,
@ -463,6 +456,7 @@ def python_replay_process(cfg, lr, fingerprint=None):
fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config) fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config)
fpm = FakePubMaster(sub_sockets) fpm = FakePubMaster(sub_sockets)
can_sock = None
args = (fsm, fpm) args = (fsm, fpm)
if 'can' in list(cfg.pub_sub.keys()): if 'can' in list(cfg.pub_sub.keys()):
can_sock = FakeSocket() can_sock = FakeSocket()
@ -514,12 +508,13 @@ def python_replay_process(cfg, lr, fingerprint=None):
log_msgs, msg_queue = [], [] log_msgs, msg_queue = [], []
for msg in pub_msgs: for msg in pub_msgs:
recv_socks = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None: if cfg.should_recv_callback is not None:
recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm) _, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
else: else:
recv_socks = [s for s in cfg.pub_sub[msg.which()] if socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0] (fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0]
should_recv = bool(len(recv_socks)) should_recv = bool(len(socks))
if msg.which() == 'can': if msg.which() == 'can':
can_sock.send(msg.as_builder().to_bytes()) can_sock.send(msg.as_builder().to_bytes())
@ -530,14 +525,18 @@ def python_replay_process(cfg, lr, fingerprint=None):
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue) fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue)
msg_queue = [] msg_queue = []
recv_cnt = len(recv_socks) if can_sock is not None:
while recv_cnt > 0: can_sock.recv_called.wait()
m = fpm.wait_for_msg().as_builder() else:
fsm.update_called.wait()
for s in recv_socks:
ms = fpm.drain(s)
for m in ms:
m = m.as_builder()
m.logMonoTime = msg.logMonoTime m.logMonoTime = msg.logMonoTime
m = m.as_reader() log_msgs.append(m.as_reader())
log_msgs.append(m)
recv_cnt -= m.which() in recv_socks
return log_msgs return log_msgs
@ -550,7 +549,8 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
# We need to fake SubMaster alive since we can't inject a fake clock # We need to fake SubMaster alive since we can't inject a fake clock
setup_env(simulation=True, cfg=cfg, lr=lr) CP = [m for m in lr if m.which() == 'carParams'][0].carParams
setup_env(simulation=True, CP=CP, cfg=cfg, lr=lr)
if cfg.proc_name == "laikad": if cfg.proc_name == "laikad":
ublox = Params().get_bool("UbloxAvailable") ublox = Params().get_bool("UbloxAvailable")
@ -560,6 +560,10 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start() managed_processes[cfg.proc_name].start()
if cfg.init_callback is not None:
cfg.init_callback(all_msgs, None, None, fingerprint)
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
log_msgs = [] log_msgs = []
try: try:
# Wait for process to startup # Wait for process to startup
@ -572,11 +576,15 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
# Do the replay # Do the replay
cnt = 0 cnt = 0
curr_CP = None
for msg in pub_msgs: for msg in pub_msgs:
with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"): with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"):
if msg.which() == 'carParams':
curr_CP = msg.carParams
resp_sockets = cfg.pub_sub[msg.which()] resp_sockets = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None: if cfg.should_recv_callback is not None:
resp_sockets, _ = cfg.should_recv_callback(msg, None, None, None) resp_sockets, _ = cfg.should_recv_callback(msg, curr_CP, cfg, None)
# Make sure all subscribers are connected # Make sure all subscribers are connected
if len(log_msgs) == 0 and len(resp_sockets) > 0: if len(log_msgs) == 0 and len(resp_sockets) > 0:
@ -587,8 +595,15 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None):
while not pm.all_readers_updated(msg.which()): while not pm.all_readers_updated(msg.which()):
time.sleep(0) time.sleep(0)
time.sleep(cfg.iter_wait_time)
for s in resp_sockets: for s in resp_sockets:
m = messaging.recv_one_retry(sockets[s]) msgs = []
if cfg.drain_sockets:
msgs.extend(messaging.drain_sock(sockets[s], wait_for_one=True))
else:
msgs = [messaging.recv_one_retry(sockets[s])]
for m in msgs:
m = m.as_builder() m = m.as_builder()
m.logMonoTime = msg.logMonoTime m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader()) log_msgs.append(m.as_reader())

@ -1 +1 @@
c67a0959201829ef07a7d68d5fb59603b3983587 89ead2916988a97a938f5e2e10855c37fcbf5d65

Loading…
Cancel
Save