#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . test . helpers  import  with_processes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# only tests for EON and TICI 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . hardware  import  EON ,  TICI 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								TEST_TIMESPAN  =  30  # random.randint(60, 180) # seconds 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								SKIP_FRAME_TOLERANCE  =  0 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								LAG_FRAME_TOLERANCE  =  2  # ms 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								FPS_BASELINE  =  20 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								CAMERAS  =  { 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  " roadCameraState " :  FPS_BASELINE , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  " driverCameraState " :  FPS_BASELINE  / /  2 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  TICI : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  CAMERAS [ " driverCameraState " ]  =  FPS_BASELINE 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  CAMERAS [ " wideRoadCameraState " ]  =  FPS_BASELINE 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestCamerad ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUpClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  not  ( EON  or  TICI ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  unittest . SkipTest 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @with_processes ( [ ' camerad ' ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_frame_packets ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( " checking frame pkts continuity " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( TEST_TIMESPAN ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    sm  =  messaging . SubMaster ( [ socket_name  for  socket_name  in  CAMERAS ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    last_frame_id  =  dict . fromkeys ( CAMERAS ,  None ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    last_ts  =  dict . fromkeys ( CAMERAS ,  None ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    start_time_sec  =  time . time ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  time . time ( ) -  start_time_sec  <  TEST_TIMESPAN : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sm . update ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  camera  in  CAMERAS : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  sm . updated [ camera ] : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          ct  =  ( sm [ camera ] . timestampEof  if  not  TICI  else  sm [ camera ] . timestampSof )  /  1e6 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  last_frame_id [ camera ]  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            last_frame_id [ camera ]  =  sm [ camera ] . frameId 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            last_ts [ camera ]  =  ct 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            continue 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          dfid  =  sm [ camera ] . frameId  -  last_frame_id [ camera ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          self . assertTrue ( abs ( dfid  -  1 )  < =  SKIP_FRAME_TOLERANCE ,  " %s  frame id diff is  %d "  %  ( camera ,  dfid ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          dts  =  ct  -  last_ts [ camera ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertTrue ( abs ( dts  -  ( 1000 / CAMERAS [ camera ] ) )  <  LAG_FRAME_TOLERANCE ,  " %s  frame t(ms) diff is  %f "  %  ( camera ,  dts ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          last_frame_id [ camera ]  =  sm [ camera ] . frameId 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          last_ts [ camera ]  =  ct 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      time . sleep ( 0.01 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  unittest . main ( )