system/ui: optimize ModelRenderer (#35369)

* optimize ModelRenderer with vectorized operations

* pre-calculate the exp mode colors

* cleanup

* improve batch map line to polygon

* pre-calc leads
pull/35390/head
Dean Lee 2 weeks ago committed by GitHub
parent b8f3e7bcf0
commit 29010cae23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 423
      system/ui/onroad/model_renderer.py

@ -1,8 +1,8 @@
import colorsys
import bisect
import numpy as np
import pyray as rl
from cereal import messaging, car
from dataclasses import dataclass, field
from openpilot.common.params import Params
from openpilot.system.ui.lib.application import DEFAULT_FPS
from openpilot.system.ui.lib.shader_polygon import draw_polygon
@ -27,6 +27,18 @@ NO_THROTTLE_COLORS = [
]
@dataclass
class ModelPoints:
raw_points: np.ndarray = field(default_factory=lambda: np.empty((0, 3), dtype=np.float32))
projected_points: np.ndarray = field(default_factory=lambda: np.empty((0, 2), dtype=np.float32))
@dataclass
class LeadVehicle:
glow: list[float] = field(default_factory=list)
chevron: list[float] = field(default_factory=list)
fill_alpha: int = 0
class ModelRenderer:
def __init__(self):
self._longitudinal_control = False
@ -35,28 +47,34 @@ class ModelRenderer:
self._prev_allow_throttle = True
self._lane_line_probs = np.zeros(4, dtype=np.float32)
self._road_edge_stds = np.zeros(2, dtype=np.float32)
self._lead_vehicles = [LeadVehicle(), LeadVehicle()]
self._path_offset_z = 1.22
# Initialize empty polygon vertices
self._track_vertices = np.empty((0, 2), dtype=np.float32)
self._lane_line_vertices = [np.empty((0, 2), dtype=np.float32) for _ in range(4)]
self._road_edge_vertices = [np.empty((0, 2), dtype=np.float32) for _ in range(2)]
self._lead_vertices = [None, None]
# Initialize ModelPoints objects
self._path = ModelPoints()
self._lane_lines = [ModelPoints() for _ in range(4)]
self._road_edges = [ModelPoints() for _ in range(2)]
self._acceleration_x = np.empty((0,), dtype=np.float32)
# Transform matrix (3x3 for car space to screen space)
self._car_space_transform = np.zeros((3, 3))
self._car_space_transform = np.zeros((3, 3), dtype=np.float32)
self._transform_dirty = True
self._clip_region = None
self._rect = None
self._exp_gradient = {
'start': (0.0, 1.0), # Bottom of path
'end': (0.0, 0.0), # Top of path
'colors': [],
'stops': [],
}
# Get longitudinal control setting from car parameters
car_params = Params().get("CarParams")
if car_params:
if car_params := Params().get("CarParams"):
cp = messaging.log_from_bytes(car_params, car.CarParams)
self._longitudinal_control = cp.openpilotLongitudinalControl
def set_transform(self, transform: np.ndarray):
self._car_space_transform = transform
self._car_space_transform = transform.astype(np.float32)
self._transform_dirty = True
def draw(self, rect: rl.Rectangle, sm: messaging.SubMaster):
@ -70,161 +88,189 @@ class ModelRenderer:
rect.x - CLIP_MARGIN, rect.y - CLIP_MARGIN, rect.width + 2 * CLIP_MARGIN, rect.height + 2 * CLIP_MARGIN
)
# Update flags based on car state
# Update state
self._experimental_mode = sm['selfdriveState'].experimentalMode
self._path_offset_z = sm['liveCalibration'].height[0]
if sm.updated['carParams']:
self._longitudinal_control = sm['carParams'].openpilotLongitudinalControl
# Get model and radar data
model = sm['modelV2']
radar_state = sm['radarState'] if sm.valid['radarState'] else None
lead_one = radar_state.leadOne if radar_state else None
render_lead_indicator = self._longitudinal_control and radar_state is not None
# Update model data when needed
if self._transform_dirty or sm.updated['modelV2'] or sm.updated['radarState']:
self._update_model(model, lead_one)
model_updated = sm.updated['modelV2']
if model_updated or sm.updated['radarState'] or self._transform_dirty:
if model_updated:
self._update_raw_points(model)
pos_x_array = self._path.raw_points[:, 0]
if pos_x_array.size == 0:
return
self._update_model(lead_one, pos_x_array)
if render_lead_indicator:
self._update_leads(radar_state, model.position)
self._update_leads(radar_state, pos_x_array)
self._transform_dirty = False
# Draw elements
self._draw_lane_lines()
self._draw_path(sm, model, rect.height)
self._draw_path(sm)
# Draw lead vehicles if available
if render_lead_indicator and radar_state:
lead_two = radar_state.leadTwo
self._draw_lead_indicator()
def _update_raw_points(self, model):
"""Update raw 3D points from model data"""
self._path.raw_points = np.array([model.position.x, model.position.y, model.position.z], dtype=np.float32).T
if lead_one and lead_one.status:
self._draw_lead(lead_one, self._lead_vertices[0], rect)
for i, lane_line in enumerate(model.laneLines):
self._lane_lines[i].raw_points = np.array([lane_line.x, lane_line.y, lane_line.z], dtype=np.float32).T
if lead_two and lead_two.status and lead_one and (abs(lead_one.dRel - lead_two.dRel) > 3.0):
self._draw_lead(lead_two, self._lead_vertices[1], rect)
for i, road_edge in enumerate(model.roadEdges):
self._road_edges[i].raw_points = np.array([road_edge.x, road_edge.y, road_edge.z], dtype=np.float32).T
def _update_leads(self, radar_state, line):
self._lane_line_probs = np.array(model.laneLineProbs, dtype=np.float32)
self._road_edge_stds = np.array(model.roadEdgeStds, dtype=np.float32)
self._acceleration_x = np.array(model.acceleration.x, dtype=np.float32)
def _update_leads(self, radar_state, pos_x_array):
"""Update positions of lead vehicles"""
self._lead_vehicles = [LeadVehicle(), LeadVehicle()]
leads = [radar_state.leadOne, radar_state.leadTwo]
for i, lead_data in enumerate(leads):
if lead_data and lead_data.status:
d_rel = lead_data.dRel
y_rel = lead_data.yRel
idx = self._get_path_length_idx(line, d_rel)
z = line.z[idx]
self._lead_vertices[i] = self._map_to_screen(d_rel, -y_rel, z + self._path_offset_z)
def _update_model(self, model, lead):
d_rel, y_rel, v_rel = lead_data.dRel, lead_data.yRel, lead_data.vRel
idx = self._get_path_length_idx(pos_x_array, d_rel)
z = self._path.raw_points[idx, 2] if idx < len(self._path.raw_points) else 0.0
point = self._map_to_screen(d_rel, -y_rel, z + self._path_offset_z)
if point:
self._lead_vehicles[i] = self._update_lead_vehicle(d_rel, v_rel, point, self._rect)
def _update_model(self, lead, pos_x_array):
"""Update model visualization data based on model message"""
model_position = model.position
# Determine max distance to render
max_distance = np.clip(model_position.x[-1], MIN_DRAW_DISTANCE, MAX_DRAW_DISTANCE)
max_distance = np.clip(pos_x_array[-1], MIN_DRAW_DISTANCE, MAX_DRAW_DISTANCE)
max_idx = self._get_path_length_idx(pos_x_array, max_distance)
# Update lane lines
lane_lines = model.laneLines
line_probs = model.laneLineProbs
max_idx = self._get_path_length_idx(lane_lines[0], max_distance)
for i in range(4):
self._lane_line_probs[i] = line_probs[i]
self._lane_line_vertices[i] = self._map_line_to_polygon(
lane_lines[i], 0.025 * self._lane_line_probs[i], 0, max_idx
# Update lane lines using raw points
for i, lane_line in enumerate(self._lane_lines):
lane_line.projected_points = self._map_line_to_polygon(
lane_line.raw_points, 0.025 * self._lane_line_probs[i], 0.0, max_idx
)
# Update road edges
road_edges = model.roadEdges
edge_stds = model.roadEdgeStds
for i in range(2):
self._road_edge_stds[i] = edge_stds[i]
self._road_edge_vertices[i] = self._map_line_to_polygon(road_edges[i], 0.025, 0, max_idx)
# Update road edges using raw points
for road_edge in self._road_edges:
road_edge.projected_points = self._map_line_to_polygon(road_edge.raw_points, 0.025, 0.0, max_idx)
# Update path
# Update path using raw points
if lead and lead.status:
lead_d = lead.dRel * 2.0
max_distance = np.clip(lead_d - min(lead_d * 0.35, 10.0), 0.0, max_distance)
max_idx = self._get_path_length_idx(model_position, max_distance)
max_idx = self._get_path_length_idx(pos_x_array, max_distance)
self._track_vertices = self._map_line_to_polygon(model_position, 0.9, self._path_offset_z, max_idx, False)
self._path.projected_points = self._map_line_to_polygon(
self._path.raw_points, 0.9, self._path_offset_z, max_idx, allow_invert=False
)
def _draw_lane_lines(self):
"""Draw lane lines and road edges"""
for i, vertices in enumerate(self._lane_line_vertices):
# Skip if no vertices
if vertices.size == 0:
continue
self._update_experimental_gradient(self._rect.height)
# Draw lane line
alpha = np.clip(self._lane_line_probs[i], 0.0, 0.7)
color = rl.Color(255, 255, 255, int(alpha * 255))
draw_polygon(self._rect, vertices, color)
def _update_experimental_gradient(self, height):
"""Pre-calculate experimental mode gradient colors"""
if not self._experimental_mode:
return
for i, vertices in enumerate(self._road_edge_vertices):
# Skip if no vertices
if vertices.size == 0:
max_len = min(len(self._path.projected_points) // 2, len(self._acceleration_x))
segment_colors = []
gradient_stops = []
i = 0
while i < max_len:
track_idx = max_len - i - 1 # flip idx to start from bottom right
track_y = self._path.projected_points[track_idx][1]
if track_y < 0 or track_y > height:
i += 1
continue
# Draw road edge
alpha = np.clip(1.0 - self._road_edge_stds[i], 0.0, 1.0)
color = rl.Color(255, 0, 0, int(alpha * 255))
draw_polygon(self._rect, vertices, color)
# Calculate color based on acceleration
lin_grad_point = (height - track_y) / height
def _draw_path(self, sm, model, height):
"""Draw the path polygon with gradient based on acceleration"""
if self._track_vertices.size == 0:
return
# speed up: 120, slow down: 0
path_hue = max(min(60 + self._acceleration_x[i] * 35, 120), 0)
path_hue = int(path_hue * 100 + 0.5) / 100
if self._experimental_mode:
# Draw with acceleration coloring
acceleration = model.acceleration.x
max_len = min(len(self._track_vertices) // 2, len(acceleration))
saturation = min(abs(self._acceleration_x[i] * 1.5), 1)
lightness = self._map_val(saturation, 0.0, 1.0, 0.95, 0.62)
alpha = self._map_val(lin_grad_point, 0.75 / 2.0, 0.75, 0.4, 0.0)
# Create segments for gradient coloring
segment_colors = []
gradient_stops = []
# Use HSL to RGB conversion
color = self._hsla_to_color(path_hue / 360.0, saturation, lightness, alpha)
i = 0
while i < max_len:
track_idx = max_len - i - 1 # flip idx to start from bottom right
track_y = self._track_vertices[track_idx][1]
if track_y < 0 or track_y > height:
i += 1
continue
gradient_stops.append(lin_grad_point)
segment_colors.append(color)
# Calculate color based on acceleration
lin_grad_point = (height - track_y) / height
# Skip a point, unless next is last
i += 1 + (1 if (i + 2) < max_len else 0)
# speed up: 120, slow down: 0
path_hue = max(min(60 + acceleration[i] * 35, 120), 0)
path_hue = int(path_hue * 100 + 0.5) / 100
# Store the gradient in the path object
self._exp_gradient['colors'] = segment_colors
self._exp_gradient['stops'] = gradient_stops
saturation = min(abs(acceleration[i] * 1.5), 1)
lightness = self._map_val(saturation, 0.0, 1.0, 0.95, 0.62)
alpha = self._map_val(lin_grad_point, 0.75 / 2.0, 0.75, 0.4, 0.0)
def _update_lead_vehicle(self, d_rel, v_rel, point, rect):
speed_buff, lead_buff = 10.0, 40.0
# Use HSL to RGB conversion
color = self._hsla_to_color(path_hue / 360.0, saturation, lightness, alpha)
# Calculate fill alpha
fill_alpha = 0
if d_rel < lead_buff:
fill_alpha = 255 * (1.0 - (d_rel / lead_buff))
if v_rel < 0:
fill_alpha += 255 * (-1 * (v_rel / speed_buff))
fill_alpha = min(fill_alpha, 255)
# Create quad segment
gradient_stops.append(lin_grad_point)
segment_colors.append(color)
# Calculate size and position
sz = np.clip((25 * 30) / (d_rel / 3 + 30), 15.0, 30.0) * 2.35
x = np.clip(point[0], 0.0, rect.width - sz / 2)
y = min(point[1], rect.height - sz * 0.6)
# Skip a point, unless next is last
i += 1 + (1 if (i + 2) < max_len else 0)
g_xo = sz / 5
g_yo = sz / 10
if len(segment_colors) < 2:
draw_polygon(self._rect, self._track_vertices, rl.Color(255, 255, 255, 30))
return
glow = [(x + (sz * 1.35) + g_xo, y + sz + g_yo), (x, y - g_yo), (x - (sz * 1.35) - g_xo, y + sz + g_yo)]
chevron = [(x + (sz * 1.25), y + sz), (x, y), (x - (sz * 1.25), y + sz)]
# Create gradient specification
gradient = {
'start': (0.0, 1.0), # Bottom of path
'end': (0.0, 0.0), # Top of path
'colors': segment_colors,
'stops': gradient_stops,
}
draw_polygon(self._rect, self._track_vertices, gradient=gradient)
return LeadVehicle(glow=glow,chevron=chevron, fill_alpha=int(fill_alpha))
def _draw_lane_lines(self):
"""Draw lane lines and road edges"""
for i, lane_line in enumerate(self._lane_lines):
if lane_line.projected_points.size == 0:
continue
alpha = np.clip(self._lane_line_probs[i], 0.0, 0.7)
color = rl.Color(255, 255, 255, int(alpha * 255))
draw_polygon(self._rect, lane_line.projected_points, color)
for i, road_edge in enumerate(self._road_edges):
if road_edge.projected_points.size == 0:
continue
alpha = np.clip(1.0 - self._road_edge_stds[i], 0.0, 1.0)
color = rl.Color(255, 0, 0, int(alpha * 255))
draw_polygon(self._rect, road_edge.projected_points, color)
def _draw_path(self, sm):
"""Draw path with dynamic coloring based on mode and throttle state."""
if not self._path.projected_points.size:
return
if self._experimental_mode:
# Draw with acceleration coloring
if len(self._exp_gradient['colors']) > 2:
draw_polygon(self._rect, self._path.projected_points, gradient=self._exp_gradient)
else:
draw_polygon(self._rect, self._path.projected_points, rl.Color(255, 255, 255, 30))
else:
# Draw with throttle/no throttle gradient
allow_throttle = sm['longitudinalPlan'].allowThrottle or not self._longitudinal_control
@ -249,46 +295,22 @@ class ModelRenderer:
'colors': blended_colors,
'stops': [0.0, 0.5, 1.0],
}
draw_polygon(self._rect, self._track_vertices, gradient=gradient)
def _draw_lead(self, lead_data, vd, rect):
"""Draw lead vehicle indicator"""
if not vd:
return
draw_polygon(self._rect, self._path.projected_points, gradient=gradient)
speed_buff = 10.0
lead_buff = 40.0
d_rel = lead_data.dRel
v_rel = lead_data.vRel
# Calculate fill alpha
fill_alpha = 0
if d_rel < lead_buff:
fill_alpha = 255 * (1.0 - (d_rel / lead_buff))
if v_rel < 0:
fill_alpha += 255 * (-1 * (v_rel / speed_buff))
fill_alpha = min(fill_alpha, 255)
# Calculate size and position
sz = np.clip((25 * 30) / (d_rel / 3 + 30), 15.0, 30.0) * 2.35
x = np.clip(vd[0], 0.0, rect.width - sz / 2)
y = min(vd[1], rect.height - sz * 0.6)
g_xo = sz / 5
g_yo = sz / 10
def _draw_lead_indicator(self):
# Draw lead vehicles if available
for lead in self._lead_vehicles:
if not lead.glow or not lead.chevron:
continue
# Draw glow
glow = [(x + (sz * 1.35) + g_xo, y + sz + g_yo), (x, y - g_yo), (x - (sz * 1.35) - g_xo, y + sz + g_yo)]
rl.draw_triangle_fan(glow, len(glow), rl.Color(218, 202, 37, 255))
# Draw chevron
chevron = [(x + (sz * 1.25), y + sz), (x, y), (x - (sz * 1.25), y + sz)]
rl.draw_triangle_fan(chevron, len(chevron), rl.Color(201, 34, 49, int(fill_alpha)))
rl.draw_triangle_fan(lead.glow, len(lead.glow), rl.Color(218, 202, 37, 255))
rl.draw_triangle_fan(lead.chevron, len(lead.chevron), rl.Color(201, 34, 49, lead.fill_alpha))
@staticmethod
def _get_path_length_idx(line, path_height):
def _get_path_length_idx(pos_x_array: np.ndarray, path_height: float) -> int:
"""Get the index corresponding to the given path height"""
return bisect.bisect_right(line.x, path_height) - 1
idx = np.searchsorted(pos_x_array, path_height, side='right')
return int(np.clip(idx - 1, 0, len(pos_x_array) - 1))
def _map_to_screen(self, in_x, in_y, in_z):
"""Project a point in car space to screen space"""
@ -298,43 +320,71 @@ class ModelRenderer:
if abs(pt[2]) < 1e-6:
return None
x = pt[0] / pt[2]
y = pt[1] / pt[2]
x, y = pt[0] / pt[2], pt[1] / pt[2]
clip = self._clip_region
if x < clip.x or x > clip.x + clip.width or y < clip.y or y > clip.y + clip.height:
if not (clip.x <= x <= clip.x + clip.width and clip.y <= y <= clip.y + clip.height):
return None
return (x, y)
def _map_line_to_polygon(self, line, y_off, z_off, max_idx, allow_invert=True)-> np.ndarray:
"""Convert a 3D line to a 2D polygon for drawing"""
line_x = line.x
line_y = line.y
line_z = line.z
points: list = []
for i in range(max_idx + 1):
# Skip points with negative x (behind camera)
if line_x[i] < 0:
continue
left = self._map_to_screen(line_x[i], line_y[i] - y_off, line_z[i] + z_off)
right = self._map_to_screen(line_x[i], line_y[i] + y_off, line_z[i] + z_off)
if left and right:
# Check for inversion when going over hills
if not allow_invert and points and left[1] > points[-1][1]:
continue
points.append(left)
points.insert(0, right)
if not points:
return np.empty((0, 2), dtype=np.float32)
return np.array(points, dtype=np.float32)
def _map_line_to_polygon(self, line: np.ndarray, y_off: float, z_off: float, max_idx: int, allow_invert: bool = True) -> np.ndarray:
"""Convert 3D line to 2D polygon for rendering."""
if line.shape[0] == 0:
return np.empty((0, 2), dtype=np.float32)
# Slice points and filter non-negative x-coordinates
points = line[:max_idx + 1][line[:max_idx + 1, 0] >= 0]
if points.shape[0] == 0:
return np.empty((0, 2), dtype=np.float32)
# Create left and right 3D points in one array
n_points = points.shape[0]
points_3d = np.empty((n_points * 2, 3), dtype=np.float32)
points_3d[:n_points, 0] = points_3d[n_points:, 0] = points[:, 0]
points_3d[:n_points, 1] = points[:, 1] - y_off
points_3d[n_points:, 1] = points[:, 1] + y_off
points_3d[:n_points, 2] = points_3d[n_points:, 2] = points[:, 2] + z_off
# Single matrix multiplication for projections
proj = self._car_space_transform @ points_3d.T
valid_z = np.abs(proj[2]) > 1e-6
if not np.any(valid_z):
return np.empty((0, 2), dtype=np.float32)
# Compute screen coordinates
screen = proj[:2, valid_z] / proj[2, valid_z][None, :]
left_screen = screen[:, :n_points].T
right_screen = screen[:, n_points:].T
# Ensure consistent shapes by re-aligning valid points
valid_points = np.minimum(left_screen.shape[0], right_screen.shape[0])
if valid_points == 0:
return np.empty((0, 2), dtype=np.float32)
left_screen = left_screen[:valid_points]
right_screen = right_screen[:valid_points]
if self._clip_region:
clip = self._clip_region
bounds_mask = (
(left_screen[:, 0] >= clip.x) & (left_screen[:, 0] <= clip.x + clip.width) &
(left_screen[:, 1] >= clip.y) & (left_screen[:, 1] <= clip.y + clip.height) &
(right_screen[:, 0] >= clip.x) & (right_screen[:, 0] <= clip.x + clip.width) &
(right_screen[:, 1] >= clip.y) & (right_screen[:, 1] <= clip.y + clip.height)
)
if not np.any(bounds_mask):
return np.empty((0, 2), dtype=np.float32)
left_screen = left_screen[bounds_mask]
right_screen = right_screen[bounds_mask]
if not allow_invert and left_screen.shape[0] > 1:
keep = np.concatenate(([True], np.diff(left_screen[:, 1]) < 0))
left_screen = left_screen[keep]
right_screen = right_screen[keep]
if left_screen.shape[0] == 0:
return np.empty((0, 2), dtype=np.float32)
return np.vstack((left_screen, right_screen[::-1])).astype(np.float32)
@staticmethod
def _map_val(x, x0, x1, y0, y1):
@ -345,17 +395,8 @@ class ModelRenderer:
@staticmethod
def _hsla_to_color(h, s, l, a):
"""Convert HSLA color to Raylib Color using colorsys module"""
# colorsys uses HLS format (Hue, Lightness, Saturation)
r, g, b = colorsys.hls_to_rgb(h, l, s)
# Ensure values are in valid range
r_val = max(0, min(255, int(r * 255)))
g_val = max(0, min(255, int(g * 255)))
b_val = max(0, min(255, int(b * 255)))
a_val = max(0, min(255, int(a * 255)))
return rl.Color(r_val, g_val, b_val, a_val)
r, g, b = [max(0, min(255, int(v * 255))) for v in colorsys.hls_to_rgb(h, l, s)]
return rl.Color(r, g, b, max(0, min(255, int(a * 255))))
@staticmethod
def _blend_colors(begin_colors, end_colors, t):

Loading…
Cancel
Save