process replay: prep for laikad subsock + QCOM GPS (#27632)

* process replay: prep for laikad subsock

* cleanup

* fix for qcomGnss

* detect ublox

* more debug info in err

* wip

* cleanup
pull/24888/head
Adeeb Shihadeh 2 years ago committed by GitHub
parent ee36c106af
commit 7319afbd51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      selfdrive/locationd/laikad.py
  2. 78
      selfdrive/test/process_replay/process_replay.py
  3. 11
      selfdrive/test/process_replay/test_processes.py

@ -420,11 +420,11 @@ def clear_tmp_cache():
os.mkdir(DOWNLOADS_CACHE_FOLDER) os.mkdir(DOWNLOADS_CACHE_FOLDER)
def main(sm=None, pm=None, qc=None): def main(sm=None, pm=None):
#clear_tmp_cache() #clear_tmp_cache()
use_qcom = not Params().get_bool("UbloxAvailable", block=True) use_qcom = not Params().get_bool("UbloxAvailable", block=True)
if use_qcom or (qc is not None and qc): if use_qcom:
raw_gnss_socket = "qcomGnss" raw_gnss_socket = "qcomGnss"
else: else:
raw_gnss_socket = "ubloxGnss" raw_gnss_socket = "ubloxGnss"

@ -238,14 +238,14 @@ def torqued_rcv_callback(msg, CP, cfg, fsm):
return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman' return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman'
def ublox_rcv_callback(msg): def ublox_rcv_callback(msg, CP, cfg, fsm):
msg_class, msg_id = msg.ubloxRaw[2:4] msg_class, msg_id = msg.ubloxRaw[2:4]
if (msg_class, msg_id) in {(1, 7 * 16)}: if (msg_class, msg_id) in {(1, 7 * 16)}:
return ["gpsLocationExternal"] return ["gpsLocationExternal"], True
elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}: elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}:
return ["ubloxGnss"] return ["ubloxGnss"], True
else: else:
return [] return [], False
CONFIGS = [ CONFIGS = [
@ -364,7 +364,7 @@ CONFIGS = [
init_callback=get_car_params, init_callback=get_car_params,
should_recv_callback=None, should_recv_callback=None,
tolerance=NUMPY_TOLERANCE, tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True, fake_pubsubmaster=False,
), ),
ProcessConfig( ProcessConfig(
proc_name="torqued", proc_name="torqued",
@ -386,7 +386,7 @@ def replay_process(cfg, lr, fingerprint=None):
if cfg.fake_pubsubmaster: if cfg.fake_pubsubmaster:
return python_replay_process(cfg, lr, fingerprint) return python_replay_process(cfg, lr, fingerprint)
else: else:
return cpp_replay_process(cfg, lr, fingerprint) return replay_process_with_sockets(cfg, lr, fingerprint)
def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
@ -401,8 +401,12 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
os.environ["NO_RADAR_SLEEP"] = "1" os.environ["NO_RADAR_SLEEP"] = "1"
os.environ["REPLAY"] = "1" os.environ["REPLAY"] = "1"
os.environ['SKIP_FW_QUERY'] = "" os.environ["SKIP_FW_QUERY"] = ""
os.environ['FINGERPRINT'] = "" os.environ["FINGERPRINT"] = ""
if lr is not None:
services = {m.which() for m in lr}
params.put_bool("UbloxAvailable", "ubloxGnss" in services)
if lr is not None: if lr is not None:
services = {m.which() for m in lr} services = {m.which() for m in lr}
@ -458,12 +462,6 @@ def python_replay_process(cfg, lr, fingerprint=None):
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
# laikad needs decision between submaster ubloxGnss and qcomGnss, prio given to ubloxGnss
if cfg.proc_name == "laikad":
args = (*args, not any(m.which() == "ubloxGnss" for m in pub_msgs))
service = "qcomGnss" if args[2] else "ubloxGnss"
pub_msgs = [m for m in pub_msgs if m.which() == service or m.which() == 'clocks']
controlsState = None controlsState = None
initialized = False initialized = False
for msg in lr: for msg in lr:
@ -534,49 +532,53 @@ def python_replay_process(cfg, lr, fingerprint=None):
return log_msgs return log_msgs
def cpp_replay_process(cfg, lr, fingerprint=None): def replay_process_with_sockets(cfg, lr, fingerprint=None):
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
pm = messaging.PubMaster(cfg.pub_sub.keys()) pm = messaging.PubMaster(cfg.pub_sub.keys())
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
log_msgs = []
# We need to fake SubMaster alive since we can't inject a fake clock # We need to fake SubMaster alive since we can't inject a fake clock
setup_env(simulation=True, cfg=cfg, lr=lr) setup_env(simulation=True, cfg=cfg, lr=lr)
if cfg.proc_name == "laikad":
ublox = Params().get_bool("UbloxAvailable")
keys = set(cfg.pub_sub.keys()) - ({"qcomGnss", } if ublox else {"ubloxGnss", })
pub_msgs = [msg for msg in pub_msgs if msg.which() in keys]
managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start() managed_processes[cfg.proc_name].start()
log_msgs = []
try: try:
with Timeout(TIMEOUT, error_msg=f"timed out testing process {repr(cfg.proc_name)}"): # Wait for process to startup
with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"):
while not any(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): while not any(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()):
time.sleep(0) time.sleep(0)
# Make sure all subscribers are connected # Make sure all subscribers are connected
sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets}
for s in sub_sockets: for s in sub_sockets:
messaging.recv_one_or_none(sockets[s]) messaging.recv_one_or_none(sockets[s])
for i, msg in enumerate(pub_msgs): # Do the replay
cnt = 0
for msg in pub_msgs:
with Timeout(TIMEOUT, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"):
pm.send(msg.which(), msg.as_builder()) pm.send(msg.which(), msg.as_builder())
while not pm.all_readers_updated(msg.which()):
time.sleep(0)
resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) resp_sockets = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None:
resp_sockets, _ = cfg.should_recv_callback(msg, None, None, None)
for s in resp_sockets: for s in resp_sockets:
response = messaging.recv_one_retry(sockets[s]) m = messaging.recv_one_retry(sockets[s])
m = m.as_builder()
if response is None: m.logMonoTime = msg.logMonoTime
print(f"Warning, no response received {i}") log_msgs.append(m.as_reader())
else: cnt += 1
response = response.as_builder()
response.logMonoTime = msg.logMonoTime
response = response.as_reader()
log_msgs.append(response)
if not len(resp_sockets): # We only need to wait if we didn't already wait for a response
while not pm.all_readers_updated(msg.which()):
time.sleep(0)
finally: finally:
managed_processes[cfg.proc_name].signal(signal.SIGKILL) managed_processes[cfg.proc_name].signal(signal.SIGKILL)
managed_processes[cfg.proc_name].stop() managed_processes[cfg.proc_name].stop()

@ -18,7 +18,7 @@ from tools.lib.logreader import LogReader
source_segments = [ source_segments = [
("BODY", "937ccb7243511b65|2022-05-24--16-03-09--1"), # COMMA.BODY ("BODY", "937ccb7243511b65|2022-05-24--16-03-09--1"), # COMMA.BODY
("HYUNDAI", "02c45f73a2e5c6e9|2021-01-01--19-08-22--1"), # HYUNDAI.SONATA ("HYUNDAI", "02c45f73a2e5c6e9|2021-01-01--19-08-22--1"), # HYUNDAI.SONATA
("HYUNDAI2", "d545129f3ca90f28|2022-11-07--20-43-08--3"), # HYUNDAI.KIA_EV6 ("HYUNDAI2", "d545129f3ca90f28|2022-11-07--20-43-08--3"), # HYUNDAI.KIA_EV6 (+ QCOM GPS)
("TOYOTA", "0982d79ebb0de295|2021-01-04--17-13-21--13"), # TOYOTA.PRIUS (INDI) ("TOYOTA", "0982d79ebb0de295|2021-01-04--17-13-21--13"), # TOYOTA.PRIUS (INDI)
("TOYOTA2", "0982d79ebb0de295|2021-01-03--20-03-36--6"), # TOYOTA.RAV4 (LQR) ("TOYOTA2", "0982d79ebb0de295|2021-01-03--20-03-36--6"), # TOYOTA.RAV4 (LQR)
("TOYOTA3", "f7d7e3538cda1a2a|2021-08-16--08-55-34--6"), # TOYOTA.COROLLA_TSS2 ("TOYOTA3", "f7d7e3538cda1a2a|2021-08-16--08-55-34--6"), # TOYOTA.COROLLA_TSS2
@ -70,7 +70,7 @@ def run_test_process(data):
res = None res = None
if not args.upload_only: if not args.upload_only:
lr = LogReader.from_bytes(lr_dat) lr = LogReader.from_bytes(lr_dat)
res, log_msgs = test_process(cfg, lr, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs) res, log_msgs = test_process(cfg, lr, segment, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs)
# save logs so we can upload when updating refs # save logs so we can upload when updating refs
save_log(cur_log_fn, log_msgs) save_log(cur_log_fn, log_msgs)
@ -88,7 +88,7 @@ def get_log_data(segment):
return (segment, f.read()) return (segment, f.read())
def test_process(cfg, lr, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None): def test_process(cfg, lr, segment, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None):
if ignore_fields is None: if ignore_fields is None:
ignore_fields = [] ignore_fields = []
if ignore_msgs is None: if ignore_msgs is None:
@ -96,7 +96,10 @@ def test_process(cfg, lr, ref_log_path, new_log_path, ignore_fields=None, ignore
ref_log_msgs = list(LogReader(ref_log_path)) ref_log_msgs = list(LogReader(ref_log_path))
log_msgs = replay_process(cfg, lr) try:
log_msgs = replay_process(cfg, lr)
except Exception as e:
raise Exception("failed on segment: " + segment) from e
# check to make sure openpilot is engaged in the route # check to make sure openpilot is engaged in the route
if cfg.proc_name == "controlsd": if cfg.proc_name == "controlsd":

Loading…
Cancel
Save