|
|
|
@ -1,6 +1,6 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
from collections import defaultdict |
|
|
|
|
from collections.abc import Iterator |
|
|
|
|
from collections.abc import Callable, Iterator |
|
|
|
|
from typing import Any, Protocol, TypeVar |
|
|
|
|
|
|
|
|
|
from tqdm import tqdm |
|
|
|
@ -27,6 +27,7 @@ MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e} |
|
|
|
|
REQUESTS = [(brand, config, r) for brand, config in FW_QUERY_CONFIGS.items() for r in config.requests] |
|
|
|
|
|
|
|
|
|
T = TypeVar('T') |
|
|
|
|
ObdCallback = Callable[[bool], None] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chunks(l: list[T], n: int = 128) -> Iterator[list[T]]: |
|
|
|
@ -171,8 +172,7 @@ def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], vi |
|
|
|
|
return True, set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType]: |
|
|
|
|
params = Params() |
|
|
|
|
def get_present_ecus(logcan, sendcan, set_obd_multiplexing: ObdCallback, num_pandas: int = 1) -> set[EcuAddrBusType]: |
|
|
|
|
# queries are split by OBD multiplexing mode |
|
|
|
|
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []} |
|
|
|
|
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []} |
|
|
|
@ -204,7 +204,7 @@ def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType |
|
|
|
|
|
|
|
|
|
ecu_responses = set() |
|
|
|
|
for obd_multiplexing in queries: |
|
|
|
|
set_obd_multiplexing(params, obd_multiplexing) |
|
|
|
|
set_obd_multiplexing(obd_multiplexing) |
|
|
|
|
for query in queries[obd_multiplexing]: |
|
|
|
|
ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1)) |
|
|
|
|
return ecu_responses |
|
|
|
@ -228,17 +228,8 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[Ad |
|
|
|
|
return brand_matches |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def set_obd_multiplexing(params: Params, obd_multiplexing: bool): |
|
|
|
|
if params.get_bool("ObdMultiplexingEnabled") != obd_multiplexing: |
|
|
|
|
carlog.warning(f"Setting OBD multiplexing to {obd_multiplexing}") |
|
|
|
|
params.remove("ObdMultiplexingChanged") |
|
|
|
|
params.put_bool("ObdMultiplexingEnabled", obd_multiplexing) |
|
|
|
|
params.get_bool("ObdMultiplexingChanged", block=True) |
|
|
|
|
carlog.warning("OBD multiplexing set successfully") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions_ordered(logcan, sendcan, vin: str, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, |
|
|
|
|
debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
|
def get_fw_versions_ordered(logcan, sendcan, set_obd_multiplexing: ObdCallback, vin: str, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, |
|
|
|
|
num_pandas: int = 1, debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
|
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" |
|
|
|
|
|
|
|
|
|
all_car_fw = [] |
|
|
|
@ -249,7 +240,7 @@ def get_fw_versions_ordered(logcan, sendcan, vin: str, ecu_rx_addrs: set[EcuAddr |
|
|
|
|
if not len(brand_matches[brand]): |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
car_fw = get_fw_versions(logcan, sendcan, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress) |
|
|
|
|
car_fw = get_fw_versions(logcan, sendcan, set_obd_multiplexing, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress) |
|
|
|
|
all_car_fw.extend(car_fw) |
|
|
|
|
|
|
|
|
|
# If there is a match using this brand's FW alone, finish querying early |
|
|
|
@ -260,10 +251,9 @@ def get_fw_versions_ordered(logcan, sendcan, vin: str, ecu_rx_addrs: set[EcuAddr |
|
|
|
|
return all_car_fw |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, |
|
|
|
|
debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
|
def get_fw_versions(logcan, sendcan, set_obd_multiplexing: ObdCallback, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, |
|
|
|
|
num_pandas: int = 1, debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: |
|
|
|
|
versions = VERSIONS.copy() |
|
|
|
|
params = Params() |
|
|
|
|
|
|
|
|
|
if query_brand is not None: |
|
|
|
|
versions = {query_brand: versions[query_brand]} |
|
|
|
@ -305,7 +295,7 @@ def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVe |
|
|
|
|
|
|
|
|
|
# Toggle OBD multiplexing for each request |
|
|
|
|
if r.bus % 4 == 1: |
|
|
|
|
set_obd_multiplexing(params, r.obd_multiplexing) |
|
|
|
|
set_obd_multiplexing(r.obd_multiplexing) |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
query_addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and |
|
|
|
@ -340,7 +330,9 @@ if __name__ == "__main__": |
|
|
|
|
import time |
|
|
|
|
import argparse |
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
from openpilot.common.params import Params |
|
|
|
|
from openpilot.selfdrive.car.vin import get_vin |
|
|
|
|
from openpilot.selfdrive.car.card import obd_callback |
|
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(description='Get firmware version of ECUs') |
|
|
|
|
parser.add_argument('--scan', action='store_true') |
|
|
|
@ -358,6 +350,7 @@ if __name__ == "__main__": |
|
|
|
|
params.put_bool("IsOnroad", False) |
|
|
|
|
time.sleep(0.2) # thread is 10 Hz |
|
|
|
|
params.put_bool("IsOnroad", True) |
|
|
|
|
set_obd_multiplexing = obd_callback(params) |
|
|
|
|
|
|
|
|
|
extra: Any = None |
|
|
|
|
if args.scan: |
|
|
|
@ -373,14 +366,14 @@ if __name__ == "__main__": |
|
|
|
|
|
|
|
|
|
t = time.time() |
|
|
|
|
print("Getting vin...") |
|
|
|
|
set_obd_multiplexing(params, True) |
|
|
|
|
set_obd_multiplexing(True) |
|
|
|
|
vin_rx_addr, vin_rx_bus, vin = get_vin(logcan, sendcan, (0, 1), debug=args.debug) |
|
|
|
|
print(f'RX: {hex(vin_rx_addr)}, BUS: {vin_rx_bus}, VIN: {vin}') |
|
|
|
|
print(f"Getting VIN took {time.time() - t:.3f} s") |
|
|
|
|
print() |
|
|
|
|
|
|
|
|
|
t = time.time() |
|
|
|
|
fw_vers = get_fw_versions(logcan, sendcan, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True) |
|
|
|
|
fw_vers = get_fw_versions(logcan, sendcan, set_obd_multiplexing, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True) |
|
|
|
|
_, candidates = match_fw_to_car(fw_vers, vin) |
|
|
|
|
|
|
|
|
|
print() |
|
|
|
|