Revert "simple fuzzing test for all processes (#28584)"

This reverts commit 0e98836e857583849aa4e960b097103a4aeb863d.

old-commit-hash: 7783dc602c
beeps
Adeeb Shihadeh 2 years ago
parent 83fd21a79e
commit ffcc2e4c61
  1. 1
      .github/workflows/selfdrive_tests.yaml
  2. 4
      selfdrive/car/tests/test_car_interfaces.py
  3. 16
      selfdrive/test/fuzzy_generation.py
  4. 182
      selfdrive/test/process_replay/test_fuzzy.py

@ -254,7 +254,6 @@ jobs:
./tools/replay/tests/test_replay && \ ./tools/replay/tests/test_replay && \
./tools/cabana/tests/test_cabana && \ ./tools/cabana/tests/test_cabana && \
./system/camerad/test/ae_gray_test && \ ./system/camerad/test/ae_gray_test && \
./selfdrive/test/process_replay/test_fuzzy.py && \
coverage xml" coverage xml"
- name: "Upload coverage to Codecov" - name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v2 uses: codecov/codecov-action@v2

@ -9,14 +9,14 @@ from cereal import car
from selfdrive.car import gen_empty_fingerprint from selfdrive.car import gen_empty_fingerprint
from selfdrive.car.car_helpers import interfaces from selfdrive.car.car_helpers import interfaces
from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars from selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS, all_known_cars
from selfdrive.test.fuzzy_generation import FuzzyGenerator from selfdrive.test.fuzzy_generation import get_random_msg
class TestCarInterfaces(unittest.TestCase): class TestCarInterfaces(unittest.TestCase):
@parameterized.expand([(car,) for car in all_known_cars()]) @parameterized.expand([(car,) for car in all_known_cars()])
@settings(max_examples=5) @settings(max_examples=5)
@given(cc_msg=FuzzyGenerator.get_random_msg(car.CarControl, real_floats=True)) @given(cc_msg=get_random_msg(car.CarControl, real_floats=True))
def test_car_interfaces(self, car_name, cc_msg): def test_car_interfaces(self, car_name, cc_msg):
if car_name in FINGERPRINTS: if car_name in FINGERPRINTS:
fingerprint = FINGERPRINTS[car_name][0] fingerprint = FINGERPRINTS[car_name][0]

@ -1,8 +1,6 @@
import hypothesis.strategies as st import hypothesis.strategies as st
import random import random
from cereal import log
class FuzzyGenerator: class FuzzyGenerator:
def __init__(self, real_floats): def __init__(self, real_floats):
self.real_floats=real_floats self.real_floats=real_floats
@ -62,16 +60,10 @@ class FuzzyGenerator:
else: else:
return self.generate_struct(field.schema) return self.generate_struct(field.schema)
def generate_struct(self, schema, required=None): def generate_struct(self, schema):
full_fill = list(schema.non_union_fields) if schema.non_union_fields else [] full_fill = list(schema.non_union_fields) if schema.non_union_fields else []
single_fill = [required] if required else [random.choice(schema.union_fields)] if schema.union_fields else [] single_fill = [random.choice(schema.union_fields)] if schema.union_fields else []
return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill)) return st.fixed_dictionaries(dict((field, self.generate_field(schema.fields[field])) for field in full_fill + single_fill))
@classmethod def get_random_msg(struct, real_floats=False):
def get_random_msg(cls, struct, real_floats=False): return FuzzyGenerator(real_floats=real_floats).generate_struct(struct.schema)
return cls(real_floats=real_floats).generate_struct(struct.schema)
@classmethod
def get_random_event_msg(cls, required, real_floats=False):
fg = cls(real_floats=real_floats)
return st.tuples(*[fg.generate_struct(log.Event.schema, r) for r in required])

@ -1,30 +1,174 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from hypothesis import given, HealthCheck, Phase, settings import sys
import hypothesis.strategies as st
from parameterized import parameterized
import unittest import unittest
import hypothesis.strategies as st
import numpy as np
from hypothesis import given, settings, note
from cereal import log from cereal import log
from selfdrive.car.toyota.values import CAR as TOYOTA from selfdrive.car.toyota.values import CAR as TOYOTA
from selfdrive.test.fuzzy_generation import FuzzyGenerator
import selfdrive.test.process_replay.process_replay as pr import selfdrive.test.process_replay.process_replay as pr
# These processes currently fail because of unrealistic data breaking assumptions
# that openpilot makes causing error with NaN, inf, int size, array indexing ...
# TODO: Make each one testable
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad']
TEST_CASES = [(cfg.proc_name, cfg) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED]
class TestFuzzProcesses(unittest.TestCase): def get_process_config(process):
return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0]
def get_event_union_strategy(r, name):
return st.fixed_dictionaries({
'valid': st.just(True),
'logMonoTime': st.integers(min_value=0, max_value=2**64-1),
name: r[name[0].upper() + name[1:]],
})
def get_strategy_for_events(event_types, finite=False):
# TODO: generate automatically based on capnp definitions
def floats(**kwargs):
allow_nan = False if finite else None
allow_infinity = False if finite else None
return st.floats(**kwargs, allow_nan=allow_nan, allow_infinity=allow_infinity)
r = {}
r['liveLocationKalman.Measurement'] = st.fixed_dictionaries({
'value': st.lists(floats(), min_size=3, max_size=3),
'std': st.lists(floats(), min_size=3, max_size=3),
'valid': st.just(True),
})
r['LiveLocationKalman'] = st.fixed_dictionaries({
'angularVelocityCalibrated': r['liveLocationKalman.Measurement'],
'inputsOK': st.booleans(),
'posenetOK': st.booleans(),
})
r['CarState'] = st.fixed_dictionaries({
'vEgo': floats(width=32),
'vEgoRaw': floats(width=32),
'steeringPressed': st.booleans(),
'steeringAngleDeg': floats(width=32),
})
r['CameraOdometry'] = st.fixed_dictionaries({
'frameId': st.integers(min_value=0, max_value=2**32 - 1),
'timestampEof': st.integers(min_value=0, max_value=2**64 - 1),
'trans': st.lists(floats(width=32), min_size=3, max_size=3),
'rot': st.lists(floats(width=32), min_size=3, max_size=3),
'transStd': st.lists(floats(width=32), min_size=3, max_size=3),
'rotStd': st.lists(floats(width=32), min_size=3, max_size=3),
})
r['SensorEventData.SensorVec'] = st.fixed_dictionaries({
'v': st.lists(floats(width=32), min_size=3, max_size=3),
'status': st.just(1),
})
r['SensorEventData_gyro'] = st.fixed_dictionaries({
'version': st.just(1),
'sensor': st.just(5),
'type': st.just(16),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(8), # BMX055
'gyroUncalibrated': r['SensorEventData.SensorVec'],
})
r['SensorEventData_accel'] = st.fixed_dictionaries({
'version': st.just(1),
'sensor': st.just(1),
'type': st.just(1),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(8), # BMX055
'acceleration': r['SensorEventData.SensorVec'],
})
r['SensorEvents'] = st.lists(st.one_of(r['SensorEventData_gyro'], r['SensorEventData_accel']), min_size=1)
r['GpsLocationExternal'] = st.fixed_dictionaries({
'flags': st.just(1),
'latitude': floats(),
'longitude': floats(),
'altitude': floats(),
'speed': floats(width=32),
'bearingDeg': floats(width=32),
'accuracy': floats(width=32),
'timestamp': st.integers(min_value=0, max_value=2**63 - 1),
'source': st.just(6), # Ublox
'vNED': st.lists(floats(width=32), min_size=3, max_size=3),
'verticalAccuracy': floats(width=32),
'bearingAccuracyDeg': floats(width=32),
'speedAccuracy': floats(width=32),
})
r['LiveCalibration'] = st.fixed_dictionaries({
'rpyCalib': st.lists(floats(width=32), min_size=3, max_size=3),
})
return st.lists(st.one_of(*[get_event_union_strategy(r, n) for n in event_types]))
def get_strategy_for_process(process, finite=False):
return get_strategy_for_events(get_process_config(process).pubs, finite)
def convert_to_lr(msgs):
return [log.Event.new_message(**m).as_reader() for m in msgs]
def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value
ret = True
for k, v in d.items():
name = prefix + f"{k}"
if name in exclude:
continue
if isinstance(v, dict):
if not is_finite(v, exclude, name + "."):
ret = False
else:
try:
if not np.isfinite(v).all():
note((name, v))
ret = False
except TypeError:
pass
return ret
def test_process(dat, name):
cfg = get_process_config(name)
lr = convert_to_lr(dat)
pr.TIMEOUT = 0.1
return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2)
class TestFuzzy(unittest.TestCase):
@given(get_strategy_for_process('paramsd'))
@settings(deadline=1000)
def test_paramsd(self, dat):
for r in test_process(dat, 'paramsd'):
d = r.liveParameters.to_dict()
assert is_finite(d)
@given(get_strategy_for_process('locationd', finite=True))
@settings(deadline=1000)
def test_locationd(self, dat):
exclude = [
'positionGeodetic.std',
'velocityNED.std',
'orientationNED.std',
'calibratedOrientationECEF.std',
]
for r in test_process(dat, 'locationd'):
d = r.liveLocationKalman.to_dict()
assert is_finite(d, exclude)
@parameterized.expand(TEST_CASES)
@given(st.data())
@settings(phases=[Phase.generate, Phase.target], max_examples=50, deadline=1000, suppress_health_check=[HealthCheck.too_slow, HealthCheck.data_too_large])
def test_fuzz_process(self, proc_name, cfg, data):
msgs = data.draw(FuzzyGenerator.get_random_event_msg(required=cfg.pubs, real_floats=True))
lr = [log.Event.new_message(**m).as_reader() for m in msgs]
cfg.timeout = 5
pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2, disable_progress=True)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() procs = {
'locationd': TestFuzzy().test_locationd,
'paramsd': TestFuzzy().test_paramsd,
}
if len(sys.argv) != 2:
print("Usage: ./test_fuzzy.py <process name>")
sys.exit(0)
proc = sys.argv[1]
if proc not in procs:
print(f"{proc} not available")
sys.exit(0)
else:
procs[proc]()

Loading…
Cancel
Save