|
|
|
@ -3,11 +3,22 @@ import capnp |
|
|
|
|
import numpy as np |
|
|
|
|
from cereal import log |
|
|
|
|
from openpilot.selfdrive.modeld.constants import ModelConstants, Plan, Meta |
|
|
|
|
from openpilot.selfdrive.controls.lib.drive_helpers import MIN_SPEED |
|
|
|
|
|
|
|
|
|
SEND_RAW_PRED = os.getenv('SEND_RAW_PRED') |
|
|
|
|
|
|
|
|
|
ConfidenceClass = log.ModelDataV2.ConfidenceClass |
|
|
|
|
|
|
|
|
|
def curv_from_psis(psi_target, psi_rate, vego, delay): |
|
|
|
|
vego = np.clip(vego, MIN_SPEED, np.inf) |
|
|
|
|
curv_from_psi = psi_target / (vego * delay) # epsilon to prevent divide-by-zero |
|
|
|
|
return 2*curv_from_psi - psi_rate / vego |
|
|
|
|
|
|
|
|
|
def get_curvature_from_plan(plan, vego, delay): |
|
|
|
|
psi_target = np.interp(delay, ModelConstants.T_IDXS, plan[:, Plan.T_FROM_CURRENT_EULER][:, 2]) |
|
|
|
|
psi_rate = plan[:, Plan.ORIENTATION_RATE][0, 2] |
|
|
|
|
return curv_from_psis(psi_target, psi_rate, vego, delay) |
|
|
|
|
|
|
|
|
|
class PublishState: |
|
|
|
|
def __init__(self): |
|
|
|
|
self.disengage_buffer = np.zeros(ModelConstants.CONFIDENCE_BUFFER_LEN*ModelConstants.DISENGAGE_WIDTH, dtype=np.float32) |
|
|
|
@ -55,14 +66,17 @@ def fill_lane_line_meta(builder, lane_lines, lane_line_probs): |
|
|
|
|
builder.rightProb = lane_line_probs[2] |
|
|
|
|
|
|
|
|
|
def fill_model_msg(base_msg: capnp._DynamicStructBuilder, extended_msg: capnp._DynamicStructBuilder, |
|
|
|
|
net_output_data: dict[str, np.ndarray], publish_state: PublishState, |
|
|
|
|
vipc_frame_id: int, vipc_frame_id_extra: int, frame_id: int, frame_drop: float, |
|
|
|
|
timestamp_eof: int, model_execution_time: float, valid: bool) -> None: |
|
|
|
|
net_output_data: dict[str, np.ndarray], v_ego: float, delay: float, |
|
|
|
|
publish_state: PublishState, vipc_frame_id: int, vipc_frame_id_extra: int, |
|
|
|
|
frame_id: int, frame_drop: float, timestamp_eof: int, model_execution_time: float, |
|
|
|
|
valid: bool) -> None: |
|
|
|
|
frame_age = frame_id - vipc_frame_id if frame_id > vipc_frame_id else 0 |
|
|
|
|
frame_drop_perc = frame_drop * 100 |
|
|
|
|
extended_msg.valid = valid |
|
|
|
|
base_msg.valid = valid |
|
|
|
|
|
|
|
|
|
desired_curv = float(get_curvature_from_plan(net_output_data['plan'][0], v_ego, delay)) |
|
|
|
|
|
|
|
|
|
driving_model_data = base_msg.drivingModelData |
|
|
|
|
|
|
|
|
|
driving_model_data.frameId = vipc_frame_id |
|
|
|
@ -71,7 +85,7 @@ def fill_model_msg(base_msg: capnp._DynamicStructBuilder, extended_msg: capnp._D |
|
|
|
|
driving_model_data.modelExecutionTime = model_execution_time |
|
|
|
|
|
|
|
|
|
action = driving_model_data.action |
|
|
|
|
action.desiredCurvature = float(net_output_data['desired_curvature'][0,0]) |
|
|
|
|
action.desiredCurvature = desired_curv |
|
|
|
|
|
|
|
|
|
modelV2 = extended_msg.modelV2 |
|
|
|
|
modelV2.frameId = vipc_frame_id |
|
|
|
@ -106,7 +120,7 @@ def fill_model_msg(base_msg: capnp._DynamicStructBuilder, extended_msg: capnp._D |
|
|
|
|
|
|
|
|
|
# lateral planning |
|
|
|
|
action = modelV2.action |
|
|
|
|
action.desiredCurvature = float(net_output_data['desired_curvature'][0,0]) |
|
|
|
|
action.desiredCurvature = desired_curv |
|
|
|
|
|
|
|
|
|
# times at X_IDXS according to model plan |
|
|
|
|
PLAN_T_IDXS = [np.nan] * ModelConstants.IDX_N |
|
|
|
|