From 4171e45e9ca76892fd14dff9f61e8ac0231d397d Mon Sep 17 00:00:00 2001 From: Lukas Petersson Date: Mon, 6 Jun 2022 23:21:12 +0200 Subject: [PATCH] process replay: regen in parallel (#24628) * regen in parallel * prefixes * clean regen * clean output * tqdm loc * del swp file * add routes back * cleanup * disable tqdm * unique dirs * unique dirs * outdir in regen_all * formatting when played from other dirs * prefix dongle id * local disable_tqdm * formatting * bug fix * dont spam fakedata * 16 char fake dongle ids * formatting * formatting * more descriptive dongle * fix azure path * couple more fixes * handle failures nicely Co-authored-by: Adeeb Shihadeh old-commit-hash: 397da56c85d4d4932d25435dcb1d57f2cd2d8c4b --- selfdrive/test/process_replay/helpers.py | 2 +- selfdrive/test/process_replay/regen.py | 23 ++++++----- selfdrive/test/process_replay/regen_all.py | 46 ++++++++++++++-------- selfdrive/test/update_ci_routes.py | 10 +++-- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/selfdrive/test/process_replay/helpers.py b/selfdrive/test/process_replay/helpers.py index b650ecb69a..8571f36c36 100644 --- a/selfdrive/test/process_replay/helpers.py +++ b/selfdrive/test/process_replay/helpers.py @@ -23,4 +23,4 @@ class OpenpilotPrefix(object): os.remove(symlink_path) shutil.rmtree(self.msgq_path, ignore_errors=True) del os.environ['OPENPILOT_PREFIX'] - return True + return False diff --git a/selfdrive/test/process_replay/regen.py b/selfdrive/test/process_replay/regen.py index bcc91b3ab6..653efaf32c 100755 --- a/selfdrive/test/process_replay/regen.py +++ b/selfdrive/test/process_replay/regen.py @@ -3,8 +3,8 @@ import bz2 import os import time import multiprocessing -from tqdm import tqdm import argparse +from tqdm import tqdm # run DM procs os.environ["USE_WEBCAM"] = "1" @@ -24,7 +24,6 @@ from tools.lib.route import Route from tools.lib.framereader import FrameReader from tools.lib.logreader import LogReader - def replay_panda_states(s, msgs): pm = messaging.PubMaster([s, 'peripheralState']) rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) @@ -118,7 +117,7 @@ def replay_service(s, msgs): rk.keep_time() -def replay_cameras(lr, frs): +def replay_cameras(lr, frs, disable_tqdm=False): eon_cameras = [ ("roadCameraState", DT_MDL, eon_f_frame_size, VisionStreamType.VISION_STREAM_ROAD, True), ("driverCameraState", DT_DMON, eon_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER, False), @@ -163,7 +162,7 @@ def replay_cameras(lr, frs): if fr is not None: print(f"Decompressing frames {s}") frames = [] - for i in tqdm(range(fr.frame_count)): + for i in tqdm(range(fr.frame_count), disable=disable_tqdm): img = fr.get(i, pix_fmt='nv12')[0] frames.append(img.flatten().tobytes()) @@ -177,7 +176,7 @@ def replay_cameras(lr, frs): return vs, p -def regen_segment(lr, frs=None, outdir=FAKEDATA): +def regen_segment(lr, frs=None, outdir=FAKEDATA, disable_tqdm=False): lr = list(lr) if frs is None: frs = dict() @@ -207,7 +206,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA): elif msg.which() == 'liveCalibration': params.put("CalibrationParams", msg.as_builder().to_bytes()) - vs, cam_procs = replay_cameras(lr, frs) + vs, cam_procs = replay_cameras(lr, frs, disable_tqdm=disable_tqdm) fake_daemons = { 'sensord': [ @@ -219,7 +218,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA): multiprocessing.Process(target=replay_panda_states, args=('pandaStates', lr)), ], 'managerState': [ - multiprocessing.Process(target=replay_manager_state, args=('managerState', lr)), + multiprocessing.Process(target=replay_manager_state, args=('managerState', lr)), ], 'thermald': [ multiprocessing.Process(target=replay_device_state, args=('deviceState', lr)), @@ -236,13 +235,13 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA): time.sleep(5) # start procs up - ignore = list(fake_daemons.keys()) + ['ui', 'manage_athenad', 'uploader'] + ignore = list(fake_daemons.keys()) + ['ui', 'manage_athenad', 'uploader', 'soundd'] ensure_running(managed_processes.values(), started=True, params=Params(), CP=car.CarParams(), not_run=ignore) for procs in fake_daemons.values(): for p in procs: p.start() - for _ in tqdm(range(60)): + for _ in tqdm(range(60), disable=disable_tqdm): # ensure all procs are running for d, procs in fake_daemons.items(): for p in procs: @@ -268,7 +267,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA): return seg_path -def regen_and_save(route, sidx, upload=False, use_route_meta=False): +def regen_and_save(route, sidx, upload=False, use_route_meta=False, outdir=FAKEDATA, disable_tqdm=False): if use_route_meta: r = Route(args.route) lr = LogReader(r.log_paths()[args.seg]) @@ -276,7 +275,7 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=False): else: lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog.bz2") fr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc") - rpath = regen_segment(lr, {'roadCameraState': fr}) + rpath = regen_segment(lr, {'roadCameraState': fr}, outdir=outdir, disable_tqdm=disable_tqdm) # compress raw rlog before uploading with open(os.path.join(rpath, "rlog"), "rb") as f: @@ -294,7 +293,7 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=False): print("\n\n", "*"*30, "\n\n") print("New route:", relr, "\n") if upload: - upload_route(relr) + upload_route(relr, exclude_patterns=['*.hevc', ]) return relr diff --git a/selfdrive/test/process_replay/regen_all.py b/selfdrive/test/process_replay/regen_all.py index 4f057bbf50..765e5c3b68 100755 --- a/selfdrive/test/process_replay/regen_all.py +++ b/selfdrive/test/process_replay/regen_all.py @@ -1,22 +1,36 @@ #!/usr/bin/env python3 +import argparse +import concurrent.futures +import os +import random +from tqdm import tqdm + +from selfdrive.test.process_replay.helpers import OpenpilotPrefix from selfdrive.test.process_replay.regen import regen_and_save -from selfdrive.test.process_replay.test_processes import original_segments as segments +from selfdrive.test.process_replay.test_processes import FAKEDATA, original_segments as segments -if __name__ == "__main__": - new_segments = [] - for segment in segments: +def regen_job(segment): + with OpenpilotPrefix(): route = segment[1].rsplit('--', 1)[0] sidx = int(segment[1].rsplit('--', 1)[1]) - print("Regen", route, sidx) - relr = regen_and_save(route, sidx, upload=True, use_route_meta=False) + fake_dongle_id = 'regen' + ''.join(random.choice('0123456789ABCDEF') for i in range(11)) + try: + relr = regen_and_save(route, sidx, upload=True, use_route_meta=False, outdir=os.path.join(FAKEDATA, fake_dongle_id), disable_tqdm=True) + relr = '|'.join(relr.split('/')[-2:]) + return f' ("{segment[0]}", "{relr}"), ' + except Exception as e: + return f" {segment} failed: {str(e)}" + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Generate new segments from old ones") + parser.add_argument("-j", "--jobs", type=int, default=1) + args = parser.parse_args() - print("\n\n", "*"*30, "\n\n") - print("New route:", relr, "\n") - relr = relr.replace('/', '|') - new_segments.append(f' ("{segment[0]}", "{relr}"), ') - print() - print() - print() - print('COPY THIS INTO test_processes.py') - for seg in new_segments: - print(seg) + with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool: + p = list(pool.map(regen_job, segments)) + msg = "Copy these new segments into test_processes.py:" + for seg in tqdm(p, desc="Generating segments"): + msg += "\n" + str(seg) + print() + print() + print(msg) diff --git a/selfdrive/test/update_ci_routes.py b/selfdrive/test/update_ci_routes.py index b388e599f7..99a63b8dfd 100755 --- a/selfdrive/test/update_ci_routes.py +++ b/selfdrive/test/update_ci_routes.py @@ -18,8 +18,12 @@ DEST_KEY = azureutil.get_user_token(_DATA_ACCOUNT_CI, "openpilotci") SOURCE_KEYS = [azureutil.get_user_token(account, bucket) for account, bucket in SOURCES] SERVICE = BlockBlobService(_DATA_ACCOUNT_CI, sas_token=DEST_KEY) -def upload_route(path): +def upload_route(path, exclude_patterns=None): + if exclude_patterns is None: + exclude_patterns = ['*/dcamera.hevc'] + r, n = path.rsplit("--", 1) + r = '/'.join(r.split('/')[-2:]) # strip out anything extra in the path destpath = f"{r}/{n}" cmd = [ "azcopy", @@ -28,9 +32,7 @@ def upload_route(path): f"https://{_DATA_ACCOUNT_CI}.blob.core.windows.net/openpilotci/{destpath}?{DEST_KEY}", "--recursive=false", "--overwrite=false", - "--exclude-pattern=*/dcamera.hevc", - "--exclude-pattern=*.mkv", - ] + ] + [f"--exclude-pattern={p}" for p in exclude_patterns] subprocess.check_call(cmd) def sync_to_ci_public(route):