diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 1c0d5003ec..3c18c2e7bd 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 from collections import defaultdict -from typing import Any, Dict, List, Set +from typing import Any, DefaultDict, Dict, List, Optional, Set, Tuple from tqdm import tqdm +import capnp import panda.python.uds as uds from cereal import car @@ -27,7 +28,8 @@ def chunks(l, n=128): yield l[i:i + n] -def build_fw_dict(fw_versions, filter_brand=None): +def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder], + filter_brand: Optional[str] = None) -> Dict[Tuple[int, Optional[int]], Set[bytes]]: fw_versions_dict = defaultdict(set) for fw in fw_versions: if (filter_brand is None or fw.brand == filter_brand) and not fw.logging: @@ -36,14 +38,14 @@ def build_fw_dict(fw_versions, filter_brand=None): return dict(fw_versions_dict) -def get_brand_addrs(): - brand_addrs = defaultdict(set) +def get_brand_addrs() -> Dict[str, Set[Tuple[int, Optional[int]]]]: + brand_addrs: DefaultDict[str, Set[Tuple[int, Optional[int]]]] = defaultdict(set) for brand, cars in VERSIONS.items(): # Add ecus in database + extra ecus to match against brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in FW_QUERY_CONFIGS[brand].extra_ecus} for fw in cars.values(): brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()} - return brand_addrs + return dict(brand_addrs) def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None): @@ -214,7 +216,8 @@ def set_obd_multiplexing(params: Params, obd_multiplexing: bool): cloudlog.warning("OBD multiplexing set successfully") -def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False): +def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \ + List[capnp.lib.capnp._DynamicStructBuilder]: """Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" all_car_fw = [] @@ -235,7 +238,8 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand return all_car_fw -def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False): +def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \ + List[capnp.lib.capnp._DynamicStructBuilder]: versions = VERSIONS.copy() params = Params() @@ -286,11 +290,11 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, set_obd_multiplexing(params, r.obd_multiplexing) try: - addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and - (len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)] + query_addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and + (len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)] - if addrs: - query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug) + if query_addrs: + query = IsoTpParallelQuery(sendcan, logcan, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug) for (tx_addr, sub_addr), version in query.get_data(timeout).items(): f = car.CarParams.CarFw.new_message()