diff --git a/.importlinter b/.importlinter index 49c6bc215f..15fc2f50a5 100644 --- a/.importlinter +++ b/.importlinter @@ -37,12 +37,10 @@ ignore_imports = openpilot.selfdrive.car.interfaces -> openpilot.common.basedir # params will need to move to new openpilot files that just call car files - openpilot.selfdrive.car.fw_versions -> openpilot.common.params openpilot.selfdrive.car.ecu_addrs -> openpilot.common.params openpilot.selfdrive.car.vin -> cereal.messaging openpilot.selfdrive.car.disable_ecu -> cereal.messaging openpilot.selfdrive.car.ecu_addrs -> cereal.messaging - openpilot.selfdrive.car.fw_versions -> cereal.messaging # these are okay openpilot.selfdrive.car.card -> openpilot.common.swaglog diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 0aa0f4754f..78ddb3a5e9 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -1,21 +1,20 @@ #!/usr/bin/env python3 from collections import defaultdict from collections.abc import Callable, Iterator -from typing import Any, Protocol, TypeVar +from typing import Protocol, TypeVar from tqdm import tqdm import panda.python.uds as uds -from openpilot.selfdrive.car import carlog +from openpilot.selfdrive.car import carlog, structs from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable -from openpilot.selfdrive.car.structs import CarParams from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs from openpilot.selfdrive.car.fingerprints import FW_VERSIONS from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery -Ecu = CarParams.Ecu +Ecu = structs.CarParams.Ecu ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] @@ -39,7 +38,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool: return filter_brand is None or brand == filter_brand -def build_fw_dict(fw_versions: list[CarParams.CarFw], filter_brand: str = None) -> dict[AddrType, set[bytes]]: +def build_fw_dict(fw_versions: list[structs.CarParams.CarFw], filter_brand: str = None) -> dict[AddrType, set[bytes]]: fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) for fw in fw_versions: if is_brand(fw.brand, filter_brand) and not fw.logging: @@ -144,7 +143,7 @@ def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = N return set(candidates.keys()) - invalid -def match_fw_to_car(fw_versions: list[CarParams.CarFw], vin: str, allow_exact: bool = True, +def match_fw_to_car(fw_versions: list[structs.CarParams.CarFw], vin: str, allow_exact: bool = True, allow_fuzzy: bool = True, log: bool = True) -> tuple[bool, set[str]]: # Try exact matching first exact_matches: list[tuple[bool, MatchFwToCar]] = [] @@ -229,7 +228,7 @@ def get_brand_ecu_matches(ecu_rx_addrs: set[EcuAddrBusType]) -> dict[str, set[Ad def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, vin: str, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, - progress: bool = False) -> list[CarParams.CarFw]: + progress: bool = False) -> list[structs.CarParams.CarFw]: """Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" all_car_fw = [] @@ -254,7 +253,7 @@ def get_fw_versions_ordered(can_recv: CanRecvCallable, can_send: CanSendCallable def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, debug: bool = False, - progress: bool = False) -> list[CarParams.CarFw]: + progress: bool = False) -> list[structs.CarParams.CarFw]: versions = VERSIONS.copy() if query_brand is not None: @@ -306,7 +305,7 @@ def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_ob if query_addrs: query = IsoTpParallelQuery(can_send, can_recv, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) for (tx_addr, sub_addr), version in query.get_data(timeout).items(): - f = CarParams.CarFw() + f = structs.CarParams.CarFw() f.ecu = ecu_types.get((brand, tx_addr, sub_addr), Ecu.unknown) f.fwVersion = version @@ -326,69 +325,3 @@ def get_fw_versions(can_recv: CanRecvCallable, can_send: CanSendCallable, set_ob carlog.exception("FW query exception") return car_fw - - -if __name__ == "__main__": - import time - import argparse - import cereal.messaging as messaging - from openpilot.common.params import Params - from openpilot.selfdrive.car.vin import get_vin - from openpilot.selfdrive.car.card import can_comm_callbacks, obd_callback - - parser = argparse.ArgumentParser(description='Get firmware version of ECUs') - parser.add_argument('--scan', action='store_true') - parser.add_argument('--debug', action='store_true') - parser.add_argument('--brand', help='Only query addresses/with requests for this brand') - args = parser.parse_args() - - logcan = messaging.sub_sock('can') - pandaStates_sock = messaging.sub_sock('pandaStates') - sendcan = messaging.pub_sock('sendcan') - can_callbacks = can_comm_callbacks(logcan, sendcan) - - # Set up params for pandad - params = Params() - params.remove("FirmwareQueryDone") - params.put_bool("IsOnroad", False) - time.sleep(0.2) # thread is 10 Hz - params.put_bool("IsOnroad", True) - set_obd_multiplexing = obd_callback(params) - - extra: Any = None - if args.scan: - extra = {} - # Honda - for i in range(256): - extra[(Ecu.unknown, 0x18da00f1 + (i << 8), None)] = [] - extra[(Ecu.unknown, 0x700 + i, None)] = [] - extra[(Ecu.unknown, 0x750, i)] = [] - extra = {"any": {"debug": extra}} - - num_pandas = len(messaging.recv_one_retry(pandaStates_sock).pandaStates) - - t = time.time() - print("Getting vin...") - set_obd_multiplexing(True) - vin_rx_addr, vin_rx_bus, vin = get_vin(*can_callbacks, (0, 1), debug=args.debug) - print(f'RX: {hex(vin_rx_addr)}, BUS: {vin_rx_bus}, VIN: {vin}') - print(f"Getting VIN took {time.time() - t:.3f} s") - print() - - t = time.time() - fw_vers = get_fw_versions(*can_callbacks, set_obd_multiplexing, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True) - _, candidates = match_fw_to_car(fw_vers, vin) - - print() - print("Found FW versions") - print("{") - padding = max([len(fw.brand) for fw in fw_vers] or [0]) - for version in fw_vers: - subaddr = None if version.subAddress == 0 else hex(version.subAddress) - print(f" Brand: {version.brand:{padding}}, bus: {version.bus}, OBD: {version.obdMultiplexing} - " + - f"(Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion!r}]") - print("}") - - print() - print("Possible matches:", candidates) - print(f"Getting fw took {time.time() - t:.3f} s") diff --git a/selfdrive/debug/car/disable_ecu.py b/selfdrive/debug/car/disable_ecu.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selfdrive/debug/car/ecu_addrs.py b/selfdrive/debug/car/ecu_addrs.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selfdrive/debug/car/fw_versions.py b/selfdrive/debug/car/fw_versions.py new file mode 100644 index 0000000000..7433d9e6c9 --- /dev/null +++ b/selfdrive/debug/car/fw_versions.py @@ -0,0 +1,69 @@ +import time +import argparse +import cereal.messaging as messaging +from openpilot.common.params import Params +from openpilot.selfdrive.car import structs +from openpilot.selfdrive.car.card import can_comm_callbacks, obd_callback +from openpilot.selfdrive.car.fw_versions import get_fw_versions, match_fw_to_car +from openpilot.selfdrive.car.vin import get_vin +from typing import Any + +Ecu = structs.CarParams.Ecu + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Get firmware version of ECUs') + parser.add_argument('--scan', action='store_true') + parser.add_argument('--debug', action='store_true') + parser.add_argument('--brand', help='Only query addresses/with requests for this brand') + args = parser.parse_args() + + logcan = messaging.sub_sock('can') + pandaStates_sock = messaging.sub_sock('pandaStates') + sendcan = messaging.pub_sock('sendcan') + can_callbacks = can_comm_callbacks(logcan, sendcan) + + # Set up params for pandad + params = Params() + params.remove("FirmwareQueryDone") + params.put_bool("IsOnroad", False) + time.sleep(0.2) # thread is 10 Hz + params.put_bool("IsOnroad", True) + set_obd_multiplexing = obd_callback(params) + + extra: Any = None + if args.scan: + extra = {} + # Honda + for i in range(256): + extra[(Ecu.unknown, 0x18da00f1 + (i << 8), None)] = [] + extra[(Ecu.unknown, 0x700 + i, None)] = [] + extra[(Ecu.unknown, 0x750, i)] = [] + extra = {"any": {"debug": extra}} + + num_pandas = len(messaging.recv_one_retry(pandaStates_sock).pandaStates) + + t = time.time() + print("Getting vin...") + set_obd_multiplexing(True) + vin_rx_addr, vin_rx_bus, vin = get_vin(*can_callbacks, (0, 1), debug=args.debug) + print(f'RX: {hex(vin_rx_addr)}, BUS: {vin_rx_bus}, VIN: {vin}') + print(f"Getting VIN took {time.time() - t:.3f} s") + print() + + t = time.time() + fw_vers = get_fw_versions(*can_callbacks, set_obd_multiplexing, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True) + _, candidates = match_fw_to_car(fw_vers, vin) + + print() + print("Found FW versions") + print("{") + padding = max([len(fw.brand) for fw in fw_vers] or [0]) + for version in fw_vers: + subaddr = None if version.subAddress == 0 else hex(version.subAddress) + print(f" Brand: {version.brand:{padding}}, bus: {version.bus}, OBD: {version.obdMultiplexing} - " + + f"(Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion!r}]") + print("}") + + print() + print("Possible matches:", candidates) + print(f"Getting fw took {time.time() - t:.3f} s") diff --git a/selfdrive/debug/car/vin.py b/selfdrive/debug/car/vin.py new file mode 100644 index 0000000000..e69de29bb2