openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

161 lines
5.3 KiB

#!/usr/bin/env python3
import random
import unittest
from opendbc.car.structs import CarParams
from panda import DLC_TO_LEN, USBPACKET_MAX_SIZE, pack_can_buffer, unpack_can_buffer
from panda.tests.libpanda import libpanda_py
lpp = libpanda_py.libpanda
CHUNK_SIZE = USBPACKET_MAX_SIZE
TX_QUEUES = (lpp.tx1_q, lpp.tx2_q, lpp.tx3_q)
def unpackage_can_msg(pkt):
dat_len = DLC_TO_LEN[pkt[0].data_len_code]
dat = bytes(pkt[0].data[0:dat_len])
return pkt[0].addr, dat, pkt[0].bus
def random_can_messages(n, bus=None):
msgs = []
for _ in range(n):
if bus is None:
bus = random.randint(0, 3)
address = random.randint(1, (1 << 29) - 1)
data = bytes([random.getrandbits(8) for _ in range(DLC_TO_LEN[random.randrange(0, len(DLC_TO_LEN))])])
msgs.append((address, data, bus))
return msgs
class TestPandaComms(unittest.TestCase):
def setUp(self):
lpp.comms_can_reset()
def test_tx_queues(self):
for bus in range(len(TX_QUEUES)):
message = (0x100, b"test", bus)
can_pkt_tx = libpanda_py.make_CANPacket(message[0], message[2], message[1])
can_pkt_rx = libpanda_py.ffi.new('CANPacket_t *')
assert lpp.can_push(TX_QUEUES[bus], can_pkt_tx), "CAN push failed"
assert lpp.can_pop(TX_QUEUES[bus], can_pkt_rx), "CAN pop failed"
assert unpackage_can_msg(can_pkt_rx) == message
def test_comms_reset_rx(self):
# store some test messages in the queue
test_msg = (0x100, b"test", 0)
for _ in range(100):
can_pkt_tx = libpanda_py.make_CANPacket(test_msg[0], test_msg[2], test_msg[1])
lpp.can_push(lpp.rx_q, can_pkt_tx)
# read a small chunk such that we have some overflow
TINY_CHUNK_SIZE = 6
dat = libpanda_py.ffi.new(f"uint8_t[{TINY_CHUNK_SIZE}]")
rx_len = lpp.comms_can_read(dat, TINY_CHUNK_SIZE)
assert rx_len == TINY_CHUNK_SIZE, "comms_can_read returned too little data"
_, overflow = unpack_can_buffer(bytes(dat))
assert len(overflow) > 0, "overflow buffer should not be empty"
# reset the comms to clear the overflow buffer on the panda side
lpp.comms_can_reset()
# read a large chunk, which should now contain valid messages
LARGE_CHUNK_SIZE = 512
dat = libpanda_py.ffi.new(f"uint8_t[{LARGE_CHUNK_SIZE}]")
rx_len = lpp.comms_can_read(dat, LARGE_CHUNK_SIZE)
assert rx_len == LARGE_CHUNK_SIZE, "comms_can_read returned too little data"
msgs, _ = unpack_can_buffer(bytes(dat))
assert len(msgs) > 0, "message buffer should not be empty"
for m in msgs:
assert m == test_msg, "message buffer should contain valid test messages"
def test_comms_reset_tx(self):
# store some test messages in the queue
test_msg = (0x100, b"test", 0)
packed = pack_can_buffer([test_msg for _ in range(100)])
# write a small chunk such that we have some overflow
TINY_CHUNK_SIZE = 6
lpp.comms_can_write(packed[0][:TINY_CHUNK_SIZE], TINY_CHUNK_SIZE)
# reset the comms to clear the overflow buffer on the panda side
lpp.comms_can_reset()
# write a full valid chunk, which should now contain valid messages
lpp.comms_can_write(packed[1], len(packed[1]))
# read the messages from the queue and make sure they're valid
queue_msgs = []
pkt = libpanda_py.ffi.new('CANPacket_t *')
while lpp.can_pop(TX_QUEUES[0], pkt):
queue_msgs.append(unpackage_can_msg(pkt))
assert len(queue_msgs) > 0, "message buffer should not be empty"
for m in queue_msgs:
assert m == test_msg, "message buffer should contain valid test messages"
def test_can_send_usb(self):
lpp.set_safety_hooks(CarParams.SafetyModel.allOutput, 0)
for bus in range(3):
with self.subTest(bus=bus):
for _ in range(100):
msgs = random_can_messages(200, bus=bus)
packed = pack_can_buffer(msgs)
# Simulate USB bulk chunks
for buf in packed:
for i in range(0, len(buf), CHUNK_SIZE):
chunk_len = min(CHUNK_SIZE, len(buf) - i)
lpp.comms_can_write(buf[i:i+chunk_len], chunk_len)
# Check that they ended up in the right buffers
queue_msgs = []
pkt = libpanda_py.ffi.new('CANPacket_t *')
while lpp.can_pop(TX_QUEUES[bus], pkt):
queue_msgs.append(unpackage_can_msg(pkt))
self.assertEqual(len(queue_msgs), len(msgs))
self.assertEqual(queue_msgs, msgs)
def test_can_receive_usb(self):
msgs = random_can_messages(50000)
packets = [libpanda_py.make_CANPacket(m[0], m[2], m[1]) for m in msgs]
rx_msgs = []
overflow_buf = b""
while len(packets) > 0:
# Push into queue
while lpp.can_slots_empty(lpp.rx_q) > 0 and len(packets) > 0:
lpp.can_push(lpp.rx_q, packets.pop(0))
# Simulate USB bulk IN chunks
MAX_TRANSFER_SIZE = 16384
dat = libpanda_py.ffi.new(f"uint8_t[{CHUNK_SIZE}]")
while True:
buf = b""
while len(buf) < MAX_TRANSFER_SIZE:
max_size = min(CHUNK_SIZE, MAX_TRANSFER_SIZE - len(buf))
rx_len = lpp.comms_can_read(dat, max_size)
buf += bytes(dat[0:rx_len])
if rx_len < max_size:
break
if len(buf) == 0:
break
unpacked_msgs, overflow_buf = unpack_can_buffer(overflow_buf + buf)
rx_msgs.extend(unpacked_msgs)
self.assertEqual(len(rx_msgs), len(msgs))
self.assertEqual(rx_msgs, msgs)
if __name__ == "__main__":
unittest.main()