From 6af8e72604e737a9925b209f01839a30f4b58433 Mon Sep 17 00:00:00 2001 From: Justin Newberry Date: Mon, 13 Nov 2023 13:50:23 -0800 Subject: [PATCH] Process Replay: move to pytest (#30260) * process replay pytest * enable long diff * readd job name * make it executable * cleanup imports * retrigger old-commit-hash: 90c873ab1def0756f6f91fcb7f2d6fad503e3f49 --- .github/workflows/selfdrive_tests.yaml | 26 +- common/prefix.py | 1 + selfdrive/test/process_replay/conftest.py | 37 +++ selfdrive/test/process_replay/helpers.py | 150 ++++++++++++ selfdrive/test/process_replay/regen_all.py | 3 +- .../test/process_replay/test_processes.py | 222 ++++-------------- 6 files changed, 244 insertions(+), 195 deletions(-) create mode 100644 selfdrive/test/process_replay/conftest.py create mode 100755 selfdrive/test/process_replay/helpers.py diff --git a/.github/workflows/selfdrive_tests.yaml b/.github/workflows/selfdrive_tests.yaml index 6ab55f6add..4e622f68a5 100644 --- a/.github/workflows/selfdrive_tests.yaml +++ b/.github/workflows/selfdrive_tests.yaml @@ -208,30 +208,20 @@ jobs: run: | ${{ env.RUN }} "scons -j$(nproc)" - name: Run replay + id: run-replay timeout-minutes: 30 run: | - ${{ env.RUN }} "CI=1 coverage run selfdrive/test/process_replay/test_processes.py -j$(nproc) && \ - chmod -R 777 /tmp/comma_download_cache && \ - coverage combine && \ - coverage xml" - - name: Print diff - id: print-diff - if: always() - run: cat selfdrive/test/process_replay/diff.txt - - uses: actions/upload-artifact@v3 - if: always() - continue-on-error: true - with: - name: process_replay_diff.txt - path: selfdrive/test/process_replay/diff.txt - - name: Upload reference logs - if: ${{ failure() && steps.print-diff.outcome == 'success' && github.repository == 'commaai/openpilot' && env.AZURE_TOKEN != '' }} - run: | - ${{ env.RUN }} "unset PYTHONWARNINGS && CI=1 AZURE_TOKEN='$AZURE_TOKEN' python selfdrive/test/process_replay/test_processes.py -j$(nproc) --upload-only" + ${{ env.RUN }} "CI=1 $PYTEST -n auto --dist=loadscope selfdrive/test/process_replay/test_processes.py --long-diff && \ + chmod -R 777 /tmp/comma_download_cache" - name: "Upload coverage to Codecov" uses: codecov/codecov-action@v3 with: name: ${{ github.job }} + - name: Upload reference logs + if: ${{ failure() && github.repository == 'commaai/openpilot' && env.AZURE_TOKEN != '' }} + run: | + ${{ env.RUN }} "unset PYTHONWARNINGS && CI=1 AZURE_TOKEN='$AZURE_TOKEN' \ + pytest -n auto --dist=loadscope selfdrive/test/process_replay/test_processes.py --upload-only" regen: name: regen diff --git a/common/prefix.py b/common/prefix.py index c1744e8ff7..c5ae4393cd 100644 --- a/common/prefix.py +++ b/common/prefix.py @@ -21,6 +21,7 @@ class OpenpilotPrefix: except FileExistsError: pass os.makedirs(Paths.log_root(), exist_ok=True) + os.makedirs(Paths.download_cache_root(), exist_ok=True) return self diff --git a/selfdrive/test/process_replay/conftest.py b/selfdrive/test/process_replay/conftest.py new file mode 100644 index 0000000000..f3794d26ac --- /dev/null +++ b/selfdrive/test/process_replay/conftest.py @@ -0,0 +1,37 @@ +import pytest + +from openpilot.selfdrive.test.process_replay.helpers import ALL_PROCS +from openpilot.selfdrive.test.process_replay.test_processes import ALL_CARS + + +def pytest_addoption(parser: pytest.Parser): + parser.addoption("--whitelist-procs", type=str, nargs="*", default=ALL_PROCS, + help="Whitelist given processes from the test (e.g. controlsd)") + parser.addoption("--whitelist-cars", type=str, nargs="*", default=ALL_CARS, + help="Whitelist given cars from the test (e.g. HONDA)") + parser.addoption("--blacklist-procs", type=str, nargs="*", default=[], + help="Blacklist given processes from the test (e.g. controlsd)") + parser.addoption("--blacklist-cars", type=str, nargs="*", default=[], + help="Blacklist given cars from the test (e.g. HONDA)") + parser.addoption("--ignore-fields", type=str, nargs="*", default=[], + help="Extra fields or msgs to ignore (e.g. carState.events)") + parser.addoption("--ignore-msgs", type=str, nargs="*", default=[], + help="Msgs to ignore (e.g. carEvents)") + parser.addoption("--update-refs", action="store_true", + help="Updates reference logs using current commit") + parser.addoption("--upload-only", action="store_true", + help="Skips testing processes and uploads logs from previous test run") + parser.addoption("--long-diff", action="store_true", + help="Outputs diff in long format") + + +@pytest.fixture(scope="class", autouse=True) +def process_replay_test_arguments(request): + if hasattr(request.cls, "segment"): # check if a subclass of TestProcessReplayBase + request.cls.tested_procs = list(set(request.config.getoption("--whitelist-procs")) - set(request.config.getoption("--blacklist-procs"))) + request.cls.tested_cars = list({c.upper() for c in set(request.config.getoption("--whitelist-cars")) - set(request.config.getoption("--blacklist-cars"))}) + request.cls.ignore_fields = request.config.getoption("--ignore-fields") + request.cls.ignore_msgs = request.config.getoption("--ignore-msgs") + request.cls.upload_only = request.config.getoption("--upload-only") + request.cls.update_refs = request.config.getoption("--update-refs") + request.cls.long_diff = request.config.getoption("--long-diff") \ No newline at end of file diff --git a/selfdrive/test/process_replay/helpers.py b/selfdrive/test/process_replay/helpers.py new file mode 100755 index 0000000000..0952a01870 --- /dev/null +++ b/selfdrive/test/process_replay/helpers.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +import os +import sys +import unittest + +from parameterized import parameterized +from typing import Optional, Union, List + + +from openpilot.selfdrive.test.openpilotci import get_url, upload_file +from openpilot.selfdrive.test.process_replay.compare_logs import compare_logs, format_process_diff +from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, replay_process +from openpilot.system.version import get_commit +from openpilot.tools.lib.filereader import FileReader +from openpilot.tools.lib.helpers import save_log +from openpilot.tools.lib.logreader import LogReader, LogIterable + + +BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" +REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit") +EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"} + + +def get_log_data(segment): + r, n = segment.rsplit("--", 1) + with FileReader(get_url(r, n)) as f: + return f.read() + + +ALL_PROCS = sorted({cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS}) +PROC_TO_CFG = {cfg.proc_name: cfg for cfg in CONFIGS} + +cpu_count = os.cpu_count() or 1 + + +class TestProcessReplayBase(unittest.TestCase): + """ + Base class that replays all processes within test_proceses from a segment, + and puts the log messages in self.log_msgs for analysis by other tests. + """ + segment: Optional[Union[str, LogIterable]] = None + tested_procs: List[str] = ALL_PROCS + + @classmethod + def setUpClass(cls, create_logs=True): + if "Base" in cls.__name__: + raise unittest.SkipTest("skipping base class") + + if isinstance(cls.segment, str): + cls.log_reader = LogReader.from_bytes(get_log_data(cls.segment)) + else: + cls.log_reader = cls.segment + + if create_logs: + cls._create_log_msgs() + + @classmethod + def _run_replay(cls, cfg): + try: + return replay_process(cfg, cls.log_reader, disable_progress=True) + except Exception as e: + raise Exception(f"failed on segment: {cls.segment} \n{e}") from e + + @classmethod + def _create_log_msgs(cls): + cls.log_msgs = {} + cls.proc_cfg = {} + + for proc in cls.tested_procs: + cfg = PROC_TO_CFG[proc] + + log_msgs = cls._run_replay(cfg) + + cls.log_msgs[proc] = log_msgs + cls.proc_cfg[proc] = cfg + + +class TestProcessReplayDiffBase(TestProcessReplayBase): + """ + Base class for checking for diff between process outputs. + """ + update_refs = False + upload_only = False + long_diff = False + ignore_msgs: List[str] = [] + ignore_fields: List[str] = [] + + def setUp(self): + super().setUp() + if self.upload_only: + raise unittest.SkipTest("skipping test, uploading only") + + @classmethod + def setUpClass(cls): + super().setUpClass(not cls.upload_only) + + if cls.long_diff: + cls.maxDiff = None + + os.makedirs(os.path.dirname(FAKEDATA), exist_ok=True) + + cls.cur_commit = get_commit() + cls.assertNotEqual(cls.cur_commit, None, "Couldn't get current commit") + + cls.upload = cls.update_refs or cls.upload_only + + try: + with open(REF_COMMIT_FN) as f: + cls.ref_commit = f.read().strip() + except FileNotFoundError: + print("Couldn't find reference commit") + sys.exit(1) + + cls._create_ref_log_msgs() + + @classmethod + def _create_ref_log_msgs(cls): + cls.ref_log_msgs = {} + + for proc in cls.tested_procs: + cur_log_fn = os.path.join(FAKEDATA, f"{cls.segment}_{proc}_{cls.cur_commit}.bz2") + if cls.update_refs: # reference logs will not exist if routes were just regenerated + ref_log_path = get_url(*cls.segment.rsplit("--", 1)) + else: + ref_log_fn = os.path.join(FAKEDATA, f"{cls.segment}_{proc}_{cls.ref_commit}.bz2") + ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn) + + if not cls.upload_only: + save_log(cur_log_fn, cls.log_msgs[proc]) + cls.ref_log_msgs[proc] = list(LogReader(ref_log_path)) + + if cls.upload: + assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}" + upload_file(cur_log_fn, os.path.basename(cur_log_fn)) + os.remove(cur_log_fn) + + @parameterized.expand(ALL_PROCS) + def test_process_diff(self, proc): + if proc not in self.tested_procs: + raise unittest.SkipTest(f"{proc} was not requested to be tested") + + cfg = self.proc_cfg[proc] + log_msgs = self.log_msgs[proc] + ref_log_msgs = self.ref_log_msgs[proc] + + diff = compare_logs(ref_log_msgs, log_msgs, self.ignore_fields + cfg.ignore, self.ignore_msgs) + + diff_short, diff_long = format_process_diff(diff) + + self.assertEqual(len(diff), 0, "\n" + diff_long if self.long_diff else diff_short) \ No newline at end of file diff --git a/selfdrive/test/process_replay/regen_all.py b/selfdrive/test/process_replay/regen_all.py index 656a5b89e1..070cb5f783 100755 --- a/selfdrive/test/process_replay/regen_all.py +++ b/selfdrive/test/process_replay/regen_all.py @@ -8,7 +8,8 @@ from tqdm import tqdm from openpilot.common.prefix import OpenpilotPrefix from openpilot.selfdrive.test.process_replay.regen import regen_and_save -from openpilot.selfdrive.test.process_replay.test_processes import FAKEDATA, source_segments as segments +from openpilot.selfdrive.test.process_replay.process_replay import FAKEDATA +from openpilot.selfdrive.test.process_replay.test_processes import source_segments as segments from openpilot.tools.lib.route import SegmentName diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py index 5429c9b63e..efdd166cac 100755 --- a/selfdrive/test/process_replay/test_processes.py +++ b/selfdrive/test/process_replay/test_processes.py @@ -1,20 +1,15 @@ #!/usr/bin/env python3 -import argparse -import concurrent.futures -import os +import unittest +import pytest import sys -from collections import defaultdict -from tqdm import tqdm -from typing import Any, DefaultDict, Dict + +from parameterized import parameterized_class +from typing import List, Optional from openpilot.selfdrive.car.car_helpers import interface_names -from openpilot.selfdrive.test.openpilotci import get_url, upload_file -from openpilot.selfdrive.test.process_replay.compare_logs import compare_logs, format_diff -from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, check_openpilot_enabled, replay_process -from openpilot.system.version import get_commit -from openpilot.tools.lib.filereader import FileReader -from openpilot.tools.lib.logreader import LogReader -from openpilot.tools.lib.helpers import save_log +from openpilot.selfdrive.test.process_replay.process_replay import check_openpilot_enabled +from openpilot.selfdrive.test.process_replay.helpers import TestProcessReplayDiffBase + source_segments = [ ("BODY", "937ccb7243511b65|2022-05-24--16-03-09--1"), # COMMA.BODY @@ -63,166 +58,41 @@ segments = [ # dashcamOnly makes don't need to be tested until a full port is done excluded_interfaces = ["mock", "tesla"] -BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" -REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit") -EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"} - - -def run_test_process(data): - segment, cfg, args, cur_log_fn, ref_log_path, lr_dat = data - res = None - if not args.upload_only: - lr = LogReader.from_bytes(lr_dat) - res, log_msgs = test_process(cfg, lr, segment, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs) - # save logs so we can upload when updating refs - save_log(cur_log_fn, log_msgs) - - if args.update_refs or args.upload_only: - print(f'Uploading: {os.path.basename(cur_log_fn)}') - assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}" - upload_file(cur_log_fn, os.path.basename(cur_log_fn)) - os.remove(cur_log_fn) - return (segment, cfg.proc_name, res) - - -def get_log_data(segment): - r, n = segment.rsplit("--", 1) - with FileReader(get_url(r, n)) as f: - return (segment, f.read()) - - -def test_process(cfg, lr, segment, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None): - if ignore_fields is None: - ignore_fields = [] - if ignore_msgs is None: - ignore_msgs = [] - - ref_log_msgs = list(LogReader(ref_log_path)) - - try: - log_msgs = replay_process(cfg, lr, disable_progress=True) - except Exception as e: - raise Exception("failed on segment: " + segment) from e - - # check to make sure openpilot is engaged in the route - if cfg.proc_name == "controlsd": - if not check_openpilot_enabled(log_msgs): - return f"Route did not enable at all or for long enough: {new_log_path}", log_msgs - - try: - return compare_logs(ref_log_msgs, log_msgs, ignore_fields + cfg.ignore, ignore_msgs, cfg.tolerance), log_msgs - except Exception as e: - return str(e), log_msgs - - -if __name__ == "__main__": - all_cars = {car for car, _ in segments} - all_procs = {cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS} - - cpu_count = os.cpu_count() or 1 - - parser = argparse.ArgumentParser(description="Regression test to identify changes in a process's output") - parser.add_argument("--whitelist-procs", type=str, nargs="*", default=all_procs, - help="Whitelist given processes from the test (e.g. controlsd)") - parser.add_argument("--whitelist-cars", type=str, nargs="*", default=all_cars, - help="Whitelist given cars from the test (e.g. HONDA)") - parser.add_argument("--blacklist-procs", type=str, nargs="*", default=[], - help="Blacklist given processes from the test (e.g. controlsd)") - parser.add_argument("--blacklist-cars", type=str, nargs="*", default=[], - help="Blacklist given cars from the test (e.g. HONDA)") - parser.add_argument("--ignore-fields", type=str, nargs="*", default=[], - help="Extra fields or msgs to ignore (e.g. carState.events)") - parser.add_argument("--ignore-msgs", type=str, nargs="*", default=[], - help="Msgs to ignore (e.g. carEvents)") - parser.add_argument("--update-refs", action="store_true", - help="Updates reference logs using current commit") - parser.add_argument("--upload-only", action="store_true", - help="Skips testing processes and uploads logs from previous test run") - parser.add_argument("-j", "--jobs", type=int, default=max(cpu_count - 2, 1), - help="Max amount of parallel jobs") - args = parser.parse_args() - - tested_procs = set(args.whitelist_procs) - set(args.blacklist_procs) - tested_cars = set(args.whitelist_cars) - set(args.blacklist_cars) - tested_cars = {c.upper() for c in tested_cars} - - full_test = (tested_procs == all_procs) and (tested_cars == all_cars) and all(len(x) == 0 for x in (args.ignore_fields, args.ignore_msgs)) - upload = args.update_refs or args.upload_only - os.makedirs(os.path.dirname(FAKEDATA), exist_ok=True) - - if upload: - assert full_test, "Need to run full test when updating refs" - - try: - ref_commit = open(REF_COMMIT_FN).read().strip() - except FileNotFoundError: - print("Couldn't find reference commit") - sys.exit(1) - - cur_commit = get_commit() - if cur_commit is None: - raise Exception("Couldn't get current commit") - - print(f"***** testing against commit {ref_commit} *****") - - # check to make sure all car brands are tested - if full_test: - untested = (set(interface_names) - set(excluded_interfaces)) - {c.lower() for c in tested_cars} - assert len(untested) == 0, f"Cars missing routes: {str(untested)}" - - log_paths: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(lambda: defaultdict(dict)) - with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool: - if not args.upload_only: - download_segments = [seg for car, seg in segments if car in tested_cars] - log_data: Dict[str, LogReader] = {} - p1 = pool.map(get_log_data, download_segments) - for segment, lr in tqdm(p1, desc="Getting Logs", total=len(download_segments)): - log_data[segment] = lr - - pool_args: Any = [] - for car_brand, segment in segments: - if car_brand not in tested_cars: - continue - - for cfg in CONFIGS: - if cfg.proc_name not in tested_procs: - continue - - cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2") - if args.update_refs: # reference logs will not exist if routes were just regenerated - ref_log_path = get_url(*segment.rsplit("--", 1)) - else: - ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2") - ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn) - - dat = None if args.upload_only else log_data[segment] - pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat)) - - log_paths[segment][cfg.proc_name]['ref'] = ref_log_path - log_paths[segment][cfg.proc_name]['new'] = cur_log_fn - - results: Any = defaultdict(dict) - p2 = pool.map(run_test_process, pool_args) - for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)): - if not args.upload_only: - results[segment][proc] = result - - diff_short, diff_long, failed = format_diff(results, log_paths, ref_commit) - if not upload: - with open(os.path.join(PROC_REPLAY_DIR, "diff.txt"), "w") as f: - f.write(diff_long) - print(diff_short) - - if failed: - print("TEST FAILED") - print("\n\nTo push the new reference logs for this commit run:") - print("./test_processes.py --upload-only") - else: - print("TEST SUCCEEDED") - - else: - with open(REF_COMMIT_FN, "w") as f: - f.write(cur_commit) - print(f"\n\nUpdated reference logs for commit: {cur_commit}") - - sys.exit(int(failed)) +ALL_CARS = sorted({car for car, _ in segments}) + + +@pytest.mark.slow +@parameterized_class(('case_name', 'segment'), segments) +class TestCarProcessReplay(TestProcessReplayDiffBase): + """ + Runs a replay diff on a segment for each car. + """ + + case_name: Optional[str] = None + tested_cars: List[str] = ALL_CARS + + @classmethod + def setUpClass(cls): + if cls.case_name not in cls.tested_cars: + raise unittest.SkipTest(f"{cls.case_name} was not requested to be tested") + super().setUpClass() + + def test_all_makes_are_tested(self): + if set(self.tested_cars) != set(ALL_CARS): + raise unittest.SkipTest("skipping check because some cars were skipped via command line") + + # check to make sure all car brands are tested + untested = (set(interface_names) - set(excluded_interfaces)) - {c.lower() for c in self.tested_cars} + self.assertEqual(len(untested), 0, f"Cars missing routes: {str(untested)}") + + def test_controlsd_engaged(self): + if "controlsd" not in self.tested_procs: + raise unittest.SkipTest("controlsd was not requested to be tested") + + # check to make sure openpilot is engaged in the route + log_msgs = self.log_msgs["controlsd"] + self.assertTrue(check_openpilot_enabled(log_msgs), f"Route did not enable at all or for long enough: {self.segment}") + + +if __name__ == '__main__': + pytest.main([*sys.argv[1:], __file__]) \ No newline at end of file