From 6d392a2e7bca7fa69e5bca990b45ce90f988a1ca Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Mon, 26 Aug 2024 11:52:41 -0700 Subject: [PATCH] car helpers file (#33379) * helpers file * clean up --- selfdrive/car/card.py | 69 +------------------------------------- selfdrive/car/helpers.py | 72 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 68 deletions(-) create mode 100644 selfdrive/car/helpers.py diff --git a/selfdrive/car/card.py b/selfdrive/car/card.py index 1ef7a44eff..4a205f15c4 100755 --- a/selfdrive/car/card.py +++ b/selfdrive/car/card.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 -import capnp import os import time -from typing import Any import cereal.messaging as messaging @@ -21,10 +19,10 @@ from opendbc.car.car_helpers import get_car from opendbc.car.interfaces import CarInterfaceBase from openpilot.selfdrive.pandad import can_capnp_to_list, can_list_to_can_capnp from openpilot.selfdrive.car.car_specific import CarSpecificEvents, MockCarState +from openpilot.selfdrive.car.helpers import convert_carControl, convert_to_capnp from openpilot.selfdrive.controls.lib.events import Events REPLAY = "REPLAY" in os.environ -_FIELDS = '__dataclass_fields__' # copy of dataclasses._FIELDS EventName = car.CarEvent.EventName @@ -61,71 +59,6 @@ def can_comm_callbacks(logcan: messaging.SubSocket, sendcan: messaging.PubSocket return can_recv, can_send -def is_dataclass(obj): - """Similar to dataclasses.is_dataclass without instance type check checking""" - return hasattr(obj, _FIELDS) - - -def _asdictref_inner(obj) -> dict[str, Any] | Any: - if is_dataclass(obj): - ret = {} - for field in getattr(obj, _FIELDS): # similar to dataclasses.fields() - ret[field] = _asdictref_inner(getattr(obj, field)) - return ret - elif isinstance(obj, (tuple, list)): - return type(obj)(_asdictref_inner(v) for v in obj) - else: - return obj - - -def asdictref(obj) -> dict[str, Any]: - """ - Similar to dataclasses.asdict without recursive type checking and copy.deepcopy - Note that the resulting dict will contain references to the original struct as a result - """ - if not is_dataclass(obj): - raise TypeError("asdictref() should be called on dataclass instances") - - return _asdictref_inner(obj) - - -def convert_to_capnp(struct: structs.CarParams | structs.CarState | structs.CarControl.Actuators) -> capnp.lib.capnp._DynamicStructBuilder: - struct_dict = asdictref(struct) - - if isinstance(struct, structs.CarParams): - del struct_dict['lateralTuning'] - struct_capnp = car.CarParams.new_message(**struct_dict) - - # this is the only union, special handling - which = struct.lateralTuning.which() - struct_capnp.lateralTuning.init(which) - lateralTuning_dict = asdictref(getattr(struct.lateralTuning, which)) - setattr(struct_capnp.lateralTuning, which, lateralTuning_dict) - elif isinstance(struct, structs.CarState): - struct_capnp = car.CarState.new_message(**struct_dict) - elif isinstance(struct, structs.CarControl.Actuators): - struct_capnp = car.CarControl.Actuators.new_message(**struct_dict) - else: - raise ValueError(f"Unsupported struct type: {type(struct)}") - - return struct_capnp - - -def convert_carControl(struct: capnp.lib.capnp._DynamicStructReader) -> structs.CarControl: - # TODO: recursively handle any car struct as needed - def remove_deprecated(s: dict) -> dict: - return {k: v for k, v in s.items() if not k.endswith('DEPRECATED')} - - struct_dict = struct.to_dict() - struct_dataclass = structs.CarControl(**remove_deprecated({k: v for k, v in struct_dict.items() if not isinstance(k, dict)})) - - struct_dataclass.actuators = structs.CarControl.Actuators(**remove_deprecated(struct_dict.get('actuators', {}))) - struct_dataclass.cruiseControl = structs.CarControl.CruiseControl(**remove_deprecated(struct_dict.get('cruiseControl', {}))) - struct_dataclass.hudControl = structs.CarControl.HUDControl(**remove_deprecated(struct_dict.get('hudControl', {}))) - - return struct_dataclass - - class Car: CI: CarInterfaceBase CP: structs.CarParams diff --git a/selfdrive/car/helpers.py b/selfdrive/car/helpers.py new file mode 100644 index 0000000000..f4d8993468 --- /dev/null +++ b/selfdrive/car/helpers.py @@ -0,0 +1,72 @@ +import capnp +from typing import Any + +from cereal import car +from opendbc.car import structs + +_FIELDS = '__dataclass_fields__' # copy of dataclasses._FIELDS + + +def is_dataclass(obj): + """Similar to dataclasses.is_dataclass without instance type check checking""" + return hasattr(obj, _FIELDS) + + +def _asdictref_inner(obj) -> dict[str, Any] | Any: + if is_dataclass(obj): + ret = {} + for field in getattr(obj, _FIELDS): # similar to dataclasses.fields() + ret[field] = _asdictref_inner(getattr(obj, field)) + return ret + elif isinstance(obj, (tuple, list)): + return type(obj)(_asdictref_inner(v) for v in obj) + else: + return obj + + +def asdictref(obj) -> dict[str, Any]: + """ + Similar to dataclasses.asdict without recursive type checking and copy.deepcopy + Note that the resulting dict will contain references to the original struct as a result + """ + if not is_dataclass(obj): + raise TypeError("asdictref() should be called on dataclass instances") + + return _asdictref_inner(obj) + + +def convert_to_capnp(struct: structs.CarParams | structs.CarState | structs.CarControl.Actuators) -> capnp.lib.capnp._DynamicStructBuilder: + struct_dict = asdictref(struct) + + if isinstance(struct, structs.CarParams): + del struct_dict['lateralTuning'] + struct_capnp = car.CarParams.new_message(**struct_dict) + + # this is the only union, special handling + which = struct.lateralTuning.which() + struct_capnp.lateralTuning.init(which) + lateralTuning_dict = asdictref(getattr(struct.lateralTuning, which)) + setattr(struct_capnp.lateralTuning, which, lateralTuning_dict) + elif isinstance(struct, structs.CarState): + struct_capnp = car.CarState.new_message(**struct_dict) + elif isinstance(struct, structs.CarControl.Actuators): + struct_capnp = car.CarControl.Actuators.new_message(**struct_dict) + else: + raise ValueError(f"Unsupported struct type: {type(struct)}") + + return struct_capnp + + +def convert_carControl(struct: capnp.lib.capnp._DynamicStructReader) -> structs.CarControl: + # TODO: recursively handle any car struct as needed + def remove_deprecated(s: dict) -> dict: + return {k: v for k, v in s.items() if not k.endswith('DEPRECATED')} + + struct_dict = struct.to_dict() + struct_dataclass = structs.CarControl(**remove_deprecated({k: v for k, v in struct_dict.items() if not isinstance(k, dict)})) + + struct_dataclass.actuators = structs.CarControl.Actuators(**remove_deprecated(struct_dict.get('actuators', {}))) + struct_dataclass.cruiseControl = structs.CarControl.CruiseControl(**remove_deprecated(struct_dict.get('cruiseControl', {}))) + struct_dataclass.hudControl = structs.CarControl.HUDControl(**remove_deprecated(struct_dict.get('hudControl', {}))) + + return struct_dataclass