simple fuzzing test for all processes (#28661)

* Revert "Revert "simple fuzzing test for all processes (#28584)""

This reverts commit a5be0640a3780156fdd5c04c1e6204fa41b307a8.

* determinism

* fingerprint
old-commit-hash: 0ebec253d0
beeps
Maxime Desroches 2 years ago committed by GitHub
parent 27b53b914b
commit 1693786962
  1. 1
      .github/workflows/selfdrive_tests.yaml
  2. 8
      selfdrive/car/tests/test_car_interfaces.py
  3. 21
      selfdrive/test/fuzzy_generation.py
  4. 182
      selfdrive/test/process_replay/test_fuzzy.py

@ -254,6 +254,7 @@ jobs:
./tools/replay/tests/test_replay && \ ./tools/replay/tests/test_replay && \
./tools/cabana/tests/test_cabana && \ ./tools/cabana/tests/test_cabana && \
./system/camerad/test/ae_gray_test && \ ./system/camerad/test/ae_gray_test && \
./selfdrive/test/process_replay/test_fuzzy.py && \
coverage xml" coverage xml"
- name: "Upload coverage to Codecov" - name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v2 uses: codecov/codecov-action@v2

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import math import math
import unittest import unittest
import hypothesis.strategies as st
from hypothesis import given, settings from hypothesis import given, settings
import importlib import importlib
from parameterized import parameterized from parameterized import parameterized
@ -9,15 +10,15 @@ from cereal import car
from selfdrive.car import gen_empty_fingerprint from selfdrive.car import gen_empty_fingerprint
from selfdrive.car.car_helpers import interfaces from selfdrive.car.car_helpers import interfaces
from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars
from selfdrive.test.fuzzy_generation import get_random_msg from selfdrive.test.fuzzy_generation import FuzzyGenerator
class TestCarInterfaces(unittest.TestCase): class TestCarInterfaces(unittest.TestCase):
@parameterized.expand([(car,) for car in all_known_cars()]) @parameterized.expand([(car,) for car in all_known_cars()])
@settings(max_examples=5) @settings(max_examples=5)
@given(cc_msg=get_random_msg(car.CarControl, real_floats=True)) @given(data=st.data())
def test_car_interfaces(self, car_name, cc_msg): def test_car_interfaces(self, car_name, data):
if car_name in FINGERPRINTS: if car_name in FINGERPRINTS:
fingerprint = FINGERPRINTS[car_name][0] fingerprint = FINGERPRINTS[car_name][0]
else: else:
@ -60,6 +61,7 @@ class TestCarInterfaces(unittest.TestCase):
elif tune.which() == 'indi': elif tune.which() == 'indi':
self.assertTrue(len(tune.indi.outerLoopGainV)) self.assertTrue(len(tune.indi.outerLoopGainV))
cc_msg=FuzzyGenerator.get_random_msg(data.draw, car.CarControl, real_floats=True)
# Run car interface # Run car interface
CC = car.CarControl.new_message(**cc_msg) CC = car.CarControl.new_message(**cc_msg)
for _ in range(10): for _ in range(10):

@ -1,8 +1,10 @@
import hypothesis.strategies as st import hypothesis.strategies as st
import random
from cereal import log
class FuzzyGenerator: class FuzzyGenerator:
def __init__(self, real_floats): def __init__(self, draw, real_floats):
self.draw = draw
self.real_floats=real_floats self.real_floats=real_floats
def generate_native_type(self, field): def generate_native_type(self, field):
@ -60,10 +62,17 @@ class FuzzyGenerator:
else: else:
return self.generate_struct(field.schema) return self.generate_struct(field.schema)
def generate_struct(self, schema): def generate_struct(self, schema, event=None):
full_fill = list(schema.non_union_fields) if schema.non_union_fields else [] full_fill = list(schema.non_union_fields) if schema.non_union_fields else []
single_fill = [random.choice(schema.union_fields)] if schema.union_fields else [] single_fill = [event] if event else [self.draw(st.sampled_from(schema.union_fields))] if schema.union_fields else []
return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill)) return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill))
def get_random_msg(struct, real_floats=False): @classmethod
return FuzzyGenerator(real_floats=real_floats).generate_struct(struct.schema) def get_random_msg(cls, draw, struct, real_floats=False):
fg = cls(draw, real_floats=real_floats)
return draw(fg.generate_struct(struct.schema))
@classmethod
def get_random_event_msg(cls, draw, events, real_floats=False):
fg = cls(draw, real_floats=real_floats)
return [draw(fg.generate_struct(log.Event.schema, e)) for e in sorted(events)]

@ -1,174 +1,30 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import sys from hypothesis import given, HealthCheck, Phase, settings
import unittest
import hypothesis.strategies as st import hypothesis.strategies as st
import numpy as np from parameterized import parameterized
from hypothesis import given, settings, note import unittest
from cereal import log from cereal import log
from selfdrive.car.toyota.values import CAR as TOYOTA from selfdrive.car.toyota.values import CAR as TOYOTA
from selfdrive.test.fuzzy_generation import FuzzyGenerator
import selfdrive.test.process_replay.process_replay as pr import selfdrive.test.process_replay.process_replay as pr
# These processes currently fail because of unrealistic data breaking assumptions
# that openpilot makes causing error with NaN, inf, int size, array indexing ...
# TODO: Make each one testable
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad', 'dmonitoringmodeld', 'modeld']
TEST_CASES = [(cfg.proc_name, cfg) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED]
def get_process_config(process): class TestFuzzProcesses(unittest.TestCase):
return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0]
def get_event_union_strategy(r, name):
return st.fixed_dictionaries({
'valid': st.just(True),
'logMonoTime': st.integers(min_value=0, max_value=2**64-1),
name: r[name[0].upper() + name[1:]],
})
def get_strategy_for_events(event_types, finite=False):
# TODO: generate automatically based on capnp definitions
def floats(**kwargs):
allow_nan = False if finite else None
allow_infinity = False if finite else None
return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity)
r = {}
r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({
'value': st.lists(floats(), min_size=3, max_size=3),
'std': st.lists(floats(), min_size=3, max_size=3),
'valid': st.just(True),
})
r['LiveLocationKalman'] = st.fixed_dictionaries({
'angularVelocityCalibrated': r['liveLocationKalman.Measurement'],
'inputsOK': st.booleans(),
'posenetOK': st.booleans(),
})
r['CarState'] = st.fixed_dictionaries({
'vEgo': floats(width=32),
'vEgoRaw': floats(width=32),
'steeringPressed': st.booleans(),
'steeringAngleDeg': floats(width=32),
})
r['CameraOdometry'] = st.fixed_dictionaries({
'frameId': st.integers(min_value=0, max_value=2**32 - 1),
'timestampEof': st.integers(min_value=0, max_value=2**64 - 1),
'trans': st.lists(floats(width=32), min_size=3, max_size=3),
'rot': st.lists(floats(width=32), min_size=3, max_size=3),
'transStd': st.lists(floats(width=32), min_size=3, max_size=3),
'rotStd': st.lists(floats(width=32), min_size=3, max_size=3),
})
r['SensorEventData.SensorVec'] = st.fixed_dictionaries({
'v': st.lists(floats(width=32), min_size=3, max_size=3),
'status': st.just(1),
})
r['SensorEventData_gyro'] = st.fixed_dictionaries({
'version': st.just(1),
'sensor': st.just(5),
'type': st.just(16),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(8), # BMX055
'gyroUncalibrated': r['SensorEventData.SensorVec'],
})
r['SensorEventData_accel'] = st.fixed_dictionaries({
'version': st.just(1),
'sensor': st.just(1),
'type': st.just(1),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(8), # BMX055
'acceleration': r['SensorEventData.SensorVec'],
})
r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1)
r['GpsLocationExternal'] = st.fixed_dictionaries({
'flags': st.just(1),
'latitude': floats(),
'longitude': floats(),
'altitude': floats(),
'speed': floats(width=32),
'bearingDeg': floats(width=32),
'accuracy': floats(width=32),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(6), # Ublox
'vNED': st.lists(floats(width=32), min_size=3, max_size=3),
'verticalAccuracy': floats(width=32),
'bearingAccuracyDeg': floats(width=32),
'speedAccuracy': floats(width=32),
})
r['LiveCalibration'] = st.fixed_dictionaries({
'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3),
})
return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types]))
def get_strategy_for_process(process, finite=False):
return get_strategy_for_events(get_process_config(process).pubs, finite)
def convert_to_lr(msgs):
return [log.Event.new_message(**m).as_reader() for m in msgs]
def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value
ret = True
for k, v in d.items():
name = prefix + f"{k}"
if name in exclude:
continue
if isinstance(v, dict):
if not is_finite(v, exclude, name + "."):
ret = False
else:
try:
if not np.isfinite(v).all():
note((name, v))
ret = False
except TypeError:
pass
return ret
def test_process(dat, name):
cfg = get_process_config(name)
lr = convert_to_lr(dat)
pr.TIMEOUT = 0.1
return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2)
class TestFuzzy(unittest.TestCase):
@given(get_strategy_for_process('paramsd'))
@settings(deadline=1000)
def test_paramsd(self, dat):
for r in test_process(dat, 'paramsd'):
d = r.liveParameters.to_dict()
assert is_finite(d)
@given(get_strategy_for_process('locationd', finite=True))
@settings(deadline=1000)
def test_locationd(self, dat):
exclude = [
'positionGeodetic.std',
'velocityNED.std',
'orientationNED.std',
'calibratedOrientationECEF.std',
]
for r in test_process(dat, 'locationd'):
d = r.liveLocationKalman.to_dict()
assert is_finite(d, exclude)
@parameterized.expand(TEST_CASES)
@given(st.data())
@settings(phases=[Phase.generate, Phase.target], max_examples=50, deadline=1000, suppress_health_check=[HealthCheck.too_slow, HealthCheck.data_too_large])
def test_fuzz_process(self, proc_name, cfg, data):
msgs = FuzzyGenerator.get_random_event_msg(data.draw, events=cfg.pubs, real_floats=True)
lr = [log.Event.new_message(**m).as_reader() for m in msgs]
cfg.timeout = 5
pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2, TOYOTA.COROLLA_TSS2, disable_progress=True)
if __name__ == "__main__": if __name__ == "__main__":
procs = { unittest.main()
'locationd': TestFuzzy().test_locationd,
'paramsd': TestFuzzy().test_paramsd,
}
if len(sys.argv) != 2:
print("Usage: ./test_fuzzy.py <process name>")
sys.exit(0)
proc = sys.argv[1]
if proc not in procs:
print(f"{proc} not available")
sys.exit(0)
else:
procs[proc]()

Loading…
Cancel
Save