#!/usr/bin/env python3 from collections import defaultdict from typing import Any, Optional, Set, Tuple from tqdm import tqdm import panda.python.uds as uds from cereal import car from common.params import Params from selfdrive.car.ecu_addrs import get_ecu_addrs from selfdrive.car.interfaces import get_interface_attr from selfdrive.car.fingerprints import FW_VERSIONS from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery from system.swaglog import cloudlog Ecu = car.CarParams.Ecu ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] FW_QUERY_CONFIGS = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True) VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True) MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e} REQUESTS = [(brand, r) for brand, config in FW_QUERY_CONFIGS.items() for r in config.requests] def chunks(l, n=128): for i in range(0, len(l), n): yield l[i:i + n] def build_fw_dict(fw_versions, filter_brand=None): fw_versions_dict = defaultdict(set) for fw in fw_versions: if (filter_brand is None or fw.brand == filter_brand) and not fw.logging: sub_addr = fw.subAddress if fw.subAddress != 0 else None fw_versions_dict[(fw.address, sub_addr)].add(fw.fwVersion) return dict(fw_versions_dict) def get_brand_addrs(): brand_addrs = defaultdict(set) for brand, cars in VERSIONS.items(): for fw in cars.values(): brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()} return brand_addrs def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None): """Do a fuzzy FW match. This function will return a match, and the number of firmware version that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars the match is rejected.""" # These ECUs are known to be shared between models (EPS only between hybrid/ICE version) # Getting this exactly right isn't crucial, but excluding camera and radar makes it almost # impossible to get 3 matching versions, even if two models with shared parts are released at the same # time and only one is in our database. exclude_types = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] # Build lookup table from (addr, sub_addr, fw) to list of candidate cars all_fw_versions = defaultdict(list) for candidate, fw_by_addr in FW_VERSIONS.items(): if candidate == exclude: continue for addr, fws in fw_by_addr.items(): if addr[0] in exclude_types: continue for f in fws: all_fw_versions[(addr[1], addr[2], f)].append(candidate) match_count = 0 candidate = None for addr, versions in fw_versions_dict.items(): for version in versions: # All cars that have this FW response on the specified address candidates = all_fw_versions[(addr[0], addr[1], version)] if len(candidates) == 1: match_count += 1 if candidate is None: candidate = candidates[0] # We uniquely matched two different cars. No fuzzy match possible elif candidate != candidates[0]: return set() if match_count >= 2: if log: cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs") return {candidate} else: return set() def match_fw_to_car_exact(fw_versions_dict) -> Set[str]: """Do an exact FW match. Returns all cars that match the given FW versions for a list of "essential" ECUs. If an ECU is not considered essential the FW version can be missing to get a fingerprint, but if it's present it needs to match the database.""" invalid = [] candidates = FW_VERSIONS for candidate, fws in candidates.items(): for ecu, expected_versions in fws.items(): config = FW_QUERY_CONFIGS[MODEL_TO_BRAND[candidate]] ecu_type = ecu[0] addr = ecu[1:] found_versions = fw_versions_dict.get(addr, set()) if not len(found_versions): # Some models can sometimes miss an ecu, or show on two different addresses if candidate in config.non_essential_ecus.get(ecu_type, []): continue # Ignore non essential ecus if ecu_type not in ESSENTIAL_ECUS: continue # Virtual debug ecu doesn't need to match the database if ecu_type == Ecu.debug: continue if not any([found_version in expected_versions for found_version in found_versions]): invalid.append(candidate) break return set(candidates.keys()) - set(invalid) def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True): # Try exact matching first exact_matches = [] if allow_exact: exact_matches = [(True, match_fw_to_car_exact)] if allow_fuzzy: exact_matches.append((False, match_fw_to_car_fuzzy)) for exact_match, match_func in exact_matches: # For each brand, attempt to fingerprint using all FW returned from its queries matches = set() for brand in VERSIONS.keys(): fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) matches |= match_func(fw_versions_dict) if len(matches): return exact_match, matches return True, set() def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[Tuple[int, Optional[int], int]]: queries = list() parallel_queries = list() responses = set() for brand, r in REQUESTS: # Skip query if no panda available if r.bus > num_pandas * 4 - 1: continue for brand_versions in VERSIONS[brand].values(): for ecu_type, addr, sub_addr in brand_versions: # Only query ecus in whitelist if whitelist is not empty if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus: a = (addr, sub_addr, r.bus) # Build set of queries if sub_addr is None: if a not in parallel_queries: parallel_queries.append(a) else: # subaddresses must be queried one by one if [a] not in queries: queries.append([a]) # Build set of expected responses to filter response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset) responses.add((response_addr, sub_addr, r.bus)) queries.insert(0, parallel_queries) ecu_responses = set() for query in queries: ecu_responses.update(get_ecu_addrs(logcan, sendcan, set(query), responses, timeout=0.1)) return ecu_responses def get_brand_ecu_matches(ecu_rx_addrs): """Returns dictionary of brands and matches with ECUs in their FW versions""" brand_addrs = get_brand_addrs() brand_matches = {brand: set() for brand, _ in REQUESTS} brand_rx_offsets = set((brand, r.rx_offset) for brand, r in REQUESTS) for addr, sub_addr, _ in ecu_rx_addrs: # Since we can't know what request an ecu responded to, add matches for all possible rx offsets for brand, rx_offset in brand_rx_offsets: a = (uds.get_rx_addr_for_tx_addr(addr, -rx_offset), sub_addr) if a in brand_addrs[brand]: brand_matches[brand].add(a) return brand_matches def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False): """Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" all_car_fw = [] brand_matches = get_brand_ecu_matches(ecu_rx_addrs) matched_brand: Optional[str] = None for brand in sorted(brand_matches, key=lambda b: len(brand_matches[b]), reverse=True): car_fw = get_fw_versions(logcan, sendcan, query_brand=brand, timeout=timeout, num_pandas=num_pandas, debug=debug, progress=progress) all_car_fw.extend(car_fw) # Try to match using FW returned from this brand only matches = match_fw_to_car_exact(build_fw_dict(car_fw)) if len(matches) == 1: matched_brand = brand break # Do non-OBD queries for matched brand, or all if no match is found params = Params() params.put_bool("FirmwareObdQueryDone", True) cloudlog.warning("Waiting for OBD multiplexing to be disabled") params.get_bool("ObdMultiplexingDisabled", block=True) cloudlog.warning("OBD multiplexing disabled") for brand in FW_QUERY_CONFIGS.keys(): if brand == matched_brand or matched_brand is None: all_car_fw.extend(get_fw_versions(logcan, sendcan, query_brand=brand, timeout=timeout, num_pandas=num_pandas, obd_multiplexed=False, debug=debug, progress=progress)) return all_car_fw def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, obd_multiplexed=True, debug=False, progress=False): versions = VERSIONS.copy() # Each brand can define extra ECUs to query for data collection for brand, config in FW_QUERY_CONFIGS.items(): versions[brand]["debug"] = {ecu: [] for ecu in config.extra_ecus} if query_brand is not None: versions = {query_brand: versions[query_brand]} if extra is not None: versions.update(extra) # Extract ECU addresses to query from fingerprints # ECUs using a subaddress need be queried one by one, the rest can be done in parallel addrs = [] parallel_addrs = [] logging_addrs = [] ecu_types = {} for brand, brand_versions in versions.items(): for candidate, ecu in brand_versions.items(): for ecu_type, addr, sub_addr in ecu.keys(): a = (brand, addr, sub_addr) if a not in ecu_types: ecu_types[a] = ecu_type if a not in logging_addrs and candidate == "debug": logging_addrs.append(a) if sub_addr is None: if a not in parallel_addrs: parallel_addrs.append(a) else: if [a] not in addrs: addrs.append([a]) addrs.insert(0, parallel_addrs) # Get versions and build capnp list to put into CarParams car_fw = [] requests = [(brand, r) for brand, r in REQUESTS if query_brand is None or brand == query_brand] for addr in tqdm(addrs, disable=not progress): for addr_chunk in chunks(addr): for brand, r in requests: # Skip query if no panda available if r.bus > num_pandas * 4 - 1: continue # Or if request is not designated for current multiplexing mode elif r.non_obd == obd_multiplexed: continue try: addrs = [(a, s) for (b, a, s) in addr_chunk if b in (brand, 'any') and (len(r.whitelist_ecus) == 0 or ecu_types[(b, a, s)] in r.whitelist_ecus)] if addrs: query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug) for (tx_addr, sub_addr), version in query.get_data(timeout).items(): f = car.CarParams.CarFw.new_message() ecu_key = (brand, tx_addr, sub_addr) f.ecu = ecu_types.get(ecu_key, Ecu.unknown) f.fwVersion = version f.address = tx_addr f.responseAddress = uds.get_rx_addr_for_tx_addr(tx_addr, r.rx_offset) f.request = r.request f.brand = brand f.bus = r.bus f.logging = r.logging or ecu_key in logging_addrs if sub_addr is not None: f.subAddress = sub_addr car_fw.append(f) except Exception: cloudlog.exception("FW query exception") return car_fw if __name__ == "__main__": import time import argparse import cereal.messaging as messaging from selfdrive.car.vin import get_vin parser = argparse.ArgumentParser(description='Get firmware version of ECUs') parser.add_argument('--scan', action='store_true') parser.add_argument('--debug', action='store_true') parser.add_argument('--brand', help='Only query addresses/with requests for this brand') args = parser.parse_args() logcan = messaging.sub_sock('can') pandaStates_sock = messaging.sub_sock('pandaStates') sendcan = messaging.pub_sock('sendcan') extra: Any = None if args.scan: extra = {} # Honda for i in range(256): extra[(Ecu.unknown, 0x18da00f1 + (i << 8), None)] = [] extra[(Ecu.unknown, 0x700 + i, None)] = [] extra[(Ecu.unknown, 0x750, i)] = [] extra = {"any": {"debug": extra}} time.sleep(1.) num_pandas = len(messaging.recv_one_retry(pandaStates_sock).pandaStates) t = time.time() print("Getting vin...") vin_rx_addr, vin = get_vin(logcan, sendcan, 1, retry=10, debug=args.debug) print(f'RX: {hex(vin_rx_addr)}, VIN: {vin}') print(f"Getting VIN took {time.time() - t:.3f} s") print() t = time.time() fw_vers = get_fw_versions(logcan, sendcan, query_brand=args.brand, extra=extra, num_pandas=num_pandas, debug=args.debug, progress=True) _, candidates = match_fw_to_car(fw_vers) print() print("Found FW versions") print("{") padding = max([len(fw.brand) for fw in fw_vers] or [0]) for version in fw_vers: subaddr = None if version.subAddress == 0 else hex(version.subAddress) print(f" Brand: {version.brand:{padding}}, bus: {version.bus} - (Ecu.{version.ecu}, {hex(version.address)}, {subaddr}): [{version.fwVersion}]") print("}") print() print("Possible matches:", candidates) print(f"Getting fw took {time.time() - t:.3f} s")