selfdrive/car: rm make_can_msg (#33231)

* rm make_can_msg

* fix
pull/33232/head
Shane Smiskol 1 year ago committed by GitHub
parent 40e54a3b4f
commit bd4f0cec18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      selfdrive/car/__init__.py
  2. 7
      selfdrive/car/can_definitions.py
  3. 12
      selfdrive/car/gm/gmcan.py
  4. 10
      selfdrive/car/interfaces.py
  5. 9
      selfdrive/car/isotp_parallel_query.py
  6. 4
      selfdrive/car/tests/test_fw_fingerprint.py
  7. 5
      selfdrive/car/toyota/carcontroller.py
  8. 4
      selfdrive/pandad/tests/test_pandad_loopback.py

@ -9,11 +9,10 @@ import capnp
from cereal import car
from panda.python.uds import SERVICE_TYPE
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.docs_definitions import CarDocs
from openpilot.selfdrive.car.helpers import clip, interp
CanMsgType = tuple[int, bytes, int]
# set up logging
carlog = logging.getLogger('carlog')
carlog.setLevel(logging.INFO)
@ -195,10 +194,6 @@ def get_friction(lateral_accel_error: float, lateral_accel_deadzone: float, fric
return friction
def make_can_msg(addr: int, dat: bytes, bus: int) -> CanMsgType:
return addr, dat, bus
def make_tester_present_msg(addr, bus, subaddr=None, suppress_response=False):
dat = [0x02, SERVICE_TYPE.TESTER_PRESENT]
if subaddr is not None:
@ -206,7 +201,7 @@ def make_tester_present_msg(addr, bus, subaddr=None, suppress_response=False):
dat.append(0x80 if suppress_response else 0x0) # sub-function
dat.extend([0x0] * (8 - len(dat)))
return make_can_msg(addr, bytes(dat), bus)
return CanData(addr, bytes(dat), bus)
def get_safety_config(safety_model, safety_param = None):

@ -0,0 +1,7 @@
from typing import NamedTuple
class CanData(NamedTuple):
address: int
dat: bytes
src: int

@ -1,4 +1,4 @@
from openpilot.selfdrive.car import make_can_msg
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.gm.values import CAR
@ -49,7 +49,7 @@ def create_steering_control(packer, bus, apply_steer, idx, lkas_active):
def create_adas_keepalive(bus):
dat = b"\x00\x00\x00\x00\x00\x00\x00"
return [make_can_msg(0x409, dat, bus), make_can_msg(0x40a, dat, bus)]
return [CanData(0x409, dat, bus), CanData(0x40a, dat, bus)]
def create_gas_regen_command(packer, bus, throttle, idx, enabled, at_full_stop):
@ -125,14 +125,14 @@ def create_adas_time_status(bus, tt, idx):
chksum = 0x1000 - dat[0] - dat[1] - dat[2] - dat[3]
chksum = chksum & 0xfff
dat += [0x40 + (chksum >> 8), chksum & 0xff, 0x12]
return make_can_msg(0xa1, bytes(dat), bus)
return CanData(0xa1, bytes(dat), bus)
def create_adas_steering_status(bus, idx):
dat = [idx << 6, 0xf0, 0x20, 0, 0, 0]
chksum = 0x60 + sum(dat)
dat += [chksum >> 8, chksum & 0xff]
return make_can_msg(0x306, bytes(dat), bus)
return CanData(0x306, bytes(dat), bus)
def create_adas_accelerometer_speed_status(bus, speed_ms, idx):
@ -146,7 +146,7 @@ def create_adas_accelerometer_speed_status(bus, speed_ms, idx):
dat = [0x08, spd >> 4, ((spd & 0xf) << 4) | (accel >> 8), accel & 0xff, 0]
chksum = 0x62 + far_range_mode + (idx << 2) + dat[0] + dat[1] + dat[2] + dat[3] + dat[4]
dat += [(idx << 5) + (far_range_mode << 4) + (near_range_mode << 3) + (chksum >> 8), chksum & 0xff]
return make_can_msg(0x308, bytes(dat), bus)
return CanData(0x308, bytes(dat), bus)
def create_adas_headlights_status(packer, bus):
@ -170,4 +170,4 @@ def create_lka_icon_command(bus, active, critical, steer):
dat = b"\x40\x40\x18"
else:
dat = b"\x00\x00\x00"
return make_can_msg(0x104c006c, dat, bus)
return CanData(0x104c006c, dat, bus)

@ -11,8 +11,8 @@ from functools import cache
from cereal import car
from openpilot.common.basedir import BASEDIR
from openpilot.common.simple_kalman import KF1D, get_kalman_gain
from openpilot.selfdrive.car import CanMsgType, DT_CTRL, apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness, get_friction, \
STD_CARGO_KG
from openpilot.selfdrive.car import DT_CTRL, apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness, get_friction, STD_CARGO_KG
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.conversions import Conversions as CV
from openpilot.selfdrive.car.helpers import clip
from openpilot.selfdrive.car.values import PLATFORMS
@ -108,7 +108,7 @@ class CarInterfaceBase(ABC):
dbc_name = "" if self.cp is None else self.cp.dbc_name
self.CC: CarControllerBase = CarController(dbc_name, CP)
def apply(self, c: car.CarControl, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanMsgType]]:
def apply(self, c: car.CarControl, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanData]]:
return self.CC.update(c, self.CS, now_nanos)
@staticmethod
@ -228,7 +228,7 @@ class CarInterfaceBase(ABC):
def _update(self, c: car.CarControl) -> car.CarState:
pass
def update(self, c: car.CarControl, can_packets: list[tuple[int, list[CanMsgType]]]) -> car.CarState:
def update(self, c: car.CarControl, can_packets: list[tuple[int, list[CanData]]]) -> car.CarState:
# parse can
for cp in self.can_parsers:
if cp is not None:
@ -467,7 +467,7 @@ class CarControllerBase(ABC):
self.frame = 0
@abstractmethod
def update(self, CC: car.CarControl.Actuators, CS: car.CarState, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanMsgType]]:
def update(self, CC: car.CarControl.Actuators, CS: car.CarState, now_nanos: int) -> tuple[car.CarControl.Actuators, list[CanData]]:
pass

@ -3,7 +3,8 @@ from collections import defaultdict
from functools import partial
import cereal.messaging as messaging
from openpilot.selfdrive.car import carlog, CanMsgType
from openpilot.selfdrive.car import carlog
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.fw_query_definitions import AddrType
from openpilot.selfdrive.pandad import can_list_to_can_capnp
from panda.python.uds import CanClient, IsoTpMessage, FUNCTIONAL_ADDRS, get_rx_addr_for_tx_addr
@ -27,16 +28,16 @@ class IsoTpParallelQuery:
assert tx_addr not in FUNCTIONAL_ADDRS, f"Functional address should be defined in functional_addrs: {hex(tx_addr)}"
self.msg_addrs = {tx_addr: get_rx_addr_for_tx_addr(tx_addr[0], rx_offset=response_offset) for tx_addr in real_addrs}
self.msg_buffer: dict[int, list[CanMsgType]] = defaultdict(list)
self.msg_buffer: dict[int, list[CanData]] = defaultdict(list)
def rx(self):
def rx(self) -> None:
"""Drain can socket and sort messages into buffers based on address"""
can_packets = messaging.drain_sock(self.logcan, wait_for_one=True)
for packet in can_packets:
for msg in packet.can:
if msg.src == self.bus and msg.address in self.msg_addrs.values():
self.msg_buffer[msg.address].append((msg.address, msg.dat, msg.src))
self.msg_buffer[msg.address].append(CanData(msg.address, msg.dat, msg.src))
def _can_tx(self, tx_addr, dat, bus):
"""Helper function to send single message"""

@ -5,7 +5,7 @@ from collections import defaultdict
from parameterized import parameterized
from cereal import car
from openpilot.selfdrive.car import make_can_msg
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.car_helpers import interfaces
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
from openpilot.selfdrive.car.fw_versions import ESSENTIAL_ECUS, FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \
@ -21,7 +21,7 @@ ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
class FakeSocket:
def receive(self, non_blocking=False):
return (can_list_to_can_capnp([make_can_msg(random.randint(0x600, 0x800), b'\x00' * 8, 0)])
return (can_list_to_can_capnp([CanData(random.randint(0x600, 0x800), b'\x00' * 8, 0)])
if random.uniform(0, 1) > 0.5 else None)
def send(self, msg):

@ -1,5 +1,6 @@
from cereal import car
from openpilot.selfdrive.car import apply_meas_steer_torque_limits, apply_std_steer_angle_limits, common_fault_avoidance, make_can_msg, make_tester_present_msg
from openpilot.selfdrive.car import apply_meas_steer_torque_limits, apply_std_steer_angle_limits, common_fault_avoidance, make_tester_present_msg
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.selfdrive.car.helpers import clip
from openpilot.selfdrive.car.interfaces import CarControllerBase
from openpilot.selfdrive.car.toyota import toyotacan
@ -161,7 +162,7 @@ class CarController(CarControllerBase):
# *** static msgs ***
for addr, cars, bus, fr_step, vl in STATIC_DSU_MSGS:
if self.frame % fr_step == 0 and self.CP.enableDsu and self.CP.carFingerprint in cars:
can_sends.append(make_can_msg(addr, vl, bus))
can_sends.append(CanData(addr, vl, bus))
# keep radar disabled
if self.frame % 20 == 0 and self.CP.flags & ToyotaFlags.DISABLE_RADAR.value:

@ -12,7 +12,7 @@ from openpilot.common.retry import retry
from openpilot.common.params import Params
from openpilot.common.timeout import Timeout
from openpilot.selfdrive.pandad import can_list_to_can_capnp
from openpilot.selfdrive.car import make_can_msg
from openpilot.selfdrive.car.can_definitions import CanData
from openpilot.system.hardware import TICI
from openpilot.selfdrive.test.helpers import phone_only, with_processes
@ -60,7 +60,7 @@ def send_random_can_messages(sendcan, count, num_pandas=1):
if (addr, dat) in sent_msgs[bus]:
continue
sent_msgs[bus].add((addr, dat))
to_send.append(make_can_msg(addr, dat, bus))
to_send.append(CanData(addr, dat, bus))
sendcan.send(can_list_to_can_capnp(to_send, msgtype='sendcan'))
return sent_msgs

Loading…
Cancel
Save