test controls step with CI and lat/long controllers, car controller.update, etc

pull/31282/head
Shane Smiskol 2 years ago
parent 9aa59b8473
commit 95ce6fdd6d
  1. 36
      selfdrive/car/tests/test_car_interfaces.py
  2. 4
      selfdrive/controls/controlsd.py
  3. 13
      selfdrive/test/fuzzy_generation.py

@ -2,6 +2,7 @@
import os
import math
import unittest
from unittest.mock import patch
import hypothesis.strategies as st
from hypothesis import Phase, given, settings
import importlib
@ -14,6 +15,10 @@ from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.car.fingerprints import all_known_cars
from openpilot.selfdrive.car.fw_versions import FW_VERSIONS
from openpilot.selfdrive.car.interfaces import get_interface_attr
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.controlsd import Controls
from openpilot.selfdrive.test.fuzzy_generation import DrawType, FuzzyGenerator
ALL_ECUS = list({ecu for ecus in FW_VERSIONS.values() for ecu in ecus.keys()})
@ -49,11 +54,13 @@ class TestCarInterfaces(unittest.TestCase):
# FIXME: Due to the lists used in carParams, Phase.target is very slow and will cause
# many generated examples to overrun when max_examples > ~20, don't use it
@parameterized.expand([(car,) for car in sorted(all_known_cars())])
@parameterized.expand([(car,) for car in sorted(all_known_cars()) if 'BOLT EUV' in car])
# @parameterized.expand([(car,) for car in sorted(all_known_cars())])
@settings(max_examples=MAX_EXAMPLES, deadline=None,
phases=(Phase.reuse, Phase.generate, Phase.shrink))
phases=(Phase.reuse, Phase.generate))
@given(data=st.data())
def test_car_interfaces(self, car_name, data):
print(car_name)
CarInterface, CarController, CarState = interfaces[car_name]
args = get_fuzzy_car_interface_args(data.draw)
@ -87,6 +94,18 @@ class TestCarInterfaces(unittest.TestCase):
self.assertTrue(not math.isnan(tune.torque.kf) and tune.torque.kf > 0)
self.assertTrue(not math.isnan(tune.torque.friction) and tune.torque.friction > 0)
# # Test lat and long controllers
# controlsd = Controls(CI=car_interface)
# for _ in range(10):
# controlsd.state_control(car.CarState.new_message())
# if car_params.steerControlType == car.CarParams.SteerControlType.angle:
# lat_control = LatControlAngle(car_params, car_interface)
# elif car_params.lateralTuning.which() == 'pid':
# lat_control = LatControlPID(car_params, car_interface)
# elif car_params.lateralTuning.which() == 'torque':
# lat_control = LatControlTorque(car_params, car_interface)
cc_msg = FuzzyGenerator.get_random_msg(data.draw, car.CarControl, real_floats=True)
# Run car interface
now_nanos = 0
@ -105,6 +124,17 @@ class TestCarInterfaces(unittest.TestCase):
car_interface.apply(CC, now_nanos)
now_nanos += DT_CTRL * 1e9 # 10ms
# Test lat and long controllers
controlsd = Controls(CI=car_interface)
controlsd.initialized = True
cs_msg = FuzzyGenerator.get_random_msg(data.draw, car.CarState, real_floats=True, ignore_deprecated=True)
CS = car.CarState.new_message(**cs_msg)
# # print('looping')
with (patch('openpilot.selfdrive.car.interfaces.CarInterfaceBase.update', lambda *args, **kwargs: CS),
patch('cereal.messaging.drain_sock_raw', lambda *args, **kwargs: [])):
for _ in range(10):
controlsd.step()
# Test radar interface
RadarInterface = importlib.import_module(f'selfdrive.car.{car_params.carName}.radar_interface').RadarInterface
radar_interface = RadarInterface(car_params)
@ -122,6 +152,8 @@ class TestCarInterfaces(unittest.TestCase):
rr = radar_interface.update(cans)
self.assertTrue(rr is None or len(rr.errors) > 0)
print('finished!')
def test_interface_attrs(self):
"""Asserts basic behavior of interface attribute getter"""
num_brands = len(get_interface_attr('CAR'))

@ -466,6 +466,7 @@ class Controls:
self.distance_traveled += CS.vEgo * DT_CTRL
# print(CS)
return CS
def state_transition(self, CS):
@ -735,6 +736,7 @@ class Controls:
if self.enabled:
clear_event_types.add(ET.NO_ENTRY)
# print(self.events)
alerts = self.events.create_alerts(self.current_alert_types, [self.CP, CS, self.sm, self.is_metric, self.soft_disable_timer])
self.AM.add_many(self.sm.frame, alerts)
current_alert = self.AM.process_alerts(self.sm.frame, clear_event_types)
@ -841,10 +843,12 @@ class Controls:
self.CC = CC
def step(self):
# print(self.sm.frame)
start_time = time.monotonic()
# Sample data from sockets and get a carState
CS = self.data_sample()
# print(CS)
cloudlog.timestamp("Data sampled")
self.update_events(CS)

@ -8,9 +8,10 @@ DrawType = Callable[[st.SearchStrategy], Any]
class FuzzyGenerator:
def __init__(self, draw: DrawType, real_floats: bool):
def __init__(self, draw: DrawType, real_floats: bool, ignore_deprecated: bool):
self.draw = draw
self.real_floats = real_floats
self.ignore_deprecated = ignore_deprecated
def generate_native_type(self, field: str) -> st.SearchStrategy[Union[bool, int, float, str, bytes]]:
def floats(**kwargs) -> st.SearchStrategy[float]:
@ -57,7 +58,10 @@ class FuzzyGenerator:
return st.lists(rec(field_type.list.elementType))
elif field_type.which() == 'enum':
schema = field.schema.elementType if base_type == 'list' else field.schema
return st.sampled_from(list(schema.enumerants.keys()))
elements = list(schema.enumerants.keys())
if self.ignore_deprecated:
elements = [k for k in elements if not k.endswith('DEPRECATED')]
return st.sampled_from(elements)
else:
return self.generate_native_type(field_type.which())
@ -73,8 +77,9 @@ class FuzzyGenerator:
return st.fixed_dictionaries({field: self.generate_field(schema.fields[field]) for field in full_fill + single_fill})
@classmethod
def get_random_msg(cls, draw: DrawType, struct: capnp.lib.capnp._StructModule, real_floats: bool = False) -> Dict[str, Any]:
fg = cls(draw, real_floats=real_floats)
def get_random_msg(cls, draw: DrawType, struct: capnp.lib.capnp._StructModule, real_floats: bool = False,
ignore_deprecated: bool = False) -> Dict[str, Any]:
fg = cls(draw, real_floats=real_floats, ignore_deprecated=ignore_deprecated)
data: Dict[str, Any] = draw(fg.generate_struct(struct.schema))
return data

Loading…
Cancel
Save