openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

438 lines
16 KiB

import colorsys
import numpy as np
import pyray as rl
from cereal import messaging, car
from dataclasses import dataclass, field
from openpilot.common.params import Params
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.system.ui.lib.application import DEFAULT_FPS
from openpilot.system.ui.lib.shader_polygon import draw_polygon
from openpilot.selfdrive.locationd.calibrationd import HEIGHT_INIT
CLIP_MARGIN = 500
MIN_DRAW_DISTANCE = 10.0
MAX_DRAW_DISTANCE = 100.0
PATH_COLOR_TRANSITION_DURATION = 0.5 # Seconds for color transition animation
PATH_BLEND_INCREMENT = 1.0 / (PATH_COLOR_TRANSITION_DURATION * DEFAULT_FPS)
MAX_POINTS = 200
THROTTLE_COLORS = [
rl.Color(13, 248, 122, 102), # HSLF(148/360, 0.94, 0.51, 0.4)
rl.Color(114, 255, 92, 89), # HSLF(112/360, 1.0, 0.68, 0.35)
rl.Color(114, 255, 92, 0), # HSLF(112/360, 1.0, 0.68, 0.0)
]
NO_THROTTLE_COLORS = [
rl.Color(242, 242, 242, 102), # HSLF(148/360, 0.0, 0.95, 0.4)
rl.Color(242, 242, 242, 89), # HSLF(112/360, 0.0, 0.95, 0.35)
rl.Color(242, 242, 242, 0), # HSLF(112/360, 0.0, 0.95, 0.0)
]
@dataclass
class ModelPoints:
raw_points: np.ndarray = field(default_factory=lambda: np.empty((0, 3), dtype=np.float32))
projected_points: np.ndarray = field(default_factory=lambda: np.empty((0, 2), dtype=np.float32))
@dataclass
class LeadVehicle:
glow: list[float] = field(default_factory=list)
chevron: list[float] = field(default_factory=list)
fill_alpha: int = 0
class ModelRenderer:
def __init__(self):
self._longitudinal_control = False
self._experimental_mode = False
self._blend_factor = 1.0
self._prev_allow_throttle = True
self._lane_line_probs = np.zeros(4, dtype=np.float32)
self._road_edge_stds = np.zeros(2, dtype=np.float32)
self._lead_vehicles = [LeadVehicle(), LeadVehicle()]
self._path_offset_z = HEIGHT_INIT[0]
# Initialize ModelPoints objects
self._path = ModelPoints()
self._lane_lines = [ModelPoints() for _ in range(4)]
self._road_edges = [ModelPoints() for _ in range(2)]
self._acceleration_x = np.empty((0,), dtype=np.float32)
# Transform matrix (3x3 for car space to screen space)
self._car_space_transform = np.zeros((3, 3), dtype=np.float32)
self._transform_dirty = True
self._clip_region = None
self._rect = None
# Pre-allocated arrays for polygon conversion
self._temp_points_3d = np.empty((MAX_POINTS * 2, 3), dtype=np.float32)
self._temp_proj = np.empty((3, MAX_POINTS * 2), dtype=np.float32)
self._exp_gradient = {
'start': (0.0, 1.0), # Bottom of path
'end': (0.0, 0.0), # Top of path
'colors': [],
'stops': [],
}
# Get longitudinal control setting from car parameters
if car_params := Params().get("CarParams"):
cp = messaging.log_from_bytes(car_params, car.CarParams)
self._longitudinal_control = cp.openpilotLongitudinalControl
def set_transform(self, transform: np.ndarray):
self._car_space_transform = transform.astype(np.float32)
self._transform_dirty = True
def draw(self, rect: rl.Rectangle, sm: messaging.SubMaster):
# Check if data is up-to-date
if (sm.recv_frame["liveCalibration"] < ui_state.started_frame or
sm.recv_frame["modelV2"] < ui_state.started_frame):
return
# Set up clipping region
self._rect = rect
self._clip_region = rl.Rectangle(
rect.x - CLIP_MARGIN, rect.y - CLIP_MARGIN, rect.width + 2 * CLIP_MARGIN, rect.height + 2 * CLIP_MARGIN
)
# Update state
self._experimental_mode = sm['selfdriveState'].experimentalMode
live_calib = sm['liveCalibration']
self._path_offset_z = live_calib.height[0] if live_calib.height else HEIGHT_INIT[0]
if sm.updated['carParams']:
self._longitudinal_control = sm['carParams'].openpilotLongitudinalControl
model = sm['modelV2']
radar_state = sm['radarState'] if sm.valid['radarState'] else None
lead_one = radar_state.leadOne if radar_state else None
render_lead_indicator = self._longitudinal_control and radar_state is not None
# Update model data when needed
model_updated = sm.updated['modelV2']
if model_updated or sm.updated['radarState'] or self._transform_dirty:
if model_updated:
self._update_raw_points(model)
path_x_array = self._path.raw_points[:, 0]
if path_x_array.size == 0:
return
self._update_model(lead_one, path_x_array)
if render_lead_indicator:
self._update_leads(radar_state, path_x_array)
self._transform_dirty = False
# Draw elements
self._draw_lane_lines()
self._draw_path(sm)
if render_lead_indicator and radar_state:
self._draw_lead_indicator()
def _update_raw_points(self, model):
"""Update raw 3D points from model data"""
self._path.raw_points = np.array([model.position.x, model.position.y, model.position.z], dtype=np.float32).T
for i, lane_line in enumerate(model.laneLines):
self._lane_lines[i].raw_points = np.array([lane_line.x, lane_line.y, lane_line.z], dtype=np.float32).T
for i, road_edge in enumerate(model.roadEdges):
self._road_edges[i].raw_points = np.array([road_edge.x, road_edge.y, road_edge.z], dtype=np.float32).T
self._lane_line_probs = np.array(model.laneLineProbs, dtype=np.float32)
self._road_edge_stds = np.array(model.roadEdgeStds, dtype=np.float32)
self._acceleration_x = np.array(model.acceleration.x, dtype=np.float32)
def _update_leads(self, radar_state, path_x_array):
"""Update positions of lead vehicles"""
self._lead_vehicles = [LeadVehicle(), LeadVehicle()]
leads = [radar_state.leadOne, radar_state.leadTwo]
for i, lead_data in enumerate(leads):
if lead_data and lead_data.status:
d_rel, y_rel, v_rel = lead_data.dRel, lead_data.yRel, lead_data.vRel
idx = self._get_path_length_idx(path_x_array, d_rel)
# Get z-coordinate from path at the lead vehicle position
z = self._path.raw_points[idx, 2] if idx < len(self._path.raw_points) else 0.0
point = self._map_to_screen(d_rel, -y_rel, z + self._path_offset_z)
if point:
self._lead_vehicles[i] = self._update_lead_vehicle(d_rel, v_rel, point, self._rect)
def _update_model(self, lead, path_x_array):
"""Update model visualization data based on model message"""
max_distance = np.clip(path_x_array[-1], MIN_DRAW_DISTANCE, MAX_DRAW_DISTANCE)
max_idx = self._get_path_length_idx(self._lane_lines[0].raw_points[:, 0], max_distance)
# Update lane lines using raw points
for i, lane_line in enumerate(self._lane_lines):
lane_line.projected_points = self._map_line_to_polygon(
lane_line.raw_points, 0.025 * self._lane_line_probs[i], 0.0, max_idx
)
# Update road edges using raw points
for road_edge in self._road_edges:
road_edge.projected_points = self._map_line_to_polygon(road_edge.raw_points, 0.025, 0.0, max_idx)
# Update path using raw points
if lead and lead.status:
lead_d = lead.dRel * 2.0
max_distance = np.clip(lead_d - min(lead_d * 0.35, 10.0), 0.0, max_distance)
max_idx = self._get_path_length_idx(path_x_array, max_distance)
self._path.projected_points = self._map_line_to_polygon(
self._path.raw_points, 0.9, self._path_offset_z, max_idx, allow_invert=False
)
self._update_experimental_gradient(self._rect.height)
def _update_experimental_gradient(self, height):
"""Pre-calculate experimental mode gradient colors"""
if not self._experimental_mode:
return
max_len = min(len(self._path.projected_points) // 2, len(self._acceleration_x))
segment_colors = []
gradient_stops = []
i = 0
while i < max_len:
track_idx = max_len - i - 1 # flip idx to start from bottom right
track_y = self._path.projected_points[track_idx][1]
if track_y < 0 or track_y > height:
i += 1
continue
# Calculate color based on acceleration
lin_grad_point = (height - track_y) / height
# speed up: 120, slow down: 0
path_hue = max(min(60 + self._acceleration_x[i] * 35, 120), 0)
path_hue = int(path_hue * 100 + 0.5) / 100
saturation = min(abs(self._acceleration_x[i] * 1.5), 1)
lightness = self._map_val(saturation, 0.0, 1.0, 0.95, 0.62)
alpha = self._map_val(lin_grad_point, 0.75 / 2.0, 0.75, 0.4, 0.0)
# Use HSL to RGB conversion
color = self._hsla_to_color(path_hue / 360.0, saturation, lightness, alpha)
gradient_stops.append(lin_grad_point)
segment_colors.append(color)
# Skip a point, unless next is last
i += 1 + (1 if (i + 2) < max_len else 0)
# Store the gradient in the path object
self._exp_gradient['colors'] = segment_colors
self._exp_gradient['stops'] = gradient_stops
def _update_lead_vehicle(self, d_rel, v_rel, point, rect):
speed_buff, lead_buff = 10.0, 40.0
# Calculate fill alpha
fill_alpha = 0
if d_rel < lead_buff:
fill_alpha = 255 * (1.0 - (d_rel / lead_buff))
if v_rel < 0:
fill_alpha += 255 * (-1 * (v_rel / speed_buff))
fill_alpha = min(fill_alpha, 255)
# Calculate size and position
sz = np.clip((25 * 30) / (d_rel / 3 + 30), 15.0, 30.0) * 2.35
x = np.clip(point[0], 0.0, rect.width - sz / 2)
y = min(point[1], rect.height - sz * 0.6)
g_xo = sz / 5
g_yo = sz / 10
glow = [(x + (sz * 1.35) + g_xo, y + sz + g_yo), (x, y - g_yo), (x - (sz * 1.35) - g_xo, y + sz + g_yo)]
chevron = [(x + (sz * 1.25), y + sz), (x, y), (x - (sz * 1.25), y + sz)]
return LeadVehicle(glow=glow, chevron=chevron, fill_alpha=int(fill_alpha))
def _draw_lane_lines(self):
"""Draw lane lines and road edges"""
for i, lane_line in enumerate(self._lane_lines):
if lane_line.projected_points.size == 0:
continue
alpha = np.clip(self._lane_line_probs[i], 0.0, 0.7)
color = rl.Color(255, 255, 255, int(alpha * 255))
draw_polygon(self._rect, lane_line.projected_points, color)
for i, road_edge in enumerate(self._road_edges):
if road_edge.projected_points.size == 0:
continue
alpha = np.clip(1.0 - self._road_edge_stds[i], 0.0, 1.0)
color = rl.Color(255, 0, 0, int(alpha * 255))
draw_polygon(self._rect, road_edge.projected_points, color)
def _draw_path(self, sm):
"""Draw path with dynamic coloring based on mode and throttle state."""
if not self._path.projected_points.size:
return
if self._experimental_mode:
# Draw with acceleration coloring
if len(self._exp_gradient['colors']) > 2:
draw_polygon(self._rect, self._path.projected_points, gradient=self._exp_gradient)
else:
draw_polygon(self._rect, self._path.projected_points, rl.Color(255, 255, 255, 30))
else:
# Draw with throttle/no throttle gradient
allow_throttle = sm['longitudinalPlan'].allowThrottle or not self._longitudinal_control
# Start transition if throttle state changes
if allow_throttle != self._prev_allow_throttle:
self._prev_allow_throttle = allow_throttle
self._blend_factor = max(1.0 - self._blend_factor, 0.0)
# Update blend factor
if self._blend_factor < 1.0:
self._blend_factor = min(self._blend_factor + PATH_BLEND_INCREMENT, 1.0)
begin_colors = NO_THROTTLE_COLORS if allow_throttle else THROTTLE_COLORS
end_colors = THROTTLE_COLORS if allow_throttle else NO_THROTTLE_COLORS
# Blend colors based on transition
blended_colors = self._blend_colors(begin_colors, end_colors, self._blend_factor)
gradient = {
'start': (0.0, 1.0), # Bottom of path
'end': (0.0, 0.0), # Top of path
'colors': blended_colors,
'stops': [0.0, 0.5, 1.0],
}
draw_polygon(self._rect, self._path.projected_points, gradient=gradient)
def _draw_lead_indicator(self):
# Draw lead vehicles if available
for lead in self._lead_vehicles:
if not lead.glow or not lead.chevron:
continue
rl.draw_triangle_fan(lead.glow, len(lead.glow), rl.Color(218, 202, 37, 255))
rl.draw_triangle_fan(lead.chevron, len(lead.chevron), rl.Color(201, 34, 49, lead.fill_alpha))
@staticmethod
def _get_path_length_idx(pos_x_array: np.ndarray, path_height: float) -> int:
"""Get the index corresponding to the given path height"""
if len(pos_x_array) == 0:
return 0
indices = np.where(pos_x_array <= path_height)[0]
return indices[-1] if indices.size > 0 else 0
def _map_to_screen(self, in_x, in_y, in_z):
"""Project a point in car space to screen space"""
input_pt = np.array([in_x, in_y, in_z])
pt = self._car_space_transform @ input_pt
if abs(pt[2]) < 1e-6:
return None
x, y = pt[0] / pt[2], pt[1] / pt[2]
clip = self._clip_region
if not (clip.x <= x <= clip.x + clip.width and clip.y <= y <= clip.y + clip.height):
return None
return (x, y)
def _map_line_to_polygon(self, line: np.ndarray, y_off: float, z_off: float, max_idx: int, allow_invert: bool = True) -> np.ndarray:
"""Convert 3D line to 2D polygon for rendering."""
if line.shape[0] == 0:
return np.empty((0, 2), dtype=np.float32)
# Slice points and filter non-negative x-coordinates
points = line[:max_idx + 1]
points = points[points[:, 0] >= 0]
if points.shape[0] == 0:
return np.empty((0, 2), dtype=np.float32)
# Create left and right 3D points in one array
n_points = points.shape[0]
points_3d = self._temp_points_3d[:n_points * 2]
points_3d[:n_points, 0] = points_3d[n_points:, 0] = points[:, 0]
points_3d[:n_points, 1] = points[:, 1] - y_off
points_3d[n_points:, 1] = points[:, 1] + y_off
points_3d[:n_points, 2] = points_3d[n_points:, 2] = points[:, 2] + z_off
# Single matrix multiplication for projections
proj = np.ascontiguousarray(self._temp_proj[:, :n_points * 2]) # Slice the pre-allocated array
np.dot(self._car_space_transform, points_3d.T, out=proj)
valid_z = np.abs(proj[2]) > 1e-6
if not np.any(valid_z):
return np.empty((0, 2), dtype=np.float32)
# Compute screen coordinates
screen = proj[:2, valid_z] / proj[2, valid_z][None, :]
left_screen = screen[:, :n_points].T
right_screen = screen[:, n_points:].T
# Ensure consistent shapes by re-aligning valid points
valid_points = np.minimum(left_screen.shape[0], right_screen.shape[0])
if valid_points == 0:
return np.empty((0, 2), dtype=np.float32)
left_screen = left_screen[:valid_points]
right_screen = right_screen[:valid_points]
if self._clip_region:
clip = self._clip_region
bounds_mask = (
(left_screen[:, 0] >= clip.x) & (left_screen[:, 0] <= clip.x + clip.width) &
(left_screen[:, 1] >= clip.y) & (left_screen[:, 1] <= clip.y + clip.height) &
(right_screen[:, 0] >= clip.x) & (right_screen[:, 0] <= clip.x + clip.width) &
(right_screen[:, 1] >= clip.y) & (right_screen[:, 1] <= clip.y + clip.height)
)
if not np.any(bounds_mask):
return np.empty((0, 2), dtype=np.float32)
left_screen = left_screen[bounds_mask]
right_screen = right_screen[bounds_mask]
if not allow_invert and left_screen.shape[0] > 1:
keep = np.concatenate(([True], np.diff(left_screen[:, 1]) < 0))
left_screen = left_screen[keep]
right_screen = right_screen[keep]
if left_screen.shape[0] == 0:
return np.empty((0, 2), dtype=np.float32)
return np.vstack((left_screen, right_screen[::-1])).astype(np.float32)
@staticmethod
def _map_val(x, x0, x1, y0, y1):
x = np.clip(x, x0, x1)
ra = x1 - x0
rb = y1 - y0
return (x - x0) * rb / ra + y0 if ra != 0 else y0
@staticmethod
def _hsla_to_color(h, s, l, a):
rgb = colorsys.hls_to_rgb(h, l, s)
return rl.Color(
int(rgb[0] * 255),
int(rgb[1] * 255),
int(rgb[2] * 255),
int(a * 255)
)
@staticmethod
def _blend_colors(begin_colors, end_colors, t):
if t >= 1.0:
return end_colors
if t <= 0.0:
return begin_colors
inv_t = 1.0 - t
return [rl.Color(
int(inv_t * start.r + t * end.r),
int(inv_t * start.g + t * end.g),
int(inv_t * start.b + t * end.b),
int(inv_t * start.a + t * end.a)
) for start, end in zip(begin_colors, end_colors, strict=True)]