#!/usr/bin/env python3 import sys import unittest import hypothesis.strategies as st import numpy as np from hypothesis import given, settings, note from cereal import log from selfdrive.car.toyota.values import CAR as TOYOTA import selfdrive.test.process_replay.process_replay as pr def get_process_config(process): return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0] def get_event_union_strategy(r, name): return st.fixed_dictionaries({ 'valid': st.just(True), 'logMonoTime': st.integers(min_value=0, max_value=2**64-1), name: r[name[0].upper() + name[1:]], }) def get_strategy_for_events(event_types, finite=False): # TODO: generate automatically based on capnp definitions def floats(**kwargs): allow_nan = False if finite else None allow_infinity = False if finite else None return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity) r = {} r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({ 'value': st.lists(floats(), min_size=3, max_size=3), 'std': st.lists(floats(), min_size=3, max_size=3), 'valid': st.just(True), }) r['LiveLocationKalman'] = st.fixed_dictionaries({ 'angularVelocityCalibrated': r['liveLocationKalman.Measurement'], 'inputsOK': st.booleans(), 'posenetOK': st.booleans(), }) r['CarState'] = st.fixed_dictionaries({ 'vEgo': floats(width=32), 'vEgoRaw': floats(width=32), 'steeringPressed': st.booleans(), 'steeringAngleDeg': floats(width=32), }) r['CameraOdometry'] = st.fixed_dictionaries({ 'frameId': st.integers(min_value=0, max_value=2**32 - 1), 'timestampEof': st.integers(min_value=0, max_value=2**64 - 1), 'trans': st.lists(floats(width=32), min_size=3, max_size=3), 'rot': st.lists(floats(width=32), min_size=3, max_size=3), 'transStd': st.lists(floats(width=32), min_size=3, max_size=3), 'rotStd': st.lists(floats(width=32), min_size=3, max_size=3), }) r['SensorEventData.SensorVec'] = st.fixed_dictionaries({ 'v': st.lists(floats(width=32), min_size=3, max_size=3), 'status': st.just(1), }) r['SensorEventData_gyro'] = st.fixed_dictionaries({ 'version': st.just(1), 'sensor': st.just(5), 'type': st.just(16), 'timestamp': st.integers(min_value=0, max_value=2**63 - 1), 'source': st.just(8), # BMX055 'gyroUncalibrated': r['SensorEventData.SensorVec'], }) r['SensorEventData_accel'] = st.fixed_dictionaries({ 'version': st.just(1), 'sensor': st.just(1), 'type': st.just(1), 'timestamp': st.integers(min_value=0, max_value=2**63 - 1), 'source': st.just(8), # BMX055 'acceleration': r['SensorEventData.SensorVec'], }) r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1) r['GpsLocationExternal'] = st.fixed_dictionaries({ 'flags': st.just(1), 'latitude': floats(), 'longitude': floats(), 'altitude': floats(), 'speed': floats(width=32), 'bearingDeg': floats(width=32), 'accuracy': floats(width=32), 'timestamp': st.integers(min_value=0, max_value=2**63 - 1), 'source': st.just(6), # Ublox 'vNED': st.lists(floats(width=32), min_size=3, max_size=3), 'verticalAccuracy': floats(width=32), 'bearingAccuracyDeg': floats(width=32), 'speedAccuracy': floats(width=32), }) r['LiveCalibration'] = st.fixed_dictionaries({ 'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3), }) return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types])) def get_strategy_for_process(process, finite=False): return get_strategy_for_events(get_process_config(process).pubs, finite) def convert_to_lr(msgs): return [log.Event.new_message(**m).as_reader() for m in msgs] def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value ret = True for k, v in d.items(): name = prefix + f"{k}" if name in exclude: continue if isinstance(v, dict): if not is_finite(v, exclude, name + "."): ret = False else: try: if not np.isfinite(v).all(): note((name, v)) ret = False except TypeError: pass return ret def test_process(dat, name): cfg = get_process_config(name) lr = convert_to_lr(dat) pr.TIMEOUT = 0.1 return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2) class TestFuzzy(unittest.TestCase): @given(get_strategy_for_process('paramsd')) @settings(deadline=1000) def test_paramsd(self, dat): for r in test_process(dat, 'paramsd'): d = r.liveParameters.to_dict() assert is_finite(d) @given(get_strategy_for_process('locationd', finite=True)) @settings(deadline=1000) def test_locationd(self, dat): exclude = [ 'positionGeodetic.std', 'velocityNED.std', 'orientationNED.std', 'calibratedOrientationECEF.std', ] for r in test_process(dat, 'locationd'): d = r.liveLocationKalman.to_dict() assert is_finite(d, exclude) if __name__ == "__main__": procs = { 'locationd': TestFuzzy().test_locationd, 'paramsd': TestFuzzy().test_paramsd, } if len(sys.argv) != 2: print("Usage: ./test_fuzzy.py ") sys.exit(0) proc = sys.argv[1] if proc not in procs: print(f"{proc} not available") sys.exit(0) else: procs[proc]()