ui: implement smooth camera stream switching with seamless transitions (#35449)

implement smooth camera stream switching with seamless transitions

Co-authored-by: Shane Smiskol <shane@smiskol.com>
Author: Dean Lee (committed via GitHub)
Commit: 462301d2e0, parent: 8b516abb31
Changed files:
  selfdrive/ui/onroad/augmented_road_view.py  (14 changes)
  selfdrive/ui/onroad/cameraview.py  (81 changes)
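
How the switch works: instead of tearing down the current VisionIPC client in switch_stream() (which previously left a blank placeholder until the new stream connected), the view keeps rendering the current stream, opens a second client for the target stream in the background, and swaps only once the target is connected and has delivered its first frame. The sketch below is a minimal, self-contained illustration of that double-buffer pattern; StreamClient is a hypothetical stand-in for VisionIpcClient and is not part of this diff.

# Illustrative sketch of the double-buffered stream switch used in this commit.
# StreamClient fakes connection latency; it is a stand-in, not VisionIpcClient.
from dataclasses import dataclass


@dataclass
class StreamClient:
  stream: str
  frames_until_ready: int = 3  # pretend connect/first-frame latency

  def recv(self):
    if self.frames_until_ready > 0:
      self.frames_until_ready -= 1
      return None
    return f"frame from {self.stream}"


class SwitchingView:
  def __init__(self, stream: str):
    self.client = StreamClient(stream)
    self.frame = None
    self._target: StreamClient | None = None

  def switch_stream(self, stream: str) -> None:
    # Non-blocking: arm a second client and keep rendering the current stream.
    if stream != self.client.stream:
      self._target = StreamClient(stream)

  def render(self):
    if self._target is not None:
      target_frame = self._target.recv()
      if target_frame is not None:
        # Target delivered its first frame: swap clients, no blank frame shown.
        self.client, self._target = self._target, None
        self.frame = target_frame
        return self.frame
    self.frame = self.client.recv() or self.frame
    return self.frame


if __name__ == "__main__":
  view = SwitchingView("road")
  for i in range(8):
    if i == 3:
      view.switch_stream("wide_road")
    print(i, view.render())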

diff --git a/selfdrive/ui/onroad/augmented_road_view.py b/selfdrive/ui/onroad/augmented_road_view.py
@@ -16,6 +16,8 @@ from openpilot.common.transformations.orientation import rot_from_euler
 OpState = log.SelfdriveState.OpenpilotState
 CALIBRATED = log.LiveCalibrationData.Status.calibrated
+ROAD_CAM = VisionStreamType.VISION_STREAM_ROAD
+WIDE_CAM = VisionStreamType.VISION_STREAM_WIDE_ROAD
 DEFAULT_DEVICE_CAMERA = DEVICE_CAMERAS["tici", "ar0231"]

 BORDER_COLORS = {
@@ -36,6 +38,7 @@ class AugmentedRoadView(CameraView):
     self._last_calib_time: float = 0
     self._last_rect_dims = (0.0, 0.0)
+    self._last_stream_type = stream_type
     self._cached_matrix: np.ndarray | None = None
     self._content_rect = rl.Rectangle()
@@ -120,6 +123,7 @@ class AugmentedRoadView(CameraView):
     current_dims = (self._content_rect.width, self._content_rect.height)
     if (self._last_calib_time == calib_time and
         self._last_rect_dims == current_dims and
+        self._last_stream_type == self.stream_type and
         self._cached_matrix is not None):
       return self._cached_matrix
@@ -155,9 +159,10 @@ class AugmentedRoadView(CameraView):
     except (ZeroDivisionError, OverflowError):
       x_offset, y_offset = 0, 0

-    # Update cache values
+    # Cache the computed transformation matrix to avoid recalculations
     self._last_calib_time = calib_time
     self._last_rect_dims = current_dims
+    self._last_stream_type = self.stream_type
     self._cached_matrix = np.array([
       [zoom * 2 * cx / w, 0, -x_offset / w * 2],
       [0, zoom * 2 * cy / h, -y_offset / h * 2],
@@ -176,14 +181,15 @@ class AugmentedRoadView(CameraView):

 if __name__ == "__main__":
   gui_app.init_window("OnRoad Camera View")
-  road_camera_view = AugmentedRoadView(VisionStreamType.VISION_STREAM_ROAD)
+  road_camera_view = AugmentedRoadView(ROAD_CAM)
   print("***press space to switch camera view***")
   try:
     for _ in gui_app.render():
       ui_state.update()
       if rl.is_key_released(rl.KeyboardKey.KEY_SPACE):
-        is_wide = road_camera_view.stream_type == VisionStreamType.VISION_STREAM_WIDE_ROAD
-        road_camera_view.switch_stream(VisionStreamType.VISION_STREAM_ROAD if is_wide else VisionStreamType.VISION_STREAM_WIDE_ROAD)
+        if WIDE_CAM in road_camera_view.available_streams:
+          stream = ROAD_CAM if road_camera_view.stream_type == WIDE_CAM else WIDE_CAM
+          road_camera_view.switch_stream(stream)
       road_camera_view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
   finally:
     road_camera_view.close()
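
The cache changes above also key the cached projection matrix on the active stream type: the road and wide-road cameras have different intrinsics, so a matrix computed for one stream is wrong for the other and must be recomputed after a switch. A small self-contained sketch of that cache-key idea follows; TransformCache and its dummy matrix computation are hypothetical, not code from this diff.

# Illustrative only: cache a derived matrix on everything it depends on,
# including the active stream. TransformCache is a hypothetical stand-in for
# the cached-matrix logic in AugmentedRoadView.
import numpy as np


class TransformCache:
  def __init__(self):
    self._key: tuple | None = None
    self._matrix: np.ndarray | None = None

  def get(self, calib_time: float, dims: tuple[float, float], stream_type: int) -> np.ndarray:
    key = (calib_time, dims, stream_type)
    if key != self._key or self._matrix is None:
      # A real implementation would rebuild the projection from the camera intrinsics here.
      self._matrix = np.eye(3) * (1.0 + stream_type)
      self._key = key
    return self._matrix


if __name__ == "__main__":
  cache = TransformCache()
  m_road = cache.get(1.0, (1920.0, 1080.0), stream_type=0)
  m_wide = cache.get(1.0, (1920.0, 1080.0), stream_type=2)  # stream switch busts the cache
  assert not np.array_equal(m_road, m_wide)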

diff --git a/selfdrive/ui/onroad/cameraview.py b/selfdrive/ui/onroad/cameraview.py
@@ -57,9 +57,17 @@ else:
 class CameraView:
   def __init__(self, name: str, stream_type: VisionStreamType):
-    self.client = VisionIpcClient(name, stream_type, conflate=True)
     self._name = name
+
+    # Primary stream
+    self.client = VisionIpcClient(name, stream_type, conflate=True)
     self._stream_type = stream_type
+    self.available_streams: list[VisionStreamType] = []
+
+    # Target stream for switching
+    self._target_client: VisionIpcClient | None = None
+    self._target_stream_type: VisionStreamType | None = None
+    self._switching: bool = False
+
     self._texture_needs_update = True
     self.last_connection_attempt: float = 0.0
@@ -91,12 +99,20 @@ class CameraView:
     self._placeholder_color = color

   def switch_stream(self, stream_type: VisionStreamType) -> None:
-    if self._stream_type != stream_type:
-      cloudlog.debug(f'switching stream from {self._stream_type} to {stream_type}')
-      self._clear_textures()
-      self.frame = None
-      self._stream_type = stream_type
-      self.client = VisionIpcClient(self._name, stream_type, conflate=True)
+    if self._stream_type == stream_type:
+      return
+
+    if self._switching and self._target_stream_type == stream_type:
+      return
+
+    cloudlog.debug(f'Preparing switch from {self._stream_type} to {stream_type}')
+    if self._target_client:
+      del self._target_client
+    self._target_stream_type = stream_type
+    self._target_client = VisionIpcClient(self._name, stream_type, conflate=True)
+    self._switching = True

   @property
   def stream_type(self) -> VisionStreamType:
@@ -135,6 +151,9 @@ class CameraView:
     ])

   def render(self, rect: rl.Rectangle):
+    if self._switching:
+      self._handle_switch()
+
     if not self._ensure_connection():
       self._draw_placeholder(rect)
       return
@@ -226,6 +245,7 @@ class CameraView:
   def _ensure_connection(self) -> bool:
     if not self.client.is_connected():
       self.frame = None
+      self.available_streams.clear()

       # Throttle connection attempts
       current_time = rl.get_time()
@@ -237,16 +257,57 @@ class CameraView:
         return False

     cloudlog.debug(f"Connected to {self._name} stream: {self._stream_type}, buffers: {self.client.num_buffers}")
-    self._clear_textures()
+    self._initialize_textures()
+    self.available_streams = self.client.available_streams(self._name, block=False)
+    return True
+
+  def _handle_switch(self) -> None:
+    """Check if target stream is ready and switch immediately."""
+    if not self._target_client or not self._switching:
+      return
+
+    # Try to connect target if needed
+    if not self._target_client.is_connected():
+      if not self._target_client.connect(False) or not self._target_client.num_buffers:
+        return
+      cloudlog.debug(f"Target stream connected: {self._target_stream_type}")
+
+    # Check if target has frames ready
+    target_frame = self._target_client.recv(timeout_ms=0)
+    if target_frame:
+      self.frame = target_frame  # Update current frame to target frame
+      self._complete_switch()
+
+  def _complete_switch(self) -> None:
+    """Instantly switch to target stream."""
+    cloudlog.debug(f"Switching to {self._target_stream_type}")
+
+    # Clean up current resources
+    if self.client:
+      del self.client
+
+    # Switch to target
+    self.client = self._target_client
+    self._stream_type = self._target_stream_type
+    self._texture_needs_update = True
+
+    # Reset state
+    self._target_client = None
+    self._target_stream_type = None
+    self._switching = False
+
+    # Initialize textures for new stream
+    self._initialize_textures()
+
+  def _initialize_textures(self):
+    self._clear_textures()

     if not TICI:
       self.texture_y = rl.load_texture_from_image(rl.Image(None, int(self.client.stride),
                                                            int(self.client.height), 1, rl.PixelFormat.PIXELFORMAT_UNCOMPRESSED_GRAYSCALE))
       self.texture_uv = rl.load_texture_from_image(rl.Image(None, int(self.client.stride // 2),
                                                             int(self.client.height // 2), 1, rl.PixelFormat.PIXELFORMAT_UNCOMPRESSED_GRAY_ALPHA))
-    return True

   def _clear_textures(self):
     if self.texture_y and self.texture_y.id:
       rl.unload_texture(self.texture_y)
