diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py new file mode 100755 index 0000000000..5721896787 --- /dev/null +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 +import pytest +import random +import time +import unittest +from collections import defaultdict +from parameterized import parameterized +import threading + +from cereal import car +from openpilot.common.params import Params +from openpilot.selfdrive.car.car_helpers import interfaces +from openpilot.selfdrive.car.fingerprints import FW_VERSIONS +from openpilot.selfdrive.car.fw_versions import FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \ + match_fw_to_car, get_fw_versions, get_present_ecus +from openpilot.selfdrive.car.vin import get_vin + +CarFw = car.CarParams.CarFw +Ecu = car.CarParams.Ecu + +ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} + + +class FakeSocket: + def receive(self, non_blocking=False): + pass + + def send(self, msg): + pass + + +class TestFwFingerprint(unittest.TestCase): + def assertFingerprints(self, candidates, expected): + candidates = list(candidates) + self.assertEqual(len(candidates), 1, f"got more than one candidate: {candidates}") + self.assertEqual(candidates[0], expected) + + @parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e]) + def test_exact_match(self, brand, car_model, ecus): + CP = car.CarParams.new_message() + for _ in range(200): + fw = [] + for ecu, fw_versions in ecus.items(): + ecu_name, addr, sub_addr = ecu + fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand, + "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) + CP.carFw = fw + _, matches = match_fw_to_car(CP.carFw, allow_fuzzy=False) + self.assertFingerprints(matches, car_model) + + @parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e]) + def test_custom_fuzzy_match(self, brand, car_model, ecus): + # Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function + config = FW_QUERY_CONFIGS[brand] + if config.match_fw_to_car_fuzzy is None: + raise unittest.SkipTest("Brand does not implement custom fuzzy fingerprinting function") + + CP = car.CarParams.new_message() + for _ in range(5): + fw = [] + for ecu, fw_versions in ecus.items(): + ecu_name, addr, sub_addr = ecu + fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand, + "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) + CP.carFw = fw + _, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False) + brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw)) + + # If both have matches, they must agree + if len(matches) == 1 and len(brand_matches) == 1: + self.assertEqual(matches, brand_matches) + + @parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e]) + def test_fuzzy_match_ecu_count(self, brand, car_model, ecus): + # Asserts that fuzzy matching does not count matching FW, but ECU address keys + valid_ecus = [e for e in ecus if e[0] not in FUZZY_EXCLUDE_ECUS] + if not len(valid_ecus): + raise unittest.SkipTest("Car model has no compatible ECUs for fuzzy matching") + + fw = [] + for ecu in valid_ecus: + ecu_name, addr, sub_addr = ecu + for _ in range(5): + # Add multiple FW versions to simulate ECU returning to multiple queries in a brand + fw.append({"ecu": ecu_name, "fwVersion": random.choice(ecus[ecu]), 'brand': brand, + "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) + CP = car.CarParams.new_message(carFw=fw) + _, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False) + + # Assert no match if there are not enough unique ECUs + unique_ecus = {(f['address'], f['subAddress']) for f in fw} + if len(unique_ecus) < 2: + self.assertEqual(len(matches), 0, car_model) + # There won't always be a match due to shared FW, but if there is it should be correct + elif len(matches): + self.assertFingerprints(matches, car_model) + + def test_fw_version_lists(self): + for car_model, ecus in FW_VERSIONS.items(): + with self.subTest(car_model=car_model.value): + for ecu, ecu_fw in ecus.items(): + with self.subTest(ecu): + duplicates = {fw for fw in ecu_fw if ecu_fw.count(fw) > 1} + self.assertFalse(len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ECU_NAME[ecu[0]]}, {duplicates}') + self.assertGreater(len(ecu_fw), 0, f'{car_model}: No FW versions: Ecu.{ECU_NAME[ecu[0]]}') + + def test_all_addrs_map_to_one_ecu(self): + for brand, cars in VERSIONS.items(): + addr_to_ecu = defaultdict(set) + for ecus in cars.values(): + for ecu_type, addr, sub_addr in ecus.keys(): + addr_to_ecu[(addr, sub_addr)].add(ecu_type) + ecus_for_addr = addr_to_ecu[(addr, sub_addr)] + ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_for_addr]) + self.assertLessEqual(len(ecus_for_addr), 1, f"{brand} has multiple ECUs that map to one address: {ecu_strings} -> ({hex(addr)}, {sub_addr})") + + def test_data_collection_ecus(self): + # Asserts no extra ECUs are in the fingerprinting database + for brand, config in FW_QUERY_CONFIGS.items(): + for car_model, ecus in VERSIONS[brand].items(): + bad_ecus = set(ecus).intersection(config.extra_ecus) + with self.subTest(car_model=car_model.value): + self.assertFalse(len(bad_ecus), f'{car_model}: Fingerprints contain ECUs added for data collection: {bad_ecus}') + + def test_blacklisted_ecus(self): + blacklisted_addrs = (0x7c4, 0x7d0) # includes A/C ecu and an unknown ecu + for car_model, ecus in FW_VERSIONS.items(): + with self.subTest(car_model=car_model.value): + CP = interfaces[car_model][0].get_non_essential_params(car_model) + if CP.carName == 'subaru': + for ecu in ecus.keys(): + self.assertNotIn(ecu[1], blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})') + + elif CP.carName == "chrysler": + # Some HD trucks have a combined TCM and ECM + if CP.carFingerprint.startswith("RAM HD"): + for ecu in ecus.keys(): + self.assertNotEqual(ecu[0], Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})") + + def test_missing_versions_and_configs(self): + brand_versions = set(VERSIONS.keys()) + brand_configs = set(FW_QUERY_CONFIGS.keys()) + if len(brand_configs - brand_versions): + with self.subTest(): + self.fail(f"Brands do not implement FW_VERSIONS: {brand_configs - brand_versions}") + + if len(brand_versions - brand_configs): + with self.subTest(): + self.fail(f"Brands do not implement FW_QUERY_CONFIG: {brand_versions - brand_configs}") + + def test_fw_request_ecu_whitelist(self): + for brand, config in FW_QUERY_CONFIGS.items(): + with self.subTest(brand=brand): + whitelisted_ecus = {ecu for r in config.requests for ecu in r.whitelist_ecus} + brand_ecus = {fw[0] for car_fw in VERSIONS[brand].values() for fw in car_fw} + brand_ecus |= {ecu[0] for ecu in config.extra_ecus} + + # each ecu in brand's fw versions + extra ecus needs to be whitelisted at least once + ecus_not_whitelisted = brand_ecus - whitelisted_ecus + + ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted]) + self.assertFalse(len(whitelisted_ecus) and len(ecus_not_whitelisted), + f'{brand.title()}: ECUs not in any FW query whitelists: {ecu_strings}') + + def test_fw_requests(self): + # Asserts equal length request and response lists + for brand, config in FW_QUERY_CONFIGS.items(): + with self.subTest(brand=brand): + for request_obj in config.requests: + self.assertEqual(len(request_obj.request), len(request_obj.response)) + + # No request on the OBD port (bus 1, multiplexed) should be run on an aux panda + self.assertFalse(request_obj.auxiliary and request_obj.bus == 1 and request_obj.obd_multiplexing, + f"{brand.title()}: OBD multiplexed request is marked auxiliary: {request_obj}") + + +class TestFwFingerprintTiming(unittest.TestCase): + N: int = 5 + TOL: float = 0.12 + + @staticmethod + def _run_thread(thread: threading.Thread) -> float: + params = Params() + params.put_bool("ObdMultiplexingEnabled", True) + thread.start() + t = time.perf_counter() + while thread.is_alive(): + time.sleep(0.02) + if not params.get_bool("ObdMultiplexingChanged"): + params.put_bool("ObdMultiplexingChanged", True) + return time.perf_counter() - t + + def _benchmark_brand(self, brand, num_pandas): + fake_socket = FakeSocket() + brand_time = 0 + for _ in range(self.N): + thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand), + kwargs=dict(num_pandas=num_pandas)) + brand_time += self._run_thread(thread) + + return brand_time / self.N + + def _assert_timing(self, avg_time, ref_time): + self.assertLess(avg_time, ref_time + self.TOL) + self.assertGreater(avg_time, ref_time - self.TOL, "Performance seems to have improved, update test refs.") + + def test_startup_timing(self): + # Tests worse-case VIN query time and typical present ECU query time + vin_ref_time = 1.0 + present_ecu_ref_time = 0.8 + + fake_socket = FakeSocket() + present_ecu_time = 0.0 + for _ in range(self.N): + thread = threading.Thread(target=get_present_ecus, args=(fake_socket, fake_socket), + kwargs=dict(num_pandas=2)) + present_ecu_time += self._run_thread(thread) + self._assert_timing(present_ecu_time / self.N, present_ecu_ref_time) + print(f'get_present_ecus, query time={present_ecu_time / self.N} seconds') + + vin_time = 0.0 + for _ in range(self.N): + thread = threading.Thread(target=get_vin, args=(fake_socket, fake_socket, 1)) + vin_time += self._run_thread(thread) + self._assert_timing(vin_time / self.N, vin_ref_time) + print(f'get_vin, query time={vin_time / self.N} seconds') + + @pytest.mark.timeout(60) + def test_fw_query_timing(self): + total_ref_time = 6.41 + brand_ref_times = { + 1: { + 'body': 0.11, + 'chrysler': 0.3, + 'ford': 0.2, + 'honda': 0.52, + 'hyundai': 0.72, + 'mazda': 0.2, + 'nissan': 0.4, + 'subaru': 0.52, + 'tesla': 0.2, + 'toyota': 1.6, + 'volkswagen': 0.2, + }, + 2: { + 'ford': 0.3, + 'hyundai': 1.12, + } + } + + total_time = 0 + for num_pandas in (1, 2): + for brand, config in FW_QUERY_CONFIGS.items(): + with self.subTest(brand=brand, num_pandas=num_pandas): + multi_panda_requests = [r for r in config.requests if r.bus > 3] + if not len(multi_panda_requests) and num_pandas > 1: + raise unittest.SkipTest("No multi-panda FW queries") + + avg_time = self._benchmark_brand(brand, num_pandas) + total_time += avg_time + avg_time = round(avg_time, 2) + self._assert_timing(avg_time, brand_ref_times[num_pandas][brand]) + print(f'{brand=}, {num_pandas=}, {len(config.requests)=}, avg FW query time={avg_time} seconds') + + with self.subTest(brand='all_brands'): + total_time = round(total_time, 2) + self._assert_timing(total_time, total_ref_time) + print(f'all brands, total FW query time={total_time} seconds') + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/__init__.py b/selfdrive/controls/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selfdrive/controls/tests/test_alerts.py b/selfdrive/controls/tests/test_alerts.py new file mode 100755 index 0000000000..7b4fba0dce --- /dev/null +++ b/selfdrive/controls/tests/test_alerts.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 +import copy +import json +import os +import unittest +import random +from PIL import Image, ImageDraw, ImageFont + +from cereal import log, car +from cereal.messaging import SubMaster +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.selfdrive.controls.lib.events import Alert, EVENTS, ET +from openpilot.selfdrive.controls.lib.alertmanager import set_offroad_alert +from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS + +AlertSize = log.ControlsState.AlertSize + +OFFROAD_ALERTS_PATH = os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json") + +# TODO: add callback alerts +ALERTS = [] +for event_types in EVENTS.values(): + for alert in event_types.values(): + ALERTS.append(alert) + + +class TestAlerts(unittest.TestCase): + + @classmethod + def setUpClass(cls): + with open(OFFROAD_ALERTS_PATH) as f: + cls.offroad_alerts = json.loads(f.read()) + + # Create fake objects for callback + cls.CS = car.CarState.new_message() + cls.CP = car.CarParams.new_message() + cfg = [c for c in CONFIGS if c.proc_name == 'controlsd'][0] + cls.sm = SubMaster(cfg.pubs) + + def test_events_defined(self): + # Ensure all events in capnp schema are defined in events.py + events = car.CarEvent.EventName.schema.enumerants + + for name, e in events.items(): + if not name.endswith("DEPRECATED"): + fail_msg = "%s @%d not in EVENTS" % (name, e) + self.assertTrue(e in EVENTS.keys(), msg=fail_msg) + + # ensure alert text doesn't exceed allowed width + def test_alert_text_length(self): + font_path = os.path.join(BASEDIR, "selfdrive/assets/fonts") + regular_font_path = os.path.join(font_path, "Inter-SemiBold.ttf") + bold_font_path = os.path.join(font_path, "Inter-Bold.ttf") + semibold_font_path = os.path.join(font_path, "Inter-SemiBold.ttf") + + max_text_width = 2160 - 300 # full screen width is usable, minus sidebar + draw = ImageDraw.Draw(Image.new('RGB', (0, 0))) + + fonts = { + AlertSize.small: [ImageFont.truetype(semibold_font_path, 74)], + AlertSize.mid: [ImageFont.truetype(bold_font_path, 88), + ImageFont.truetype(regular_font_path, 66)], + } + + for alert in ALERTS: + if not isinstance(alert, Alert): + alert = alert(self.CP, self.CS, self.sm, metric=False, soft_disable_time=100) + + # for full size alerts, both text fields wrap the text, + # so it's unlikely that they would go past the max width + if alert.alert_size in (AlertSize.none, AlertSize.full): + continue + + for i, txt in enumerate([alert.alert_text_1, alert.alert_text_2]): + if i >= len(fonts[alert.alert_size]): + break + + font = fonts[alert.alert_size][i] + left, _, right, _ = draw.textbbox((0, 0), txt, font) + width = right - left + msg = f"type: {alert.alert_type} msg: {txt}" + self.assertLessEqual(width, max_text_width, msg=msg) + + def test_alert_sanity_check(self): + for event_types in EVENTS.values(): + for event_type, a in event_types.items(): + # TODO: add callback alerts + if not isinstance(a, Alert): + continue + + if a.alert_size == AlertSize.none: + self.assertEqual(len(a.alert_text_1), 0) + self.assertEqual(len(a.alert_text_2), 0) + elif a.alert_size == AlertSize.small: + self.assertGreater(len(a.alert_text_1), 0) + self.assertEqual(len(a.alert_text_2), 0) + elif a.alert_size == AlertSize.mid: + self.assertGreater(len(a.alert_text_1), 0) + self.assertGreater(len(a.alert_text_2), 0) + else: + self.assertGreater(len(a.alert_text_1), 0) + + self.assertGreaterEqual(a.duration, 0.) + + if event_type not in (ET.WARNING, ET.PERMANENT, ET.PRE_ENABLE): + self.assertEqual(a.creation_delay, 0.) + + def test_offroad_alerts(self): + params = Params() + for a in self.offroad_alerts: + # set the alert + alert = copy.copy(self.offroad_alerts[a]) + set_offroad_alert(a, True) + alert['extra'] = '' + self.assertTrue(json.dumps(alert) == params.get(a, encoding='utf8')) + + # then delete it + set_offroad_alert(a, False) + self.assertTrue(params.get(a) is None) + + def test_offroad_alerts_extra_text(self): + params = Params() + for i in range(50): + # set the alert + a = random.choice(list(self.offroad_alerts)) + alert = self.offroad_alerts[a] + set_offroad_alert(a, True, extra_text="a"*i) + + written_alert = json.loads(params.get(a, encoding='utf8')) + self.assertTrue("a"*i == written_alert['extra']) + self.assertTrue(alert["text"] == written_alert['text']) + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_cruise_speed.py b/selfdrive/controls/tests/test_cruise_speed.py new file mode 100755 index 0000000000..76a2222e85 --- /dev/null +++ b/selfdrive/controls/tests/test_cruise_speed.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +import itertools +import numpy as np +import unittest + +from parameterized import parameterized_class +from cereal import log +from openpilot.common.params import Params +from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, V_CRUISE_MIN, V_CRUISE_MAX, V_CRUISE_INITIAL, IMPERIAL_INCREMENT +from cereal import car +from openpilot.common.conversions import Conversions as CV +from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver + +ButtonEvent = car.CarState.ButtonEvent +ButtonType = car.CarState.ButtonEvent.Type + + +def run_cruise_simulation(cruise, e2e, t_end=20.): + man = Maneuver( + '', + duration=t_end, + initial_speed=max(cruise - 1., 0.0), + lead_relevancy=True, + initial_distance_lead=100, + cruise_values=[cruise], + prob_lead_values=[0.0], + breakpoints=[0.], + e2e=e2e, + ) + valid, output = man.evaluate() + assert valid + return output[-1, 3] + + +@parameterized_class(("e2e", "personality", "speed"), itertools.product( + [True, False], # e2e + log.LongitudinalPersonality.schema.enumerants, # personality + [5,35])) # speed +class TestCruiseSpeed(unittest.TestCase): + def test_cruise_speed(self): + params = Params() + params.put("LongitudinalPersonality", str(self.personality)) + print(f'Testing {self.speed} m/s') + cruise_speed = float(self.speed) + + simulation_steady_state = run_cruise_simulation(cruise_speed, self.e2e) + self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {self.speed} m/s') + + +# TODO: test pcmCruise +@parameterized_class(('pcm_cruise',), [(False,)]) +class TestVCruiseHelper(unittest.TestCase): + def setUp(self): + self.CP = car.CarParams(pcmCruise=self.pcm_cruise) + self.v_cruise_helper = VCruiseHelper(self.CP) + self.reset_cruise_speed_state() + + def reset_cruise_speed_state(self): + # Two resets previous cruise speed + for _ in range(2): + self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False) + + def enable(self, v_ego, experimental_mode): + # Simulates user pressing set with a current speed + self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode) + + def test_adjust_speed(self): + """ + Asserts speed changes on falling edges of buttons. + """ + + self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) + + for btn in (ButtonType.accelCruise, ButtonType.decelCruise): + for pressed in (True, False): + CS = car.CarState(cruiseState={"available": True}) + CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)] + + self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) + self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) + + def test_rising_edge_enable(self): + """ + Some car interfaces may enable on rising edge of a button, + ensure we don't adjust speed if enabled changes mid-press. + """ + + # NOTE: enabled is always one frame behind the result from button press in controlsd + for enabled, pressed in ((False, False), + (False, True), + (True, False)): + CS = car.CarState(cruiseState={"available": True}) + CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)] + self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False) + if pressed: + self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) + + # Expected diff on enabling. Speed should not change on falling edge of pressed + self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) + + def test_resume_in_standstill(self): + """ + Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill. + """ + + self.enable(0, False) + + for standstill in (True, False): + for pressed in (True, False): + CS = car.CarState(cruiseState={"available": True, "standstill": standstill}) + CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)] + self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) + + # speed should only update if not at standstill and button falling edge + should_equal = standstill or pressed + self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) + + def test_set_gas_pressed(self): + """ + Asserts pressing set while enabled with gas pressed sets + the speed to the maximum of vEgo and current cruise speed. + """ + + for v_ego in np.linspace(0, 100, 101): + self.reset_cruise_speed_state() + self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) + + # first decrement speed, then perform gas pressed logic + expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT + expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH) # clip to min of vEgo + expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX)) + + CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True}) + CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)] + self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) + + # TODO: fix skipping first run due to enabled on rising edge exception + if v_ego == 0.0: + continue + self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph) + + def test_initialize_v_cruise(self): + """ + Asserts allowed cruise speeds on enabling with SET. + """ + + for experimental_mode in (True, False): + for v_ego in np.linspace(0, 100, 101): + self.reset_cruise_speed_state() + self.assertFalse(self.v_cruise_helper.v_cruise_initialized) + + self.enable(float(v_ego), experimental_mode) + self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX) + self.assertTrue(self.v_cruise_helper.v_cruise_initialized) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_following_distance.py b/selfdrive/controls/tests/test_following_distance.py new file mode 100755 index 0000000000..3b31632721 --- /dev/null +++ b/selfdrive/controls/tests/test_following_distance.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +import unittest +import itertools +from parameterized import parameterized_class + +from openpilot.common.params import Params +from cereal import log + +from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import desired_follow_distance, get_T_FOLLOW +from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver + + +def run_following_distance_simulation(v_lead, t_end=100.0, e2e=False): + man = Maneuver( + '', + duration=t_end, + initial_speed=float(v_lead), + lead_relevancy=True, + initial_distance_lead=100, + speed_lead_values=[v_lead], + breakpoints=[0.], + e2e=e2e, + ) + valid, output = man.evaluate() + assert valid + return output[-1,2] - output[-1,1] + + +@parameterized_class(("e2e", "personality", "speed"), itertools.product( + [True, False], # e2e + [log.LongitudinalPersonality.relaxed, # personality + log.LongitudinalPersonality.standard, + log.LongitudinalPersonality.aggressive], + [0,10,35])) # speed +class TestFollowingDistance(unittest.TestCase): + def test_following_distance(self): + params = Params() + params.put("LongitudinalPersonality", str(self.personality)) + v_lead = float(self.speed) + simulation_steady_state = run_following_distance_simulation(v_lead, e2e=self.e2e) + correct_steady_state = desired_follow_distance(v_lead, v_lead, get_T_FOLLOW(self.personality)) + err_ratio = 0.2 if self.e2e else 0.1 + self.assertAlmostEqual(simulation_steady_state, correct_steady_state, delta=(err_ratio * correct_steady_state + .5)) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_lateral_mpc.py b/selfdrive/controls/tests/test_lateral_mpc.py new file mode 100644 index 0000000000..8c09f46b60 --- /dev/null +++ b/selfdrive/controls/tests/test_lateral_mpc.py @@ -0,0 +1,89 @@ +import unittest +import numpy as np +from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc +from openpilot.selfdrive.controls.lib.drive_helpers import CAR_ROTATION_RADIUS +from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import N as LAT_MPC_N + + +def run_mpc(lat_mpc=None, v_ref=30., x_init=0., y_init=0., psi_init=0., curvature_init=0., + lane_width=3.6, poly_shift=0.): + + if lat_mpc is None: + lat_mpc = LateralMpc() + lat_mpc.set_weights(1., .1, 0.0, .05, 800) + + y_pts = poly_shift * np.ones(LAT_MPC_N + 1) + heading_pts = np.zeros(LAT_MPC_N + 1) + curv_rate_pts = np.zeros(LAT_MPC_N + 1) + + x0 = np.array([x_init, y_init, psi_init, curvature_init]) + p = np.column_stack([v_ref * np.ones(LAT_MPC_N + 1), + CAR_ROTATION_RADIUS * np.ones(LAT_MPC_N + 1)]) + + # converge in no more than 10 iterations + for _ in range(10): + lat_mpc.run(x0, p, + y_pts, heading_pts, curv_rate_pts) + return lat_mpc.x_sol + + +class TestLateralMpc(unittest.TestCase): + + def _assert_null(self, sol, curvature=1e-6): + for i in range(len(sol)): + self.assertAlmostEqual(sol[0,i,1], 0., delta=curvature) + self.assertAlmostEqual(sol[0,i,2], 0., delta=curvature) + self.assertAlmostEqual(sol[0,i,3], 0., delta=curvature) + + def _assert_simmetry(self, sol, curvature=1e-6): + for i in range(len(sol)): + self.assertAlmostEqual(sol[0,i,1], -sol[1,i,1], delta=curvature) + self.assertAlmostEqual(sol[0,i,2], -sol[1,i,2], delta=curvature) + self.assertAlmostEqual(sol[0,i,3], -sol[1,i,3], delta=curvature) + self.assertAlmostEqual(sol[0,i,0], sol[1,i,0], delta=curvature) + + def test_straight(self): + sol = run_mpc() + self._assert_null(np.array([sol])) + + def test_y_symmetry(self): + sol = [] + for y_init in [-0.5, 0.5]: + sol.append(run_mpc(y_init=y_init)) + self._assert_simmetry(np.array(sol)) + + def test_poly_symmetry(self): + sol = [] + for poly_shift in [-1., 1.]: + sol.append(run_mpc(poly_shift=poly_shift)) + self._assert_simmetry(np.array(sol)) + + def test_curvature_symmetry(self): + sol = [] + for curvature_init in [-0.1, 0.1]: + sol.append(run_mpc(curvature_init=curvature_init)) + self._assert_simmetry(np.array(sol)) + + def test_psi_symmetry(self): + sol = [] + for psi_init in [-0.1, 0.1]: + sol.append(run_mpc(psi_init=psi_init)) + self._assert_simmetry(np.array(sol)) + + def test_no_overshoot(self): + y_init = 1. + sol = run_mpc(y_init=y_init) + for y in list(sol[:,1]): + self.assertGreaterEqual(y_init, abs(y)) + + def test_switch_convergence(self): + lat_mpc = LateralMpc() + sol = run_mpc(lat_mpc=lat_mpc, poly_shift=3.0, v_ref=7.0) + right_psi_deg = np.degrees(sol[:,2]) + sol = run_mpc(lat_mpc=lat_mpc, poly_shift=-3.0, v_ref=7.0) + left_psi_deg = np.degrees(sol[:,2]) + np.testing.assert_almost_equal(right_psi_deg, -left_psi_deg, decimal=3) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_leads.py b/selfdrive/controls/tests/test_leads.py new file mode 100755 index 0000000000..268d9c47a7 --- /dev/null +++ b/selfdrive/controls/tests/test_leads.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +import unittest + +import cereal.messaging as messaging + +from openpilot.selfdrive.test.process_replay import replay_process_with_name +from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA + + +class TestLeads(unittest.TestCase): + def test_radar_fault(self): + # if there's no radar-related can traffic, radard should either not respond or respond with an error + # this is tightly coupled with underlying car radar_interface implementation, but it's a good sanity check + def single_iter_pkg(): + # single iter package, with meaningless cans and empty carState/modelV2 + msgs = [] + for _ in range(5): + can = messaging.new_message("can", 1) + cs = messaging.new_message("carState") + msgs.append(can.as_reader()) + msgs.append(cs.as_reader()) + model = messaging.new_message("modelV2") + msgs.append(model.as_reader()) + + return msgs + + msgs = [m for _ in range(3) for m in single_iter_pkg()] + out = replay_process_with_name("radard", msgs, fingerprint=TOYOTA.COROLLA_TSS2) + states = [m for m in out if m.which() == "radarState"] + failures = [not state.valid and len(state.radarState.radarErrors) for state in states] + + self.assertTrue(len(states) == 0 or all(failures)) + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_startup.py b/selfdrive/controls/tests/test_startup.py new file mode 100755 index 0000000000..6eb803e8aa --- /dev/null +++ b/selfdrive/controls/tests/test_startup.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 +import time +import unittest +from parameterized import parameterized + +from cereal import log, car +import cereal.messaging as messaging +from openpilot.common.params import Params +from openpilot.selfdrive.boardd.boardd_api_impl import can_list_to_can_capnp +from openpilot.selfdrive.car.fingerprints import _FINGERPRINTS +from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA +from openpilot.selfdrive.car.mazda.values import CAR as MAZDA +from openpilot.selfdrive.controls.lib.events import EVENT_NAME +from openpilot.selfdrive.test.helpers import with_processes + +EventName = car.CarEvent.EventName +Ecu = car.CarParams.Ecu + +COROLLA_FW_VERSIONS = [ + (Ecu.engine, 0x7e0, None, b'\x0230ZC2000\x00\x00\x00\x00\x00\x00\x00\x0050212000\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.abs, 0x7b0, None, b'F152602190\x00\x00\x00\x00\x00\x00'), + (Ecu.eps, 0x7a1, None, b'8965B02181\x00\x00\x00\x00\x00\x00'), + (Ecu.fwdRadar, 0x750, 0xf, b'8821F4702100\x00\x00\x00\x00'), + (Ecu.fwdCamera, 0x750, 0x6d, b'8646F0201101\x00\x00\x00\x00'), + (Ecu.dsu, 0x791, None, b'881510201100\x00\x00\x00\x00'), +] +COROLLA_FW_VERSIONS_FUZZY = COROLLA_FW_VERSIONS[:-1] + [(Ecu.dsu, 0x791, None, b'xxxxxx')] +COROLLA_FW_VERSIONS_NO_DSU = COROLLA_FW_VERSIONS[:-1] + +CX5_FW_VERSIONS = [ + (Ecu.engine, 0x7e0, None, b'PYNF-188K2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.abs, 0x760, None, b'K123-437K2-E\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.eps, 0x730, None, b'KJ01-3210X-G-00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.fwdRadar, 0x764, None, b'K123-67XK2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.fwdCamera, 0x706, None, b'B61L-67XK2-T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), + (Ecu.transmission, 0x7e1, None, b'PYNC-21PS1-B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), +] + +class TestStartup(unittest.TestCase): + + @parameterized.expand([ + # TODO: test EventName.startup for release branches + + # officially supported car + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"), + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"), + + # dashcamOnly car + (EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"), + (EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"), + + # unrecognized car with no fw + (EventName.startupNoFw, None, None, ""), + (EventName.startupNoFw, None, None, ""), + + # unrecognized car + (EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"), + (EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"), + + # fuzzy match + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"), + (EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"), + ]) + @with_processes(['controlsd']) + def test_startup_alert(self, expected_event, car_model, fw_versions, brand): + + # TODO: this should be done without any real sockets + controls_sock = messaging.sub_sock("controlsState") + pm = messaging.PubMaster(['can', 'pandaStates']) + + params = Params() + params.clear_all() + params.put_bool("Passive", False) + params.put_bool("OpenpilotEnabledToggle", True) + + # Build capnn version of FW array + if fw_versions is not None: + car_fw = [] + cp = car.CarParams.new_message() + for ecu, addr, subaddress, version in fw_versions: + f = car.CarParams.CarFw.new_message() + f.ecu = ecu + f.address = addr + f.fwVersion = version + f.brand = brand + + if subaddress is not None: + f.subAddress = subaddress + + car_fw.append(f) + cp.carVin = "1" * 17 + cp.carFw = car_fw + params.put("CarParamsCache", cp.to_bytes()) + + time.sleep(2) # wait for controlsd to be ready + + pm.send('can', can_list_to_can_capnp([[0, 0, b"", 0]])) + time.sleep(0.1) + + msg = messaging.new_message('pandaStates', 1) + msg.pandaStates[0].pandaType = log.PandaState.PandaType.uno + pm.send('pandaStates', msg) + + # fingerprint + if (car_model is None) or (fw_versions is not None): + finger = {addr: 1 for addr in range(1, 100)} + else: + finger = _FINGERPRINTS[car_model][0] + + for _ in range(1000): + # controlsd waits for boardd to echo back that it has changed the multiplexing mode + if not params.get_bool("ObdMultiplexingChanged"): + params.put_bool("ObdMultiplexingChanged", True) + + msgs = [[addr, 0, b'\x00'*length, 0] for addr, length in finger.items()] + pm.send('can', can_list_to_can_capnp(msgs)) + + time.sleep(0.01) + msgs = messaging.drain_sock(controls_sock) + if len(msgs): + event_name = msgs[0].controlsState.alertType.split("/")[0] + self.assertEqual(EVENT_NAME[expected_event], event_name, + f"expected {EVENT_NAME[expected_event]} for '{car_model}', got {event_name}") + break + else: + self.fail(f"failed to fingerprint {car_model}") + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/controls/tests/test_state_machine.py b/selfdrive/controls/tests/test_state_machine.py new file mode 100755 index 0000000000..a05934ca90 --- /dev/null +++ b/selfdrive/controls/tests/test_state_machine.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +import unittest +from unittest import mock + +from cereal import car, log +from openpilot.common.realtime import DT_CTRL +from openpilot.selfdrive.car.car_helpers import interfaces +from openpilot.selfdrive.controls.controlsd import Controls, SOFT_DISABLE_TIME +from openpilot.selfdrive.controls.lib.events import Events, ET, Alert, Priority, AlertSize, AlertStatus, VisualAlert, \ + AudibleAlert, EVENTS as _EVENTS + +State = log.ControlsState.OpenpilotState + +EVENTS = _EVENTS.copy() +# The event types that maintain the current state +MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,), + State.preEnabled: (ET.PRE_ENABLE,), State.overriding: (ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)} +ALL_STATES = tuple(State.schema.enumerants.values()) +# The event types checked in DISABLED section of state machine +ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE, ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL) + + +def make_event(event_types): + event = {} + for ev in event_types: + event[ev] = Alert("", "", AlertStatus.normal, AlertSize.small, Priority.LOW, + VisualAlert.none, AudibleAlert.none, 1.) + EVENTS[0] = event + return 0 + + +@mock.patch("openpilot.selfdrive.controls.lib.events.EVENTS", EVENTS) +class TestStateMachine(unittest.TestCase): + + def setUp(self): + CarInterface, CarController, CarState = interfaces["mock"] + CP = CarInterface.get_non_essential_params("mock") + CI = CarInterface(CP, CarController, CarState) + + self.controlsd = Controls(CI=CI) + self.controlsd.events = Events() + self.controlsd.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL) + self.CS = car.CarState() + + def test_immediate_disable(self): + for state in ALL_STATES: + for et in MAINTAIN_STATES[state]: + self.controlsd.events.add(make_event([et, ET.IMMEDIATE_DISABLE])) + self.controlsd.state = state + self.controlsd.state_transition(self.CS) + self.assertEqual(State.disabled, self.controlsd.state) + self.controlsd.events.clear() + + def test_user_disable(self): + for state in ALL_STATES: + for et in MAINTAIN_STATES[state]: + self.controlsd.events.add(make_event([et, ET.USER_DISABLE])) + self.controlsd.state = state + self.controlsd.state_transition(self.CS) + self.assertEqual(State.disabled, self.controlsd.state) + self.controlsd.events.clear() + + def test_soft_disable(self): + for state in ALL_STATES: + if state == State.preEnabled: # preEnabled considers NO_ENTRY instead + continue + for et in MAINTAIN_STATES[state]: + self.controlsd.events.add(make_event([et, ET.SOFT_DISABLE])) + self.controlsd.state = state + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, State.disabled if state == State.disabled else State.softDisabling) + self.controlsd.events.clear() + + def test_soft_disable_timer(self): + self.controlsd.state = State.enabled + self.controlsd.events.add(make_event([ET.SOFT_DISABLE])) + self.controlsd.state_transition(self.CS) + for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)): + self.assertEqual(self.controlsd.state, State.softDisabling) + self.controlsd.state_transition(self.CS) + + self.assertEqual(self.controlsd.state, State.disabled) + + def test_no_entry(self): + # Make sure noEntry keeps us disabled + for et in ENABLE_EVENT_TYPES: + self.controlsd.events.add(make_event([ET.NO_ENTRY, et])) + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, State.disabled) + self.controlsd.events.clear() + + def test_no_entry_pre_enable(self): + # preEnabled with noEntry event + self.controlsd.state = State.preEnabled + self.controlsd.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE])) + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, State.preEnabled) + + def test_maintain_states(self): + # Given current state's event type, we should maintain state + for state in ALL_STATES: + for et in MAINTAIN_STATES[state]: + self.controlsd.state = state + self.controlsd.events.add(make_event([et])) + self.controlsd.state_transition(self.CS) + self.assertEqual(self.controlsd.state, state) + self.controlsd.events.clear() + + +if __name__ == "__main__": + unittest.main() diff --git a/selfdrive/test/longitudinal_maneuvers/.gitignore b/selfdrive/test/longitudinal_maneuvers/.gitignore new file mode 100644 index 0000000000..d42ab353ab --- /dev/null +++ b/selfdrive/test/longitudinal_maneuvers/.gitignore @@ -0,0 +1 @@ +out/* diff --git a/selfdrive/test/longitudinal_maneuvers/__init__.py b/selfdrive/test/longitudinal_maneuvers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selfdrive/test/longitudinal_maneuvers/maneuver.py b/selfdrive/test/longitudinal_maneuvers/maneuver.py new file mode 100644 index 0000000000..000225ab77 --- /dev/null +++ b/selfdrive/test/longitudinal_maneuvers/maneuver.py @@ -0,0 +1,71 @@ +import numpy as np +from openpilot.selfdrive.test.longitudinal_maneuvers.plant import Plant + + +class Maneuver: + def __init__(self, title, duration, **kwargs): + # Was tempted to make a builder class + self.distance_lead = kwargs.get("initial_distance_lead", 200.0) + self.speed = kwargs.get("initial_speed", 0.0) + self.lead_relevancy = kwargs.get("lead_relevancy", 0) + + self.breakpoints = kwargs.get("breakpoints", [0.0, duration]) + self.speed_lead_values = kwargs.get("speed_lead_values", [0.0 for i in range(len(self.breakpoints))]) + self.prob_lead_values = kwargs.get("prob_lead_values", [1.0 for i in range(len(self.breakpoints))]) + self.cruise_values = kwargs.get("cruise_values", [50.0 for i in range(len(self.breakpoints))]) + + self.only_lead2 = kwargs.get("only_lead2", False) + self.only_radar = kwargs.get("only_radar", False) + self.ensure_start = kwargs.get("ensure_start", False) + self.enabled = kwargs.get("enabled", True) + self.e2e = kwargs.get("e2e", False) + self.force_decel = kwargs.get("force_decel", False) + + self.duration = duration + self.title = title + + def evaluate(self): + plant = Plant( + lead_relevancy=self.lead_relevancy, + speed=self.speed, + distance_lead=self.distance_lead, + enabled=self.enabled, + only_lead2=self.only_lead2, + only_radar=self.only_radar, + e2e=self.e2e, + force_decel=self.force_decel, + ) + + valid = True + logs = [] + while plant.current_time < self.duration: + speed_lead = np.interp(plant.current_time, self.breakpoints, self.speed_lead_values) + prob = np.interp(plant.current_time, self.breakpoints, self.prob_lead_values) + cruise = np.interp(plant.current_time, self.breakpoints, self.cruise_values) + log = plant.step(speed_lead, prob, cruise) + + d_rel = log['distance_lead'] - log['distance'] if self.lead_relevancy else 200. + v_rel = speed_lead - log['speed'] if self.lead_relevancy else 0. + log['d_rel'] = d_rel + log['v_rel'] = v_rel + logs.append(np.array([plant.current_time, + log['distance'], + log['distance_lead'], + log['speed'], + speed_lead, + log['acceleration']])) + + if d_rel < .4 and (self.only_radar or prob > 0.5): + print("Crashed!!!!") + valid = False + + if self.ensure_start and log['v_rel'] > 0 and log['speeds'][-1] <= 0.1: + print('LongitudinalPlanner not starting!') + valid = False + if self.force_decel and log['speed'] > 1e-1 and log['acceleration'] > -0.04: + print('Not stopping with force decel') + valid = False + + + print("maneuver end", valid) + return valid, np.array(logs) diff --git a/selfdrive/test/longitudinal_maneuvers/plant.py b/selfdrive/test/longitudinal_maneuvers/plant.py new file mode 100755 index 0000000000..bb935fdc8e --- /dev/null +++ b/selfdrive/test/longitudinal_maneuvers/plant.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +import time +import numpy as np + +from cereal import log +import cereal.messaging as messaging +from openpilot.common.realtime import Ratekeeper, DT_MDL +from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState +from openpilot.selfdrive.modeld.constants import ModelConstants +from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner +from openpilot.selfdrive.controls.radard import _LEAD_ACCEL_TAU + + +class Plant: + messaging_initialized = False + + def __init__(self, lead_relevancy=False, speed=0.0, distance_lead=2.0, + enabled=True, only_lead2=False, only_radar=False, e2e=False, force_decel=False): + self.rate = 1. / DT_MDL + + if not Plant.messaging_initialized: + Plant.radar = messaging.pub_sock('radarState') + Plant.controls_state = messaging.pub_sock('controlsState') + Plant.car_state = messaging.pub_sock('carState') + Plant.plan = messaging.sub_sock('longitudinalPlan') + Plant.messaging_initialized = True + + self.v_lead_prev = 0.0 + + self.distance = 0. + self.speed = speed + self.acceleration = 0.0 + self.speeds = [] + + # lead car + self.lead_relevancy = lead_relevancy + self.distance_lead = distance_lead + self.enabled = enabled + self.only_lead2 = only_lead2 + self.only_radar = only_radar + self.e2e = e2e + self.force_decel = force_decel + + self.rk = Ratekeeper(self.rate, print_delay_threshold=100.0) + self.ts = 1. / self.rate + time.sleep(0.1) + self.sm = messaging.SubMaster(['longitudinalPlan']) + + from openpilot.selfdrive.car.honda.values import CAR + from openpilot.selfdrive.car.honda.interface import CarInterface + + self.planner = LongitudinalPlanner(CarInterface.get_non_essential_params(CAR.CIVIC), init_v=self.speed) + + @property + def current_time(self): + return float(self.rk.frame) / self.rate + + def step(self, v_lead=0.0, prob=1.0, v_cruise=50.): + # ******** publish a fake model going straight and fake calibration ******** + # note that this is worst case for MPC, since model will delay long mpc by one time step + radar = messaging.new_message('radarState') + control = messaging.new_message('controlsState') + car_state = messaging.new_message('carState') + model = messaging.new_message('modelV2') + a_lead = (v_lead - self.v_lead_prev)/self.ts + self.v_lead_prev = v_lead + + if self.lead_relevancy: + d_rel = np.maximum(0., self.distance_lead - self.distance) + v_rel = v_lead - self.speed + if self.only_radar: + status = True + elif prob > .5: + status = True + else: + status = False + else: + d_rel = 200. + v_rel = 0. + prob = 0.0 + status = False + + lead = log.RadarState.LeadData.new_message() + lead.dRel = float(d_rel) + lead.yRel = float(0.0) + lead.vRel = float(v_rel) + lead.aRel = float(a_lead - self.acceleration) + lead.vLead = float(v_lead) + lead.vLeadK = float(v_lead) + lead.aLeadK = float(a_lead) + # TODO use real radard logic for this + lead.aLeadTau = float(_LEAD_ACCEL_TAU) + lead.status = status + lead.modelProb = float(prob) + if not self.only_lead2: + radar.radarState.leadOne = lead + radar.radarState.leadTwo = lead + + # Simulate model predicting slightly faster speed + # this is to ensure lead policy is effective when model + # does not predict slowdown in e2e mode + position = log.XYZTData.new_message() + position.x = [float(x) for x in (self.speed + 0.5) * np.array(ModelConstants.T_IDXS)] + model.modelV2.position = position + velocity = log.XYZTData.new_message() + velocity.x = [float(x) for x in (self.speed + 0.5) * np.ones_like(ModelConstants.T_IDXS)] + model.modelV2.velocity = velocity + acceleration = log.XYZTData.new_message() + acceleration.x = [float(x) for x in np.zeros_like(ModelConstants.T_IDXS)] + model.modelV2.acceleration = acceleration + + control.controlsState.longControlState = LongCtrlState.pid if self.enabled else LongCtrlState.off + control.controlsState.vCruise = float(v_cruise * 3.6) + control.controlsState.experimentalMode = self.e2e + control.controlsState.forceDecel = self.force_decel + car_state.carState.vEgo = float(self.speed) + car_state.carState.standstill = self.speed < 0.01 + + # ******** get controlsState messages for plotting *** + sm = {'radarState': radar.radarState, + 'carState': car_state.carState, + 'controlsState': control.controlsState, + 'modelV2': model.modelV2} + self.planner.update(sm) + self.speed = self.planner.v_desired_filter.x + self.acceleration = self.planner.a_desired + self.speeds = self.planner.v_desired_trajectory.tolist() + fcw = self.planner.fcw + self.distance_lead = self.distance_lead + v_lead * self.ts + + # ******** run the car ******** + #print(self.distance, speed) + if self.speed <= 0: + self.speed = 0 + self.acceleration = 0 + self.distance = self.distance + self.speed * self.ts + + # *** radar model *** + if self.lead_relevancy: + d_rel = np.maximum(0., self.distance_lead - self.distance) + v_rel = v_lead - self.speed + else: + d_rel = 200. + v_rel = 0. + + # print at 5hz + # if (self.rk.frame % (self.rate // 5)) == 0: + # print("%2.2f sec %6.2f m %6.2f m/s %6.2f m/s2 lead_rel: %6.2f m %6.2f m/s" + # % (self.current_time, self.distance, self.speed, self.acceleration, d_rel, v_rel)) + + + # ******** update prevs ******** + self.rk.monitor_time() + + return { + "distance": self.distance, + "speed": self.speed, + "acceleration": self.acceleration, + "speeds": self.speeds, + "distance_lead": self.distance_lead, + "fcw": fcw, + } + +# simple engage in standalone mode +def plant_thread(): + plant = Plant() + while 1: + plant.step() + + +if __name__ == "__main__": + plant_thread() diff --git a/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py b/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py new file mode 100755 index 0000000000..713b7801f8 --- /dev/null +++ b/selfdrive/test/longitudinal_maneuvers/test_longitudinal.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +import itertools +import unittest +from parameterized import parameterized_class + +from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import STOP_DISTANCE +from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver + + +# TODO: make new FCW tests +def create_maneuvers(kwargs): + maneuvers = [ + Maneuver( + 'approach stopped car at 25m/s, initial distance: 120m', + duration=20., + initial_speed=25., + lead_relevancy=True, + initial_distance_lead=120., + speed_lead_values=[30., 0.], + breakpoints=[0., 1.], + **kwargs, + ), + Maneuver( + 'approach stopped car at 20m/s, initial distance 90m', + duration=20., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=90., + speed_lead_values=[20., 0.], + breakpoints=[0., 1.], + **kwargs, + ), + Maneuver( + 'steady state following a car at 20m/s, then lead decel to 0mph at 1m/s^2', + duration=50., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=35., + speed_lead_values=[20., 20., 0.], + breakpoints=[0., 15., 35.0], + **kwargs, + ), + Maneuver( + 'steady state following a car at 20m/s, then lead decel to 0mph at 2m/s^2', + duration=50., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=35., + speed_lead_values=[20., 20., 0.], + breakpoints=[0., 15., 25.0], + **kwargs, + ), + Maneuver( + 'steady state following a car at 20m/s, then lead decel to 0mph at 3m/s^2', + duration=50., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=35., + speed_lead_values=[20., 20., 0.], + breakpoints=[0., 15., 21.66], + **kwargs, + ), + Maneuver( + 'steady state following a car at 20m/s, then lead decel to 0mph at 3+m/s^2', + duration=40., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=35., + speed_lead_values=[20., 20., 0.], + prob_lead_values=[0., 1., 1.], + cruise_values=[20., 20., 20.], + breakpoints=[2., 2.01, 8.8], + **kwargs, + ), + Maneuver( + "approach stopped car at 20m/s, with prob_lead_values", + duration=30., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=120., + speed_lead_values=[0.0, 0., 0.], + prob_lead_values=[0.0, 0., 1.], + cruise_values=[20., 20., 20.], + breakpoints=[0.0, 2., 2.01], + **kwargs, + ), + Maneuver( + "approach slower cut-in car at 20m/s", + duration=20., + initial_speed=20., + lead_relevancy=True, + initial_distance_lead=50., + speed_lead_values=[15., 15.], + breakpoints=[1., 11.], + only_lead2=True, + **kwargs, + ), + Maneuver( + "stay stopped behind radar override lead", + duration=20., + initial_speed=0., + lead_relevancy=True, + initial_distance_lead=10., + speed_lead_values=[0., 0.], + prob_lead_values=[0., 0.], + breakpoints=[1., 11.], + only_radar=True, + **kwargs, + ), + Maneuver( + "NaN recovery", + duration=30., + initial_speed=15., + lead_relevancy=True, + initial_distance_lead=60., + speed_lead_values=[0., 0., 0.0], + breakpoints=[1., 1.01, 11.], + cruise_values=[float("nan"), 15., 15.], + **kwargs, + ), + Maneuver( + 'cruising at 25 m/s while disabled', + duration=20., + initial_speed=25., + lead_relevancy=False, + enabled=False, + **kwargs, + ), + ] + if not kwargs['force_decel']: + # controls relies on planner commanding to move for stock-ACC resume spamming + maneuvers.append(Maneuver( + "resume from a stop", + duration=20., + initial_speed=0., + lead_relevancy=True, + initial_distance_lead=STOP_DISTANCE, + speed_lead_values=[0., 0., 2.], + breakpoints=[1., 10., 15.], + ensure_start=True, + **kwargs, + )) + return maneuvers + + +@parameterized_class(("e2e", "force_decel"), itertools.product([True, False], repeat=2)) +class LongitudinalControl(unittest.TestCase): + e2e: bool + force_decel: bool + + def test_maneuver(self): + for maneuver in create_maneuvers({"e2e": self.e2e, "force_decel": self.force_decel}): + with self.subTest(title=maneuver.title, e2e=maneuver.e2e, force_decel=maneuver.force_decel): + print(maneuver.title, f'in {"e2e" if maneuver.e2e else "acc"} mode') + valid, _ = maneuver.evaluate() + self.assertTrue(valid) + + +if __name__ == "__main__": + unittest.main(failfast=True) diff --git a/system/loggerd/tests/__init__.py b/system/loggerd/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/system/loggerd/tests/loggerd_tests_common.py b/system/loggerd/tests/loggerd_tests_common.py new file mode 100644 index 0000000000..8bfb571861 --- /dev/null +++ b/system/loggerd/tests/loggerd_tests_common.py @@ -0,0 +1,106 @@ +import os +import random +import unittest +from pathlib import Path +from typing import Optional +from openpilot.system.hardware.hw import Paths + +import openpilot.system.loggerd.deleter as deleter +import openpilot.system.loggerd.uploader as uploader +from openpilot.system.loggerd.xattr_cache import setxattr + + +def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: Optional[bytes] = None) -> None: + file_path.parent.mkdir(parents=True, exist_ok=True) + + if lock: + lock_path = str(file_path) + ".lock" + os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL)) + + chunks = 128 + chunk_bytes = int(size_mb * 1024 * 1024 / chunks) + data = os.urandom(chunk_bytes) + + with open(file_path, "wb") as f: + for _ in range(chunks): + f.write(data) + + if upload_xattr is not None: + setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr) + +class MockResponse(): + def __init__(self, text, status_code): + self.text = text + self.status_code = status_code + +class MockApi(): + def __init__(self, dongle_id): + pass + + def get(self, *args, **kwargs): + return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200) + + def get_token(self): + return "fake-token" + +class MockApiIgnore(): + def __init__(self, dongle_id): + pass + + def get(self, *args, **kwargs): + return MockResponse('', 412) + + def get_token(self): + return "fake-token" + +class MockParams(): + def __init__(self): + self.params = { + "DongleId": b"0000000000000000", + "IsOffroad": b"1", + } + + def get(self, k, block=False, encoding=None): + val = self.params[k] + + if encoding is not None: + return val.decode(encoding) + else: + return val + + def get_bool(self, k): + val = self.params[k] + return (val == b'1') + +class UploaderTestCase(unittest.TestCase): + f_type = "UNKNOWN" + + root: Path + seg_num: int + seg_format: str + seg_format2: str + seg_dir: str + + def set_ignore(self): + uploader.Api = MockApiIgnore + + def setUp(self): + uploader.Api = MockApi + uploader.Params = MockParams + uploader.fake_upload = True + uploader.force_wifi = True + uploader.allow_sleep = False + self.seg_num = random.randint(1, 300) + self.seg_format = "2019-04-18--12-52-54--{}" + self.seg_format2 = "2019-05-18--11-22-33--{}" + self.seg_dir = self.seg_format.format(self.seg_num) + + def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, + upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: + file_path = Path(Paths.log_root()) / f_dir / fn + create_random_file(file_path, size_mb, lock, upload_xattr) + + if preserve_xattr is not None: + setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr) + + return file_path diff --git a/system/loggerd/tests/test_deleter.py b/system/loggerd/tests/test_deleter.py new file mode 100755 index 0000000000..e4112b7b4e --- /dev/null +++ b/system/loggerd/tests/test_deleter.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +import time +import threading +import unittest +from collections import namedtuple +from pathlib import Path +from typing import Sequence + +import openpilot.system.loggerd.deleter as deleter +from openpilot.common.timeout import Timeout, TimeoutException +from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase + +Stats = namedtuple("Stats", ['f_bavail', 'f_blocks', 'f_frsize']) + + +class TestDeleter(UploaderTestCase): + def fake_statvfs(self, d): + return self.fake_stats + + def setUp(self): + self.f_type = "fcamera.hevc" + super().setUp() + self.fake_stats = Stats(f_bavail=0, f_blocks=10, f_frsize=4096) + deleter.os.statvfs = self.fake_statvfs + + def start_thread(self): + self.end_event = threading.Event() + self.del_thread = threading.Thread(target=deleter.deleter_thread, args=[self.end_event]) + self.del_thread.daemon = True + self.del_thread.start() + + def join_thread(self): + self.end_event.set() + self.del_thread.join() + + def test_delete(self): + f_path = self.make_file_with_data(self.seg_dir, self.f_type, 1) + + self.start_thread() + + try: + with Timeout(2, "Timeout waiting for file to be deleted"): + while f_path.exists(): + time.sleep(0.01) + finally: + self.join_thread() + + def assertDeleteOrder(self, f_paths: Sequence[Path], timeout: int = 5) -> None: + deleted_order = [] + + self.start_thread() + try: + with Timeout(timeout, "Timeout waiting for files to be deleted"): + while True: + for f in f_paths: + if not f.exists() and f not in deleted_order: + deleted_order.append(f) + if len(deleted_order) == len(f_paths): + break + time.sleep(0.01) + except TimeoutException: + print("Not deleted:", [f for f in f_paths if f not in deleted_order]) + raise + finally: + self.join_thread() + + self.assertEqual(deleted_order, f_paths, "Files not deleted in expected order") + + def test_delete_order(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(1), self.f_type), + self.make_file_with_data(self.seg_format2.format(0), self.f_type), + ]) + + def test_delete_many_preserved(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(1), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE), + self.make_file_with_data(self.seg_format.format(2), self.f_type), + ] + [ + self.make_file_with_data(self.seg_format2.format(i), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE) + for i in range(5) + ]) + + def test_delete_last(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(1), self.f_type), + self.make_file_with_data(self.seg_format2.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(0), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE), + self.make_file_with_data("boot", self.seg_format[:-4]), + self.make_file_with_data("crash", self.seg_format2[:-4]), + ]) + + def test_no_delete_when_available_space(self): + f_path = self.make_file_with_data(self.seg_dir, self.f_type) + + block_size = 4096 + available = (10 * 1024 * 1024 * 1024) / block_size # 10GB free + self.fake_stats = Stats(f_bavail=available, f_blocks=10, f_frsize=block_size) + + self.start_thread() + start_time = time.monotonic() + while f_path.exists() and time.monotonic() - start_time < 2: + time.sleep(0.01) + self.join_thread() + + self.assertTrue(f_path.exists(), "File deleted with available space") + + def test_no_delete_with_lock_file(self): + f_path = self.make_file_with_data(self.seg_dir, self.f_type, lock=True) + + self.start_thread() + start_time = time.monotonic() + while f_path.exists() and time.monotonic() - start_time < 2: + time.sleep(0.01) + self.join_thread() + + self.assertTrue(f_path.exists(), "File deleted when locked") + + +if __name__ == "__main__": + unittest.main() diff --git a/system/loggerd/tests/test_encoder.py b/system/loggerd/tests/test_encoder.py new file mode 100755 index 0000000000..bd076dc5f3 --- /dev/null +++ b/system/loggerd/tests/test_encoder.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +import math +import os +import pytest +import random +import shutil +import subprocess +import time +import unittest +from pathlib import Path + +from parameterized import parameterized +from tqdm import trange + +from openpilot.common.params import Params +from openpilot.common.timeout import Timeout +from openpilot.system.hardware import TICI +from openpilot.selfdrive.manager.process_config import managed_processes +from openpilot.tools.lib.logreader import LogReader +from openpilot.system.hardware.hw import Paths + +SEGMENT_LENGTH = 2 +FULL_SIZE = 2507572 +CAMERAS = [ + ("fcamera.hevc", 20, FULL_SIZE, "roadEncodeIdx"), + ("dcamera.hevc", 20, FULL_SIZE, "driverEncodeIdx"), + ("ecamera.hevc", 20, FULL_SIZE, "wideRoadEncodeIdx"), + ("qcamera.ts", 20, 130000, None), +] + +# we check frame count, so we don't have to be too strict on size +FILE_SIZE_TOLERANCE = 0.5 + + +@pytest.mark.tici # TODO: all of loggerd should work on PC +class TestEncoder(unittest.TestCase): + + def setUp(self): + self._clear_logs() + os.environ["LOGGERD_TEST"] = "1" + os.environ["LOGGERD_SEGMENT_LENGTH"] = str(SEGMENT_LENGTH) + + def tearDown(self): + self._clear_logs() + + def _clear_logs(self): + if os.path.exists(Paths.log_root()): + shutil.rmtree(Paths.log_root()) + + def _get_latest_segment_path(self): + last_route = sorted(Path(Paths.log_root()).iterdir())[-1] + return os.path.join(Paths.log_root(), last_route) + + # TODO: this should run faster than real time + @parameterized.expand([(True, ), (False, )]) + def test_log_rotation(self, record_front): + Params().put_bool("RecordFront", record_front) + + managed_processes['sensord'].start() + managed_processes['loggerd'].start() + managed_processes['encoderd'].start() + + time.sleep(1.0) + managed_processes['camerad'].start() + + num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15))) + + # wait for loggerd to make the dir for first segment + route_prefix_path = None + with Timeout(int(SEGMENT_LENGTH*3)): + while route_prefix_path is None: + try: + route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0] + except Exception: + time.sleep(0.1) + + def check_seg(i): + # check each camera file size + counts = [] + first_frames = [] + for camera, fps, size, encode_idx_name in CAMERAS: + if not record_front and "dcamera" in camera: + continue + + file_path = f"{route_prefix_path}--{i}/{camera}" + + # check file exists + self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing") + + # TODO: this ffprobe call is really slow + # check frame count + cmd = f"ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {file_path}" + if TICI: + cmd = "LD_LIBRARY_PATH=/usr/local/lib " + cmd + + expected_frames = fps * SEGMENT_LENGTH + probe = subprocess.check_output(cmd, shell=True, encoding='utf8') + frame_count = int(probe.split('\n')[0].strip()) + counts.append(frame_count) + + self.assertEqual(frame_count, expected_frames, + f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}") + + # sanity check file size + file_size = os.path.getsize(file_path) + self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), + f"{file_path} size {file_size} isn't close to target size {size}") + + # Check encodeIdx + if encode_idx_name is not None: + rlog_path = f"{route_prefix_path}--{i}/rlog" + msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name] + encode_msgs = [getattr(m, encode_idx_name) for m in msgs] + + valid = [m.valid for m in msgs] + segment_idxs = [m.segmentId for m in encode_msgs] + encode_idxs = [m.encodeId for m in encode_msgs] + frame_idxs = [m.frameId for m in encode_msgs] + + # Check frame count + self.assertEqual(frame_count, len(segment_idxs)) + self.assertEqual(frame_count, len(encode_idxs)) + + # Check for duplicates or skips + self.assertEqual(0, segment_idxs[0]) + self.assertEqual(len(set(segment_idxs)), len(segment_idxs)) + + self.assertTrue(all(valid)) + + self.assertEqual(expected_frames * i, encode_idxs[0]) + first_frames.append(frame_idxs[0]) + self.assertEqual(len(set(encode_idxs)), len(encode_idxs)) + + self.assertEqual(1, len(set(first_frames))) + + if TICI: + expected_frames = fps * SEGMENT_LENGTH + self.assertEqual(min(counts), expected_frames) + shutil.rmtree(f"{route_prefix_path}--{i}") + + try: + for i in trange(num_segments): + # poll for next segment + with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): + while Path(f"{route_prefix_path}--{i+1}") not in Path(Paths.log_root()).iterdir(): + time.sleep(0.1) + check_seg(i) + finally: + managed_processes['loggerd'].stop() + managed_processes['encoderd'].stop() + managed_processes['camerad'].stop() + managed_processes['sensord'].stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc new file mode 100644 index 0000000000..2dae136e13 --- /dev/null +++ b/system/loggerd/tests/test_logger.cc @@ -0,0 +1,74 @@ +#include "catch2/catch.hpp" +#include "system/loggerd/logger.h" + +typedef cereal::Sentinel::SentinelType SentinelType; + +void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) { + const std::string segment_path = route_path + "--" + std::to_string(segment); + SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT; + SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT; + + REQUIRE(!util::file_exists(segment_path + "/rlog.lock")); + for (const char *fn : {"/rlog", "/qlog"}) { + const std::string log_file = segment_path + fn; + std::string log = util::read_file(log_file); + REQUIRE(!log.empty()); + int event_cnt = 0, i = 0; + kj::ArrayPtr words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); + while (words.size() > 0) { + try { + capnp::FlatArrayMessageReader reader(words); + auto event = reader.getRoot(); + words = kj::arrayPtr(reader.getEnd(), words.end()); + if (i == 0) { + REQUIRE(event.which() == cereal::Event::INIT_DATA); + } else if (i == 1) { + REQUIRE(event.which() == cereal::Event::SENTINEL); + REQUIRE(event.getSentinel().getType() == begin_sentinel); + REQUIRE(event.getSentinel().getSignal() == 0); + } else if (words.size() > 0) { + REQUIRE(event.which() == cereal::Event::CLOCKS); + ++event_cnt; + } else { + // the last event must be SENTINEL + REQUIRE(event.which() == cereal::Event::SENTINEL); + REQUIRE(event.getSentinel().getType() == end_sentinel); + REQUIRE(event.getSentinel().getSignal() == (end_sentinel == SentinelType::END_OF_ROUTE ? 1 : 0)); + } + ++i; + } catch (const kj::Exception &ex) { + INFO("failed parse " << i << " exception :" << ex.getDescription()); + REQUIRE(0); + break; + } + } + REQUIRE(event_cnt == required_event_cnt); + } +} + +void write_msg(LoggerState *logger) { + MessageBuilder msg; + msg.initEvent().initClocks(); + logger->write(msg.toBytes(), true); +} + +TEST_CASE("logger") { + const int segment_cnt = 100; + const std::string log_root = "/tmp/test_logger"; + system(("rm " + log_root + " -rf").c_str()); + std::string route_name; + { + LoggerState logger(log_root); + route_name = logger.routeName(); + for (int i = 0; i < segment_cnt; ++i) { + REQUIRE(logger.next()); + REQUIRE(util::file_exists(logger.segmentPath() + "/rlog.lock")); + REQUIRE(logger.segment() == i); + write_msg(&logger); + } + logger.setExitSignal(1); + } + for (int i = 0; i < segment_cnt; ++i) { + verify_segment(log_root + "/" + route_name, i, segment_cnt, 1); + } +} diff --git a/system/loggerd/tests/test_loggerd.py b/system/loggerd/tests/test_loggerd.py new file mode 100755 index 0000000000..9d7d3fa7bd --- /dev/null +++ b/system/loggerd/tests/test_loggerd.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python3 +import numpy as np +import os +import random +import string +import subprocess +import time +import unittest +from collections import defaultdict +from pathlib import Path +from typing import Dict, List + +import cereal.messaging as messaging +from cereal import log +from cereal.services import SERVICE_LIST +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.common.timeout import Timeout +from openpilot.system.hardware.hw import Paths +from openpilot.system.loggerd.xattr_cache import getxattr +from openpilot.system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE +from openpilot.selfdrive.manager.process_config import managed_processes +from openpilot.system.version import get_version +from openpilot.tools.lib.logreader import LogReader +from cereal.visionipc import VisionIpcServer, VisionStreamType +from openpilot.common.transformations.camera import tici_f_frame_size, tici_d_frame_size, tici_e_frame_size + +SentinelType = log.Sentinel.SentinelType + +CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in SERVICE_LIST + and SERVICE_LIST[f].should_log and "encode" not in f.lower()] + + +class TestLoggerd(unittest.TestCase): + def setUp(self): + os.environ.pop("LOG_ROOT", None) + + def _get_latest_log_dir(self): + log_dirs = sorted(Path(Paths.log_root()).iterdir(), key=lambda f: f.stat().st_mtime) + return log_dirs[-1] + + def _get_log_dir(self, x): + for l in x.splitlines(): + for p in l.split(' '): + path = Path(p.strip()) + if path.is_dir(): + return path + return None + + def _get_log_fn(self, x): + for l in x.splitlines(): + for p in l.split(' '): + path = Path(p.strip()) + if path.is_file(): + return path + return None + + def _gen_bootlog(self): + with Timeout(5): + out = subprocess.check_output("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"), encoding='utf-8') + + log_fn = self._get_log_fn(out) + + # check existence + assert log_fn is not None + + return log_fn + + def _check_init_data(self, msgs): + msg = msgs[0] + self.assertEqual(msg.which(), 'initData') + + def _check_sentinel(self, msgs, route): + start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment + self.assertTrue(msgs[1].sentinel.type == start_type) + + end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment + self.assertTrue(msgs[-1].sentinel.type == end_type) + + def _publish_random_messages(self, services: List[str]) -> Dict[str, list]: + pm = messaging.PubMaster(services) + + managed_processes["loggerd"].start() + for s in services: + self.assertTrue(pm.wait_for_readers_to_update(s, timeout=5)) + + sent_msgs = defaultdict(list) + for _ in range(random.randint(2, 10) * 100): + for s in services: + try: + m = messaging.new_message(s) + except Exception: + m = messaging.new_message(s, random.randint(2, 10)) + pm.send(s, m) + sent_msgs[s].append(m) + time.sleep(0.01) + + for s in services: + self.assertTrue(pm.wait_for_readers_to_update(s, timeout=5)) + managed_processes["loggerd"].stop() + + return sent_msgs + + def test_init_data_values(self): + os.environ["CLEAN"] = random.choice(["0", "1"]) + + dongle = ''.join(random.choice(string.printable) for n in range(random.randint(1, 100))) + fake_params = [ + # param, initData field, value + ("DongleId", "dongleId", dongle), + ("GitCommit", "gitCommit", "commit"), + ("GitBranch", "gitBranch", "branch"), + ("GitRemote", "gitRemote", "remote"), + ] + params = Params() + params.clear_all() + for k, _, v in fake_params: + params.put(k, v) + params.put("LaikadEphemerisV3", "abc") + + lr = list(LogReader(str(self._gen_bootlog()))) + initData = lr[0].initData + + self.assertTrue(initData.dirty != bool(os.environ["CLEAN"])) + self.assertEqual(initData.version, get_version()) + + if os.path.isfile("/proc/cmdline"): + with open("/proc/cmdline") as f: + self.assertEqual(list(initData.kernelArgs), f.read().strip().split(" ")) + + with open("/proc/version") as f: + self.assertEqual(initData.kernelVersion, f.read()) + + # check params + logged_params = {entry.key: entry.value for entry in initData.params.entries} + expected_params = {k for k, _, __ in fake_params} | {'LaikadEphemerisV3'} + assert set(logged_params.keys()) == expected_params, set(logged_params.keys()) ^ expected_params + assert logged_params['LaikadEphemerisV3'] == b'', f"DONT_LOG param value was logged: {repr(logged_params['LaikadEphemerisV3'])}" + for param_key, initData_key, v in fake_params: + self.assertEqual(getattr(initData, initData_key), v) + self.assertEqual(logged_params[param_key].decode(), v) + + params.put("LaikadEphemerisV3", "") + + def test_rotation(self): + os.environ["LOGGERD_TEST"] = "1" + Params().put("RecordFront", "1") + + expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"} + streams = [(VisionStreamType.VISION_STREAM_ROAD, (*tici_f_frame_size, 2048*2346, 2048, 2048*1216), "roadCameraState"), + (VisionStreamType.VISION_STREAM_DRIVER, (*tici_d_frame_size, 2048*2346, 2048, 2048*1216), "driverCameraState"), + (VisionStreamType.VISION_STREAM_WIDE_ROAD, (*tici_e_frame_size, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")] + + pm = messaging.PubMaster(["roadCameraState", "driverCameraState", "wideRoadCameraState"]) + vipc_server = VisionIpcServer("camerad") + for stream_type, frame_spec, _ in streams: + vipc_server.create_buffers_with_sizes(stream_type, 40, False, *(frame_spec)) + vipc_server.start_listener() + + num_segs = random.randint(2, 5) + length = random.randint(1, 3) + os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length) + managed_processes["loggerd"].start() + managed_processes["encoderd"].start() + time.sleep(1) + + fps = 20.0 + for n in range(1, int(num_segs*length*fps)+1): + time_start = time.monotonic() + for stream_type, frame_spec, state in streams: + dat = np.empty(frame_spec[2], dtype=np.uint8) + vipc_server.send(stream_type, dat[:].flatten().tobytes(), n, n/fps, n/fps) + + camera_state = messaging.new_message(state) + frame = getattr(camera_state, state) + frame.frameId = n + pm.send(state, camera_state) + time.sleep(max((1.0/fps) - (time.monotonic() - time_start), 0)) + + managed_processes["loggerd"].stop() + managed_processes["encoderd"].stop() + + route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0] + for n in range(num_segs): + p = Path(f"{route_path}--{n}") + logged = {f.name for f in p.iterdir() if f.is_file()} + diff = logged ^ expected_files + self.assertEqual(len(diff), 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}") + + def test_bootlog(self): + # generate bootlog with fake launch log + launch_log = ''.join(str(random.choice(string.printable)) for _ in range(100)) + with open("/tmp/launch_log", "w") as f: + f.write(launch_log) + + bootlog_path = self._gen_bootlog() + lr = list(LogReader(str(bootlog_path))) + + # check length + assert len(lr) == 2 # boot + initData + + self._check_init_data(lr) + + # check msgs + bootlog_msgs = [m for m in lr if m.which() == 'boot'] + assert len(bootlog_msgs) == 1 + + # sanity check values + boot = bootlog_msgs.pop().boot + assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9 # within 5s + assert boot.launchLog == launch_log + + for fn in ["console-ramoops", "pmsg-ramoops-0"]: + path = Path(os.path.join("/sys/fs/pstore/", fn)) + if path.is_file(): + with open(path, "rb") as f: + expected_val = f.read() + bootlog_val = [e.value for e in boot.pstore.entries if e.key == fn][0] + self.assertEqual(expected_val, bootlog_val) + + def test_qlog(self): + qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is not None] + no_qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is None] + + services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \ + random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services)))) + sent_msgs = self._publish_random_messages(services) + + qlog_path = os.path.join(self._get_latest_log_dir(), "qlog") + lr = list(LogReader(qlog_path)) + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + recv_msgs = defaultdict(list) + for m in lr: + recv_msgs[m.which()].append(m) + + for s, msgs in sent_msgs.items(): + recv_cnt = len(recv_msgs[s]) + + if s in no_qlog_services: + # check services with no specific decimation aren't in qlog + self.assertEqual(recv_cnt, 0, f"got {recv_cnt} {s} msgs in qlog") + else: + # check logged message count matches decimation + expected_cnt = (len(msgs) - 1) // SERVICE_LIST[s].decimation + 1 + self.assertEqual(recv_cnt, expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}") + + def test_rlog(self): + services = random.sample(CEREAL_SERVICES, random.randint(5, 10)) + sent_msgs = self._publish_random_messages(services) + + lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog"))) + + # check initData and sentinel + self._check_init_data(lr) + self._check_sentinel(lr, True) + + # check all messages were logged and in order + lr = lr[2:-1] # slice off initData and both sentinels + for m in lr: + sent = sent_msgs[m.which()].pop(0) + sent.clear_write_flag() + self.assertEqual(sent.to_bytes(), m.as_builder().to_bytes()) + + def test_preserving_flagged_segments(self): + services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"} + self._publish_random_messages(services) + + segment_dir = self._get_latest_log_dir() + self.assertEqual(getxattr(segment_dir, PRESERVE_ATTR_NAME), PRESERVE_ATTR_VALUE) + + def test_not_preserving_unflagged_segments(self): + services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"} + self._publish_random_messages(services) + + segment_dir = self._get_latest_log_dir() + self.assertIsNone(getxattr(segment_dir, PRESERVE_ATTR_NAME)) + + +if __name__ == "__main__": + unittest.main() diff --git a/system/loggerd/tests/test_runner.cc b/system/loggerd/tests/test_runner.cc new file mode 100644 index 0000000000..62bf7476a1 --- /dev/null +++ b/system/loggerd/tests/test_runner.cc @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch2/catch.hpp" diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py new file mode 100755 index 0000000000..538d99f66f --- /dev/null +++ b/system/loggerd/tests/test_uploader.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +import os +import time +import threading +import unittest +import logging +import json +from pathlib import Path +from typing import List, Optional +from openpilot.system.hardware.hw import Paths + +from openpilot.common.swaglog import cloudlog +from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE + +from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase + + +class FakeLogHandler(logging.Handler): + def __init__(self): + logging.Handler.__init__(self) + self.reset() + + def reset(self): + self.upload_order = list() + self.upload_ignored = list() + + def emit(self, record): + try: + j = json.loads(record.getMessage()) + if j["event"] == "upload_success": + self.upload_order.append(j["key"]) + if j["event"] == "upload_ignored": + self.upload_ignored.append(j["key"]) + except Exception: + pass + +log_handler = FakeLogHandler() +cloudlog.addHandler(log_handler) + + +class TestUploader(UploaderTestCase): + def setUp(self): + super().setUp() + log_handler.reset() + + def start_thread(self): + self.end_event = threading.Event() + self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event]) + self.up_thread.daemon = True + self.up_thread.start() + + def join_thread(self): + self.end_event.set() + self.up_thread.join() + + def gen_files(self, lock=False, xattr: Optional[bytes] = None, boot=True) -> List[Path]: + f_paths = [] + for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]: + f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr)) + + if boot: + f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, upload_xattr=xattr)) + return f_paths + + def gen_order(self, seg1: List[int], seg2: List[int], boot=True) -> List[str]: + keys = [] + if boot: + keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1] + keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2] + keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1] + keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2] + return keys + + def test_upload(self): + self.gen_files(lock=False) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + exp_order = self.gen_order([self.seg_num], []) + + self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored") + self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") + + self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") + + def test_upload_with_wrong_xattr(self): + self.gen_files(lock=False, xattr=b'0') + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + exp_order = self.gen_order([self.seg_num], []) + + self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored") + self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") + + self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") + + def test_upload_ignored(self): + self.set_ignore() + self.gen_files(lock=False) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + exp_order = self.gen_order([self.seg_num], []) + + self.assertTrue(len(log_handler.upload_order) == 0, "Some files were not ignored") + self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore") + self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not ignored") + + self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order") + + def test_upload_files_in_create_order(self): + seg1_nums = [0, 1, 2, 10, 20] + for i in seg1_nums: + self.seg_dir = self.seg_format.format(i) + self.gen_files(boot=False) + seg2_nums = [5, 50, 51] + for i in seg2_nums: + self.seg_dir = self.seg_format2.format(i) + self.gen_files(boot=False) + + exp_order = self.gen_order(seg1_nums, seg2_nums, boot=False) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored") + self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") + self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") + for f_path in exp_order: + self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") + + self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") + + def test_no_upload_with_lock_file(self): + self.start_thread() + + time.sleep(0.25) + f_paths = self.gen_files(lock=True, boot=False) + + # allow enough time that files should have been uploaded if they would be uploaded + time.sleep(5) + self.join_thread() + + for f_path in f_paths: + fn = f_path.with_suffix(f_path.suffix.replace(".bz2", "")) + uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE + self.assertFalse(uploaded, "File upload when locked") + + def test_no_upload_with_xattr(self): + self.gen_files(lock=False, xattr=UPLOAD_ATTR_VALUE) + + self.start_thread() + # allow enough time that files could upload twice if there is a bug in the logic + time.sleep(5) + self.join_thread() + + self.assertEqual(len(log_handler.upload_order), 0, "File uploaded again") + + def test_clear_locks_on_startup(self): + f_paths = self.gen_files(lock=True, boot=False) + self.start_thread() + time.sleep(1) + self.join_thread() + + for f_path in f_paths: + lock_path = f_path.with_suffix(f_path.suffix + ".lock") + self.assertFalse(lock_path.is_file(), "File lock not cleared on startup") + + +if __name__ == "__main__": + unittest.main() diff --git a/system/proclogd/tests/.gitignore b/system/proclogd/tests/.gitignore new file mode 100644 index 0000000000..5230b1598d --- /dev/null +++ b/system/proclogd/tests/.gitignore @@ -0,0 +1 @@ +test_proclog diff --git a/system/proclogd/tests/test_proclog.cc b/system/proclogd/tests/test_proclog.cc new file mode 100644 index 0000000000..20298cd3d8 --- /dev/null +++ b/system/proclogd/tests/test_proclog.cc @@ -0,0 +1,155 @@ +#define CATCH_CONFIG_MAIN +#include "catch2/catch.hpp" +#include "common/util.h" +#include "system/proclogd/proclog.h" + +const std::string allowed_states = "RSDTZtWXxKWPI"; + +TEST_CASE("Parser::procStat") { + SECTION("from string") { + const std::string stat_str = + "33012 (code )) S 32978 6620 6620 0 -1 4194368 2042377 0 144 0 24510 11627 0 " + "0 20 0 39 0 53077 830029824 62214 18446744073709551615 94257242783744 94257366235808 " + "140735738643248 0 0 0 0 4098 1073808632 0 0 0 17 2 0 0 2 0 0 94257370858656 94257371248232 " + "94257404952576 140735738648768 140735738648823 140735738648823 140735738650595 0"; + auto stat = Parser::procStat(stat_str); + REQUIRE(stat); + REQUIRE(stat->pid == 33012); + REQUIRE(stat->name == "code )"); + REQUIRE(stat->state == 'S'); + REQUIRE(stat->ppid == 32978); + REQUIRE(stat->utime == 24510); + REQUIRE(stat->stime == 11627); + REQUIRE(stat->cutime == 0); + REQUIRE(stat->cstime == 0); + REQUIRE(stat->priority == 20); + REQUIRE(stat->nice == 0); + REQUIRE(stat->num_threads == 39); + REQUIRE(stat->starttime == 53077); + REQUIRE(stat->vms == 830029824); + REQUIRE(stat->rss == 62214); + REQUIRE(stat->processor == 2); + } + SECTION("all processes") { + std::vector pids = Parser::pids(); + REQUIRE(pids.size() > 1); + for (int pid : pids) { + std::string stat_path = "/proc/" + std::to_string(pid) + "/stat"; + INFO(stat_path); + if (auto stat = Parser::procStat(util::read_file(stat_path))) { + REQUIRE(stat->pid == pid); + REQUIRE(allowed_states.find(stat->state) != std::string::npos); + } else { + REQUIRE(util::file_exists(stat_path) == false); + } + } + } +} + +TEST_CASE("Parser::cpuTimes") { + SECTION("from string") { + std::string stat = + "cpu 0 0 0 0 0 0 0 0 0 0\n" + "cpu0 1 2 3 4 5 6 7 8 9 10\n" + "cpu1 1 2 3 4 5 6 7 8 9 10\n"; + std::istringstream stream(stat); + auto stats = Parser::cpuTimes(stream); + REQUIRE(stats.size() == 2); + for (int i = 0; i < stats.size(); ++i) { + REQUIRE(stats[i].id == i); + REQUIRE(stats[i].utime == 1); + REQUIRE(stats[i].ntime ==2); + REQUIRE(stats[i].stime == 3); + REQUIRE(stats[i].itime == 4); + REQUIRE(stats[i].iowtime == 5); + REQUIRE(stats[i].irqtime == 6); + REQUIRE(stats[i].sirqtime == 7); + } + } + SECTION("all cpus") { + std::istringstream stream(util::read_file("/proc/stat")); + auto stats = Parser::cpuTimes(stream); + REQUIRE(stats.size() == sysconf(_SC_NPROCESSORS_ONLN)); + for (int i = 0; i < stats.size(); ++i) { + REQUIRE(stats[i].id == i); + } + } +} + +TEST_CASE("Parser::memInfo") { + SECTION("from string") { + std::istringstream stream("MemTotal: 1024 kb\nMemFree: 2048 kb\n"); + auto meminfo = Parser::memInfo(stream); + REQUIRE(meminfo["MemTotal:"] == 1024 * 1024); + REQUIRE(meminfo["MemFree:"] == 2048 * 1024); + } + SECTION("from /proc/meminfo") { + std::string require_keys[] = {"MemTotal:", "MemFree:", "MemAvailable:", "Buffers:", "Cached:", "Active:", "Inactive:", "Shmem:"}; + std::istringstream stream(util::read_file("/proc/meminfo")); + auto meminfo = Parser::memInfo(stream); + for (auto &key : require_keys) { + REQUIRE(meminfo.find(key) != meminfo.end()); + REQUIRE(meminfo[key] > 0); + } + } +} + +void test_cmdline(std::string cmdline, const std::vector requires) { + std::stringstream ss; + ss.write(&cmdline[0], cmdline.size()); + auto cmds = Parser::cmdline(ss); + REQUIRE(cmds.size() == requires.size()); + for (int i = 0; i < requires.size(); ++i) { + REQUIRE(cmds[i] == requires[i]); + } +} +TEST_CASE("Parser::cmdline") { + test_cmdline(std::string("a\0b\0c\0", 7), {"a", "b", "c"}); + test_cmdline(std::string("a\0\0c\0", 6), {"a", "c"}); + test_cmdline(std::string("a\0b\0c\0\0\0", 9), {"a", "b", "c"}); +} + +TEST_CASE("buildProcLogerMessage") { + MessageBuilder msg; + buildProcLogMessage(msg); + + kj::Array buf = capnp::messageToFlatArray(msg); + capnp::FlatArrayMessageReader reader(buf); + auto log = reader.getRoot().getProcLog(); + REQUIRE(log.totalSize().wordCount > 0); + + // test cereal::ProcLog::CPUTimes + auto cpu_times = log.getCpuTimes(); + REQUIRE(cpu_times.size() == sysconf(_SC_NPROCESSORS_ONLN)); + REQUIRE(cpu_times[cpu_times.size() - 1].getCpuNum() == cpu_times.size() - 1); + + // test cereal::ProcLog::Mem + auto mem = log.getMem(); + REQUIRE(mem.getTotal() > 0); + REQUIRE(mem.getShared() > 0); + + // test cereal::ProcLog::Process + auto procs = log.getProcs(); + for (auto p : procs) { + REQUIRE(allowed_states.find(p.getState()) != std::string::npos); + if (p.getPid() == ::getpid()) { + REQUIRE(p.getName() == "test_proclog"); + REQUIRE(p.getState() == 'R'); + REQUIRE_THAT(p.getExe().cStr(), Catch::Matchers::Contains("test_proclog")); + REQUIRE_THAT(p.getCmdline()[0], Catch::Matchers::Contains("test_proclog")); + } else { + std::string cmd_path = "/proc/" + std::to_string(p.getPid()) + "/cmdline"; + if (util::file_exists(cmd_path)) { + std::ifstream stream(cmd_path); + auto cmdline = Parser::cmdline(stream); + REQUIRE(cmdline.size() == p.getCmdline().size()); + // do not check the cmdline of pytest as it will change. + if (cmdline.size() > 0 && cmdline[0].find("[pytest") != 0) { + for (int i = 0; i < p.getCmdline().size(); ++i) { + REQUIRE(cmdline[i] == p.getCmdline()[i].cStr()); + } + } + } + } + } +} diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc new file mode 100644 index 0000000000..1873daaf4b --- /dev/null +++ b/tools/replay/tests/test_replay.cc @@ -0,0 +1,220 @@ +#include +#include + +#include +#include + +#include "catch2/catch.hpp" +#include "common/util.h" +#include "tools/replay/replay.h" +#include "tools/replay/util.h" + +const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2"; +const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3"; + +bool download_to_file(const std::string &url, const std::string &local_file, int chunk_size = 5 * 1024 * 1024, int retries = 3) { + do { + if (httpDownload(url, local_file, chunk_size)) { + return true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } while (--retries >= 0); + return false; +} + +TEST_CASE("httpMultiPartDownload") { + char filename[] = "/tmp/XXXXXX"; + close(mkstemp(filename)); + + const size_t chunk_size = 5 * 1024 * 1024; + std::string content; + SECTION("download to file") { + REQUIRE(download_to_file(TEST_RLOG_URL, filename, chunk_size)); + content = util::read_file(filename); + } + SECTION("download to buffer") { + for (int i = 0; i < 3 && content.empty(); ++i) { + content = httpGet(TEST_RLOG_URL, chunk_size); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + REQUIRE(!content.empty()); + } + REQUIRE(content.size() == 9112651); + REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM); +} + +TEST_CASE("FileReader") { + auto enable_local_cache = GENERATE(true, false); + std::string cache_file = cacheFilePath(TEST_RLOG_URL); + system(("rm " + cache_file + " -f").c_str()); + + FileReader reader(enable_local_cache); + std::string content = reader.read(TEST_RLOG_URL); + REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM); + if (enable_local_cache) { + REQUIRE(sha256(util::read_file(cache_file)) == TEST_RLOG_CHECKSUM); + } else { + REQUIRE(util::file_exists(cache_file) == false); + } +} + +TEST_CASE("LogReader") { + SECTION("corrupt log") { + FileReader reader(true); + std::string corrupt_content = reader.read(TEST_RLOG_URL); + corrupt_content.resize(corrupt_content.length() / 2); + corrupt_content = decompressBZ2(corrupt_content); + LogReader log; + REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size())); + REQUIRE(log.events.size() > 0); + } +} + +void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) { + QEventLoop loop; + Segment segment(n, segment_file, flags); + QObject::connect(&segment, &Segment::loadFinished, [&]() { + REQUIRE(segment.isLoaded() == true); + REQUIRE(segment.log != nullptr); + REQUIRE(segment.frames[RoadCam] != nullptr); + if (flags & REPLAY_FLAG_DCAM) { + REQUIRE(segment.frames[DriverCam] != nullptr); + } + if (flags & REPLAY_FLAG_ECAM) { + REQUIRE(segment.frames[WideRoadCam] != nullptr); + } + + // test LogReader & FrameReader + REQUIRE(segment.log->events.size() > 0); + REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan())); + + for (auto cam : ALL_CAMERAS) { + auto &fr = segment.frames[cam]; + if (!fr) continue; + + if (cam == RoadCam || cam == WideRoadCam) { + REQUIRE(fr->getFrameCount() == 1200); + } + auto [nv12_width, nv12_height, nv12_buffer_size] = get_nv12_info(fr->width, fr->height); + VisionBuf buf; + buf.allocate(nv12_buffer_size); + buf.init_yuv(fr->width, fr->height, nv12_width, nv12_width * nv12_height); + // sequence get 100 frames + for (int i = 0; i < 100; ++i) { + REQUIRE(fr->get(i, &buf)); + } + } + + loop.quit(); + }); + loop.exec(); +} + +std::string download_demo_route() { + static std::string data_dir; + + if (data_dir == "") { + char tmp_path[] = "/tmp/root_XXXXXX"; + data_dir = mkdtemp(tmp_path); + + Route remote_route(DEMO_ROUTE); + assert(remote_route.load()); + + // Create a local route from remote for testing + const std::string route_name = DEMO_ROUTE.mid(17).toStdString(); + for (int i = 0; i < 2; ++i) { + std::string log_path = util::string_format("%s/%s--%d/", data_dir.c_str(), route_name.c_str(), i); + util::create_directories(log_path, 0755); + REQUIRE(download_to_file(remote_route.at(i).rlog.toStdString(), log_path + "rlog.bz2")); + REQUIRE(download_to_file(remote_route.at(i).driver_cam.toStdString(), log_path + "dcamera.hevc")); + REQUIRE(download_to_file(remote_route.at(i).wide_road_cam.toStdString(), log_path + "ecamera.hevc")); + REQUIRE(download_to_file(remote_route.at(i).qcamera.toStdString(), log_path + "qcamera.ts")); + } + } + + return data_dir; +} + + +TEST_CASE("Local route") { + std::string data_dir = download_demo_route(); + + auto flags = GENERATE(REPLAY_FLAG_DCAM | REPLAY_FLAG_ECAM, REPLAY_FLAG_QCAMERA); + Route route(DEMO_ROUTE, QString::fromStdString(data_dir)); + REQUIRE(route.load()); + REQUIRE(route.segments().size() == 2); + for (int i = 0; i < route.segments().size(); ++i) { + read_segment(i, route.at(i), flags); + } +} + +TEST_CASE("Remote route") { + auto flags = GENERATE(REPLAY_FLAG_DCAM | REPLAY_FLAG_ECAM, REPLAY_FLAG_QCAMERA); + Route route(DEMO_ROUTE); + REQUIRE(route.load()); + REQUIRE(route.segments().size() == 13); + for (int i = 0; i < 2; ++i) { + read_segment(i, route.at(i), flags); + } +} + +// helper class for unit tests +class TestReplay : public Replay { + public: + TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {} + void test_seek(); + void testSeekTo(int seek_to); +}; + +void TestReplay::testSeekTo(int seek_to) { + seekTo(seek_to, false); + + while (true) { + std::unique_lock lk(stream_lock_); + stream_cv_.wait(lk, [=]() { return events_updated_ == true; }); + events_updated_ = false; + if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) { + // wake up by the previous merging, skip it. + continue; + } + + Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_); + auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); + if (eit == events_->end()) { + qDebug() << "waiting for events..."; + continue; + } + + REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan())); + const int seek_to_segment = seek_to / 60; + const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9; + current_segment_ = event_seconds / 60; + INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]"); + REQUIRE(event_seconds >= seek_to); + if (event_seconds > seek_to) { + auto it = segments_.lower_bound(seek_to_segment); + REQUIRE(it->first == current_segment_); + } + break; + } +} + +void TestReplay::test_seek() { + // create a dummy stream thread + stream_thread_ = new QThread(this); + QEventLoop loop; + std::thread thread = std::thread([&]() { + for (int i = 0; i < 10; ++i) { + testSeekTo(util::random_int(0, 2 * 60)); + } + loop.quit(); + }); + loop.exec(); + thread.join(); +} + +TEST_CASE("Replay") { + TestReplay replay(DEMO_ROUTE); + REQUIRE(replay.load()); + replay.test_seek(); +} diff --git a/tools/replay/tests/test_runner.cc b/tools/replay/tests/test_runner.cc new file mode 100644 index 0000000000..b20ac86c64 --- /dev/null +++ b/tools/replay/tests/test_runner.cc @@ -0,0 +1,10 @@ +#define CATCH_CONFIG_RUNNER +#include "catch2/catch.hpp" +#include + +int main(int argc, char **argv) { + // unit tests for Qt + QCoreApplication app(argc, argv); + const int res = Catch::Session().run(argc, argv); + return (res < 0xff ? res : 0xff); +}