|
|
@ -26,7 +26,7 @@ from openpilot.tools.lib.logreader import LogReader, LogsUnavailable, openpilotc |
|
|
|
internal_source_zst, comma_api_source, auto_source |
|
|
|
internal_source_zst, comma_api_source, auto_source |
|
|
|
from openpilot.tools.lib.route import SegmentName |
|
|
|
from openpilot.tools.lib.route import SegmentName |
|
|
|
|
|
|
|
|
|
|
|
from panda.tests.libpanda import libpanda_py |
|
|
|
from panda.tests.libsafety import libsafety_py |
|
|
|
|
|
|
|
|
|
|
|
SafetyModel = car.CarParams.SafetyModel |
|
|
|
SafetyModel = car.CarParams.SafetyModel |
|
|
|
|
|
|
|
|
|
|
@ -169,7 +169,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
assert self.CI |
|
|
|
assert self.CI |
|
|
|
|
|
|
|
|
|
|
|
# TODO: check safetyModel is in release panda build |
|
|
|
# TODO: check safetyModel is in release panda build |
|
|
|
self.safety = libpanda_py.libpanda |
|
|
|
self.safety = libsafety_py.libsafety |
|
|
|
|
|
|
|
|
|
|
|
cfg = self.CP.safetyConfigs[-1] |
|
|
|
cfg = self.CP.safetyConfigs[-1] |
|
|
|
set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) |
|
|
|
set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) |
|
|
@ -241,7 +241,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
if msg.src >= 64: |
|
|
|
if msg.src >= 64: |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) |
|
|
|
to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) |
|
|
|
if self.safety.safety_rx_hook(to_send) != 1: |
|
|
|
if self.safety.safety_rx_hook(to_send) != 1: |
|
|
|
failed_addrs[hex(msg.address)] += 1 |
|
|
|
failed_addrs[hex(msg.address)] += 1 |
|
|
|
|
|
|
|
|
|
|
@ -280,7 +280,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
now_nanos += DT_CTRL * 1e9 |
|
|
|
now_nanos += DT_CTRL * 1e9 |
|
|
|
msgs_sent += len(sendcan) |
|
|
|
msgs_sent += len(sendcan) |
|
|
|
for addr, dat, bus in sendcan: |
|
|
|
for addr, dat, bus in sendcan: |
|
|
|
to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat) |
|
|
|
to_send = libsafety_py.make_CANPacket(addr, bus % 4, dat) |
|
|
|
self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) |
|
|
|
self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) |
|
|
|
|
|
|
|
|
|
|
|
# Make sure we attempted to send messages |
|
|
|
# Make sure we attempted to send messages |
|
|
@ -331,7 +331,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
prev_panda_cruise_engaged = self.safety.get_cruise_engaged_prev() |
|
|
|
prev_panda_cruise_engaged = self.safety.get_cruise_engaged_prev() |
|
|
|
prev_panda_acc_main_on = self.safety.get_acc_main_on() |
|
|
|
prev_panda_acc_main_on = self.safety.get_acc_main_on() |
|
|
|
|
|
|
|
|
|
|
|
to_send = libpanda_py.make_CANPacket(address, bus, dat) |
|
|
|
to_send = libsafety_py.make_CANPacket(address, bus, dat) |
|
|
|
self.safety.safety_rx_hook(to_send) |
|
|
|
self.safety.safety_rx_hook(to_send) |
|
|
|
|
|
|
|
|
|
|
|
can = [(int(time.monotonic() * 1e9), [CanData(address=address, dat=dat, src=bus)])] |
|
|
|
can = [(int(time.monotonic() * 1e9), [CanData(address=address, dat=dat, src=bus)])] |
|
|
@ -374,7 +374,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
for can in self.can_msgs[:300]: |
|
|
|
for can in self.can_msgs[:300]: |
|
|
|
self.CI.update(can_capnp_to_list((can.as_builder().to_bytes(), ))) |
|
|
|
self.CI.update(can_capnp_to_list((can.as_builder().to_bytes(), ))) |
|
|
|
for msg in filter(lambda m: m.src in range(64), can.can): |
|
|
|
for msg in filter(lambda m: m.src in range(64), can.can): |
|
|
|
to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) |
|
|
|
to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) |
|
|
|
self.safety.safety_rx_hook(to_send) |
|
|
|
self.safety.safety_rx_hook(to_send) |
|
|
|
|
|
|
|
|
|
|
|
controls_allowed_prev = False |
|
|
|
controls_allowed_prev = False |
|
|
@ -383,7 +383,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
for idx, can in enumerate(self.can_msgs): |
|
|
|
for idx, can in enumerate(self.can_msgs): |
|
|
|
CS = self.CI.update(can_capnp_to_list((can.as_builder().to_bytes(), ))).as_reader() |
|
|
|
CS = self.CI.update(can_capnp_to_list((can.as_builder().to_bytes(), ))).as_reader() |
|
|
|
for msg in filter(lambda m: m.src in range(64), can.can): |
|
|
|
for msg in filter(lambda m: m.src in range(64), can.can): |
|
|
|
to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) |
|
|
|
to_send = libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) |
|
|
|
ret = self.safety.safety_rx_hook(to_send) |
|
|
|
ret = self.safety.safety_rx_hook(to_send) |
|
|
|
self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}") |
|
|
|
self.assertEqual(1, ret, f"safety rx failed ({ret=}): {(msg.address, msg.src % 4)}") |
|
|
|
|
|
|
|
|
|
|
|