import time
import numpy as np
import pyray as rl
from cereal import log, messaging
from msgq.visionipc import VisionStreamType
from openpilot.selfdrive.ui.ui_state import ui_state, UIStatus, UI_BORDER_SIZE
from openpilot.selfdrive.ui.onroad.alert_renderer import AlertRenderer
from openpilot.selfdrive.ui.onroad.driver_state import DriverStateRenderer
from openpilot.selfdrive.ui.onroad.hud_renderer import HudRenderer
from openpilot.selfdrive.ui.onroad.model_renderer import ModelRenderer
from openpilot.selfdrive.ui.onroad.cameraview import CameraView
from openpilot.system.ui.lib.application import gui_app
from openpilot.common.transformations.camera import DEVICE_CAMERAS, DeviceCameraConfig, view_frame_from_device_frame
from openpilot.common.transformations.orientation import rot_from_euler

OpState = log.SelfdriveState.OpenpilotState
CALIBRATED = log.LiveCalibrationData.Status.calibrated
ROAD_CAM = VisionStreamType.VISION_STREAM_ROAD
WIDE_CAM = VisionStreamType.VISION_STREAM_WIDE_ROAD
DEFAULT_DEVICE_CAMERA = DEVICE_CAMERAS["tici", "ar0231"]

BORDER_COLORS = {
  UIStatus.DISENGAGED: rl.Color(0x12, 0x28, 0x39, 0xFF),  # Blue for disengaged state
  UIStatus.OVERRIDE: rl.Color(0x89, 0x92, 0x8D, 0xFF),  # Gray for override state
  UIStatus.ENGAGED: rl.Color(0x16, 0x7F, 0x40, 0xFF),  # Green for engaged state
}

WIDE_CAM_MAX_SPEED = 10.0  # m/s (22 mph)
ROAD_CAM_MIN_SPEED = 15.0  # m/s (34 mph)
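# The gap between these thresholds forms a hysteresis band: in experimental
# mode the view switches to the wide camera below 10 m/s and back to the road
# camera above 15 m/s, keeping the current stream in between so it doesn't
# flicker around a single cutoff speed.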


class AugmentedRoadView(CameraView):
  def __init__(self, stream_type: VisionStreamType = VisionStreamType.VISION_STREAM_ROAD):
    super().__init__("camerad", stream_type)
    self._set_placeholder_color(BORDER_COLORS[UIStatus.DISENGAGED])

    self.device_camera: DeviceCameraConfig | None = None
    self.view_from_calib = view_frame_from_device_frame.copy()
    self.view_from_wide_calib = view_frame_from_device_frame.copy()

    self._last_calib_time: float = 0
    self._last_rect_dims = (0.0, 0.0)
    self._last_stream_type = stream_type
    self._cached_matrix: np.ndarray | None = None
    self._content_rect = rl.Rectangle()

    self.model_renderer = ModelRenderer()
    self._hud_renderer = HudRenderer()
    self.alert_renderer = AlertRenderer()
    self.driver_state_renderer = DriverStateRenderer()

    # debug
    self._pm = messaging.PubMaster(['uiDebug'])

  def _render(self, rect):
    start_draw = time.monotonic()
    # Only render when the system is started, to avoid invalid data access
    if not ui_state.started:
      return

    self._switch_stream_if_needed(ui_state.sm)

    # Update calibration before rendering
    self._update_calibration()

    # Create inner content area with border padding
    self._content_rect = rl.Rectangle(
      rect.x + UI_BORDER_SIZE,
      rect.y + UI_BORDER_SIZE,
      rect.width - 2 * UI_BORDER_SIZE,
      rect.height - 2 * UI_BORDER_SIZE,
    )

    # Enable scissor mode to clip all rendering within the content rectangle.
    # This creates a rendering viewport that prevents graphics from drawing outside the border.
    rl.begin_scissor_mode(
      int(self._content_rect.x),
      int(self._content_rect.y),
      int(self._content_rect.width),
      int(self._content_rect.height)
    )

    # Render the base camera view
    super()._render(rect)

    # Draw all UI overlays
    self.model_renderer.render(self._content_rect)
    self._hud_renderer.render(self._content_rect)
    self.alert_renderer.render(self._content_rect)
    self.driver_state_renderer.render(self._content_rect)

    # Custom UI extension point - add custom overlays here
    # Use self._content_rect for positioning within camera bounds

    # End clipping region
    rl.end_scissor_mode()

    # Draw colored border based on driving state
    self._draw_border(rect)

    # Publish uiDebug with the time spent drawing this frame
    msg = messaging.new_message('uiDebug')
    msg.uiDebug.drawTimeMillis = (time.monotonic() - start_draw) * 1000
    self._pm.send('uiDebug', msg)

  def _handle_mouse_press(self, _):
    if not self._hud_renderer.user_interacting() and self._click_callback is not None:
      self._click_callback()

  def _handle_mouse_release(self, _):
    # The click callback is only called on press, and only when the user
    # isn't interacting with the HUD
    pass

  def _draw_border(self, rect: rl.Rectangle):
    rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height))
    border_roundness = 0.12
    border_color = BORDER_COLORS.get(ui_state.status, BORDER_COLORS[UIStatus.DISENGAGED])
    border_rect = rl.Rectangle(rect.x + UI_BORDER_SIZE, rect.y + UI_BORDER_SIZE,
                               rect.width - 2 * UI_BORDER_SIZE, rect.height - 2 * UI_BORDER_SIZE)
    rl.draw_rectangle_rounded_lines_ex(border_rect, border_roundness, 10, UI_BORDER_SIZE, border_color)

    # Black background around the colored border
    black_bg_thickness = UI_BORDER_SIZE
    black_bg_rect = rl.Rectangle(
      border_rect.x - UI_BORDER_SIZE,
      border_rect.y - UI_BORDER_SIZE,
      border_rect.width + 2 * UI_BORDER_SIZE,
      border_rect.height + 2 * UI_BORDER_SIZE,
    )
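    # Match the outer black frame's corner arc to the colored border's:
    # raylib resolves roundness against the rect's shorter side (the height
    # here), so the target absolute radius (inner radius plus edge offset)
    # is re-normalized by the outer rect's height.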
    edge_offset = (black_bg_rect.height - border_rect.height) / 2  # distance between rect edges
    roundness_out = (border_roundness * border_rect.height + 2 * edge_offset) / max(1.0, black_bg_rect.height)
    rl.draw_rectangle_rounded_lines_ex(black_bg_rect, roundness_out, 10, black_bg_thickness, rl.BLACK)
    rl.end_scissor_mode()

  def _switch_stream_if_needed(self, sm):
    if sm['selfdriveState'].experimentalMode and WIDE_CAM in self.available_streams:
      v_ego = sm['carState'].vEgo
      if v_ego < WIDE_CAM_MAX_SPEED:
        target = WIDE_CAM
      elif v_ego > ROAD_CAM_MIN_SPEED:
        target = ROAD_CAM
      else:
        # Hysteresis zone - keep current stream
        target = self.stream_type
    else:
      target = ROAD_CAM

    if self.stream_type != target:
      self.switch_stream(target)

  def _update_calibration(self):
    # Update device camera if not already set
    sm = ui_state.sm
    if not self.device_camera and sm.seen['roadCameraState'] and sm.seen['deviceState']:
      self.device_camera = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]

    # Check if live calibration data is available and valid
    if not (sm.updated['liveCalibration'] and sm.valid['liveCalibration']):
      return

    calib = sm['liveCalibration']
    if len(calib.rpyCalib) != 3 or calib.calStatus != CALIBRATED:
      return

    # Update view_from_calib matrix
    device_from_calib = rot_from_euler(calib.rpyCalib)
    self.view_from_calib = view_frame_from_device_frame @ device_from_calib

    # Update wide calibration if available
    if hasattr(calib, 'wideFromDeviceEuler') and len(calib.wideFromDeviceEuler) == 3:
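      # Rotations compose right-to-left: calibrated frame -> device frame ->
      # wide camera frame, followed by the fixed axis swap into the view frame.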
      wide_from_device = rot_from_euler(calib.wideFromDeviceEuler)
      self.view_from_wide_calib = view_frame_from_device_frame @ wide_from_device @ device_from_calib

  def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
    # Check if we can use cached matrix
    calib_time = ui_state.sm.recv_frame['liveCalibration']
    current_dims = (self._content_rect.width, self._content_rect.height)
    if (self._last_calib_time == calib_time and
        self._last_rect_dims == current_dims and
        self._last_stream_type == self.stream_type and
        self._cached_matrix is not None):
      return self._cached_matrix

    # Get camera configuration
    device_camera = self.device_camera or DEFAULT_DEVICE_CAMERA
    is_wide_camera = self.stream_type == WIDE_CAM
    intrinsic = device_camera.ecam.intrinsics if is_wide_camera else device_camera.fcam.intrinsics
    calibration = self.view_from_wide_calib if is_wide_camera else self.view_from_calib
    zoom = 2.0 if is_wide_camera else 1.1
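    # The wide-angle camera covers a much larger field of view, so it gets a
    # stronger 2.0x crop to frame the road; the standard road camera only
    # needs a slight 1.1x zoom.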

    # Calculate transforms for vanishing point
    inf_point = np.array([1000.0, 0.0, 0.0])
    calib_transform = intrinsic @ calibration
    kep = calib_transform @ inf_point
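    # kep is the homogeneous image position of a point 1000 m ahead along the
    # calibrated frame's forward axis, approximating the road's vanishing
    # point; dividing by kep[2] below gives its pixel coordinates, which the
    # offsets recenter the view on.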

    # Calculate center points and dimensions
    x, y = self._content_rect.x, self._content_rect.y
    w, h = self._content_rect.width, self._content_rect.height
    cx, cy = intrinsic[0, 2], intrinsic[1, 2]

    # Calculate max allowed offsets with margins
    margin = 5
    max_x_offset = cx * zoom - w / 2 - margin
    max_y_offset = cy * zoom - h / 2 - margin

    # Calculate and clamp offsets to prevent out-of-bounds issues
    try:
      if abs(kep[2]) > 1e-6:
        x_offset = np.clip((kep[0] / kep[2] - cx) * zoom, -max_x_offset, max_x_offset)
        y_offset = np.clip((kep[1] / kep[2] - cy) * zoom, -max_y_offset, max_y_offset)
      else:
        x_offset, y_offset = 0, 0
    except (ZeroDivisionError, OverflowError):
      x_offset, y_offset = 0, 0

    # Cache the computed transformation matrix to avoid recalculations
    self._last_calib_time = calib_time
    self._last_rect_dims = current_dims
    self._last_stream_type = self.stream_type
    self._cached_matrix = np.array([
      [zoom * 2 * cx / w, 0, -x_offset / w * 2],
      [0, zoom * 2 * cy / h, -y_offset / h * 2],
      [0, 0, 1.0]
    ])
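    # This matrix scales the zoomed frame into the [-1, 1] normalized device
    # coordinate range (hence the 2 / w and 2 / h factors), shifted by the
    # vanishing-point offset, and is consumed by the base CameraView when the
    # camera texture is drawn.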

    video_transform = np.array([
      [zoom, 0.0, (w / 2 + x - x_offset) - (cx * zoom)],
      [0.0, zoom, (h / 2 + y - y_offset) - (cy * zoom)],
      [0.0, 0.0, 1.0]
    ])
    self.model_renderer.set_transform(video_transform @ calib_transform)
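    # calib_transform projects 3D points in the calibrated frame to camera
    # pixels, and video_transform maps camera pixels onto the screen; their
    # product lets the model renderer draw paths directly over the video.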

    return self._cached_matrix


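# Standalone debug viewer: renders the augmented road view from live camerad
# streams (or a replay); space toggles between the road and wide road cameras
# when both streams are available.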
if __name__ == "__main__":
  gui_app.init_window("OnRoad Camera View")
  road_camera_view = AugmentedRoadView(ROAD_CAM)
  print("***press space to switch camera view***")
  try:
    for _ in gui_app.render():
      ui_state.update()
      if rl.is_key_released(rl.KeyboardKey.KEY_SPACE):
        if WIDE_CAM in road_camera_view.available_streams:
          stream = ROAD_CAM if road_camera_view.stream_type == WIDE_CAM else WIDE_CAM
          road_camera_view.switch_stream(stream)
      road_camera_view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
  finally:
    road_camera_view.close()