@ -1,15 +1,14 @@
import os
import time
from collections . abc import Callable
from cereal import car
from openpilot . selfdrive . car import carlog
from openpilot . selfdrive . car . can_definitions import CanRecvCallable , CanSendCallable
from openpilot . selfdrive . car . interfaces import get_interface_attr
from openpilot . selfdrive . car . fingerprints import eliminate_incompatible_cars , all_legacy_fingerprint_cars
from openpilot . selfdrive . car . vin import get_vin , is_valid_vin , VIN_UNKNOWN
from openpilot . selfdrive . car . fw_versions import get_fw_versions_ordered , get_present_ecus , match_fw_to_car
from openpilot . selfdrive . car . fw_versions import ObdCallback , get_fw_versions_ordered , get_present_ecus , match_fw_to_car
from openpilot . selfdrive . car . mock . values import CAR as MOCK
import cereal . messaging as messaging
from openpilot . selfdrive . car import gen_empty_fingerprint
FRAME_FINGERPRINT = 100 # 1s
@ -41,7 +40,7 @@ interface_names = _get_interface_names()
interfaces = load_interfaces ( interface_names )
def can_fingerprint ( can_recv : Callable ) - > tuple [ str | None , dict [ int , dict ] ] :
def can_fingerprint ( can_recv : CanRecvCa llable ) - > tuple [ str | None , dict [ int , dict ] ] :
finger = gen_empty_fingerprint ( )
candidate_cars = { i : all_legacy_fingerprint_cars ( ) for i in [ 0 , 1 ] } # attempt fingerprint on both bus 0 and 1
frame = 0
@ -49,10 +48,10 @@ def can_fingerprint(can_recv: Callable) -> tuple[str | None, dict[int, dict]]:
done = False
while not done :
# can_recv() may return zero or multiple packets, so we increment frame for each one we receive
can_packets = can_recv ( )
# can_recv(wait_for_one=True ) may return zero or multiple packets, so we increment frame for each one we receive
can_packets = can_recv ( wait_for_one = True )
for can_packet in can_packets :
for can in can_packet . can :
for can in can_packet :
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can . src < 128 :
@ -83,7 +82,7 @@ def can_fingerprint(can_recv: Callable) -> tuple[str | None, dict[int, dict]]:
# **** for use live only ****
def fingerprint ( logcan , sendcan , set_obd_multiplexing , num_pandas , cached_params_raw ) :
def fingerprint ( can_recv : CanRecvCallable , can_send : CanSendCallable , set_obd_multiplexing : ObdCallback , num_pandas : int , cached_params_raw : bytes | None ) :
fixed_fingerprint = os . environ . get ( ' FINGERPRINT ' , " " )
skip_fw_query = os . environ . get ( ' SKIP_FW_QUERY ' , False )
disable_fw_cache = os . environ . get ( ' DISABLE_FW_CACHE ' , False )
@ -109,9 +108,9 @@ def fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params
# NOTE: this takes ~0.1s and is relied on to allow sendcan subscriber to connect in time
set_obd_multiplexing ( True )
# VIN query only reliably works through OBDII
vin_rx_addr , vin_rx_bus , vin = get_vin ( log can, sendcan , ( 0 , 1 ) )
ecu_rx_addrs = get_present_ecus ( log can, sendcan , set_obd_multiplexing , num_pandas = num_pandas )
car_fw = get_fw_versions_ordered ( log can, sendcan , set_obd_multiplexing , vin , ecu_rx_addrs , num_pandas = num_pandas )
vin_rx_addr , vin_rx_bus , vin = get_vin ( can_recv , can_ send, ( 0 , 1 ) )
ecu_rx_addrs = get_present_ecus ( can_recv , can_ send, set_obd_multiplexing , num_pandas = num_pandas )
car_fw = get_fw_versions_ordered ( can_recv , can_ send, set_obd_multiplexing , vin , ecu_rx_addrs , num_pandas = num_pandas )
cached = False
exact_fw_match , fw_candidates = match_fw_to_car ( car_fw , vin )
@ -132,8 +131,8 @@ def fingerprint(logcan, sendcan, set_obd_multiplexing, num_pandas, cached_params
# CAN fingerprint
# drain CAN socket so we get the latest messages
messaging . drain_sock_raw ( logcan )
car_fingerprint , finger = can_fingerprint ( lambda : messaging . drain_sock ( logcan , wait_for_one = True ) )
can_recv ( )
car_fingerprint , finger = can_fingerprint ( can_recv )
exact_match = True
source = car . CarParams . FingerprintSource . can
@ -160,8 +159,9 @@ def get_car_interface(CP):
return CarInterface ( CP , CarController , CarState )
def get_car ( logcan , sendcan , set_obd_multiplexing , experimental_long_allowed , num_pandas = 1 , cached_params = None ) :
candidate , fingerprints , vin , car_fw , source , exact_match = fingerprint ( logcan , sendcan , set_obd_multiplexing , num_pandas , cached_params )
def get_car ( can_recv : CanRecvCallable , can_send : CanSendCallable , set_obd_multiplexing : ObdCallback , experimental_long_allowed : bool ,
num_pandas : int = 1 , cached_params : bytes | None = None ) :
candidate , fingerprints , vin , car_fw , source , exact_match = fingerprint ( can_recv , can_send , set_obd_multiplexing , num_pandas , cached_params )
if candidate is None :
carlog . error ( { " event " : " car doesn ' t match any fingerprints " , " fingerprints " : repr ( fingerprints ) } )