clean up common type

pull/27656/head
Shane Smiskol 2 years ago
parent 19c0e29139
commit 654b4e9f7e
  1. 12
      selfdrive/car/ecu_addrs.py
  2. 10
      selfdrive/car/fw_versions.py

@ -9,6 +9,8 @@ from selfdrive.car import make_can_msg
from selfdrive.boardd.boardd import can_list_to_can_capnp from selfdrive.boardd.boardd import can_list_to_can_capnp
from system.swaglog import cloudlog from system.swaglog import cloudlog
EcuAddrBusType = Tuple[int, Optional[int], int]
def make_tester_present_msg(addr, bus, subaddr=None): def make_tester_present_msg(addr, bus, subaddr=None):
dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0] dat = [0x02, SERVICE_TYPE.TESTER_PRESENT, 0x0]
@ -33,16 +35,16 @@ def is_tester_present_response(msg: capnp.lib.capnp._DynamicStructReader, subadd
return False return False
def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[Tuple[int, Optional[int], int]]: def get_all_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, bus: int, timeout: float = 1, debug: bool = True) -> Set[EcuAddrBusType]:
addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)] addr_list = [0x700 + i for i in range(256)] + [0x18da00f1 + (i << 8) for i in range(256)]
queries: Set[Tuple[int, Optional[int], int]] = {(addr, None, bus) for addr in addr_list} queries: Set[EcuAddrBusType] = {(addr, None, bus) for addr in addr_list}
responses = queries responses = queries
return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug) return get_ecu_addrs(logcan, sendcan, queries, responses, timeout=timeout, debug=debug)
def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[Tuple[int, Optional[int], int]], def get_ecu_addrs(logcan: messaging.SubSocket, sendcan: messaging.PubSocket, queries: Set[EcuAddrBusType],
responses: Set[Tuple[int, Optional[int], int]], timeout: float = 1, debug: bool = False) -> Set[Tuple[int, Optional[int], int]]: responses: Set[EcuAddrBusType], timeout: float = 1, debug: bool = False) -> Set[EcuAddrBusType]:
ecu_responses: Set[Tuple[int, Optional[int], int]] = set() # set((addr, subaddr, bus),) ecu_responses: Set[EcuAddrBusType] = set() # set((addr, subaddr, bus),)
try: try:
msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries] msgs = [make_tester_present_msg(addr, bus, subaddr) for addr, subaddr, bus in queries]

@ -1,13 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import time import time
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple from typing import Any, Dict, List, Set
from tqdm import tqdm from tqdm import tqdm
import panda.python.uds as uds import panda.python.uds as uds
from cereal import car from cereal import car
from common.params import Params from common.params import Params
from selfdrive.car.ecu_addrs import get_ecu_addrs from selfdrive.car.ecu_addrs import EcuAddrBusType, get_ecu_addrs
from selfdrive.car.interfaces import get_interface_attr from selfdrive.car.interfaces import get_interface_attr
from selfdrive.car.fingerprints import FW_VERSIONS from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
@ -147,10 +147,10 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True):
return True, set() return True, set()
def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[int], int]]: def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
params = Params() params = Params()
queries: Dict[bool, List[List[Tuple[int, Optional[int], int]]]] = defaultdict(list) queries: Dict[bool, List[List[EcuAddrBusType]]] = defaultdict(list)
parallel_queries: Dict[bool, List[Tuple[int, Optional[int], int]]] = defaultdict(list) parallel_queries: Dict[bool, List[EcuAddrBusType]] = defaultdict(list)
responses = set() responses = set()
for brand, r in REQUESTS: for brand, r in REQUESTS:

Loading…
Cancel
Save