IsoTpParallelQuery: type hinting (#31370)

* start typing

* fixes

* we only plan on using this for lists

* sort

* this is the correct way to type it

* it knows this!

* it's getting smarter
pull/31254/head^2
Shane Smiskol 1 year ago committed by GitHub
parent b6d195f010
commit 74ec33f7c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      selfdrive/car/fw_versions.py
  2. 11
      selfdrive/car/isotp_parallel_query.py

@ -1,6 +1,6 @@
#!/usr/bin/env python3
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional, Set
from typing import Any, DefaultDict, Dict, Iterator, List, Optional, Set, TypeVar
from tqdm import tqdm
import capnp
@ -24,8 +24,10 @@ VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True)
MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e}
REQUESTS = [(brand, config, r) for brand, config in FW_QUERY_CONFIGS.items() for r in config.requests]
T = TypeVar('T')
def chunks(l, n=128):
def chunks(l: List[T], n: int = 128) -> Iterator[List[T]]:
for i in range(0, len(l), n):
yield l[i:i + n]

@ -5,11 +5,14 @@ from functools import partial
import cereal.messaging as messaging
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp
from openpilot.selfdrive.car.fw_query_definitions import AddrType
from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr
class IsoTpParallelQuery:
def __init__(self, sendcan, logcan, bus, addrs, request, response, response_offset=0x8, functional_addrs=None, debug=False, response_pending_timeout=10):
def __init__(self, sendcan: messaging.PubSocket, logcan: messaging.SubSocket, bus: int, addrs: list[int] | list[AddrType],
request: list[bytes], response: list[bytes], response_offset: int = 0x8,
functional_addrs: list[int] | None = None, debug: bool = False, response_pending_timeout: float = 10) -> None:
self.sendcan = sendcan
self.logcan = logcan
self.bus = bus
@ -24,7 +27,7 @@ class IsoTpParallelQuery:
assert tx_addr not in FUNCTIONAL_ADDRS, f"Functional address should be defined in functional_addrs: {hex(tx_addr)}"
self.msg_addrs = {tx_addr: get_rx_addr_for_tx_addr(tx_addr[0], rx_offset=response_offset) for tx_addr in real_addrs}
self.msg_buffer = defaultdict(list)
self.msg_buffer: dict[int, list[tuple[int, int, bytes, int]]] = defaultdict(list)
def rx(self):
"""Drain can socket and sort messages into buffers based on address"""
@ -63,7 +66,7 @@ class IsoTpParallelQuery:
messaging.drain_sock_raw(self.logcan)
self.msg_buffer = defaultdict(list)
def _create_isotp_msg(self, tx_addr, sub_addr, rx_addr):
def _create_isotp_msg(self, tx_addr: int, sub_addr: int | None, rx_addr: int):
can_client = CanClient(self._can_tx, partial(self._can_rx, rx_addr, sub_addr=sub_addr), tx_addr, rx_addr,
self.bus, sub_addr=sub_addr, debug=self.debug)
@ -73,7 +76,7 @@ class IsoTpParallelQuery:
# as well as reduces chances we process messages from previous queries
return IsoTpMessage(can_client, timeout=0, separation_time=0.01, debug=self.debug, max_len=max_len)
def get_data(self, timeout, total_timeout=60.):
def get_data(self, timeout: float, total_timeout: float = 60.) -> dict[AddrType, bytes]:
self._drain_rx()
# Create message objects

Loading…
Cancel
Save