@ -36,6 +36,7 @@ JOB_ID = int(os.environ.get("JOB_ID", "0"))
INTERNAL_SEG_LIST = os . environ . get ( " INTERNAL_SEG_LIST " , " " )
INTERNAL_SEG_CNT = int ( os . environ . get ( " INTERNAL_SEG_CNT " , " 0 " ) )
MAX_EXAMPLES = int ( os . environ . get ( " MAX_EXAMPLES " , " 50 " ) )
CI = os . environ . get ( " CI " , None ) is not None
def get_test_cases ( ) - > List [ Tuple [ str , Optional [ CarTestRoute ] ] ] :
@ -67,13 +68,94 @@ def get_test_cases() -> List[Tuple[str, Optional[CarTestRoute]]]:
class TestCarModelBase ( unittest . TestCase ) :
car_model : Optional [ str ] = None
test_route : Optional [ CarTestRoute ] = None
ci : bool = True
test_route_on_bucket : bool = True # whether the route is on the preserved CI bucket
can_msgs : List [ capnp . lib . capnp . _DynamicStructReader ]
fingerprint : dict [ int , dict [ int , int ] ]
elm_frame : Optional [ int ]
car_safety_mode_frame : Optional [ int ]
@classmethod
def get_logreader ( cls , seg ) :
if len ( INTERNAL_SEG_LIST ) :
route_name = RouteName ( cls . test_route . route )
return LogReader ( f " cd:/ { route_name . dongle_id } / { route_name . time_str } / { seg } /rlog.bz2 " )
else :
return LogReader ( get_url ( cls . test_route . route , seg ) )
@classmethod
def get_testing_data_from_logreader ( cls , lr ) :
car_fw = [ ]
can_msgs = [ ]
cls . elm_frame = None
cls . car_safety_mode_frame = None
cls . fingerprint = gen_empty_fingerprint ( )
experimental_long = False
for msg in lr :
if msg . which ( ) == " can " :
can_msgs . append ( msg )
if len ( can_msgs ) < = FRAME_FINGERPRINT :
for m in msg . can :
if m . src < 64 :
cls . fingerprint [ m . src ] [ m . address ] = len ( m . dat )
elif msg . which ( ) == " carParams " :
car_fw = msg . carParams . carFw
if msg . carParams . openpilotLongitudinalControl :
experimental_long = True
if cls . car_model is None and not cls . ci :
cls . car_model = msg . carParams . carFingerprint
# Log which can frame the panda safety mode left ELM327, for CAN validity checks
elif msg . which ( ) == ' pandaStates ' :
for ps in msg . pandaStates :
if cls . elm_frame is None and ps . safetyModel != SafetyModel . elm327 :
cls . elm_frame = len ( can_msgs )
if cls . car_safety_mode_frame is None and ps . safetyModel not in \
( SafetyModel . elm327 , SafetyModel . noOutput ) :
cls . car_safety_mode_frame = len ( can_msgs )
elif msg . which ( ) == ' pandaStateDEPRECATED ' :
if cls . elm_frame is None and msg . pandaStateDEPRECATED . safetyModel != SafetyModel . elm327 :
cls . elm_frame = len ( can_msgs )
if cls . car_safety_mode_frame is None and msg . pandaStateDEPRECATED . safetyModel not in \
( SafetyModel . elm327 , SafetyModel . noOutput ) :
cls . car_safety_mode_frame = len ( can_msgs )
if len ( can_msgs ) > int ( 50 / DT_CTRL ) :
return car_fw , can_msgs , experimental_long
raise Exception ( " no can data found " )
@classmethod
def get_testing_data ( cls ) :
test_segs = ( 2 , 1 , 0 )
if cls . test_route . segment is not None :
test_segs = ( cls . test_route . segment , )
# Try the primary method first (CI or internal)
for seg in test_segs :
try :
lr = cls . get_logreader ( seg )
return cls . get_testing_data_from_logreader ( lr )
except Exception :
pass
# Route is not in CI bucket, assume either user has access (private), or it is public
# test_route_on_ci_bucket will fail when running in CI
if not len ( INTERNAL_SEG_LIST ) :
cls . test_route_on_bucket = False
for seg in test_segs :
try :
lr = LogReader ( Route ( cls . test_route . route ) . log_paths ( ) [ seg ] )
return cls . get_testing_data_from_logreader ( lr )
except Exception :
pass
raise Exception ( f " Route: { repr ( cls . test_route . route ) } with segments: { test_segs } not found or no CAN msgs found. Is it uploaded and public? " )
@classmethod
def setUpClass ( cls ) :
if cls . __name__ == ' TestCarModel ' or cls . __name__ . endswith ( ' Base ' ) :
@ -89,63 +171,7 @@ class TestCarModelBase(unittest.TestCase):
raise unittest . SkipTest
raise Exception ( f " missing test route for { cls . car_model } " )
test_segs = ( 2 , 1 , 0 )
if cls . test_route . segment is not None :
test_segs = ( cls . test_route . segment , )
for seg in test_segs :
try :
if len ( INTERNAL_SEG_LIST ) :
route_name = RouteName ( cls . test_route . route )
lr = LogReader ( f " cd:/ { route_name . dongle_id } / { route_name . time_str } / { seg } /rlog.bz2 " )
elif cls . ci :
lr = LogReader ( get_url ( cls . test_route . route , seg ) )
else :
lr = LogReader ( Route ( cls . test_route . route ) . log_paths ( ) [ seg ] )
except Exception :
continue
car_fw = [ ]
can_msgs = [ ]
cls . elm_frame = None
cls . car_safety_mode_frame = None
cls . fingerprint = gen_empty_fingerprint ( )
experimental_long = False
for msg in lr :
if msg . which ( ) == " can " :
can_msgs . append ( msg )
if len ( can_msgs ) < = FRAME_FINGERPRINT :
for m in msg . can :
if m . src < 64 :
cls . fingerprint [ m . src ] [ m . address ] = len ( m . dat )
elif msg . which ( ) == " carParams " :
car_fw = msg . carParams . carFw
if msg . carParams . openpilotLongitudinalControl :
experimental_long = True
if cls . car_model is None and not cls . ci :
cls . car_model = msg . carParams . carFingerprint
# Log which can frame the panda safety mode left ELM327, for CAN validity checks
elif msg . which ( ) == ' pandaStates ' :
for ps in msg . pandaStates :
if cls . elm_frame is None and ps . safetyModel != SafetyModel . elm327 :
cls . elm_frame = len ( can_msgs )
if cls . car_safety_mode_frame is None and ps . safetyModel not in \
( SafetyModel . elm327 , SafetyModel . noOutput ) :
cls . car_safety_mode_frame = len ( can_msgs )
elif msg . which ( ) == ' pandaStateDEPRECATED ' :
if cls . elm_frame is None and msg . pandaStateDEPRECATED . safetyModel != SafetyModel . elm327 :
cls . elm_frame = len ( can_msgs )
if cls . car_safety_mode_frame is None and msg . pandaStateDEPRECATED . safetyModel not in \
( SafetyModel . elm327 , SafetyModel . noOutput ) :
cls . car_safety_mode_frame = len ( can_msgs )
if len ( can_msgs ) > int ( 50 / DT_CTRL ) :
break
else :
raise Exception ( f " Route: { repr ( cls . test_route . route ) } with segments: { test_segs } not found or no CAN msgs found. Is it uploaded? " )
car_fw , can_msgs , experimental_long = cls . get_testing_data ( )
# if relay is expected to be open in the route
cls . openpilot_enabled = cls . car_safety_mode_frame is not None
@ -451,6 +477,11 @@ class TestCarModelBase(unittest.TestCase):
failed_checks = { k : v for k , v in checks . items ( ) if v > 0 }
self . assertFalse ( len ( failed_checks ) , f " panda safety doesn ' t agree with openpilot: { failed_checks } " )
@unittest . skipIf ( not CI , " Accessing non CI-bucket routes is allowed only when not in CI " )
def test_route_on_ci_bucket ( self ) :
self . assertTrue ( self . test_route_on_bucket , " Route not on CI bucket. " +
" This is fine to fail for WIP car ports, just let us know and we can upload your routes to the CI bucket. " )
@parameterized_class ( ( ' car_model ' , ' test_route ' ) , get_test_cases ( ) )
@pytest . mark . xdist_group_class_property ( ' test_route ' )