diff --git a/selfdrive/car/tests/test_car_interfaces.py b/selfdrive/car/tests/test_car_interfaces.py index d44c50ed90..490c849863 100755 --- a/selfdrive/car/tests/test_car_interfaces.py +++ b/selfdrive/car/tests/test_car_interfaces.py @@ -7,7 +7,7 @@ from hypothesis import Phase, given, settings import importlib from parameterized import parameterized -from cereal import car +from cereal import car, messaging from openpilot.common.realtime import DT_CTRL from openpilot.selfdrive.car import gen_empty_fingerprint from openpilot.selfdrive.car.car_helpers import interfaces @@ -119,6 +119,12 @@ class TestCarInterfaces(unittest.TestCase): hasattr(radar_interface, '_update') and hasattr(radar_interface, 'trigger_msg'): radar_interface._update([radar_interface.trigger_msg]) + # Test radar fault + if not car_params.radarUnavailable and radar_interface.rcp is not None: + cans = [messaging.new_message('can', 1).to_bytes() for _ in range(5)] + rr = radar_interface.update(cans) + self.assertTrue(rr is None or len(rr.errors) > 0) + def test_interface_attrs(self): """Asserts basic behavior of interface attribute getter""" num_brands = len(get_interface_attr('CAR')) diff --git a/selfdrive/controls/tests/test_leads.py b/selfdrive/controls/tests/test_leads.py new file mode 100644 index 0000000000..bd8baf096e --- /dev/null +++ b/selfdrive/controls/tests/test_leads.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +import unittest + +import cereal.messaging as messaging + +from selfdrive.test.process_replay import replay_process_with_name +from selfdrive.car.toyota.values import CAR as TOYOTA + + +class TestLeads(unittest.TestCase): + def test_radar_fault(self): + # if there's no radar-related can traffic, radard should either not respond or respond with an error + # this is tightly coupled with underlying car radar_interface implementation, but it's a good sanity check + def single_iter_pkg(): + # single iter package, with meaningless cans and empty carState/modelV2 + msgs = [] + for _ in range(5): + can = messaging.new_message("can", 1) + cs = messaging.new_message("carState") + msgs.append(can.as_reader()) + msgs.append(cs.as_reader()) + model = messaging.new_message("modelV2") + msgs.append(model.as_reader()) + + return msgs + + msgs = [m for _ in range(3) for m in single_iter_pkg()] + out = replay_process_with_name("radard", msgs, fingerprint=TOYOTA.COROLLA_TSS2) + states = [m for m in out if m.which() == "radarState"] + failures = [not state.valid and len(state.radarState.radarErrors) for state in states] + + self.assertTrue(len(states) == 0 or all(failures)) + + +if __name__ == "__main__": + unittest.main()