diff --git a/.github/workflows/selfdrive_tests.yaml b/.github/workflows/selfdrive_tests.yaml index ca80d3bd5d..268a3f3bcd 100644 --- a/.github/workflows/selfdrive_tests.yaml +++ b/.github/workflows/selfdrive_tests.yaml @@ -254,7 +254,6 @@ jobs: ./tools/replay/tests/test_replay && \ ./tools/cabana/tests/test_cabana && \ ./system/camerad/test/ae_gray_test && \ - ./selfdrive/test/process_replay/test_fuzzy.py && \ coverage xml" - name: "Upload coverage to Codecov" uses: codecov/codecov-action@v2 diff --git a/selfdrive/car/tests/test_car_interfaces.py b/selfdrive/car/tests/test_car_interfaces.py index 21c7af3063..918d2cf873 100755 --- a/selfdrive/car/tests/test_car_interfaces.py +++ b/selfdrive/car/tests/test_car_interfaces.py @@ -9,14 +9,14 @@ from cereal import car from selfdrive.car import gen_empty_fingerprint from selfdrive.car.car_helpers import interfaces from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars -from selfdrive.test.fuzzy_generation import FuzzyGenerator +from selfdrive.test.fuzzy_generation import get_random_msg class TestCarInterfaces(unittest.TestCase): @parameterized.expand([(car,) for car in all_known_cars()]) @settings(max_examples=5) - @given(cc_msg=FuzzyGenerator.get_random_msg(car.CarControl, real_floats=True)) + @given(cc_msg=get_random_msg(car.CarControl, real_floats=True)) def test_car_interfaces(self, car_name, cc_msg): if car_name in FINGERPRINTS: fingerprint = FINGERPRINTS[car_name][0] diff --git a/selfdrive/test/fuzzy_generation.py b/selfdrive/test/fuzzy_generation.py index 40b56593d2..3dc43e7347 100644 --- a/selfdrive/test/fuzzy_generation.py +++ b/selfdrive/test/fuzzy_generation.py @@ -1,8 +1,6 @@ import hypothesis.strategies as st import random -from cereal import log - class FuzzyGenerator: def __init__(self, real_floats): self.real_floats=real_floats @@ -62,16 +60,10 @@ class FuzzyGenerator: else: return self.generate_struct(field.schema) - def generate_struct(self, schema, required=None): + def generate_struct(self, schema): full_fill = list(schema.non_union_fields) if schema.non_union_fields else [] - single_fill = [required] if required else [random.choice(schema.union_fields)] if schema.union_fields else [] + single_fill = [random.choice(schema.union_fields)] if schema.union_fields else [] return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill)) - @classmethod - def get_random_msg(cls, struct, real_floats=False): - return cls(real_floats=real_floats).generate_struct(struct.schema) - - @classmethod - def get_random_event_msg(cls, required, real_floats=False): - fg = cls(real_floats=real_floats) - return st.tuples(*[fg.generate_struct(log.Event.schema, r) for r in required]) +def get_random_msg(struct, real_floats=False): + return FuzzyGenerator(real_floats=real_floats).generate_struct(struct.schema) diff --git a/selfdrive/test/process_replay/test_fuzzy.py b/selfdrive/test/process_replay/test_fuzzy.py index 768acb5feb..12f5fca37d 100755 --- a/selfdrive/test/process_replay/test_fuzzy.py +++ b/selfdrive/test/process_replay/test_fuzzy.py @@ -1,30 +1,174 @@ #!/usr/bin/env python3 -from hypothesis import given, HealthCheck, Phase, settings -import hypothesis.strategies as st -from parameterized import parameterized +import sys import unittest +import hypothesis.strategies as st +import numpy as np +from hypothesis import given, settings, note + from cereal import log from selfdrive.car.toyota.values import CAR as TOYOTA -from selfdrive.test.fuzzy_generation import FuzzyGenerator import selfdrive.test.process_replay.process_replay as pr -# These processes currently fail because of unrealistic data breaking assumptions -# that openpilot makes causing error with NaN, inf, int size, array indexing ... -# TODO: Make each one testable -NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad'] -TEST_CASES = [(cfg.proc_name, cfg) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED] -class TestFuzzProcesses(unittest.TestCase): +def get_process_config(process): + return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0] + + +def get_event_union_strategy(r, name): + return st.fixed_dictionaries({ + 'valid': st.just(True), + 'logMonoTime': st.integers(min_value=0, max_value=2**64-1), + name: r[name[0].upper() + name[1:]], + }) + + +def get_strategy_for_events(event_types, finite=False): + # TODO: generate automatically based on capnp definitions + def floats(**kwargs): + allow_nan = False if finite else None + allow_infinity = False if finite else None + return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity) + + r = {} + r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({ + 'value': st.lists(floats(), min_size=3, max_size=3), + 'std': st.lists(floats(), min_size=3, max_size=3), + 'valid': st.just(True), + }) + r['LiveLocationKalman'] = st.fixed_dictionaries({ + 'angularVelocityCalibrated': r['liveLocationKalman.Measurement'], + 'inputsOK': st.booleans(), + 'posenetOK': st.booleans(), + }) + r['CarState'] = st.fixed_dictionaries({ + 'vEgo': floats(width=32), + 'vEgoRaw': floats(width=32), + 'steeringPressed': st.booleans(), + 'steeringAngleDeg': floats(width=32), + }) + r['CameraOdometry'] = st.fixed_dictionaries({ + 'frameId': st.integers(min_value=0, max_value=2**32 - 1), + 'timestampEof': st.integers(min_value=0, max_value=2**64 - 1), + 'trans': st.lists(floats(width=32), min_size=3, max_size=3), + 'rot': st.lists(floats(width=32), min_size=3, max_size=3), + 'transStd': st.lists(floats(width=32), min_size=3, max_size=3), + 'rotStd': st.lists(floats(width=32), min_size=3, max_size=3), + }) + r['SensorEventData.SensorVec'] = st.fixed_dictionaries({ + 'v': st.lists(floats(width=32), min_size=3, max_size=3), + 'status': st.just(1), + }) + r['SensorEventData_gyro'] = st.fixed_dictionaries({ + 'version': st.just(1), + 'sensor': st.just(5), + 'type': st.just(16), + 'timestamp': st.integers(min_value=0, max_value=2**63 - 1), + 'source': st.just(8), # BMX055 + 'gyroUncalibrated': r['SensorEventData.SensorVec'], + }) + r['SensorEventData_accel'] = st.fixed_dictionaries({ + 'version': st.just(1), + 'sensor': st.just(1), + 'type': st.just(1), + 'timestamp': st.integers(min_value=0, max_value=2**63 - 1), + 'source': st.just(8), # BMX055 + 'acceleration': r['SensorEventData.SensorVec'], + }) + r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1) + r['GpsLocationExternal'] = st.fixed_dictionaries({ + 'flags': st.just(1), + 'latitude': floats(), + 'longitude': floats(), + 'altitude': floats(), + 'speed': floats(width=32), + 'bearingDeg': floats(width=32), + 'accuracy': floats(width=32), + 'timestamp': st.integers(min_value=0, max_value=2**63 - 1), + 'source': st.just(6), # Ublox + 'vNED': st.lists(floats(width=32), min_size=3, max_size=3), + 'verticalAccuracy': floats(width=32), + 'bearingAccuracyDeg': floats(width=32), + 'speedAccuracy': floats(width=32), + }) + r['LiveCalibration'] = st.fixed_dictionaries({ + 'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3), + }) + + return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types])) + + +def get_strategy_for_process(process, finite=False): + return get_strategy_for_events(get_process_config(process).pubs, finite) + + +def convert_to_lr(msgs): + return [log.Event.new_message(**m).as_reader() for m in msgs] + + +def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value + ret = True + for k, v in d.items(): + name = prefix + f"{k}" + if name in exclude: + continue + + if isinstance(v, dict): + if not is_finite(v, exclude, name + "."): + ret = False + else: + try: + if not np.isfinite(v).all(): + note((name, v)) + ret = False + except TypeError: + pass + + return ret + + +def test_process(dat, name): + cfg = get_process_config(name) + lr = convert_to_lr(dat) + pr.TIMEOUT = 0.1 + return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2) + + +class TestFuzzy(unittest.TestCase): + @given(get_strategy_for_process('paramsd')) + @settings(deadline=1000) + def test_paramsd(self, dat): + for r in test_process(dat, 'paramsd'): + d = r.liveParameters.to_dict() + assert is_finite(d) + + @given(get_strategy_for_process('locationd', finite=True)) + @settings(deadline=1000) + def test_locationd(self, dat): + exclude = [ + 'positionGeodetic.std', + 'velocityNED.std', + 'orientationNED.std', + 'calibratedOrientationECEF.std', + ] + for r in test_process(dat, 'locationd'): + d = r.liveLocationKalman.to_dict() + assert is_finite(d, exclude) - @parameterized.expand(TEST_CASES) - @given(st.data()) - @settings(phases=[Phase.generate, Phase.target], max_examples=50, deadline=1000, suppress_health_check=[HealthCheck.too_slow, HealthCheck.data_too_large]) - def test_fuzz_process(self, proc_name, cfg, data): - msgs = data.draw(FuzzyGenerator.get_random_event_msg(required=cfg.pubs, real_floats=True)) - lr = [log.Event.new_message(**m).as_reader() for m in msgs] - cfg.timeout = 5 - pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2, disable_progress=True) if __name__ == "__main__": - unittest.main() + procs = { + 'locationd': TestFuzzy().test_locationd, + 'paramsd': TestFuzzy().test_paramsd, + } + + if len(sys.argv) != 2: + print("Usage: ./test_fuzzy.py ") + sys.exit(0) + + proc = sys.argv[1] + if proc not in procs: + print(f"{proc} not available") + sys.exit(0) + else: + procs[proc]()