pull/30653/head
Shane Smiskol 1 year ago
parent 921748eedd
commit b548e3fcd4
  1. 272
      selfdrive/car/tests/test_fw_fingerprint.py
  2. 0
      selfdrive/controls/tests/__init__.py
  3. 135
      selfdrive/controls/tests/test_alerts.py
  4. 158
      selfdrive/controls/tests/test_cruise_speed.py
  5. 47
      selfdrive/controls/tests/test_following_distance.py
  6. 89
      selfdrive/controls/tests/test_lateral_mpc.py
  7. 36
      selfdrive/controls/tests/test_leads.py
  8. 129
      selfdrive/controls/tests/test_startup.py
  9. 111
      selfdrive/controls/tests/test_state_machine.py
  10. 1
      selfdrive/test/longitudinal_maneuvers/.gitignore
  11. 0
      selfdrive/test/longitudinal_maneuvers/__init__.py
  12. 71
      selfdrive/test/longitudinal_maneuvers/maneuver.py
  13. 172
      selfdrive/test/longitudinal_maneuvers/plant.py
  14. 160
      selfdrive/test/longitudinal_maneuvers/test_longitudinal.py
  15. 0
      system/loggerd/tests/__init__.py
  16. 106
      system/loggerd/tests/loggerd_tests_common.py
  17. 123
      system/loggerd/tests/test_deleter.py
  18. 156
      system/loggerd/tests/test_encoder.py
  19. 74
      system/loggerd/tests/test_logger.cc
  20. 284
      system/loggerd/tests/test_loggerd.py
  21. 2
      system/loggerd/tests/test_runner.cc
  22. 191
      system/loggerd/tests/test_uploader.py
  23. 1
      system/proclogd/tests/.gitignore
  24. 155
      system/proclogd/tests/test_proclog.cc
  25. 220
      tools/replay/tests/test_replay.cc
  26. 10
      tools/replay/tests/test_runner.cc

@ -1,272 +0,0 @@
#!/usr/bin/env python3
import pytest
import random
import time
import unittest
from collections import defaultdict
from parameterized import parameterized
import threading
from cereal import car
from openpilot.common.params import Params
from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
from openpilot.selfdrive.car.fw_versions import FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \
match_fw_to_car, get_fw_versions, get_present_ecus
from openpilot.selfdrive.car.vin import get_vin
CarFw = car.CarParams.CarFw
Ecu = car.CarParams.Ecu
ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
class FakeSocket:
def receive(self, non_blocking=False):
pass
def send(self, msg):
pass
class TestFwFingerprint(unittest.TestCase):
def assertFingerprints(self, candidates, expected):
candidates = list(candidates)
self.assertEqual(len(candidates), 1, f"got more than one candidate: {candidates}")
self.assertEqual(candidates[0], expected)
@parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
def test_exact_match(self, brand, car_model, ecus):
CP = car.CarParams.new_message()
for _ in range(200):
fw = []
for ecu, fw_versions in ecus.items():
ecu_name, addr, sub_addr = ecu
fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand,
"address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
CP.carFw = fw
_, matches = match_fw_to_car(CP.carFw, allow_fuzzy=False)
self.assertFingerprints(matches, car_model)
@parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
def test_custom_fuzzy_match(self, brand, car_model, ecus):
# Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function
config = FW_QUERY_CONFIGS[brand]
if config.match_fw_to_car_fuzzy is None:
raise unittest.SkipTest("Brand does not implement custom fuzzy fingerprinting function")
CP = car.CarParams.new_message()
for _ in range(5):
fw = []
for ecu, fw_versions in ecus.items():
ecu_name, addr, sub_addr = ecu
fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand,
"address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
CP.carFw = fw
_, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False)
brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw))
# If both have matches, they must agree
if len(matches) == 1 and len(brand_matches) == 1:
self.assertEqual(matches, brand_matches)
@parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
def test_fuzzy_match_ecu_count(self, brand, car_model, ecus):
# Asserts that fuzzy matching does not count matching FW, but ECU address keys
valid_ecus = [e for e in ecus if e[0] not in FUZZY_EXCLUDE_ECUS]
if not len(valid_ecus):
raise unittest.SkipTest("Car model has no compatible ECUs for fuzzy matching")
fw = []
for ecu in valid_ecus:
ecu_name, addr, sub_addr = ecu
for _ in range(5):
# Add multiple FW versions to simulate ECU returning to multiple queries in a brand
fw.append({"ecu": ecu_name, "fwVersion": random.choice(ecus[ecu]), 'brand': brand,
"address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
CP = car.CarParams.new_message(carFw=fw)
_, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False)
# Assert no match if there are not enough unique ECUs
unique_ecus = {(f['address'], f['subAddress']) for f in fw}
if len(unique_ecus) < 2:
self.assertEqual(len(matches), 0, car_model)
# There won't always be a match due to shared FW, but if there is it should be correct
elif len(matches):
self.assertFingerprints(matches, car_model)
def test_fw_version_lists(self):
for car_model, ecus in FW_VERSIONS.items():
with self.subTest(car_model=car_model.value):
for ecu, ecu_fw in ecus.items():
with self.subTest(ecu):
duplicates = {fw for fw in ecu_fw if ecu_fw.count(fw) > 1}
self.assertFalse(len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ECU_NAME[ecu[0]]}, {duplicates}')
self.assertGreater(len(ecu_fw), 0, f'{car_model}: No FW versions: Ecu.{ECU_NAME[ecu[0]]}')
def test_all_addrs_map_to_one_ecu(self):
for brand, cars in VERSIONS.items():
addr_to_ecu = defaultdict(set)
for ecus in cars.values():
for ecu_type, addr, sub_addr in ecus.keys():
addr_to_ecu[(addr, sub_addr)].add(ecu_type)
ecus_for_addr = addr_to_ecu[(addr, sub_addr)]
ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_for_addr])
self.assertLessEqual(len(ecus_for_addr), 1, f"{brand} has multiple ECUs that map to one address: {ecu_strings} -> ({hex(addr)}, {sub_addr})")
def test_data_collection_ecus(self):
# Asserts no extra ECUs are in the fingerprinting database
for brand, config in FW_QUERY_CONFIGS.items():
for car_model, ecus in VERSIONS[brand].items():
bad_ecus = set(ecus).intersection(config.extra_ecus)
with self.subTest(car_model=car_model.value):
self.assertFalse(len(bad_ecus), f'{car_model}: Fingerprints contain ECUs added for data collection: {bad_ecus}')
def test_blacklisted_ecus(self):
blacklisted_addrs = (0x7c4, 0x7d0) # includes A/C ecu and an unknown ecu
for car_model, ecus in FW_VERSIONS.items():
with self.subTest(car_model=car_model.value):
CP = interfaces[car_model][0].get_non_essential_params(car_model)
if CP.carName == 'subaru':
for ecu in ecus.keys():
self.assertNotIn(ecu[1], blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})')
elif CP.carName == "chrysler":
# Some HD trucks have a combined TCM and ECM
if CP.carFingerprint.startswith("RAM HD"):
for ecu in ecus.keys():
self.assertNotEqual(ecu[0], Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})")
def test_missing_versions_and_configs(self):
brand_versions = set(VERSIONS.keys())
brand_configs = set(FW_QUERY_CONFIGS.keys())
if len(brand_configs - brand_versions):
with self.subTest():
self.fail(f"Brands do not implement FW_VERSIONS: {brand_configs - brand_versions}")
if len(brand_versions - brand_configs):
with self.subTest():
self.fail(f"Brands do not implement FW_QUERY_CONFIG: {brand_versions - brand_configs}")
def test_fw_request_ecu_whitelist(self):
for brand, config in FW_QUERY_CONFIGS.items():
with self.subTest(brand=brand):
whitelisted_ecus = {ecu for r in config.requests for ecu in r.whitelist_ecus}
brand_ecus = {fw[0] for car_fw in VERSIONS[brand].values() for fw in car_fw}
brand_ecus |= {ecu[0] for ecu in config.extra_ecus}
# each ecu in brand's fw versions + extra ecus needs to be whitelisted at least once
ecus_not_whitelisted = brand_ecus - whitelisted_ecus
ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted])
self.assertFalse(len(whitelisted_ecus) and len(ecus_not_whitelisted),
f'{brand.title()}: ECUs not in any FW query whitelists: {ecu_strings}')
def test_fw_requests(self):
# Asserts equal length request and response lists
for brand, config in FW_QUERY_CONFIGS.items():
with self.subTest(brand=brand):
for request_obj in config.requests:
self.assertEqual(len(request_obj.request), len(request_obj.response))
# No request on the OBD port (bus 1, multiplexed) should be run on an aux panda
self.assertFalse(request_obj.auxiliary and request_obj.bus == 1 and request_obj.obd_multiplexing,
f"{brand.title()}: OBD multiplexed request is marked auxiliary: {request_obj}")
class TestFwFingerprintTiming(unittest.TestCase):
N: int = 5
TOL: float = 0.12
@staticmethod
def _run_thread(thread: threading.Thread) -> float:
params = Params()
params.put_bool("ObdMultiplexingEnabled", True)
thread.start()
t = time.perf_counter()
while thread.is_alive():
time.sleep(0.02)
if not params.get_bool("ObdMultiplexingChanged"):
params.put_bool("ObdMultiplexingChanged", True)
return time.perf_counter() - t
def _benchmark_brand(self, brand, num_pandas):
fake_socket = FakeSocket()
brand_time = 0
for _ in range(self.N):
thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand),
kwargs=dict(num_pandas=num_pandas))
brand_time += self._run_thread(thread)
return brand_time / self.N
def _assert_timing(self, avg_time, ref_time):
self.assertLess(avg_time, ref_time + self.TOL)
self.assertGreater(avg_time, ref_time - self.TOL, "Performance seems to have improved, update test refs.")
def test_startup_timing(self):
# Tests worse-case VIN query time and typical present ECU query time
vin_ref_time = 1.0
present_ecu_ref_time = 0.8
fake_socket = FakeSocket()
present_ecu_time = 0.0
for _ in range(self.N):
thread = threading.Thread(target=get_present_ecus, args=(fake_socket, fake_socket),
kwargs=dict(num_pandas=2))
present_ecu_time += self._run_thread(thread)
self._assert_timing(present_ecu_time / self.N, present_ecu_ref_time)
print(f'get_present_ecus, query time={present_ecu_time / self.N} seconds')
vin_time = 0.0
for _ in range(self.N):
thread = threading.Thread(target=get_vin, args=(fake_socket, fake_socket, 1))
vin_time += self._run_thread(thread)
self._assert_timing(vin_time / self.N, vin_ref_time)
print(f'get_vin, query time={vin_time / self.N} seconds')
@pytest.mark.timeout(60)
def test_fw_query_timing(self):
total_ref_time = 6.41
brand_ref_times = {
1: {
'body': 0.11,
'chrysler': 0.3,
'ford': 0.2,
'honda': 0.52,
'hyundai': 0.72,
'mazda': 0.2,
'nissan': 0.4,
'subaru': 0.52,
'tesla': 0.2,
'toyota': 1.6,
'volkswagen': 0.2,
},
2: {
'ford': 0.3,
'hyundai': 1.12,
}
}
total_time = 0
for num_pandas in (1, 2):
for brand, config in FW_QUERY_CONFIGS.items():
with self.subTest(brand=brand, num_pandas=num_pandas):
multi_panda_requests = [r for r in config.requests if r.bus > 3]
if not len(multi_panda_requests) and num_pandas > 1:
raise unittest.SkipTest("No multi-panda FW queries")
avg_time = self._benchmark_brand(brand, num_pandas)
total_time += avg_time
avg_time = round(avg_time, 2)
self._assert_timing(avg_time, brand_ref_times[num_pandas][brand])
print(f'{brand=}, {num_pandas=}, {len(config.requests)=}, avg FW query time={avg_time} seconds')
with self.subTest(brand='all_brands'):
total_time = round(total_time, 2)
self._assert_timing(total_time, total_ref_time)
print(f'all brands, total FW query time={total_time} seconds')
if __name__ == "__main__":
unittest.main()

@ -1,135 +0,0 @@
#!/usr/bin/env python3
import copy
import json
import os
import unittest
import random
from PIL import Image, ImageDraw, ImageFont
from cereal import log, car
from cereal.messaging import SubMaster
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.events import Alert, EVENTS, ET
from openpilot.selfdrive.controls.lib.alertmanager import set_offroad_alert
from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS
AlertSize = log.ControlsState.AlertSize
OFFROAD_ALERTS_PATH = os.path.join(BASEDIR, "selfdrive/controls/lib/alerts_offroad.json")
# TODO: add callback alerts
ALERTS = []
for event_types in EVENTS.values():
for alert in event_types.values():
ALERTS.append(alert)
class TestAlerts(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(OFFROAD_ALERTS_PATH) as f:
cls.offroad_alerts = json.loads(f.read())
# Create fake objects for callback
cls.CS = car.CarState.new_message()
cls.CP = car.CarParams.new_message()
cfg = [c for c in CONFIGS if c.proc_name == 'controlsd'][0]
cls.sm = SubMaster(cfg.pubs)
def test_events_defined(self):
# Ensure all events in capnp schema are defined in events.py
events = car.CarEvent.EventName.schema.enumerants
for name, e in events.items():
if not name.endswith("DEPRECATED"):
fail_msg = "%s @%d not in EVENTS" % (name, e)
self.assertTrue(e in EVENTS.keys(), msg=fail_msg)
# ensure alert text doesn't exceed allowed width
def test_alert_text_length(self):
font_path = os.path.join(BASEDIR, "selfdrive/assets/fonts")
regular_font_path = os.path.join(font_path, "Inter-SemiBold.ttf")
bold_font_path = os.path.join(font_path, "Inter-Bold.ttf")
semibold_font_path = os.path.join(font_path, "Inter-SemiBold.ttf")
max_text_width = 2160 - 300 # full screen width is usable, minus sidebar
draw = ImageDraw.Draw(Image.new('RGB', (0, 0)))
fonts = {
AlertSize.small: [ImageFont.truetype(semibold_font_path, 74)],
AlertSize.mid: [ImageFont.truetype(bold_font_path, 88),
ImageFont.truetype(regular_font_path, 66)],
}
for alert in ALERTS:
if not isinstance(alert, Alert):
alert = alert(self.CP, self.CS, self.sm, metric=False, soft_disable_time=100)
# for full size alerts, both text fields wrap the text,
# so it's unlikely that they would go past the max width
if alert.alert_size in (AlertSize.none, AlertSize.full):
continue
for i, txt in enumerate([alert.alert_text_1, alert.alert_text_2]):
if i >= len(fonts[alert.alert_size]):
break
font = fonts[alert.alert_size][i]
left, _, right, _ = draw.textbbox((0, 0), txt, font)
width = right - left
msg = f"type: {alert.alert_type} msg: {txt}"
self.assertLessEqual(width, max_text_width, msg=msg)
def test_alert_sanity_check(self):
for event_types in EVENTS.values():
for event_type, a in event_types.items():
# TODO: add callback alerts
if not isinstance(a, Alert):
continue
if a.alert_size == AlertSize.none:
self.assertEqual(len(a.alert_text_1), 0)
self.assertEqual(len(a.alert_text_2), 0)
elif a.alert_size == AlertSize.small:
self.assertGreater(len(a.alert_text_1), 0)
self.assertEqual(len(a.alert_text_2), 0)
elif a.alert_size == AlertSize.mid:
self.assertGreater(len(a.alert_text_1), 0)
self.assertGreater(len(a.alert_text_2), 0)
else:
self.assertGreater(len(a.alert_text_1), 0)
self.assertGreaterEqual(a.duration, 0.)
if event_type not in (ET.WARNING, ET.PERMANENT, ET.PRE_ENABLE):
self.assertEqual(a.creation_delay, 0.)
def test_offroad_alerts(self):
params = Params()
for a in self.offroad_alerts:
# set the alert
alert = copy.copy(self.offroad_alerts[a])
set_offroad_alert(a, True)
alert['extra'] = ''
self.assertTrue(json.dumps(alert) == params.get(a, encoding='utf8'))
# then delete it
set_offroad_alert(a, False)
self.assertTrue(params.get(a) is None)
def test_offroad_alerts_extra_text(self):
params = Params()
for i in range(50):
# set the alert
a = random.choice(list(self.offroad_alerts))
alert = self.offroad_alerts[a]
set_offroad_alert(a, True, extra_text="a"*i)
written_alert = json.loads(params.get(a, encoding='utf8'))
self.assertTrue("a"*i == written_alert['extra'])
self.assertTrue(alert["text"] == written_alert['text'])
if __name__ == "__main__":
unittest.main()

@ -1,158 +0,0 @@
#!/usr/bin/env python3
import itertools
import numpy as np
import unittest
from parameterized import parameterized_class
from cereal import log
from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, V_CRUISE_MIN, V_CRUISE_MAX, V_CRUISE_INITIAL, IMPERIAL_INCREMENT
from cereal import car
from openpilot.common.conversions import Conversions as CV
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type
def run_cruise_simulation(cruise, e2e, t_end=20.):
man = Maneuver(
'',
duration=t_end,
initial_speed=max(cruise - 1., 0.0),
lead_relevancy=True,
initial_distance_lead=100,
cruise_values=[cruise],
prob_lead_values=[0.0],
breakpoints=[0.],
e2e=e2e,
)
valid, output = man.evaluate()
assert valid
return output[-1, 3]
@parameterized_class(("e2e", "personality", "speed"), itertools.product(
[True, False], # e2e
log.LongitudinalPersonality.schema.enumerants, # personality
[5,35])) # speed
class TestCruiseSpeed(unittest.TestCase):
def test_cruise_speed(self):
params = Params()
params.put("LongitudinalPersonality", str(self.personality))
print(f'Testing {self.speed} m/s')
cruise_speed = float(self.speed)
simulation_steady_state = run_cruise_simulation(cruise_speed, self.e2e)
self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {self.speed} m/s')
# TODO: test pcmCruise
@parameterized_class(('pcm_cruise',), [(False,)])
class TestVCruiseHelper(unittest.TestCase):
def setUp(self):
self.CP = car.CarParams(pcmCruise=self.pcm_cruise)
self.v_cruise_helper = VCruiseHelper(self.CP)
self.reset_cruise_speed_state()
def reset_cruise_speed_state(self):
# Two resets previous cruise speed
for _ in range(2):
self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False)
def enable(self, v_ego, experimental_mode):
# Simulates user pressing set with a current speed
self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode)
def test_adjust_speed(self):
"""
Asserts speed changes on falling edges of buttons.
"""
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
for btn in (ButtonType.accelCruise, ButtonType.decelCruise):
for pressed in (True, False):
CS = car.CarState(cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_rising_edge_enable(self):
"""
Some car interfaces may enable on rising edge of a button,
ensure we don't adjust speed if enabled changes mid-press.
"""
# NOTE: enabled is always one frame behind the result from button press in controlsd
for enabled, pressed in ((False, False),
(False, True),
(True, False)):
CS = car.CarState(cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False)
if pressed:
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
# Expected diff on enabling. Speed should not change on falling edge of pressed
self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_resume_in_standstill(self):
"""
Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill.
"""
self.enable(0, False)
for standstill in (True, False):
for pressed in (True, False):
CS = car.CarState(cruiseState={"available": True, "standstill": standstill})
CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
# speed should only update if not at standstill and button falling edge
should_equal = standstill or pressed
self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)
def test_set_gas_pressed(self):
"""
Asserts pressing set while enabled with gas pressed sets
the speed to the maximum of vEgo and current cruise speed.
"""
for v_ego in np.linspace(0, 100, 101):
self.reset_cruise_speed_state()
self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)
# first decrement speed, then perform gas pressed logic
expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT
expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH) # clip to min of vEgo
expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX))
CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True})
CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)]
self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
# TODO: fix skipping first run due to enabled on rising edge exception
if v_ego == 0.0:
continue
self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph)
def test_initialize_v_cruise(self):
"""
Asserts allowed cruise speeds on enabling with SET.
"""
for experimental_mode in (True, False):
for v_ego in np.linspace(0, 100, 101):
self.reset_cruise_speed_state()
self.assertFalse(self.v_cruise_helper.v_cruise_initialized)
self.enable(float(v_ego), experimental_mode)
self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX)
self.assertTrue(self.v_cruise_helper.v_cruise_initialized)
if __name__ == "__main__":
unittest.main()

@ -1,47 +0,0 @@
#!/usr/bin/env python3
import unittest
import itertools
from parameterized import parameterized_class
from openpilot.common.params import Params
from cereal import log
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import desired_follow_distance, get_T_FOLLOW
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
def run_following_distance_simulation(v_lead, t_end=100.0, e2e=False):
man = Maneuver(
'',
duration=t_end,
initial_speed=float(v_lead),
lead_relevancy=True,
initial_distance_lead=100,
speed_lead_values=[v_lead],
breakpoints=[0.],
e2e=e2e,
)
valid, output = man.evaluate()
assert valid
return output[-1,2] - output[-1,1]
@parameterized_class(("e2e", "personality", "speed"), itertools.product(
[True, False], # e2e
[log.LongitudinalPersonality.relaxed, # personality
log.LongitudinalPersonality.standard,
log.LongitudinalPersonality.aggressive],
[0,10,35])) # speed
class TestFollowingDistance(unittest.TestCase):
def test_following_distance(self):
params = Params()
params.put("LongitudinalPersonality", str(self.personality))
v_lead = float(self.speed)
simulation_steady_state = run_following_distance_simulation(v_lead, e2e=self.e2e)
correct_steady_state = desired_follow_distance(v_lead, v_lead, get_T_FOLLOW(self.personality))
err_ratio = 0.2 if self.e2e else 0.1
self.assertAlmostEqual(simulation_steady_state, correct_steady_state, delta=(err_ratio * correct_steady_state + .5))
if __name__ == "__main__":
unittest.main()

@ -1,89 +0,0 @@
import unittest
import numpy as np
from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc
from openpilot.selfdrive.controls.lib.drive_helpers import CAR_ROTATION_RADIUS
from openpilot.selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import N as LAT_MPC_N
def run_mpc(lat_mpc=None, v_ref=30., x_init=0., y_init=0., psi_init=0., curvature_init=0.,
lane_width=3.6, poly_shift=0.):
if lat_mpc is None:
lat_mpc = LateralMpc()
lat_mpc.set_weights(1., .1, 0.0, .05, 800)
y_pts = poly_shift * np.ones(LAT_MPC_N + 1)
heading_pts = np.zeros(LAT_MPC_N + 1)
curv_rate_pts = np.zeros(LAT_MPC_N + 1)
x0 = np.array([x_init, y_init, psi_init, curvature_init])
p = np.column_stack([v_ref * np.ones(LAT_MPC_N + 1),
CAR_ROTATION_RADIUS * np.ones(LAT_MPC_N + 1)])
# converge in no more than 10 iterations
for _ in range(10):
lat_mpc.run(x0, p,
y_pts, heading_pts, curv_rate_pts)
return lat_mpc.x_sol
class TestLateralMpc(unittest.TestCase):
def _assert_null(self, sol, curvature=1e-6):
for i in range(len(sol)):
self.assertAlmostEqual(sol[0,i,1], 0., delta=curvature)
self.assertAlmostEqual(sol[0,i,2], 0., delta=curvature)
self.assertAlmostEqual(sol[0,i,3], 0., delta=curvature)
def _assert_simmetry(self, sol, curvature=1e-6):
for i in range(len(sol)):
self.assertAlmostEqual(sol[0,i,1], -sol[1,i,1], delta=curvature)
self.assertAlmostEqual(sol[0,i,2], -sol[1,i,2], delta=curvature)
self.assertAlmostEqual(sol[0,i,3], -sol[1,i,3], delta=curvature)
self.assertAlmostEqual(sol[0,i,0], sol[1,i,0], delta=curvature)
def test_straight(self):
sol = run_mpc()
self._assert_null(np.array([sol]))
def test_y_symmetry(self):
sol = []
for y_init in [-0.5, 0.5]:
sol.append(run_mpc(y_init=y_init))
self._assert_simmetry(np.array(sol))
def test_poly_symmetry(self):
sol = []
for poly_shift in [-1., 1.]:
sol.append(run_mpc(poly_shift=poly_shift))
self._assert_simmetry(np.array(sol))
def test_curvature_symmetry(self):
sol = []
for curvature_init in [-0.1, 0.1]:
sol.append(run_mpc(curvature_init=curvature_init))
self._assert_simmetry(np.array(sol))
def test_psi_symmetry(self):
sol = []
for psi_init in [-0.1, 0.1]:
sol.append(run_mpc(psi_init=psi_init))
self._assert_simmetry(np.array(sol))
def test_no_overshoot(self):
y_init = 1.
sol = run_mpc(y_init=y_init)
for y in list(sol[:,1]):
self.assertGreaterEqual(y_init, abs(y))
def test_switch_convergence(self):
lat_mpc = LateralMpc()
sol = run_mpc(lat_mpc=lat_mpc, poly_shift=3.0, v_ref=7.0)
right_psi_deg = np.degrees(sol[:,2])
sol = run_mpc(lat_mpc=lat_mpc, poly_shift=-3.0, v_ref=7.0)
left_psi_deg = np.degrees(sol[:,2])
np.testing.assert_almost_equal(right_psi_deg, -left_psi_deg, decimal=3)
if __name__ == "__main__":
unittest.main()

@ -1,36 +0,0 @@
#!/usr/bin/env python3
import unittest
import cereal.messaging as messaging
from openpilot.selfdrive.test.process_replay import replay_process_with_name
from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA
class TestLeads(unittest.TestCase):
def test_radar_fault(self):
# if there's no radar-related can traffic, radard should either not respond or respond with an error
# this is tightly coupled with underlying car radar_interface implementation, but it's a good sanity check
def single_iter_pkg():
# single iter package, with meaningless cans and empty carState/modelV2
msgs = []
for _ in range(5):
can = messaging.new_message("can", 1)
cs = messaging.new_message("carState")
msgs.append(can.as_reader())
msgs.append(cs.as_reader())
model = messaging.new_message("modelV2")
msgs.append(model.as_reader())
return msgs
msgs = [m for _ in range(3) for m in single_iter_pkg()]
out = replay_process_with_name("radard", msgs, fingerprint=TOYOTA.COROLLA_TSS2)
states = [m for m in out if m.which() == "radarState"]
failures = [not state.valid and len(state.radarState.radarErrors) for state in states]
self.assertTrue(len(states) == 0 or all(failures))
if __name__ == "__main__":
unittest.main()

@ -1,129 +0,0 @@
#!/usr/bin/env python3
import time
import unittest
from parameterized import parameterized
from cereal import log, car
import cereal.messaging as messaging
from openpilot.common.params import Params
from openpilot.selfdrive.boardd.boardd_api_impl import can_list_to_can_capnp
from openpilot.selfdrive.car.fingerprints import _FINGERPRINTS
from openpilot.selfdrive.car.toyota.values import CAR as TOYOTA
from openpilot.selfdrive.car.mazda.values import CAR as MAZDA
from openpilot.selfdrive.controls.lib.events import EVENT_NAME
from openpilot.selfdrive.test.helpers import with_processes
EventName = car.CarEvent.EventName
Ecu = car.CarParams.Ecu
COROLLA_FW_VERSIONS = [
(Ecu.engine, 0x7e0, None, b'\x0230ZC2000\x00\x00\x00\x00\x00\x00\x00\x0050212000\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.abs, 0x7b0, None, b'F152602190\x00\x00\x00\x00\x00\x00'),
(Ecu.eps, 0x7a1, None, b'8965B02181\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdRadar, 0x750, 0xf, b'8821F4702100\x00\x00\x00\x00'),
(Ecu.fwdCamera, 0x750, 0x6d, b'8646F0201101\x00\x00\x00\x00'),
(Ecu.dsu, 0x791, None, b'881510201100\x00\x00\x00\x00'),
]
COROLLA_FW_VERSIONS_FUZZY = COROLLA_FW_VERSIONS[:-1] + [(Ecu.dsu, 0x791, None, b'xxxxxx')]
COROLLA_FW_VERSIONS_NO_DSU = COROLLA_FW_VERSIONS[:-1]
CX5_FW_VERSIONS = [
(Ecu.engine, 0x7e0, None, b'PYNF-188K2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.abs, 0x760, None, b'K123-437K2-E\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.eps, 0x730, None, b'KJ01-3210X-G-00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdRadar, 0x764, None, b'K123-67XK2-F\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdCamera, 0x706, None, b'B61L-67XK2-T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
(Ecu.transmission, 0x7e1, None, b'PYNC-21PS1-B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
]
class TestStartup(unittest.TestCase):
@parameterized.expand([
# TODO: test EventName.startup for release branches
# officially supported car
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"),
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS, "toyota"),
# dashcamOnly car
(EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"),
(EventName.startupNoControl, MAZDA.CX5, CX5_FW_VERSIONS, "mazda"),
# unrecognized car with no fw
(EventName.startupNoFw, None, None, ""),
(EventName.startupNoFw, None, None, ""),
# unrecognized car
(EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"),
(EventName.startupNoCar, None, COROLLA_FW_VERSIONS[:1], "toyota"),
# fuzzy match
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"),
(EventName.startupMaster, TOYOTA.COROLLA, COROLLA_FW_VERSIONS_FUZZY, "toyota"),
])
@with_processes(['controlsd'])
def test_startup_alert(self, expected_event, car_model, fw_versions, brand):
# TODO: this should be done without any real sockets
controls_sock = messaging.sub_sock("controlsState")
pm = messaging.PubMaster(['can', 'pandaStates'])
params = Params()
params.clear_all()
params.put_bool("Passive", False)
params.put_bool("OpenpilotEnabledToggle", True)
# Build capnn version of FW array
if fw_versions is not None:
car_fw = []
cp = car.CarParams.new_message()
for ecu, addr, subaddress, version in fw_versions:
f = car.CarParams.CarFw.new_message()
f.ecu = ecu
f.address = addr
f.fwVersion = version
f.brand = brand
if subaddress is not None:
f.subAddress = subaddress
car_fw.append(f)
cp.carVin = "1" * 17
cp.carFw = car_fw
params.put("CarParamsCache", cp.to_bytes())
time.sleep(2) # wait for controlsd to be ready
pm.send('can', can_list_to_can_capnp([[0, 0, b"", 0]]))
time.sleep(0.1)
msg = messaging.new_message('pandaStates', 1)
msg.pandaStates[0].pandaType = log.PandaState.PandaType.uno
pm.send('pandaStates', msg)
# fingerprint
if (car_model is None) or (fw_versions is not None):
finger = {addr: 1 for addr in range(1, 100)}
else:
finger = _FINGERPRINTS[car_model][0]
for _ in range(1000):
# controlsd waits for boardd to echo back that it has changed the multiplexing mode
if not params.get_bool("ObdMultiplexingChanged"):
params.put_bool("ObdMultiplexingChanged", True)
msgs = [[addr, 0, b'\x00'*length, 0] for addr, length in finger.items()]
pm.send('can', can_list_to_can_capnp(msgs))
time.sleep(0.01)
msgs = messaging.drain_sock(controls_sock)
if len(msgs):
event_name = msgs[0].controlsState.alertType.split("/")[0]
self.assertEqual(EVENT_NAME[expected_event], event_name,
f"expected {EVENT_NAME[expected_event]} for '{car_model}', got {event_name}")
break
else:
self.fail(f"failed to fingerprint {car_model}")
if __name__ == "__main__":
unittest.main()

@ -1,111 +0,0 @@
#!/usr/bin/env python3
import unittest
from unittest import mock
from cereal import car, log
from openpilot.common.realtime import DT_CTRL
from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.controls.controlsd import Controls, SOFT_DISABLE_TIME
from openpilot.selfdrive.controls.lib.events import Events, ET, Alert, Priority, AlertSize, AlertStatus, VisualAlert, \
AudibleAlert, EVENTS as _EVENTS
State = log.ControlsState.OpenpilotState
EVENTS = _EVENTS.copy()
# The event types that maintain the current state
MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,),
State.preEnabled: (ET.PRE_ENABLE,), State.overriding: (ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)}
ALL_STATES = tuple(State.schema.enumerants.values())
# The event types checked in DISABLED section of state machine
ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE, ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)
def make_event(event_types):
event = {}
for ev in event_types:
event[ev] = Alert("", "", AlertStatus.normal, AlertSize.small, Priority.LOW,
VisualAlert.none, AudibleAlert.none, 1.)
EVENTS[0] = event
return 0
@mock.patch("openpilot.selfdrive.controls.lib.events.EVENTS", EVENTS)
class TestStateMachine(unittest.TestCase):
def setUp(self):
CarInterface, CarController, CarState = interfaces["mock"]
CP = CarInterface.get_non_essential_params("mock")
CI = CarInterface(CP, CarController, CarState)
self.controlsd = Controls(CI=CI)
self.controlsd.events = Events()
self.controlsd.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
self.CS = car.CarState()
def test_immediate_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.controlsd.events.add(make_event([et, ET.IMMEDIATE_DISABLE]))
self.controlsd.state = state
self.controlsd.state_transition(self.CS)
self.assertEqual(State.disabled, self.controlsd.state)
self.controlsd.events.clear()
def test_user_disable(self):
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.controlsd.events.add(make_event([et, ET.USER_DISABLE]))
self.controlsd.state = state
self.controlsd.state_transition(self.CS)
self.assertEqual(State.disabled, self.controlsd.state)
self.controlsd.events.clear()
def test_soft_disable(self):
for state in ALL_STATES:
if state == State.preEnabled: # preEnabled considers NO_ENTRY instead
continue
for et in MAINTAIN_STATES[state]:
self.controlsd.events.add(make_event([et, ET.SOFT_DISABLE]))
self.controlsd.state = state
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.disabled if state == State.disabled else State.softDisabling)
self.controlsd.events.clear()
def test_soft_disable_timer(self):
self.controlsd.state = State.enabled
self.controlsd.events.add(make_event([ET.SOFT_DISABLE]))
self.controlsd.state_transition(self.CS)
for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)):
self.assertEqual(self.controlsd.state, State.softDisabling)
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.disabled)
def test_no_entry(self):
# Make sure noEntry keeps us disabled
for et in ENABLE_EVENT_TYPES:
self.controlsd.events.add(make_event([ET.NO_ENTRY, et]))
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.disabled)
self.controlsd.events.clear()
def test_no_entry_pre_enable(self):
# preEnabled with noEntry event
self.controlsd.state = State.preEnabled
self.controlsd.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE]))
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, State.preEnabled)
def test_maintain_states(self):
# Given current state's event type, we should maintain state
for state in ALL_STATES:
for et in MAINTAIN_STATES[state]:
self.controlsd.state = state
self.controlsd.events.add(make_event([et]))
self.controlsd.state_transition(self.CS)
self.assertEqual(self.controlsd.state, state)
self.controlsd.events.clear()
if __name__ == "__main__":
unittest.main()

@ -1,71 +0,0 @@
import numpy as np
from openpilot.selfdrive.test.longitudinal_maneuvers.plant import Plant
class Maneuver:
def __init__(self, title, duration, **kwargs):
# Was tempted to make a builder class
self.distance_lead = kwargs.get("initial_distance_lead", 200.0)
self.speed = kwargs.get("initial_speed", 0.0)
self.lead_relevancy = kwargs.get("lead_relevancy", 0)
self.breakpoints = kwargs.get("breakpoints", [0.0, duration])
self.speed_lead_values = kwargs.get("speed_lead_values", [0.0 for i in range(len(self.breakpoints))])
self.prob_lead_values = kwargs.get("prob_lead_values", [1.0 for i in range(len(self.breakpoints))])
self.cruise_values = kwargs.get("cruise_values", [50.0 for i in range(len(self.breakpoints))])
self.only_lead2 = kwargs.get("only_lead2", False)
self.only_radar = kwargs.get("only_radar", False)
self.ensure_start = kwargs.get("ensure_start", False)
self.enabled = kwargs.get("enabled", True)
self.e2e = kwargs.get("e2e", False)
self.force_decel = kwargs.get("force_decel", False)
self.duration = duration
self.title = title
def evaluate(self):
plant = Plant(
lead_relevancy=self.lead_relevancy,
speed=self.speed,
distance_lead=self.distance_lead,
enabled=self.enabled,
only_lead2=self.only_lead2,
only_radar=self.only_radar,
e2e=self.e2e,
force_decel=self.force_decel,
)
valid = True
logs = []
while plant.current_time < self.duration:
speed_lead = np.interp(plant.current_time, self.breakpoints, self.speed_lead_values)
prob = np.interp(plant.current_time, self.breakpoints, self.prob_lead_values)
cruise = np.interp(plant.current_time, self.breakpoints, self.cruise_values)
log = plant.step(speed_lead, prob, cruise)
d_rel = log['distance_lead'] - log['distance'] if self.lead_relevancy else 200.
v_rel = speed_lead - log['speed'] if self.lead_relevancy else 0.
log['d_rel'] = d_rel
log['v_rel'] = v_rel
logs.append(np.array([plant.current_time,
log['distance'],
log['distance_lead'],
log['speed'],
speed_lead,
log['acceleration']]))
if d_rel < .4 and (self.only_radar or prob > 0.5):
print("Crashed!!!!")
valid = False
if self.ensure_start and log['v_rel'] > 0 and log['speeds'][-1] <= 0.1:
print('LongitudinalPlanner not starting!')
valid = False
if self.force_decel and log['speed'] > 1e-1 and log['acceleration'] > -0.04:
print('Not stopping with force decel')
valid = False
print("maneuver end", valid)
return valid, np.array(logs)

@ -1,172 +0,0 @@
#!/usr/bin/env python3
import time
import numpy as np
from cereal import log
import cereal.messaging as messaging
from openpilot.common.realtime import Ratekeeper, DT_MDL
from openpilot.selfdrive.controls.lib.longcontrol import LongCtrlState
from openpilot.selfdrive.modeld.constants import ModelConstants
from openpilot.selfdrive.controls.lib.longitudinal_planner import LongitudinalPlanner
from openpilot.selfdrive.controls.radard import _LEAD_ACCEL_TAU
class Plant:
messaging_initialized = False
def __init__(self, lead_relevancy=False, speed=0.0, distance_lead=2.0,
enabled=True, only_lead2=False, only_radar=False, e2e=False, force_decel=False):
self.rate = 1. / DT_MDL
if not Plant.messaging_initialized:
Plant.radar = messaging.pub_sock('radarState')
Plant.controls_state = messaging.pub_sock('controlsState')
Plant.car_state = messaging.pub_sock('carState')
Plant.plan = messaging.sub_sock('longitudinalPlan')
Plant.messaging_initialized = True
self.v_lead_prev = 0.0
self.distance = 0.
self.speed = speed
self.acceleration = 0.0
self.speeds = []
# lead car
self.lead_relevancy = lead_relevancy
self.distance_lead = distance_lead
self.enabled = enabled
self.only_lead2 = only_lead2
self.only_radar = only_radar
self.e2e = e2e
self.force_decel = force_decel
self.rk = Ratekeeper(self.rate, print_delay_threshold=100.0)
self.ts = 1. / self.rate
time.sleep(0.1)
self.sm = messaging.SubMaster(['longitudinalPlan'])
from openpilot.selfdrive.car.honda.values import CAR
from openpilot.selfdrive.car.honda.interface import CarInterface
self.planner = LongitudinalPlanner(CarInterface.get_non_essential_params(CAR.CIVIC), init_v=self.speed)
@property
def current_time(self):
return float(self.rk.frame) / self.rate
def step(self, v_lead=0.0, prob=1.0, v_cruise=50.):
# ******** publish a fake model going straight and fake calibration ********
# note that this is worst case for MPC, since model will delay long mpc by one time step
radar = messaging.new_message('radarState')
control = messaging.new_message('controlsState')
car_state = messaging.new_message('carState')
model = messaging.new_message('modelV2')
a_lead = (v_lead - self.v_lead_prev)/self.ts
self.v_lead_prev = v_lead
if self.lead_relevancy:
d_rel = np.maximum(0., self.distance_lead - self.distance)
v_rel = v_lead - self.speed
if self.only_radar:
status = True
elif prob > .5:
status = True
else:
status = False
else:
d_rel = 200.
v_rel = 0.
prob = 0.0
status = False
lead = log.RadarState.LeadData.new_message()
lead.dRel = float(d_rel)
lead.yRel = float(0.0)
lead.vRel = float(v_rel)
lead.aRel = float(a_lead - self.acceleration)
lead.vLead = float(v_lead)
lead.vLeadK = float(v_lead)
lead.aLeadK = float(a_lead)
# TODO use real radard logic for this
lead.aLeadTau = float(_LEAD_ACCEL_TAU)
lead.status = status
lead.modelProb = float(prob)
if not self.only_lead2:
radar.radarState.leadOne = lead
radar.radarState.leadTwo = lead
# Simulate model predicting slightly faster speed
# this is to ensure lead policy is effective when model
# does not predict slowdown in e2e mode
position = log.XYZTData.new_message()
position.x = [float(x) for x in (self.speed + 0.5) * np.array(ModelConstants.T_IDXS)]
model.modelV2.position = position
velocity = log.XYZTData.new_message()
velocity.x = [float(x) for x in (self.speed + 0.5) * np.ones_like(ModelConstants.T_IDXS)]
model.modelV2.velocity = velocity
acceleration = log.XYZTData.new_message()
acceleration.x = [float(x) for x in np.zeros_like(ModelConstants.T_IDXS)]
model.modelV2.acceleration = acceleration
control.controlsState.longControlState = LongCtrlState.pid if self.enabled else LongCtrlState.off
control.controlsState.vCruise = float(v_cruise * 3.6)
control.controlsState.experimentalMode = self.e2e
control.controlsState.forceDecel = self.force_decel
car_state.carState.vEgo = float(self.speed)
car_state.carState.standstill = self.speed < 0.01
# ******** get controlsState messages for plotting ***
sm = {'radarState': radar.radarState,
'carState': car_state.carState,
'controlsState': control.controlsState,
'modelV2': model.modelV2}
self.planner.update(sm)
self.speed = self.planner.v_desired_filter.x
self.acceleration = self.planner.a_desired
self.speeds = self.planner.v_desired_trajectory.tolist()
fcw = self.planner.fcw
self.distance_lead = self.distance_lead + v_lead * self.ts
# ******** run the car ********
#print(self.distance, speed)
if self.speed <= 0:
self.speed = 0
self.acceleration = 0
self.distance = self.distance + self.speed * self.ts
# *** radar model ***
if self.lead_relevancy:
d_rel = np.maximum(0., self.distance_lead - self.distance)
v_rel = v_lead - self.speed
else:
d_rel = 200.
v_rel = 0.
# print at 5hz
# if (self.rk.frame % (self.rate // 5)) == 0:
# print("%2.2f sec %6.2f m %6.2f m/s %6.2f m/s2 lead_rel: %6.2f m %6.2f m/s"
# % (self.current_time, self.distance, self.speed, self.acceleration, d_rel, v_rel))
# ******** update prevs ********
self.rk.monitor_time()
return {
"distance": self.distance,
"speed": self.speed,
"acceleration": self.acceleration,
"speeds": self.speeds,
"distance_lead": self.distance_lead,
"fcw": fcw,
}
# simple engage in standalone mode
def plant_thread():
plant = Plant()
while 1:
plant.step()
if __name__ == "__main__":
plant_thread()

@ -1,160 +0,0 @@
#!/usr/bin/env python3
import itertools
import unittest
from parameterized import parameterized_class
from openpilot.selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import STOP_DISTANCE
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver
# TODO: make new FCW tests
def create_maneuvers(kwargs):
maneuvers = [
Maneuver(
'approach stopped car at 25m/s, initial distance: 120m',
duration=20.,
initial_speed=25.,
lead_relevancy=True,
initial_distance_lead=120.,
speed_lead_values=[30., 0.],
breakpoints=[0., 1.],
**kwargs,
),
Maneuver(
'approach stopped car at 20m/s, initial distance 90m',
duration=20.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=90.,
speed_lead_values=[20., 0.],
breakpoints=[0., 1.],
**kwargs,
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 1m/s^2',
duration=50.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
breakpoints=[0., 15., 35.0],
**kwargs,
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 2m/s^2',
duration=50.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
breakpoints=[0., 15., 25.0],
**kwargs,
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 3m/s^2',
duration=50.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
breakpoints=[0., 15., 21.66],
**kwargs,
),
Maneuver(
'steady state following a car at 20m/s, then lead decel to 0mph at 3+m/s^2',
duration=40.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=35.,
speed_lead_values=[20., 20., 0.],
prob_lead_values=[0., 1., 1.],
cruise_values=[20., 20., 20.],
breakpoints=[2., 2.01, 8.8],
**kwargs,
),
Maneuver(
"approach stopped car at 20m/s, with prob_lead_values",
duration=30.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=120.,
speed_lead_values=[0.0, 0., 0.],
prob_lead_values=[0.0, 0., 1.],
cruise_values=[20., 20., 20.],
breakpoints=[0.0, 2., 2.01],
**kwargs,
),
Maneuver(
"approach slower cut-in car at 20m/s",
duration=20.,
initial_speed=20.,
lead_relevancy=True,
initial_distance_lead=50.,
speed_lead_values=[15., 15.],
breakpoints=[1., 11.],
only_lead2=True,
**kwargs,
),
Maneuver(
"stay stopped behind radar override lead",
duration=20.,
initial_speed=0.,
lead_relevancy=True,
initial_distance_lead=10.,
speed_lead_values=[0., 0.],
prob_lead_values=[0., 0.],
breakpoints=[1., 11.],
only_radar=True,
**kwargs,
),
Maneuver(
"NaN recovery",
duration=30.,
initial_speed=15.,
lead_relevancy=True,
initial_distance_lead=60.,
speed_lead_values=[0., 0., 0.0],
breakpoints=[1., 1.01, 11.],
cruise_values=[float("nan"), 15., 15.],
**kwargs,
),
Maneuver(
'cruising at 25 m/s while disabled',
duration=20.,
initial_speed=25.,
lead_relevancy=False,
enabled=False,
**kwargs,
),
]
if not kwargs['force_decel']:
# controls relies on planner commanding to move for stock-ACC resume spamming
maneuvers.append(Maneuver(
"resume from a stop",
duration=20.,
initial_speed=0.,
lead_relevancy=True,
initial_distance_lead=STOP_DISTANCE,
speed_lead_values=[0., 0., 2.],
breakpoints=[1., 10., 15.],
ensure_start=True,
**kwargs,
))
return maneuvers
@parameterized_class(("e2e", "force_decel"), itertools.product([True, False], repeat=2))
class LongitudinalControl(unittest.TestCase):
e2e: bool
force_decel: bool
def test_maneuver(self):
for maneuver in create_maneuvers({"e2e": self.e2e, "force_decel": self.force_decel}):
with self.subTest(title=maneuver.title, e2e=maneuver.e2e, force_decel=maneuver.force_decel):
print(maneuver.title, f'in {"e2e" if maneuver.e2e else "acc"} mode')
valid, _ = maneuver.evaluate()
self.assertTrue(valid)
if __name__ == "__main__":
unittest.main(failfast=True)

@ -1,106 +0,0 @@
import os
import random
import unittest
from pathlib import Path
from typing import Optional
from openpilot.system.hardware.hw import Paths
import openpilot.system.loggerd.deleter as deleter
import openpilot.system.loggerd.uploader as uploader
from openpilot.system.loggerd.xattr_cache import setxattr
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: Optional[bytes] = None) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
if lock:
lock_path = str(file_path) + ".lock"
os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL))
chunks = 128
chunk_bytes = int(size_mb * 1024 * 1024 / chunks)
data = os.urandom(chunk_bytes)
with open(file_path, "wb") as f:
for _ in range(chunks):
f.write(data)
if upload_xattr is not None:
setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr)
class MockResponse():
def __init__(self, text, status_code):
self.text = text
self.status_code = status_code
class MockApi():
def __init__(self, dongle_id):
pass
def get(self, *args, **kwargs):
return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200)
def get_token(self):
return "fake-token"
class MockApiIgnore():
def __init__(self, dongle_id):
pass
def get(self, *args, **kwargs):
return MockResponse('', 412)
def get_token(self):
return "fake-token"
class MockParams():
def __init__(self):
self.params = {
"DongleId": b"0000000000000000",
"IsOffroad": b"1",
}
def get(self, k, block=False, encoding=None):
val = self.params[k]
if encoding is not None:
return val.decode(encoding)
else:
return val
def get_bool(self, k):
val = self.params[k]
return (val == b'1')
class UploaderTestCase(unittest.TestCase):
f_type = "UNKNOWN"
root: Path
seg_num: int
seg_format: str
seg_format2: str
seg_dir: str
def set_ignore(self):
uploader.Api = MockApiIgnore
def setUp(self):
uploader.Api = MockApi
uploader.Params = MockParams
uploader.fake_upload = True
uploader.force_wifi = True
uploader.allow_sleep = False
self.seg_num = random.randint(1, 300)
self.seg_format = "2019-04-18--12-52-54--{}"
self.seg_format2 = "2019-05-18--11-22-33--{}"
self.seg_dir = self.seg_format.format(self.seg_num)
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
file_path = Path(Paths.log_root()) / f_dir / fn
create_random_file(file_path, size_mb, lock, upload_xattr)
if preserve_xattr is not None:
setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr)
return file_path

@ -1,123 +0,0 @@
#!/usr/bin/env python3
import time
import threading
import unittest
from collections import namedtuple
from pathlib import Path
from typing import Sequence
import openpilot.system.loggerd.deleter as deleter
from openpilot.common.timeout import Timeout, TimeoutException
from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase
Stats = namedtuple("Stats", ['f_bavail', 'f_blocks', 'f_frsize'])
class TestDeleter(UploaderTestCase):
def fake_statvfs(self, d):
return self.fake_stats
def setUp(self):
self.f_type = "fcamera.hevc"
super().setUp()
self.fake_stats = Stats(f_bavail=0, f_blocks=10, f_frsize=4096)
deleter.os.statvfs = self.fake_statvfs
def start_thread(self):
self.end_event = threading.Event()
self.del_thread = threading.Thread(target=deleter.deleter_thread, args=[self.end_event])
self.del_thread.daemon = True
self.del_thread.start()
def join_thread(self):
self.end_event.set()
self.del_thread.join()
def test_delete(self):
f_path = self.make_file_with_data(self.seg_dir, self.f_type, 1)
self.start_thread()
try:
with Timeout(2, "Timeout waiting for file to be deleted"):
while f_path.exists():
time.sleep(0.01)
finally:
self.join_thread()
def assertDeleteOrder(self, f_paths: Sequence[Path], timeout: int = 5) -> None:
deleted_order = []
self.start_thread()
try:
with Timeout(timeout, "Timeout waiting for files to be deleted"):
while True:
for f in f_paths:
if not f.exists() and f not in deleted_order:
deleted_order.append(f)
if len(deleted_order) == len(f_paths):
break
time.sleep(0.01)
except TimeoutException:
print("Not deleted:", [f for f in f_paths if f not in deleted_order])
raise
finally:
self.join_thread()
self.assertEqual(deleted_order, f_paths, "Files not deleted in expected order")
def test_delete_order(self):
self.assertDeleteOrder([
self.make_file_with_data(self.seg_format.format(0), self.f_type),
self.make_file_with_data(self.seg_format.format(1), self.f_type),
self.make_file_with_data(self.seg_format2.format(0), self.f_type),
])
def test_delete_many_preserved(self):
self.assertDeleteOrder([
self.make_file_with_data(self.seg_format.format(0), self.f_type),
self.make_file_with_data(self.seg_format.format(1), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE),
self.make_file_with_data(self.seg_format.format(2), self.f_type),
] + [
self.make_file_with_data(self.seg_format2.format(i), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE)
for i in range(5)
])
def test_delete_last(self):
self.assertDeleteOrder([
self.make_file_with_data(self.seg_format.format(1), self.f_type),
self.make_file_with_data(self.seg_format2.format(0), self.f_type),
self.make_file_with_data(self.seg_format.format(0), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE),
self.make_file_with_data("boot", self.seg_format[:-4]),
self.make_file_with_data("crash", self.seg_format2[:-4]),
])
def test_no_delete_when_available_space(self):
f_path = self.make_file_with_data(self.seg_dir, self.f_type)
block_size = 4096
available = (10 * 1024 * 1024 * 1024) / block_size # 10GB free
self.fake_stats = Stats(f_bavail=available, f_blocks=10, f_frsize=block_size)
self.start_thread()
start_time = time.monotonic()
while f_path.exists() and time.monotonic() - start_time < 2:
time.sleep(0.01)
self.join_thread()
self.assertTrue(f_path.exists(), "File deleted with available space")
def test_no_delete_with_lock_file(self):
f_path = self.make_file_with_data(self.seg_dir, self.f_type, lock=True)
self.start_thread()
start_time = time.monotonic()
while f_path.exists() and time.monotonic() - start_time < 2:
time.sleep(0.01)
self.join_thread()
self.assertTrue(f_path.exists(), "File deleted when locked")
if __name__ == "__main__":
unittest.main()

@ -1,156 +0,0 @@
#!/usr/bin/env python3
import math
import os
import pytest
import random
import shutil
import subprocess
import time
import unittest
from pathlib import Path
from parameterized import parameterized
from tqdm import trange
from openpilot.common.params import Params
from openpilot.common.timeout import Timeout
from openpilot.system.hardware import TICI
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.tools.lib.logreader import LogReader
from openpilot.system.hardware.hw import Paths
SEGMENT_LENGTH = 2
FULL_SIZE = 2507572
CAMERAS = [
("fcamera.hevc", 20, FULL_SIZE, "roadEncodeIdx"),
("dcamera.hevc", 20, FULL_SIZE, "driverEncodeIdx"),
("ecamera.hevc", 20, FULL_SIZE, "wideRoadEncodeIdx"),
("qcamera.ts", 20, 130000, None),
]
# we check frame count, so we don't have to be too strict on size
FILE_SIZE_TOLERANCE = 0.5
@pytest.mark.tici # TODO: all of loggerd should work on PC
class TestEncoder(unittest.TestCase):
def setUp(self):
self._clear_logs()
os.environ["LOGGERD_TEST"] = "1"
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(SEGMENT_LENGTH)
def tearDown(self):
self._clear_logs()
def _clear_logs(self):
if os.path.exists(Paths.log_root()):
shutil.rmtree(Paths.log_root())
def _get_latest_segment_path(self):
last_route = sorted(Path(Paths.log_root()).iterdir())[-1]
return os.path.join(Paths.log_root(), last_route)
# TODO: this should run faster than real time
@parameterized.expand([(True, ), (False, )])
def test_log_rotation(self, record_front):
Params().put_bool("RecordFront", record_front)
managed_processes['sensord'].start()
managed_processes['loggerd'].start()
managed_processes['encoderd'].start()
time.sleep(1.0)
managed_processes['camerad'].start()
num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15)))
# wait for loggerd to make the dir for first segment
route_prefix_path = None
with Timeout(int(SEGMENT_LENGTH*3)):
while route_prefix_path is None:
try:
route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0]
except Exception:
time.sleep(0.1)
def check_seg(i):
# check each camera file size
counts = []
first_frames = []
for camera, fps, size, encode_idx_name in CAMERAS:
if not record_front and "dcamera" in camera:
continue
file_path = f"{route_prefix_path}--{i}/{camera}"
# check file exists
self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing")
# TODO: this ffprobe call is really slow
# check frame count
cmd = f"ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {file_path}"
if TICI:
cmd = "LD_LIBRARY_PATH=/usr/local/lib " + cmd
expected_frames = fps * SEGMENT_LENGTH
probe = subprocess.check_output(cmd, shell=True, encoding='utf8')
frame_count = int(probe.split('\n')[0].strip())
counts.append(frame_count)
self.assertEqual(frame_count, expected_frames,
f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}")
# sanity check file size
file_size = os.path.getsize(file_path)
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE),
f"{file_path} size {file_size} isn't close to target size {size}")
# Check encodeIdx
if encode_idx_name is not None:
rlog_path = f"{route_prefix_path}--{i}/rlog"
msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name]
encode_msgs = [getattr(m, encode_idx_name) for m in msgs]
valid = [m.valid for m in msgs]
segment_idxs = [m.segmentId for m in encode_msgs]
encode_idxs = [m.encodeId for m in encode_msgs]
frame_idxs = [m.frameId for m in encode_msgs]
# Check frame count
self.assertEqual(frame_count, len(segment_idxs))
self.assertEqual(frame_count, len(encode_idxs))
# Check for duplicates or skips
self.assertEqual(0, segment_idxs[0])
self.assertEqual(len(set(segment_idxs)), len(segment_idxs))
self.assertTrue(all(valid))
self.assertEqual(expected_frames * i, encode_idxs[0])
first_frames.append(frame_idxs[0])
self.assertEqual(len(set(encode_idxs)), len(encode_idxs))
self.assertEqual(1, len(set(first_frames)))
if TICI:
expected_frames = fps * SEGMENT_LENGTH
self.assertEqual(min(counts), expected_frames)
shutil.rmtree(f"{route_prefix_path}--{i}")
try:
for i in trange(num_segments):
# poll for next segment
with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
while Path(f"{route_prefix_path}--{i+1}") not in Path(Paths.log_root()).iterdir():
time.sleep(0.1)
check_seg(i)
finally:
managed_processes['loggerd'].stop()
managed_processes['encoderd'].stop()
managed_processes['camerad'].stop()
managed_processes['sensord'].stop()
if __name__ == "__main__":
unittest.main()

@ -1,74 +0,0 @@
#include "catch2/catch.hpp"
#include "system/loggerd/logger.h"
typedef cereal::Sentinel::SentinelType SentinelType;
void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) {
const std::string segment_path = route_path + "--" + std::to_string(segment);
SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT;
SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT;
REQUIRE(!util::file_exists(segment_path + "/rlog.lock"));
for (const char *fn : {"/rlog", "/qlog"}) {
const std::string log_file = segment_path + fn;
std::string log = util::read_file(log_file);
REQUIRE(!log.empty());
int event_cnt = 0, i = 0;
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
while (words.size() > 0) {
try {
capnp::FlatArrayMessageReader reader(words);
auto event = reader.getRoot<cereal::Event>();
words = kj::arrayPtr(reader.getEnd(), words.end());
if (i == 0) {
REQUIRE(event.which() == cereal::Event::INIT_DATA);
} else if (i == 1) {
REQUIRE(event.which() == cereal::Event::SENTINEL);
REQUIRE(event.getSentinel().getType() == begin_sentinel);
REQUIRE(event.getSentinel().getSignal() == 0);
} else if (words.size() > 0) {
REQUIRE(event.which() == cereal::Event::CLOCKS);
++event_cnt;
} else {
// the last event must be SENTINEL
REQUIRE(event.which() == cereal::Event::SENTINEL);
REQUIRE(event.getSentinel().getType() == end_sentinel);
REQUIRE(event.getSentinel().getSignal() == (end_sentinel == SentinelType::END_OF_ROUTE ? 1 : 0));
}
++i;
} catch (const kj::Exception &ex) {
INFO("failed parse " << i << " exception :" << ex.getDescription());
REQUIRE(0);
break;
}
}
REQUIRE(event_cnt == required_event_cnt);
}
}
void write_msg(LoggerState *logger) {
MessageBuilder msg;
msg.initEvent().initClocks();
logger->write(msg.toBytes(), true);
}
TEST_CASE("logger") {
const int segment_cnt = 100;
const std::string log_root = "/tmp/test_logger";
system(("rm " + log_root + " -rf").c_str());
std::string route_name;
{
LoggerState logger(log_root);
route_name = logger.routeName();
for (int i = 0; i < segment_cnt; ++i) {
REQUIRE(logger.next());
REQUIRE(util::file_exists(logger.segmentPath() + "/rlog.lock"));
REQUIRE(logger.segment() == i);
write_msg(&logger);
}
logger.setExitSignal(1);
}
for (int i = 0; i < segment_cnt; ++i) {
verify_segment(log_root + "/" + route_name, i, segment_cnt, 1);
}
}

@ -1,284 +0,0 @@
#!/usr/bin/env python3
import numpy as np
import os
import random
import string
import subprocess
import time
import unittest
from collections import defaultdict
from pathlib import Path
from typing import Dict, List
import cereal.messaging as messaging
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.timeout import Timeout
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import getxattr
from openpilot.system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.system.version import get_version
from openpilot.tools.lib.logreader import LogReader
from cereal.visionipc import VisionIpcServer, VisionStreamType
from openpilot.common.transformations.camera import tici_f_frame_size, tici_d_frame_size, tici_e_frame_size
SentinelType = log.Sentinel.SentinelType
CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in SERVICE_LIST
and SERVICE_LIST[f].should_log and "encode" not in f.lower()]
class TestLoggerd(unittest.TestCase):
def setUp(self):
os.environ.pop("LOG_ROOT", None)
def _get_latest_log_dir(self):
log_dirs = sorted(Path(Paths.log_root()).iterdir(), key=lambda f: f.stat().st_mtime)
return log_dirs[-1]
def _get_log_dir(self, x):
for l in x.splitlines():
for p in l.split(' '):
path = Path(p.strip())
if path.is_dir():
return path
return None
def _get_log_fn(self, x):
for l in x.splitlines():
for p in l.split(' '):
path = Path(p.strip())
if path.is_file():
return path
return None
def _gen_bootlog(self):
with Timeout(5):
out = subprocess.check_output("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"), encoding='utf-8')
log_fn = self._get_log_fn(out)
# check existence
assert log_fn is not None
return log_fn
def _check_init_data(self, msgs):
msg = msgs[0]
self.assertEqual(msg.which(), 'initData')
def _check_sentinel(self, msgs, route):
start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment
self.assertTrue(msgs[1].sentinel.type == start_type)
end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment
self.assertTrue(msgs[-1].sentinel.type == end_type)
def _publish_random_messages(self, services: List[str]) -> Dict[str, list]:
pm = messaging.PubMaster(services)
managed_processes["loggerd"].start()
for s in services:
self.assertTrue(pm.wait_for_readers_to_update(s, timeout=5))
sent_msgs = defaultdict(list)
for _ in range(random.randint(2, 10) * 100):
for s in services:
try:
m = messaging.new_message(s)
except Exception:
m = messaging.new_message(s, random.randint(2, 10))
pm.send(s, m)
sent_msgs[s].append(m)
time.sleep(0.01)
for s in services:
self.assertTrue(pm.wait_for_readers_to_update(s, timeout=5))
managed_processes["loggerd"].stop()
return sent_msgs
def test_init_data_values(self):
os.environ["CLEAN"] = random.choice(["0", "1"])
dongle = ''.join(random.choice(string.printable) for n in range(random.randint(1, 100)))
fake_params = [
# param, initData field, value
("DongleId", "dongleId", dongle),
("GitCommit", "gitCommit", "commit"),
("GitBranch", "gitBranch", "branch"),
("GitRemote", "gitRemote", "remote"),
]
params = Params()
params.clear_all()
for k, _, v in fake_params:
params.put(k, v)
params.put("LaikadEphemerisV3", "abc")
lr = list(LogReader(str(self._gen_bootlog())))
initData = lr[0].initData
self.assertTrue(initData.dirty != bool(os.environ["CLEAN"]))
self.assertEqual(initData.version, get_version())
if os.path.isfile("/proc/cmdline"):
with open("/proc/cmdline") as f:
self.assertEqual(list(initData.kernelArgs), f.read().strip().split(" "))
with open("/proc/version") as f:
self.assertEqual(initData.kernelVersion, f.read())
# check params
logged_params = {entry.key: entry.value for entry in initData.params.entries}
expected_params = {k for k, _, __ in fake_params} | {'LaikadEphemerisV3'}
assert set(logged_params.keys()) == expected_params, set(logged_params.keys()) ^ expected_params
assert logged_params['LaikadEphemerisV3'] == b'', f"DONT_LOG param value was logged: {repr(logged_params['LaikadEphemerisV3'])}"
for param_key, initData_key, v in fake_params:
self.assertEqual(getattr(initData, initData_key), v)
self.assertEqual(logged_params[param_key].decode(), v)
params.put("LaikadEphemerisV3", "")
def test_rotation(self):
os.environ["LOGGERD_TEST"] = "1"
Params().put("RecordFront", "1")
expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"}
streams = [(VisionStreamType.VISION_STREAM_ROAD, (*tici_f_frame_size, 2048*2346, 2048, 2048*1216), "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, (*tici_d_frame_size, 2048*2346, 2048, 2048*1216), "driverCameraState"),
(VisionStreamType.VISION_STREAM_WIDE_ROAD, (*tici_e_frame_size, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")]
pm = messaging.PubMaster(["roadCameraState", "driverCameraState", "wideRoadCameraState"])
vipc_server = VisionIpcServer("camerad")
for stream_type, frame_spec, _ in streams:
vipc_server.create_buffers_with_sizes(stream_type, 40, False, *(frame_spec))
vipc_server.start_listener()
num_segs = random.randint(2, 5)
length = random.randint(1, 3)
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length)
managed_processes["loggerd"].start()
managed_processes["encoderd"].start()
time.sleep(1)
fps = 20.0
for n in range(1, int(num_segs*length*fps)+1):
time_start = time.monotonic()
for stream_type, frame_spec, state in streams:
dat = np.empty(frame_spec[2], dtype=np.uint8)
vipc_server.send(stream_type, dat[:].flatten().tobytes(), n, n/fps, n/fps)
camera_state = messaging.new_message(state)
frame = getattr(camera_state, state)
frame.frameId = n
pm.send(state, camera_state)
time.sleep(max((1.0/fps) - (time.monotonic() - time_start), 0))
managed_processes["loggerd"].stop()
managed_processes["encoderd"].stop()
route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0]
for n in range(num_segs):
p = Path(f"{route_path}--{n}")
logged = {f.name for f in p.iterdir() if f.is_file()}
diff = logged ^ expected_files
self.assertEqual(len(diff), 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}")
def test_bootlog(self):
# generate bootlog with fake launch log
launch_log = ''.join(str(random.choice(string.printable)) for _ in range(100))
with open("/tmp/launch_log", "w") as f:
f.write(launch_log)
bootlog_path = self._gen_bootlog()
lr = list(LogReader(str(bootlog_path)))
# check length
assert len(lr) == 2 # boot + initData
self._check_init_data(lr)
# check msgs
bootlog_msgs = [m for m in lr if m.which() == 'boot']
assert len(bootlog_msgs) == 1
# sanity check values
boot = bootlog_msgs.pop().boot
assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9 # within 5s
assert boot.launchLog == launch_log
for fn in ["console-ramoops", "pmsg-ramoops-0"]:
path = Path(os.path.join("/sys/fs/pstore/", fn))
if path.is_file():
with open(path, "rb") as f:
expected_val = f.read()
bootlog_val = [e.value for e in boot.pstore.entries if e.key == fn][0]
self.assertEqual(expected_val, bootlog_val)
def test_qlog(self):
qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is not None]
no_qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is None]
services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \
random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services))))
sent_msgs = self._publish_random_messages(services)
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog")
lr = list(LogReader(qlog_path))
# check initData and sentinel
self._check_init_data(lr)
self._check_sentinel(lr, True)
recv_msgs = defaultdict(list)
for m in lr:
recv_msgs[m.which()].append(m)
for s, msgs in sent_msgs.items():
recv_cnt = len(recv_msgs[s])
if s in no_qlog_services:
# check services with no specific decimation aren't in qlog
self.assertEqual(recv_cnt, 0, f"got {recv_cnt} {s} msgs in qlog")
else:
# check logged message count matches decimation
expected_cnt = (len(msgs) - 1) // SERVICE_LIST[s].decimation + 1
self.assertEqual(recv_cnt, expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}")
def test_rlog(self):
services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
sent_msgs = self._publish_random_messages(services)
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog")))
# check initData and sentinel
self._check_init_data(lr)
self._check_sentinel(lr, True)
# check all messages were logged and in order
lr = lr[2:-1] # slice off initData and both sentinels
for m in lr:
sent = sent_msgs[m.which()].pop(0)
sent.clear_write_flag()
self.assertEqual(sent.to_bytes(), m.as_builder().to_bytes())
def test_preserving_flagged_segments(self):
services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"}
self._publish_random_messages(services)
segment_dir = self._get_latest_log_dir()
self.assertEqual(getxattr(segment_dir, PRESERVE_ATTR_NAME), PRESERVE_ATTR_VALUE)
def test_not_preserving_unflagged_segments(self):
services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"}
self._publish_random_messages(services)
segment_dir = self._get_latest_log_dir()
self.assertIsNone(getxattr(segment_dir, PRESERVE_ATTR_NAME))
if __name__ == "__main__":
unittest.main()

@ -1,2 +0,0 @@
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"

@ -1,191 +0,0 @@
#!/usr/bin/env python3
import os
import time
import threading
import unittest
import logging
import json
from pathlib import Path
from typing import List, Optional
from openpilot.system.hardware.hw import Paths
from openpilot.common.swaglog import cloudlog
from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE
from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase
class FakeLogHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
self.reset()
def reset(self):
self.upload_order = list()
self.upload_ignored = list()
def emit(self, record):
try:
j = json.loads(record.getMessage())
if j["event"] == "upload_success":
self.upload_order.append(j["key"])
if j["event"] == "upload_ignored":
self.upload_ignored.append(j["key"])
except Exception:
pass
log_handler = FakeLogHandler()
cloudlog.addHandler(log_handler)
class TestUploader(UploaderTestCase):
def setUp(self):
super().setUp()
log_handler.reset()
def start_thread(self):
self.end_event = threading.Event()
self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event])
self.up_thread.daemon = True
self.up_thread.start()
def join_thread(self):
self.end_event.set()
self.up_thread.join()
def gen_files(self, lock=False, xattr: Optional[bytes] = None, boot=True) -> List[Path]:
f_paths = []
for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr))
if boot:
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, upload_xattr=xattr))
return f_paths
def gen_order(self, seg1: List[int], seg2: List[int], boot=True) -> List[str]:
keys = []
if boot:
keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1]
keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2]
keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2]
return keys
def test_upload(self):
self.gen_files(lock=False)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
exp_order = self.gen_order([self.seg_num], [])
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
def test_upload_with_wrong_xattr(self):
self.gen_files(lock=False, xattr=b'0')
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
exp_order = self.gen_order([self.seg_num], [])
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
def test_upload_ignored(self):
self.set_ignore()
self.gen_files(lock=False)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
exp_order = self.gen_order([self.seg_num], [])
self.assertTrue(len(log_handler.upload_order) == 0, "Some files were not ignored")
self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore")
self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice")
for f_path in exp_order:
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not ignored")
self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order")
def test_upload_files_in_create_order(self):
seg1_nums = [0, 1, 2, 10, 20]
for i in seg1_nums:
self.seg_dir = self.seg_format.format(i)
self.gen_files(boot=False)
seg2_nums = [5, 50, 51]
for i in seg2_nums:
self.seg_dir = self.seg_format2.format(i)
self.gen_files(boot=False)
exp_order = self.gen_order(seg1_nums, seg2_nums, boot=False)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
self.assertTrue(len(log_handler.upload_ignored) == 0, "Some files were ignored")
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
def test_no_upload_with_lock_file(self):
self.start_thread()
time.sleep(0.25)
f_paths = self.gen_files(lock=True, boot=False)
# allow enough time that files should have been uploaded if they would be uploaded
time.sleep(5)
self.join_thread()
for f_path in f_paths:
fn = f_path.with_suffix(f_path.suffix.replace(".bz2", ""))
uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
self.assertFalse(uploaded, "File upload when locked")
def test_no_upload_with_xattr(self):
self.gen_files(lock=False, xattr=UPLOAD_ATTR_VALUE)
self.start_thread()
# allow enough time that files could upload twice if there is a bug in the logic
time.sleep(5)
self.join_thread()
self.assertEqual(len(log_handler.upload_order), 0, "File uploaded again")
def test_clear_locks_on_startup(self):
f_paths = self.gen_files(lock=True, boot=False)
self.start_thread()
time.sleep(1)
self.join_thread()
for f_path in f_paths:
lock_path = f_path.with_suffix(f_path.suffix + ".lock")
self.assertFalse(lock_path.is_file(), "File lock not cleared on startup")
if __name__ == "__main__":
unittest.main()

@ -1 +0,0 @@
test_proclog

@ -1,155 +0,0 @@
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"
#include "common/util.h"
#include "system/proclogd/proclog.h"
const std::string allowed_states = "RSDTZtWXxKWPI";
TEST_CASE("Parser::procStat") {
SECTION("from string") {
const std::string stat_str =
"33012 (code )) S 32978 6620 6620 0 -1 4194368 2042377 0 144 0 24510 11627 0 "
"0 20 0 39 0 53077 830029824 62214 18446744073709551615 94257242783744 94257366235808 "
"140735738643248 0 0 0 0 4098 1073808632 0 0 0 17 2 0 0 2 0 0 94257370858656 94257371248232 "
"94257404952576 140735738648768 140735738648823 140735738648823 140735738650595 0";
auto stat = Parser::procStat(stat_str);
REQUIRE(stat);
REQUIRE(stat->pid == 33012);
REQUIRE(stat->name == "code )");
REQUIRE(stat->state == 'S');
REQUIRE(stat->ppid == 32978);
REQUIRE(stat->utime == 24510);
REQUIRE(stat->stime == 11627);
REQUIRE(stat->cutime == 0);
REQUIRE(stat->cstime == 0);
REQUIRE(stat->priority == 20);
REQUIRE(stat->nice == 0);
REQUIRE(stat->num_threads == 39);
REQUIRE(stat->starttime == 53077);
REQUIRE(stat->vms == 830029824);
REQUIRE(stat->rss == 62214);
REQUIRE(stat->processor == 2);
}
SECTION("all processes") {
std::vector<int> pids = Parser::pids();
REQUIRE(pids.size() > 1);
for (int pid : pids) {
std::string stat_path = "/proc/" + std::to_string(pid) + "/stat";
INFO(stat_path);
if (auto stat = Parser::procStat(util::read_file(stat_path))) {
REQUIRE(stat->pid == pid);
REQUIRE(allowed_states.find(stat->state) != std::string::npos);
} else {
REQUIRE(util::file_exists(stat_path) == false);
}
}
}
}
TEST_CASE("Parser::cpuTimes") {
SECTION("from string") {
std::string stat =
"cpu 0 0 0 0 0 0 0 0 0 0\n"
"cpu0 1 2 3 4 5 6 7 8 9 10\n"
"cpu1 1 2 3 4 5 6 7 8 9 10\n";
std::istringstream stream(stat);
auto stats = Parser::cpuTimes(stream);
REQUIRE(stats.size() == 2);
for (int i = 0; i < stats.size(); ++i) {
REQUIRE(stats[i].id == i);
REQUIRE(stats[i].utime == 1);
REQUIRE(stats[i].ntime ==2);
REQUIRE(stats[i].stime == 3);
REQUIRE(stats[i].itime == 4);
REQUIRE(stats[i].iowtime == 5);
REQUIRE(stats[i].irqtime == 6);
REQUIRE(stats[i].sirqtime == 7);
}
}
SECTION("all cpus") {
std::istringstream stream(util::read_file("/proc/stat"));
auto stats = Parser::cpuTimes(stream);
REQUIRE(stats.size() == sysconf(_SC_NPROCESSORS_ONLN));
for (int i = 0; i < stats.size(); ++i) {
REQUIRE(stats[i].id == i);
}
}
}
TEST_CASE("Parser::memInfo") {
SECTION("from string") {
std::istringstream stream("MemTotal: 1024 kb\nMemFree: 2048 kb\n");
auto meminfo = Parser::memInfo(stream);
REQUIRE(meminfo["MemTotal:"] == 1024 * 1024);
REQUIRE(meminfo["MemFree:"] == 2048 * 1024);
}
SECTION("from /proc/meminfo") {
std::string require_keys[] = {"MemTotal:", "MemFree:", "MemAvailable:", "Buffers:", "Cached:", "Active:", "Inactive:", "Shmem:"};
std::istringstream stream(util::read_file("/proc/meminfo"));
auto meminfo = Parser::memInfo(stream);
for (auto &key : require_keys) {
REQUIRE(meminfo.find(key) != meminfo.end());
REQUIRE(meminfo[key] > 0);
}
}
}
void test_cmdline(std::string cmdline, const std::vector<std::string> requires) {
std::stringstream ss;
ss.write(&cmdline[0], cmdline.size());
auto cmds = Parser::cmdline(ss);
REQUIRE(cmds.size() == requires.size());
for (int i = 0; i < requires.size(); ++i) {
REQUIRE(cmds[i] == requires[i]);
}
}
TEST_CASE("Parser::cmdline") {
test_cmdline(std::string("a\0b\0c\0", 7), {"a", "b", "c"});
test_cmdline(std::string("a\0\0c\0", 6), {"a", "c"});
test_cmdline(std::string("a\0b\0c\0\0\0", 9), {"a", "b", "c"});
}
TEST_CASE("buildProcLogerMessage") {
MessageBuilder msg;
buildProcLogMessage(msg);
kj::Array<capnp::word> buf = capnp::messageToFlatArray(msg);
capnp::FlatArrayMessageReader reader(buf);
auto log = reader.getRoot<cereal::Event>().getProcLog();
REQUIRE(log.totalSize().wordCount > 0);
// test cereal::ProcLog::CPUTimes
auto cpu_times = log.getCpuTimes();
REQUIRE(cpu_times.size() == sysconf(_SC_NPROCESSORS_ONLN));
REQUIRE(cpu_times[cpu_times.size() - 1].getCpuNum() == cpu_times.size() - 1);
// test cereal::ProcLog::Mem
auto mem = log.getMem();
REQUIRE(mem.getTotal() > 0);
REQUIRE(mem.getShared() > 0);
// test cereal::ProcLog::Process
auto procs = log.getProcs();
for (auto p : procs) {
REQUIRE(allowed_states.find(p.getState()) != std::string::npos);
if (p.getPid() == ::getpid()) {
REQUIRE(p.getName() == "test_proclog");
REQUIRE(p.getState() == 'R');
REQUIRE_THAT(p.getExe().cStr(), Catch::Matchers::Contains("test_proclog"));
REQUIRE_THAT(p.getCmdline()[0], Catch::Matchers::Contains("test_proclog"));
} else {
std::string cmd_path = "/proc/" + std::to_string(p.getPid()) + "/cmdline";
if (util::file_exists(cmd_path)) {
std::ifstream stream(cmd_path);
auto cmdline = Parser::cmdline(stream);
REQUIRE(cmdline.size() == p.getCmdline().size());
// do not check the cmdline of pytest as it will change.
if (cmdline.size() > 0 && cmdline[0].find("[pytest") != 0) {
for (int i = 0; i < p.getCmdline().size(); ++i) {
REQUIRE(cmdline[i] == p.getCmdline()[i].cStr());
}
}
}
}
}
}

@ -1,220 +0,0 @@
#include <chrono>
#include <thread>
#include <QDebug>
#include <QEventLoop>
#include "catch2/catch.hpp"
#include "common/util.h"
#include "tools/replay/replay.h"
#include "tools/replay/util.h"
const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3";
bool download_to_file(const std::string &url, const std::string &local_file, int chunk_size = 5 * 1024 * 1024, int retries = 3) {
do {
if (httpDownload(url, local_file, chunk_size)) {
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} while (--retries >= 0);
return false;
}
TEST_CASE("httpMultiPartDownload") {
char filename[] = "/tmp/XXXXXX";
close(mkstemp(filename));
const size_t chunk_size = 5 * 1024 * 1024;
std::string content;
SECTION("download to file") {
REQUIRE(download_to_file(TEST_RLOG_URL, filename, chunk_size));
content = util::read_file(filename);
}
SECTION("download to buffer") {
for (int i = 0; i < 3 && content.empty(); ++i) {
content = httpGet(TEST_RLOG_URL, chunk_size);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
REQUIRE(!content.empty());
}
REQUIRE(content.size() == 9112651);
REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM);
}
TEST_CASE("FileReader") {
auto enable_local_cache = GENERATE(true, false);
std::string cache_file = cacheFilePath(TEST_RLOG_URL);
system(("rm " + cache_file + " -f").c_str());
FileReader reader(enable_local_cache);
std::string content = reader.read(TEST_RLOG_URL);
REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM);
if (enable_local_cache) {
REQUIRE(sha256(util::read_file(cache_file)) == TEST_RLOG_CHECKSUM);
} else {
REQUIRE(util::file_exists(cache_file) == false);
}
}
TEST_CASE("LogReader") {
SECTION("corrupt log") {
FileReader reader(true);
std::string corrupt_content = reader.read(TEST_RLOG_URL);
corrupt_content.resize(corrupt_content.length() / 2);
corrupt_content = decompressBZ2(corrupt_content);
LogReader log;
REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size()));
REQUIRE(log.events.size() > 0);
}
}
void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) {
QEventLoop loop;
Segment segment(n, segment_file, flags);
QObject::connect(&segment, &Segment::loadFinished, [&]() {
REQUIRE(segment.isLoaded() == true);
REQUIRE(segment.log != nullptr);
REQUIRE(segment.frames[RoadCam] != nullptr);
if (flags & REPLAY_FLAG_DCAM) {
REQUIRE(segment.frames[DriverCam] != nullptr);
}
if (flags & REPLAY_FLAG_ECAM) {
REQUIRE(segment.frames[WideRoadCam] != nullptr);
}
// test LogReader & FrameReader
REQUIRE(segment.log->events.size() > 0);
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan()));
for (auto cam : ALL_CAMERAS) {
auto &fr = segment.frames[cam];
if (!fr) continue;
if (cam == RoadCam || cam == WideRoadCam) {
REQUIRE(fr->getFrameCount() == 1200);
}
auto [nv12_width, nv12_height, nv12_buffer_size] = get_nv12_info(fr->width, fr->height);
VisionBuf buf;
buf.allocate(nv12_buffer_size);
buf.init_yuv(fr->width, fr->height, nv12_width, nv12_width * nv12_height);
// sequence get 100 frames
for (int i = 0; i < 100; ++i) {
REQUIRE(fr->get(i, &buf));
}
}
loop.quit();
});
loop.exec();
}
std::string download_demo_route() {
static std::string data_dir;
if (data_dir == "") {
char tmp_path[] = "/tmp/root_XXXXXX";
data_dir = mkdtemp(tmp_path);
Route remote_route(DEMO_ROUTE);
assert(remote_route.load());
// Create a local route from remote for testing
const std::string route_name = DEMO_ROUTE.mid(17).toStdString();
for (int i = 0; i < 2; ++i) {
std::string log_path = util::string_format("%s/%s--%d/", data_dir.c_str(), route_name.c_str(), i);
util::create_directories(log_path, 0755);
REQUIRE(download_to_file(remote_route.at(i).rlog.toStdString(), log_path + "rlog.bz2"));
REQUIRE(download_to_file(remote_route.at(i).driver_cam.toStdString(), log_path + "dcamera.hevc"));
REQUIRE(download_to_file(remote_route.at(i).wide_road_cam.toStdString(), log_path + "ecamera.hevc"));
REQUIRE(download_to_file(remote_route.at(i).qcamera.toStdString(), log_path + "qcamera.ts"));
}
}
return data_dir;
}
TEST_CASE("Local route") {
std::string data_dir = download_demo_route();
auto flags = GENERATE(REPLAY_FLAG_DCAM | REPLAY_FLAG_ECAM, REPLAY_FLAG_QCAMERA);
Route route(DEMO_ROUTE, QString::fromStdString(data_dir));
REQUIRE(route.load());
REQUIRE(route.segments().size() == 2);
for (int i = 0; i < route.segments().size(); ++i) {
read_segment(i, route.at(i), flags);
}
}
TEST_CASE("Remote route") {
auto flags = GENERATE(REPLAY_FLAG_DCAM | REPLAY_FLAG_ECAM, REPLAY_FLAG_QCAMERA);
Route route(DEMO_ROUTE);
REQUIRE(route.load());
REQUIRE(route.segments().size() == 13);
for (int i = 0; i < 2; ++i) {
read_segment(i, route.at(i), flags);
}
}
// helper class for unit tests
class TestReplay : public Replay {
public:
TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {}
void test_seek();
void testSeekTo(int seek_to);
};
void TestReplay::testSeekTo(int seek_to) {
seekTo(seek_to, false);
while (true) {
std::unique_lock lk(stream_lock_);
stream_cv_.wait(lk, [=]() { return events_updated_ == true; });
events_updated_ = false;
if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) {
// wake up by the previous merging, skip it.
continue;
}
Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_);
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) {
qDebug() << "waiting for events...";
continue;
}
REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan()));
const int seek_to_segment = seek_to / 60;
const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9;
current_segment_ = event_seconds / 60;
INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]");
REQUIRE(event_seconds >= seek_to);
if (event_seconds > seek_to) {
auto it = segments_.lower_bound(seek_to_segment);
REQUIRE(it->first == current_segment_);
}
break;
}
}
void TestReplay::test_seek() {
// create a dummy stream thread
stream_thread_ = new QThread(this);
QEventLoop loop;
std::thread thread = std::thread([&]() {
for (int i = 0; i < 10; ++i) {
testSeekTo(util::random_int(0, 2 * 60));
}
loop.quit();
});
loop.exec();
thread.join();
}
TEST_CASE("Replay") {
TestReplay replay(DEMO_ROUTE);
REQUIRE(replay.load());
replay.test_seek();
}

@ -1,10 +0,0 @@
#define CATCH_CONFIG_RUNNER
#include "catch2/catch.hpp"
#include <QCoreApplication>
int main(int argc, char **argv) {
// unit tests for Qt
QCoreApplication app(argc, argv);
const int res = Catch::Session().run(argc, argv);
return (res < 0xff ? res : 0xff);
}
Loading…
Cancel
Save