process replay: support running in parallel (#24534)

* prefix params

* set env

* prefix in manager

* filesystem except

* dont delete manager folder

* Update selfdrive/common/params.h

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* debug same path

* remove cleanup + same default

* dont use filesystem lib

* param symlink path

* prefix helpers

* path

* dont delete d

* parallel loop

* refactor loop

* msgq

* clean msgs

* spelling

* nestable pool

* spelling

* logreaders in parallel

* bugfix

* assert msgq

* Update selfdrive/test/process_replay/test_processes.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* Update selfdrive/test/process_replay/test_processes.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* assert in cereal

* folder exists

* create dirs in test

* bump cereal

* Update selfdrive/test/process_replay/process_replay.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* PR feedback

* params path in basedir

* ref commit

* param path

* bugfix

* upload_only

* param path

* Update selfdrive/test/process_replay/process_replay.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* msgq path name

* python concurrency.features

* progress bar

* remove progress bar from compare logs

* Update selfdrive/test/process_replay/test_processes.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* Update selfdrive/test/process_replay/test_processes.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* defaultdict

* context manager

* update refs

* dont get logs if upload only

* upload refs in parallel

* cleanup

* Update selfdrive/test/process_replay/test_processes.py

Co-authored-by: Shane Smiskol <shane@smiskol.com>

* cleanup

* text

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
Co-authored-by: Shane Smiskol <shane@smiskol.com>
old-commit-hash: 444d265821
Lukas Petersson authored 3 years ago, committed by GitHub
parent 077a70d80d
commit d1e21cef79
Files changed:
  selfdrive/test/process_replay/compare_logs.py    (11 changes)
  selfdrive/test/process_replay/process_replay.py  (38 changes)
  selfdrive/test/process_replay/test_processes.py  (81 changes)

@ -1,25 +1,18 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import bz2 import bz2
import os
import sys import sys
import math import math
import numbers import numbers
import dictdiffer import dictdiffer
from collections import Counter from collections import Counter
if "CI" in os.environ:
def tqdm(x):
return x
else:
from tqdm import tqdm # type: ignore
from tools.lib.logreader import LogReader from tools.lib.logreader import LogReader
EPSILON = sys.float_info.epsilon EPSILON = sys.float_info.epsilon
def save_log(dest, log_msgs, compress=True): def save_log(dest, log_msgs, compress=True):
dat = b"".join(msg.as_builder().to_bytes() for msg in tqdm(log_msgs)) dat = b"".join(msg.as_builder().to_bytes() for msg in log_msgs)
if compress: if compress:
dat = bz2.compress(dat) dat = bz2.compress(dat)
@ -67,7 +60,7 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non
raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}\n\t\t{cnt1}\n\t\t{cnt2}") raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}\n\t\t{cnt1}\n\t\t{cnt2}")
diff = [] diff = []
for msg1, msg2 in tqdm(zip(log1, log2)): for msg1, msg2 in zip(log1, log2):
if msg1.which() != msg2.which(): if msg1.which() != msg2.which():
print(msg1, msg2) print(msg1, msg2)
raise Exception("msgs not aligned between logs") raise Exception("msgs not aligned between logs")

selfdrive/test/process_replay/process_replay.py
@@ -4,11 +4,12 @@ import os
 import sys
 import threading
 import time
+import shutil
 import signal
+import uuid
 from collections import namedtuple
 
 import capnp
-from tqdm import tqdm
 
 import cereal.messaging as messaging
 from cereal import car, log
@@ -336,12 +337,35 @@ CONFIGS = [
   ),
 ]
 
+
+def setup_prefix():
+  os.environ['OPENPILOT_PREFIX'] = str(uuid.uuid4())
+  msgq_path = os.path.join('/dev/shm', os.environ['OPENPILOT_PREFIX'])
+  try:
+    os.mkdir(msgq_path)
+  except FileExistsError:
+    pass
+
+
+def teardown_prefix():
+  if not os.environ.get("OPENPILOT_PREFIX", 0):
+    return
+
+  symlink_path = Params().get_param_path()
+  if os.path.exists(symlink_path):
+    shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
+    os.remove(symlink_path)
+
+  msgq_path = os.path.join('/dev/shm', os.environ['OPENPILOT_PREFIX'])
+  shutil.rmtree(msgq_path, ignore_errors=True)
+
+
 def replay_process(cfg, lr, fingerprint=None):
-  if cfg.fake_pubsubmaster:
-    return python_replay_process(cfg, lr, fingerprint)
-  else:
-    return cpp_replay_process(cfg, lr, fingerprint)
+  setup_prefix()
+  try:
+    if cfg.fake_pubsubmaster:
+      return python_replay_process(cfg, lr, fingerprint)
+    else:
+      return cpp_replay_process(cfg, lr, fingerprint)
+  finally:
+    teardown_prefix()
 
 
 def setup_env(simulation=False):
   params = Params()
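These helpers are what make parallel replay safe: every call to replay_process() generates a unique OPENPILOT_PREFIX, which (per the params.h change referenced in the commit message) namespaces both the msgq socket directory under /dev/shm and the params path, so concurrent workers cannot collide. A minimal sketch of the same isolation idea, assuming only what the diff shows:

  import os
  import shutil
  import uuid

  # give this process its own namespace for messaging + params state
  prefix = str(uuid.uuid4())
  os.environ["OPENPILOT_PREFIX"] = prefix

  msgq_path = os.path.join("/dev/shm", prefix)
  os.makedirs(msgq_path, exist_ok=True)
  try:
    ...  # run one isolated replay here
  finally:
    # tear down per-run state so /dev/shm does not fill up
    shutil.rmtree(msgq_path, ignore_errors=True)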
@@ -421,7 +445,7 @@ def python_replay_process(cfg, lr, fingerprint=None):
     fsm.wait_for_update()
 
   log_msgs, msg_queue = [], []
-  for msg in tqdm(pub_msgs, disable=CI):
+  for msg in pub_msgs:
     if cfg.should_recv_callback is not None:
       recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
     else:
@@ -473,7 +497,7 @@ def cpp_replay_process(cfg, lr, fingerprint=None):
   for s in sub_sockets:
     messaging.recv_one_or_none(sockets[s])
 
-  for i, msg in enumerate(tqdm(pub_msgs, disable=False)):
+  for i, msg in enumerate(pub_msgs):
     pm.send(msg.which(), msg.as_builder())
     resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg)

selfdrive/test/process_replay/test_processes.py
@@ -1,7 +1,10 @@
 #!/usr/bin/env python3
 import argparse
+import concurrent.futures
 import os
 import sys
+from collections import defaultdict
+from tqdm import tqdm
 from typing import Any, Dict
 
 from selfdrive.car.car_helpers import interface_names
@@ -11,7 +14,6 @@ from selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR
 from selfdrive.version import get_commit
 from tools.lib.logreader import LogReader
 
-
 original_segments = [
   ("BODY", "bd6a637565e91581|2022-04-04--22-05-08--0"),  # COMMA.BODY
   ("HYUNDAI", "02c45f73a2e5c6e9|2021-01-01--19-08-22--1"),  # HYUNDAI.SONATA
@@ -54,6 +56,28 @@ BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/"
 REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit")
 
+
+def run_test_process(data):
+  segment, cfg, args, cur_log_fn, lr, ref_commit = data
+  res = None
+  if not args.upload_only:
+    ref_log_fn = os.path.join(PROC_REPLAY_DIR, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
+    res, log_msgs = test_process(cfg, lr, ref_log_fn, args.ignore_fields, args.ignore_msgs)
+    # save logs so we can upload when updating refs
+    save_log(cur_log_fn, log_msgs)
+
+  if args.update_refs or args.upload_only:
+    print(f'Uploading: {os.path.basename(cur_log_fn)}')
+    assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}"
+    upload_file(cur_log_fn, os.path.basename(cur_log_fn))
+    os.remove(cur_log_fn)
+  return (segment, cfg.proc_name, res)
+
+
+def get_logreader(segment):
+  r, n = segment.rsplit("--", 1)
+  lr = LogReader(get_url(r, n))
+  return (segment, lr)
+
+
 def test_process(cfg, lr, ref_log_fn, ignore_fields=None, ignore_msgs=None):
   if ignore_fields is None:
     ignore_fields = []
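Note that run_test_process() takes one tuple rather than separate parameters: Executor.map passes exactly one argument per input item, so the per-test inputs are packed into pool_args on the submit side and unpacked inside the worker. The same pattern in isolation:

  import concurrent.futures

  def worker(data):
    # unpack the single mapped argument into its components
    name, value = data
    return (name, value * 2)

  if __name__ == "__main__":
    jobs = [("a", 1), ("b", 2)]
    with concurrent.futures.ProcessPoolExecutor() as pool:
      print(list(pool.map(worker, jobs)))  # [('a', 2), ('b', 4)]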
@@ -127,6 +151,7 @@ if __name__ == "__main__":
                       help="Updates reference logs using current commit")
   parser.add_argument("--upload-only", action="store_true",
                       help="Skips testing processes and uploads logs from previous test run")
+  parser.add_argument("-j", "--jobs", type=int, default=1)
   args = parser.parse_args()
 
   full_test = all(len(x) == 0 for x in (args.whitelist_procs, args.whitelist_cars, args.blacklist_procs, args.blacklist_cars, args.ignore_fields, args.ignore_msgs))
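The new -j/--jobs flag sets the worker count, defaulting to 1 (the old serial behavior). A typical invocation might look like the following, where the worker count of 8 is just an example:

  # hypothetical example: replay all processes with 8 parallel workers
  python selfdrive/test/process_replay/test_processes.py -j8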
@@ -154,37 +179,31 @@ if __name__ == "__main__":
     untested = (set(interface_names) - set(excluded_interfaces)) - tested_cars
     assert len(untested) == 0, f"Cars missing routes: {str(untested)}"
 
-  results: Any = {}
-  for car_brand, segment in segments:
-    if (len(args.whitelist_cars) and car_brand.upper() not in args.whitelist_cars) or \
-       (not len(args.whitelist_cars) and car_brand.upper() in args.blacklist_cars):
-      continue
-
-    print(f"***** testing route segment {segment} *****\n")
-
-    results[segment] = {}
-
-    r, n = segment.rsplit("--", 1)
-    lr = LogReader(get_url(r, n))
-
-    for cfg in CONFIGS:
-      if (len(args.whitelist_procs) and cfg.proc_name not in args.whitelist_procs) or \
-         (not len(args.whitelist_procs) and cfg.proc_name in args.blacklist_procs):
-        continue
-
-      cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2")
-      if not args.upload_only:
-        ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
-        results[segment][cfg.proc_name], log_msgs = test_process(cfg, lr, ref_log_fn, args.ignore_fields, args.ignore_msgs)
-
-        # save logs so we can upload when updating refs
-        save_log(cur_log_fn, log_msgs)
-
-      if upload:
-        print(f'Uploading: {os.path.basename(cur_log_fn)}')
-        assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}"
-        upload_file(cur_log_fn, os.path.basename(cur_log_fn))
-        os.remove(cur_log_fn)
+  with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool:
+    if not args.upload_only:
+      lreaders: Any = {}
+      p1 = pool.map(get_logreader, [seg for car, seg in segments])
+      for (segment, lr) in tqdm(p1, desc="Getting Logs", total=len(segments)):
+        lreaders[segment] = lr
+
+    pool_args: Any = []
+    for car_brand, segment in segments:
+      if (len(args.whitelist_cars) and car_brand.upper() not in args.whitelist_cars) or \
+         (not len(args.whitelist_cars) and car_brand.upper() in args.blacklist_cars):
+        continue
+      for cfg in CONFIGS:
+        if (len(args.whitelist_procs) and cfg.proc_name not in args.whitelist_procs) or \
+           (not len(args.whitelist_procs) and cfg.proc_name in args.blacklist_procs):
+          continue
+        cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2")
+        lr = None if args.upload_only else lreaders[segment]
+        pool_args.append((segment, cfg, args, cur_log_fn, lr, ref_commit))
+
+    results: Any = defaultdict(dict)
+    p2 = pool.map(run_test_process, pool_args)
+    for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)):
+      if isinstance(result, list):
+        results[segment][proc] = result
 
   diff1, diff2, failed = format_diff(results, ref_commit)
   if not args.upload_only:
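The restructured main loop uses a single ProcessPoolExecutor for both phases: the get_logreader downloads run in parallel first, then the run_test_process jobs. Since Executor.map yields results lazily and in input order, wrapping it in tqdm with an explicit total= gives a live progress bar over work whose length map itself cannot report. A self-contained sketch of that pattern (all names here are stand-ins, not the real test code):

  import concurrent.futures
  from tqdm import tqdm

  def fetch(seg):
    return (seg, f"log-for-{seg}")  # stand-in for get_logreader()

  def run(data):
    seg, log = data
    return (seg, len(log))  # stand-in for run_test_process()

  if __name__ == "__main__":
    segments = ["seg--0", "seg--1", "seg--2"]
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
      logs = dict(tqdm(pool.map(fetch, segments), desc="Getting Logs", total=len(segments)))
      pool_args = [(s, logs[s]) for s in segments]
      results = {}
      for seg, res in tqdm(pool.map(run, pool_args), desc="Running Tests", total=len(pool_args)):
        results[seg] = res
    print(results)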
