|
|
@ -4,7 +4,6 @@ import pytest |
|
|
|
import random |
|
|
|
import random |
|
|
|
import unittest # noqa: TID251 |
|
|
|
import unittest # noqa: TID251 |
|
|
|
from collections import defaultdict, Counter |
|
|
|
from collections import defaultdict, Counter |
|
|
|
from functools import partial |
|
|
|
|
|
|
|
import hypothesis.strategies as st |
|
|
|
import hypothesis.strategies as st |
|
|
|
from hypothesis import Phase, given, settings |
|
|
|
from hypothesis import Phase, given, settings |
|
|
|
from parameterized import parameterized_class |
|
|
|
from parameterized import parameterized_class |
|
|
@ -23,7 +22,7 @@ from openpilot.selfdrive.selfdrived.selfdrived import SelfdriveD |
|
|
|
from openpilot.selfdrive.pandad import can_capnp_to_list |
|
|
|
from openpilot.selfdrive.pandad import can_capnp_to_list |
|
|
|
from openpilot.selfdrive.test.helpers import read_segment_list |
|
|
|
from openpilot.selfdrive.test.helpers import read_segment_list |
|
|
|
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT |
|
|
|
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT |
|
|
|
from openpilot.tools.lib.logreader import LogReader, auto_source, internal_source, openpilotci_source, openpilotci_source_zst |
|
|
|
from openpilot.tools.lib.logreader import LogReader |
|
|
|
from openpilot.tools.lib.route import SegmentName |
|
|
|
from openpilot.tools.lib.route import SegmentName |
|
|
|
|
|
|
|
|
|
|
|
from panda.tests.libpanda import libpanda_py |
|
|
|
from panda.tests.libpanda import libpanda_py |
|
|
@ -68,7 +67,6 @@ def get_test_cases() -> list[tuple[str, CarTestRoute | None]]: |
|
|
|
class TestCarModelBase(unittest.TestCase): |
|
|
|
class TestCarModelBase(unittest.TestCase): |
|
|
|
platform: Platform | None = None |
|
|
|
platform: Platform | None = None |
|
|
|
test_route: CarTestRoute | None = None |
|
|
|
test_route: CarTestRoute | None = None |
|
|
|
test_route_on_bucket: bool = True # whether the route is on the preserved CI bucket |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
can_msgs: list[capnp.lib.capnp._DynamicStructReader] |
|
|
|
can_msgs: list[capnp.lib.capnp._DynamicStructReader] |
|
|
|
fingerprint: dict[int, dict[int, int]] |
|
|
|
fingerprint: dict[int, dict[int, int]] |
|
|
@ -126,31 +124,15 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
if cls.test_route.segment is not None: |
|
|
|
if cls.test_route.segment is not None: |
|
|
|
test_segs = (cls.test_route.segment,) |
|
|
|
test_segs = (cls.test_route.segment,) |
|
|
|
|
|
|
|
|
|
|
|
is_internal = len(INTERNAL_SEG_LIST) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for seg in test_segs: |
|
|
|
for seg in test_segs: |
|
|
|
segment_range = f"{cls.test_route.route}/{seg}" |
|
|
|
segment_range = f"{cls.test_route.route}/{seg}" |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
try: |
|
|
|
source = partial(auto_source, sources=[internal_source] if is_internal else [openpilotci_source, openpilotci_source_zst]) |
|
|
|
lr = LogReader(segment_range) |
|
|
|
lr = LogReader(segment_range, source=source) |
|
|
|
|
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
except Exception: |
|
|
|
except Exception: |
|
|
|
pass |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
# Route is not in CI bucket, assume either user has access (private), or it is public |
|
|
|
|
|
|
|
# test_route_on_ci_bucket will fail when running in CI |
|
|
|
|
|
|
|
if not is_internal: |
|
|
|
|
|
|
|
cls.test_route_on_bucket = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for seg in test_segs: |
|
|
|
|
|
|
|
segment_range = f"{cls.test_route.route}/{seg}" |
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
lr = LogReader(segment_range) |
|
|
|
|
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
|
|
|
|
except Exception: |
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?") |
|
|
|
raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -467,11 +449,6 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
failed_checks = {k: v for k, v in checks.items() if v > 0} |
|
|
|
failed_checks = {k: v for k, v in checks.items() if v > 0} |
|
|
|
self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") |
|
|
|
self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") |
|
|
|
|
|
|
|
|
|
|
|
@unittest.skipIf(not CI, "Accessing non CI-bucket routes is allowed only when not in CI") |
|
|
|
|
|
|
|
def test_route_on_ci_bucket(self): |
|
|
|
|
|
|
|
self.assertTrue(self.test_route_on_bucket, "Route not on CI bucket. " + |
|
|
|
|
|
|
|
"This is fine to fail for WIP car ports, just let us know and we can upload your routes to the CI bucket.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@parameterized_class(('platform', 'test_route'), get_test_cases()) |
|
|
|
@parameterized_class(('platform', 'test_route'), get_test_cases()) |
|
|
|
@pytest.mark.xdist_group_class_property('test_route') |
|
|
|
@pytest.mark.xdist_group_class_property('test_route') |
|
|
|