|
|
|
@ -1,4 +1,5 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import dataclasses |
|
|
|
|
import os |
|
|
|
|
import time |
|
|
|
|
|
|
|
|
@ -58,8 +59,24 @@ def can_comm_callbacks(logcan: messaging.SubSocket, sendcan: messaging.PubSocket |
|
|
|
|
return can_recv, can_send |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_capnp(CP: CarParams) -> car.CarParams: |
|
|
|
|
CP_dict = dataclasses.asdict(CP) |
|
|
|
|
del CP_dict['lateralTuning'] |
|
|
|
|
CP_capnp = car.CarParams.new_message(**CP_dict) |
|
|
|
|
|
|
|
|
|
# this is the only union, special handling |
|
|
|
|
which = CP.lateralTuning.which() |
|
|
|
|
CP_capnp.lateralTuning.init(which) |
|
|
|
|
lateralTuning_dict = dataclasses.asdict(getattr(CP.lateralTuning, which)) |
|
|
|
|
setattr(CP_capnp.lateralTuning, which, lateralTuning_dict) |
|
|
|
|
|
|
|
|
|
return CP_capnp |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Car: |
|
|
|
|
CI: CarInterfaceBase |
|
|
|
|
CP: CarParams |
|
|
|
|
CP_capnp: car.CarParams |
|
|
|
|
|
|
|
|
|
def __init__(self, CI=None) -> None: |
|
|
|
|
self.can_sock = messaging.sub_sock('can', timeout=20) |
|
|
|
@ -126,7 +143,9 @@ class Car: |
|
|
|
|
self.params.put("CarParamsPrevRoute", prev_cp) |
|
|
|
|
|
|
|
|
|
# Write CarParams for controls and radard |
|
|
|
|
cp_bytes = self.CP.to_bytes() |
|
|
|
|
# convert to pycapnp representation for caching and logging |
|
|
|
|
self.CP_capnp = convert_to_capnp(self.CP) |
|
|
|
|
cp_bytes = self.CP_capnp.to_bytes() |
|
|
|
|
self.params.put("CarParams", cp_bytes) |
|
|
|
|
self.params.put_nonblocking("CarParamsCache", cp_bytes) |
|
|
|
|
self.params.put_nonblocking("CarParamsPersistent", cp_bytes) |
|
|
|
@ -176,7 +195,7 @@ class Car: |
|
|
|
|
if self.sm.frame % int(50. / DT_CTRL) == 0: |
|
|
|
|
cp_send = messaging.new_message('carParams') |
|
|
|
|
cp_send.valid = True |
|
|
|
|
cp_send.carParams = self.CP |
|
|
|
|
cp_send.carParams = self.CP_capnp |
|
|
|
|
self.pm.send('carParams', cp_send) |
|
|
|
|
|
|
|
|
|
# publish new carOutput |
|
|
|
|