# include <thread>
# include <stdio.h>
# include <signal.h>
# include <poll.h>
# include <assert.h>
# include <unistd.h>
# include <sys/socket.h>
# if defined(QCOM) && !defined(QCOM_REPLAY)
# include "cameras/camera_qcom.h"
# elif QCOM2
# include "cameras/camera_qcom2.h"
# elif WEBCAM
# include "cameras/camera_webcam.h"
# else
# include "cameras/camera_frame_stream.h"
# endif
# include <libyuv.h>
# include "clutil.h"
# include "common/ipc.h"
# include "common/params.h"
# include "common/swaglog.h"
# include "common/util.h"
# include "common/visionipc.h"
# define MAX_CLIENTS 6
volatile sig_atomic_t do_exit = 0 ;
static void set_do_exit ( int sig ) {
do_exit = 1 ;
}
struct VisionState ;
struct VisionClientState {
VisionState * s ;
int fd ;
pthread_t thread_handle ;
bool running ;
} ;
struct VisionClientStreamState {
bool subscribed ;
int bufs_outstanding ;
bool tb ;
TBuffer * tbuffer ;
PoolQueue * queue ;
} ;
struct VisionState {
MultiCameraState cameras ;
pthread_mutex_t clients_lock ;
VisionClientState clients [ MAX_CLIENTS ] ;
} ;
static CameraBuf * get_camerabuf_by_type ( VisionState * s , VisionStreamType type ) {
assert ( type > = 0 & & type < VISION_STREAM_MAX ) ;
if ( type = = VISION_STREAM_RGB_BACK | | type = = VISION_STREAM_YUV ) {
return & s - > cameras . rear . buf ;
} else if ( type = = VISION_STREAM_RGB_FRONT | | type = = VISION_STREAM_YUV_FRONT ) {
return & s - > cameras . front . buf ;
}
# ifdef QCOM2
return & s - > cameras . wide . buf ;
# endif
assert ( 0 ) ;
}
// visionserver
void * visionserver_client_thread ( void * arg ) {
int err ;
auto * client = ( VisionClientState * ) arg ;
VisionState * s = client - > s ;
int fd = client - > fd ;
set_thread_name ( " clientthread " ) ;
VisionClientStreamState streams [ VISION_STREAM_MAX ] = { { false } } ;
LOGW ( " client start fd %d " , fd ) ;
while ( true ) {
struct pollfd polls [ 1 + VISION_STREAM_MAX ] = { { 0 } } ;
polls [ 0 ] . fd = fd ;
polls [ 0 ] . events = POLLIN ;
int poll_to_stream [ 1 + VISION_STREAM_MAX ] = { 0 } ;
int num_polls = 1 ;
for ( int i = 0 ; i < VISION_STREAM_MAX ; i + + ) {
if ( ! streams [ i ] . subscribed ) continue ;
polls [ num_polls ] . events = POLLIN ;
if ( streams [ i ] . bufs_outstanding > = 2 ) {
continue ;
}
if ( streams [ i ] . tb ) {
polls [ num_polls ] . fd = tbuffer_efd ( streams [ i ] . tbuffer ) ;
} else {
polls [ num_polls ] . fd = poolq_efd ( streams [ i ] . queue ) ;
}
poll_to_stream [ num_polls ] = i ;
num_polls + + ;
}
int ret = poll ( polls , num_polls , - 1 ) ;
if ( ret < 0 ) {
if ( errno = = EINTR | | errno = = EAGAIN ) continue ;
LOGE ( " poll failed (%d - %d) " , ret , errno ) ;
break ;
}
if ( do_exit ) break ;
if ( polls [ 0 ] . revents ) {
VisionPacket p ;
err = vipc_recv ( fd , & p ) ;
if ( err < = 0 ) {
break ;
} else if ( p . type = = VIPC_STREAM_SUBSCRIBE ) {
VisionStreamType stream_type = p . d . stream_sub . type ;
VisionPacket rep = {
. type = VIPC_STREAM_BUFS ,
. d = { . stream_bufs = { . type = stream_type } , } ,
} ;
VisionClientStreamState * stream = & streams [ stream_type ] ;
stream - > tb = p . d . stream_sub . tbuffer ;
VisionStreamBufs * stream_bufs = & rep . d . stream_bufs ;
CameraBuf * b = get_camerabuf_by_type ( s , stream_type ) ;
if ( stream_type = = VISION_STREAM_RGB_BACK | |
stream_type = = VISION_STREAM_RGB_FRONT | |
stream_type = = VISION_STREAM_RGB_WIDE ) {
stream_bufs - > width = b - > rgb_width ;
stream_bufs - > height = b - > rgb_height ;
stream_bufs - > stride = b - > rgb_stride ;
stream_bufs - > buf_len = b - > rgb_bufs [ 0 ] . len ;
rep . num_fds = UI_BUF_COUNT ;
for ( int i = 0 ; i < rep . num_fds ; i + + ) {
rep . fds [ i ] = b - > rgb_bufs [ i ] . fd ;
}
if ( stream - > tb ) {
stream - > tbuffer = & b - > ui_tb ;
} else {
assert ( false ) ;
}
} else {
stream_bufs - > width = b - > yuv_width ;
stream_bufs - > height = b - > yuv_height ;
stream_bufs - > stride = b - > yuv_width ;
stream_bufs - > buf_len = b - > yuv_buf_size ;
rep . num_fds = YUV_COUNT ;
for ( int i = 0 ; i < rep . num_fds ; i + + ) {
rep . fds [ i ] = b - > yuv_ion [ i ] . fd ;
}
if ( stream - > tb ) {
stream - > tbuffer = b - > yuv_tb ;
} else {
stream - > queue = pool_get_queue ( & b - > yuv_pool ) ;
}
}
vipc_send ( fd , & rep ) ;
streams [ stream_type ] . subscribed = true ;
} else if ( p . type = = VIPC_STREAM_RELEASE ) {
int si = p . d . stream_rel . type ;
assert ( si < VISION_STREAM_MAX ) ;
if ( streams [ si ] . tb ) {
tbuffer_release ( streams [ si ] . tbuffer , p . d . stream_rel . idx ) ;
} else {
poolq_release ( streams [ si ] . queue , p . d . stream_rel . idx ) ;
}
streams [ p . d . stream_rel . type ] . bufs_outstanding - - ;
} else {
assert ( false ) ;
}
} else {
int stream_i = VISION_STREAM_MAX ;
for ( int i = 1 ; i < num_polls ; i + + ) {
int si = poll_to_stream [ i ] ;
if ( ! streams [ si ] . subscribed ) continue ;
if ( polls [ i ] . revents ) {
stream_i = si ;
break ;
}
}
if ( stream_i < VISION_STREAM_MAX ) {
streams [ stream_i ] . bufs_outstanding + + ;
int idx ;
if ( streams [ stream_i ] . tb ) {
idx = tbuffer_acquire ( streams [ stream_i ] . tbuffer ) ;
} else {
idx = poolq_pop ( streams [ stream_i ] . queue ) ;
}
if ( idx < 0 ) {
break ;
}
VisionPacket rep = {
. type = VIPC_STREAM_ACQUIRE ,
. d = { . stream_acq = {
. type = ( VisionStreamType ) stream_i ,
. idx = idx ,
} } ,
} ;
if ( stream_i = = VISION_STREAM_YUV | |
stream_i = = VISION_STREAM_YUV_FRONT | |
stream_i = = VISION_STREAM_YUV_WIDE ) {
CameraBuf * b = get_camerabuf_by_type ( s , ( VisionStreamType ) stream_i ) ;
rep . d . stream_acq . extra . frame_id = b - > yuv_metas [ idx ] . frame_id ;
rep . d . stream_acq . extra . timestamp_sof = b - > yuv_metas [ idx ] . timestamp_sof ;
rep . d . stream_acq . extra . timestamp_eof = b - > yuv_metas [ idx ] . timestamp_eof ;
}
vipc_send ( fd , & rep ) ;
}
}
}
LOGW ( " client end fd %d " , fd ) ;
for ( auto & stream : streams ) {
if ( ! stream . subscribed ) continue ;
if ( stream . tb ) {
tbuffer_release_all ( stream . tbuffer ) ;
} else {
pool_release_queue ( stream . queue ) ;
}
}
close ( fd ) ;
pthread_mutex_lock ( & s - > clients_lock ) ;
client - > running = false ;
pthread_mutex_unlock ( & s - > clients_lock ) ;
return NULL ;
}
void * visionserver_thread ( void * arg ) {
int err ;
VisionState * s = ( VisionState * ) arg ;
set_thread_name ( " visionserver " ) ;
int sock = ipc_bind ( VIPC_SOCKET_PATH ) ;
while ( ! do_exit ) {
struct pollfd polls [ 1 ] = { { 0 } } ;
polls [ 0 ] . fd = sock ;
polls [ 0 ] . events = POLLIN ;
int ret = poll ( polls , ARRAYSIZE ( polls ) , 1000 ) ;
if ( ret < 0 ) {
if ( errno = = EINTR | | errno = = EAGAIN ) continue ;
LOGE ( " poll failed (%d - %d) " , ret , errno ) ;
break ;
}
if ( do_exit ) break ;
if ( ! polls [ 0 ] . revents ) {
continue ;
}
int fd = accept ( sock , NULL , NULL ) ;
assert ( fd > = 0 ) ;
pthread_mutex_lock ( & s - > clients_lock ) ;
int client_idx = 0 ;
for ( ; client_idx < MAX_CLIENTS ; client_idx + + ) {
if ( ! s - > clients [ client_idx ] . running ) break ;
}
if ( client_idx > = MAX_CLIENTS ) {
LOG ( " ignoring visionserver connection, max clients connected " ) ;
close ( fd ) ;
pthread_mutex_unlock ( & s - > clients_lock ) ;
continue ;
}
VisionClientState * client = & s - > clients [ client_idx ] ;
client - > s = s ;
client - > fd = fd ;
client - > running = true ;
err = pthread_create ( & client - > thread_handle , NULL ,
visionserver_client_thread , client ) ;
assert ( err = = 0 ) ;
pthread_mutex_unlock ( & s - > clients_lock ) ;
}
for ( auto & client : s - > clients ) {
pthread_mutex_lock ( & s - > clients_lock ) ;
bool running = client . running ;
pthread_mutex_unlock ( & s - > clients_lock ) ;
if ( running ) {
err = pthread_join ( client . thread_handle , NULL ) ;
assert ( err = = 0 ) ;
}
}
close ( sock ) ;
return NULL ;
}
void party ( cl_device_id device_id , cl_context context ) {
VisionState state = { } ;
VisionState * s = & state ;
cameras_init ( & s - > cameras , device_id , context ) ;
cameras_open ( & s - > cameras ) ;
std : : thread server_thread ( visionserver_thread , s ) ;
// priority for cameras
int err = set_realtime_priority ( 53 ) ;
LOG ( " setpriority returns %d " , err ) ;
cameras_run ( & s - > cameras ) ;
server_thread . join ( ) ;
}
# ifdef QCOM
# include "CL/cl_ext_qcom.h"
# endif
int main ( int argc , char * argv [ ] ) {
set_realtime_priority ( 53 ) ;
# if defined(QCOM)
set_core_affinity ( 2 ) ;
# elif defined(QCOM2)
set_core_affinity ( 6 ) ;
# endif
signal ( SIGINT , ( sighandler_t ) set_do_exit ) ;
signal ( SIGTERM , ( sighandler_t ) set_do_exit ) ;
cl_device_id device_id = cl_get_device_id ( CL_DEVICE_TYPE_DEFAULT ) ;
// TODO: do this for QCOM2 too
# if defined(QCOM)
const cl_context_properties props [ ] = { CL_CONTEXT_PRIORITY_HINT_QCOM , CL_PRIORITY_HINT_HIGH_QCOM , 0 } ;
cl_context context = CL_CHECK_ERR ( clCreateContext ( props , 1 , & device_id , NULL , NULL , & err ) ) ;
# else
cl_context context = CL_CHECK_ERR ( clCreateContext ( NULL , 1 , & device_id , NULL , NULL , & err ) ) ;
# endif
party ( device_id , context ) ;
CL_CHECK ( clReleaseContext ( context ) ) ;
}