|
|
@ -37,6 +37,7 @@ INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "") |
|
|
|
INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0")) |
|
|
|
INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0")) |
|
|
|
MAX_EXAMPLES = int(os.environ.get("MAX_EXAMPLES", "50")) |
|
|
|
MAX_EXAMPLES = int(os.environ.get("MAX_EXAMPLES", "50")) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CI = os.environ.get("CI", None) is not None |
|
|
|
|
|
|
|
|
|
|
|
def get_test_cases() -> List[Tuple[str, Optional[CarTestRoute]]]: |
|
|
|
def get_test_cases() -> List[Tuple[str, Optional[CarTestRoute]]]: |
|
|
|
# build list of test cases |
|
|
|
# build list of test cases |
|
|
@ -67,13 +68,33 @@ def get_test_cases() -> List[Tuple[str, Optional[CarTestRoute]]]: |
|
|
|
class TestCarModelBase(unittest.TestCase): |
|
|
|
class TestCarModelBase(unittest.TestCase): |
|
|
|
car_model: Optional[str] = None |
|
|
|
car_model: Optional[str] = None |
|
|
|
test_route: Optional[CarTestRoute] = None |
|
|
|
test_route: Optional[CarTestRoute] = None |
|
|
|
ci: bool = True |
|
|
|
test_route_on_bucket: bool = True # whether the route is on the preserved CI bucket |
|
|
|
|
|
|
|
|
|
|
|
can_msgs: List[capnp.lib.capnp._DynamicStructReader] |
|
|
|
can_msgs: List[capnp.lib.capnp._DynamicStructReader] |
|
|
|
fingerprint: dict[int, dict[int, int]] |
|
|
|
fingerprint: dict[int, dict[int, int]] |
|
|
|
elm_frame: Optional[int] |
|
|
|
elm_frame: Optional[int] |
|
|
|
car_safety_mode_frame: Optional[int] |
|
|
|
car_safety_mode_frame: Optional[int] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
|
|
|
def get_logreader(cls, seg): |
|
|
|
|
|
|
|
if len(INTERNAL_SEG_LIST): |
|
|
|
|
|
|
|
route_name = RouteName(cls.test_route.route) |
|
|
|
|
|
|
|
return LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2") |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
# Attempt to use CI bucket first |
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
return LogReader(get_url(cls.test_route.route, seg)) |
|
|
|
|
|
|
|
except Exception: |
|
|
|
|
|
|
|
cls.test_route_on_bucket = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Fallback to public route, which will fail the test_route_on_ci_bucket when running in CI |
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
return LogReader(Route(cls.test_route.route).log_paths()[seg]) |
|
|
|
|
|
|
|
except Exception: |
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
raise Exception("Unable to get route. Check that the route is valid, and either public or uploaded to the CI bucket.") |
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
@classmethod |
|
|
|
def setUpClass(cls): |
|
|
|
def setUpClass(cls): |
|
|
|
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'): |
|
|
|
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'): |
|
|
@ -95,13 +116,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
|
|
for seg in test_segs: |
|
|
|
for seg in test_segs: |
|
|
|
try: |
|
|
|
try: |
|
|
|
if len(INTERNAL_SEG_LIST): |
|
|
|
lr = cls.get_logreader(seg) |
|
|
|
route_name = RouteName(cls.test_route.route) |
|
|
|
|
|
|
|
lr = LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2") |
|
|
|
|
|
|
|
elif cls.ci: |
|
|
|
|
|
|
|
lr = LogReader(get_url(cls.test_route.route, seg)) |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
lr = LogReader(Route(cls.test_route.route).log_paths()[seg]) |
|
|
|
|
|
|
|
except Exception: |
|
|
|
except Exception: |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
@ -145,7 +160,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
if len(can_msgs) > int(50 / DT_CTRL): |
|
|
|
if len(can_msgs) > int(50 / DT_CTRL): |
|
|
|
break |
|
|
|
break |
|
|
|
else: |
|
|
|
else: |
|
|
|
raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded?") |
|
|
|
raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?") |
|
|
|
|
|
|
|
|
|
|
|
# if relay is expected to be open in the route |
|
|
|
# if relay is expected to be open in the route |
|
|
|
cls.openpilot_enabled = cls.car_safety_mode_frame is not None |
|
|
|
cls.openpilot_enabled = cls.car_safety_mode_frame is not None |
|
|
@ -451,6 +466,11 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
failed_checks = {k: v for k, v in checks.items() if v > 0} |
|
|
|
failed_checks = {k: v for k, v in checks.items() if v > 0} |
|
|
|
self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") |
|
|
|
self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(not CI, reason="When running in CI we want to make sure all the routes are uploaded to the preserved CI bucket.") |
|
|
|
|
|
|
|
def test_route_on_ci_bucket(self): |
|
|
|
|
|
|
|
assert self.test_route_on_bucket, "Route not on CI bucket. \ |
|
|
|
|
|
|
|
This is fine to fail for WIP car ports, just let us know and we can upload your routes to the CI bucket." |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@parameterized_class(('car_model', 'test_route'), get_test_cases()) |
|
|
|
@parameterized_class(('car_model', 'test_route'), get_test_cases()) |
|
|
|
@pytest.mark.xdist_group_class_property('test_route') |
|
|
|
@pytest.mark.xdist_group_class_property('test_route') |
|
|
|