|
|
@ -1,21 +1,20 @@ |
|
|
|
#!/usr/bin/env python3 |
|
|
|
#!/usr/bin/env python3 |
|
|
|
from collections import defaultdict |
|
|
|
from collections import defaultdict |
|
|
|
from collections.abc import Callable, Iterator |
|
|
|
from collections.abc import Callable, Iterator |
|
|
|
from typing import Any, Protocol, TypeVar |
|
|
|
from typing import Protocol, TypeVar |
|
|
|
|
|
|
|
|
|
|
|
from tqdm import tqdm |
|
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
|
|
|
import panda.python.uds as uds |
|
|
|
import panda.python.uds as uds |
|
|
|
from openpilot.selfdrive.car import carlog |
|
|
|
from openpilot.selfdrive.car import carlog, structs |
|
|
|
from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable |
|
|
|
from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable |
|
|
|
from openpilot.selfdrive.car.structs import CarParams |
|
|
|
|
|
|
|
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs |
|
|
|
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs |
|
|
|
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS |
|
|
|
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS |
|
|
|
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions |
|
|
|
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions |
|
|
|
from openpilot.selfdrive.car.interfaces import get_interface_attr |
|
|
|
from openpilot.selfdrive.car.interfaces import get_interface_attr |
|
|
|
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery |
|
|
|
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery |
|
|
|
|
|
|
|
|
|
|
|
Ecu = CarParams.Ecu |
|
|
|
Ecu = structs.CarParams.Ecu |
|
|
|
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] |
|
|
|
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] |
|
|
|
FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] |
|
|
|
FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] |
|
|
|
|
|
|
|
|
|
|
@ -39,7 +38,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool: |
|
|
|
return filter_brand is None or brand == filter_brand |
|
|
|
return filter_brand is None or brand == filter_brand |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_fw_dict(fw_versions: list[CarParams.CarFw], filter_brand: str = None) -> dict[AddrType, set[bytes]]: |
|
|
|
def build_fw_dict(fw_versions: list[structs.CarParams.CarFw], filter_brand: str = None) -> dict[AddrType, set[bytes]]: |
|
|
|
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) |
|
|
|
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) |
|
|
|
for fw in fw_versions: |
|
|
|
for fw in fw_versions: |
|
|
|
if is_brand(fw.brand, filter_brand) and not fw.logging: |
|
|
|
if is_brand(fw.brand, filter_brand) and not fw.logging: |
|
|
@ -144,7 +143,7 @@ def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = N |
|
|
|
return set(candidates.keys()) - invalid |
|
|
|
return set(candidates.keys()) - invalid |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_fw_to_car(fw_versions: list[CarParams.CarFw], vin: str, allow_exact: bool = True, |
|
|
|
def match_fw_to_car(fw_versions: list[structs.CarParams.CarFw], vin: str, allow_exact: bool = True, |
|
|
|
allow_fuzzy: bool = True, log: bool = True) -> tuple[bool, set[str]]: |
|
|
|
allow_fuzzy: bool = True, log: bool = True) -> tuple[bool, set[str]]: |
|
|
|
# Try exact matching first |
|
|
|
# Try exact matching first |
|
|
|
exact_matches: list[tuple[bool, MatchFwToCar]] = [] |
|
|
|
exact_matches: list[tuple[bool, MatchFwToCar]] = [] |
|
|
@ -229,7 +228,7 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[Ad |
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str, |
|
|
|
def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str, |
|
|
|
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
progress: bool = False) -> list[CarParams.CarFw]: |
|
|
|
progress: bool = False) -> list[structs.CarParams.CarFw]: |
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
|
|
|
|
|
|
|
|
all_car_fw = [] |
|
|
|
all_car_fw = [] |
|
|
@ -254,7 +253,7 @@ def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable |
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None, |
|
|
|
def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None, |
|
|
|
extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, |
|
|
|
progress: bool = False) -> list[CarParams.CarFw]: |
|
|
|
progress: bool = False) -> list[structs.CarParams.CarFw]: |
|
|
|
versions = VERSIONS.copy() |
|
|
|
versions = VERSIONS.copy() |
|
|
|
|
|
|
|
|
|
|
|
if query_brand is not None: |
|
|
|
if query_brand is not None: |
|
|
@ -306,7 +305,7 @@ def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_ob |
|
|
|
if query_addrs: |
|
|
|
if query_addrs: |
|
|
|
query = IsoTpParallelQuery(can_send, can_recv, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) |
|
|
|
query = IsoTpParallelQuery(can_send, can_recv, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) |
|
|
|
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): |
|
|
|
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): |
|
|
|
f = CarParams.CarFw() |
|
|
|
f = structs.CarParams.CarFw() |
|
|
|
|
|
|
|
|
|
|
|
f.ecu = ecu_types.get((brand, tx_addr, sub_addr), Ecu.unknown) |
|
|
|
f.ecu = ecu_types.get((brand, tx_addr, sub_addr), Ecu.unknown) |
|
|
|
f.fwVersion = version |
|
|
|
f.fwVersion = version |
|
|
@ -326,69 +325,3 @@ def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_ob |
|
|
|
carlog.exception("FW query exception") |
|
|
|
carlog.exception("FW query exception") |
|
|
|
|
|
|
|
|
|
|
|
return car_fw |
|
|
|
return car_fw |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
|
|
import time |
|
|
|
|
|
|
|
import argparse |
|
|
|
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
|
|
|
from openpilot.common.params import Params |
|
|
|
|
|
|
|
from openpilot.selfdrive.car.vin import get_vin |
|
|
|
|
|
|
|
from openpilot.selfdrive.car.card import can_comm_callbacks, obd_callback |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(description='Get firmware version of ECUs') |
|
|
|
|
|
|
|
parser.add_argument('--scan', action='store_true') |
|
|
|
|
|
|
|
parser.add_argument('--debug', action='store_true') |
|
|
|
|
|
|
|
parser.add_argument('--brand', help='Only query addresses/with requests for this brand') |
|
|
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logcan = messaging.sub_sock('can') |
|
|
|
|
|
|
|
pandaStates_sock = messaging.sub_sock('pandaStates') |
|
|
|
|
|
|
|
sendcan = messaging.pub_sock('sendcan') |
|
|
|
|
|
|
|
can_callbacks = can_comm_callbacks(logcan, sendcan) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Set up params for pandad |
|
|
|
|
|
|
|
params = Params() |
|
|
|
|
|
|
|
params.remove("FirmwareQueryDone") |
|
|
|
|
|
|
|
params.put_bool("IsOnroad", False) |
|
|
|
|
|
|
|
time.sleep(0.2) # thread is 10 Hz |
|
|
|
|
|
|
|
params.put_bool("IsOnroad", True) |
|
|
|
|
|
|
|
set_obd_multiplexing = obd_callback(params) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
extra: Any = None |
|
|
|
|
|
|
|
if args.scan: |
|
|
|
|
|
|
|
extra = {} |
|
|
|
|
|
|
|
# Honda |
|
|
|
|
|
|
|
for i in range(256): |
|
|
|
|
|
|
|
extra[(Ecu.unknown, 0x18da00f1 + (i << 8), None)] = [] |
|
|
|
|
|
|
|
extra[(Ecu.unknown, 0x700 + i, None)] = [] |
|
|
|
|
|
|
|
extra[(Ecu.unknown, 0x750, i)] = [] |
|
|
|
|
|
|
|
extra = {"any": {"debug": extra}} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
num_pandas = len(messaging.recv_one_retry(pandaStates_sock).pandaStates) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
t = time.time() |
|
|
|
|
|
|
|
print("Getting vin...") |
|
|
|
|
|
|
|
set_obd_multiplexing(True) |
|
|
|
|
|
|
|
vin_rx_addr, vin_rx_bus, vin = get_vin(*can_callbacks, (0, 1), debug=args.debug) |
|
|
|
|
|
|
|
print(f'RX: {hex(vin_rx_addr)}, BUS: {vin_rx_bus}, VIN: {vin}') |
|
|
|
|
|
|
|
print(f"Getting VIN took {time.time() - t:.3f} s") |
|
|
|
|
|
|
|
print() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
t = time.time() |
|
|
|
|
|
|
|
fw_vers = get_fw_versions(*can_callbacks, set_obd_multiplexing, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True) |
|
|
|
|
|
|
|
_, candidates = match_fw_to_car(fw_vers, vin) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print() |
|
|
|
|
|
|
|
print("Found FW versions") |
|
|
|
|
|
|
|
print("{") |
|
|
|
|
|
|
|
padding = max([len(fw.brand) for fw in fw_vers] or [0]) |
|
|
|
|
|
|
|
for version in fw_vers: |
|
|
|
|
|
|
|
subaddr = None if version.subAddress == 0 else hex(version.subAddress) |
|
|
|
|
|
|
|
print(f" Brand: {version.brand:{padding}}, bus: {version.bus}, OBD: {version.obdMultiplexing} - " + |
|
|
|
|
|
|
|
f"(Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion!r}]") |
|
|
|
|
|
|
|
print("}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print() |
|
|
|
|
|
|
|
print("Possible matches:", candidates) |
|
|
|
|
|
|
|
print(f"Getting fw took {time.time() - t:.3f} s") |
|
|
|
|
|
|
|