#!/usr/bin/env python3 import os import random import time import unittest from collections import defaultdict import cereal.messaging as messaging from cereal import car from common.params import Params from common.spinner import Spinner from common.timeout import Timeout from selfdrive.boardd.boardd import can_list_to_can_capnp from selfdrive.car import make_can_msg from system.hardware import TICI from selfdrive.test.helpers import phone_only, with_processes class TestBoardd(unittest.TestCase): @classmethod def setUpClass(cls): os.environ['STARTED'] = '1' os.environ['BOARDD_LOOPBACK'] = '1' cls.spinner = Spinner() @classmethod def tearDownClass(cls): cls.spinner.close() @phone_only @with_processes(['pandad']) def test_loopback(self): # wait for boardd to init time.sleep(2) with Timeout(60, "boardd didn't start"): sm = messaging.SubMaster(['pandaStates']) while sm.rcv_frame['pandaStates'] < 1 and len(sm['pandaStates']) == 0: sm.update(1000) num_pandas = len(sm['pandaStates']) expected_pandas = 2 if TICI and "SINGLE_PANDA" not in os.environ else 1 self.assertEqual(num_pandas, expected_pandas, "connected pandas ({num_pandas}) doesn't match expected panda count ({expected_pandas}). \ connect another panda for multipanda tests.") # boardd blocks on CarVin and CarParams cp = car.CarParams.new_message() safety_config = car.CarParams.SafetyConfig.new_message() safety_config.safetyModel = car.CarParams.SafetyModel.allOutput cp.safetyConfigs = [safety_config]*num_pandas params = Params() params.put_bool("FirmwareQueryDone", True) params.put_bool("ControlsReady", True) params.put("CarParams", cp.to_bytes()) sendcan = messaging.pub_sock('sendcan') can = messaging.sub_sock('can', conflate=False, timeout=100) time.sleep(0.2) n = 200 for i in range(n): self.spinner.update(f"boardd loopback {i}/{n}") sent_msgs = defaultdict(set) for _ in range(random.randrange(10)): to_send = [] for __ in range(random.randrange(100)): bus = random.choice([b for b in range(3*num_pandas) if b % 4 != 3]) addr = random.randrange(1, 1<<29) dat = bytes(random.getrandbits(8) for _ in range(random.randrange(1, 9))) sent_msgs[bus].add((addr, dat)) to_send.append(make_can_msg(addr, dat, bus)) sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan')) for _ in range(100 * 2): recvd = messaging.drain_sock(can, wait_for_one=True) for msg in recvd: for m in msg.can: if m.src >= 128: key = (m.address, m.dat) assert key in sent_msgs[m.src-128], f"got unexpected msg: {m.src=} {m.address=} {m.dat=}" sent_msgs[m.src-128].discard(key) if all(len(v) == 0 for v in sent_msgs.values()): break # if a set isn't empty, messages got dropped for bus in sent_msgs.keys(): assert not len(sent_msgs[bus]), f"loop {i}: bus {bus} missing {len(sent_msgs[bus])} messages" if __name__ == "__main__": unittest.main()