#!/usr/bin/env python3 import subprocess import time import numpy as np from PIL import Image from typing import List import cereal.messaging as messaging from cereal.visionipc.visionipc_pyx import VisionIpcClient, VisionStreamType # pylint: disable=no-name-in-module, import-error from common.params import Params from common.realtime import DT_MDL from selfdrive.hardware import TICI, PC from selfdrive.controls.lib.alertmanager import set_offroad_alert from selfdrive.manager.process_config import managed_processes LM_THRESH = 120 # defined in selfdrive/camerad/imgproc/utils.h VISION_STREAMS = { "roadCameraState": VisionStreamType.VISION_STREAM_RGB_BACK, "driverCameraState": VisionStreamType.VISION_STREAM_RGB_FRONT, "wideRoadCameraState": VisionStreamType.VISION_STREAM_RGB_WIDE, } def jpeg_write(fn, dat): img = Image.fromarray(dat) img.save(fn, "JPEG") def extract_image(buf, w, h, stride): img = np.hstack([buf[i * stride:i * stride + 3 * w] for i in range(h)]) b = img[::3].reshape(h, w) g = img[1::3].reshape(h, w) r = img[2::3].reshape(h, w) return np.dstack([r, g, b]) def rois_in_focus(lapres: List[float]) -> float: return sum([1. / len(lapres) for sharpness in lapres if sharpness >= LM_THRESH]) def get_snapshots(frame="roadCameraState", front_frame="driverCameraState", focus_perc_threshold=0.): sockets = [s for s in (frame, front_frame) if s is not None] sm = messaging.SubMaster(sockets) vipc_clients = {s: VisionIpcClient("camerad", VISION_STREAMS[s], True) for s in sockets} # wait 4 sec from camerad startup for focus and exposure while sm[sockets[0]].frameId < int(4. / DT_MDL): sm.update() for client in vipc_clients.values(): client.connect(True) # wait for focus start_t = time.monotonic() while time.monotonic() - start_t < 10: sm.update(100) if min(sm.rcv_frame.values()) > 1 and rois_in_focus(sm[frame].sharpnessScore) >= focus_perc_threshold: break # grab images rear, front = None, None if frame is not None: c = vipc_clients[frame] rear = extract_image(c.recv(), c.width, c.height, c.stride) if front_frame is not None: c = vipc_clients[front_frame] front = extract_image(c.recv(), c.width, c.height, c.stride) return rear, front def snapshot(): params = Params() if (not params.get_bool("IsOffroad")) or params.get_bool("IsTakingSnapshot"): print("Already taking snapshot") return None, None front_camera_allowed = params.get_bool("RecordFront") params.put_bool("IsTakingSnapshot", True) set_offroad_alert("Offroad_IsTakingSnapshot", True) time.sleep(2.0) # Give thermald time to read the param, or if just started give camerad time to start # Check if camerad is already started try: subprocess.check_call(["pgrep", "camerad"]) print("Camerad already running") params.put_bool("IsTakingSnapshot", False) params.delete("Offroad_IsTakingSnapshot") return None, None except subprocess.CalledProcessError: pass try: # Allow testing on replay on PC if not PC: managed_processes['camerad'].start() frame = "wideRoadCameraState" if TICI else "roadCameraState" front_frame = "driverCameraState" if front_camera_allowed else None focus_perc_threshold = 0. if TICI else 10 / 12. rear, front = get_snapshots(frame, front_frame, focus_perc_threshold) finally: managed_processes['camerad'].stop() params.put_bool("IsTakingSnapshot", False) set_offroad_alert("Offroad_IsTakingSnapshot", False) if not front_camera_allowed: front = None return rear, front if __name__ == "__main__": pic, fpic = snapshot() if pic is not None: print(pic.shape) jpeg_write("/tmp/back.jpg", pic) if fpic is not None: jpeg_write("/tmp/front.jpg", fpic) else: print("Error taking snapshot")