diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 44607d8357..6c02e69503 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 from collections import defaultdict -from typing import Any, DefaultDict, Dict, List, Optional, Set +from typing import Any, DefaultDict, Dict, Iterator, List, Optional, Set, TypeVar from tqdm import tqdm import capnp @@ -24,8 +24,10 @@ VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True) MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e} REQUESTS = [(brand, config, r) for brand, config in FW_QUERY_CONFIGS.items() for r in config.requests] +T = TypeVar('T') -def chunks(l, n=128): + +def chunks(l: List[T], n: int = 128) -> Iterator[List[T]]: for i in range(0, len(l), n): yield l[i:i + n] diff --git a/selfdrive/car/isotp_parallel_query.py b/selfdrive/car/isotp_parallel_query.py index 696935fd35..678fe9ea46 100644 --- a/selfdrive/car/isotp_parallel_query.py +++ b/selfdrive/car/isotp_parallel_query.py @@ -5,11 +5,14 @@ from functools import partial import cereal.messaging as messaging from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp +from openpilot.selfdrive.car.fw_query_definitions import AddrType from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr class IsoTpParallelQuery: - def __init__(self, sendcan, logcan, bus, addrs, request, response, response_offset=0x8, functional_addrs=None, debug=False, response_pending_timeout=10): + def __init__(self, sendcan: messaging.PubSocket, logcan: messaging.SubSocket, bus: int, addrs: list[int] | list[AddrType], + request: list[bytes], response: list[bytes], response_offset: int = 0x8, + functional_addrs: list[int] | None = None, debug: bool = False, response_pending_timeout: float = 10) -> None: self.sendcan = sendcan self.logcan = logcan self.bus = bus @@ -24,7 +27,7 @@ class IsoTpParallelQuery: assert tx_addr not in FUNCTIONAL_ADDRS, f"Functional address should be defined in functional_addrs: {hex(tx_addr)}" self.msg_addrs = {tx_addr: get_rx_addr_for_tx_addr(tx_addr[0], rx_offset=response_offset) for tx_addr in real_addrs} - self.msg_buffer = defaultdict(list) + self.msg_buffer: dict[int, list[tuple[int, int, bytes, int]]] = defaultdict(list) def rx(self): """Drain can socket and sort messages into buffers based on address""" @@ -63,7 +66,7 @@ class IsoTpParallelQuery: messaging.drain_sock_raw(self.logcan) self.msg_buffer = defaultdict(list) - def _create_isotp_msg(self, tx_addr, sub_addr, rx_addr): + def _create_isotp_msg(self, tx_addr: int, sub_addr: int | None, rx_addr: int): can_client = CanClient(self._can_tx, partial(self._can_rx, rx_addr, sub_addr=sub_addr), tx_addr, rx_addr, self.bus, sub_addr=sub_addr, debug=self.debug) @@ -73,7 +76,7 @@ class IsoTpParallelQuery: # as well as reduces chances we process messages from previous queries return IsoTpMessage(can_client, timeout=0, separation_time=0.01, debug=self.debug, max_len=max_len) - def get_data(self, timeout, total_timeout=60.): + def get_data(self, timeout: float, total_timeout: float = 60.) -> dict[AddrType, bytes]: self._drain_rx() # Create message objects