|
|
|
@ -76,46 +76,16 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
car_safety_mode_frame: Optional[int] |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def get_logreader(cls, seg): |
|
|
|
|
def get_logreader_primary(cls, seg): |
|
|
|
|
if len(INTERNAL_SEG_LIST): |
|
|
|
|
route_name = RouteName(cls.test_route.route) |
|
|
|
|
return LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2") |
|
|
|
|
else: |
|
|
|
|
# Attempt to use CI bucket first |
|
|
|
|
try: |
|
|
|
|
return LogReader(get_url(cls.test_route.route, seg)) |
|
|
|
|
except Exception: |
|
|
|
|
cls.test_route_on_bucket = False |
|
|
|
|
|
|
|
|
|
# Route is not in CI bucket, assume either user has access (private), or it is public |
|
|
|
|
# test_route_on_ci_bucket will fail when running in CI |
|
|
|
|
return LogReader(Route(cls.test_route.route).log_paths()[seg]) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def setUpClass(cls): |
|
|
|
|
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'): |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
|
|
|
|
|
if 'FILTER' in os.environ: |
|
|
|
|
if not cls.car_model.startswith(tuple(os.environ.get('FILTER').split(','))): |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
|
|
|
|
|
if cls.test_route is None: |
|
|
|
|
if cls.car_model in non_tested_cars: |
|
|
|
|
print(f"Skipping tests for {cls.car_model}: missing route") |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
raise Exception(f"missing test route for {cls.car_model}") |
|
|
|
|
|
|
|
|
|
test_segs = (2, 1, 0) |
|
|
|
|
if cls.test_route.segment is not None: |
|
|
|
|
test_segs = (cls.test_route.segment,) |
|
|
|
|
|
|
|
|
|
for seg in test_segs: |
|
|
|
|
try: |
|
|
|
|
lr = cls.get_logreader(seg) |
|
|
|
|
except Exception: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
def get_testing_data(cls, lr): |
|
|
|
|
car_fw = [] |
|
|
|
|
can_msgs = [] |
|
|
|
|
cls.elm_frame = None |
|
|
|
@ -154,10 +124,55 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
cls.car_safety_mode_frame = len(can_msgs) |
|
|
|
|
|
|
|
|
|
if len(can_msgs) > int(50 / DT_CTRL): |
|
|
|
|
break |
|
|
|
|
else: |
|
|
|
|
return car_fw, can_msgs, experimental_long |
|
|
|
|
|
|
|
|
|
raise Exception("no can data found") |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def get_route_data(cls, test_segs): |
|
|
|
|
# Attempt to get route data for one of test_segs |
|
|
|
|
for seg in test_segs: |
|
|
|
|
try: |
|
|
|
|
lr = cls.get_logreader_primary(seg) |
|
|
|
|
return cls.get_testing_data(lr) |
|
|
|
|
except Exception: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
if not len(INTERNAL_SEG_LIST): |
|
|
|
|
cls.test_route_on_bucket = False |
|
|
|
|
|
|
|
|
|
# fallback to public route |
|
|
|
|
for seg in test_segs: |
|
|
|
|
try: |
|
|
|
|
lr = LogReader(Route(cls.test_route.route).log_paths()[seg]) |
|
|
|
|
return cls.get_testing_data(lr) |
|
|
|
|
except Exception: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded and public?") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def setUpClass(cls): |
|
|
|
|
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'): |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
|
|
|
|
|
if 'FILTER' in os.environ: |
|
|
|
|
if not cls.car_model.startswith(tuple(os.environ.get('FILTER').split(','))): |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
|
|
|
|
|
if cls.test_route is None: |
|
|
|
|
if cls.car_model in non_tested_cars: |
|
|
|
|
print(f"Skipping tests for {cls.car_model}: missing route") |
|
|
|
|
raise unittest.SkipTest |
|
|
|
|
raise Exception(f"missing test route for {cls.car_model}") |
|
|
|
|
|
|
|
|
|
test_segs = (2, 1, 0) |
|
|
|
|
if cls.test_route.segment is not None: |
|
|
|
|
test_segs = (cls.test_route.segment,) |
|
|
|
|
|
|
|
|
|
car_fw, can_msgs, experimental_long = cls.get_route_data(test_segs) |
|
|
|
|
|
|
|
|
|
# if relay is expected to be open in the route |
|
|
|
|
cls.openpilot_enabled = cls.car_safety_mode_frame is not None |
|
|
|
|
|
|
|
|
|