fw_versions: add some typing (#28369)

* typing

* sort

* brand addrs

* don't type match_fw_to_car_fuzzy

* or this unfortunately

* no space

* add back

* and this
pull/28373/head
Shane Smiskol 2 years ago committed by GitHub
parent e2545c5b24
commit 23430dd35f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      selfdrive/car/fw_versions.py

@ -1,7 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Set from typing import Any, DefaultDict, Dict, List, Optional, Set, Tuple
from tqdm import tqdm from tqdm import tqdm
import capnp
import panda.python.uds as uds import panda.python.uds as uds
from cereal import car from cereal import car
@ -27,7 +28,8 @@ def chunks(l, n=128):
yield l[i:i + n] yield l[i:i + n]
def build_fw_dict(fw_versions, filter_brand=None): def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder],
filter_brand: Optional[str] = None) -> Dict[Tuple[int, Optional[int]], Set[bytes]]:
fw_versions_dict = defaultdict(set) fw_versions_dict = defaultdict(set)
for fw in fw_versions: for fw in fw_versions:
if (filter_brand is None or fw.brand == filter_brand) and not fw.logging: if (filter_brand is None or fw.brand == filter_brand) and not fw.logging:
@ -36,14 +38,14 @@ def build_fw_dict(fw_versions, filter_brand=None):
return dict(fw_versions_dict) return dict(fw_versions_dict)
def get_brand_addrs(): def get_brand_addrs() -> Dict[str, Set[Tuple[int, Optional[int]]]]:
brand_addrs = defaultdict(set) brand_addrs: DefaultDict[str, Set[Tuple[int, Optional[int]]]] = defaultdict(set)
for brand, cars in VERSIONS.items(): for brand, cars in VERSIONS.items():
# Add ecus in database + extra ecus to match against # Add ecus in database + extra ecus to match against
brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in FW_QUERY_CONFIGS[brand].extra_ecus} brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in FW_QUERY_CONFIGS[brand].extra_ecus}
for fw in cars.values(): for fw in cars.values():
brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()} brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()}
return brand_addrs return dict(brand_addrs)
def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None): def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None):
@ -214,7 +216,8 @@ def set_obd_multiplexing(params: Params, obd_multiplexing: bool):
cloudlog.warning("OBD multiplexing set successfully") cloudlog.warning("OBD multiplexing set successfully")
def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False): def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \
List[capnp.lib.capnp._DynamicStructBuilder]:
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" """Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
all_car_fw = [] all_car_fw = []
@ -235,7 +238,8 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
return all_car_fw return all_car_fw
def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False): def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \
List[capnp.lib.capnp._DynamicStructBuilder]:
versions = VERSIONS.copy() versions = VERSIONS.copy()
params = Params() params = Params()
@ -286,11 +290,11 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
set_obd_multiplexing(params, r.obd_multiplexing) set_obd_multiplexing(params, r.obd_multiplexing)
try: try:
addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and query_addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and
(len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)] (len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)]
if addrs: if query_addrs:
query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug) query = IsoTpParallelQuery(sendcan, logcan, r.bus, query_addrs, r.request, r.response, r.rx_offset, debug=debug)
for (tx_addr, sub_addr), version in query.get_data(timeout).items(): for (tx_addr, sub_addr), version in query.get_data(timeout).items():
f = car.CarParams.CarFw.new_message() f = car.CarParams.CarFw.new_message()

Loading…
Cancel
Save