From 32b263d0cc16b001e663d454e5464db33a018d79 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 7 Aug 2024 17:41:14 -0700 Subject: [PATCH] I think we can remove SimpleNamespace soon --- selfdrive/car/car_helpers.py | 56 ++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/selfdrive/car/car_helpers.py b/selfdrive/car/car_helpers.py index 921e4a5c96..ad37639f2d 100644 --- a/selfdrive/car/car_helpers.py +++ b/selfdrive/car/car_helpers.py @@ -1,9 +1,11 @@ import os import time from collections.abc import Callable +from types import SimpleNamespace from cereal import car from openpilot.selfdrive.car import carlog +from openpilot.selfdrive.car.can_definitions import CanData, CanSendCallable from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN @@ -40,7 +42,7 @@ interface_names = _get_interface_names() interfaces = load_interfaces(interface_names) -def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]: +def can_fingerprint(logcan: SimpleNamespace) -> tuple[str | None, dict[int, dict]]: finger = gen_empty_fingerprint() candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1 frame = 0 @@ -48,32 +50,36 @@ def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]: done = False while not done: - for can in next_can(): - # The fingerprint dict is generated for all buses, this way the car interface - # can use it to detect a (valid) multipanda setup and initialize accordingly - if can.src < 128: - if can.src not in finger: - finger[can.src] = {} - finger[can.src][can.address] = len(can.dat) - + # logcan.drain(wait_for_one=True) may return empty list or multiple packets, + # so we increment frame for each one we receive + can_packets = logcan.drain(wait_for_one=True) + for can_packet in can_packets: + for can in can_packet: + # The fingerprint dict is generated for all buses, this way the car interface + # can use it to detect a (valid) multipanda setup and initialize accordingly + if can.src < 128: + if can.src not in finger: + finger[can.src] = {} + finger[can.src][can.address] = len(can.dat) + + for b in candidate_cars: + # Ignore extended messages and VIN query response. + if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): + candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) + + # if we only have one car choice and the time since we got our first + # message has elapsed, exit for b in candidate_cars: - # Ignore extended messages and VIN query response. - if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): - candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) - - # if we only have one car choice and the time since we got our first - # message has elapsed, exit - for b in candidate_cars: - if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: - # fingerprint done - car_fingerprint = candidate_cars[b][0] + if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: + # fingerprint done + car_fingerprint = candidate_cars[b][0] - # bail if no cars left or we've been waiting for more than 2s - failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 - succeeded = car_fingerprint is not None - done = failed or succeeded + # bail if no cars left or we've been waiting for more than 2s + failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 + succeeded = car_fingerprint is not None + done = failed or succeeded - frame += 1 + frame += 1 return car_fingerprint, finger @@ -129,7 +135,7 @@ def fingerprint(logcan, can_send, set_obd_multiplexing, num_pandas, cached_param # CAN fingerprint # drain CAN socket so we get the latest messages logcan.drain() - car_fingerprint, finger = can_fingerprint(logcan.get_one_can) + car_fingerprint, finger = can_fingerprint(logcan) exact_match = True source = car.CarParams.FingerprintSource.can