@ -16,7 +16,7 @@ from selfdrive.car.gm.values import CAR as GM
from selfdrive . car . honda . values import CAR as HONDA , HONDA_BOSCH
from selfdrive . car . honda . values import CAR as HONDA , HONDA_BOSCH
from selfdrive . car . hyundai . values import CAR as HYUNDAI
from selfdrive . car . hyundai . values import CAR as HYUNDAI
from selfdrive . car . toyota . values import CAR as TOYOTA
from selfdrive . car . toyota . values import CAR as TOYOTA
from selfdrive . car . tests . routes import routes , non_tested_cars
from selfdrive . car . tests . routes import non_tested_cars , routes , TestRoute
from selfdrive . test . openpilotci import get_url
from selfdrive . test . openpilotci import get_url
from tools . lib . logreader import LogReader
from tools . lib . logreader import LogReader
@ -36,9 +36,9 @@ ignore_addr_checks_valid = [
# build list of test cases
# build list of test cases
routes_by_car = defaultdict ( set )
routes_by_car = defaultdict ( set )
for r in routes :
for r in routes :
routes_by_car [ r . car_fingerprint ] . add ( r . route )
routes_by_car [ r . car_fingerprint ] . add ( r )
test_cases : List [ Tuple [ str , Optional [ str ] ] ] = [ ]
test_cases : List [ Tuple [ str , Optional [ TestRoute ] ] ] = [ ]
for i , c in enumerate ( sorted ( all_known_cars ( ) ) ) :
for i , c in enumerate ( sorted ( all_known_cars ( ) ) ) :
if i % NUM_JOBS == JOB_ID :
if i % NUM_JOBS == JOB_ID :
test_cases . extend ( ( c , r ) for r in routes_by_car . get ( c , ( None , ) ) )
test_cases . extend ( ( c , r ) for r in routes_by_car . get ( c , ( None , ) ) )
@ -46,22 +46,26 @@ for i, c in enumerate(sorted(all_known_cars())):
SKIP_ENV_VAR = " SKIP_LONG_TESTS "
SKIP_ENV_VAR = " SKIP_LONG_TESTS "
@parameterized_class ( ( ' car_model ' , ' route ' ) , test_cases )
@parameterized_class ( ( ' car_model ' , ' test_ route' ) , test_cases )
class TestCarModel ( unittest . TestCase ) :
class TestCarModel ( unittest . TestCase ) :
@unittest . skipIf ( SKIP_ENV_VAR in os . environ , f " Long running test skipped. Unset { SKIP_ENV_VAR } to run " )
@unittest . skipIf ( SKIP_ENV_VAR in os . environ , f " Long running test skipped. Unset { SKIP_ENV_VAR } to run " )
@classmethod
@classmethod
def setUpClass ( cls ) :
def setUpClass ( cls ) :
if cls . route is None :
if cls . test_ route is None :
if cls . car_model in non_tested_cars :
if cls . car_model in non_tested_cars :
print ( f " Skipping tests for { cls . car_model } : missing route " )
print ( f " Skipping tests for { cls . car_model } : missing route " )
raise unittest . SkipTest
raise unittest . SkipTest
raise Exception ( f " missing test route for { cls . car_model } " )
raise Exception ( f " missing test route for { cls . car_model } " )
disable_radar = False
disable_radar = False
for seg in ( 2 , 1 , 0 ) :
test_segs = ( 2 , 1 , 0 )
if cls . test_route . segment is not None :
test_segs = ( cls . test_route . segment , )
for seg in test_segs :
try :
try :
lr = LogReader ( get_url ( cls . route , seg ) )
lr = LogReader ( get_url ( cls . test_route . route , seg ) )
except Exception :
except Exception :
continue
continue
@ -80,7 +84,7 @@ class TestCarModel(unittest.TestCase):
if len ( can_msgs ) > int ( 50 / DT_CTRL ) :
if len ( can_msgs ) > int ( 50 / DT_CTRL ) :
break
break
else :
else :
raise Exception ( f " Route { repr ( cls . route ) } not found or no CAN msgs found. Is it uploaded? " )
raise Exception ( f " Route: { repr ( cls . test_route . route ) } with segments: { test_segs } not found or no CAN msgs found. Is it uploaded? " )
cls . can_msgs = sorted ( can_msgs , key = lambda msg : msg . logMonoTime )
cls . can_msgs = sorted ( can_msgs , key = lambda msg : msg . logMonoTime )