diff --git a/selfdrive/test/process_replay/compare_logs.py b/selfdrive/test/process_replay/compare_logs.py
index f1ca69987c..057e46cd9c 100755
--- a/selfdrive/test/process_replay/compare_logs.py
+++ b/selfdrive/test/process_replay/compare_logs.py
@@ -1,25 +1,18 @@
 #!/usr/bin/env python3
 import bz2
-import os
 import sys
 import math
 import numbers
 import dictdiffer
 from collections import Counter
 
-if "CI" in os.environ:
-  def tqdm(x):
-    return x
-else:
-  from tqdm import tqdm  # type: ignore
-
 from tools.lib.logreader import LogReader
 
 EPSILON = sys.float_info.epsilon
 
 
 def save_log(dest, log_msgs, compress=True):
-  dat = b"".join(msg.as_builder().to_bytes() for msg in tqdm(log_msgs))
+  dat = b"".join(msg.as_builder().to_bytes() for msg in log_msgs)
 
   if compress:
     dat = bz2.compress(dat)
@@ -67,7 +60,7 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non
     raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}\n\t\t{cnt1}\n\t\t{cnt2}")
 
   diff = []
-  for msg1, msg2 in tqdm(zip(log1, log2)):
+  for msg1, msg2 in zip(log1, log2):
     if msg1.which() != msg2.which():
       print(msg1, msg2)
       raise Exception("msgs not aligned between logs")
diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py
index 03d882591f..15086412a3 100755
--- a/selfdrive/test/process_replay/process_replay.py
+++ b/selfdrive/test/process_replay/process_replay.py
@@ -4,11 +4,12 @@ import os
 import sys
 import threading
 import time
+import shutil
 import signal
+import uuid
 from collections import namedtuple
 
 import capnp
-from tqdm import tqdm
 
 import cereal.messaging as messaging
 from cereal import car, log
@@ -336,12 +337,35 @@ CONFIGS = [
   ),
 ]
 
+def setup_prefix():
+  os.environ['OPENPILOT_PREFIX'] = str(uuid.uuid4())
+  msgq_path = os.path.join('/dev/shm', os.environ['OPENPILOT_PREFIX'])
+  try:
+    os.mkdir(msgq_path)
+  except FileExistsError:
+    pass
+
+
+def teardown_prefix():
+  if not os.environ.get("OPENPILOT_PREFIX", 0):
+    return
+  symlink_path = Params().get_param_path()
+  if os.path.exists(symlink_path):
+    shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
+    os.remove(symlink_path)
+  msgq_path = os.path.join('/dev/shm', os.environ['OPENPILOT_PREFIX'])
+  shutil.rmtree(msgq_path, ignore_errors=True)
+
 def replay_process(cfg, lr, fingerprint=None):
-  if cfg.fake_pubsubmaster:
-    return python_replay_process(cfg, lr, fingerprint)
-  else:
-    return cpp_replay_process(cfg, lr, fingerprint)
+  setup_prefix()
+  try:
+    if cfg.fake_pubsubmaster:
+      return python_replay_process(cfg, lr, fingerprint)
+    else:
+      return cpp_replay_process(cfg, lr, fingerprint)
+  finally:
+    teardown_prefix()
 
 
 def setup_env(simulation=False):
   params = Params()
@@ -421,7 +445,7 @@ def python_replay_process(cfg, lr, fingerprint=None):
     fsm.wait_for_update()
 
   log_msgs, msg_queue = [], []
-  for msg in tqdm(pub_msgs, disable=CI):
+  for msg in pub_msgs:
     if cfg.should_recv_callback is not None:
       recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
     else:
@@ -473,7 +497,7 @@ def cpp_replay_process(cfg, lr, fingerprint=None):
   for s in sub_sockets:
     messaging.recv_one_or_none(sockets[s])
 
-  for i, msg in enumerate(tqdm(pub_msgs, disable=False)):
+  for i, msg in enumerate(pub_msgs):
     pm.send(msg.which(), msg.as_builder())
     resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg)
diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py
index ec94cc333b..e6dcf10e48 100755
--- a/selfdrive/test/process_replay/test_processes.py
+++ b/selfdrive/test/process_replay/test_processes.py
@@ -1,7 +1,10 @@
 #!/usr/bin/env python3
 import argparse
+import concurrent.futures
 import os
 import sys
+from collections import defaultdict
+from tqdm import tqdm
 from typing import Any, Dict
 
 from selfdrive.car.car_helpers import interface_names
@@ -11,7 +14,6 @@ from selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DI
 from selfdrive.version import get_commit
 from tools.lib.logreader import LogReader
 
-
 original_segments = [
   ("BODY", "bd6a637565e91581|2022-04-04--22-05-08--0"),  # COMMA.BODY
   ("HYUNDAI", "02c45f73a2e5c6e9|2021-01-01--19-08-22--1"),  # HYUNDAI.SONATA
@@ -54,6 +56,28 @@ BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/"
 REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit")
 
 
+def run_test_process(data):
+  segment, cfg, args, cur_log_fn, lr, ref_commit = data
+  res = None
+  if not args.upload_only:
+    ref_log_fn = os.path.join(PROC_REPLAY_DIR, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
+    res, log_msgs = test_process(cfg, lr, ref_log_fn, args.ignore_fields, args.ignore_msgs)
+    # save logs so we can upload when updating refs
+    save_log(cur_log_fn, log_msgs)
+  if args.update_refs or args.upload_only:
+    print(f'Uploading: {os.path.basename(cur_log_fn)}')
+    assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}"
+    upload_file(cur_log_fn, os.path.basename(cur_log_fn))
+    os.remove(cur_log_fn)
+  return (segment, cfg.proc_name, res)
+
+
+def get_logreader(segment):
+  r, n = segment.rsplit("--", 1)
+  lr = LogReader(get_url(r, n))
+  return (segment, lr)
+
+
 def test_process(cfg, lr, ref_log_fn, ignore_fields=None, ignore_msgs=None):
   if ignore_fields is None:
     ignore_fields = []
@@ -127,6 +151,7 @@ if __name__ == "__main__":
                       help="Updates reference logs using current commit")
   parser.add_argument("--upload-only", action="store_true",
                       help="Skips testing processes and uploads logs from previous test run")
+  parser.add_argument("-j", "--jobs", type=int, default=1)
   args = parser.parse_args()
 
   full_test = all(len(x) == 0 for x in (args.whitelist_procs, args.whitelist_cars, args.blacklist_procs, args.blacklist_cars, args.ignore_fields, args.ignore_msgs))
@@ -154,37 +179,31 @@ if __name__ == "__main__":
     untested = (set(interface_names) - set(excluded_interfaces)) - tested_cars
     assert len(untested) == 0, f"Cars missing routes: {str(untested)}"
 
-  results: Any = {}
-  for car_brand, segment in segments:
-    if (len(args.whitelist_cars) and car_brand.upper() not in args.whitelist_cars) or \
-       (not len(args.whitelist_cars) and car_brand.upper() in args.blacklist_cars):
-      continue
-
-    print(f"***** testing route segment {segment} *****\n")
-
-    results[segment] = {}
-
-    r, n = segment.rsplit("--", 1)
-    lr = LogReader(get_url(r, n))
-
-    for cfg in CONFIGS:
-      if (len(args.whitelist_procs) and cfg.proc_name not in args.whitelist_procs) or \
-         (not len(args.whitelist_procs) and cfg.proc_name in args.blacklist_procs):
+  with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool:
+    if not args.upload_only:
+      lreaders: Any = {}
+      p1 = pool.map(get_logreader, [seg for car, seg in segments])
+      for (segment, lr) in tqdm(p1, desc="Getting Logs", total=len(segments)):
+        lreaders[segment] = lr
+
+    pool_args: Any = []
+    for car_brand, segment in segments:
+      if (len(args.whitelist_cars) and car_brand.upper() not in args.whitelist_cars) or \
+         (not len(args.whitelist_cars) and car_brand.upper() in args.blacklist_cars):
        continue
-
-      cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2")
-      if not args.upload_only:
-        ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
-        results[segment][cfg.proc_name], log_msgs = test_process(cfg, lr, ref_log_fn, args.ignore_fields, args.ignore_msgs)
-
-        # save logs so we can upload when updating refs
-        save_log(cur_log_fn, log_msgs)
-
-      if upload:
-        print(f'Uploading: {os.path.basename(cur_log_fn)}')
-        assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}"
-        upload_file(cur_log_fn, os.path.basename(cur_log_fn))
-        os.remove(cur_log_fn)
+      for cfg in CONFIGS:
+        if (len(args.whitelist_procs) and cfg.proc_name not in args.whitelist_procs) or \
+           (not len(args.whitelist_procs) and cfg.proc_name in args.blacklist_procs):
+          continue
+        cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2")
+        lr = None if args.upload_only else lreaders[segment]
+        pool_args.append((segment, cfg, args, cur_log_fn, lr, ref_commit))
+
+    results: Any = defaultdict(dict)
+    p2 = pool.map(run_test_process, pool_args)
+    for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)):
+      if isinstance(result, list):
+        results[segment][proc] = result
 
   diff1, diff2, failed = format_diff(results, ref_commit)
   if not args.upload_only:
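A note on the isolation change in process_replay.py above: replay_process() now brackets each replay with setup_prefix() and teardown_prefix(), so every run gets a throwaway OPENPILOT_PREFIX (a fresh UUID) with its own msgq directory under /dev/shm and its own params path, both torn down in a finally block even when the replay fails. That per-run namespace is what makes it safe to drive several replays from separate worker processes. Below is a minimal standalone sketch of the same idea written as a context manager; it only covers the /dev/shm side, and isolated_prefix and run_replay are illustrative names, not openpilot APIs.

import os
import shutil
import uuid
from contextlib import contextmanager


@contextmanager
def isolated_prefix(shm_root="/dev/shm"):
  # Give this process its own msgq namespace, mirroring the intent of
  # setup_prefix()/teardown_prefix() in the patch above (params cleanup omitted).
  prefix = str(uuid.uuid4())
  os.environ["OPENPILOT_PREFIX"] = prefix
  msgq_path = os.path.join(shm_root, prefix)
  os.makedirs(msgq_path, exist_ok=True)
  try:
    yield prefix
  finally:
    # Always clean up, even if the replay raised.
    shutil.rmtree(msgq_path, ignore_errors=True)
    os.environ.pop("OPENPILOT_PREFIX", None)

# usage (run_replay is a hypothetical stand-in for replay_process(cfg, lr)):
# with isolated_prefix():
#   run_replay()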
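And a note on the fan-out in test_processes.py: the main block now builds one (segment, cfg, args, cur_log_fn, lr, ref_commit) tuple per test, maps get_logreader and run_test_process over a concurrent.futures.ProcessPoolExecutor, and wraps the lazy pool.map iterators in tqdm so overall progress stays visible even though the per-message progress bars were removed; the new -j/--jobs flag (default 1) picks the worker count, e.g. selfdrive/test/process_replay/test_processes.py -j 8. The toy sketch below shows the same structure with made-up work items; check() stands in for run_test_process() and is not part of the repo.

#!/usr/bin/env python3
import concurrent.futures
from collections import defaultdict

from tqdm import tqdm


def check(item):
  # Stand-in for run_test_process(): return (segment, proc, diff), where an
  # empty diff list means the process matched its reference log.
  segment, proc = item
  return (segment, proc, [])


if __name__ == "__main__":
  work = [(f"seg{i}", proc) for i in range(4) for proc in ("controlsd", "radard")]
  results = defaultdict(dict)
  with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
    # pool.map is lazy, so tqdm advances as each ordered result comes back.
    for segment, proc, res in tqdm(pool.map(check, work), total=len(work)):
      if isinstance(res, list):
        results[segment][proc] = res
  print(dict(results))

Since pool.map yields results in submission order, the results dict fills in the same order as the old serial loop, so the summary produced by format_diff() should be unaffected by how many workers run.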