From 3d3e9599d804149e0f3036cb46ec1b74fbc0c04c Mon Sep 17 00:00:00 2001
From: Dean Lee
Date: Tue, 27 May 2025 01:42:50 +0800
Subject: [PATCH] system/ui: add specialized camera views with custom
 transformations (#35346)

* add specialized camera views with custom transformations for driver and road

* improve

* return np array

* cached matrix
---
 system/ui/onroad/augmented_road_view.py | 127 ++++++++++++++++++++++++
 system/ui/widgets/cameraview.py         |  39 +++++++-
 system/ui/widgets/driver_camera_view.py |  46 +++++++++
 3 files changed, 207 insertions(+), 5 deletions(-)
 create mode 100644 system/ui/onroad/augmented_road_view.py
 create mode 100644 system/ui/widgets/driver_camera_view.py

diff --git a/system/ui/onroad/augmented_road_view.py b/system/ui/onroad/augmented_road_view.py
new file mode 100644
index 0000000000..58a37460d5
--- /dev/null
+++ b/system/ui/onroad/augmented_road_view.py
@@ -0,0 +1,127 @@
+import numpy as np
+import pyray as rl
+
+from cereal import messaging, log
+from msgq.visionipc import VisionStreamType
+from openpilot.system.ui.widgets.cameraview import CameraView
+from openpilot.system.ui.lib.application import gui_app
+from openpilot.common.transformations.camera import DEVICE_CAMERAS, DeviceCameraConfig, view_frame_from_device_frame
+from openpilot.common.transformations.orientation import rot_from_euler
+
+
+CALIBRATED = log.LiveCalibrationData.Status.calibrated
+DEFAULT_DEVICE_CAMERA = DEVICE_CAMERAS["tici", "ar0231"]
+
+
+class AugmentedRoadView(CameraView):
+  def __init__(self, sm: messaging.SubMaster, stream_type: VisionStreamType):
+    super().__init__("camerad", stream_type)
+
+    self.sm = sm
+    self.stream_type = stream_type
+    self.is_wide_camera = stream_type == VisionStreamType.VISION_STREAM_WIDE_ROAD
+
+    self.device_camera: DeviceCameraConfig | None = None
+    self.view_from_calib = view_frame_from_device_frame.copy()
+    self.view_from_wide_calib = view_frame_from_device_frame.copy()
+
+    self._last_calib_time: int = 0
+    self._last_rect_dims = (0.0, 0.0)
+    self._cached_matrix: np.ndarray | None = None
+
+  def render(self, rect: rl.Rectangle):
+    # Update calibration before rendering
+    self._update_calibration()
+
+    # Render the base camera view
+    super().render(rect)
+
+    # TODO: Add road visualization overlays like:
+    # - Lane lines and road edges
+    # - Path prediction
+    # - Lead vehicle indicators
+    # - Additional features
+
+  def _update_calibration(self):
+    # Update device camera if not already set
+    if not self.device_camera and self.sm.seen['roadCameraState'] and self.sm.seen['deviceState']:
+      self.device_camera = DEVICE_CAMERAS[(str(self.sm['deviceState'].deviceType), str(self.sm['roadCameraState'].sensor))]
+
+    # Check if live calibration data is available and valid
+    if not (self.sm.updated['liveCalibration'] and self.sm.valid['liveCalibration']):
+      return
+
+    calib = self.sm['liveCalibration']
+    if len(calib.rpyCalib) != 3 or calib.calStatus != CALIBRATED:
+      return
+
+    # Update view_from_calib matrix
+    device_from_calib = rot_from_euler(calib.rpyCalib)
+    self.view_from_calib = view_frame_from_device_frame @ device_from_calib
+
+    # Update wide calibration if available
+    if hasattr(calib, 'wideFromDeviceEuler') and len(calib.wideFromDeviceEuler) == 3:
+      wide_from_device = rot_from_euler(calib.wideFromDeviceEuler)
+      self.view_from_wide_calib = view_frame_from_device_frame @ wide_from_device @ device_from_calib
+
+  def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
+    # Check if we can use the cached matrix
+    calib_time = self.sm.recv_frame.get('liveCalibration', 0)
+    current_dims = (rect.width, rect.height)
+    if (self._last_calib_time == calib_time and
+        self._last_rect_dims == current_dims and
+        self._cached_matrix is not None):
+      return self._cached_matrix
+
+    # Get camera configuration
+    device_camera = self.device_camera or DEFAULT_DEVICE_CAMERA
+    intrinsic = device_camera.ecam.intrinsics if self.is_wide_camera else device_camera.fcam.intrinsics
+    calibration = self.view_from_wide_calib if self.is_wide_camera else self.view_from_calib
+    zoom = 2.0 if self.is_wide_camera else 1.1
+
+    # Project a point at infinity to find the vanishing point
+    inf_point = np.array([1000.0, 0.0, 0.0])
+    calib_transform = intrinsic @ calibration
+    kep = calib_transform @ inf_point
+
+    # Calculate center points and dimensions
+    w, h = current_dims
+    cx, cy = intrinsic[0, 2], intrinsic[1, 2]
+
+    # Calculate max allowed offsets with margins
+    margin = 5
+    max_x_offset = cx * zoom - w / 2 - margin
+    max_y_offset = cy * zoom - h / 2 - margin
+
+    # Calculate and clamp offsets to prevent out-of-bounds issues
+    try:
+      if abs(kep[2]) > 1e-6:
+        x_offset = np.clip((kep[0] / kep[2] - cx) * zoom, -max_x_offset, max_x_offset)
+        y_offset = np.clip((kep[1] / kep[2] - cy) * zoom, -max_y_offset, max_y_offset)
+      else:
+        x_offset, y_offset = 0, 0
+    except (ZeroDivisionError, OverflowError):
+      x_offset, y_offset = 0, 0
+
+    # Update cache values
+    self._last_calib_time = calib_time
+    self._last_rect_dims = current_dims
+    self._cached_matrix = np.array([
+      [zoom * 2 * cx / w, 0, -x_offset / w * 2],
+      [0, zoom * 2 * cy / h, -y_offset / h * 2],
+      [0, 0, 1.0]
+    ])
+
+    return self._cached_matrix
+
+
+if __name__ == "__main__":
+  gui_app.init_window("OnRoad Camera View")
+  sm = messaging.SubMaster(['deviceState', 'liveCalibration', 'roadCameraState'])
+  road_camera_view = AugmentedRoadView(sm, VisionStreamType.VISION_STREAM_ROAD)
+  try:
+    for _ in gui_app.render():
+      sm.update(0)
+      road_camera_view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
+  finally:
+    road_camera_view.close()
diff --git a/system/ui/widgets/cameraview.py b/system/ui/widgets/cameraview.py
index fe53507cef..bff13a7910 100644
--- a/system/ui/widgets/cameraview.py
+++ b/system/ui/widgets/cameraview.py
@@ -1,4 +1,6 @@
+import numpy as np
 import pyray as rl
+
 from openpilot.system.hardware import TICI
 from msgq.visionipc import VisionIpcClient, VisionStreamType, VisionBuf
 from openpilot.system.ui.lib.application import gui_app
@@ -89,6 +91,24 @@ class CameraView:
     if self.shader and self.shader.id:
       rl.unload_shader(self.shader)
 
+  def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
+    if not self.frame:
+      return np.eye(3)
+
+    # Calculate aspect ratios
+    widget_aspect_ratio = rect.width / rect.height
+    frame_aspect_ratio = self.frame.width / self.frame.height
+
+    # Calculate scaling factors to maintain aspect ratio
+    zx = min(frame_aspect_ratio / widget_aspect_ratio, 1.0)
+    zy = min(widget_aspect_ratio / frame_aspect_ratio, 1.0)
+
+    return np.array([
+      [zx, 0.0, 0.0],
+      [0.0, zy, 0.0],
+      [0.0, 0.0, 1.0]
+    ])
+
   def render(self, rect: rl.Rectangle):
     if not self._ensure_connection():
       return
@@ -102,12 +122,21 @@ class CameraView:
     if not self.frame:
       return
 
-    # Calculate scaling to maintain aspect ratio
-    scale = min(rect.width / self.frame.width, rect.height / self.frame.height)
-    x_offset = rect.x + (rect.width - (self.frame.width * scale)) / 2
-    y_offset = rect.y + (rect.height - (self.frame.height * scale)) / 2
+    transform = self._calc_frame_matrix(rect)
     src_rect = rl.Rectangle(0, 0, float(self.frame.width), float(self.frame.height))
-    dst_rect = rl.Rectangle(x_offset, y_offset, self.frame.width * scale, self.frame.height * scale)
+
+    # Calculate scale
+    scale_x = rect.width * transform[0, 0]  # zx
+    scale_y = rect.height * transform[1, 1]  # zy
+
+    # Calculate base position (centered)
+    x_offset = rect.x + (rect.width - scale_x) / 2
+    y_offset = rect.y + (rect.height - scale_y) / 2
+    # Apply the matrix translation terms (normalized half-widget units)
+    x_offset += transform[0, 2] * rect.width / 2
+    y_offset += transform[1, 2] * rect.height / 2
+
+    dst_rect = rl.Rectangle(x_offset, y_offset, scale_x, scale_y)
 
     # Render with appropriate method
     if TICI:
diff --git a/system/ui/widgets/driver_camera_view.py b/system/ui/widgets/driver_camera_view.py
new file mode 100644
index 0000000000..174fac378d
--- /dev/null
+++ b/system/ui/widgets/driver_camera_view.py
@@ -0,0 +1,46 @@
+import numpy as np
+import pyray as rl
+from openpilot.system.ui.widgets.cameraview import CameraView
+from msgq.visionipc import VisionStreamType
+from openpilot.system.ui.lib.application import gui_app
+
+
+class DriverCameraView(CameraView):
+  def __init__(self, stream_type: VisionStreamType):
+    super().__init__("camerad", stream_type)
+
+  def render(self, rect: rl.Rectangle):
+    super().render(rect)
+
+    # TODO: Add additional rendering logic
+
+  def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
+    driver_view_ratio = 2.0
+
+    # Get stream dimensions
+    if self.frame:
+      stream_width = self.frame.width
+      stream_height = self.frame.height
+    else:
+      # Default values if frame not available
+      stream_width = 1928
+      stream_height = 1208
+
+    yscale = stream_height * driver_view_ratio / stream_width
+    xscale = yscale * rect.height / rect.width * stream_width / stream_height
+
+    return np.array([
+      [xscale, 0.0, 0.0],
+      [0.0, yscale, 0.0],
+      [0.0, 0.0, 1.0]
+    ])
+
+
+if __name__ == "__main__":
+  gui_app.init_window("Driver Camera View")
+  driver_camera_view = DriverCameraView(VisionStreamType.VISION_STREAM_DRIVER)
+  try:
+    for _ in gui_app.render():
+      driver_camera_view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
+  finally:
+    driver_camera_view.close()
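
Note on the transform convention (a minimal sketch, outside the patch): _calc_frame_matrix() returns a 3x3 matrix in normalized widget space. The diagonal entries scale the destination rectangle relative to the widget, and the last column translates it in half-widget units; this is exactly how render() consumes it. The helper name below is hypothetical, not part of the API:

import numpy as np

def dst_rect_from_matrix(rect_x, rect_y, rect_w, rect_h, transform):
  # Diagonal terms scale the widget to the destination size
  dst_w = rect_w * transform[0, 0]
  dst_h = rect_h * transform[1, 1]
  # Center in the widget, then shift by the normalized translation terms
  x = rect_x + (rect_w - dst_w) / 2 + transform[0, 2] * rect_w / 2
  y = rect_y + (rect_h - dst_h) / 2 + transform[1, 2] * rect_h / 2
  return x, y, dst_w, dst_h

# Identity matrix: the frame exactly fills the widget, no offset
assert dst_rect_from_matrix(0, 0, 2160, 1080, np.eye(3)) == (0.0, 0.0, 2160.0, 1080.0)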
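Why inf_point = [1000, 0, 0] locates the vanishing point: a point far ahead along the calibrated frame's forward axis projects to a nearly fixed pixel, and recentering the view on it keeps the horizon stable as calibration changes. A sketch with assumed toy intrinsics (focal 910 px, 1928x1208 sensor; not the patch's data):

import numpy as np

# Axis swap mirroring view_frame_from_device_frame: device x (forward) becomes view z
view_from_device = np.array([
  [0., 1., 0.],
  [0., 0., 1.],
  [1., 0., 0.],
])
# Toy intrinsics: assumed values for illustration only
intrinsic = np.array([
  [910., 0., 964.],
  [0., 910., 604.],
  [0., 0., 1.],
])

kep = intrinsic @ view_from_device @ np.array([1000.0, 0.0, 0.0])
u, v = kep[0] / kep[2], kep[1] / kep[2]  # perspective divide
assert (u, v) == (964.0, 604.0)  # level camera: vanishing point at the principal point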
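For intuition on the driver-view scale factors, a worked example with the patch's tici defaults and an assumed 2160x1080 widget; note that xscale reduces algebraically to driver_view_ratio * rect.height / rect.width, independent of the stream size:

stream_w, stream_h = 1928, 1208   # patch defaults for the driver camera
rect_w, rect_h = 2160, 1080       # assumed widget size

driver_view_ratio = 2.0
yscale = stream_h * driver_view_ratio / stream_w         # ~1.253: vertical overscan
xscale = yscale * rect_h / rect_w * stream_w / stream_h  # == 2.0 * 1080 / 2160 == 1.0

# Destination size as computed in render(): the width fills the widget exactly,
# the height overshoots and the excess is cropped, giving a 2:1 view of the driver.
dst_w, dst_h = rect_w * xscale, rect_h * yscale          # 2160.0, ~1353.4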