simple fuzzing test for all processes (#28584)

* working test

* classmethod

* review

* add to ci
pull/28607/head
Maxime Desroches 2 years ago committed by GitHub
parent 3c398b2e2f
commit bac193bdd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .github/workflows/selfdrive_tests.yaml
  2. 4
      selfdrive/car/tests/test_car_interfaces.py
  3. 16
      selfdrive/test/fuzzy_generation.py
  4. 182
      selfdrive/test/process_replay/test_fuzzy.py

@ -228,6 +228,7 @@ jobs:
./tools/replay/tests/test_replay && \
./tools/cabana/tests/test_cabana && \
./system/camerad/test/ae_gray_test && \
./selfdrive/test/process_replay/test_fuzzy.py && \
coverage xml"
- name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v2

@ -9,14 +9,14 @@ from cereal import car
from selfdrive.car import gen_empty_fingerprint
from selfdrive.car.car_helpers import interfaces
from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars
from selfdrive.test.fuzzy_generation import get_random_msg
from selfdrive.test.fuzzy_generation import FuzzyGenerator
class TestCarInterfaces(unittest.TestCase):
@parameterized.expand([(car,) for car in all_known_cars()])
@settings(max_examples=5)
@given(cc_msg=get_random_msg(car.CarControl, real_floats=True))
@given(cc_msg=FuzzyGenerator.get_random_msg(car.CarControl, real_floats=True))
def test_car_interfaces(self, car_name, cc_msg):
if car_name in FINGERPRINTS:
fingerprint = FINGERPRINTS[car_name][0]

@ -1,6 +1,8 @@
import hypothesis.strategies as st
import random
from cereal import log
class FuzzyGenerator:
def __init__(self, real_floats):
self.real_floats=real_floats
@ -60,10 +62,16 @@ class FuzzyGenerator:
else:
return self.generate_struct(field.schema)
def generate_struct(self, schema):
def generate_struct(self, schema, required=None):
full_fill = list(schema.non_union_fields) if schema.non_union_fields else []
single_fill = [random.choice(schema.union_fields)] if schema.union_fields else []
single_fill = [required] if required else [random.choice(schema.union_fields)] if schema.union_fields else []
return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill))
def get_random_msg(struct, real_floats=False):
return FuzzyGenerator(real_floats=real_floats).generate_struct(struct.schema)
@classmethod
def get_random_msg(cls, struct, real_floats=False):
return cls(real_floats=real_floats).generate_struct(struct.schema)
@classmethod
def get_random_event_msg(cls, required, real_floats=False):
fg = cls(real_floats=real_floats)
return st.tuples(*[fg.generate_struct(log.Event.schema, r) for r in required])

@ -1,174 +1,30 @@
#!/usr/bin/env python3
import sys
import unittest
from hypothesis import given, HealthCheck, Phase, settings
import hypothesis.strategies as st
import numpy as np
from hypothesis import given, settings, note
from parameterized import parameterized
import unittest
from cereal import log
from selfdrive.car.toyota.values import CAR as TOYOTA
from selfdrive.test.fuzzy_generation import FuzzyGenerator
import selfdrive.test.process_replay.process_replay as pr
# These processes currently fail because of unrealistic data breaking assumptions
# that openpilot makes causing error with NaN, inf, int size, array indexing ...
# TODO: Make each one testable
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad']
TEST_CASES = [(cfg.proc_name, cfg) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED]
def get_process_config(process):
return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0]
def get_event_union_strategy(r, name):
return st.fixed_dictionaries({
'valid': st.just(True),
'logMonoTime': st.integers(min_value=0, max_value=2**64-1),
name: r[name[0].upper() + name[1:]],
})
def get_strategy_for_events(event_types, finite=False):
# TODO: generate automatically based on capnp definitions
def floats(**kwargs):
allow_nan = False if finite else None
allow_infinity = False if finite else None
return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity)
r = {}
r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({
'value': st.lists(floats(), min_size=3, max_size=3),
'std': st.lists(floats(), min_size=3, max_size=3),
'valid': st.just(True),
})
r['LiveLocationKalman'] = st.fixed_dictionaries({
'angularVelocityCalibrated': r['liveLocationKalman.Measurement'],
'inputsOK': st.booleans(),
'posenetOK': st.booleans(),
})
r['CarState'] = st.fixed_dictionaries({
'vEgo': floats(width=32),
'vEgoRaw': floats(width=32),
'steeringPressed': st.booleans(),
'steeringAngleDeg': floats(width=32),
})
r['CameraOdometry'] = st.fixed_dictionaries({
'frameId': st.integers(min_value=0, max_value=2**32 - 1),
'timestampEof': st.integers(min_value=0, max_value=2**64 - 1),
'trans': st.lists(floats(width=32), min_size=3, max_size=3),
'rot': st.lists(floats(width=32), min_size=3, max_size=3),
'transStd': st.lists(floats(width=32), min_size=3, max_size=3),
'rotStd': st.lists(floats(width=32), min_size=3, max_size=3),
})
r['SensorEventData.SensorVec'] = st.fixed_dictionaries({
'v': st.lists(floats(width=32), min_size=3, max_size=3),
'status': st.just(1),
})
r['SensorEventData_gyro'] = st.fixed_dictionaries({
'version': st.just(1),
'sensor': st.just(5),
'type': st.just(16),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(8), # BMX055
'gyroUncalibrated': r['SensorEventData.SensorVec'],
})
r['SensorEventData_accel'] = st.fixed_dictionaries({
'version': st.just(1),
'sensor': st.just(1),
'type': st.just(1),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(8), # BMX055
'acceleration': r['SensorEventData.SensorVec'],
})
r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1)
r['GpsLocationExternal'] = st.fixed_dictionaries({
'flags': st.just(1),
'latitude': floats(),
'longitude': floats(),
'altitude': floats(),
'speed': floats(width=32),
'bearingDeg': floats(width=32),
'accuracy': floats(width=32),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(6), # Ublox
'vNED': st.lists(floats(width=32), min_size=3, max_size=3),
'verticalAccuracy': floats(width=32),
'bearingAccuracyDeg': floats(width=32),
'speedAccuracy': floats(width=32),
})
r['LiveCalibration'] = st.fixed_dictionaries({
'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3),
})
return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types]))
def get_strategy_for_process(process, finite=False):
return get_strategy_for_events(get_process_config(process).pubs, finite)
def convert_to_lr(msgs):
return [log.Event.new_message(**m).as_reader() for m in msgs]
def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value
ret = True
for k, v in d.items():
name = prefix + f"{k}"
if name in exclude:
continue
if isinstance(v, dict):
if not is_finite(v, exclude, name + "."):
ret = False
else:
try:
if not np.isfinite(v).all():
note((name, v))
ret = False
except TypeError:
pass
return ret
def test_process(dat, name):
cfg = get_process_config(name)
lr = convert_to_lr(dat)
pr.TIMEOUT = 0.1
return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2)
class TestFuzzy(unittest.TestCase):
@given(get_strategy_for_process('paramsd'))
@settings(deadline=1000)
def test_paramsd(self, dat):
for r in test_process(dat, 'paramsd'):
d = r.liveParameters.to_dict()
assert is_finite(d)
@given(get_strategy_for_process('locationd', finite=True))
@settings(deadline=1000)
def test_locationd(self, dat):
exclude = [
'positionGeodetic.std',
'velocityNED.std',
'orientationNED.std',
'calibratedOrientationECEF.std',
]
for r in test_process(dat, 'locationd'):
d = r.liveLocationKalman.to_dict()
assert is_finite(d, exclude)
class TestFuzzProcesses(unittest.TestCase):
@parameterized.expand(TEST_CASES)
@given(st.data())
@settings(phases=[Phase.generate, Phase.target], max_examples=50, deadline=1000, suppress_health_check=[HealthCheck.too_slow, HealthCheck.data_too_large])
def test_fuzz_process(self, proc_name, cfg, data):
msgs = data.draw(FuzzyGenerator.get_random_event_msg(required=cfg.pubs, real_floats=True))
lr = [log.Event.new_message(**m).as_reader() for m in msgs]
cfg.timeout = 5
pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2, disable_progress=True)
if __name__ == "__main__":
procs = {
'locationd': TestFuzzy().test_locationd,
'paramsd': TestFuzzy().test_paramsd,
}
if len(sys.argv) != 2:
print("Usage: ./test_fuzzy.py <process name>")
sys.exit(0)
proc = sys.argv[1]
if proc not in procs:
print(f"{proc} not available")
sys.exit(0)
else:
procs[proc]()
unittest.main()

Loading…
Cancel
Save