openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

173 lines
8.5 KiB

2 years ago
# ruff: noqa: E701
import capnp
import numpy as np
2 years ago
from typing import Dict
from cereal import log
2 years ago
from openpilot.selfdrive.modeld.constants import (
DISENGAGE_WIDTH, DESIRE_PRED_LEN, FCW_THRESHOLDS_5MS2, FCW_THRESHOLDS_3MS2, IDX_N, LEAD_T_IDXS,
LEAD_T_OFFSETS, META_T_IDXS, MODEL_FREQ, RYG_GREEN, RYG_YELLOW, T_IDXS, X_IDXS, Plan, Meta
2 years ago
)
2 years ago
ConfidenceClass = log.ModelDataV2.ConfidenceClass
class PublishState:
def __init__(self):
self.disengage_buffer = np.zeros(DISENGAGE_WIDTH*(DESIRE_PRED_LEN+1), dtype=np.float32)
2 years ago
self.prev_brake_5ms2_probs = np.zeros(DISENGAGE_WIDTH, dtype=np.float32)
self.prev_brake_3ms2_probs = np.zeros(DISENGAGE_WIDTH, dtype=np.float32)
def enqueue(self, x, sample, length=None):
if length is None: length = len(sample)
x[:-length] = x[length:]
x[-length:] = sample
def fill_xyzt(builder, t, x, y, z, x_std=None, y_std=None, z_std=None):
builder.t = t
builder.x = x.tolist()
builder.y = y.tolist()
builder.z = z.tolist()
if x_std is not None: builder.xStd = x_std.tolist()
if y_std is not None: builder.yStd = y_std.tolist()
if z_std is not None: builder.zStd = z_std.tolist()
def fill_xyvat(builder, t, x, y, v, a, x_std=None, y_std=None, v_std=None, a_std=None):
builder.t = t
builder.x = x.tolist()
builder.y = y.tolist()
builder.v = v.tolist()
builder.a = a.tolist()
if x_std is not None: builder.xStd = x_std.tolist()
if y_std is not None: builder.yStd = y_std.tolist()
if v_std is not None:builder.vStd = v_std.tolist()
if a_std is not None:builder.aStd = a_std.tolist()
def fill_model_msg(msg: capnp._DynamicStructBuilder, net_output_data: Dict[str, np.ndarray], publish_state: PublishState,
vipc_frame_id: int, vipc_frame_id_extra: int, frame_id: int, frame_drop: float,
timestamp_eof: int, timestamp_llk: int, model_execution_time: float,
nav_enabled: bool, valid: bool) -> None:
frame_age = frame_id - vipc_frame_id if frame_id > vipc_frame_id else 0
msg.valid = valid
modelV2 = msg.modelV2
modelV2.frameId = vipc_frame_id
modelV2.frameIdExtra = vipc_frame_id_extra
modelV2.frameAge = frame_age
modelV2.frameDropPerc = frame_drop * 100
modelV2.timestampEof = timestamp_eof
modelV2.locationMonoTime = timestamp_llk
modelV2.modelExecutionTime = model_execution_time
modelV2.navEnabled = nav_enabled
# plan
2 years ago
position = modelV2.position
fill_xyzt(position, T_IDXS, *net_output_data['plan'][0,:,Plan.POSITION].T, *net_output_data['plan_stds'][0,:,Plan.POSITION].T)
2 years ago
velocity = modelV2.velocity
fill_xyzt(velocity, T_IDXS, *net_output_data['plan'][0,:,Plan.VELOCITY].T)
2 years ago
acceleration = modelV2.acceleration
fill_xyzt(acceleration, T_IDXS, *net_output_data['plan'][0,:,Plan.ACCELERATION].T)
2 years ago
orientation = modelV2.orientation
fill_xyzt(orientation, T_IDXS, *net_output_data['plan'][0,:,Plan.T_FROM_CURRENT_EULER].T)
2 years ago
orientation_rate = modelV2.orientationRate
fill_xyzt(orientation_rate, T_IDXS, *net_output_data['plan'][0,:,Plan.ORIENTATION_RATE].T)
2 years ago
# times at X_IDXS according to model plan
2 years ago
PLAN_T_IDXS = [np.nan] * IDX_N
PLAN_T_IDXS[0] = 0.0
plan_x = net_output_data['plan'][0,:,Plan.POSITION][:,0].tolist()
for xidx in range(1, IDX_N):
tidx = 0
# increment tidx until we find an element that's further away than the current xidx
while tidx < IDX_N - 1 and plan_x[tidx+1] < X_IDXS[xidx]:
tidx += 1
if tidx == IDX_N - 1:
# if the Plan doesn't extend far enough, set plan_t to the max value (10s), then break
PLAN_T_IDXS[xidx] = T_IDXS[IDX_N - 1]
break
# interpolate to find `t` for the current xidx
current_x_val = plan_x[tidx]
next_x_val = plan_x[tidx+1]
p = (X_IDXS[xidx] - current_x_val) / (next_x_val - current_x_val)
PLAN_T_IDXS[xidx] = p * T_IDXS[tidx+1] + (1 - p) * T_IDXS[tidx]
2 years ago
# lane lines
modelV2.init('laneLines', 4)
for i in range(4):
2 years ago
lane_line = modelV2.laneLines[i]
2 years ago
fill_xyzt(lane_line, PLAN_T_IDXS, np.array(X_IDXS), net_output_data['lane_lines'][0,i,:,0], net_output_data['lane_lines'][0,i,:,1])
modelV2.laneLineStds = net_output_data['lane_lines_stds'][0,:,0,0].tolist()
modelV2.laneLineProbs = net_output_data['lane_lines_prob'][0,1::2].tolist()
# road edges
modelV2.init('roadEdges', 2)
for i in range(2):
2 years ago
road_edge = modelV2.roadEdges[i]
2 years ago
fill_xyzt(road_edge, PLAN_T_IDXS, np.array(X_IDXS), net_output_data['road_edges'][0,i,:,0], net_output_data['road_edges'][0,i,:,1])
modelV2.roadEdgeStds = net_output_data['road_edges_stds'][0,:,0,0].tolist()
# leads
2 years ago
modelV2.init('leadsV3', 3)
for i in range(3):
2 years ago
lead = modelV2.leadsV3[i]
2 years ago
fill_xyvat(lead, LEAD_T_IDXS, *net_output_data['lead'][0,i].T, *net_output_data['lead_stds'][0,i].T)
2 years ago
lead.prob = net_output_data['lead_prob'][0,i].tolist()
lead.probTime = LEAD_T_OFFSETS[i]
# meta
2 years ago
meta = modelV2.meta
meta.desireState = net_output_data['desire_state'][0].reshape(-1).tolist()
meta.desirePrediction = net_output_data['desire_pred'][0].reshape(-1).tolist()
2 years ago
meta.engagedProb = net_output_data['meta'][0,Meta.ENGAGED].item()
meta.init('disengagePredictions')
disengage_predictions = meta.disengagePredictions
disengage_predictions.t = META_T_IDXS
disengage_predictions.brakeDisengageProbs = net_output_data['meta'][0,Meta.BRAKE_DISENGAGE].tolist()
disengage_predictions.gasDisengageProbs = net_output_data['meta'][0,Meta.GAS_DISENGAGE].tolist()
disengage_predictions.steerOverrideProbs = net_output_data['meta'][0,Meta.STEER_OVERRIDE].tolist()
disengage_predictions.brake3MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_3].tolist()
disengage_predictions.brake4MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_4].tolist()
disengage_predictions.brake5MetersPerSecondSquaredProbs = net_output_data['meta'][0,Meta.HARD_BRAKE_5].tolist()
2 years ago
publish_state.enqueue(publish_state.prev_brake_5ms2_probs, net_output_data['meta'][0,Meta.HARD_BRAKE_5][0], length=1)
publish_state.enqueue(publish_state.prev_brake_3ms2_probs, net_output_data['meta'][0,Meta.HARD_BRAKE_3][0], length=1)
hard_brake_predicted = (publish_state.prev_brake_5ms2_probs > FCW_THRESHOLDS_5MS2).all() and (publish_state.prev_brake_3ms2_probs > FCW_THRESHOLDS_3MS2).all()
meta.hardBrakePredicted = hard_brake_predicted.item()
# temporal pose
2 years ago
temporal_pose = modelV2.temporalPose
temporal_pose.trans = net_output_data['sim_pose'][0,:3].tolist()
temporal_pose.transStd = net_output_data['sim_pose_stds'][0,:3].tolist()
temporal_pose.rot = net_output_data['sim_pose'][0,3:].tolist()
temporal_pose.rotStd = net_output_data['sim_pose_stds'][0,3:].tolist()
2 years ago
# confidence
if vipc_frame_id % (2*MODEL_FREQ) == 0:
# any disengage prob
2 years ago
brake_disengage_probs = net_output_data['meta'][0,Meta.BRAKE_DISENGAGE]
gas_disengage_probs = net_output_data['meta'][0,Meta.GAS_DISENGAGE]
steer_override_probs = net_output_data['meta'][0,Meta.STEER_OVERRIDE]
any_disengage_probs = 1-((1-brake_disengage_probs)*(1-gas_disengage_probs)*(1-steer_override_probs))
2 years ago
# independent disengage prob for each 2s slice
ind_disengage_probs = np.r_[any_disengage_probs[0], np.diff(any_disengage_probs) / (1 - any_disengage_probs[:-1])]
2 years ago
# rolling buf for 2, 4, 6, 8, 10s
publish_state.enqueue(publish_state.disengage_buffer, ind_disengage_probs, length=DISENGAGE_WIDTH)
2 years ago
score = publish_state.disengage_buffer[DISENGAGE_WIDTH-1:DISENGAGE_WIDTH*DISENGAGE_WIDTH-1:DISENGAGE_WIDTH-1].sum().item()/DISENGAGE_WIDTH
modelV2.confidence = (score<RYG_GREEN)*ConfidenceClass.green + (RYG_GREEN<score<RYG_YELLOW)*ConfidenceClass.yellow + (score>RYG_YELLOW)*ConfidenceClass.red
def fill_pose_msg(msg: capnp._DynamicStructBuilder, net_output_data: Dict[str, np.ndarray],
vipc_frame_id: int, vipc_dropped_frames: int, timestamp_eof: int, live_calib_seen: bool) -> None:
msg.valid = live_calib_seen & (vipc_dropped_frames < 1)
cameraOdometry = msg.cameraOdometry
cameraOdometry.frameId = vipc_frame_id
cameraOdometry.timestampEof = timestamp_eof
cameraOdometry.trans = net_output_data['pose'][0,:3].tolist()
cameraOdometry.rot = net_output_data['pose'][0,3:].tolist()
cameraOdometry.wideFromDeviceEuler = net_output_data['wide_from_device_euler'][0,:].tolist()
cameraOdometry.roadTransformTrans = net_output_data['road_transform'][0,:3].tolist()
cameraOdometry.transStd = net_output_data['pose_stds'][0,:3].tolist()
cameraOdometry.rotStd = net_output_data['pose_stds'][0,3:].tolist()
cameraOdometry.wideFromDeviceEulerStd = net_output_data['wide_from_device_euler_stds'][0,:].tolist()
cameraOdometry.roadTransformTransStd = net_output_data['road_transform_stds'][0,:3].tolist()