Revert "simple fuzzing test for all processes (#28584)"
This reverts commit bac193bdd5
.
pull/28645/head
parent
16e3d4f69f
commit
7783dc602c
4 changed files with 169 additions and 34 deletions
@ -1,30 +1,174 @@ |
||||
#!/usr/bin/env python3 |
||||
from hypothesis import given, HealthCheck, Phase, settings |
||||
import hypothesis.strategies as st |
||||
from parameterized import parameterized |
||||
import sys |
||||
import unittest |
||||
|
||||
import hypothesis.strategies as st |
||||
import numpy as np |
||||
from hypothesis import given, settings, note |
||||
|
||||
from cereal import log |
||||
from selfdrive.car.toyota.values import CAR as TOYOTA |
||||
from selfdrive.test.fuzzy_generation import FuzzyGenerator |
||||
import selfdrive.test.process_replay.process_replay as pr |
||||
|
||||
# These processes currently fail because of unrealistic data breaking assumptions |
||||
# that openpilot makes causing error with NaN, inf, int size, array indexing ... |
||||
# TODO: Make each one testable |
||||
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad'] |
||||
TEST_CASES = [(cfg.proc_name, cfg) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED] |
||||
|
||||
class TestFuzzProcesses(unittest.TestCase): |
||||
def get_process_config(process): |
||||
return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0] |
||||
|
||||
|
||||
def get_event_union_strategy(r, name): |
||||
return st.fixed_dictionaries({ |
||||
'valid': st.just(True), |
||||
'logMonoTime': st.integers(min_value=0, max_value=2**64-1), |
||||
name: r[name[0].upper() + name[1:]], |
||||
}) |
||||
|
||||
|
||||
def get_strategy_for_events(event_types, finite=False): |
||||
# TODO: generate automatically based on capnp definitions |
||||
def floats(**kwargs): |
||||
allow_nan = False if finite else None |
||||
allow_infinity = False if finite else None |
||||
return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity) |
||||
|
||||
r = {} |
||||
r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({ |
||||
'value': st.lists(floats(), min_size=3, max_size=3), |
||||
'std': st.lists(floats(), min_size=3, max_size=3), |
||||
'valid': st.just(True), |
||||
}) |
||||
r['LiveLocationKalman'] = st.fixed_dictionaries({ |
||||
'angularVelocityCalibrated': r['liveLocationKalman.Measurement'], |
||||
'inputsOK': st.booleans(), |
||||
'posenetOK': st.booleans(), |
||||
}) |
||||
r['CarState'] = st.fixed_dictionaries({ |
||||
'vEgo': floats(width=32), |
||||
'vEgoRaw': floats(width=32), |
||||
'steeringPressed': st.booleans(), |
||||
'steeringAngleDeg': floats(width=32), |
||||
}) |
||||
r['CameraOdometry'] = st.fixed_dictionaries({ |
||||
'frameId': st.integers(min_value=0, max_value=2**32 - 1), |
||||
'timestampEof': st.integers(min_value=0, max_value=2**64 - 1), |
||||
'trans': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
'rot': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
'transStd': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
'rotStd': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
}) |
||||
r['SensorEventData.SensorVec'] = st.fixed_dictionaries({ |
||||
'v': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
'status': st.just(1), |
||||
}) |
||||
r['SensorEventData_gyro'] = st.fixed_dictionaries({ |
||||
'version': st.just(1), |
||||
'sensor': st.just(5), |
||||
'type': st.just(16), |
||||
'timestamp': st.integers(min_value=0, max_value=2**63 - 1), |
||||
'source': st.just(8), # BMX055 |
||||
'gyroUncalibrated': r['SensorEventData.SensorVec'], |
||||
}) |
||||
r['SensorEventData_accel'] = st.fixed_dictionaries({ |
||||
'version': st.just(1), |
||||
'sensor': st.just(1), |
||||
'type': st.just(1), |
||||
'timestamp': st.integers(min_value=0, max_value=2**63 - 1), |
||||
'source': st.just(8), # BMX055 |
||||
'acceleration': r['SensorEventData.SensorVec'], |
||||
}) |
||||
r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1) |
||||
r['GpsLocationExternal'] = st.fixed_dictionaries({ |
||||
'flags': st.just(1), |
||||
'latitude': floats(), |
||||
'longitude': floats(), |
||||
'altitude': floats(), |
||||
'speed': floats(width=32), |
||||
'bearingDeg': floats(width=32), |
||||
'accuracy': floats(width=32), |
||||
'timestamp': st.integers(min_value=0, max_value=2**63 - 1), |
||||
'source': st.just(6), # Ublox |
||||
'vNED': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
'verticalAccuracy': floats(width=32), |
||||
'bearingAccuracyDeg': floats(width=32), |
||||
'speedAccuracy': floats(width=32), |
||||
}) |
||||
r['LiveCalibration'] = st.fixed_dictionaries({ |
||||
'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3), |
||||
}) |
||||
|
||||
return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types])) |
||||
|
||||
|
||||
def get_strategy_for_process(process, finite=False): |
||||
return get_strategy_for_events(get_process_config(process).pubs, finite) |
||||
|
||||
|
||||
def convert_to_lr(msgs): |
||||
return [log.Event.new_message(**m).as_reader() for m in msgs] |
||||
|
||||
|
||||
def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value |
||||
ret = True |
||||
for k, v in d.items(): |
||||
name = prefix + f"{k}" |
||||
if name in exclude: |
||||
continue |
||||
|
||||
if isinstance(v, dict): |
||||
if not is_finite(v, exclude, name + "."): |
||||
ret = False |
||||
else: |
||||
try: |
||||
if not np.isfinite(v).all(): |
||||
note((name, v)) |
||||
ret = False |
||||
except TypeError: |
||||
pass |
||||
|
||||
return ret |
||||
|
||||
|
||||
def test_process(dat, name): |
||||
cfg = get_process_config(name) |
||||
lr = convert_to_lr(dat) |
||||
pr.TIMEOUT = 0.1 |
||||
return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2) |
||||
|
||||
|
||||
class TestFuzzy(unittest.TestCase): |
||||
@given(get_strategy_for_process('paramsd')) |
||||
@settings(deadline=1000) |
||||
def test_paramsd(self, dat): |
||||
for r in test_process(dat, 'paramsd'): |
||||
d = r.liveParameters.to_dict() |
||||
assert is_finite(d) |
||||
|
||||
@given(get_strategy_for_process('locationd', finite=True)) |
||||
@settings(deadline=1000) |
||||
def test_locationd(self, dat): |
||||
exclude = [ |
||||
'positionGeodetic.std', |
||||
'velocityNED.std', |
||||
'orientationNED.std', |
||||
'calibratedOrientationECEF.std', |
||||
] |
||||
for r in test_process(dat, 'locationd'): |
||||
d = r.liveLocationKalman.to_dict() |
||||
assert is_finite(d, exclude) |
||||
|
||||
@parameterized.expand(TEST_CASES) |
||||
@given(st.data()) |
||||
@settings(phases=[Phase.generate, Phase.target], max_examples=50, deadline=1000, suppress_health_check=[HealthCheck.too_slow, HealthCheck.data_too_large]) |
||||
def test_fuzz_process(self, proc_name, cfg, data): |
||||
msgs = data.draw(FuzzyGenerator.get_random_event_msg(required=cfg.pubs, real_floats=True)) |
||||
lr = [log.Event.new_message(**m).as_reader() for m in msgs] |
||||
cfg.timeout = 5 |
||||
pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2, disable_progress=True) |
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
||||
procs = { |
||||
'locationd': TestFuzzy().test_locationd, |
||||
'paramsd': TestFuzzy().test_paramsd, |
||||
} |
||||
|
||||
if len(sys.argv) != 2: |
||||
print("Usage: ./test_fuzzy.py <process name>") |
||||
sys.exit(0) |
||||
|
||||
proc = sys.argv[1] |
||||
if proc not in procs: |
||||
print(f"{proc} not available") |
||||
sys.exit(0) |
||||
else: |
||||
procs[proc]() |
||||
|
Loading…
Reference in new issue