|
|
|
@ -1,4 +1,5 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import capnp |
|
|
|
|
import dataclasses |
|
|
|
|
import os |
|
|
|
|
import time |
|
|
|
@ -15,7 +16,7 @@ from openpilot.common.swaglog import cloudlog, ForwardingHandler |
|
|
|
|
|
|
|
|
|
from openpilot.selfdrive.pandad import can_capnp_to_list, can_list_to_can_capnp |
|
|
|
|
from openpilot.selfdrive.car import DT_CTRL, carlog |
|
|
|
|
from openpilot.selfdrive.car.structs import CarParams |
|
|
|
|
from openpilot.selfdrive.car.structs import CarParams, CarState |
|
|
|
|
from openpilot.selfdrive.car.can_definitions import CanData, CanRecvCallable, CanSendCallable |
|
|
|
|
from openpilot.selfdrive.car.fw_versions import ObdCallback |
|
|
|
|
from openpilot.selfdrive.car.car_helpers import get_car |
|
|
|
@ -59,19 +60,22 @@ def can_comm_callbacks(logcan: messaging.SubSocket, sendcan: messaging.PubSocket |
|
|
|
|
return can_recv, can_send |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_capnp(CP: CarParams) -> car.CarParams: |
|
|
|
|
# TODO: better name or support CarState, CarControl, etc. |
|
|
|
|
CP_dict = dataclasses.asdict(CP) |
|
|
|
|
del CP_dict['lateralTuning'] |
|
|
|
|
CP_capnp = car.CarParams.new_message(**CP_dict) |
|
|
|
|
def convert_to_capnp(struct: CarParams | CarState) -> capnp.lib.capnp._DynamicStructBuilder: |
|
|
|
|
struct_dict = dataclasses.asdict(struct) |
|
|
|
|
|
|
|
|
|
# this is the only union, special handling |
|
|
|
|
which = CP.lateralTuning.which() |
|
|
|
|
CP_capnp.lateralTuning.init(which) |
|
|
|
|
lateralTuning_dict = dataclasses.asdict(getattr(CP.lateralTuning, which)) |
|
|
|
|
setattr(CP_capnp.lateralTuning, which, lateralTuning_dict) |
|
|
|
|
if isinstance(struct, CarParams): |
|
|
|
|
del struct_dict['lateralTuning'] |
|
|
|
|
struct_capnp = car.CarParams.new_message(**struct_dict) |
|
|
|
|
|
|
|
|
|
return CP_capnp |
|
|
|
|
# this is the only union, special handling |
|
|
|
|
which = struct.lateralTuning.which() |
|
|
|
|
struct_capnp.lateralTuning.init(which) |
|
|
|
|
lateralTuning_dict = dataclasses.asdict(getattr(struct.lateralTuning, which)) |
|
|
|
|
setattr(struct_capnp.lateralTuning, which, lateralTuning_dict) |
|
|
|
|
else: |
|
|
|
|
struct_capnp = car.CarState.new_message(**struct_dict) |
|
|
|
|
|
|
|
|
|
return struct_capnp |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Car: |
|
|
|
@ -161,7 +165,7 @@ class Car: |
|
|
|
|
|
|
|
|
|
# Update carState from CAN |
|
|
|
|
can_strs = messaging.drain_sock_raw(self.can_sock, wait_for_one=True) |
|
|
|
|
CS = self.CI.update(self.CC_prev, can_capnp_to_list(can_strs)) |
|
|
|
|
CS = convert_to_capnp(self.CI.update(self.CC_prev, can_capnp_to_list(can_strs))) |
|
|
|
|
|
|
|
|
|
self.sm.update(0) |
|
|
|
|
|
|
|
|
|