@ -15,13 +15,6 @@ from openpilot.selfdrive.car import gen_empty_fingerprint
FRAME_FINGERPRINT = 100 # 1s
def get_one_can ( logcan ) :
while True :
can = messaging . recv_one_retry ( logcan )
if len ( can . can ) > 0 :
return can
def load_interfaces ( brand_names ) :
ret = { }
for brand_name in brand_names :
@ -48,7 +41,7 @@ interface_names = _get_interface_names()
interfaces = load_interfaces ( interface_names )
def can_fingerprint ( next_ can: Callable ) - > tuple [ str | None , dict [ int , dict ] ] :
def can_fingerprint ( can_recv : Callable ) - > tuple [ str | None , dict [ int , dict ] ] :
finger = gen_empty_fingerprint ( )
candidate_cars = { i : all_legacy_fingerprint_cars ( ) for i in [ 0 , 1 ] } # attempt fingerprint on both bus 0 and 1
frame = 0
@ -56,34 +49,35 @@ def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]:
done = False
while not done :
a = next_can ( )
for can in a . can :
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can . src < 128 :
if can . src not in finger :
finger [ can . src ] = { }
finger [ can . src ] [ can . address ] = len ( can . dat )
# can_recv() may return zero or multiple packets, so we increment frame for each one we receive
can_packets = can_recv ( )
for can_packet in can_packets :
for can in can_packet . can :
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can . src < 128 :
if can . src not in finger :
finger [ can . src ] = { }
finger [ can . src ] [ can . address ] = len ( can . dat )
for b in candidate_cars :
# Ignore extended messages and VIN query response.
if can . src == b and can . address < 0x800 and can . address not in ( 0x7df , 0x7e0 , 0x7e8 ) :
candidate_cars [ b ] = eliminate_incompatible_cars ( can , candidate_cars [ b ] )
# if we only have one car choice and the time since we got our first
# message has elapsed, exit
for b in candidate_cars :
# Ignore extended messages and VIN query response.
if can . src == b and can . address < 0x800 and can . address not in ( 0x7df , 0x7e0 , 0x7e8 ) :
candidate_cars [ b ] = eliminate_incompatible_cars ( can , candidate_cars [ b ] )
# if we only have one car choice and the time since we got our first
# message has elapsed, exit
for b in candidate_cars :
if len ( candidate_cars [ b ] ) == 1 and frame > FRAME_FINGERPRINT :
# fingerprint done
car_fingerprint = candidate_cars [ b ] [ 0 ]
if len ( candidate_cars [ b ] ) == 1 and frame > FRAME_FINGERPRINT :
# fingerprint done
car_fingerprint = candidate_cars [ b ] [ 0 ]
# bail if no cars left or we've been waiting for more than 2s
failed = ( all ( len ( cc ) == 0 for cc in candidate_cars . values ( ) ) and frame > FRAME_FINGERPRINT ) or frame > 200
succeeded = car_fingerprint is not None
done = failed or succeeded
# bail if no cars left or we've been waiting for more than 2s
failed = ( all ( len ( cc ) == 0 for cc in candidate_cars . values ( ) ) and frame > FRAME_FINGERPRINT ) or frame > 200
succeeded = car_fingerprint is not None
done = failed or succeeded
frame + = 1
frame + = 1
return car_fingerprint , finger
@ -139,7 +133,7 @@ def fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params
# CAN fingerprint
# drain CAN socket so we get the latest messages
messaging . drain_sock_raw ( logcan )
car_fingerprint , finger = can_fingerprint ( lambda : get_one_can ( logcan ) )
car_fingerprint , finger = can_fingerprint ( lambda : messaging . drain_sock ( logcan , wait_for_one = True ) )
exact_match = True
source = car . CarParams . FingerprintSource . can