|
|
@ -23,9 +23,8 @@ from openpilot.selfdrive.car.tests.routes import non_tested_cars, routes, CarTes |
|
|
|
from openpilot.selfdrive.controls.controlsd import Controls |
|
|
|
from openpilot.selfdrive.controls.controlsd import Controls |
|
|
|
from openpilot.selfdrive.test.helpers import read_segment_list |
|
|
|
from openpilot.selfdrive.test.helpers import read_segment_list |
|
|
|
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT |
|
|
|
from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT |
|
|
|
from openpilot.tools.lib.comma_car_segments import get_url |
|
|
|
from openpilot.tools.lib.logreader import LogReader, internal_source, openpilotci_source |
|
|
|
from openpilot.tools.lib.logreader import LogReader |
|
|
|
from openpilot.tools.lib.route import SegmentName |
|
|
|
from openpilot.tools.lib.route import Route, SegmentName, RouteName |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from panda.tests.libpanda import libpanda_py |
|
|
|
from panda.tests.libpanda import libpanda_py |
|
|
|
|
|
|
|
|
|
|
@ -75,14 +74,6 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
elm_frame: Optional[int] |
|
|
|
elm_frame: Optional[int] |
|
|
|
car_safety_mode_frame: Optional[int] |
|
|
|
car_safety_mode_frame: Optional[int] |
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
|
|
|
def get_logreader(cls, seg): |
|
|
|
|
|
|
|
if len(INTERNAL_SEG_LIST): |
|
|
|
|
|
|
|
route_name = RouteName(cls.test_route.route) |
|
|
|
|
|
|
|
return LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2") |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
return LogReader(get_url(cls.test_route.route, seg)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
@classmethod |
|
|
|
def get_testing_data_from_logreader(cls, lr): |
|
|
|
def get_testing_data_from_logreader(cls, lr): |
|
|
|
car_fw = [] |
|
|
|
car_fw = [] |
|
|
@ -133,22 +124,26 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
if cls.test_route.segment is not None: |
|
|
|
if cls.test_route.segment is not None: |
|
|
|
test_segs = (cls.test_route.segment,) |
|
|
|
test_segs = (cls.test_route.segment,) |
|
|
|
|
|
|
|
|
|
|
|
# Try the primary method first (CI or internal) |
|
|
|
is_internal = len(INTERNAL_SEG_LIST) |
|
|
|
|
|
|
|
|
|
|
|
for seg in test_segs: |
|
|
|
for seg in test_segs: |
|
|
|
|
|
|
|
segment_range = f"{cls.test_route.route}/{seg}" |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
try: |
|
|
|
lr = cls.get_logreader(seg) |
|
|
|
lr = LogReader(segment_range, default_source=internal_source if is_internal else openpilotci_source) |
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
except Exception: |
|
|
|
except Exception: |
|
|
|
pass |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
# Route is not in CI bucket, assume either user has access (private), or it is public |
|
|
|
# Route is not in CI bucket, assume either user has access (private), or it is public |
|
|
|
# test_route_on_ci_bucket will fail when running in CI |
|
|
|
# test_route_on_ci_bucket will fail when running in CI |
|
|
|
if not len(INTERNAL_SEG_LIST): |
|
|
|
if not is_internal: |
|
|
|
cls.test_route_on_bucket = False |
|
|
|
cls.test_route_on_bucket = False |
|
|
|
|
|
|
|
|
|
|
|
for seg in test_segs: |
|
|
|
for seg in test_segs: |
|
|
|
|
|
|
|
segment_range = f"{cls.test_route.route}/{seg}" |
|
|
|
try: |
|
|
|
try: |
|
|
|
lr = LogReader(Route(cls.test_route.route).log_paths()[seg]) |
|
|
|
lr = LogReader(segment_range) |
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
return cls.get_testing_data_from_logreader(lr) |
|
|
|
except Exception: |
|
|
|
except Exception: |
|
|
|
pass |
|
|
|
pass |
|
|
|