@ -19,7 +19,7 @@ FW_QUERY_CONFIGS = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True)
VERSIONS = get_interface_attr ( ' FW_VERSIONS ' , ignore_none = True )
MODEL_TO_BRAND = { c : b for b , e in VERSIONS . items ( ) for c in e }
REQUESTS = [ ( brand , r ) for brand , config in FW_QUERY_CONFIGS . items ( ) for r in config . requests ]
REQUESTS = [ ( brand , config , r ) for brand , config in FW_QUERY_CONFIGS . items ( ) for r in config . requests ]
def chunks ( l , n = 128 ) :
@ -153,7 +153,7 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
parallel_queries : Dict [ bool , List [ EcuAddrBusType ] ] = { True : [ ] , False : [ ] }
responses = set ( )
for brand , r in REQUESTS :
for brand , _ , r in REQUESTS :
# Skip query if no panda available
if r . bus > num_pandas * 4 - 1 :
continue
@ -190,9 +190,9 @@ def get_brand_ecu_matches(ecu_rx_addrs):
""" Returns dictionary of brands and matches with ECUs in their FW versions """
brand_addrs = get_brand_addrs ( )
brand_matches = { brand : set ( ) for brand , _ in REQUESTS }
brand_matches = { brand : set ( ) for brand , _ , _ in REQUESTS }
brand_rx_offsets = set ( ( brand , r . rx_offset ) for brand , r in REQUESTS )
brand_rx_offsets = set ( ( brand , r . rx_offset ) for brand , _ , r in REQUESTS )
for addr , sub_addr , _ in ecu_rx_addrs :
# Since we can't know what request an ecu responded to, add matches for all possible rx offsets
for brand , rx_offset in brand_rx_offsets :
@ -247,19 +247,15 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
# ECUs using a subaddress need be queried one by one, the rest can be done in parallel
addrs = [ ]
parallel_addrs = [ ]
logging_addrs = [ ]
ecu_types = { }
for brand , brand_versions in versions . items ( ) :
for candidate , ecu in brand_versions . item s( ) :
for ecu in brand_versions . value s( ) :
for ecu_type , addr , sub_addr in ecu . keys ( ) :
a = ( brand , addr , sub_addr )
if a not in ecu_types :
ecu_types [ a ] = ecu_type
if a not in logging_addrs and candidate == " debug " :
logging_addrs . append ( a )
if sub_addr is None :
if a not in parallel_addrs :
parallel_addrs . append ( a )
@ -271,10 +267,10 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
# Get versions and build capnp list to put into CarParams
car_fw = [ ]
requests = [ ( brand , r ) for brand , r in REQUESTS if query_brand is None or brand == query_brand ]
requests = [ ( brand , config , r ) for brand , config , r in REQUESTS if query_brand is None or brand == query_brand ]
for addr in tqdm ( addrs , disable = not progress ) :
for addr_chunk in chunks ( addr ) :
for brand , r in requests :
for brand , config , r in requests :
# Skip query if no panda available
if r . bus > num_pandas * 4 - 1 :
continue
@ -292,15 +288,14 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
for ( tx_addr , sub_addr ) , version in query . get_data ( timeout ) . items ( ) :
f = car . CarParams . CarFw . new_message ( )
ecu_key = ( brand , tx_addr , sub_addr )
f . ecu = ecu_types . get ( ecu_key , Ecu . unknown )
f . ecu = ecu_types . get ( ( brand , tx_addr , sub_addr ) , Ecu . unknown )
f . fwVersion = version
f . address = tx_addr
f . responseAddress = uds . get_rx_addr_for_tx_addr ( tx_addr , r . rx_offset )
f . request = r . request
f . brand = brand
f . bus = r . bus
f . logging = r . logging or ecu_key in logging_addr s
f . logging = r . logging or ( f . ecu , tx_addr , sub_addr ) in config . extra_ecu s
f . obdMultiplexing = r . obd_multiplexing
if sub_addr is not None :