#!/usr/bin/env python3
import subprocess
import time
import numpy as np
from PIL import Image
from typing import List
import cereal . messaging as messaging
from cereal . visionipc . visionipc_pyx import VisionIpcClient , VisionStreamType # pylint: disable=no-name-in-module, import-error
from common . params import Params
from common . realtime import DT_MDL
from selfdrive . hardware import TICI , PC
from selfdrive . controls . lib . alertmanager import set_offroad_alert
from selfdrive . manager . process_config import managed_processes
LM_THRESH = 120 # defined in selfdrive/camerad/imgproc/utils.h
VISION_STREAMS = {
" roadCameraState " : VisionStreamType . VISION_STREAM_RGB_BACK ,
" driverCameraState " : VisionStreamType . VISION_STREAM_RGB_FRONT ,
" wideRoadCameraState " : VisionStreamType . VISION_STREAM_RGB_WIDE ,
}
def jpeg_write ( fn , dat ) :
img = Image . fromarray ( dat )
img . save ( fn , " JPEG " )
def extract_image ( buf , w , h , stride ) :
img = np . hstack ( [ buf [ i * stride : i * stride + 3 * w ] for i in range ( h ) ] )
b = img [ : : 3 ] . reshape ( h , w )
g = img [ 1 : : 3 ] . reshape ( h , w )
r = img [ 2 : : 3 ] . reshape ( h , w )
return np . dstack ( [ r , g , b ] )
def rois_in_focus ( lapres : List [ float ] ) - > float :
return sum ( [ 1. / len ( lapres ) for sharpness in lapres if sharpness > = LM_THRESH ] )
def get_snapshots ( frame = " roadCameraState " , front_frame = " driverCameraState " , focus_perc_threshold = 0. ) :
sockets = [ s for s in ( frame , front_frame ) if s is not None ]
sm = messaging . SubMaster ( sockets )
vipc_clients = { s : VisionIpcClient ( " camerad " , VISION_STREAMS [ s ] , True ) for s in sockets }
# wait 4 sec from camerad startup for focus and exposure
while sm [ sockets [ 0 ] ] . frameId < int ( 4. / DT_MDL ) :
sm . update ( )
for client in vipc_clients . values ( ) :
client . connect ( True )
# wait for focus
start_t = time . monotonic ( )
while time . monotonic ( ) - start_t < 10 :
sm . update ( 100 )
if min ( sm . rcv_frame . values ( ) ) > 1 and rois_in_focus ( sm [ frame ] . sharpnessScore ) > = focus_perc_threshold :
break
# grab images
rear , front = None , None
if frame is not None :
c = vipc_clients [ frame ]
rear = extract_image ( c . recv ( ) , c . width , c . height , c . stride )
if front_frame is not None :
c = vipc_clients [ front_frame ]
front = extract_image ( c . recv ( ) , c . width , c . height , c . stride )
return rear , front
def snapshot ( ) :
params = Params ( )
if ( not params . get_bool ( " IsOffroad " ) ) or params . get_bool ( " IsTakingSnapshot " ) :
print ( " Already taking snapshot " )
return None , None
front_camera_allowed = params . get_bool ( " RecordFront " )
params . put_bool ( " IsTakingSnapshot " , True )
set_offroad_alert ( " Offroad_IsTakingSnapshot " , True )
time . sleep ( 2.0 ) # Give thermald time to read the param, or if just started give camerad time to start
# Check if camerad is already started
try :
subprocess . check_call ( [ " pgrep " , " camerad " ] )
print ( " Camerad already running " )
params . put_bool ( " IsTakingSnapshot " , False )
params . delete ( " Offroad_IsTakingSnapshot " )
return None , None
except subprocess . CalledProcessError :
pass
try :
# Allow testing on replay on PC
if not PC :
managed_processes [ ' camerad ' ] . start ( )
frame = " wideRoadCameraState " if TICI else " roadCameraState "
front_frame = " driverCameraState " if front_camera_allowed else None
focus_perc_threshold = 0. if TICI else 10 / 12.
rear , front = get_snapshots ( frame , front_frame , focus_perc_threshold )
finally :
managed_processes [ ' camerad ' ] . stop ( )
params . put_bool ( " IsTakingSnapshot " , False )
set_offroad_alert ( " Offroad_IsTakingSnapshot " , False )
if not front_camera_allowed :
front = None
return rear , front
if __name__ == " __main__ " :
pic , fpic = snapshot ( )
if pic is not None :
print ( pic . shape )
jpeg_write ( " /tmp/back.jpg " , pic )
if fpic is not None :
jpeg_write ( " /tmp/front.jpg " , fpic )
else :
print ( " Error taking snapshot " )