boardd: longer loopback test (#28239)

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 2014b10ec3
beeps
Adeeb Shihadeh 2 years ago committed by GitHub
parent 3c001d5c89
commit 9ed5feca61
  1. 26
      selfdrive/boardd/tests/test_boardd_loopback.py

@ -1,9 +1,11 @@
#!/usr/bin/env python3
import os
import copy
import random
import time
import unittest
from collections import defaultdict
from pprint import pprint
import cereal.messaging as messaging
from cereal import car, log
@ -63,12 +65,13 @@ class TestBoardd(unittest.TestCase):
n = 200
for i in range(n):
print(f"boardd loopback {i}/{n}")
self.spinner.update(f"boardd loopback {i}/{n}")
sent_msgs = defaultdict(set)
for _ in range(random.randrange(10)):
for _ in range(random.randrange(20, 100)):
to_send = []
for __ in range(random.randrange(100)):
for __ in range(random.randrange(50)):
bus = random.choice([b for b in range(3*num_pandas) if b % 4 != 3])
addr = random.randrange(1, 1<<29)
dat = bytes(random.getrandbits(8) for _ in range(random.randrange(1, 9)))
@ -76,22 +79,25 @@ class TestBoardd(unittest.TestCase):
to_send.append(make_can_msg(addr, dat, bus))
sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))
for _ in range(100 * 2):
sent_loopback = copy.deepcopy(sent_msgs)
sent_loopback.update({k+128: copy.deepcopy(v) for k, v in sent_msgs.items()})
sent_total = {k: len(v) for k, v in sent_loopback.items()}
for _ in range(100 * 5):
recvd = messaging.drain_sock(can, wait_for_one=True)
for msg in recvd:
for m in msg.can:
if m.src >= 128:
key = (m.address, m.dat)
assert key in sent_msgs[m.src-128], f"got unexpected msg: {m.src=} {m.address=} {m.dat=}"
sent_msgs[m.src-128].discard(key)
assert key in sent_loopback[m.src], f"got unexpected msg: {m.src=} {m.address=} {m.dat=}"
sent_loopback[m.src].discard(key)
if all(len(v) == 0 for v in sent_msgs.values()):
if all(len(v) == 0 for v in sent_loopback.values()):
break
# if a set isn't empty, messages got dropped
for bus in sent_msgs.keys():
assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages"
pprint(sent_msgs)
pprint(sent_loopback)
for bus in sent_loopback.keys():
assert not len(sent_loopback[bus]), f"loop {i}: bus {bus} missing {len(sent_loopback[bus])} out of {sent_total[bus]} messages"
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save