@ -1,6 +1,6 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
from collections import defaultdict
from collections import defaultdict
from typing import Any , List , Optional , Set
from typing import Any , Dict , List , Set
from tqdm import tqdm
from tqdm import tqdm
import panda . python . uds as uds
import panda . python . uds as uds
@ -147,8 +147,10 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True):
def get_present_ecus ( logcan , sendcan , num_pandas = 1 ) - > Set [ EcuAddrBusType ] :
def get_present_ecus ( logcan , sendcan , num_pandas = 1 ) - > Set [ EcuAddrBusType ] :
queries : List [ List [ EcuAddrBusType ] ] = list ( )
params = Params ( )
parallel_queries : List [ EcuAddrBusType ] = list ( )
# queries are split by OBD multiplexing mode
queries : Dict [ bool , List [ List [ EcuAddrBusType ] ] ] = { True : [ ] , False : [ ] }
parallel_queries : Dict [ bool , List [ EcuAddrBusType ] ] = { True : [ ] , False : [ ] }
responses = set ( )
responses = set ( )
for brand , r in REQUESTS :
for brand , r in REQUESTS :
@ -163,21 +165,24 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
a = ( addr , sub_addr , r . bus )
a = ( addr , sub_addr , r . bus )
# Build set of queries
# Build set of queries
if sub_addr is None :
if sub_addr is None :
if a not in parallel_queries :
if a not in parallel_queries [ r . obd_multiplexing ] :
parallel_queries . append ( a )
parallel_queries [ r . obd_multiplexing ] . append ( a )
else : # subaddresses must be queried one by one
else : # subaddresses must be queried one by one
if [ a ] not in queries :
if [ a ] not in queries [ r . obd_multiplexing ] :
queries . append ( [ a ] )
queries [ r . obd_multiplexing ] . append ( [ a ] )
# Build set of expected responses to filter
# Build set of expected responses to filter
response_addr = uds . get_rx_addr_for_tx_addr ( addr , r . rx_offset )
response_addr = uds . get_rx_addr_for_tx_addr ( addr , r . rx_offset )
responses . add ( ( response_addr , sub_addr , r . bus ) )
responses . add ( ( response_addr , sub_addr , r . bus ) )
queries . insert ( 0 , parallel_queries )
for obd_multiplexing in queries :
queries [ obd_multiplexing ] . insert ( 0 , parallel_queries [ obd_multiplexing ] )
ecu_responses = set ( )
ecu_responses = set ( )
for query in queries :
for obd_multiplexing in queries :
ecu_responses . update ( get_ecu_addrs ( logcan , sendcan , set ( query ) , responses , timeout = 0.1 ) )
set_obd_multiplexing ( params , obd_multiplexing )
for query in queries [ obd_multiplexing ] :
ecu_responses . update ( get_ecu_addrs ( logcan , sendcan , set ( query ) , responses , timeout = 0.1 ) )
return ecu_responses
return ecu_responses
@ -198,13 +203,13 @@ def get_brand_ecu_matches(ecu_rx_addrs):
return brand_matches
return brand_matches
def di sabl e_obd_multiplexing( params ) :
def set _obd_multiplexing ( params : Params , obd_multiplexing : bool ) :
if not params . get_bool ( " ObdMultiplexingDis abled " ) :
if params . get_bool ( " ObdMultiplexingEn abled " ) != obd_multiplexing :
params . put_bool ( " FirmwareObdQueryDone " , True )
cloudlog . warning ( f " Setting OBD multiplexing to { obd_multiplexing } " )
params . remove ( " ObdMultiplexingChanged " )
cloudlog . warning ( " Waiting for OBD multiplexing to be disabled " )
params . put_bool ( " ObdMultiplexingEnabled " , obd_multiplexing )
params . get_bool ( " ObdMultiplexingDisabl ed " , block = True )
params . get_bool ( " ObdMultiplexingChang ed " , block = True )
cloudlog . warning ( " OBD multiplexing disabled " )
cloudlog . warning ( " OBD multiplexing set successfully " )
def get_fw_versions_ordered ( logcan , sendcan , ecu_rx_addrs , timeout = 0.1 , num_pandas = 1 , debug = False , progress = False ) :
def get_fw_versions_ordered ( logcan , sendcan , ecu_rx_addrs , timeout = 0.1 , num_pandas = 1 , debug = False , progress = False ) :
@ -212,7 +217,6 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
all_car_fw = [ ]
all_car_fw = [ ]
brand_matches = get_brand_ecu_matches ( ecu_rx_addrs )
brand_matches = get_brand_ecu_matches ( ecu_rx_addrs )
matched_brand : Optional [ str ] = None
for brand in sorted ( brand_matches , key = lambda b : len ( brand_matches [ b ] ) , reverse = True ) :
for brand in sorted ( brand_matches , key = lambda b : len ( brand_matches [ b ] ) , reverse = True ) :
car_fw = get_fw_versions ( logcan , sendcan , query_brand = brand , timeout = timeout , num_pandas = num_pandas , debug = debug , progress = progress )
car_fw = get_fw_versions ( logcan , sendcan , query_brand = brand , timeout = timeout , num_pandas = num_pandas , debug = debug , progress = progress )
@ -220,21 +224,14 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
# Try to match using FW returned from this brand only
# Try to match using FW returned from this brand only
matches = match_fw_to_car_exact ( build_fw_dict ( car_fw ) )
matches = match_fw_to_car_exact ( build_fw_dict ( car_fw ) )
if len ( matches ) == 1 :
if len ( matches ) == 1 :
matched_brand = brand
break
break
disable_obd_multiplexing ( Params ( ) )
# Do non-OBD queries for matched brand, or all if no match is found
for brand in FW_QUERY_CONFIGS . keys ( ) :
if brand == matched_brand or matched_brand is None :
all_car_fw . extend ( get_fw_versions ( logcan , sendcan , query_brand = brand , timeout = timeout , num_pandas = num_pandas , obd_multiplexed = False , debug = debug , progress = progress ) )
return all_car_fw
return all_car_fw
def get_fw_versions ( logcan , sendcan , query_brand = None , extra = None , timeout = 0.1 , num_pandas = 1 , obd_multiplexed = True , debug = False , progress = False ) :
def get_fw_versions ( logcan , sendcan , query_brand = None , extra = None , timeout = 0.1 , num_pandas = 1 , debug = False , progress = False ) :
versions = VERSIONS . copy ( )
versions = VERSIONS . copy ( )
params = Params ( )
# Each brand can define extra ECUs to query for data collection
# Each brand can define extra ECUs to query for data collection
for brand , config in FW_QUERY_CONFIGS . items ( ) :
for brand , config in FW_QUERY_CONFIGS . items ( ) :
@ -281,9 +278,9 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
# Skip query if no panda available
# Skip query if no panda available
if r . bus > num_pandas * 4 - 1 :
if r . bus > num_pandas * 4 - 1 :
continue
continue
# Or if request is not designated for current multiplexing mode
elif r . non_obd == obd_multiplexed :
# Toggle OBD multiplexing for each request
continue
set_obd_multiplexing ( params , r . obd_multiplexing )
try :
try :
addrs = [ ( a , s ) for ( b , a , s ) in addr_chunk if b in ( brand , ' any ' ) and
addrs = [ ( a , s ) for ( b , a , s ) in addr_chunk if b in ( brand , ' any ' ) and