From b9e36b7faae1ad2a9ff78c72f5169a99f94ca82c Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Wed, 16 Aug 2023 03:28:50 -0700 Subject: [PATCH] car_helpers: common CAN fingerprint function (#29417) * timing test * test * fix * print * loop * clean up * wider range * Update selfdrive/car/tests/test_models.py * Apply suggestions from code review * run many times * Update selfdrive/car/tests/test_models.py * run for many its * run with unittest and print as list * Update .github/workflows/selfdrive_tests.yaml * Update .github/workflows/selfdrive_tests.yaml * total time is super inconsistent (body) * Update selfdrive/car/tests/test_models.py * clean up * clean up * clean up * this works! * draft * test suite not as modular * try something like this * can do kb, but not too representative * clean up * remove kb? it depends on signals * clean up * more clean up * rename * just measure all CANParsers * can do all this manually * but this is way simpler * comment * stash * draft * draft * remove old script * clean up * revert * use it * remove test * opt * no partial * remove * revert test_models --- selfdrive/car/car_helpers.py | 82 +++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/selfdrive/car/car_helpers.py b/selfdrive/car/car_helpers.py index 1ad584de2..652862de4 100644 --- a/selfdrive/car/car_helpers.py +++ b/selfdrive/car/car_helpers.py @@ -1,5 +1,5 @@ import os -from typing import Dict, List +from typing import Callable, Dict, List, Optional, Tuple from cereal import car from common.params import Params @@ -77,6 +77,46 @@ interface_names = _get_interface_names() interfaces = load_interfaces(interface_names) +def can_fingerprint(next_can: Callable) -> Tuple[Optional[str], Dict[int, dict]]: + finger = gen_empty_fingerprint() + candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1 + frame = 0 + car_fingerprint = None + done = False + + while not done: + a = next_can() + + for can in a.can: + # The fingerprint dict is generated for all buses, this way the car interface + # can use it to detect a (valid) multipanda setup and initialize accordingly + if can.src < 128: + if can.src not in finger: + finger[can.src] = {} + finger[can.src][can.address] = len(can.dat) + + for b in candidate_cars: + # Ignore extended messages and VIN query response. + if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): + candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) + + # if we only have one car choice and the time since we got our first + # message has elapsed, exit + for b in candidate_cars: + if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: + # fingerprint done + car_fingerprint = candidate_cars[b][0] + + # bail if no cars left or we've been waiting for more than 2s + failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 + succeeded = car_fingerprint is not None + done = failed or succeeded + + frame += 1 + + return car_fingerprint, finger + + # **** for use live only **** def fingerprint(logcan, sendcan, num_pandas): fixed_fingerprint = os.environ.get('FINGERPRINT', "") @@ -125,44 +165,10 @@ def fingerprint(logcan, sendcan, num_pandas): set_obd_multiplexing(params, False) params.put_bool("FirmwareQueryDone", True) - finger = gen_empty_fingerprint() - candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1 - frame = 0 - car_fingerprint = None - done = False - - # drain CAN socket so we always get the latest messages + # CAN fingerprint + # drain CAN socket so we get the latest messages messaging.drain_sock_raw(logcan) - - while not done: - a = get_one_can(logcan) - - for can in a.can: - # The fingerprint dict is generated for all buses, this way the car interface - # can use it to detect a (valid) multipanda setup and initialize accordingly - if can.src < 128: - if can.src not in finger: - finger[can.src] = {} - finger[can.src][can.address] = len(can.dat) - - for b in candidate_cars: - # Ignore extended messages and VIN query response. - if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): - candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) - - # if we only have one car choice and the time since we got our first - # message has elapsed, exit - for b in candidate_cars: - if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: - # fingerprint done - car_fingerprint = candidate_cars[b][0] - - # bail if no cars left or we've been waiting for more than 2s - failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 - succeeded = car_fingerprint is not None - done = failed or succeeded - - frame += 1 + car_fingerprint, finger = can_fingerprint(lambda: get_one_can(logcan)) exact_match = True source = car.CarParams.FingerprintSource.can