|
|
@ -4,7 +4,6 @@ from collections.abc import Callable, Iterator |
|
|
|
from typing import Any, Protocol, TypeVar |
|
|
|
from typing import Any, Protocol, TypeVar |
|
|
|
|
|
|
|
|
|
|
|
from tqdm import tqdm |
|
|
|
from tqdm import tqdm |
|
|
|
import capnp |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import panda.python.uds as uds |
|
|
|
import panda.python.uds as uds |
|
|
|
from openpilot.selfdrive.car import carlog |
|
|
|
from openpilot.selfdrive.car import carlog |
|
|
@ -40,7 +39,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool: |
|
|
|
return filter_brand is None or brand == filter_brand |
|
|
|
return filter_brand is None or brand == filter_brand |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], filter_brand: str = None) -> dict[AddrType, set[bytes]]: |
|
|
|
def build_fw_dict(fw_versions: list[CarParams.CarFw], filter_brand: str = None) -> dict[AddrType, set[bytes]]: |
|
|
|
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) |
|
|
|
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) |
|
|
|
for fw in fw_versions: |
|
|
|
for fw in fw_versions: |
|
|
|
if is_brand(fw.brand, filter_brand) and not fw.logging: |
|
|
|
if is_brand(fw.brand, filter_brand) and not fw.logging: |
|
|
@ -145,8 +144,8 @@ def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = N |
|
|
|
return set(candidates.keys()) - invalid |
|
|
|
return set(candidates.keys()) - invalid |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], vin: str, |
|
|
|
def match_fw_to_car(fw_versions: list[CarParams.CarFw], vin: str, allow_exact: bool = True, |
|
|
|
allow_exact: bool = True, allow_fuzzy: bool = True, log: bool = True) -> tuple[bool, set[str]]: |
|
|
|
allow_fuzzy: bool = True, log: bool = True) -> tuple[bool, set[str]]: |
|
|
|
# Try exact matching first |
|
|
|
# Try exact matching first |
|
|
|
exact_matches: list[tuple[bool, MatchFwToCar]] = [] |
|
|
|
exact_matches: list[tuple[bool, MatchFwToCar]] = [] |
|
|
|
if allow_exact: |
|
|
|
if allow_exact: |
|
|
@ -230,7 +229,7 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[Ad |
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str, |
|
|
|
def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str, |
|
|
|
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
progress: bool = False) -> list[CarParams.CarFw]: |
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
|
|
|
|
|
|
|
|
all_car_fw = [] |
|
|
|
all_car_fw = [] |
|
|
@ -255,7 +254,7 @@ def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable |
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None, |
|
|
|
def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None, |
|
|
|
extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
progress: bool = False) -> list[CarParams.CarFw]: |
|
|
|
versions = VERSIONS.copy() |
|
|
|
versions = VERSIONS.copy() |
|
|
|
|
|
|
|
|
|
|
|
if query_brand is not None: |
|
|
|
if query_brand is not None: |
|
|
@ -307,7 +306,7 @@ def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_ob |
|
|
|
if query_addrs: |
|
|
|
if query_addrs: |
|
|
|
query = IsoTpParallelQuery(can_send, can_recv, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) |
|
|
|
query = IsoTpParallelQuery(can_send, can_recv, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) |
|
|
|
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): |
|
|
|
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): |
|
|
|
f = CarParams.CarFw.new_message() |
|
|
|
f = CarParams.CarFw() |
|
|
|
|
|
|
|
|
|
|
|
f.ecu = ecu_types.get((brand, tx_addr, sub_addr), Ecu.unknown) |
|
|
|
f.ecu = ecu_types.get((brand, tx_addr, sub_addr), Ecu.unknown) |
|
|
|
f.fwVersion = version |
|
|
|
f.fwVersion = version |
|
|
@ -386,7 +385,7 @@ if __name__ == "__main__": |
|
|
|
for version in fw_vers: |
|
|
|
for version in fw_vers: |
|
|
|
subaddr = None if version.subAddress == 0 else hex(version.subAddress) |
|
|
|
subaddr = None if version.subAddress == 0 else hex(version.subAddress) |
|
|
|
print(f" Brand: {version.brand:{padding}}, bus: {version.bus}, OBD: {version.obdMultiplexing} - " + |
|
|
|
print(f" Brand: {version.brand:{padding}}, bus: {version.bus}, OBD: {version.obdMultiplexing} - " + |
|
|
|
f"(Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion}]") |
|
|
|
f"(Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion!r}]") |
|
|
|
print("}") |
|
|
|
print("}") |
|
|
|
|
|
|
|
|
|
|
|
print() |
|
|
|
print() |
|
|
|