|
|
|
@ -1,20 +1,18 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import sys |
|
|
|
|
import unittest |
|
|
|
|
from collections import Counter |
|
|
|
|
|
|
|
|
|
import hypothesis.strategies as st |
|
|
|
|
import numpy as np |
|
|
|
|
from hypothesis import assume, given, settings |
|
|
|
|
from hypothesis import given, settings |
|
|
|
|
|
|
|
|
|
from cereal import log |
|
|
|
|
from selfdrive.car.toyota.values import CAR as TOYOTA |
|
|
|
|
from selfdrive.test.process_replay.process_replay import (CONFIGS, |
|
|
|
|
replay_process) |
|
|
|
|
import selfdrive.test.process_replay.process_replay as pr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_process_config(process): |
|
|
|
|
return [cfg for cfg in CONFIGS if cfg.proc_name == process][0] |
|
|
|
|
return [cfg for cfg in pr.CONFIGS if cfg.proc_name == process][0] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_event_union_strategy(r, name): |
|
|
|
@ -109,12 +107,6 @@ def convert_to_lr(msgs): |
|
|
|
|
return [log.Event.new_message(**m).as_reader() for m in msgs] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def assume_all_services_present(cfg, lr): |
|
|
|
|
tps = Counter([m.which() for m in lr]) |
|
|
|
|
for p in cfg.pub_sub: |
|
|
|
|
assume(tps[p] > 0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-value |
|
|
|
|
ret = True |
|
|
|
|
for k, v in d.items(): |
|
|
|
@ -136,8 +128,8 @@ def is_finite(d, exclude=[], prefix=""): # pylint: disable=dangerous-default-va |
|
|
|
|
def test_process(dat, name): |
|
|
|
|
cfg = get_process_config(name) |
|
|
|
|
lr = convert_to_lr(dat) |
|
|
|
|
assume_all_services_present(cfg, lr) |
|
|
|
|
return replay_process(cfg, lr, TOYOTA.COROLLA_TSS2) |
|
|
|
|
pr.TIMEOUT = 0.1 |
|
|
|
|
return pr.replay_process(cfg, lr, TOYOTA.COROLLA_TSS2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestFuzzy(unittest.TestCase): |
|
|
|
@ -149,7 +141,7 @@ class TestFuzzy(unittest.TestCase): |
|
|
|
|
assert is_finite(lp) |
|
|
|
|
|
|
|
|
|
@given(get_strategy_for_process('locationd', finite=True)) |
|
|
|
|
@settings(deadline=10000) |
|
|
|
|
@settings(deadline=1000) |
|
|
|
|
def test_locationd(self, dat): |
|
|
|
|
exclude = [ |
|
|
|
|
'positionGeodetic.std', |
|
|
|
|