import os import time from cereal import car from openpilot.selfdrive.car import carlog, gen_empty_fingerprint from openpilot.selfdrive.car.can_definitions import CanRecvCallable, CanSendCallable from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars from openpilot.selfdrive.car.fw_versions import ObdCallback, get_fw_versions_ordered, get_present_ecus, match_fw_to_car from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.mock.values import CAR as MOCK from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN FRAME_FINGERPRINT = 100 # 1s def load_interfaces(brand_names): ret = {} for brand_name in brand_names: path = f'openpilot.selfdrive.car.{brand_name}' CarInterface = __import__(path + '.interface', fromlist=['CarInterface']).CarInterface CarState = __import__(path + '.carstate', fromlist=['CarState']).CarState CarController = __import__(path + '.carcontroller', fromlist=['CarController']).CarController for model_name in brand_names[brand_name]: ret[model_name] = (CarInterface, CarController, CarState) return ret def _get_interface_names() -> dict[str, list[str]]: # returns a dict of brand name and its respective models brand_names = {} for brand_name, brand_models in get_interface_attr("CAR").items(): brand_names[brand_name] = [model.value for model in brand_models] return brand_names # imports from directory selfdrive/car// interface_names = _get_interface_names() interfaces = load_interfaces(interface_names) def can_fingerprint(can_recv: CanRecvCallable) -> tuple[str | None, dict[int, dict]]: finger = gen_empty_fingerprint() candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1 frame = 0 car_fingerprint = None done = False while not done: # can_recv(wait_for_one=True) may return zero or multiple packets, so we increment frame for each one we receive can_packets = can_recv(wait_for_one=True) for can_packet in can_packets: for can in can_packet: # The fingerprint dict is generated for all buses, this way the car interface # can use it to detect a (valid) multipanda setup and initialize accordingly if can.src < 128: if can.src not in finger: finger[can.src] = {} finger[can.src][can.address] = len(can.dat) for b in candidate_cars: # Ignore extended messages and VIN query response. if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8): candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b]) # if we only have one car choice and the time since we got our first # message has elapsed, exit for b in candidate_cars: if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT: # fingerprint done car_fingerprint = candidate_cars[b][0] # bail if no cars left or we've been waiting for more than 2s failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200 succeeded = car_fingerprint is not None done = failed or succeeded frame += 1 return car_fingerprint, finger # **** for use live only **** def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int, cached_params: type[car.CarParams] | None) -> tuple[str | None, dict, str, list, int, bool]: fixed_fingerprint = os.environ.get('FINGERPRINT', "") skip_fw_query = os.environ.get('SKIP_FW_QUERY', False) disable_fw_cache = os.environ.get('DISABLE_FW_CACHE', False) ecu_rx_addrs = set() start_time = time.monotonic() if not skip_fw_query: if cached_params is not None and cached_params.carName != "mock" and len(cached_params.carFw) > 0 and \ cached_params.carVin is not VIN_UNKNOWN and not disable_fw_cache: carlog.warning("Using cached CarParams") vin_rx_addr, vin_rx_bus, vin = -1, -1, cached_params.carVin car_fw = list(cached_params.carFw) cached = True else: carlog.warning("Getting VIN & FW versions") # enable OBD multiplexing for VIN query # NOTE: this takes ~0.1s and is relied on to allow sendcan subscriber to connect in time set_obd_multiplexing(True) # VIN query only reliably works through OBDII vin_rx_addr, vin_rx_bus, vin = get_vin(can_recv, can_send, (0, 1)) ecu_rx_addrs = get_present_ecus(can_recv, can_send, set_obd_multiplexing, num_pandas=num_pandas) car_fw = get_fw_versions_ordered(can_recv, can_send, set_obd_multiplexing, vin, ecu_rx_addrs, num_pandas=num_pandas) cached = False exact_fw_match, fw_candidates = match_fw_to_car(car_fw, vin) else: vin_rx_addr, vin_rx_bus, vin = -1, -1, VIN_UNKNOWN exact_fw_match, fw_candidates, car_fw = True, set(), [] cached = False if not is_valid_vin(vin): carlog.error({"event": "Malformed VIN", "vin": vin}) vin = VIN_UNKNOWN carlog.warning("VIN %s", vin) # disable OBD multiplexing for CAN fingerprinting and potential ECU knockouts set_obd_multiplexing(False) fw_query_time = time.monotonic() - start_time # CAN fingerprint # drain CAN socket so we get the latest messages can_recv() car_fingerprint, finger = can_fingerprint(can_recv) exact_match = True source = car.CarParams.FingerprintSource.can # If FW query returns exactly 1 candidate, use it if len(fw_candidates) == 1: car_fingerprint = list(fw_candidates)[0] source = car.CarParams.FingerprintSource.fw exact_match = exact_fw_match if fixed_fingerprint: car_fingerprint = fixed_fingerprint source = car.CarParams.FingerprintSource.fixed carlog.error({"event": "fingerprinted", "car_fingerprint": str(car_fingerprint), "source": source, "fuzzy": not exact_match, "cached": cached, "fw_count": len(car_fw), "ecu_responses": list(ecu_rx_addrs), "vin_rx_addr": vin_rx_addr, "vin_rx_bus": vin_rx_bus, "fingerprints": repr(finger), "fw_query_time": fw_query_time}) return car_fingerprint, finger, vin, car_fw, source, exact_match def get_car_interface(CP): CarInterface, CarController, CarState = interfaces[CP.carFingerprint] return CarInterface(CP, CarController, CarState) def get_car(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, experimental_long_allowed: bool, num_pandas: int = 1, cached_params: type[car.CarParams] | None = None): candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(can_recv, can_send, set_obd_multiplexing, num_pandas, cached_params) if candidate is None: carlog.error({"event": "car doesn't match any fingerprints", "fingerprints": repr(fingerprints)}) candidate = "MOCK" CarInterface, _, _ = interfaces[candidate] CP = CarInterface.get_params(candidate, fingerprints, car_fw, experimental_long_allowed, docs=False) CP.carVin = vin CP.carFw = car_fw CP.fingerprintSource = source CP.fuzzyFingerprint = not exact_match return get_car_interface(CP) def get_demo_car_params(): platform = MOCK.MOCK CarInterface, _, _ = interfaces[platform] CP = CarInterface.get_non_essential_params(platform) return CP