#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  capnp 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  importlib 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  pytest 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  random 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  collections  import  defaultdict ,  Counter 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  hypothesis . strategies  as  st 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  hypothesis  import  Phase ,  given ,  settings 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  typing  import  List ,  Optional ,  Tuple 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  parameterized  import  parameterized_class 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal  import  messaging ,  log ,  car 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . basedir  import  BASEDIR 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . realtime  import  DT_CTRL 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car  import  gen_empty_fingerprint 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . fingerprints  import  all_known_cars 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . car_helpers  import  FRAME_FINGERPRINT ,  interfaces 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . honda . values  import  CAR  as  HONDA ,  HONDA_BOSCH 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . tests . routes  import  non_tested_cars ,  routes ,  CarTestRoute 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . controls . controlsd  import  Controls 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . test . helpers  import  read_segment_list 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . system . hardware . hw  import  DEFAULT_DOWNLOAD_CACHE_ROOT 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . lib . logreader  import  LogReader ,  internal_source ,  openpilotci_source 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . lib . route  import  SegmentName 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  panda . tests . libpanda  import  libpanda_py 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								EventName  =  car . CarEvent . EventName 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								PandaType  =  log . PandaState . PandaType 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								SafetyModel  =  car . CarParams . SafetyModel 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								NUM_JOBS  =  int ( os . environ . get ( " NUM_JOBS " ,  " 1 " ) ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								JOB_ID  =  int ( os . environ . get ( " JOB_ID " ,  " 0 " ) ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								INTERNAL_SEG_LIST  =  os . environ . get ( " INTERNAL_SEG_LIST " ,  " " ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								INTERNAL_SEG_CNT  =  int ( os . environ . get ( " INTERNAL_SEG_CNT " ,  " 0 " ) ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								MAX_EXAMPLES  =  int ( os . environ . get ( " MAX_EXAMPLES " ,  " 300 " ) ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								CI  =  os . environ . get ( " CI " ,  None )  is  not  None 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  get_test_cases ( )  - >  List [ Tuple [ str ,  Optional [ CarTestRoute ] ] ] : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  # build list of test cases 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  test_cases  =  [ ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  if  not  len ( INTERNAL_SEG_LIST ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    routes_by_car  =  defaultdict ( set ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  r  in  routes : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      routes_by_car [ r . car_model ] . add ( r ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  i ,  c  in  enumerate ( sorted ( all_known_cars ( ) ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  i  %  NUM_JOBS  ==  JOB_ID : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        test_cases . extend ( sorted ( ( c . value ,  r )  for  r  in  routes_by_car . get ( c ,  ( None , ) ) ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  else : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    segment_list  =  read_segment_list ( os . path . join ( BASEDIR ,  INTERNAL_SEG_LIST ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    segment_list  =  random . sample ( segment_list ,  INTERNAL_SEG_CNT  or  len ( segment_list ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  platform ,  segment  in  segment_list : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      segment_name  =  SegmentName ( segment ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      test_cases . append ( ( platform ,  CarTestRoute ( segment_name . route_name . canonical_name ,  platform , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                                segment = segment_name . segment_num ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  test_cases 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@pytest . mark . slow 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@pytest . mark . shared_download_cache 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  TestCarModelBase ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  car_model :  Optional [ str ]  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  test_route :  Optional [ CarTestRoute ]  =  None 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  test_route_on_bucket :  bool  =  True   # whether the route is on the preserved CI bucket 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  can_msgs :  List [ capnp . lib . capnp . _DynamicStructReader ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  fingerprint :  dict [ int ,  dict [ int ,  int ] ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  elm_frame :  Optional [ int ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  car_safety_mode_frame :  Optional [ int ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  get_testing_data_from_logreader ( cls ,  lr ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    car_fw  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    can_msgs  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . elm_frame  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . car_safety_mode_frame  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . fingerprint  =  gen_empty_fingerprint ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    experimental_long  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  msg  in  lr : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  msg . which ( )  ==  " can " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        can_msgs . append ( msg ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  len ( can_msgs )  < =  FRAME_FINGERPRINT : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          for  m  in  msg . can : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            if  m . src  <  64 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              cls . fingerprint [ m . src ] [ m . address ]  =  len ( m . dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  msg . which ( )  ==  " carParams " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        car_fw  =  msg . carParams . carFw 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  msg . carParams . openpilotLongitudinalControl : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          experimental_long  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  cls . car_model  is  None  and  not  cls . ci : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          cls . car_model  =  msg . carParams . carFingerprint 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Log which can frame the panda safety mode left ELM327, for CAN validity checks 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  msg . which ( )  ==  ' pandaStates ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  ps  in  msg . pandaStates : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  cls . elm_frame  is  None  and  ps . safetyModel  !=  SafetyModel . elm327 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            cls . elm_frame  =  len ( can_msgs ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  cls . car_safety_mode_frame  is  None  and  ps . safetyModel  not  in  \
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            ( SafetyModel . elm327 ,  SafetyModel . noOutput ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            cls . car_safety_mode_frame  =  len ( can_msgs ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  msg . which ( )  ==  ' pandaStateDEPRECATED ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  cls . elm_frame  is  None  and  msg . pandaStateDEPRECATED . safetyModel  !=  SafetyModel . elm327 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          cls . elm_frame  =  len ( can_msgs ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  cls . car_safety_mode_frame  is  None  and  msg . pandaStateDEPRECATED . safetyModel  not  in  \
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          ( SafetyModel . elm327 ,  SafetyModel . noOutput ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          cls . car_safety_mode_frame  =  len ( can_msgs ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( can_msgs )  >  int ( 50  /  DT_CTRL ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return  car_fw ,  can_msgs ,  experimental_long 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  Exception ( " no can data found " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  get_testing_data ( cls ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    test_segs  =  ( 2 ,  1 ,  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  cls . test_route . segment  is  not  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      test_segs  =  ( cls . test_route . segment , ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    is_internal  =  len ( INTERNAL_SEG_LIST ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  seg  in  test_segs : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      segment_range  =  f " { cls . test_route . route } / { seg } " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        lr  =  LogReader ( segment_range ,  default_source = internal_source  if  is_internal  else  openpilotci_source ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        return  cls . get_testing_data_from_logreader ( lr ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Route is not in CI bucket, assume either user has access (private), or it is public 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # test_route_on_ci_bucket will fail when running in CI 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  not  is_internal : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      cls . test_route_on_bucket  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  seg  in  test_segs : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        segment_range  =  f " { cls . test_route . route } / { seg } " 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          lr  =  LogReader ( segment_range ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          return  cls . get_testing_data_from_logreader ( lr ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    raise  Exception ( f " Route:  { repr ( cls . test_route . route ) }  with segments:  { test_segs }  not found or no CAN msgs found. Is it uploaded and public? " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUpClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  cls . __name__  ==  ' TestCarModel '  or  cls . __name__ . endswith ( ' Base ' ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  unittest . SkipTest 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ' FILTER '  in  os . environ : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  not  cls . car_model . startswith ( tuple ( os . environ . get ( ' FILTER ' ) . split ( ' , ' ) ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        raise  unittest . SkipTest 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  cls . test_route  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  cls . car_model  in  non_tested_cars : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        print ( f " Skipping tests for  { cls . car_model } : missing route " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        raise  unittest . SkipTest 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  Exception ( f " missing test route for  { cls . car_model } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    car_fw ,  can_msgs ,  experimental_long  =  cls . get_testing_data ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # if relay is expected to be open in the route 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cls . openpilot_enabled  =  cls . car_safety_mode_frame  is  not  None 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . can_msgs  =  sorted ( can_msgs ,  key = lambda  msg :  msg . logMonoTime ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . CarInterface ,  cls . CarController ,  cls . CarState  =  interfaces [ cls . car_model ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cls . CP  =  cls . CarInterface . get_params ( cls . car_model ,  cls . fingerprint ,  car_fw ,  experimental_long ,  docs = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  cls . CP 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  cls . CP . carFingerprint  ==  cls . car_model 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    os . environ [ " COMMA_CACHE " ]  =  DEFAULT_DOWNLOAD_CACHE_ROOT 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  tearDownClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    del  cls . can_msgs 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUp ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . CI  =  self . CarInterface ( self . CP . copy ( ) ,  self . CarController ,  self . CarState ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . CI 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    Params ( ) . put_bool ( " OpenpilotEnabledToggle " ,  self . openpilot_enabled ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # TODO: check safetyModel is in release panda build 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . safety  =  libpanda_py . libpanda 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cfg  =  self . CP . safetyConfigs [ - 1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    set_status  =  self . safety . set_safety_hooks ( cfg . safetyModel . raw ,  cfg . safetyParam ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertEqual ( 0 ,  set_status ,  f " failed to set safetyModel  { cfg } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . safety . init_tests ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_car_params ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . dashcamOnly : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . skipTest ( " no need to check carParams for dashcamOnly " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # make sure car params are within a valid range 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertGreater ( self . CP . mass ,  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . steerControlType  !=  car . CarParams . SteerControlType . angle : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      tuning  =  self . CP . lateralTuning . which ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  tuning  ==  ' pid ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertTrue ( len ( self . CP . lateralTuning . pid . kpV ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      elif  tuning  ==  ' torque ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertTrue ( self . CP . lateralTuning . torque . kf  >  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        raise  Exception ( " unknown tuning " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_car_interface ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # TODO: also check for checksum violations from can parser 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    can_invalid_cnt  =  0 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    can_valid  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CC  =  car . CarControl . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  i ,  msg  in  enumerate ( self . can_msgs ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      CS  =  self . CI . update ( CC ,  ( msg . as_builder ( ) . to_bytes ( ) , ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . CI . apply ( CC ,  msg . logMonoTime ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  CS . canValid : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        can_valid  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # wait max of 2s for low frequency msgs to be seen 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  i  >  200  or  can_valid : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        can_invalid_cnt  + =  not  CS . canValid 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertEqual ( can_invalid_cnt ,  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_radar_interface ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    RadarInterface  =  importlib . import_module ( f ' selfdrive.car. { self . CP . carName } .radar_interface ' ) . RadarInterface 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    RI  =  RadarInterface ( self . CP ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  RI 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Since OBD port is multiplexed to bus 1 (commonly radar bus) while fingerprinting, 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # start parsing CAN messages after we've left ELM mode and can expect CAN traffic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    error_cnt  =  0 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  i ,  msg  in  enumerate ( self . can_msgs [ self . elm_frame : ] ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      rr  =  RI . update ( ( msg . as_builder ( ) . to_bytes ( ) , ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  rr  is  not  None  and  i  >  50 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        error_cnt  + =  car . RadarData . Error . canError  in  rr . errors 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertEqual ( error_cnt ,  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_panda_safety_rx_checks ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . dashcamOnly : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . skipTest ( " no need to check panda safety for dashcamOnly " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    start_ts  =  self . can_msgs [ 0 ] . logMonoTime 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    failed_addrs  =  Counter ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  can  in  self . can_msgs : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # update panda timer 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      t  =  ( can . logMonoTime  -  start_ts )  /  1e3 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . safety . set_timer ( int ( t ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # run all msgs through the safety RX hook 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  msg  in  can . can : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  msg . src  > =  64 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        to_send  =  libpanda_py . make_CANPacket ( msg . address ,  msg . src  %  4 ,  msg . dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . safety . safety_rx_hook ( to_send )  !=  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          failed_addrs [ hex ( msg . address ) ]  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # ensure all msgs defined in the addr checks are valid 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . safety . safety_tick_current_safety_config ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  t  >  1e6 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . assertTrue ( self . safety . safety_config_valid ( ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      # Don't check relay malfunction on disabled routes (relay closed), 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # or before fingerprinting is done (elm327 and noOutput) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . openpilot_enabled  and  t  /  1e4  >  self . car_safety_mode_frame : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertFalse ( self . safety . get_relay_malfunction ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . safety . set_relay_malfunction ( False ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( failed_addrs ) ,  f " panda safety RX check failed:  { failed_addrs } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # ensure RX checks go invalid after small time with no traffic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . safety . set_timer ( int ( t  +  ( 2 * 1e6 ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . safety . safety_tick_current_safety_config ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( self . safety . safety_config_valid ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_panda_safety_tx_cases ( self ,  data = None ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ Asserts we can tx common messages """ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . notCar : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . skipTest ( " Skipping test for notCar " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  test_car_controller ( car_control ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      now_nanos  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      msgs_sent  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      CI  =  self . CarInterface ( self . CP ,  self . CarController ,  self . CarState ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  _  in  range ( round ( 10.0  /  DT_CTRL ) ) :   # make sure we hit the slowest messages 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        CI . update ( car_control ,  [ ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        _ ,  sendcan  =  CI . apply ( car_control ,  now_nanos ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        now_nanos  + =  DT_CTRL  *  1e9 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        msgs_sent  + =  len ( sendcan ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  addr ,  _ ,  dat ,  bus  in  sendcan : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          to_send  =  libpanda_py . make_CANPacket ( addr ,  bus  %  4 ,  dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertTrue ( self . safety . safety_tx_hook ( to_send ) ,  ( addr ,  dat ,  bus ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # Make sure we attempted to send messages 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . assertGreater ( msgs_sent ,  50 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Make sure we can send all messages while inactive 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CC  =  car . CarControl . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    test_car_controller ( CC ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Test cancel + general messages (controls_allowed=False & cruise_engaged=True) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . safety . set_cruise_engaged_prev ( True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CC  =  car . CarControl . new_message ( cruiseControl = { ' cancel ' :  True } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    test_car_controller ( CC ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Test resume + general messages (controls_allowed=True & cruise_engaged=True) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . safety . set_controls_allowed ( True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CC  =  car . CarControl . new_message ( cruiseControl = { ' resume ' :  True } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    test_car_controller ( CC ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  # Skip stdout/stderr capture with pytest, causes elevated memory usage 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @pytest . mark . nocapture 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @settings ( max_examples = MAX_EXAMPLES ,  deadline = None , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            phases = ( Phase . reuse ,  Phase . generate ,  Phase . shrink ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @given ( data = st . data ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_panda_safety_carstate_fuzzy ( self ,  data ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      For  each  example ,  pick  a  random  CAN  message  on  the  bus  and  fuzz  its  data , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      checking  for  panda  state  mismatches . 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . dashcamOnly : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . skipTest ( " no need to check panda safety for dashcamOnly " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    valid_addrs  =  [ ( addr ,  bus ,  size )  for  bus ,  addrs  in  self . fingerprint . items ( )  for  addr ,  size  in  addrs . items ( ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    address ,  bus ,  size  =  data . draw ( st . sampled_from ( valid_addrs ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    msg_strategy  =  st . binary ( min_size = size ,  max_size = size ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    msgs  =  data . draw ( st . lists ( msg_strategy ,  min_size = 20 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CC  =  car . CarControl . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  dat  in  msgs : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # due to panda updating state selectively, only edges are expected to match 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # TODO: warm up CarState with real CAN messages to check edge of both sources 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      #  (eg. toyota's gasPressed is the inverse of a signal being set) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_panda_gas  =  self . safety . get_gas_pressed_prev ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_panda_brake  =  self . safety . get_brake_pressed_prev ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_panda_regen_braking  =  self . safety . get_regen_braking_prev ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_panda_vehicle_moving  =  self . safety . get_vehicle_moving ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_panda_cruise_engaged  =  self . safety . get_cruise_engaged_prev ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      prev_panda_acc_main_on  =  self . safety . get_acc_main_on ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      to_send  =  libpanda_py . make_CANPacket ( address ,  bus ,  dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . safety . safety_rx_hook ( to_send ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      can  =  messaging . new_message ( ' can ' ,  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      can . can  =  [ log . CanData ( address = address ,  dat = dat ,  src = bus ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      CS  =  self . CI . update ( CC ,  ( can . to_bytes ( ) , ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . safety . get_gas_pressed_prev ( )  !=  prev_panda_gas : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertEqual ( CS . gasPressed ,  self . safety . get_gas_pressed_prev ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . safety . get_brake_pressed_prev ( )  !=  prev_panda_brake : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # TODO: remove this exception once this mismatch is resolved 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        brake_pressed  =  CS . brakePressed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  CS . brakePressed  and  not  self . safety . get_brake_pressed_prev ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  self . CP . carFingerprint  in  ( HONDA . PILOT ,  HONDA . RIDGELINE )  and  CS . brake  >  0.05 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            brake_pressed  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertEqual ( brake_pressed ,  self . safety . get_brake_pressed_prev ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . safety . get_regen_braking_prev ( )  !=  prev_panda_regen_braking : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertEqual ( CS . regenBraking ,  self . safety . get_regen_braking_prev ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . safety . get_vehicle_moving ( )  !=  prev_panda_vehicle_moving : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertEqual ( not  CS . standstill ,  self . safety . get_vehicle_moving ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  not  ( self . CP . carName  ==  " honda "  and  self . CP . carFingerprint  not  in  HONDA_BOSCH ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . safety . get_cruise_engaged_prev ( )  !=  prev_panda_cruise_engaged : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertEqual ( CS . cruiseState . enabled ,  self . safety . get_cruise_engaged_prev ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . CP . carName  ==  " honda " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . safety . get_acc_main_on ( )  !=  prev_panda_acc_main_on : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertEqual ( CS . cruiseState . available ,  self . safety . get_acc_main_on ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_panda_safety_carstate ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      Assert  that  panda  safety  matches  openpilot ' s carState 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . dashcamOnly : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . skipTest ( " no need to check panda safety for dashcamOnly " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CC  =  car . CarControl . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # warm up pass, as initial states may be different 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  can  in  self . can_msgs [ : 300 ] : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . CI . update ( CC ,  ( can . as_builder ( ) . to_bytes ( ) ,  ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      for  msg  in  filter ( lambda  m :  m . src  in  range ( 64 ) ,  can . can ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        to_send  =  libpanda_py . make_CANPacket ( msg . address ,  msg . src  %  4 ,  msg . dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . safety . safety_rx_hook ( to_send ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    controls_allowed_prev  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    CS_prev  =  car . CarState . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    checks  =  defaultdict ( lambda :  0 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    controlsd  =  Controls ( CI = self . CI ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    controlsd . initialized  =  True 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  idx ,  can  in  enumerate ( self . can_msgs ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      CS  =  self . CI . update ( CC ,  ( can . as_builder ( ) . to_bytes ( ) ,  ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      for  msg  in  filter ( lambda  m :  m . src  in  range ( 64 ) ,  can . can ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        to_send  =  libpanda_py . make_CANPacket ( msg . address ,  msg . src  %  4 ,  msg . dat ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ret  =  self . safety . safety_rx_hook ( to_send ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertEqual ( 1 ,  ret ,  f " safety rx failed ( { ret =} ):  { to_send } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      # Skip first frame so CS_prev is properly initialized 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  idx  ==  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        CS_prev  =  CS 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Button may be left pressed in warm up period 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  not  self . CP . pcmCruise : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . safety . set_controls_allowed ( 0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # TODO: check rest of panda's carstate (steering, ACC main on, etc.) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      checks [ ' gasPressed ' ]  + =  CS . gasPressed  !=  self . safety . get_gas_pressed_prev ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      checks [ ' standstill ' ]  + =  CS . standstill  ==  self . safety . get_vehicle_moving ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # TODO: remove this exception once this mismatch is resolved 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      brake_pressed  =  CS . brakePressed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  CS . brakePressed  and  not  self . safety . get_brake_pressed_prev ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        if  self . CP . carFingerprint  in  ( HONDA . PILOT ,  HONDA . RIDGELINE )  and  CS . brake  >  0.05 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          brake_pressed  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      checks [ ' brakePressed ' ]  + =  brake_pressed  !=  self . safety . get_brake_pressed_prev ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      checks [ ' regenBraking ' ]  + =  CS . regenBraking  !=  self . safety . get_regen_braking_prev ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . CP . pcmCruise : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages). 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . CP . carName  ==  " honda "  and  self . CP . carFingerprint  not  in  HONDA_BOSCH : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # only the rising edges are expected to match 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  CS . cruiseState . enabled  and  not  CS_prev . cruiseState . enabled : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            checks [ ' controlsAllowed ' ]  + =  not  self . safety . get_controls_allowed ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          checks [ ' controlsAllowed ' ]  + =  not  CS . cruiseState . enabled  and  self . safety . get_controls_allowed ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # TODO: fix notCar mismatch 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  not  self . CP . notCar : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          checks [ ' cruiseState ' ]  + =  CS . cruiseState . enabled  !=  self . safety . get_cruise_engaged_prev ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Check for enable events on rising edge of controls allowed 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        controlsd . update_events ( CS ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        controlsd . CS_prev  =  CS 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        button_enable  =  ( any ( evt . enable  for  evt  in  CS . events )  and 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                         not  any ( evt  ==  EventName . pedalPressed  for  evt  in  controlsd . events . names ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        mismatch  =  button_enable  !=  ( self . safety . get_controls_allowed ( )  and  not  controls_allowed_prev ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        checks [ ' controlsAllowed ' ]  + =  mismatch 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        controls_allowed_prev  =  self . safety . get_controls_allowed ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  button_enable  and  not  mismatch : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . safety . set_controls_allowed ( False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . CP . carName  ==  " honda " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        checks [ ' mainOn ' ]  + =  CS . cruiseState . available  !=  self . safety . get_acc_main_on ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      CS_prev  =  CS 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    failed_checks  =  { k :  v  for  k ,  v  in  checks . items ( )  if  v  >  0 } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( failed_checks ) ,  f " panda safety doesn ' t agree with openpilot:  { failed_checks } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @unittest . skipIf ( not  CI ,  " Accessing non CI-bucket routes is allowed only when not in CI " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_route_on_ci_bucket ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( self . test_route_on_bucket ,  " Route not on CI bucket.  "  + 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                    " This is fine to fail for WIP car ports, just let us know and we can upload your routes to the CI bucket. " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@parameterized_class ( ( ' car_model ' ,  ' test_route ' ) ,  get_test_cases ( ) ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								@pytest . mark . xdist_group_class_property ( ' test_route ' ) 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  TestCarModel ( TestCarModelBase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  unittest . main ( )