@ -5,6 +5,7 @@ from tqdm import tqdm
import panda . python . uds as uds
from cereal import car
from common . params import Params
from selfdrive . car . ecu_addrs import get_ecu_addrs
from selfdrive . car . interfaces import get_interface_attr
from selfdrive . car . fingerprints import FW_VERSIONS
@ -89,7 +90,7 @@ def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None):
return set ( )
def match_fw_to_car_exact ( fw_versions_dict ) :
def match_fw_to_car_exact ( fw_versions_dict ) - > Set [ str ] :
""" Do an exact FW match. Returns all cars that match the given
FW versions for a list of " essential " ECUs . If an ECU is not considered
essential the FW version can be missing to get a fingerprint , but if it ' s present it
@ -202,6 +203,7 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
all_car_fw = [ ]
brand_matches = get_brand_ecu_matches ( ecu_rx_addrs )
matched_brand : Optional [ str ] = None
for brand in sorted ( brand_matches , key = lambda b : len ( brand_matches [ b ] ) , reverse = True ) :
car_fw = get_fw_versions ( logcan , sendcan , query_brand = brand , timeout = timeout , num_pandas = num_pandas , debug = debug , progress = progress )
@ -209,12 +211,25 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
# Try to match using FW returned from this brand only
matches = match_fw_to_car_exact ( build_fw_dict ( car_fw ) )
if len ( matches ) == 1 :
matched_brand = brand
break
# Do non-OBD queries for matched brand, or all if no match is found
params = Params ( )
params . put_bool ( " FirmwareObdQueryDone " , True )
cloudlog . warning ( " Waiting for OBD multiplexing to be disabled " )
params . get_bool ( " ObdMultiplexingDisabled " , block = True )
cloudlog . warning ( " OBD multiplexing disabled " )
for brand in FW_QUERY_CONFIGS . keys ( ) :
if brand == matched_brand or matched_brand is None :
all_car_fw . extend ( get_fw_versions ( logcan , sendcan , query_brand = brand , timeout = timeout , num_pandas = num_pandas , obd_multiplexed = False , debug = debug , progress = progress ) )
return all_car_fw
def get_fw_versions ( logcan , sendcan , query_brand = None , extra = None , timeout = 0.1 , num_pandas = 1 , debug = False , progress = False ) :
def get_fw_versions ( logcan , sendcan , query_brand = None , extra = None , timeout = 0.1 , num_pandas = 1 , obd_multiplexed = True , debug = False , progress = False ) :
versions = VERSIONS . copy ( )
# Each brand can define extra ECUs to query for data collection
@ -262,6 +277,9 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
# Skip query if no panda available
if r . bus > num_pandas * 4 - 1 :
continue
# Or if request is not designated for current multiplexing mode
elif r . non_obd == obd_multiplexed :
continue
try :
addrs = [ ( a , s ) for ( b , a , s ) in addr_chunk if b in ( brand , ' any ' ) and