fw_versions: cleanup common types (#27658)

* use common type

* use it in fw_versions

* add typing to queries
old-commit-hash: 727a94cc78
beeps
Shane Smiskol 2 years ago committed by GitHub
parent cde221ba7c
commit 746f0ba63b
  1. 12
      selfdrive/car/ecu_addrs.py
  2. 10
      selfdrive/car/fw_versions.py

@ -9,6 +9,8 @@ from selfdrive.car import make_can_msg
from selfdrive.boardd.boardd import can_list_to_can_capnp
from system.swaglog import cloudlog
EcuAddrBusType = Tuple[int, Optional[int], int]
def make_tester_present_msg(addr, bus, subaddr=None):
dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0]
@ -33,16 +35,16 @@ def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subadd
return False
def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[Tuple[int, Optional[int], int]]:
def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[EcuAddrBusType]:
addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)]
queries: Set[Tuple[int, Optional[int], int]] = {(addr, None, bus) for addr in addr_list}
queries: Set[EcuAddrBusType] = {(addr, None, bus) for addr in addr_list}
responses = queries
return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug)
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[Tuple[int, Optional[int], int]],
responses: Set[Tuple[int, Optional[int], int]], timeout: float = 1, debug: bool = False) -> Set[Tuple[int, Optional[int], int]]:
ecu_responses: Set[Tuple[int, Optional[int], int]] = set() # set((addr, subaddr, bus),)
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[EcuAddrBusType],
responses: Set[EcuAddrBusType], timeout: float = 1, debug: bool = False) -> Set[EcuAddrBusType]:
ecu_responses: Set[EcuAddrBusType] = set() # set((addr, subaddr, bus),)
try:
msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries]

@ -1,12 +1,12 @@
#!/usr/bin/env python3
from collections import defaultdict
from typing import Any, Optional, Set, Tuple
from typing import Any, List, Optional, Set
from tqdm import tqdm
import panda.python.uds as uds
from cereal import car
from common.params import Params
from selfdrive.car.ecu_addrs import get_ecu_addrs
from selfdrive.car.ecu_addrs import EcuAddrBusType, get_ecu_addrs
from selfdrive.car.interfaces import get_interface_attr
from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
@ -146,9 +146,9 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True):
return True, set()
def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[int], int]]:
queries = list()
parallel_queries = list()
def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
queries: List[List[EcuAddrBusType]] = list()
parallel_queries: List[EcuAddrBusType] = list()
responses = set()
for brand, r in REQUESTS:

Loading…
Cancel
Save