|
|
|
@ -147,7 +147,24 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True): |
|
|
|
|
return True, set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[int], int]]: |
|
|
|
|
def get_brand_addrs_bus(num_pandas=1): |
|
|
|
|
brand_addrs = defaultdict(set) |
|
|
|
|
for brand, r in REQUESTS: |
|
|
|
|
# Skip query if no panda available |
|
|
|
|
if r.bus > num_pandas * 4 - 1: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
for brand_versions in VERSIONS[brand].values(): |
|
|
|
|
for ecu_type, addr, sub_addr in brand_versions: |
|
|
|
|
# Only query ecus in whitelist if whitelist is not empty |
|
|
|
|
if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus: |
|
|
|
|
brand_addrs[brand].add((addr, sub_addr, r.bus)) |
|
|
|
|
|
|
|
|
|
return brand_addrs |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_present_ecus_original(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[int], int]]: |
|
|
|
|
# TODO: batch requests by multiplexed OBD |
|
|
|
|
queries = list() |
|
|
|
|
parallel_queries = list() |
|
|
|
@ -175,14 +192,61 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[i |
|
|
|
|
response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset) |
|
|
|
|
responses.add((response_addr, sub_addr, r.bus)) |
|
|
|
|
|
|
|
|
|
# print(queries) |
|
|
|
|
# print() |
|
|
|
|
# print(parallel_queries) |
|
|
|
|
queries.insert(0, parallel_queries) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ecu_responses = set() |
|
|
|
|
for query in queries: |
|
|
|
|
print(query) |
|
|
|
|
ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1)) |
|
|
|
|
return ecu_responses |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[int], int]]: |
|
|
|
|
# TODO: batch requests by multiplexed OBD |
|
|
|
|
params = Params() |
|
|
|
|
queries = {"obd": [], "non_obd": []} # list() |
|
|
|
|
parallel_queries = {"obd": [], "non_obd": []} # list() |
|
|
|
|
responses = set() |
|
|
|
|
|
|
|
|
|
for brand, r in REQUESTS: |
|
|
|
|
# Skip query if no panda available |
|
|
|
|
if r.bus > num_pandas * 4 - 1: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
for brand_versions in VERSIONS[brand].values(): |
|
|
|
|
for ecu_type, addr, sub_addr in brand_versions: |
|
|
|
|
# Only query ecus in whitelist if whitelist is not empty |
|
|
|
|
if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus: |
|
|
|
|
a = (addr, sub_addr, r.bus) |
|
|
|
|
print((a, r.non_obd)) |
|
|
|
|
# Build set of queries |
|
|
|
|
multiplexed_key = "non_obd" if r.non_obd else "obd" |
|
|
|
|
if sub_addr is None: |
|
|
|
|
if a not in parallel_queries[multiplexed_key]: |
|
|
|
|
parallel_queries[multiplexed_key].append(a) |
|
|
|
|
else: # subaddresses must be queried one by one |
|
|
|
|
if [a] not in queries[multiplexed_key]: |
|
|
|
|
queries[multiplexed_key].append([a]) |
|
|
|
|
|
|
|
|
|
# Build set of expected responses to filter |
|
|
|
|
response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset) |
|
|
|
|
responses.add((response_addr, sub_addr, r.bus)) |
|
|
|
|
|
|
|
|
|
for multiplexed_key in queries: |
|
|
|
|
queries[multiplexed_key].insert(0, parallel_queries[multiplexed_key]) |
|
|
|
|
|
|
|
|
|
ecu_responses = set() |
|
|
|
|
for multiplexed_key in queries: |
|
|
|
|
set_obd_multiplexing(params, multiplexed_key == 'obd') |
|
|
|
|
for query in queries[multiplexed_key]: |
|
|
|
|
ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1)) |
|
|
|
|
return ecu_responses |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_brand_ecu_matches(ecu_rx_addrs): |
|
|
|
|
"""Returns dictionary of brands and matches with ECUs in their FW versions""" |
|
|
|
|
|
|
|
|
|