boardd: check CAN in SPI test (#32400)

* test

* little more

---------

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 69f57fb6a8
097
Adeeb Shihadeh 11 months ago committed by GitHub
parent 86b6103657
commit c868694a23
  1. 2
      selfdrive/boardd/tests/test_boardd_loopback.py
  2. 36
      selfdrive/boardd/tests/test_boardd_spi.py

@ -8,6 +8,7 @@ from pprint import pprint
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import car, log from cereal import car, log
from openpilot.common.retry import retry
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.timeout import Timeout from openpilot.common.timeout import Timeout
from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp
@ -16,6 +17,7 @@ from openpilot.system.hardware import TICI
from openpilot.selfdrive.test.helpers import phone_only, with_processes from openpilot.selfdrive.test.helpers import phone_only, with_processes
@retry(attempts=3)
def setup_boardd(num_pandas): def setup_boardd(num_pandas):
params = Params() params = Params()
params.put_bool("IsOnroad", False) params.put_bool("IsOnroad", False)

@ -2,12 +2,13 @@ import os
import time import time
import numpy as np import numpy as np
import pytest import pytest
import random
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal.services import SERVICE_LIST from cereal.services import SERVICE_LIST
from openpilot.system.hardware import HARDWARE from openpilot.system.hardware import HARDWARE
from openpilot.selfdrive.test.helpers import phone_only, with_processes from openpilot.selfdrive.test.helpers import phone_only, with_processes
from openpilot.selfdrive.boardd.tests.test_boardd_loopback import setup_boardd from openpilot.selfdrive.boardd.tests.test_boardd_loopback import setup_boardd, send_random_can_messages
@pytest.mark.tici @pytest.mark.tici
@ -18,6 +19,7 @@ class TestBoarddSpi:
pytest.skip("only for spi pandas") pytest.skip("only for spi pandas")
os.environ['STARTED'] = '1' os.environ['STARTED'] = '1'
os.environ['BOARDD_LOOPBACK'] = '1' os.environ['BOARDD_LOOPBACK'] = '1'
#os.environ['SPI_ERR_PROB'] = '-1'
os.environ['SPI_ERR_PROB'] = '0.001' os.environ['SPI_ERR_PROB'] = '0.001'
@phone_only @phone_only
@ -25,30 +27,48 @@ class TestBoarddSpi:
def test_spi_corruption(self, subtests): def test_spi_corruption(self, subtests):
setup_boardd(1) setup_boardd(1)
sendcan = messaging.pub_sock('sendcan')
socks = {s: messaging.sub_sock(s, conflate=False, timeout=100) for s in ('can', 'pandaStates', 'peripheralState')} socks = {s: messaging.sub_sock(s, conflate=False, timeout=100) for s in ('can', 'pandaStates', 'peripheralState')}
time.sleep(2) time.sleep(2)
for s in socks.values(): for s in socks.values():
messaging.drain_sock_raw(s) messaging.drain_sock_raw(s)
total_recv_count = 0
total_sent_count = 0
sent_msgs = {bus: list() for bus in range(3)}
st = time.monotonic() st = time.monotonic()
ts = {s: list() for s in socks.keys()} ts = {s: list() for s in socks.keys()}
for _ in range(20): for _ in range(20):
# send some CAN messages
sent = send_random_can_messages(sendcan, random.randrange(2, 20))
for k, v in sent.items():
sent_msgs[k].extend(list(v))
total_sent_count += len(v)
for service, sock in socks.items(): for service, sock in socks.items():
for m in messaging.drain_sock(sock): for m in messaging.drain_sock(sock):
ts[service].append(m.logMonoTime) ts[service].append(m.logMonoTime)
# sanity check for corruption # sanity check for corruption
assert m.valid assert m.valid or (service == "can")
if service == "can": if service == "can":
assert len(m.can) == 0 for msg in m.can:
if msg.src > 4:
continue
key = (msg.address, msg.dat)
assert key in sent_msgs[msg.src], f"got unexpected msg: {msg.src=} {msg.address=} {msg.dat=}"
# TODO: enable this
#sent_msgs[msg.src].remove(key)
total_recv_count += 1
elif service == "pandaStates": elif service == "pandaStates":
assert len(m.pandaStates) == 1 assert len(m.pandaStates) == 1
ps = m.pandaStates[0] ps = m.pandaStates[0]
assert ps.uptime < 100 assert ps.uptime < 1000
assert ps.pandaType == "tres" assert ps.pandaType == "tres"
assert ps.ignitionLine assert ps.ignitionLine
assert not ps.ignitionCan assert not ps.ignitionCan
assert ps.voltage < 14000 assert 4000 < ps.voltage < 14000
elif service == "peripheralState": elif service == "peripheralState":
ps = m.peripheralState ps = m.peripheralState
assert ps.pandaType == "tres" assert ps.pandaType == "tres"
@ -66,6 +86,10 @@ class TestBoarddSpi:
with subtests.test(msg="timing check", service=service): with subtests.test(msg="timing check", service=service):
edt = 1e3 / SERVICE_LIST[service].frequency edt = 1e3 / SERVICE_LIST[service].frequency
assert edt*0.9 < np.mean(dts) < edt*1.1 assert edt*0.9 < np.mean(dts) < edt*1.1
assert np.max(dts) < edt*3 assert np.max(dts) < edt*20
assert np.min(dts) < edt assert np.min(dts) < edt
assert len(dts) >= ((et-0.5)*SERVICE_LIST[service].frequency*0.8) assert len(dts) >= ((et-0.5)*SERVICE_LIST[service].frequency*0.8)
with subtests.test(msg="CAN traffic"):
print(f"Sent {total_sent_count} CAN messages, got {total_recv_count} back. {total_recv_count/total_sent_count:.2%} received")
assert total_recv_count > 20

Loading…
Cancel
Save