diff --git a/.github/workflows/selfdrive_tests.yaml b/.github/workflows/selfdrive_tests.yaml index 94e6304539..c0a72924e4 100644 --- a/.github/workflows/selfdrive_tests.yaml +++ b/.github/workflows/selfdrive_tests.yaml @@ -208,20 +208,30 @@ jobs: run: | ${{ env.RUN }} "scons -j$(nproc)" - name: Run replay - id: run-replay timeout-minutes: 30 run: | - ${{ env.RUN }} "CI=1 $PYTEST -n auto --dist=loadscope selfdrive/test/process_replay/test_processes.py --long-diff && \ - chmod -R 777 /tmp/comma_download_cache" + ${{ env.RUN }} "CI=1 coverage run selfdrive/test/process_replay/test_processes.py -j$(nproc) && \ + chmod -R 777 /tmp/comma_download_cache && \ + coverage combine && \ + coverage xml" + - name: Print diff + id: print-diff + if: always() + run: cat selfdrive/test/process_replay/diff.txt + - uses: actions/upload-artifact@v3 + if: always() + continue-on-error: true + with: + name: process_replay_diff.txt + path: selfdrive/test/process_replay/diff.txt + - name: Upload reference logs + if: ${{ failure() && steps.print-diff.outcome == 'success' && github.repository == 'commaai/openpilot' && env.AZURE_TOKEN != '' }} + run: | + ${{ env.RUN }} "unset PYTHONWARNINGS && CI=1 AZURE_TOKEN='$AZURE_TOKEN' python selfdrive/test/process_replay/test_processes.py -j$(nproc) --upload-only" - name: "Upload coverage to Codecov" uses: codecov/codecov-action@v3 with: name: ${{ github.job }} - - name: Upload reference logs - if: ${{ failure() && github.repository == 'commaai/openpilot' && env.AZURE_TOKEN != '' }} - run: | - ${{ env.RUN }} "unset PYTHONWARNINGS && CI=1 AZURE_TOKEN='$AZURE_TOKEN' \ - pytest -n auto --dist=loadscope selfdrive/test/process_replay/test_processes.py --upload-only" regen: name: regen diff --git a/common/prefix.py b/common/prefix.py index c5ae4393cd..c1744e8ff7 100644 --- a/common/prefix.py +++ b/common/prefix.py @@ -21,7 +21,6 @@ class OpenpilotPrefix: except FileExistsError: pass os.makedirs(Paths.log_root(), exist_ok=True) - os.makedirs(Paths.download_cache_root(), exist_ok=True) return self diff --git a/selfdrive/test/process_replay/conftest.py b/selfdrive/test/process_replay/conftest.py deleted file mode 100644 index a78f37f392..0000000000 --- a/selfdrive/test/process_replay/conftest.py +++ /dev/null @@ -1,37 +0,0 @@ -import pytest - -from openpilot.selfdrive.test.process_replay.helpers import ALL_PROCS -from openpilot.selfdrive.test.process_replay.test_processes import ALL_CARS - - -def pytest_addoption(parser: pytest.Parser): - parser.addoption("--whitelist-procs", type=str, nargs="*", default=ALL_PROCS, - help="Whitelist given processes from the test (e.g. controlsd)") - parser.addoption("--whitelist-cars", type=str, nargs="*", default=ALL_CARS, - help="Whitelist given cars from the test (e.g. HONDA)") - parser.addoption("--blacklist-procs", type=str, nargs="*", default=[], - help="Blacklist given processes from the test (e.g. controlsd)") - parser.addoption("--blacklist-cars", type=str, nargs="*", default=[], - help="Blacklist given cars from the test (e.g. HONDA)") - parser.addoption("--ignore-fields", type=str, nargs="*", default=[], - help="Extra fields or msgs to ignore (e.g. carState.events)") - parser.addoption("--ignore-msgs", type=str, nargs="*", default=[], - help="Msgs to ignore (e.g. onroadEvents)") - parser.addoption("--update-refs", action="store_true", - help="Updates reference logs using current commit") - parser.addoption("--upload-only", action="store_true", - help="Skips testing processes and uploads logs from previous test run") - parser.addoption("--long-diff", action="store_true", - help="Outputs diff in long format") - - -@pytest.fixture(scope="class", autouse=True) -def process_replay_test_arguments(request): - if hasattr(request.cls, "segment"): # check if a subclass of TestProcessReplayBase - request.cls.tested_procs = list(set(request.config.getoption("--whitelist-procs")) - set(request.config.getoption("--blacklist-procs"))) - request.cls.tested_cars = list({c.upper() for c in set(request.config.getoption("--whitelist-cars")) - set(request.config.getoption("--blacklist-cars"))}) - request.cls.ignore_fields = request.config.getoption("--ignore-fields") - request.cls.ignore_msgs = request.config.getoption("--ignore-msgs") - request.cls.upload_only = request.config.getoption("--upload-only") - request.cls.update_refs = request.config.getoption("--update-refs") - request.cls.long_diff = request.config.getoption("--long-diff") diff --git a/selfdrive/test/process_replay/helpers.py b/selfdrive/test/process_replay/helpers.py deleted file mode 100755 index 8e77454dae..0000000000 --- a/selfdrive/test/process_replay/helpers.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -import unittest - -from parameterized import parameterized -from typing import Optional, Union, List - - -from openpilot.selfdrive.test.openpilotci import get_url, upload_file -from openpilot.selfdrive.test.process_replay.compare_logs import compare_logs, format_process_diff -from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, replay_process -from openpilot.system.version import get_commit -from openpilot.tools.lib.filereader import FileReader -from openpilot.tools.lib.helpers import save_log -from openpilot.tools.lib.logreader import LogReader, LogIterable - - -BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" -REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit") -EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"} - - -def get_log_data(segment): - r, n = segment.rsplit("--", 1) - with FileReader(get_url(r, n)) as f: - return f.read() - - -ALL_PROCS = sorted({cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS}) -PROC_TO_CFG = {cfg.proc_name: cfg for cfg in CONFIGS} - -cpu_count = os.cpu_count() or 1 - - -class TestProcessReplayBase(unittest.TestCase): - """ - Base class that replays all processes within test_proceses from a segment, - and puts the log messages in self.log_msgs for analysis by other tests. - """ - segment: Optional[Union[str, LogIterable]] = None - tested_procs: List[str] = ALL_PROCS - - @classmethod - def setUpClass(cls, create_logs=True): - if "Base" in cls.__name__: - raise unittest.SkipTest("skipping base class") - - if isinstance(cls.segment, str): - cls.log_reader = LogReader.from_bytes(get_log_data(cls.segment)) - else: - cls.log_reader = cls.segment - - if create_logs: - cls._create_log_msgs() - - @classmethod - def _run_replay(cls, cfg): - try: - return replay_process(cfg, cls.log_reader, disable_progress=True) - except Exception as e: - raise Exception(f"failed on segment: {cls.segment} \n{e}") from e - - @classmethod - def _create_log_msgs(cls): - cls.log_msgs = {} - cls.proc_cfg = {} - - for proc in cls.tested_procs: - cfg = PROC_TO_CFG[proc] - - log_msgs = cls._run_replay(cfg) - - cls.log_msgs[proc] = log_msgs - cls.proc_cfg[proc] = cfg - - -class TestProcessReplayDiffBase(TestProcessReplayBase): - """ - Base class for checking for diff between process outputs. - """ - update_refs = False - upload_only = False - long_diff = False - ignore_msgs: List[str] = [] - ignore_fields: List[str] = [] - - def setUp(self): - super().setUp() - if self.upload_only: - raise unittest.SkipTest("skipping test, uploading only") - - @classmethod - def setUpClass(cls): - super().setUpClass(not cls.upload_only) - - if cls.long_diff: - cls.maxDiff = None - - os.makedirs(os.path.dirname(FAKEDATA), exist_ok=True) - - cls.cur_commit = get_commit() - cls.assertNotEqual(cls.cur_commit, None, "Couldn't get current commit") - - cls.upload = cls.update_refs or cls.upload_only - - try: - with open(REF_COMMIT_FN) as f: - cls.ref_commit = f.read().strip() - except FileNotFoundError: - print("Couldn't find reference commit") - sys.exit(1) - - cls._create_ref_log_msgs() - - @classmethod - def _create_ref_log_msgs(cls): - cls.ref_log_msgs = {} - - for proc in cls.tested_procs: - cur_log_fn = os.path.join(FAKEDATA, f"{cls.segment}_{proc}_{cls.cur_commit}.bz2") - if cls.update_refs: # reference logs will not exist if routes were just regenerated - ref_log_path = get_url(*cls.segment.rsplit("--", 1)) - else: - ref_log_fn = os.path.join(FAKEDATA, f"{cls.segment}_{proc}_{cls.ref_commit}.bz2") - ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn) - - if not cls.upload_only: - save_log(cur_log_fn, cls.log_msgs[proc]) - cls.ref_log_msgs[proc] = list(LogReader(ref_log_path)) - - if cls.upload: - assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}" - upload_file(cur_log_fn, os.path.basename(cur_log_fn)) - os.remove(cur_log_fn) - - @parameterized.expand(ALL_PROCS) - def test_process_diff(self, proc): - if proc not in self.tested_procs: - raise unittest.SkipTest(f"{proc} was not requested to be tested") - - cfg = self.proc_cfg[proc] - log_msgs = self.log_msgs[proc] - ref_log_msgs = self.ref_log_msgs[proc] - - diff = compare_logs(ref_log_msgs, log_msgs, self.ignore_fields + cfg.ignore, self.ignore_msgs) - - diff_short, diff_long = format_process_diff(diff) - - self.assertEqual(len(diff), 0, "\n" + diff_long if self.long_diff else diff_short) diff --git a/selfdrive/test/process_replay/regen_all.py b/selfdrive/test/process_replay/regen_all.py index 070cb5f783..656a5b89e1 100755 --- a/selfdrive/test/process_replay/regen_all.py +++ b/selfdrive/test/process_replay/regen_all.py @@ -8,8 +8,7 @@ from tqdm import tqdm from openpilot.common.prefix import OpenpilotPrefix from openpilot.selfdrive.test.process_replay.regen import regen_and_save -from openpilot.selfdrive.test.process_replay.process_replay import FAKEDATA -from openpilot.selfdrive.test.process_replay.test_processes import source_segments as segments +from openpilot.selfdrive.test.process_replay.test_processes import FAKEDATA, source_segments as segments from openpilot.tools.lib.route import SegmentName diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py index 1517d9bab0..5429c9b63e 100755 --- a/selfdrive/test/process_replay/test_processes.py +++ b/selfdrive/test/process_replay/test_processes.py @@ -1,15 +1,20 @@ #!/usr/bin/env python3 -import unittest -import pytest +import argparse +import concurrent.futures +import os import sys - -from parameterized import parameterized_class -from typing import List, Optional +from collections import defaultdict +from tqdm import tqdm +from typing import Any, DefaultDict, Dict from openpilot.selfdrive.car.car_helpers import interface_names -from openpilot.selfdrive.test.process_replay.process_replay import check_openpilot_enabled -from openpilot.selfdrive.test.process_replay.helpers import TestProcessReplayDiffBase - +from openpilot.selfdrive.test.openpilotci import get_url, upload_file +from openpilot.selfdrive.test.process_replay.compare_logs import compare_logs, format_diff +from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, check_openpilot_enabled, replay_process +from openpilot.system.version import get_commit +from openpilot.tools.lib.filereader import FileReader +from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.helpers import save_log source_segments = [ ("BODY", "937ccb7243511b65|2022-05-24--16-03-09--1"), # COMMA.BODY @@ -58,42 +63,166 @@ segments = [ # dashcamOnly makes don't need to be tested until a full port is done excluded_interfaces = ["mock", "tesla"] -ALL_CARS = sorted({car for car, _ in segments}) - - -@pytest.mark.slow -@parameterized_class(('case_name', 'segment'), segments) -@pytest.mark.xdist_group_class_property('case_name') -class TestCarProcessReplay(TestProcessReplayDiffBase): - """ - Runs a replay diff on a segment for each car. - """ - - case_name: Optional[str] = None - tested_cars: List[str] = ALL_CARS - - @classmethod - def setUpClass(cls): - if cls.case_name not in cls.tested_cars: - raise unittest.SkipTest(f"{cls.case_name} was not requested to be tested") - super().setUpClass() - - def test_all_makes_are_tested(self): - if set(self.tested_cars) != set(ALL_CARS): - raise unittest.SkipTest("skipping check because some cars were skipped via command line") - - # check to make sure all car brands are tested - untested = (set(interface_names) - set(excluded_interfaces)) - {c.lower() for c in self.tested_cars} - self.assertEqual(len(untested), 0, f"Cars missing routes: {str(untested)}") - - def test_controlsd_engaged(self): - if "controlsd" not in self.tested_procs: - raise unittest.SkipTest("controlsd was not requested to be tested") - - # check to make sure openpilot is engaged in the route - log_msgs = self.log_msgs["controlsd"] - self.assertTrue(check_openpilot_enabled(log_msgs), f"Route did not enable at all or for long enough: {self.segment}") - - -if __name__ == '__main__': - pytest.main([*sys.argv[1:], __file__]) +BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" +REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit") +EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"} + + +def run_test_process(data): + segment, cfg, args, cur_log_fn, ref_log_path, lr_dat = data + res = None + if not args.upload_only: + lr = LogReader.from_bytes(lr_dat) + res, log_msgs = test_process(cfg, lr, segment, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs) + # save logs so we can upload when updating refs + save_log(cur_log_fn, log_msgs) + + if args.update_refs or args.upload_only: + print(f'Uploading: {os.path.basename(cur_log_fn)}') + assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}" + upload_file(cur_log_fn, os.path.basename(cur_log_fn)) + os.remove(cur_log_fn) + return (segment, cfg.proc_name, res) + + +def get_log_data(segment): + r, n = segment.rsplit("--", 1) + with FileReader(get_url(r, n)) as f: + return (segment, f.read()) + + +def test_process(cfg, lr, segment, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None): + if ignore_fields is None: + ignore_fields = [] + if ignore_msgs is None: + ignore_msgs = [] + + ref_log_msgs = list(LogReader(ref_log_path)) + + try: + log_msgs = replay_process(cfg, lr, disable_progress=True) + except Exception as e: + raise Exception("failed on segment: " + segment) from e + + # check to make sure openpilot is engaged in the route + if cfg.proc_name == "controlsd": + if not check_openpilot_enabled(log_msgs): + return f"Route did not enable at all or for long enough: {new_log_path}", log_msgs + + try: + return compare_logs(ref_log_msgs, log_msgs, ignore_fields + cfg.ignore, ignore_msgs, cfg.tolerance), log_msgs + except Exception as e: + return str(e), log_msgs + + +if __name__ == "__main__": + all_cars = {car for car, _ in segments} + all_procs = {cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS} + + cpu_count = os.cpu_count() or 1 + + parser = argparse.ArgumentParser(description="Regression test to identify changes in a process's output") + parser.add_argument("--whitelist-procs", type=str, nargs="*", default=all_procs, + help="Whitelist given processes from the test (e.g. controlsd)") + parser.add_argument("--whitelist-cars", type=str, nargs="*", default=all_cars, + help="Whitelist given cars from the test (e.g. HONDA)") + parser.add_argument("--blacklist-procs", type=str, nargs="*", default=[], + help="Blacklist given processes from the test (e.g. controlsd)") + parser.add_argument("--blacklist-cars", type=str, nargs="*", default=[], + help="Blacklist given cars from the test (e.g. HONDA)") + parser.add_argument("--ignore-fields", type=str, nargs="*", default=[], + help="Extra fields or msgs to ignore (e.g. carState.events)") + parser.add_argument("--ignore-msgs", type=str, nargs="*", default=[], + help="Msgs to ignore (e.g. carEvents)") + parser.add_argument("--update-refs", action="store_true", + help="Updates reference logs using current commit") + parser.add_argument("--upload-only", action="store_true", + help="Skips testing processes and uploads logs from previous test run") + parser.add_argument("-j", "--jobs", type=int, default=max(cpu_count - 2, 1), + help="Max amount of parallel jobs") + args = parser.parse_args() + + tested_procs = set(args.whitelist_procs) - set(args.blacklist_procs) + tested_cars = set(args.whitelist_cars) - set(args.blacklist_cars) + tested_cars = {c.upper() for c in tested_cars} + + full_test = (tested_procs == all_procs) and (tested_cars == all_cars) and all(len(x) == 0 for x in (args.ignore_fields, args.ignore_msgs)) + upload = args.update_refs or args.upload_only + os.makedirs(os.path.dirname(FAKEDATA), exist_ok=True) + + if upload: + assert full_test, "Need to run full test when updating refs" + + try: + ref_commit = open(REF_COMMIT_FN).read().strip() + except FileNotFoundError: + print("Couldn't find reference commit") + sys.exit(1) + + cur_commit = get_commit() + if cur_commit is None: + raise Exception("Couldn't get current commit") + + print(f"***** testing against commit {ref_commit} *****") + + # check to make sure all car brands are tested + if full_test: + untested = (set(interface_names) - set(excluded_interfaces)) - {c.lower() for c in tested_cars} + assert len(untested) == 0, f"Cars missing routes: {str(untested)}" + + log_paths: DefaultDict[str, Dict[str, Dict[str, str]]] = defaultdict(lambda: defaultdict(dict)) + with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool: + if not args.upload_only: + download_segments = [seg for car, seg in segments if car in tested_cars] + log_data: Dict[str, LogReader] = {} + p1 = pool.map(get_log_data, download_segments) + for segment, lr in tqdm(p1, desc="Getting Logs", total=len(download_segments)): + log_data[segment] = lr + + pool_args: Any = [] + for car_brand, segment in segments: + if car_brand not in tested_cars: + continue + + for cfg in CONFIGS: + if cfg.proc_name not in tested_procs: + continue + + cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2") + if args.update_refs: # reference logs will not exist if routes were just regenerated + ref_log_path = get_url(*segment.rsplit("--", 1)) + else: + ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2") + ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn) + + dat = None if args.upload_only else log_data[segment] + pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat)) + + log_paths[segment][cfg.proc_name]['ref'] = ref_log_path + log_paths[segment][cfg.proc_name]['new'] = cur_log_fn + + results: Any = defaultdict(dict) + p2 = pool.map(run_test_process, pool_args) + for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)): + if not args.upload_only: + results[segment][proc] = result + + diff_short, diff_long, failed = format_diff(results, log_paths, ref_commit) + if not upload: + with open(os.path.join(PROC_REPLAY_DIR, "diff.txt"), "w") as f: + f.write(diff_long) + print(diff_short) + + if failed: + print("TEST FAILED") + print("\n\nTo push the new reference logs for this commit run:") + print("./test_processes.py --upload-only") + else: + print("TEST SUCCEEDED") + + else: + with open(REF_COMMIT_FN, "w") as f: + f.write(cur_commit) + print(f"\n\nUpdated reference logs for commit: {cur_commit}") + + sys.exit(int(failed))