|
|
|
@ -60,6 +60,24 @@ def can_comm_callbacks(logcan: messaging.SubSocket, sendcan: messaging.PubSocket |
|
|
|
|
return can_recv, can_send |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def asdict(obj) -> dict[str, any]: |
|
|
|
|
"""Note that this function returns references rather than copies where possible""" |
|
|
|
|
|
|
|
|
|
if not dataclasses.is_dataclass(obj): |
|
|
|
|
raise TypeError("asdict() should be called on dataclass instances") |
|
|
|
|
|
|
|
|
|
def _asdict_inner(obj: any) -> dict[str, any]: |
|
|
|
|
if dataclasses.is_dataclass(obj): |
|
|
|
|
ret = {} |
|
|
|
|
for f in dataclasses.fields(obj): |
|
|
|
|
ret[f.name] = _asdict_inner(getattr(obj, f.name)) |
|
|
|
|
return ret |
|
|
|
|
else: |
|
|
|
|
return obj |
|
|
|
|
|
|
|
|
|
return _asdict_inner(obj) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_capnp(struct: structs.CarParams | structs.CarState | structs.CarControl.Actuators) -> capnp.lib.capnp._DynamicStructBuilder: |
|
|
|
|
struct_dict = dataclasses.asdict(struct) |
|
|
|
|
|
|
|
|
@ -74,8 +92,10 @@ def convert_to_capnp(struct: structs.CarParams | structs.CarState | structs.CarC |
|
|
|
|
setattr(struct_capnp.lateralTuning, which, lateralTuning_dict) |
|
|
|
|
elif isinstance(struct, structs.CarState): |
|
|
|
|
struct_capnp = car.CarState.new_message(**struct_dict) |
|
|
|
|
else: |
|
|
|
|
elif isinstance(struct, structs.CarControl.Actuators): |
|
|
|
|
struct_capnp = car.CarControl.Actuators.new_message(**struct_dict) |
|
|
|
|
else: |
|
|
|
|
raise ValueError(f"Unsupported struct type: {type(struct)}") |
|
|
|
|
|
|
|
|
|
return struct_capnp |
|
|
|
|
|
|
|
|
|