openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

188 lines
7.3 KiB

import numpy as np
import pyray as rl
from cereal import log
from msgq.visionipc import VisionStreamType
from openpilot.selfdrive.ui.ui_state import ui_state, UIStatus, UI_BORDER_SIZE
from openpilot.selfdrive.ui.onroad.alert_renderer import AlertRenderer
from openpilot.selfdrive.ui.onroad.driver_state import DriverStateRenderer
from openpilot.selfdrive.ui.onroad.hud_renderer import HudRenderer
from openpilot.selfdrive.ui.onroad.model_renderer import ModelRenderer
from openpilot.selfdrive.ui.onroad.cameraview import CameraView
from openpilot.system.ui.lib.application import gui_app
from openpilot.common.transformations.camera import DEVICE_CAMERAS, DeviceCameraConfig, view_frame_from_device_frame
from openpilot.common.transformations.orientation import rot_from_euler
OpState = log.SelfdriveState.OpenpilotState
CALIBRATED = log.LiveCalibrationData.Status.calibrated
DEFAULT_DEVICE_CAMERA = DEVICE_CAMERAS["tici", "ar0231"]
BORDER_COLORS = {
UIStatus.DISENGAGED: rl.Color(0x17, 0x33, 0x49, 0xC8), # Blue for disengaged state
UIStatus.OVERRIDE: rl.Color(0x91, 0x9B, 0x95, 0xF1), # Gray for override state
UIStatus.ENGAGED: rl.Color(0x17, 0x86, 0x44, 0xF1), # Green for engaged state
}
class AugmentedRoadView(CameraView):
def __init__(self, stream_type: VisionStreamType):
super().__init__("camerad", stream_type)
self.device_camera: DeviceCameraConfig | None = None
self.view_from_calib = view_frame_from_device_frame.copy()
self.view_from_wide_calib = view_frame_from_device_frame.copy()
self._last_calib_time: float = 0
self._last_rect_dims = (0.0, 0.0)
self._cached_matrix: np.ndarray | None = None
self._content_rect = rl.Rectangle()
self.model_renderer = ModelRenderer()
self._hud_renderer = HudRenderer()
self.alert_renderer = AlertRenderer()
self.driver_state_renderer = DriverStateRenderer()
def render(self, rect):
# Only render when system is started to avoid invalid data access
if not ui_state.started:
return
# Update calibration before rendering
self._update_calibration()
# Create inner content area with border padding
self._content_rect = rl.Rectangle(
rect.x + UI_BORDER_SIZE,
rect.y + UI_BORDER_SIZE,
rect.width - 2 * UI_BORDER_SIZE,
rect.height - 2 * UI_BORDER_SIZE,
)
# Draw colored border based on driving state
self._draw_border(rect)
# Enable scissor mode to clip all rendering within content rectangle boundaries
# This creates a rendering viewport that prevents graphics from drawing outside the border
rl.begin_scissor_mode(
int(self._content_rect.x),
int(self._content_rect.y),
int(self._content_rect.width),
int(self._content_rect.height)
)
# Render the base camera view
super().render(rect)
# Draw all UI overlays
self.model_renderer.draw(self._content_rect, ui_state.sm)
self._hud_renderer.draw(self._content_rect, ui_state.sm)
self.alert_renderer.draw(self._content_rect, ui_state.sm)
self.driver_state_renderer.draw(self._content_rect, ui_state.sm)
# Custom UI extension point - add custom overlays here
# Use self._content_rect for positioning within camera bounds
# End clipping region
rl.end_scissor_mode()
def _draw_border(self, rect: rl.Rectangle):
border_color = BORDER_COLORS.get(ui_state.status, BORDER_COLORS[UIStatus.DISENGAGED])
rl.draw_rectangle_lines_ex(rect, UI_BORDER_SIZE, border_color)
def _update_calibration(self):
# Update device camera if not already set
sm = ui_state.sm
if not self.device_camera and sm.seen['roadCameraState'] and sm.seen['deviceState']:
self.device_camera = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]
# Check if live calibration data is available and valid
if not (sm.updated["liveCalibration"] and sm.valid['liveCalibration']):
return
calib = sm['liveCalibration']
if len(calib.rpyCalib) != 3 or calib.calStatus != CALIBRATED:
return
# Update view_from_calib matrix
device_from_calib = rot_from_euler(calib.rpyCalib)
self.view_from_calib = view_frame_from_device_frame @ device_from_calib
# Update wide calibration if available
if hasattr(calib, 'wideFromDeviceEuler') and len(calib.wideFromDeviceEuler) == 3:
wide_from_device = rot_from_euler(calib.wideFromDeviceEuler)
self.view_from_wide_calib = view_frame_from_device_frame @ wide_from_device @ device_from_calib
def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
# Check if we can use cached matrix
calib_time = ui_state.sm.recv_frame['liveCalibration']
current_dims = (self._content_rect.width, self._content_rect.height)
if (self._last_calib_time == calib_time and
self._last_rect_dims == current_dims and
self._cached_matrix is not None):
return self._cached_matrix
# Get camera configuration
device_camera = self.device_camera or DEFAULT_DEVICE_CAMERA
is_wide_camera = self.stream_type == VisionStreamType.VISION_STREAM_WIDE_ROAD
intrinsic = device_camera.ecam.intrinsics if is_wide_camera else device_camera.fcam.intrinsics
calibration = self.view_from_wide_calib if is_wide_camera else self.view_from_calib
zoom = 2.0 if is_wide_camera else 1.1
# Calculate transforms for vanishing point
inf_point = np.array([1000.0, 0.0, 0.0])
calib_transform = intrinsic @ calibration
kep = calib_transform @ inf_point
# Calculate center points and dimensions
x, y = self._content_rect.x, self._content_rect.y
w, h = self._content_rect.width, self._content_rect.height
cx, cy = intrinsic[0, 2], intrinsic[1, 2]
# Calculate max allowed offsets with margins
margin = 5
max_x_offset = cx * zoom - w / 2 - margin
max_y_offset = cy * zoom - h / 2 - margin
# Calculate and clamp offsets to prevent out-of-bounds issues
try:
if abs(kep[2]) > 1e-6:
x_offset = np.clip((kep[0] / kep[2] - cx) * zoom, -max_x_offset, max_x_offset)
y_offset = np.clip((kep[1] / kep[2] - cy) * zoom, -max_y_offset, max_y_offset)
else:
x_offset, y_offset = 0, 0
except (ZeroDivisionError, OverflowError):
x_offset, y_offset = 0, 0
# Update cache values
self._last_calib_time = calib_time
self._last_rect_dims = current_dims
self._cached_matrix = np.array([
[zoom * 2 * cx / w, 0, -x_offset / w * 2],
[0, zoom * 2 * cy / h, -y_offset / h * 2],
[0, 0, 1.0]
])
video_transform = np.array([
[zoom, 0.0, (w / 2 + x - x_offset) - (cx * zoom)],
[0.0, zoom, (h / 2 + y - y_offset) - (cy * zoom)],
[0.0, 0.0, 1.0]
])
self.model_renderer.set_transform(video_transform @ calib_transform)
return self._cached_matrix
if __name__ == "__main__":
gui_app.init_window("OnRoad Camera View")
road_camera_view = AugmentedRoadView(VisionStreamType.VISION_STREAM_ROAD)
print("***press space to switch camera view***")
try:
for _ in gui_app.render():
ui_state.update()
if rl.is_key_released(rl.KeyboardKey.KEY_SPACE):
is_wide = road_camera_view.stream_type == VisionStreamType.VISION_STREAM_WIDE_ROAD
road_camera_view.switch_stream(VisionStreamType.VISION_STREAM_ROAD if is_wide else VisionStreamType.VISION_STREAM_WIDE_ROAD)
road_camera_view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
finally:
road_camera_view.close()