#!/usr/bin/env python3 import os import sys from collections import defaultdict from typing import Any import tempfile from itertools import zip_longest import matplotlib.pyplot as plt import numpy as np from tabulate import tabulate from openpilot.common.git import get_commit from openpilot.system.hardware import PC from openpilot.tools.lib.openpilotci import get_url from openpilot.selfdrive.test.process_replay.compare_logs import compare_logs, format_diff from openpilot.selfdrive.test.process_replay.process_replay import get_process_config, replay_process from openpilot.tools.lib.framereader import FrameReader, NumpyFrameReader from openpilot.tools.lib.logreader import LogReader, save_log from openpilot.tools.lib.github_utils import GithubUtils TEST_ROUTE = "8494c69d3c710e81|000001d4--2648a9a404" SEGMENT = 4 MAX_FRAMES = 100 if PC else 400 NO_MODEL = "NO_MODEL" in os.environ SEND_EXTRA_INPUTS = bool(int(os.getenv("SEND_EXTRA_INPUTS", "0"))) DATA_TOKEN = os.getenv("CI_ARTIFACTS_TOKEN","") API_TOKEN = os.getenv("GITHUB_COMMENTS_TOKEN","") MODEL_REPLAY_BUCKET="model_replay_master" GITHUB = GithubUtils(API_TOKEN, DATA_TOKEN) EXEC_TIMINGS = [ # model, instant max, average max ("modelV2", 0.035, 0.025), ("driverStateV2", 0.02, 0.015), ] def get_log_fn(test_route, ref="master"): return f"{test_route}_model_tici_{ref}.zst" def plot(proposed, master, title, tmp): proposed = list(proposed) master = list(master) fig, ax = plt.subplots() ax.plot(master, label='MASTER') ax.plot(proposed, label='PROPOSED') plt.legend(loc='best') plt.title(title) plt.savefig(f'{tmp}/{title}.png') return (title + '.png', proposed == master) def get_event(logs, event): return (getattr(m, m.which()) for m in filter(lambda m: m.which() == event, logs)) def zl(array, fill): return zip_longest(array, [], fillvalue=fill) def get_idx_if_non_empty(l, idx=None): return l if idx is None else (l[idx] if len(l) > 0 else None) def generate_report(proposed, master, tmp, commit): ModelV2_Plots = zl([ (lambda x: get_idx_if_non_empty(x.velocity.x, 0), "velocity.x"), (lambda x: get_idx_if_non_empty(x.action.desiredCurvature), "desiredCurvature"), (lambda x: get_idx_if_non_empty(x.leadsV3[0].x, 0), "leadsV3.x"), (lambda x: get_idx_if_non_empty(x.laneLines[1].y, 0), "laneLines.y"), (lambda x: get_idx_if_non_empty(x.meta.disengagePredictions.gasPressProbs, 1), "gasPressProbs") ], "modelV2") DriverStateV2_Plots = zl([ (lambda x: get_idx_if_non_empty(x.wheelOnRightProb), "wheelOnRightProb"), (lambda x: get_idx_if_non_empty(x.leftDriverData.faceProb), "leftDriverData.faceProb"), (lambda x: get_idx_if_non_empty(x.leftDriverData.faceOrientation, 0), "leftDriverData.faceOrientation0"), (lambda x: get_idx_if_non_empty(x.leftDriverData.leftBlinkProb), "leftDriverData.leftBlinkProb"), (lambda x: get_idx_if_non_empty(x.leftDriverData.notReadyProb, 0), "leftDriverData.notReadyProb0"), (lambda x: get_idx_if_non_empty(x.rightDriverData.faceProb), "rightDriverData.faceProb"), ], "driverStateV2") return [plot(map(v[0], get_event(proposed, event)), \ map(v[0], get_event(master, event)), f"{v[1]}_{commit[:7]}", tmp) \ for v,event in ([*ModelV2_Plots] + [*DriverStateV2_Plots])] def create_table(title, files, link, open_table=False): if not files: return "" table = [f'
{title}'] for i,f in enumerate(files): if not (i % 2): table.append("") table.append(f'') if (i % 2): table.append("") table.append("
") table = "".join(table) return table def comment_replay_report(proposed, master, full_logs): with tempfile.TemporaryDirectory() as tmp: PR_BRANCH = os.getenv("GIT_BRANCH","") DATA_BUCKET = f"model_replay_{PR_BRANCH}" try: GITHUB.get_pr_number(PR_BRANCH) except Exception: print("No PR associated with this branch. Skipping report.") return commit = get_commit() files = generate_report(proposed, master, tmp, commit) GITHUB.upload_files(DATA_BUCKET, [(x[0], tmp + '/' + x[0]) for x in files]) log_name = get_log_fn(TEST_ROUTE, commit) save_log(log_name, full_logs) GITHUB.upload_file(DATA_BUCKET, os.path.basename(log_name), log_name) diff_files = [x for x in files if not x[1]] link = GITHUB.get_bucket_link(DATA_BUCKET) diff_plots = create_table("Model Replay Differences", diff_files, link, open_table=True) all_plots = create_table("All Model Replay Plots", files, link) comment = f"ref for commit {commit}: {link}/{log_name}" + diff_plots + all_plots GITHUB.comment_on_pr(comment, PR_BRANCH) def trim_logs_to_max_frames(logs, max_frames, frs_types, include_all_types): all_msgs = [] cam_state_counts = defaultdict(int) # keep adding messages until cam states are equal to MAX_FRAMES for msg in sorted(logs, key=lambda m: m.logMonoTime): all_msgs.append(msg) if msg.which() in frs_types: cam_state_counts[msg.which()] += 1 if all(cam_state_counts[state] == max_frames for state in frs_types): break if len(include_all_types) != 0: other_msgs = [m for m in logs if m.which() in include_all_types] all_msgs.extend(other_msgs) return all_msgs def model_replay(lr, frs): # modeld is using frame pairs modeld_logs = trim_logs_to_max_frames(lr, MAX_FRAMES, {"roadCameraState", "wideRoadCameraState"}, {"roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}) dmodeld_logs = trim_logs_to_max_frames(lr, MAX_FRAMES, {"driverCameraState"}, {"driverEncodeIdx", "carParams"}) if not SEND_EXTRA_INPUTS: modeld_logs = [msg for msg in modeld_logs if msg.which() != 'liveCalibration'] dmodeld_logs = [msg for msg in dmodeld_logs if msg.which() != 'liveCalibration'] # initial setup for s in ('liveCalibration', 'deviceState'): msg = next(msg for msg in lr if msg.which() == s).as_builder() msg.logMonoTime = lr[0].logMonoTime modeld_logs.insert(1, msg.as_reader()) dmodeld_logs.insert(1, msg.as_reader()) modeld = get_process_config("modeld") dmonitoringmodeld = get_process_config("dmonitoringmodeld") modeld_msgs = replay_process(modeld, modeld_logs, frs) if isinstance(frs['roadCameraState'], NumpyFrameReader): del frs['roadCameraState'].frames del frs['wideRoadCameraState'].frames dmonitoringmodeld_msgs = replay_process(dmonitoringmodeld, dmodeld_logs, frs) msgs = modeld_msgs + dmonitoringmodeld_msgs header = ['model', 'max instant', 'max instant allowed', 'average', 'max average allowed', 'test result'] rows = [] timings_ok = True for (s, instant_max, avg_max) in EXEC_TIMINGS: ts = [getattr(m, s).modelExecutionTime for m in msgs if m.which() == s] # TODO some init can happen in first iteration ts = ts[1:] errors = [] if np.max(ts) > instant_max: errors.append("❌ FAILED MAX TIMING CHECK ❌") if np.mean(ts) > avg_max: errors.append("❌ FAILED AVG TIMING CHECK ❌") timings_ok = not errors and timings_ok rows.append([s, np.max(ts), instant_max, np.mean(ts), avg_max, "\n".join(errors) or "✅"]) print("------------------------------------------------") print("----------------- Model Timing -----------------") print("------------------------------------------------") print(tabulate(rows, header, tablefmt="simple_grid", stralign="center", numalign="center", floatfmt=".4f")) assert timings_ok return msgs def get_frames(): regen_cache = "--regen-cache" in sys.argv cache = "--cache" in sys.argv or not PC or regen_cache videos = ('fcamera.hevc', 'dcamera.hevc', 'ecamera.hevc') cams = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState') if cache: frames_cache = '/tmp/model_replay_cache' if PC else '/data/model_replay_cache' os.makedirs(frames_cache, exist_ok=True) cache_size = 200 for v in videos: if not all(os.path.isfile(f'{frames_cache}/{TEST_ROUTE}_{v}_{i}.npy') for i in range(MAX_FRAMES//cache_size)) or regen_cache: f = FrameReader(get_url(TEST_ROUTE, SEGMENT, v)).get(0, MAX_FRAMES + 1, pix_fmt="nv12") print(f'Caching {v}...') for i in range(MAX_FRAMES//cache_size): np.save(f'{frames_cache}/{TEST_ROUTE}_{v}_{i}', f[(i * cache_size) + 1:((i + 1) * cache_size) + 1]) del f return {c : NumpyFrameReader(f"{frames_cache}/{TEST_ROUTE}_{v}", 1928, 1208, cache_size) for c,v in zip(cams, videos, strict=True)} else: return {c : FrameReader(get_url(TEST_ROUTE, SEGMENT, v), readahead=True) for c,v in zip(cams, videos, strict=True)} if __name__ == "__main__": update = "--update" in sys.argv or (os.getenv("GIT_BRANCH", "") == 'master') replay_dir = os.path.dirname(os.path.abspath(__file__)) # load logs lr = list(LogReader(get_url(TEST_ROUTE, SEGMENT, "rlog.zst"))) frs = get_frames() log_msgs = [] # run replays if not NO_MODEL: log_msgs += model_replay(lr, frs) # get diff failed = False if not update: log_fn = get_log_fn(TEST_ROUTE) try: all_logs = list(LogReader(GITHUB.get_file_url(MODEL_REPLAY_BUCKET, log_fn))) cmp_log = [] # logs are ordered based on type: modelV2, drivingModelData, driverStateV2 if not NO_MODEL: model_start_index = next(i for i, m in enumerate(all_logs) if m.which() in ("modelV2", "drivingModelData", "cameraOdometry")) cmp_log += all_logs[model_start_index:model_start_index + MAX_FRAMES*3] dmon_start_index = next(i for i, m in enumerate(all_logs) if m.which() == "driverStateV2") cmp_log += all_logs[dmon_start_index:dmon_start_index + MAX_FRAMES] ignore = [ 'logMonoTime', 'drivingModelData.frameDropPerc', 'drivingModelData.modelExecutionTime', 'modelV2.frameDropPerc', 'modelV2.modelExecutionTime', 'driverStateV2.modelExecutionTime', 'driverStateV2.gpuExecutionTime' ] if PC: # TODO We ignore whole bunch so we can compare important stuff # like posenet with reasonable tolerance ignore += ['modelV2.acceleration.x', 'modelV2.position.x', 'modelV2.position.xStd', 'modelV2.position.y', 'modelV2.position.yStd', 'modelV2.position.z', 'modelV2.position.zStd', 'drivingModelData.path.xCoefficients',] for i in range(3): for field in ('x', 'y', 'v', 'a'): ignore.append(f'modelV2.leadsV3.{i}.{field}') ignore.append(f'modelV2.leadsV3.{i}.{field}Std') for i in range(4): for field in ('x', 'y', 'z', 't'): ignore.append(f'modelV2.laneLines.{i}.{field}') for i in range(2): for field in ('x', 'y', 'z', 't'): ignore.append(f'modelV2.roadEdges.{i}.{field}') tolerance = .3 if PC else None results: Any = {TEST_ROUTE: {}} log_paths: Any = {TEST_ROUTE: {"models": {'ref': log_fn, 'new': log_fn}}} results[TEST_ROUTE]["models"] = compare_logs(cmp_log, log_msgs, tolerance=tolerance, ignore_fields=ignore) diff_short, diff_long, failed = format_diff(results, log_paths, 'master') if "CI" in os.environ: comment_replay_report(log_msgs, cmp_log, log_msgs) failed = False print(diff_long) print('-------------\n'*5) print(diff_short) with open("model_diff.txt", "w") as f: f.write(diff_long) except Exception as e: print(str(e)) failed = True # upload new refs if update and not PC: print("Uploading new refs") log_fn = get_log_fn(TEST_ROUTE) save_log(log_fn, log_msgs) try: GITHUB.upload_file(MODEL_REPLAY_BUCKET, os.path.basename(log_fn), log_fn) except Exception as e: print("failed to upload", e) sys.exit(int(failed))