system/ui: add specialized camera views with custom transformations (#35346)
* add specialized camera views with custom transformations for driver and road * improve * return np array * cached matrix
pull/35349/head
parent
193df11a1c
commit
3d3e9599d8
3 changed files with 207 additions and 5 deletions
@ -0,0 +1,127 @@ |
||||
import numpy as np |
||||
import pyray as rl |
||||
|
||||
from cereal import messaging, log |
||||
from msgq.visionipc import VisionStreamType |
||||
from openpilot.system.ui.widgets.cameraview import CameraView |
||||
from openpilot.system.ui.lib.application import gui_app |
||||
from openpilot.common.transformations.camera import DEVICE_CAMERAS, DeviceCameraConfig, view_frame_from_device_frame |
||||
from openpilot.common.transformations.orientation import rot_from_euler |
||||
|
||||
|
||||
CALIBRATED = log.LiveCalibrationData.Status.calibrated |
||||
DEFAULT_DEVICE_CAMERA = DEVICE_CAMERAS["tici", "ar0231"] |
||||
|
||||
|
||||
class AugmentedRoadView(CameraView):
  """Road camera view that applies a calibration-aware frame transformation.

  Renders the (wide) road camera stream and keeps the view centered on the
  road's vanishing point by combining the camera intrinsics with the live
  extrinsic calibration published on ``liveCalibration``.
  """

  def __init__(self, sm: messaging.SubMaster, stream_type: VisionStreamType):
    """Create the view.

    Args:
      sm: SubMaster subscribed to at least 'deviceState', 'liveCalibration'
          and 'roadCameraState'.
      stream_type: which camerad stream to render (road or wide road).
    """
    super().__init__("camerad", stream_type)

    self.sm = sm
    self.stream_type = stream_type
    self.is_wide_camera = stream_type == VisionStreamType.VISION_STREAM_WIDE_ROAD

    # Resolved lazily from deviceState/roadCameraState; None until both seen
    self.device_camera: DeviceCameraConfig | None = None
    self.view_from_calib = view_frame_from_device_frame.copy()
    self.view_from_wide_calib = view_frame_from_device_frame.copy()

    # Cache keys/value for _calc_frame_matrix so the matrix is only rebuilt
    # when the calibration or the widget dimensions actually change
    self._last_calib_time: float = 0.0
    self._last_rect_dims: tuple[float, float] = (0.0, 0.0)
    self._cached_matrix: np.ndarray | None = None

  def render(self, rect):
    # Update calibration before rendering
    self._update_calibration()

    # Render the base camera view
    super().render(rect)

    # TODO: Add road visualization overlays like:
    # - Lane lines and road edges
    # - Path prediction
    # - Lead vehicle indicators
    # - Additional features

  def _update_calibration(self):
    """Refresh view_from_calib / view_from_wide_calib from SubMaster data."""
    # Resolve the device camera config once both required states are seen.
    # NOTE: this previously read the module-level global `sm`, which breaks
    # when the class is used outside this file's __main__; use self.sm.
    if not self.device_camera and self.sm.seen['roadCameraState'] and self.sm.seen['deviceState']:
      self.device_camera = DEVICE_CAMERAS[(str(self.sm['deviceState'].deviceType), str(self.sm['roadCameraState'].sensor))]

    # Only act on fresh, valid calibration messages
    if not (self.sm.updated["liveCalibration"] and self.sm.valid['liveCalibration']):
      return

    calib = self.sm['liveCalibration']
    if len(calib.rpyCalib) != 3 or calib.calStatus != CALIBRATED:
      return

    # Update view_from_calib matrix
    device_from_calib = rot_from_euler(calib.rpyCalib)
    self.view_from_calib = view_frame_from_device_frame @ device_from_calib

    # Update wide calibration if the message carries it
    if hasattr(calib, 'wideFromDeviceEuler') and len(calib.wideFromDeviceEuler) == 3:
      wide_from_device = rot_from_euler(calib.wideFromDeviceEuler)
      self.view_from_wide_calib = view_frame_from_device_frame @ wide_from_device @ device_from_calib

  def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
    """Return the 3x3 transform that zooms/recenters the frame in `rect`.

    The result is cached and only recomputed when either a new
    liveCalibration frame arrives or the widget dimensions change.
    """
    calib_time = self.sm.recv_frame.get('liveCalibration', 0)
    current_dims = (rect.width, rect.height)
    if (self._last_calib_time == calib_time and
        self._last_rect_dims == current_dims and
        self._cached_matrix is not None):
      return self._cached_matrix

    # Get camera configuration; fall back to a default until device_camera
    # has been resolved from live messages
    device_camera = self.device_camera or DEFAULT_DEVICE_CAMERA
    intrinsic = device_camera.ecam.intrinsics if self.is_wide_camera else device_camera.fcam.intrinsics
    calibration = self.view_from_wide_calib if self.is_wide_camera else self.view_from_calib
    zoom = 2.0 if self.is_wide_camera else 1.1

    # Project a point far ahead of the car to find the vanishing point in
    # image coordinates
    inf_point = np.array([1000.0, 0.0, 0.0])
    calib_transform = intrinsic @ calibration
    kep = calib_transform @ inf_point

    # Widget dimensions and camera principal point
    w, h = current_dims
    cx, cy = intrinsic[0, 2], intrinsic[1, 2]

    # Max allowed offsets (with a small margin) so the recentered image
    # never exposes area outside the frame
    margin = 5
    max_x_offset = cx * zoom - w / 2 - margin
    max_y_offset = cy * zoom - h / 2 - margin

    # Clamp offsets to prevent out-of-bounds sampling; guard the projective
    # divide against a (near-)zero homogeneous coordinate
    try:
      if abs(kep[2]) > 1e-6:
        x_offset = np.clip((kep[0] / kep[2] - cx) * zoom, -max_x_offset, max_x_offset)
        y_offset = np.clip((kep[1] / kep[2] - cy) * zoom, -max_y_offset, max_y_offset)
      else:
        x_offset, y_offset = 0, 0
    except (ZeroDivisionError, OverflowError):
      x_offset, y_offset = 0, 0

    # Update cache values
    self._last_calib_time = calib_time
    self._last_rect_dims = current_dims
    self._cached_matrix = np.array([
      [zoom * 2 * cx / w, 0, -x_offset / w * 2],
      [0, zoom * 2 * cy / h, -y_offset / h * 2],
      [0, 0, 1.0]
    ])

    return self._cached_matrix
||||
|
||||
|
||||
if __name__ == "__main__":
  gui_app.init_window("OnRoad Camera View")
  sm = messaging.SubMaster(['deviceState', 'liveCalibration', 'roadCameraState'])
  view = AugmentedRoadView(sm, VisionStreamType.VISION_STREAM_ROAD)
  try:
    # Pump messages and redraw full-window every frame of the GUI loop
    for _ in gui_app.render():
      sm.update(0)
      view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
  finally:
    # Always release the vision stream, even if the loop raises
    view.close()
@ -0,0 +1,46 @@ |
||||
import numpy as np |
||||
import pyray as rl |
||||
from openpilot.system.ui.widgets.cameraview import CameraView |
||||
from msgq.visionipc import VisionStreamType |
||||
from openpilot.system.ui.lib.application import gui_app |
||||
|
||||
|
||||
class DriverCameraView(CameraView):
  """Camera view for the driver-facing camera stream."""

  def __init__(self, stream_type: VisionStreamType):
    super().__init__("camerad", stream_type)

  def render(self, rect):
    super().render(rect)

    # TODO: Add additional rendering logic

  def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
    """Return the 3x3 scaling transform that letterboxes the driver feed."""
    driver_view_ratio = 2.0

    # Use the live stream dimensions; before the first frame arrives fall
    # back to the expected native resolution
    if self.frame:
      frame_w, frame_h = self.frame.width, self.frame.height
    else:
      frame_w, frame_h = 1928, 1208

    y_scale = frame_h * driver_view_ratio / frame_w
    x_scale = y_scale * rect.height / rect.width * frame_w / frame_h

    return np.array([
      [x_scale, 0.0, 0.0],
      [0.0, y_scale, 0.0],
      [0.0, 0.0, 1.0]
    ])
||||
|
||||
|
||||
if __name__ == "__main__":
  gui_app.init_window("Driver Camera View")
  view = DriverCameraView(VisionStreamType.VISION_STREAM_DRIVER)
  try:
    # Redraw full-window on every frame of the GUI loop
    for _ in gui_app.render():
      view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
  finally:
    # Always release the vision stream, even if the loop raises
    view.close()
Loading…
Reference in new issue