diff --git a/selfdrive/car/can_definitions.py b/selfdrive/car/can_definitions.py index ab60aa40d5..f43e03af80 100644 --- a/selfdrive/car/can_definitions.py +++ b/selfdrive/car/can_definitions.py @@ -1,3 +1,11 @@ from collections.abc import Callable +from dataclasses import dataclass CanSendCallable = Callable[[list[tuple[int, bytes, int]]], None] + + +@dataclass +class CanData: + address: int + dat: bytes + src: int diff --git a/selfdrive/car/card.py b/selfdrive/car/card.py index cfdfc7a55e..e1dfeaf6d5 100755 --- a/selfdrive/car/card.py +++ b/selfdrive/car/card.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import os import time -from dataclasses import dataclass from types import SimpleNamespace import cereal.messaging as messaging @@ -16,7 +15,7 @@ from openpilot.common.swaglog import cloudlog, ForwardingHandler from openpilot.selfdrive.pandad import can_list_to_can_capnp, can_capnp_to_list from openpilot.selfdrive.car import DT_CTRL, carlog -from openpilot.selfdrive.car.can_definitions import CanSendCallable +from openpilot.selfdrive.car.can_definitions import CanData, CanSendCallable from openpilot.selfdrive.car.fw_versions import ObdCallback from openpilot.selfdrive.car.car_helpers import get_car from openpilot.selfdrive.car.interfaces import CarInterfaceBase @@ -41,13 +40,6 @@ def obd_callback(params: Params) -> ObdCallback: return set_obd_multiplexing -@dataclass -class CanData: - address: int - dat: bytes - src: int - - def get_one_can(logcan: messaging.SubSocket) -> list[CanData]: while True: can = messaging.recv_one_retry(logcan) diff --git a/selfdrive/car/tests/test_can_fingerprint.py b/selfdrive/car/tests/test_can_fingerprint.py index f236986d8e..6f9922129f 100644 --- a/selfdrive/car/tests/test_can_fingerprint.py +++ b/selfdrive/car/tests/test_can_fingerprint.py @@ -1,6 +1,6 @@ from parameterized import parameterized -from cereal import log, messaging +from openpilot.selfdrive.car.can_definitions import CanData from openpilot.selfdrive.car.car_helpers import FRAME_FINGERPRINT, can_fingerprint from openpilot.selfdrive.car.fingerprints import _FINGERPRINTS as FINGERPRINTS @@ -11,13 +11,11 @@ class TestCanFingerprint: """Tests online fingerprinting function on offline fingerprints""" for fingerprint in fingerprints: # can have multiple fingerprints for each platform - can = messaging.new_message('can', 1) - can.can = [log.CanData(address=address, dat=b'\x00' * length, src=src) - for address, length in fingerprint.items() for src in (0, 1)] + can = [CanData(address=address, dat=b'\x00' * length, src=src) + for address, length in fingerprint.items() for src in (0, 1)] fingerprint_iter = iter([can]) - empty_can = messaging.new_message('can', 0) - car_fingerprint, finger = can_fingerprint(lambda: next(fingerprint_iter, empty_can)) # noqa: B023 + car_fingerprint, finger = can_fingerprint(lambda: next(fingerprint_iter, [])) # noqa: B023 assert car_fingerprint == car_model assert finger[0] == fingerprint @@ -32,19 +30,16 @@ class TestCanFingerprint: cases = [] # case 1 - one match, make sure we keep going for 100 frames - can = messaging.new_message('can', 1) - can.can = [log.CanData(address=address, dat=b'\x00' * length, src=src) - for address, length in fingerprint.items() for src in (0, 1)] + can = [CanData(address=address, dat=b'\x00' * length, src=src) + for address, length in fingerprint.items() for src in (0, 1)] cases.append((FRAME_FINGERPRINT, car_model, can)) # case 2 - no matches, make sure we keep going for 100 frames - can = messaging.new_message('can', 1) - can.can = [log.CanData(address=1, dat=b'\x00' * 1, src=src) for src in (0, 1)] # uncommon address + can = [CanData(address=1, dat=b'\x00' * 1, src=src) for src in (0, 1)] # uncommon address cases.append((FRAME_FINGERPRINT, None, can)) # case 3 - multiple matches, make sure we keep going for 200 frames to try to eliminate some - can = messaging.new_message('can', 1) - can.can = [log.CanData(address=2016, dat=b'\x00' * 8, src=src) for src in (0, 1)] # common address + can = [CanData(address=2016, dat=b'\x00' * 8, src=src) for src in (0, 1)] # common address cases.append((FRAME_FINGERPRINT * 2, None, can)) for expected_frames, car_model, can in cases: @@ -58,4 +53,4 @@ class TestCanFingerprint: car_fingerprint, _ = can_fingerprint(test) assert car_fingerprint == car_model - assert frames == expected_frames + 2# TODO: fix extra frames + assert frames == expected_frames + 2 # TODO: fix extra frames