move to can_definitions

pull/33215/head
Shane Smiskol 9 months ago
parent c212974873
commit 8dc8b291ec
  1. 5
      selfdrive/car/__init__.py
  2. 3
      selfdrive/car/can_definitions.py
  3. 11
      selfdrive/car/interfaces.py
  4. 6
      selfdrive/car/isotp_parallel_query.py

@ -9,11 +9,10 @@ import capnp
from cereal import car from cereal import car
from panda.python.uds import SERVICE_TYPE from panda.python.uds import SERVICE_TYPE
from openpilot.selfdrive.car.can_definitions import CanMsg
from openpilot.selfdrive.car.docs_definitions import CarDocs from openpilot.selfdrive.car.docs_definitions import CarDocs
from openpilot.selfdrive.car.helpers import clip, interp from openpilot.selfdrive.car.helpers import clip, interp
CanMsgType = tuple[int, bytes, int]
# set up logging # set up logging
carlog = logging.getLogger('carlog') carlog = logging.getLogger('carlog')
carlog.setLevel(logging.INFO) carlog.setLevel(logging.INFO)
@ -195,7 +194,7 @@ def get_friction(lateral_accel_error: float, lateral_accel_deadzone: float, fric
return friction return friction
def make_can_msg(addr: int, dat: bytes, bus: int) -> CanMsgType: def make_can_msg(addr: int, dat: bytes, bus: int) -> CanMsg:
return addr, dat, bus return addr, dat, bus

@ -1,7 +1,8 @@
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
CanSendCallable = Callable[[list[tuple[int, bytes, int]]], None] CanMsg = tuple[int, bytes, int]
CanSendCallable = Callable[[list[CanMsg]], None]
@dataclass @dataclass

@ -12,9 +12,8 @@ from functools import cache
from cereal import car from cereal import car
from openpilot.common.basedir import BASEDIR from openpilot.common.basedir import BASEDIR
from openpilot.common.simple_kalman import KF1D, get_kalman_gain from openpilot.common.simple_kalman import KF1D, get_kalman_gain
from openpilot.selfdrive.car import CanMsgType, DT_CTRL, apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness, get_friction, \ from openpilot.selfdrive.car import DT_CTRL, apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness, get_friction, STD_CARGO_KG
STD_CARGO_KG from openpilot.selfdrive.car.can_definitions import CanMsg, CanSendCallable
from openpilot.selfdrive.car.can_definitions import CanSendCallable
from openpilot.selfdrive.car.conversions import Conversions as CV from openpilot.selfdrive.car.conversions import Conversions as CV
from openpilot.selfdrive.car.helpers import clip from openpilot.selfdrive.car.helpers import clip
from openpilot.selfdrive.car.values import PLATFORMS from openpilot.selfdrive.car.values import PLATFORMS
@ -110,7 +109,7 @@ class CarInterfaceBase(ABC):
dbc_name = "" if self.cp is None else self.cp.dbc_name dbc_name = "" if self.cp is None else self.cp.dbc_name
self.CC: CarControllerBase = CarController(dbc_name, CP) self.CC: CarControllerBase = CarController(dbc_name, CP)
def apply(self, c: car.CarControl, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanMsgType]]: def apply(self, c: car.CarControl, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanMsg]]:
return self.CC.update(c, self.CS, now_nanos) return self.CC.update(c, self.CS, now_nanos)
@staticmethod @staticmethod
@ -230,7 +229,7 @@ class CarInterfaceBase(ABC):
def _update(self, c: car.CarControl) -> car.CarState: def _update(self, c: car.CarControl) -> car.CarState:
pass pass
def update(self, c: car.CarControl, can_packets: list[tuple[int, list[CanMsgType]]]) -> car.CarState: def update(self, c: car.CarControl, can_packets: list[tuple[int, list[CanMsg]]]) -> car.CarState:
# parse can # parse can
for cp in self.can_parsers: for cp in self.can_parsers:
if cp is not None: if cp is not None:
@ -469,7 +468,7 @@ class CarControllerBase(ABC):
self.frame = 0 self.frame = 0
@abstractmethod @abstractmethod
def update(self, CC: car.CarControl.Actuators, CS: car.CarState, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanMsgType]]: def update(self, CC: car.CarControl.Actuators, CS: car.CarState, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanMsg]]:
pass pass

@ -3,8 +3,8 @@ from collections import defaultdict
from functools import partial from functools import partial
from types import SimpleNamespace from types import SimpleNamespace
from openpilot.selfdrive.car import carlog, CanMsgType from openpilot.selfdrive.car import carlog
from openpilot.selfdrive.car.can_definitions import CanSendCallable from openpilot.selfdrive.car.can_definitions import CanMsg, CanSendCallable
from openpilot.selfdrive.car.fw_query_definitions import AddrType from openpilot.selfdrive.car.fw_query_definitions import AddrType
from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr
@ -27,7 +27,7 @@ class IsoTpParallelQuery:
assert tx_addr not in FUNCTIONAL_ADDRS, f"Functional address should be defined in functional_addrs: {hex(tx_addr)}" assert tx_addr not in FUNCTIONAL_ADDRS, f"Functional address should be defined in functional_addrs: {hex(tx_addr)}"
self.msg_addrs = {tx_addr: get_rx_addr_for_tx_addr(tx_addr[0], rx_offset=response_offset) for tx_addr in real_addrs} self.msg_addrs = {tx_addr: get_rx_addr_for_tx_addr(tx_addr[0], rx_offset=response_offset) for tx_addr in real_addrs}
self.msg_buffer: dict[int, list[CanMsgType]] = defaultdict(list) self.msg_buffer: dict[int, list[CanMsg]] = defaultdict(list)
def rx(self): def rx(self):
"""Drain can socket and sort messages into buffers based on address""" """Drain can socket and sort messages into buffers based on address"""

Loading…
Cancel
Save