openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

356 lines
13 KiB

#!/usr/bin/env python3
# pylint: disable=E1101
import os
import importlib
2 years ago
import time
import unittest
from collections import defaultdict, Counter
from typing import List, Optional, Tuple
from parameterized import parameterized_class
from cereal import log, car
3 years ago
from common.basedir import BASEDIR
from common.realtime import DT_CTRL
from selfdrive.car.fingerprints import all_known_cars
2 years ago
from selfdrive.car.car_helpers import FRAME_FINGERPRINT, interfaces
from selfdrive.car.gm.values import CAR as GM
from selfdrive.car.honda.values import CAR as HONDA, HONDA_BOSCH
from selfdrive.car.hyundai.values import CAR as HYUNDAI
from selfdrive.car.tests.routes import non_tested_cars, routes, CarTestRoute
from selfdrive.test.openpilotci import get_url
from tools.lib.logreader import LogReader
2 years ago
from tools.lib.route import Route, SegmentName, RouteName
2 years ago
from selfdrive.car import gen_empty_fingerprint
from panda.tests.libpanda import libpanda_py
PandaType = log.PandaState.PandaType
NUM_JOBS = int(os.environ.get("NUM_JOBS", "1"))
JOB_ID = int(os.environ.get("JOB_ID", "0"))
2 years ago
INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "")
ignore_addr_checks_valid = [
GM.BUICK_REGAL,
HYUNDAI.GENESIS_G70_2020,
]
2 years ago
def load_test_cases_from_file(file_path):
test_cases: List[Tuple[str, Optional[CarTestRoute]]] = []
with open(os.path.join(BASEDIR, file_path), "r") as f:
3 years ago
seg_list = f.read().splitlines()
platforms, segs = seg_list[0::2], seg_list[1::2]
2 years ago
for platform, seg in zip(platforms, segs):
2 years ago
segment_name = SegmentName(seg)
2 years ago
test_cases.append((platform[2:], CarTestRoute(segment_name.route_name.canonical_name, platform[2:],
segment=segment_name.segment_num)))
2 years ago
return test_cases
# build list of test cases
test_cases: List[Tuple[str, Optional[CarTestRoute]]] = []
if len(INTERNAL_SEG_LIST):
test_cases = load_test_cases_from_file(INTERNAL_SEG_LIST)
3 years ago
else:
routes_by_car = defaultdict(set)
for r in routes:
routes_by_car[r.car_model].add(r)
for i, c in enumerate(sorted(all_known_cars())):
if i % NUM_JOBS == JOB_ID:
test_cases.extend(sorted((c, r) for r in routes_by_car.get(c, (None, ))))
SKIP_ENV_VAR = "SKIP_LONG_TESTS"
class TestCarModelBase(unittest.TestCase):
car_model = None
test_route = None
ci = True
@unittest.skipIf(SKIP_ENV_VAR in os.environ, f"Long running test skipped. Unset {SKIP_ENV_VAR} to run")
@classmethod
def setUpClass(cls):
if cls.__name__ == 'TestCarModel' or cls.__name__.endswith('Base'):
raise unittest.SkipTest
if 'FILTER' in os.environ:
if not cls.car_model.startswith(tuple(os.environ.get('FILTER').split(','))):
raise unittest.SkipTest
if cls.test_route is None:
if cls.car_model in non_tested_cars:
print(f"Skipping tests for {cls.car_model}: missing route")
raise unittest.SkipTest
raise Exception(f"missing test route for {cls.car_model}")
experimental_long = False
test_segs = (2, 1, 0)
if cls.test_route.segment is not None:
test_segs = (cls.test_route.segment,)
for seg in test_segs:
try:
2 years ago
if len(INTERNAL_SEG_LIST):
2 years ago
route_name = RouteName(cls.test_route.route)
lr = LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2")
elif cls.ci:
lr = LogReader(get_url(cls.test_route.route, seg))
else:
lr = LogReader(Route(cls.test_route.route).log_paths()[seg])
except Exception:
continue
2 years ago
t = time.perf_counter()
car_fw = []
can_msgs = []
fingerprint = defaultdict(dict)
2 years ago
start_time = None
# fingerprint = gen_empty_fingerprint()
for idx, msg in enumerate(lr):
if msg.which() == "can":
2 years ago
# gather fingerprint for 2s
can_msgs.append(msg)
if idx > FRAME_FINGERPRINT * 2:
continue
for m in msg.can:
if m.src < 64:
fingerprint[m.src][m.address] = len(m.dat)
2 years ago
elif msg.which() == "carParams":
car_fw = msg.carParams.carFw
if msg.carParams.openpilotLongitudinalControl:
experimental_long = True
if cls.car_model is None and not cls.ci:
cls.car_model = msg.carParams.carFingerprint
if len(can_msgs) > int(50 / DT_CTRL):
2 years ago
print('can_msgs', time.perf_counter() - t)
break
else:
raise Exception(f"Route: {repr(cls.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded?")
cls.can_msgs = sorted(can_msgs, key=lambda msg: msg.logMonoTime)
cls.CarInterface, cls.CarController, cls.CarState = interfaces[cls.car_model]
cls.CP = cls.CarInterface.get_params(cls.car_model, fingerprint, car_fw, experimental_long, docs=False)
assert cls.CP
assert cls.CP.carFingerprint == cls.car_model
@classmethod
def tearDownClass(cls):
del cls.can_msgs
def setUp(self):
self.CI = self.CarInterface(self.CP, self.CarController, self.CarState)
assert self.CI
# TODO: check safetyModel is in release panda build
self.safety = libpanda_py.libpanda
cfg = self.CP.safetyConfigs[-1]
set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam)
self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
self.safety.init_tests()
def test_car_params(self):
if self.CP.dashcamOnly:
self.skipTest("no need to check carParams for dashcamOnly")
# make sure car params are within a valid range
self.assertGreater(self.CP.mass, 1)
if self.CP.steerControlType != car.CarParams.SteerControlType.angle:
tuning = self.CP.lateralTuning.which()
if tuning == 'pid':
self.assertTrue(len(self.CP.lateralTuning.pid.kpV))
elif tuning == 'torque':
self.assertTrue(self.CP.lateralTuning.torque.kf > 0)
elif tuning == 'indi':
self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV))
else:
raise Exception("unknown tuning")
2 years ago
# def test_car_interface(self):
# # TODO: also check for checksum violations from can parser
# can_invalid_cnt = 0
# can_valid = False
# CC = car.CarControl.new_message()
#
# for i, msg in enumerate(self.can_msgs):
# CS = self.CI.update(CC, (msg.as_builder().to_bytes(),))
# self.CI.apply(CC, msg.logMonoTime)
#
# if CS.canValid:
# can_valid = True
#
# # wait max of 2s for low frequency msgs to be seen
# if i > 200 or can_valid:
# can_invalid_cnt += not CS.canValid
#
# self.assertEqual(can_invalid_cnt, 0)
#
# def test_radar_interface(self):
# os.environ['NO_RADAR_SLEEP'] = "1"
# RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface
# RI = RadarInterface(self.CP)
# assert RI
#
# error_cnt = 0
# for i, msg in enumerate(self.can_msgs):
# rr = RI.update((msg.as_builder().to_bytes(),))
# if rr is not None and i > 50:
# error_cnt += car.RadarData.Error.canError in rr.errors
# self.assertEqual(error_cnt, 0)
#
# def test_panda_safety_rx_valid(self):
# if self.CP.dashcamOnly:
# self.skipTest("no need to check panda safety for dashcamOnly")
#
# start_ts = self.can_msgs[0].logMonoTime
#
# failed_addrs = Counter()
# for can in self.can_msgs:
# # update panda timer
# t = (can.logMonoTime - start_ts) / 1e3
# self.safety.set_timer(int(t))
#
# # run all msgs through the safety RX hook
# for msg in can.can:
# if msg.src >= 64:
# continue
#
# to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
# if self.safety.safety_rx_hook(to_send) != 1:
# failed_addrs[hex(msg.address)] += 1
#
# # ensure all msgs defined in the addr checks are valid
# if self.car_model not in ignore_addr_checks_valid:
# self.safety.safety_tick_current_rx_checks()
# if t > 1e6:
# self.assertTrue(self.safety.addr_checks_valid())
# self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}")
# def test_panda_safety_tx_cases(self, data=None):
# """Asserts we can tx common messages"""
# if self.CP.notCar:
# self.skipTest("Skipping test for notCar")
#
# def test_car_controller(car_control):
# now_nanos = 0
# msgs_sent = 0
# CI = self.CarInterface(self.CP, self.CarController, self.CarState)
# for _ in range(round(10.0 / DT_CTRL)): # make sure we hit the slowest messages
# CI.update(car_control, [])
# _, sendcan = CI.apply(car_control, now_nanos)
#
# now_nanos += DT_CTRL * 1e9
# msgs_sent += len(sendcan)
# for addr, _, dat, bus in sendcan:
# to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat)
# self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus))
#
# # Make sure we attempted to send messages
# self.assertGreater(msgs_sent, 50)
#
# # Make sure we can send all messages while inactive
# CC = car.CarControl.new_message()
# test_car_controller(CC)
#
# # Test cancel + general messages (controls_allowed=False & cruise_engaged=True)
# self.safety.set_cruise_engaged_prev(True)
# CC = car.CarControl.new_message(cruiseControl={'cancel': True})
# test_car_controller(CC)
#
# # Test resume + general messages (controls_allowed=True & cruise_engaged=True)
# self.safety.set_controls_allowed(True)
# CC = car.CarControl.new_message(cruiseControl={'resume': True})
# test_car_controller(CC)
# def test_panda_safety_carstate(self):
# """
# Assert that panda safety matches openpilot's carState
# """
# if self.CP.dashcamOnly:
# self.skipTest("no need to check panda safety for dashcamOnly")
#
# CC = car.CarControl.new_message()
#
# # warm up pass, as initial states may be different
# for can in self.can_msgs[:300]:
# self.CI.update(CC, (can.as_builder().to_bytes(), ))
# for msg in filter(lambda m: m.src in range(64), can.can):
# to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
# self.safety.safety_rx_hook(to_send)
#
# controls_allowed_prev = False
# CS_prev = car.CarState.new_message()
# checks = defaultdict(lambda: 0)
# for idx, can in enumerate(self.can_msgs):
# CS = self.CI.update(CC, (can.as_builder().to_bytes(), ))
# for msg in filter(lambda m: m.src in range(64), can.can):
# to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat)
# ret = self.safety.safety_rx_hook(to_send)
# self.assertEqual(1, ret, f"safety rx failed ({ret=}): {to_send}")
#
# # Skip first frame so CS_prev is properly initialized
# if idx == 0:
# CS_prev = CS
# # Button may be left pressed in warm up period
# if not self.CP.pcmCruise:
# self.safety.set_controls_allowed(0)
# continue
#
# # TODO: check rest of panda's carstate (steering, ACC main on, etc.)
#
# checks['gasPressed'] += CS.gasPressed != self.safety.get_gas_pressed_prev()
# if self.CP.carName not in ("hyundai", "body"):
# # TODO: fix standstill mismatches for other makes
# checks['standstill'] += CS.standstill == self.safety.get_vehicle_moving()
#
# # TODO: remove this exception once this mismatch is resolved
# brake_pressed = CS.brakePressed
# if CS.brakePressed and not self.safety.get_brake_pressed_prev():
# if self.CP.carFingerprint in (HONDA.PILOT, HONDA.RIDGELINE) and CS.brake > 0.05:
# brake_pressed = False
# checks['brakePressed'] += brake_pressed != self.safety.get_brake_pressed_prev()
# checks['regenBraking'] += CS.regenBraking != self.safety.get_regen_braking_prev()
#
# if self.CP.pcmCruise:
# # On most pcmCruise cars, openpilot's state is always tied to the PCM's cruise state.
# # On Honda Nidec, we always engage on the rising edge of the PCM cruise state, but
# # openpilot brakes to zero even if the min ACC speed is non-zero (i.e. the PCM disengages).
# if self.CP.carName == "honda" and self.CP.carFingerprint not in HONDA_BOSCH:
# # only the rising edges are expected to match
# if CS.cruiseState.enabled and not CS_prev.cruiseState.enabled:
# checks['controlsAllowed'] += not self.safety.get_controls_allowed()
# else:
# checks['controlsAllowed'] += not CS.cruiseState.enabled and self.safety.get_controls_allowed()
# else:
# # Check for enable events on rising edge of controls allowed
# button_enable = any(evt.enable for evt in CS.events)
# mismatch = button_enable != (self.safety.get_controls_allowed() and not controls_allowed_prev)
# checks['controlsAllowed'] += mismatch
# controls_allowed_prev = self.safety.get_controls_allowed()
# if button_enable and not mismatch:
# self.safety.set_controls_allowed(False)
#
# if self.CP.carName == "honda":
# checks['mainOn'] += CS.cruiseState.available != self.safety.get_acc_main_on()
#
# CS_prev = CS
#
# failed_checks = {k: v for k, v in checks.items() if v > 0}
# self.assertFalse(len(failed_checks), f"panda safety doesn't agree with openpilot: {failed_checks}")
@parameterized_class(('car_model', 'test_route'), test_cases)
class TestCarModel(TestCarModelBase):
pass
if __name__ == "__main__":
unittest.main()