Common interface attribute function (#24731)

* replace get_attr_from_cars with get_interface_attr

* and not combining the brands

* explicit check

* minimize diff

* values
old-commit-hash: 3fbbb7f4b0
taco
Shane Smiskol 3 years ago committed by GitHub
parent f1f5924a71
commit c0c0250077
  1. 18
      selfdrive/car/car_helpers.py
  2. 28
      selfdrive/car/docs.py
  3. 40
      selfdrive/car/fingerprints.py
  4. 8
      selfdrive/car/fw_versions.py
  5. 34
      selfdrive/car/interfaces.py
  6. 2
      selfdrive/car/tests/test_docs.py

@ -1,9 +1,11 @@
import os
from typing import Any, Dict, List
from typing import Dict, List
from cereal import car
from common.params import Params
from common.basedir import BASEDIR
from selfdrive.version import is_comma_remote, is_tested_branch
from selfdrive.car.interfaces import get_interface_attr
from selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
from selfdrive.car.vin import get_vin, VIN_UNKNOWN
from selfdrive.car.fw_versions import get_fw_versions, match_fw_to_car
@ -11,7 +13,6 @@ from selfdrive.swaglog import cloudlog
import cereal.messaging as messaging
from selfdrive.car import gen_empty_fingerprint
from cereal import car
EventName = car.CarEvent.EventName
@ -59,19 +60,6 @@ def load_interfaces(brand_names):
return ret
def get_interface_attr(attr: str) -> Dict[str, Any]:
# returns given attribute from each interface
brand_names = {}
for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]):
try:
brand_name = car_folder.split('/')[-1]
attr_data = getattr(__import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr]), attr, None)
brand_names[brand_name] = attr_data
except (ImportError, OSError):
pass
return brand_names
def _get_interface_names() -> Dict[str, List[str]]:
# returns a dict of brand name and its respective models
brand_names = {}

@ -15,9 +15,8 @@ from selfdrive.car.tests.routes import non_tested_cars
def get_all_footnotes() -> Dict[Enum, int]:
all_footnotes = []
for _, footnotes in get_interface_attr("Footnote").items():
if footnotes is not None:
all_footnotes += footnotes
for footnotes in get_interface_attr("Footnote", ignore_none=True).values():
all_footnotes += footnotes
return {fn: idx + 1 for idx, fn in enumerate(all_footnotes)}
@ -28,21 +27,20 @@ CARS_MD_TEMPLATE = os.path.join(BASEDIR, "selfdrive", "car", "CARS_template.md")
def get_all_car_info() -> List[CarInfo]:
all_car_info: List[CarInfo] = []
for models in get_interface_attr("CAR_INFO").values():
for model, car_info in models.items():
# Hyundai exception: those with radar have openpilot longitudinal
fingerprint = {0: {}, 1: {HKG_RADAR_START_ADDR: 8}, 2: {}, 3: {}}
CP = interfaces[model][0].get_params(model, fingerprint=fingerprint, disable_radar=True)
for model, car_info in get_interface_attr("CAR_INFO", combine_brands=True).items():
# Hyundai exception: those with radar have openpilot longitudinal
fingerprint = {0: {}, 1: {HKG_RADAR_START_ADDR: 8}, 2: {}, 3: {}}
CP = interfaces[model][0].get_params(model, fingerprint=fingerprint, disable_radar=True)
if CP.dashcamOnly or car_info is None:
continue
if CP.dashcamOnly or car_info is None:
continue
# A platform can include multiple car models
if not isinstance(car_info, list):
car_info = (car_info,)
# A platform can include multiple car models
if not isinstance(car_info, list):
car_info = (car_info,)
for _car_info in car_info:
all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES))
for _car_info in car_info:
all_car_info.append(_car_info.init(CP, non_tested_cars, ALL_FOOTNOTES))
# Sort cars by make and model + year
sorted_cars: List[CarInfo] = natsorted(all_car_info, key=lambda car: (car.make + car.model).lower())

@ -1,44 +1,12 @@
import os
from common.basedir import BASEDIR
from selfdrive.car.interfaces import get_interface_attr
def get_attr_from_cars(attr, result=dict, combine_brands=True):
# read all the folders in selfdrive/car and return a dict where:
# - keys are all the car models
# - values are attr values from all car folders
result = result()
for car_folder in [x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]:
try:
car_name = car_folder.split('/')[-1]
values = __import__(f'selfdrive.car.{car_name}.values', fromlist=[attr])
if hasattr(values, attr):
attr_values = getattr(values, attr)
else:
continue
if isinstance(attr_values, dict):
for f, v in attr_values.items():
if combine_brands:
result[f] = v
else:
if car_name not in result:
result[car_name] = {}
result[car_name][f] = v
elif isinstance(attr_values, list):
result += attr_values
except (ImportError, OSError):
pass
return result
FW_VERSIONS = get_attr_from_cars('FW_VERSIONS')
_FINGERPRINTS = get_attr_from_cars('FINGERPRINTS')
FW_VERSIONS = get_interface_attr('FW_VERSIONS', combine_brands=True, ignore_none=True)
_FINGERPRINTS = get_interface_attr('FINGERPRINTS', combine_brands=True, ignore_none=True)
_DEBUG_ADDRESS = {1880: 8} # reserved for debug purposes
def is_valid_for_fingerprint(msg, car_fingerprint):
adr = msg.address
# ignore addresses that are more than 11 bits

@ -1,15 +1,15 @@
#!/usr/bin/env python3
import struct
import traceback
from typing import Any, List
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, List
from tqdm import tqdm
import panda.python.uds as uds
from cereal import car
from selfdrive.car.fingerprints import FW_VERSIONS, get_attr_from_cars
from selfdrive.car.interfaces import get_interface_attr
from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
from selfdrive.car.toyota.values import CAR as TOYOTA
from selfdrive.swaglog import cloudlog
@ -303,7 +303,7 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr
addrs = []
parallel_addrs = []
versions = get_attr_from_cars('FW_VERSIONS', combine_brands=False)
versions = get_interface_attr('FW_VERSIONS', ignore_none=True)
if extra is not None:
versions.update(extra)

@ -1,13 +1,14 @@
import os
import time
from abc import abstractmethod, ABC
from typing import Dict, Tuple, List
from typing import Any, Dict, Tuple, List
from cereal import car
from common.basedir import BASEDIR
from common.conversions import Conversions as CV
from common.kalman.simple_kalman import KF1D
from common.realtime import DT_CTRL
from selfdrive.car import gen_empty_fingerprint
from common.conversions import Conversions as CV
from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX
from selfdrive.controls.lib.events import Events
from selfdrive.controls.lib.vehicle_model import VehicleModel
@ -22,7 +23,6 @@ ACCEL_MIN = -3.5
# generic car and radar interfaces
class CarInterfaceBase(ABC):
def __init__(self, CP, CarController, CarState):
self.CP = CP
@ -293,3 +293,31 @@ class CarStateBase(ABC):
@staticmethod
def get_loopback_can_parser(CP):
return None
# interface-specific helpers
def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> Dict[str, Any]:
# read all the folders in selfdrive/car and return a dict where:
# - keys are all the car models or brand names
# - values are attr values from all car folders
result = {}
for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]):
try:
brand_name = car_folder.split('/')[-1]
brand_values = __import__(f'selfdrive.car.{brand_name}.values', fromlist=[attr])
if hasattr(brand_values, attr) or not ignore_none:
attr_data = getattr(brand_values, attr, None)
else:
continue
if combine_brands:
if isinstance(attr_data, dict):
for f, v in attr_data.items():
result[f] = v
else:
result[brand_name] = attr_data
except (ImportError, OSError):
pass
return result

@ -18,7 +18,7 @@ class TestCarDocs(unittest.TestCase):
"Run selfdrive/car/docs.py to generate new supported cars documentation")
def test_missing_car_info(self):
all_car_info_platforms = [p for i in get_interface_attr("CAR_INFO").values() for p in i]
all_car_info_platforms = get_interface_attr("CAR_INFO", combine_brands=True).keys()
for platform in sorted(interfaces.keys()):
if platform not in all_car_info_platforms:
self.fail("Platform: {} doesn't exist in CarInfo".format(platform))

Loading…
Cancel
Save