@ -1,5 +1,4 @@
#!/usr/bin/env python3
import os
import subprocess
import time
@ -8,24 +7,29 @@ from PIL import Image
from typing import List
import cereal . messaging as messaging
from cereal . visionipc . visionipc_pyx import VisionIpcClient , VisionStreamType # pylint: disable=no-name-in-module, import-error
from common . params import Params
from common . realtime import DT_MDL
from common . transformations . camera import eon_f_frame_size , eon_d_frame_size , tici_f_frame_size
from selfdrive . hardware import TICI
from selfdrive . hardware import TICI , PC
from selfdrive . controls . lib . alertmanager import set_offroad_alert
from selfdrive . manager . process_config import managed_processes
LM_THRESH = 120 # defined in selfdrive/camerad/imgproc/utils.h
VISION_STREAMS = {
" roadCameraState " : VisionStreamType . VISION_STREAM_RGB_BACK ,
" driverCameraState " : VisionStreamType . VISION_STREAM_RGB_FRONT ,
" wideRoadCameraState " : VisionStreamType . VISION_STREAM_RGB_WIDE ,
}
def jpeg_write ( fn , dat ) :
img = Image . fromarray ( dat )
img . save ( fn , " JPEG " )
def extract_image ( dat , frame_sizes ) :
img = np . frombuffer ( dat , dtype = np . uint8 )
w , h = frame_sizes [ len ( img ) / / 3 ]
def extract_image ( buf , w , h , stride ) :
img = np . hstack ( [ buf [ i * stride : i * stride + 3 * w ] for i in range ( h ) ] )
b = img [ : : 3 ] . reshape ( h , w )
g = img [ 1 : : 3 ] . reshape ( h , w )
r = img [ 2 : : 3 ] . reshape ( h , w )
@ -33,45 +37,47 @@ def extract_image(dat, frame_sizes):
def rois_in_focus ( lapres : List [ float ] ) - > float :
sz = len ( lapres )
return sum ( [ 1. / sz for sharpness in
lapres if sharpness > = LM_THRESH ] )
return sum ( [ 1. / len ( lapres ) for sharpness in lapres if sharpness > = LM_THRESH ] )
def get_snapshots ( frame = " roadCameraState " , front_frame = " driverCameraState " , focus_perc_threshold = 0. ) :
frame_sizes = [ eon_f_frame_size , eon_d_frame_size , tici_f_frame_size ]
frame_sizes = { w * h : ( w , h ) for ( w , h ) in frame_sizes }
sockets = [ ]
if frame is not None :
sockets . append ( frame )
if front_frame is not None :
sockets . append ( front_frame )
sockets = [ s for s in ( frame , front_frame ) if s is not None ]
sm = messaging . SubMaster ( sockets )
vipc_clients = { s : VisionIpcClient ( " camerad " , VISION_STREAMS [ s ] , True ) for s in sockets }
# wait 4 sec from camerad startup for focus and exposure
sm = messaging . SubMaster ( sockets )
while sm [ sockets [ 0 ] ] . frameId < int ( 4. / DT_MDL ) :
sm . update ( )
for client in vipc_clients . values ( ) :
client . connect ( True )
# wait for focus
start_t = time . monotonic ( )
while time . monotonic ( ) - start_t < 10 :
sm . update ( )
sm . update ( 100 )
if min ( sm . rcv_frame . values ( ) ) > 1 and rois_in_focus ( sm [ frame ] . sharpnessScore ) > = focus_perc_threshold :
break
rear = extract_image ( sm [ frame ] . image , frame_sizes ) if frame is not None else None
front = extract_image ( sm [ front_frame ] . image , frame_sizes ) if front_frame is not None else None
# grab images
rear , front = None , None
if frame is not None :
c = vipc_clients [ frame ]
rear = extract_image ( c . recv ( ) , c . width , c . height , c . stride )
if front_frame is not None :
c = vipc_clients [ front_frame ]
front = extract_image ( c . recv ( ) , c . width , c . height , c . stride )
return rear , front
def snapshot ( ) :
params = Params ( )
front_camera_allowed = params . get_bool ( " RecordFront " )
if ( not params . get_bool ( " IsOffroad " ) ) or params . get_bool ( " IsTakingSnapshot " ) :
print ( " Already taking snapshot " )
return None , None
front_camera_allowed = params . get_bool ( " RecordFront " )
params . put_bool ( " IsTakingSnapshot " , True )
set_offroad_alert ( " Offroad_IsTakingSnapshot " , True )
time . sleep ( 2.0 ) # Give thermald time to read the param, or if just started give camerad time to start
@ -86,13 +92,11 @@ def snapshot():
except subprocess . CalledProcessError :
pass
os . environ [ " SEND_ROAD " ] = " 1 "
os . environ [ " SEND_WIDE_ROAD " ] = " 1 "
if front_camera_allowed :
os . environ [ " SEND_DRIVER " ] = " 1 "
try :
managed_processes [ ' camerad ' ] . start ( )
# Allow testing on replay on PC
if not PC :
managed_processes [ ' camerad ' ] . start ( )
frame = " wideRoadCameraState " if TICI else " roadCameraState "
front_frame = " driverCameraState " if front_camera_allowed else None
focus_perc_threshold = 0. if TICI else 10 / 12.