car_helpers: common CAN fingerprint function (#29417)

* timing test

* test

* fix

* print

* loop

* clean up

* wider range

* Update selfdrive/car/tests/test_models.py

* Apply suggestions from code review

* run many times

* Update selfdrive/car/tests/test_models.py

* run for many its

* run with unittest and print as list

* Update .github/workflows/selfdrive_tests.yaml

* Update .github/workflows/selfdrive_tests.yaml

* total time is super inconsistent (body)

* Update selfdrive/car/tests/test_models.py

* clean up

* clean up

* clean up

* this works!

* draft

* test suite not as modular

* try something like this

* can do kb, but not too representative

* clean up

* remove kb? it depends on signals

* clean up

* more clean up

* rename

* just measure all CANParsers

* can do all this manually

* but this is way simpler

* comment

* stash

* draft

* draft

* remove old script

* clean up

* revert

* use it

* remove test

* opt

* no partial

* remove

* revert test_models
pull/29416/head
Shane Smiskol 2 years ago committed by GitHub
parent 2703a8e378
commit b9e36b7faa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 82
      selfdrive/car/car_helpers.py

@ -1,5 +1,5 @@
import os
from typing import Dict, List
from typing import Callable, Dict, List, Optional, Tuple
from cereal import car
from common.params import Params
@ -77,6 +77,46 @@ interface_names = _get_interface_names()
interfaces = load_interfaces(interface_names)
def can_fingerprint(next_can: Callable) -> Tuple[Optional[str], Dict[int, dict]]:
finger = gen_empty_fingerprint()
candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
frame = 0
car_fingerprint = None
done = False
while not done:
a = next_can()
for can in a.can:
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can.src < 128:
if can.src not in finger:
finger[can.src] = {}
finger[can.src][can.address] = len(can.dat)
for b in candidate_cars:
# Ignore extended messages and VIN query response.
if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8):
candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b])
# if we only have one car choice and the time since we got our first
# message has elapsed, exit
for b in candidate_cars:
if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT:
# fingerprint done
car_fingerprint = candidate_cars[b][0]
# bail if no cars left or we've been waiting for more than 2s
failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200
succeeded = car_fingerprint is not None
done = failed or succeeded
frame += 1
return car_fingerprint, finger
# **** for use live only ****
def fingerprint(logcan, sendcan, num_pandas):
fixed_fingerprint = os.environ.get('FINGERPRINT', "")
@ -125,44 +165,10 @@ def fingerprint(logcan, sendcan, num_pandas):
set_obd_multiplexing(params, False)
params.put_bool("FirmwareQueryDone", True)
finger = gen_empty_fingerprint()
candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
frame = 0
car_fingerprint = None
done = False
# drain CAN socket so we always get the latest messages
# CAN fingerprint
# drain CAN socket so we get the latest messages
messaging.drain_sock_raw(logcan)
while not done:
a = get_one_can(logcan)
for can in a.can:
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can.src < 128:
if can.src not in finger:
finger[can.src] = {}
finger[can.src][can.address] = len(can.dat)
for b in candidate_cars:
# Ignore extended messages and VIN query response.
if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8):
candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b])
# if we only have one car choice and the time since we got our first
# message has elapsed, exit
for b in candidate_cars:
if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT:
# fingerprint done
car_fingerprint = candidate_cars[b][0]
# bail if no cars left or we've been waiting for more than 2s
failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200
succeeded = car_fingerprint is not None
done = failed or succeeded
frame += 1
car_fingerprint, finger = can_fingerprint(lambda: get_one_can(logcan))
exact_match = True
source = car.CarParams.FingerprintSource.can

Loading…
Cancel
Save