|
|
@ -1,6 +1,7 @@ |
|
|
|
#!/usr/bin/env python3 |
|
|
|
#!/usr/bin/env python3 |
|
|
|
from collections import defaultdict |
|
|
|
from collections import defaultdict |
|
|
|
from collections.abc import Callable, Iterator |
|
|
|
from collections.abc import Callable, Iterator |
|
|
|
|
|
|
|
from types import SimpleNamespace |
|
|
|
from typing import Any, Protocol, TypeVar |
|
|
|
from typing import Any, Protocol, TypeVar |
|
|
|
|
|
|
|
|
|
|
|
from tqdm import tqdm |
|
|
|
from tqdm import tqdm |
|
|
@ -11,6 +12,7 @@ from cereal import car |
|
|
|
from openpilot.selfdrive.car import carlog |
|
|
|
from openpilot.selfdrive.car import carlog |
|
|
|
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs |
|
|
|
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs |
|
|
|
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS |
|
|
|
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS |
|
|
|
|
|
|
|
from openpilot.selfdrive.car.can_definitions import CanSendCallable |
|
|
|
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions |
|
|
|
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions |
|
|
|
from openpilot.selfdrive.car.interfaces import get_interface_attr |
|
|
|
from openpilot.selfdrive.car.interfaces import get_interface_attr |
|
|
|
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery |
|
|
|
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery |
|
|
@ -171,7 +173,7 @@ def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], vi |
|
|
|
return True, set() |
|
|
|
return True, set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_present_ecus(logcan, sendcan, set_obd_multiplexing: ObdCallback, num_pandas: int = 1) -> set[EcuAddrBusType]: |
|
|
|
def get_present_ecus(logcan: SimpleNamespace, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int = 1) -> set[EcuAddrBusType]: |
|
|
|
# queries are split by OBD multiplexing mode |
|
|
|
# queries are split by OBD multiplexing mode |
|
|
|
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []} |
|
|
|
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []} |
|
|
|
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []} |
|
|
|
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []} |
|
|
@ -205,7 +207,7 @@ def get_present_ecus(logcan, sendcan, set_obd_multiplexing: ObdCallback, num_pan |
|
|
|
for obd_multiplexing in queries: |
|
|
|
for obd_multiplexing in queries: |
|
|
|
set_obd_multiplexing(obd_multiplexing) |
|
|
|
set_obd_multiplexing(obd_multiplexing) |
|
|
|
for query in queries[obd_multiplexing]: |
|
|
|
for query in queries[obd_multiplexing]: |
|
|
|
ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1)) |
|
|
|
ecu_responses.update(get_ecu_addrs(logcan, can_send, set(query), responses, timeout=0.1)) |
|
|
|
return ecu_responses |
|
|
|
return ecu_responses |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -227,8 +229,9 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[Ad |
|
|
|
return brand_matches |
|
|
|
return brand_matches |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback, vin: str, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, |
|
|
|
def get_fw_versions_ordered(logcan: SimpleNamespace, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str, |
|
|
|
num_pandas: int = 1, debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
|
|
|
|
progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
|
|
|
|
|
|
|
|
all_car_fw = [] |
|
|
|
all_car_fw = [] |
|
|
@ -239,7 +242,7 @@ def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback, |
|
|
|
if not len(brand_matches[brand]): |
|
|
|
if not len(brand_matches[brand]): |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
car_fw = get_fw_versions(logcan, sendcan, set_obd_multiplexing, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress) |
|
|
|
car_fw = get_fw_versions(logcan, can_send, set_obd_multiplexing, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress) |
|
|
|
all_car_fw.extend(car_fw) |
|
|
|
all_car_fw.extend(car_fw) |
|
|
|
|
|
|
|
|
|
|
|
# If there is a match using this brand's FW alone, finish querying early |
|
|
|
# If there is a match using this brand's FW alone, finish querying early |
|
|
@ -250,8 +253,9 @@ def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback, |
|
|
|
return all_car_fw |
|
|
|
return all_car_fw |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions(logcan, sendcan, set_obd_multiplexing: ObdCallback, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, |
|
|
|
def get_fw_versions(logcan: SimpleNamespace, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None, |
|
|
|
num_pandas: int = 1, debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
|
|
|
|
progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
versions = VERSIONS.copy() |
|
|
|
versions = VERSIONS.copy() |
|
|
|
|
|
|
|
|
|
|
|
if query_brand is not None: |
|
|
|
if query_brand is not None: |
|
|
@ -301,7 +305,7 @@ def get_fw_versions(logcan, sendcan, set_obd_multiplexing: ObdCallback, query_br |
|
|
|
(len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)] |
|
|
|
(len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)] |
|
|
|
|
|
|
|
|
|
|
|
if query_addrs: |
|
|
|
if query_addrs: |
|
|
|
query = IsoTpParallelQuery(sendcan, logcan, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) |
|
|
|
query = IsoTpParallelQuery(can_send, logcan, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) |
|
|
|
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): |
|
|
|
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): |
|
|
|
f = car.CarParams.CarFw.new_message() |
|
|
|
f = car.CarParams.CarFw.new_message() |
|
|
|
|
|
|
|
|
|
|
|