You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							175 lines
						
					
					
						
							5.3 KiB
						
					
					
				
			
		
		
	
	
							175 lines
						
					
					
						
							5.3 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import sys
 | |
| import unittest
 | |
| 
 | |
| import hypothesis.strategies as st
 | |
| import numpy as np
 | |
| from hypothesis import given, settings, note
 | |
| 
 | |
| from cereal import log
 | |
| from selfdrive.car.toyota.values import CAR as TOYOTA
 | |
| import selfdrive.test.process_replay.process_replay as pr
 | |
| 
 | |
| 
 | |
def get_process_config(process):
  """Return the first entry of pr.CONFIGS whose proc_name equals `process`.

  Raises IndexError when no config matches.
  """
  matching = []
  for candidate in pr.CONFIGS:
    if candidate.proc_name == process:
      matching.append(candidate)
  return matching[0]
 | |
| 
 | |
| 
 | |
def get_event_union_strategy(r, name):
  """Build a fixed-dictionary strategy for one event carrying the `name` field.

  `r` maps type names to strategies; the entry used is `name` with its first
  character upper-cased.
  """
  type_key = name[0].upper() + name[1:]
  event_fields = {
    'valid': st.just(True),
    'logMonoTime': st.integers(min_value=0, max_value=2**64-1),
    name: r[type_key],
  }
  return st.fixed_dictionaries(event_fields)
 | |
| 
 | |
| 
 | |
def get_strategy_for_events(event_types, finite=False):
  """Return a strategy generating lists of event dicts for `event_types`.

  Each generated element is drawn from one of the per-event strategies built
  by get_event_union_strategy. With `finite=True`, every float strategy is
  created with allow_nan=False and allow_infinity=False; otherwise None is
  passed for both options.
  """
  # TODO: generate automatically based on capnp definitions
  special_values = False if finite else None

  def floats(**kwargs):
    # All float strategies share the same NaN/infinity policy.
    return st.floats(allow_nan=special_values, allow_infinity=special_values, **kwargs)

  def vec3(**float_kwargs):
    # List of exactly three floats.
    return st.lists(floats(**float_kwargs), min_size=3, max_size=3)

  def uint(upper):
    # Integer in [0, upper].
    return st.integers(min_value=0, max_value=upper)

  measurement = st.fixed_dictionaries({
    'value': vec3(),
    'std': vec3(),
    'valid': st.just(True),
  })

  # Shared by both sensor event variants below.
  sensor_vec = st.fixed_dictionaries({
    'v': vec3(width=32),
    'status': st.just(1),
  })

  def sensor_event(sensor_id, type_id, payload_field):
    # Gyro and accel events differ only in ids and the name of the vector field.
    return st.fixed_dictionaries({
      'version': st.just(1),
      'sensor': st.just(sensor_id),
      'type': st.just(type_id),
      'timestamp': uint(2**63 - 1),
      'source': st.just(8),  # BMX055
      payload_field: sensor_vec,
    })

  gyro_event = sensor_event(5, 16, 'gyroUncalibrated')
  accel_event = sensor_event(1, 1, 'acceleration')

  r = {
    'liveLocationKalman.Measurement': measurement,
    'LiveLocationKalman': st.fixed_dictionaries({
      'angularVelocityCalibrated': measurement,
      'inputsOK': st.booleans(),
      'posenetOK': st.booleans(),
    }),
    'CarState': st.fixed_dictionaries({
      'vEgo': floats(width=32),
      'vEgoRaw': floats(width=32),
      'steeringPressed': st.booleans(),
      'steeringAngleDeg': floats(width=32),
    }),
    'CameraOdometry': st.fixed_dictionaries({
      'frameId': uint(2**32 - 1),
      'timestampEof': uint(2**64 - 1),
      'trans': vec3(width=32),
      'rot': vec3(width=32),
      'transStd': vec3(width=32),
      'rotStd': vec3(width=32),
    }),
    'SensorEventData.SensorVec': sensor_vec,
    'SensorEventData_gyro': gyro_event,
    'SensorEventData_accel': accel_event,
    'SensorEvents': st.lists(st.one_of(gyro_event, accel_event), min_size=1),
    'GpsLocationExternal': st.fixed_dictionaries({
      'flags': st.just(1),
      'latitude': floats(),
      'longitude': floats(),
      'altitude': floats(),
      'speed': floats(width=32),
      'bearingDeg': floats(width=32),
      'accuracy': floats(width=32),
      'timestamp': uint(2**63 - 1),
      'source': st.just(6),  # Ublox
      'vNED': vec3(width=32),
      'verticalAccuracy': floats(width=32),
      'bearingAccuracyDeg': floats(width=32),
      'speedAccuracy': floats(width=32),
    }),
    'LiveCalibration': st.fixed_dictionaries({
      'calStatus': uint(1),
      'rpyCalib': vec3(width=32),
    }),
  }

  event_strategies = [get_event_union_strategy(r, n) for n in event_types]
  return st.lists(st.one_of(*event_strategies))
 | |
| 
 | |
| 
 | |
def get_strategy_for_process(process, finite=False):
  """Return the event-list strategy for the keys of `process`'s pub_sub config."""
  process_cfg = get_process_config(process)
  input_events = process_cfg.pub_sub.keys()
  return get_strategy_for_events(input_events, finite)
 | |
| 
 | |
| 
 | |
def convert_to_lr(msgs):
  """Convert each dict in `msgs` to a log.Event reader and return them as a list."""
  readers = []
  for fields in msgs:
    event = log.Event.new_message(**fields)
    readers.append(event.as_reader())
  return readers
 | |
| 
 | |
| 
 | |
def is_finite(d, exclude=[], prefix=""):  # pylint: disable=dangerous-default-value
  """Return True when every checkable value in the nested dict `d` is finite.

  Keys are addressed by dotted path (`prefix` plus the key); paths listed in
  `exclude` are skipped. Values for which np.isfinite raises TypeError are
  ignored. Each non-finite value found is reported through `note`.
  """
  all_finite = True
  for key, value in d.items():
    path = prefix + f"{key}"
    if path in exclude:
      continue

    if isinstance(value, dict):
      # Recurse unconditionally so every offending leaf is reported,
      # not just the first one.
      nested_ok = is_finite(value, exclude, path + ".")
      all_finite = nested_ok and all_finite
      continue

    try:
      if np.isfinite(value).all():
        continue
      note((path, value))
      all_finite = False
    except TypeError:
      # Value cannot be checked by np.isfinite; leave the result untouched.
      pass

  return all_finite
 | |
| 
 | |
| 
 | |
def test_process(dat, name):
  """Replay the event dicts `dat` through process `name` and return the result.

  Sets pr.TIMEOUT to 0.1 before calling pr.replay_process.
  """
  process_cfg = get_process_config(name)
  log_msgs = convert_to_lr(dat)
  pr.TIMEOUT = 0.1
  return pr.replay_process(process_cfg, log_msgs, TOYOTA.COROLLA_TSS2)
 | |
| 
 | |
| 
 | |
class TestFuzzy(unittest.TestCase):
  # Fuzz tests: feed Hypothesis-generated events to a replayed process and
  # require its outputs to contain only finite values.

  @given(get_strategy_for_process('paramsd'))
  @settings(deadline=1000)
  def test_paramsd(self, dat):
    # Every liveParameters message produced by paramsd must be finite.
    for out_msg in test_process(dat, 'paramsd'):
      params = out_msg.liveParameters.to_dict()
      assert is_finite(params)

  @given(get_strategy_for_process('locationd', finite=True))
  @settings(deadline=1000)
  def test_locationd(self, dat):
    # These dotted paths are skipped by the finiteness check.
    ignored_fields = [
      'positionGeodetic.std',
      'velocityNED.std',
      'orientationNED.std',
      'calibratedOrientationECEF.std',
    ]
    for out_msg in test_process(dat, 'locationd'):
      location = out_msg.liveLocationKalman.to_dict()
      assert is_finite(location, ignored_fields)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
  # Map each supported process name to the fuzz test that exercises it.
  procs = {
    'locationd': TestFuzzy().test_locationd,
    'paramsd': TestFuzzy().test_paramsd,
  }

  # Misuse is reported on stderr with a non-zero exit status so that shells
  # and CI can tell a failed invocation from a successful run.
  if len(sys.argv) != 2:
    print("Usage: ./test_fuzzy.py <process name>", file=sys.stderr)
    sys.exit(1)

  proc = sys.argv[1]
  if proc not in procs:
    print(f"{proc} not available", file=sys.stderr)
    sys.exit(1)

  procs[proc]()
 | |
| 
 |