#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  sys 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  hypothesis . strategies  as  st 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  hypothesis  import  given ,  settings ,  note 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . car . toyota . values  import  CAR  as  TOYOTA 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  selfdrive . test . process_replay . process_replay  as  pr 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_process_config ( process ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  return  [ cfg  for  cfg  in  pr . CONFIGS  if  cfg . proc_name  ==  process ] [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_event_union_strategy ( r ,  name ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' valid ' :  st . just ( True ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' logMonoTime ' :  st . integers ( min_value = 0 ,  max_value = 2 * * 64 - 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    name :  r [ name [ 0 ] . upper ( )  +  name [ 1 : ] ] , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_strategy_for_events ( event_types ,  finite = False ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # TODO: generate automatically based on capnp definitions 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  floats ( * * kwargs ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    allow_nan  =  False  if  finite  else  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    allow_infinity  =  False  if  finite  else  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  st . floats ( * * kwargs ,  allow_nan = allow_nan ,  allow_infinity = allow_infinity ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r  =  { } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' liveLocationKalman.Measurement ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' value ' :  st . lists ( floats ( ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' std ' :  st . lists ( floats ( ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' valid ' :  st . just ( True ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' LiveLocationKalman ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' angularVelocityCalibrated ' :  r [ ' liveLocationKalman.Measurement ' ] , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' inputsOK ' :  st . booleans ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' posenetOK ' :  st . booleans ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' CarState ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' vEgo ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' vEgoRaw ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' steeringPressed ' :  st . booleans ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' steeringAngleDeg ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' CameraOdometry ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' frameId ' :  st . integers ( min_value = 0 ,  max_value = 2 * * 32  -  1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' timestampEof ' :  st . integers ( min_value = 0 ,  max_value = 2 * * 64  -  1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' trans ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' rot ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' transStd ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' rotStd ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' SensorEventData.SensorVec ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' v ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' status ' :  st . just ( 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' SensorEventData_gyro ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' version ' :  st . just ( 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' sensor ' :  st . just ( 5 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' type ' :  st . just ( 16 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' timestamp ' :  st . integers ( min_value = 0 ,  max_value = 2 * * 63  -  1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' source ' :  st . just ( 8 ) ,   # BMX055 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' gyroUncalibrated ' :  r [ ' SensorEventData.SensorVec ' ] , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' SensorEventData_accel ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' version ' :  st . just ( 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' sensor ' :  st . just ( 1 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' type ' :  st . just ( 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' timestamp ' :  st . integers ( min_value = 0 ,  max_value = 2 * * 63  -  1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' source ' :  st . just ( 8 ) ,   # BMX055 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' acceleration ' :  r [ ' SensorEventData.SensorVec ' ] , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' SensorEvents ' ]  =  st . lists ( st . one_of ( r [ ' SensorEventData_gyro ' ] ,  r [ ' SensorEventData_accel ' ] ) ,  min_size = 1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' GpsLocationExternal ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' flags ' :  st . just ( 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' latitude ' :  floats ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' longitude ' :  floats ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' altitude ' :  floats ( ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' speed ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' bearingDeg ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' accuracy ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ' timestamp ' :  st . integers ( min_value = 0 ,  max_value = 2 * * 63  -  1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' source ' :  st . just ( 6 ) ,   # Ublox 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' vNED ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' verticalAccuracy ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' bearingAccuracyDeg ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' speedAccuracy ' :  floats ( width = 32 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r [ ' LiveCalibration ' ]  =  st . fixed_dictionaries ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' calStatus ' :  st . integers ( min_value = 0 ,  max_value = 1 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' rpyCalib ' :  st . lists ( floats ( width = 32 ) ,  min_size = 3 ,  max_size = 3 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  st . lists ( st . one_of ( * [ get_event_union_strategy ( r ,  n )  for  n  in  event_types ] ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  get_strategy_for_process ( process ,  finite = False ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  get_strategy_for_events ( get_process_config ( process ) . pub_sub . keys ( ) ,  finite ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  convert_to_lr ( msgs ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  [ log . Event . new_message ( * * m ) . as_reader ( )  for  m  in  msgs ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  is_finite ( d ,  exclude = [ ] ,  prefix = " " ) :   # pylint: disable=dangerous-default-value 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ret  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  k ,  v  in  d . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    name  =  prefix  +  f " { k } " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  name  in  exclude : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  isinstance ( v ,  dict ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  not  is_finite ( v ,  exclude ,  name  +  " . " ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ret  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        if  not  np . isfinite ( v ) . all ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          note ( ( name ,  v ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          ret  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      except  TypeError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  ret 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  test_process ( dat ,  name ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cfg  =  get_process_config ( name ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  lr  =  convert_to_lr ( dat ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  pr . TIMEOUT  =  0.1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  pr . replay_process ( cfg ,  lr ,  TOYOTA . COROLLA_TSS2 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestFuzzy ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @given ( get_strategy_for_process ( ' paramsd ' ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @settings ( deadline = 1000 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_paramsd ( self ,  dat ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  r  in  test_process ( dat ,  ' paramsd ' ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      d  =  r . liveParameters . to_dict ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  is_finite ( d ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @given ( get_strategy_for_process ( ' locationd ' ,  finite = True ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @settings ( deadline = 1000 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_locationd ( self ,  dat ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    exclude  =  [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ' positionGeodetic.std ' , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ' velocityNED.std ' , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ' orientationNED.std ' , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ' calibratedOrientationECEF.std ' , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  r  in  test_process ( dat ,  ' locationd ' ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      d  =  r . liveLocationKalman . to_dict ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  is_finite ( d ,  exclude ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  procs  =  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' locationd ' :  TestFuzzy ( ) . test_locationd , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    ' paramsd ' :  TestFuzzy ( ) . test_paramsd , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  len ( sys . argv )  !=  2 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( " Usage: ./test_fuzzy.py <process name> " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    sys . exit ( 0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  proc  =  sys . argv [ 1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  proc  not  in  procs : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( f " { proc }  not available " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    sys . exit ( 0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    procs [ proc ] ( )