|
|
|
@ -1,10 +1,13 @@ |
|
|
|
|
import capnp |
|
|
|
|
import numpy as np |
|
|
|
|
from typing import List, Dict |
|
|
|
|
from openpilot.selfdrive.modeld.models.driving_pyx import PublishState |
|
|
|
|
from openpilot.selfdrive.modeld.constants import T_IDXS, X_IDXS, LEAD_T_IDXS, META_T_IDXS, LEAD_T_OFFSETS, Meta, Plan |
|
|
|
|
from openpilot.selfdrive.modeld.constants import T_IDXS, X_IDXS, LEAD_T_IDXS, META_T_IDXS, LEAD_T_OFFSETS, Meta, Plan, FCW_THRESHOLDS_5MS2, FCW_THRESHOLDS_3MS2 |
|
|
|
|
|
|
|
|
|
# TODO: use Enum Slices when possible instead of hardcoded indices |
|
|
|
|
class PublishState: |
|
|
|
|
def __init__(self): |
|
|
|
|
self.disengage_buffer = np.zeros(5*5, dtype=np.float32) |
|
|
|
|
self.prev_brake_5ms2_probs = np.zeros(5, dtype=np.float32) |
|
|
|
|
self.prev_brake_3ms2_probs = np.zeros(3, dtype=np.float32) |
|
|
|
|
|
|
|
|
|
def fill_xyzt(builder, t, x, y, z, x_std=None, y_std=None, z_std=None): |
|
|
|
|
builder.t = t |
|
|
|
@ -26,7 +29,7 @@ def fill_xyvat(builder, t, x, y, v, a, x_std=None, y_std=None, v_std=None, a_std |
|
|
|
|
if v_std is not None:builder.vStd = v_std.tolist() |
|
|
|
|
if a_std is not None:builder.aStd = a_std.tolist() |
|
|
|
|
|
|
|
|
|
def fill_model_msg(msg: capnp._DynamicStructBuilder, net_output_data: Dict[str, np.ndarray], ps: PublishState, |
|
|
|
|
def fill_model_msg(msg: capnp._DynamicStructBuilder, net_output_data: Dict[str, np.ndarray], publish_state: PublishState, |
|
|
|
|
vipc_frame_id: int, vipc_frame_id_extra: int, frame_id: int, frame_drop: float, |
|
|
|
|
timestamp_eof: int, timestamp_llk: int, model_execution_time: float, |
|
|
|
|
nav_enabled: bool, valid: bool) -> None: |
|
|
|
@ -96,9 +99,14 @@ def fill_model_msg(msg: capnp._DynamicStructBuilder, net_output_data: Dict[str, |
|
|
|
|
disengage_predictions.steerOverrideProbs = net_output_data['meta'][0,Meta.STEER_OVERRIDE].tolist() |
|
|
|
|
disengage_predictions.brake3MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_3].tolist() |
|
|
|
|
disengage_predictions.brake4MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_4].tolist() |
|
|
|
|
disengage_predictions.brake5MetersPerSecondSquaredProbs =net_output_data['meta'][0,Meta.HARD_BRAKE_5].tolist() |
|
|
|
|
|
|
|
|
|
meta.hardBrakePredicted = False |
|
|
|
|
disengage_predictions.brake5MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_5].tolist() |
|
|
|
|
|
|
|
|
|
publish_state.prev_brake_5ms2_probs[:-1] = publish_state.prev_brake_5ms2_probs[1:] |
|
|
|
|
publish_state.prev_brake_5ms2_probs[-1] = net_output_data['meta'][0,Meta.HARD_BRAKE_5][0] |
|
|
|
|
publish_state.prev_brake_3ms2_probs[:-1] = publish_state.prev_brake_3ms2_probs[1:] |
|
|
|
|
publish_state.prev_brake_3ms2_probs[-1] = net_output_data['meta'][0,Meta.HARD_BRAKE_3][0] |
|
|
|
|
hard_brake_predicted = (publish_state.prev_brake_5ms2_probs > FCW_THRESHOLDS_5MS2).all() and (publish_state.prev_brake_3ms2_probs > FCW_THRESHOLDS_3MS2).all() |
|
|
|
|
meta.hardBrakePredicted = hard_brake_predicted.item() |
|
|
|
|
|
|
|
|
|
# temporal pose |
|
|
|
|
temporal_pose = modelV2.temporalPose |
|
|
|
|