#!/usr/bin/env python3 import os import subprocess import time import numpy as np from PIL import Image from typing import List import cereal.messaging as messaging from common.params import Params from common.realtime import DT_MDL from common.transformations.camera import eon_f_frame_size, eon_d_frame_size, tici_f_frame_size from selfdrive.hardware import TICI from selfdrive.controls.lib.alertmanager import set_offroad_alert from selfdrive.manager.process_config import managed_processes LM_THRESH = 120 # defined in selfdrive/camerad/imgproc/utils.h def jpeg_write(fn, dat): img = Image.fromarray(dat) img.save(fn, "JPEG") def extract_image(dat, frame_sizes): img = np.frombuffer(dat, dtype=np.uint8) w, h = frame_sizes[len(img) // 3] b = img[::3].reshape(h, w) g = img[1::3].reshape(h, w) r = img[2::3].reshape(h, w) return np.dstack([r, g, b]) def rois_in_focus(lapres: List[float]) -> float: sz = len(lapres) return sum([1. / sz for sharpness in lapres if sharpness >= LM_THRESH]) def get_snapshots(frame="roadCameraState", front_frame="driverCameraState", focus_perc_threshold=0.): frame_sizes = [eon_f_frame_size, eon_d_frame_size, tici_f_frame_size] frame_sizes = {w * h: (w, h) for (w, h) in frame_sizes} sockets = [] if frame is not None: sockets.append(frame) if front_frame is not None: sockets.append(front_frame) # wait 4 sec from camerad startup for focus and exposure sm = messaging.SubMaster(sockets) while sm[sockets[0]].frameId < int(4. / DT_MDL): sm.update() start_t = time.monotonic() while time.monotonic() - start_t < 10: sm.update() if min(sm.rcv_frame.values()) > 1 and rois_in_focus(sm[frame].sharpnessScore) >= focus_perc_threshold: break rear = extract_image(sm[frame].image, frame_sizes) if frame is not None else None front = extract_image(sm[front_frame].image, frame_sizes) if front_frame is not None else None return rear, front def snapshot(): params = Params() front_camera_allowed = params.get_bool("RecordFront") if (not params.get_bool("IsOffroad")) or params.get_bool("IsTakingSnapshot"): print("Already taking snapshot") return None, None params.put_bool("IsTakingSnapshot", True) set_offroad_alert("Offroad_IsTakingSnapshot", True) time.sleep(2.0) # Give thermald time to read the param, or if just started give camerad time to start # Check if camerad is already started try: subprocess.check_call(["pgrep", "camerad"]) print("Camerad already running") params.put_bool("IsTakingSnapshot", False) params.delete("Offroad_IsTakingSnapshot") return None, None except subprocess.CalledProcessError: pass os.environ["SEND_ROAD"] = "1" os.environ["SEND_WIDE_ROAD"] = "1" if front_camera_allowed: os.environ["SEND_DRIVER"] = "1" managed_processes['camerad'].start() frame = "wideRoadCameraState" if TICI else "roadCameraState" front_frame = "driverCameraState" if front_camera_allowed else None focus_perc_threshold = 0. if TICI else 10 / 12. rear, front = get_snapshots(frame, front_frame, focus_perc_threshold) managed_processes['camerad'].stop() params.put_bool("IsTakingSnapshot", False) set_offroad_alert("Offroad_IsTakingSnapshot", False) if not front_camera_allowed: front = None return rear, front if __name__ == "__main__": pic, fpic = snapshot() if pic is not None: print(pic.shape) jpeg_write("/tmp/back.jpg", pic) if fpic is not None: jpeg_write("/tmp/front.jpg", fpic) else: print("Error taking snapshot")