|  |  | @ -1,9 +1,11 @@ | 
			
		
	
		
		
			
				
					
					|  |  |  | import os |  |  |  | import os | 
			
		
	
		
		
			
				
					
					|  |  |  | import time |  |  |  | import time | 
			
		
	
		
		
			
				
					
					|  |  |  | from collections.abc import Callable |  |  |  | from collections.abc import Callable | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | from types import SimpleNamespace | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | from cereal import car |  |  |  | from cereal import car | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.car import carlog |  |  |  | from openpilot.selfdrive.car import carlog | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | from openpilot.selfdrive.car.can_definitions import CanData, CanSendCallable | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.car.interfaces import get_interface_attr |  |  |  | from openpilot.selfdrive.car.interfaces import get_interface_attr | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars |  |  |  | from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars | 
			
		
	
		
		
			
				
					
					|  |  |  | from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN |  |  |  | from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN | 
			
		
	
	
		
		
			
				
					|  |  | @ -40,7 +42,7 @@ interface_names = _get_interface_names() | 
			
		
	
		
		
			
				
					
					|  |  |  | interfaces = load_interfaces(interface_names) |  |  |  | interfaces = load_interfaces(interface_names) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]: |  |  |  | def can_fingerprint(logcan: SimpleNamespace) -> tuple[str | None, dict[int, dict]]: | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   finger = gen_empty_fingerprint() |  |  |  |   finger = gen_empty_fingerprint() | 
			
		
	
		
		
			
				
					
					|  |  |  |   candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]}  # attempt fingerprint on both bus 0 and 1 |  |  |  |   candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]}  # attempt fingerprint on both bus 0 and 1 | 
			
		
	
		
		
			
				
					
					|  |  |  |   frame = 0 |  |  |  |   frame = 0 | 
			
		
	
	
		
		
			
				
					|  |  | @ -48,32 +50,36 @@ def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]: | 
			
		
	
		
		
			
				
					
					|  |  |  |   done = False |  |  |  |   done = False | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   while not done: |  |  |  |   while not done: | 
			
		
	
		
		
			
				
					
					|  |  |  |     for can in next_can(): |  |  |  |     # logcan.drain(wait_for_one=True) may return empty list or multiple packets, | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       # The fingerprint dict is generated for all buses, this way the car interface |  |  |  |     # so we increment frame for each one we receive | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       # can use it to detect a (valid) multipanda setup and initialize accordingly |  |  |  |     can_packets = logcan.drain(wait_for_one=True) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       if can.src < 128: |  |  |  |     for can_packet in can_packets: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         if can.src not in finger: |  |  |  |       for can in can_packet: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |           finger[can.src] = {} |  |  |  |         # The fingerprint dict is generated for all buses, this way the car interface | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         finger[can.src][can.address] = len(can.dat) |  |  |  |         # can use it to detect a (valid) multipanda setup and initialize accordingly | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |         if can.src < 128: | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |           if can.src not in finger: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |             finger[can.src] = {} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |           finger[can.src][can.address] = len(can.dat) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         for b in candidate_cars: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |           # Ignore extended messages and VIN query response. | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |           if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |             candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       # if we only have one car choice and the time since we got our first | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       # message has elapsed, exit | 
			
		
	
		
		
			
				
					
					|  |  |  |       for b in candidate_cars: |  |  |  |       for b in candidate_cars: | 
			
		
	
		
		
			
				
					
					|  |  |  |         # Ignore extended messages and VIN query response. |  |  |  |         if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): |  |  |  |           # fingerprint done | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |           candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) |  |  |  |           car_fingerprint = candidate_cars[b][0] | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     # if we only have one car choice and the time since we got our first |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     # message has elapsed, exit |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     for b in candidate_cars: |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |       if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |         # fingerprint done |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |         car_fingerprint = candidate_cars[b][0] |  |  |  |  | 
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     # bail if no cars left or we've been waiting for more than 2s |  |  |  |       # bail if no cars left or we've been waiting for more than 2s | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 |  |  |  |       failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     succeeded = car_fingerprint is not None |  |  |  |       succeeded = car_fingerprint is not None | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     done = failed or succeeded |  |  |  |       done = failed or succeeded | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     frame += 1 |  |  |  |       frame += 1 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   return car_fingerprint, finger |  |  |  |   return car_fingerprint, finger | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -129,7 +135,7 @@ def fingerprint(logcan, can_send, set_obd_multiplexing, num_pandas, cached_param | 
			
		
	
		
		
			
				
					
					|  |  |  |   # CAN fingerprint |  |  |  |   # CAN fingerprint | 
			
		
	
		
		
			
				
					
					|  |  |  |   # drain CAN socket so we get the latest messages |  |  |  |   # drain CAN socket so we get the latest messages | 
			
		
	
		
		
			
				
					
					|  |  |  |   logcan.drain() |  |  |  |   logcan.drain() | 
			
		
	
		
		
			
				
					
					|  |  |  |   car_fingerprint, finger = can_fingerprint(logcan.get_one_can) |  |  |  |   car_fingerprint, finger = can_fingerprint(logcan) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   exact_match = True |  |  |  |   exact_match = True | 
			
		
	
		
		
			
				
					
					|  |  |  |   source = car.CarParams.FingerprintSource.can |  |  |  |   source = car.CarParams.FingerprintSource.can | 
			
		
	
	
		
		
			
				
					|  |  | 
 |