diff --git a/selfdrive/car/car_helpers.py b/selfdrive/car/car_helpers.py index fc441ac2e..7863f0d88 100644 --- a/selfdrive/car/car_helpers.py +++ b/selfdrive/car/car_helpers.py @@ -1,9 +1,11 @@ import os -from typing import Any, Dict, List +from typing import Dict, List +from cereal import car from common.params import Params from common.basedir import BASEDIR from selfdrive.version import is_comma_remote, is_tested_branch +from selfdrive.car.interfaces import get_interface_attr from selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars from selfdrive.car.vin import get_vin, VIN_UNKNOWN from selfdrive.car.fw_versions import get_fw_versions, match_fw_to_car @@ -11,7 +13,6 @@ from selfdrive.swaglog import cloudlog import cereal.messaging as messaging from selfdrive.car import gen_empty_fingerprint -from cereal import car EventName = car.CarEvent.EventName @@ -59,19 +60,6 @@ def load_interfaces(brand_names): return ret -def get_interface_attr(attr: str) -> Dict[str, Any]: - # returns given attribute from each interface - brand_names = {} - for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]): - try: - brand_name = car_folder.split('/')[-1] - attr_data = getattr(__import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr]), attr, None) - brand_names[brand_name] = attr_data - except (ImportError, OSError): - pass - return brand_names - - def _get_interface_names() -> Dict[str, List[str]]: # returns a dict of brand name and its respective models brand_names = {} diff --git a/selfdrive/car/docs.py b/selfdrive/car/docs.py index fad110689..899f2c087 100755 --- a/selfdrive/car/docs.py +++ b/selfdrive/car/docs.py @@ -15,9 +15,8 @@ from selfdrive.car.tests.routes import non_tested_cars def get_all_footnotes() -> Dict[Enum, int]: all_footnotes = [] - for _, footnotes in get_interface_attr("Footnote").items(): - if footnotes is not None: - all_footnotes += footnotes + for footnotes in get_interface_attr("Footnote", ignore_none=True).values(): + all_footnotes += footnotes return {fn: idx + 1 for idx, fn in enumerate(all_footnotes)} @@ -28,21 +27,20 @@ CARS_MD_TEMPLATE = os.path.join(BASEDIR, "selfdrive", "car", "CARS_template.md") def get_all_car_info() -> List[CarInfo]: all_car_info: List[CarInfo] = [] - for models in get_interface_attr("CAR_INFO").values(): - for model, car_info in models.items(): - # Hyundai exception: those with radar have openpilot longitudinal - fingerprint = {0: {}, 1: {HKG_RADAR_START_ADDR: 8}, 2: {}, 3: {}} - CP = interfaces[model][0].get_params(model, fingerprint=fingerprint, disable_radar=True) + for model, car_info in get_interface_attr("CAR_INFO", combine_brands=True).items(): + # Hyundai exception: those with radar have openpilot longitudinal + fingerprint = {0: {}, 1: {HKG_RADAR_START_ADDR: 8}, 2: {}, 3: {}} + CP = interfaces[model][0].get_params(model, fingerprint=fingerprint, disable_radar=True) - if CP.dashcamOnly or car_info is None: - continue + if CP.dashcamOnly or car_info is None: + continue - # A platform can include multiple car models - if not isinstance(car_info, list): - car_info = (car_info,) + # A platform can include multiple car models + if not isinstance(car_info, list): + car_info = (car_info,) - for _car_info in car_info: - all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES)) + for _car_info in car_info: + all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES)) # Sort cars by make and model + year sorted_cars: List[CarInfo] = natsorted(all_car_info, key=lambda car: (car.make + car.model).lower()) diff --git a/selfdrive/car/fingerprints.py b/selfdrive/car/fingerprints.py index 9b280f3b4..1a9bb8c4e 100644 --- a/selfdrive/car/fingerprints.py +++ b/selfdrive/car/fingerprints.py @@ -1,44 +1,12 @@ -import os -from common.basedir import BASEDIR +from selfdrive.car.interfaces import get_interface_attr -def get_attr_from_cars(attr, result=dict, combine_brands=True): - # read all the folders in selfdrive/car and return a dict where: - # - keys are all the car models - # - values are attr values from all car folders - result = result() - - for car_folder in [x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]: - try: - car_name = car_folder.split('/')[-1] - values = __import__(f'selfdrive.car.{car_name}.values', fromlist=[attr]) - if hasattr(values, attr): - attr_values = getattr(values, attr) - else: - continue - - if isinstance(attr_values, dict): - for f, v in attr_values.items(): - if combine_brands: - result[f] = v - else: - if car_name not in result: - result[car_name] = {} - result[car_name][f] = v - elif isinstance(attr_values, list): - result += attr_values - - except (ImportError, OSError): - pass - - return result - - -FW_VERSIONS = get_attr_from_cars('FW_VERSIONS') -_FINGERPRINTS = get_attr_from_cars('FINGERPRINTS') +FW_VERSIONS = get_interface_attr('FW_VERSIONS', combine_brands=True, ignore_none=True) +_FINGERPRINTS = get_interface_attr('FINGERPRINTS', combine_brands=True, ignore_none=True) _DEBUG_ADDRESS = {1880: 8} # reserved for debug purposes + def is_valid_for_fingerprint(msg, car_fingerprint): adr = msg.address # ignore addresses that are more than 11 bits diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 120174c45..4c823f09e 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -1,15 +1,15 @@ #!/usr/bin/env python3 import struct import traceback -from typing import Any, List from collections import defaultdict from dataclasses import dataclass - +from typing import Any, List from tqdm import tqdm import panda.python.uds as uds from cereal import car -from selfdrive.car.fingerprints import FW_VERSIONS, get_attr_from_cars +from selfdrive.car.interfaces import get_interface_attr +from selfdrive.car.fingerprints import FW_VERSIONS from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery from selfdrive.car.toyota.values import CAR as TOYOTA from selfdrive.swaglog import cloudlog @@ -303,7 +303,7 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr addrs = [] parallel_addrs = [] - versions = get_attr_from_cars('FW_VERSIONS', combine_brands=False) + versions = get_interface_attr('FW_VERSIONS', ignore_none=True) if extra is not None: versions.update(extra) diff --git a/selfdrive/car/interfaces.py b/selfdrive/car/interfaces.py index 1ee237fe3..86da2b0de 100644 --- a/selfdrive/car/interfaces.py +++ b/selfdrive/car/interfaces.py @@ -1,13 +1,14 @@ import os import time from abc import abstractmethod, ABC -from typing import Dict, Tuple, List +from typing import Any, Dict, Tuple, List from cereal import car +from common.basedir import BASEDIR +from common.conversions import Conversions as CV from common.kalman.simple_kalman import KF1D from common.realtime import DT_CTRL from selfdrive.car import gen_empty_fingerprint -from common.conversions import Conversions as CV from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX from selfdrive.controls.lib.events import Events from selfdrive.controls.lib.vehicle_model import VehicleModel @@ -22,7 +23,6 @@ ACCEL_MIN = -3.5 # generic car and radar interfaces - class CarInterfaceBase(ABC): def __init__(self, CP, CarController, CarState): self.CP = CP @@ -293,3 +293,31 @@ class CarStateBase(ABC): @staticmethod def get_loopback_can_parser(CP): return None + + +# interface-specific helpers + +def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> Dict[str, Any]: + # read all the folders in selfdrive/car and return a dict where: + # - keys are all the car models or brand names + # - values are attr values from all car folders + result = {} + for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]): + try: + brand_name = car_folder.split('/')[-1] + brand_values = __import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr]) + if hasattr(brand_values, attr) or not ignore_none: + attr_data = getattr(brand_values, attr, None) + else: + continue + + if combine_brands: + if isinstance(attr_data, dict): + for f, v in attr_data.items(): + result[f] = v + else: + result[brand_name] = attr_data + except (ImportError, OSError): + pass + + return result diff --git a/selfdrive/car/tests/test_docs.py b/selfdrive/car/tests/test_docs.py index fcc90b958..fed6989d4 100755 --- a/selfdrive/car/tests/test_docs.py +++ b/selfdrive/car/tests/test_docs.py @@ -18,7 +18,7 @@ class TestCarDocs(unittest.TestCase): "Run selfdrive/car/docs.py to generate new supported cars documentation") def test_missing_car_info(self): - all_car_info_platforms = [p for i in get_interface_attr("CAR_INFO").values() for p in i] + all_car_info_platforms = get_interface_attr("CAR_INFO", combine_brands=True).keys() for platform in sorted(interfaces.keys()): if platform not in all_car_info_platforms: self.fail("Platform: {} doesn't exist in CarInfo".format(platform))