#!/usr/bin/env python3
import os
import random
import time
import unittest
from collections import defaultdict

import cereal.messaging as messaging
from cereal import car, log
from common.params import Params
from common.spinner import Spinner
from common.timeout import Timeout
from selfdrive.boardd.boardd import can_list_to_can_capnp
from selfdrive.car import make_can_msg
from system.hardware import TICI
from selfdrive.test.helpers import phone_only, with_processes


class TestBoardd(unittest.TestCase):
    """CAN loopback test.

    Random CAN messages are published on ``sendcan`` and the test checks that
    every one of them comes back on ``can``. A received message whose ``src``
    is >= 128 is matched against what was sent on bus ``src - 128``.
    """

    @classmethod
    def setUpClass(cls):
        """Set the environment flags for the test and create the spinner."""
        os.environ['STARTED'] = '1'
        os.environ['BOARDD_LOOPBACK'] = '1'
        cls.spinner = Spinner()

    @classmethod
    def tearDownClass(cls):
        """Close the spinner created in ``setUpClass``."""
        cls.spinner.close()

    @phone_only
    @with_processes(['pandad'])
    def test_loopback(self):
        """Send random CAN traffic for 200 rounds and require every message to be echoed.

        Fails if the number of connected pandas is not the expected one, if an
        echoed message was never sent, or if any sent message is still missing
        after the receive attempts of a round are exhausted.
        """
        # wait for boardd to init
        time.sleep(2)

        with Timeout(60, "boardd didn't start"):
            sm = messaging.SubMaster(['pandaStates'])
            # Keep polling until at least one pandaStates message arrived, it is
            # non-empty, and no panda still reports an unknown type.
            while sm.rcv_frame['pandaStates'] < 1 or len(sm['pandaStates']) == 0 or \
                    any(ps.pandaType == log.PandaState.PandaType.unknown for ps in sm['pandaStates']):
                sm.update(1000)

            num_pandas = len(sm['pandaStates'])
            expected_pandas = 2 if TICI and "SINGLE_PANDA" not in os.environ else 1
            # The message must be an f-string, otherwise the placeholders are
            # printed literally instead of the actual counts.
            self.assertEqual(
                num_pandas, expected_pandas,
                f"connected pandas ({num_pandas}) doesn't match expected panda count ({expected_pandas}). "
                "connect another panda for multipanda tests.")

        # boardd blocks on FirmwareQueryDone, ControlsReady, and CarParams
        cp = car.CarParams.new_message()

        safety_config = car.CarParams.SafetyConfig.new_message()
        safety_config.safetyModel = car.CarParams.SafetyModel.allOutput
        cp.safetyConfigs = [safety_config] * num_pandas

        params = Params()
        params.put_bool("FirmwareQueryDone", True)
        params.put_bool("ControlsReady", True)
        params.put("CarParams", cp.to_bytes())

        sendcan = messaging.pub_sock('sendcan')
        can = messaging.sub_sock('can', conflate=False, timeout=100)
        time.sleep(0.2)

        # Candidate buses are loop-invariant, so build the list once.
        # Bus numbers with b % 4 == 3 are never used.
        valid_buses = [b for b in range(3 * num_pandas) if b % 4 != 3]

        n = 200
        for i in range(n):
            self.spinner.update(f"boardd loopback {i}/{n}")

            # bus -> set of (address, data) pairs still waiting to be echoed
            sent_msgs = defaultdict(set)
            for _ in range(random.randrange(10)):
                to_send = []
                for __ in range(random.randrange(100)):
                    bus = random.choice(valid_buses)
                    addr = random.randrange(1, 1 << 29)
                    dat = bytes(random.getrandbits(8) for _ in range(random.randrange(1, 9)))
                    sent_msgs[bus].add((addr, dat))
                    to_send.append(make_can_msg(addr, dat, bus))
                sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))

            # Bounded number of receive attempts so dropped messages cannot
            # hang the test forever.
            for _ in range(100 * 2):
                recvd = messaging.drain_sock(can, wait_for_one=True)
                for msg in recvd:
                    for m in msg.can:
                        if m.src >= 128:
                            key = (m.address, m.dat)
                            pending = sent_msgs[m.src - 128]
                            assert key in pending, f"got unexpected msg: {m.src=} {m.address=} {m.dat=}"
                            pending.discard(key)

                # Stop early once every sent message has been seen.
                if not any(sent_msgs.values()):
                    break

            # if a set isn't empty, messages got dropped
            for bus, missing in sent_msgs.items():
                assert not missing, f"loop {i}: bus {bus} missing {len(missing)} messages"


if __name__ == "__main__":
    unittest.main()