openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
5.3 KiB

import numpy as np
import pyray as rl
from cereal import messaging, log
from msgq.visionipc import VisionStreamType
from openpilot.system.ui.onroad.model_renderer import ModelRenderer
from openpilot.system.ui.widgets.cameraview import CameraView
from openpilot.system.ui.lib.application import gui_app
from openpilot.common.transformations.camera import DEVICE_CAMERAS, DeviceCameraConfig, view_frame_from_device_frame
from openpilot.common.transformations.orientation import rot_from_euler
CALIBRATED = log.LiveCalibrationData.Status.calibrated
DEFAULT_DEVICE_CAMERA = DEVICE_CAMERAS["tici", "ar0231"]
class AugmentedRoadView(CameraView):
def __init__(self, sm: messaging.SubMaster, stream_type: VisionStreamType):
super().__init__("camerad", stream_type)
self.sm = sm
self.stream_type = stream_type
self.is_wide_camera = stream_type == VisionStreamType.VISION_STREAM_WIDE_ROAD
self.device_camera: DeviceCameraConfig | None = None
self.view_from_calib = view_frame_from_device_frame.copy()
self.view_from_wide_calib = view_frame_from_device_frame.copy()
self._last_calib_time: float = 0
self._last_rect_dims = (0.0, 0.0)
self._cached_matrix: np.ndarray | None = None
self.model_renderer = ModelRenderer()
def render(self, rect):
# Update calibration before rendering
self._update_calibration()
# Render the base camera view
super().render(rect)
# TODO: Add road visualization overlays like:
# - Lane lines and road edges
# - Path prediction
# - Lead vehicle indicators
# - Additional features
self.model_renderer.draw(rect, self.sm)
def _update_calibration(self):
# Update device camera if not already set
if not self.device_camera and sm.seen['roadCameraState'] and sm.seen['deviceState']:
self.device_camera = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]
# Check if live calibration data is available and valid
if not (sm.updated["liveCalibration"] and sm.valid['liveCalibration']):
return
calib = self.sm['liveCalibration']
if len(calib.rpyCalib) != 3 or calib.calStatus != CALIBRATED:
return
# Update view_from_calib matrix
device_from_calib = rot_from_euler(calib.rpyCalib)
self.view_from_calib = view_frame_from_device_frame @ device_from_calib
# Update wide calibration if available
if hasattr(calib, 'wideFromDeviceEuler') and len(calib.wideFromDeviceEuler) == 3:
wide_from_device = rot_from_euler(calib.wideFromDeviceEuler)
self.view_from_wide_calib = view_frame_from_device_frame @ wide_from_device @ device_from_calib
def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray:
# Check if we can use cached matrix
calib_time = self.sm.recv_frame.get('liveCalibration', 0)
current_dims = (rect.width, rect.height)
if (self._last_calib_time == calib_time and
self._last_rect_dims == current_dims and
self._cached_matrix is not None):
return self._cached_matrix
# Get camera configuration
device_camera = self.device_camera or DEFAULT_DEVICE_CAMERA
intrinsic = device_camera.ecam.intrinsics if self.is_wide_camera else device_camera.fcam.intrinsics
calibration = self.view_from_wide_calib if self.is_wide_camera else self.view_from_calib
zoom = 2.0 if self.is_wide_camera else 1.1
# Calculate transforms for vanishing point
inf_point = np.array([1000.0, 0.0, 0.0])
calib_transform = intrinsic @ calibration
kep = calib_transform @ inf_point
# Calculate center points and dimensions
w, h = current_dims
cx, cy = intrinsic[0, 2], intrinsic[1, 2]
# Calculate max allowed offsets with margins
margin = 5
max_x_offset = cx * zoom - w / 2 - margin
max_y_offset = cy * zoom - h / 2 - margin
# Calculate and clamp offsets to prevent out-of-bounds issues
try:
if abs(kep[2]) > 1e-6:
x_offset = np.clip((kep[0] / kep[2] - cx) * zoom, -max_x_offset, max_x_offset)
y_offset = np.clip((kep[1] / kep[2] - cy) * zoom, -max_y_offset, max_y_offset)
else:
x_offset, y_offset = 0, 0
except (ZeroDivisionError, OverflowError):
x_offset, y_offset = 0, 0
# Update cache values
self._last_calib_time = calib_time
self._last_rect_dims = current_dims
self._cached_matrix = np.array([
[zoom * 2 * cx / w, 0, -x_offset / w * 2],
[0, zoom * 2 * cy / h, -y_offset / h * 2],
[0, 0, 1.0]
])
video_transform = np.array([
[zoom, 0.0, (w / 2 - x_offset) - (cx * zoom)],
[0.0, zoom, (h / 2 - y_offset) - (cy * zoom)],
[0.0, 0.0, 1.0]
])
self.model_renderer.set_transform(video_transform @ calib_transform)
return self._cached_matrix
if __name__ == "__main__":
gui_app.init_window("OnRoad Camera View")
sm = messaging.SubMaster(["modelV2", "controlsState", "liveCalibration", "radarState", "deviceState",
"pandaStates", "carParams", "driverMonitoringState", "carState", "driverStateV2",
"roadCameraState", "wideRoadCameraState", "managerState", "selfdriveState", "longitudinalPlan"])
road_camera_view = AugmentedRoadView(sm, VisionStreamType.VISION_STREAM_ROAD)
try:
for _ in gui_app.render():
sm.update(0)
road_camera_view.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
finally:
road_camera_view.close()