|
|
@ -10,6 +10,7 @@ from cereal import log, car |
|
|
|
from common.params import Params |
|
|
|
from common.params import Params |
|
|
|
from selfdrive.car.fingerprints import all_known_cars |
|
|
|
from selfdrive.car.fingerprints import all_known_cars |
|
|
|
from selfdrive.car.car_helpers import interfaces |
|
|
|
from selfdrive.car.car_helpers import interfaces |
|
|
|
|
|
|
|
from selfdrive.car.gm.values import CAR as GM |
|
|
|
from selfdrive.car.honda.values import HONDA_BOSCH, CAR as HONDA |
|
|
|
from selfdrive.car.honda.values import HONDA_BOSCH, CAR as HONDA |
|
|
|
from selfdrive.car.chrysler.values import CAR as CHRYSLER |
|
|
|
from selfdrive.car.chrysler.values import CAR as CHRYSLER |
|
|
|
from selfdrive.car.hyundai.values import CAR as HYUNDAI |
|
|
|
from selfdrive.car.hyundai.values import CAR as HYUNDAI |
|
|
@ -37,6 +38,11 @@ ignore_carstate_check = [ |
|
|
|
CHRYSLER.PACIFICA_2017_HYBRID, |
|
|
|
CHRYSLER.PACIFICA_2017_HYBRID, |
|
|
|
] |
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ignore_addr_checks_valid = [ |
|
|
|
|
|
|
|
GM.BUICK_REGAL, |
|
|
|
|
|
|
|
HYUNDAI.GENESIS_G70_2020, |
|
|
|
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
@parameterized_class(('car_model'), [(car,) for i, car in enumerate(sorted(all_known_cars())) if i % NUM_JOBS == JOB_ID]) |
|
|
|
@parameterized_class(('car_model'), [(car,) for i, car in enumerate(sorted(all_known_cars())) if i % NUM_JOBS == JOB_ID]) |
|
|
|
class TestCarModel(unittest.TestCase): |
|
|
|
class TestCarModel(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
|
@ -50,35 +56,35 @@ class TestCarModel(unittest.TestCase): |
|
|
|
else: |
|
|
|
else: |
|
|
|
raise Exception(f"missing test route for car {cls.car_model}") |
|
|
|
raise Exception(f"missing test route for car {cls.car_model}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
params = Params() |
|
|
|
|
|
|
|
params.clear_all() |
|
|
|
|
|
|
|
|
|
|
|
for seg in [2, 1, 0]: |
|
|
|
for seg in [2, 1, 0]: |
|
|
|
try: |
|
|
|
try: |
|
|
|
lr = LogReader(get_url(ROUTES[cls.car_model], seg)) |
|
|
|
lr = LogReader(get_url(ROUTES[cls.car_model], seg)) |
|
|
|
break |
|
|
|
|
|
|
|
except Exception: |
|
|
|
except Exception: |
|
|
|
lr = None |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
if lr is None: |
|
|
|
can_msgs = [] |
|
|
|
raise Exception("Route not found. Is it uploaded?") |
|
|
|
fingerprint = {i: dict() for i in range(3)} |
|
|
|
|
|
|
|
for msg in lr: |
|
|
|
params = Params() |
|
|
|
if msg.which() == "can": |
|
|
|
params.clear_all() |
|
|
|
for m in msg.can: |
|
|
|
|
|
|
|
if m.src < 64: |
|
|
|
can_msgs = [] |
|
|
|
fingerprint[m.src][m.address] = len(m.dat) |
|
|
|
fingerprint = {i: dict() for i in range(3)} |
|
|
|
can_msgs.append(msg) |
|
|
|
for msg in lr: |
|
|
|
elif msg.which() == "carParams": |
|
|
|
if msg.which() == "can": |
|
|
|
if msg.carParams.openpilotLongitudinalControl: |
|
|
|
for m in msg.can: |
|
|
|
params.put_bool("DisableRadar", True) |
|
|
|
if m.src < 64: |
|
|
|
|
|
|
|
fingerprint[m.src][m.address] = len(m.dat) |
|
|
|
if len(can_msgs): |
|
|
|
can_msgs.append(msg) |
|
|
|
break |
|
|
|
elif msg.which() == "carParams": |
|
|
|
else: |
|
|
|
if msg.carParams.openpilotLongitudinalControl: |
|
|
|
raise Exception("Route not found or no CAN msgs found. Is it uploaded?") |
|
|
|
params.put_bool("DisableRadar", True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cls.can_msgs = sorted(can_msgs, key=lambda msg: msg.logMonoTime) |
|
|
|
cls.can_msgs = sorted(can_msgs, key=lambda msg: msg.logMonoTime) |
|
|
|
|
|
|
|
|
|
|
|
cls.CarInterface, cls.CarController, cls.CarState = interfaces[cls.car_model] |
|
|
|
cls.CarInterface, cls.CarController, cls.CarState = interfaces[cls.car_model] |
|
|
|
|
|
|
|
|
|
|
|
cls.CP = cls.CarInterface.get_params(cls.car_model, fingerprint, []) |
|
|
|
cls.CP = cls.CarInterface.get_params(cls.car_model, fingerprint, []) |
|
|
|
assert cls.CP |
|
|
|
assert cls.CP |
|
|
|
|
|
|
|
|
|
|
@ -108,6 +114,8 @@ class TestCarModel(unittest.TestCase): |
|
|
|
self.assertTrue(len(self.CP.lateralTuning.lqr.a)) |
|
|
|
self.assertTrue(len(self.CP.lateralTuning.lqr.a)) |
|
|
|
elif tuning == 'indi': |
|
|
|
elif tuning == 'indi': |
|
|
|
self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV)) |
|
|
|
self.assertTrue(len(self.CP.lateralTuning.indi.outerLoopGainV)) |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
raise Exception("unkown tuning") |
|
|
|
|
|
|
|
|
|
|
|
def test_car_interface(self): |
|
|
|
def test_car_interface(self): |
|
|
|
# TODO: also check for checkusm and counter violations from can parser |
|
|
|
# TODO: also check for checkusm and counter violations from can parser |
|
|
@ -141,14 +149,28 @@ class TestCarModel(unittest.TestCase): |
|
|
|
if self.CP.dashcamOnly: |
|
|
|
if self.CP.dashcamOnly: |
|
|
|
self.skipTest("no need to check panda safety for dashcamOnly") |
|
|
|
self.skipTest("no need to check panda safety for dashcamOnly") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
start_ts = self.can_msgs[0].logMonoTime |
|
|
|
|
|
|
|
|
|
|
|
failed_addrs = Counter() |
|
|
|
failed_addrs = Counter() |
|
|
|
for can in self.can_msgs: |
|
|
|
for can in self.can_msgs: |
|
|
|
|
|
|
|
# update panda timer |
|
|
|
|
|
|
|
t = (can.logMonoTime - start_ts) / 1e3 |
|
|
|
|
|
|
|
self.safety.set_timer(int(t)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# run all msgs through the safety RX hook |
|
|
|
for msg in can.can: |
|
|
|
for msg in can.can: |
|
|
|
if msg.src >= 64: |
|
|
|
if msg.src >= 64: |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
to_send = package_can_msg([msg.address, 0, msg.dat, msg.src]) |
|
|
|
to_send = package_can_msg([msg.address, 0, msg.dat, msg.src]) |
|
|
|
if self.safety.safety_rx_hook(to_send) != 1: |
|
|
|
if self.safety.safety_rx_hook(to_send) != 1: |
|
|
|
failed_addrs[hex(msg.address)] += 1 |
|
|
|
failed_addrs[hex(msg.address)] += 1 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ensure all msgs defined in the addr checks are valid |
|
|
|
|
|
|
|
if self.car_model not in ignore_addr_checks_valid: |
|
|
|
|
|
|
|
self.safety.safety_tick_current_rx_checks() |
|
|
|
|
|
|
|
if t > 1e6: |
|
|
|
|
|
|
|
self.assertTrue(self.safety.addr_checks_valid()) |
|
|
|
self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") |
|
|
|
self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") |
|
|
|
|
|
|
|
|
|
|
|
def test_panda_safety_carstate(self): |
|
|
|
def test_panda_safety_carstate(self): |
|
|
|