@ -11,6 +11,7 @@
# include <libyuv.h>
# include <libyuv.h>
# include <sys/resource.h>
# include <sys/resource.h>
# include <pthread.h>
# include <pthread.h>
# include <math.h>
# include <string>
# include <string>
# include <iostream>
# include <iostream>
@ -19,6 +20,7 @@
# include <thread>
# include <thread>
# include <mutex>
# include <mutex>
# include <condition_variable>
# include <condition_variable>
# include <atomic>
# include <random>
# include <random>
# include <ftw.h>
# include <ftw.h>
@ -34,7 +36,7 @@
# include "common/visionipc.h"
# include "common/visionipc.h"
# include "common/utilpp.h"
# include "common/utilpp.h"
# include "common/util.h"
# include "common/util.h"
# include "camerad/cameras/camera_common.h"
# include "logger.h"
# include "logger.h"
# include "messaging.hpp"
# include "messaging.hpp"
# include "services.h"
# include "services.h"
@ -44,28 +46,73 @@
# define DISABLE_ENCODER
# define DISABLE_ENCODER
# endif
# endif
# ifndef DISABLE_ENCODER
# ifndef DISABLE_ENCODER
# include "encoder.h"
# include "encoder.h"
# include "raw_logger.h"
# include "raw_logger.h"
# endif
# endif
# include "cereal/gen/cpp/log.capnp.h"
# define CAM_IDX_FCAM 0
# define CAM_IDX_DCAM 1
# define CAM_IDX_ECAM 2
# define CAMERA_FPS 20
# define SEGMENT_LENGTH 60
# define MAIN_BITRATE 5000000
# define MAIN_BITRATE 5000000
# define QCAM_BITRATE 128000
# define MAIN_FPS 20
# ifndef QCOM2
# ifndef QCOM2
# define MAX_CAM_IDX LOG_CAMERA_ID_DCAMERA
# define DCAM_BITRATE 2500000
# define DCAM_BITRATE 2500000
# else
# else
# define MAX_CAM_IDX LOG_CAMERA_ID_ECAMERA
# define DCAM_BITRATE MAIN_BITRATE
# define DCAM_BITRATE MAIN_BITRATE
# endif
# endif
# define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead
LogCameraInfo cameras_logged [ LOG_CAMERA_ID_MAX ] = {
[ LOG_CAMERA_ID_FCAMERA ] = {
. stream_type = VISION_STREAM_YUV ,
. filename = " fcamera.hevc " ,
. frame_packet_name = " frame " ,
. encode_idx_name = " encodeIdx " ,
. fps = MAIN_FPS ,
. bitrate = MAIN_BITRATE ,
. is_h265 = true ,
. downscale = false ,
. has_qcamera = true
} ,
[ LOG_CAMERA_ID_DCAMERA ] = {
. stream_type = VISION_STREAM_YUV_FRONT ,
. filename = " dcamera.hevc " ,
. frame_packet_name = " frontFrame " ,
. encode_idx_name = " frontEncodeIdx " ,
. fps = MAIN_FPS , // on EONs, more compressed this way
. bitrate = DCAM_BITRATE ,
. is_h265 = true ,
. downscale = false ,
. has_qcamera = false
} ,
[ LOG_CAMERA_ID_ECAMERA ] = {
. stream_type = VISION_STREAM_YUV_WIDE ,
. filename = " ecamera.hevc " ,
. frame_packet_name = " wideFrame " ,
. encode_idx_name = " wideEncodeIdx " ,
. fps = MAIN_FPS ,
. bitrate = MAIN_BITRATE ,
. is_h265 = true ,
. downscale = false ,
. has_qcamera = false
} ,
[ LOG_CAMERA_ID_QCAMERA ] = {
. filename = " qcamera.ts " ,
. fps = MAIN_FPS ,
. bitrate = QCAM_BITRATE ,
. is_h265 = false ,
. downscale = true ,
# ifndef QCOM2
. frame_width = 480 , . frame_height = 360
# else
. frame_width = 526 , . frame_height = 330 // keep pixel count the same?
# endif
} ,
} ;
# define SEGMENT_LENGTH 60
# define LOG_ROOT " / data / media / 0 / realdata"
# define LOG_ROOT " / data / media / 0 / realdata"
# define RAW_CLIP_LENGTH 100 // 5 seconds at 20fps
# define RAW_CLIP_LENGTH 100 // 5 seconds at 20fps
@ -86,44 +133,94 @@ volatile sig_atomic_t do_exit = 0;
static void set_do_exit ( int sig ) {
static void set_do_exit ( int sig ) {
do_exit = 1 ;
do_exit = 1 ;
}
}
class RotateState {
public :
SubSocket * fpkt_sock ;
uint32_t stream_frame_id , log_frame_id , last_rotate_frame_id ;
bool enabled , should_rotate ;
RotateState ( ) : fpkt_sock ( nullptr ) , stream_frame_id ( 0 ) , log_frame_id ( 0 ) ,
last_rotate_frame_id ( UINT32_MAX ) , enabled ( false ) , should_rotate ( false ) { } ;
void waitLogThread ( ) {
std : : unique_lock < std : : mutex > lk ( fid_lock ) ;
while ( stream_frame_id > log_frame_id //if the log camera is older, wait for it to catch up.
& & ( stream_frame_id - log_frame_id ) < 8 // but if its too old then there probably was a discontinuity (visiond restarted)
& & ! do_exit ) {
cv . wait ( lk ) ;
}
}
void cancelWait ( ) { cv . notify_one ( ) ; }
void setStreamFrameId ( uint32_t frame_id ) {
fid_lock . lock ( ) ;
stream_frame_id = frame_id ;
fid_lock . unlock ( ) ;
cv . notify_one ( ) ;
}
void setLogFrameId ( uint32_t frame_id ) {
fid_lock . lock ( ) ;
log_frame_id = frame_id ;
fid_lock . unlock ( ) ;
cv . notify_one ( ) ;
}
void rotate ( ) {
if ( ! enabled ) { return ; }
std : : unique_lock < std : : mutex > lk ( fid_lock ) ;
should_rotate = true ;
last_rotate_frame_id = stream_frame_id ;
}
void finish_rotate ( ) {
std : : unique_lock < std : : mutex > lk ( fid_lock ) ;
should_rotate = false ;
}
private :
std : : mutex fid_lock ;
std : : condition_variable cv ;
} ;
struct LoggerdState {
struct LoggerdState {
Context * ctx ;
Context * ctx ;
LoggerState logger ;
LoggerState logger ;
std : : mutex lock ;
std : : condition_variable cv ;
char segment_path [ 4096 ] ;
char segment_path [ 4096 ] ;
uint32_t last_frame_id ;
uint32_t rotate_last_frame_id ;
int rotate_segment ;
int rotate_segment ;
int rotate_seq_id ;
pthread_mutex_t rotate_lock ;
pthread_mutex_t rotate_lock ;
int num_encoder ;
int num_encoder ;
int should_close ;
std : : atomic < int > rotate_seq_id ;
int finish_close ;
std : : atomic < int > should_close ;
std : : atomic < int > finish_close ;
RotateState rotate_state [ LOG_CAMERA_ID_MAX - 1 ] ;
} ;
} ;
LoggerdState s ;
LoggerdState s ;
# ifndef DISABLE_ENCODER
# ifndef DISABLE_ENCODER
void encoder_thread ( bool is_streaming , bool raw_clips , int cam_idx ) {
void encoder_thread ( RotateState * rotate_state , bool is_streaming , bool raw_clips , int cam_idx ) {
int err ;
// 0:f, 1:d, 2:e
switch ( cam_idx ) {
if ( cam_idx = = CAM_IDX_DCAM ) {
case LOG_CAMERA_ID_DCAMERA : {
// TODO: add this back
LOGW ( " recording front camera " ) ;
# ifndef QCOM2
set_thread_name ( " FrontCameraEncoder " ) ;
if ( ! read_db_bool ( " RecordFront " ) ) return ;
break ;
LOGW ( " recording front camera " ) ;
}
# endif
case LOG_CAMERA_ID_FCAMERA : {
set_thread_name ( " FrontCameraEncoder " ) ;
set_thread_name ( " RearCameraEncoder " ) ;
} else if ( cam_idx = = CAM_IDX_FCAM ) {
break ;
set_thread_name ( " RearCameraEncoder " ) ;
}
} else if ( cam_idx = = CAM_IDX_ECAM ) {
case LOG_CAMERA_ID_ECAMERA : {
set_thread_name ( " WideCameraEncoder " ) ;
set_thread_name ( " WideCameraEncoder " ) ;
} else {
break ;
LOGE ( " unexpected camera index provided " ) ;
}
assert ( false ) ;
default : {
LOGE ( " unexpected camera index provided " ) ;
assert ( false ) ;
}
}
}
VisionStream stream ;
VisionStream stream ;
@ -131,30 +228,24 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) {
bool encoder_inited = false ;
bool encoder_inited = false ;
EncoderState encoder ;
EncoderState encoder ;
EncoderState encoder_alt ;
EncoderState encoder_alt ;
bool has_encoder_alt = false ;
bool has_encoder_alt = cameras_logged [ cam_idx ] . has_qcamera ;
int encoder_segment = - 1 ;
int cnt = 0 ;
rotate_state - > enabled = true ;
pthread_mutex_lock ( & s . rotate_lock ) ;
pthread_mutex_lock ( & s . rotate_lock ) ;
int my_idx = s . num_encoder ;
int my_idx = s . num_encoder ;
s . num_encoder + = 1 ;
s . num_encoder + = 1 ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
int encoder_segment = - 1 ;
PubSocket * idx_sock = PubSocket : : create ( s . ctx , cameras_logged [ cam_idx ] . encode_idx_name ) ;
int cnt = 0 ;
PubSocket * idx_sock = PubSocket : : create ( s . ctx , cam_idx = = CAM_IDX_DCAM ? " frontEncodeIdx " : ( cam_idx = = CAM_IDX_ECAM ? " wideEncodeIdx " : " encodeIdx " ) ) ;
assert ( idx_sock ! = NULL ) ;
assert ( idx_sock ! = NULL ) ;
LoggerHandle * lh = NULL ;
LoggerHandle * lh = NULL ;
while ( ! do_exit ) {
while ( ! do_exit ) {
VisionStreamBufs buf_info ;
VisionStreamBufs buf_info ;
if ( cam_idx = = CAM_IDX_DCAM ) {
int err = visionstream_init ( & stream , cameras_logged [ cam_idx ] . stream_type , false , & buf_info ) ;
err = visionstream_init ( & stream , VISION_STREAM_YUV_FRONT , false , & buf_info ) ;
} else if ( cam_idx = = CAM_IDX_FCAM ) {
err = visionstream_init ( & stream , VISION_STREAM_YUV , false , & buf_info ) ;
} else if ( cam_idx = = CAM_IDX_ECAM ) {
err = visionstream_init ( & stream , VISION_STREAM_YUV_WIDE , false , & buf_info ) ;
}
if ( err ! = 0 ) {
if ( err ! = 0 ) {
LOGD ( " visionstream connect fail " ) ;
LOGD ( " visionstream connect fail " ) ;
usleep ( 100000 ) ;
usleep ( 100000 ) ;
@ -163,15 +254,24 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) {
if ( ! encoder_inited ) {
if ( ! encoder_inited ) {
LOGD ( " encoder init %dx%d " , buf_info . width , buf_info . height ) ;
LOGD ( " encoder init %dx%d " , buf_info . width , buf_info . height ) ;
encoder_init ( & encoder , cam_idx = = CAM_IDX_DCAM ? " dcamera.hevc " : ( cam_idx = = CAM_IDX_ECAM ? " ecamera.hevc " : " fcamera.hevc " ) , buf_info . width , buf_info . height , CAMERA_FPS , cam_idx = = CAM_IDX_DCAM ? DCAM_BITRATE : MAIN_BITRATE , true , false ) ;
encoder_init ( & encoder , cameras_logged [ cam_idx ] . filename ,
buf_info . width ,
# ifndef QCOM2
buf_info . height ,
// TODO: fix qcamera on tici
cameras_logged [ cam_idx ] . fps ,
if ( cam_idx = = CAM_IDX_FCAM ) {
cameras_logged [ cam_idx ] . bitrate ,
encoder_init ( & encoder_alt , " qcamera.ts " , 480 , 360 , CAMERA_FPS , 128000 , false , true ) ;
cameras_logged [ cam_idx ] . is_h265 ,
has_encoder_alt = true ;
cameras_logged [ cam_idx ] . downscale ) ;
if ( has_encoder_alt ) {
encoder_init ( & encoder_alt , cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . filename ,
cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . frame_width ,
cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . frame_height ,
cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . fps ,
cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . bitrate ,
cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . is_h265 ,
cameras_logged [ LOG_CAMERA_ID_QCAMERA ] . downscale ) ;
}
}
# endif
encoder_inited = true ;
encoder_inited = true ;
if ( is_streaming ) {
if ( is_streaming ) {
encoder . zmq_ctx = zmq_ctx_new ( ) ;
encoder . zmq_ctx = zmq_ctx_new ( ) ;
@ -187,7 +287,7 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) {
RawLogger * rawlogger = NULL ;
RawLogger * rawlogger = NULL ;
if ( raw_clips ) {
if ( raw_clips ) {
rawlogger = new RawLogger ( " prcamera " , buf_info . width , buf_info . height , CA MER A_FPS) ;
rawlogger = new RawLogger ( " prcamera " , buf_info . width , buf_info . height , MAIN _FPS ) ;
}
}
while ( ! do_exit ) {
while ( ! do_exit ) {
@ -203,66 +303,46 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) {
//double msdiff = (double) diff / 1000000.0;
//double msdiff = (double) diff / 1000000.0;
// printf("logger latency to tsEof: %f\n", msdiff);
// printf("logger latency to tsEof: %f\n", msdiff);
uint8_t * y = ( uint8_t * ) buf - > addr ;
{ // all the rotation stuff
uint8_t * u = y + ( buf_info . width * buf_info . height ) ;
uint8_t * v = u + ( buf_info . width / 2 ) * ( buf_info . height / 2 ) ;
pthread_mutex_lock ( & s . rotate_lock ) ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
// wait if camera pkt id is older than stream
rotate_state - > waitLogThread ( ) ;
{
// all the rotation stuff
bool should_rotate = false ;
std : : unique_lock < std : : mutex > lk ( s . lock ) ;
if ( cam_idx = = CAM_IDX_FCAM ) { // TODO: should wait for three cameras on tici?
// wait if log camera is older on back camera
while ( extra . frame_id > s . last_frame_id //if the log camera is older, wait for it to catch up.
& & ( extra . frame_id - s . last_frame_id ) < 8 // but if its too old then there probably was a discontinuity (visiond restarted)
& & ! do_exit ) {
s . cv . wait ( lk ) ;
}
should_rotate = extra . frame_id > s . rotate_last_frame_id & & encoder_segment < s . rotate_segment & & s . rotate_seq_id = = my_idx ;
} else {
// front camera is best effort
should_rotate = encoder_segment < s . rotate_segment & & s . rotate_seq_id = = my_idx ;
}
if ( do_exit ) break ;
if ( do_exit ) break ;
// rotate the encoder if the logger is on a newer segment
// rotate the encoder if the logger is on a newer segment
if ( should_rotate ) {
if ( rotate_state - > should_rotate ) {
LOG ( " rotate encoder to %s " , s . segment_path ) ;
if ( rotate_state - > last_rotate_frame_id = = 0 ) {
rotate_state - > last_rotate_frame_id = extra . frame_id - 1 ;
}
while ( s . rotate_seq_id ! = my_idx & & ! do_exit ) { usleep ( 1000 ) ; }
LOGW ( " camera %d rotate encoder to %s. " , cam_idx , s . segment_path ) ;
encoder_rotate ( & encoder , s . segment_path , s . rotate_segment ) ;
encoder_rotate ( & encoder , s . segment_path , s . rotate_segment ) ;
s . rotate_seq_id = ( my_idx + 1 ) % s . num_encoder ;
s . rotate_seq_id = ( my_idx + 1 ) % s . num_encoder ;
if ( has_encoder_alt ) {
if ( has_encoder_alt ) {
encoder_rotate ( & encoder_alt , s . segment_path , s . rotate_segment ) ;
encoder_rotate ( & encoder_alt , s . segment_path , s . rotate_segment ) ;
}
}
if ( raw_clips ) {
if ( raw_clips ) {
rawlogger - > Rotate ( s . segment_path , s . rotate_segment ) ;
rawlogger - > Rotate ( s . segment_path , s . rotate_segment ) ;
}
}
encoder_segment = s . rotate_segment ;
encoder_segment = s . rotate_segment ;
if ( lh ) {
if ( lh ) {
lh_close ( lh ) ;
lh_close ( lh ) ;
}
}
lh = logger_get_handle ( & s . logger ) ;
lh = logger_get_handle ( & s . logger ) ;
}
if ( encoder . rotating ) {
pthread_mutex_lock ( & s . rotate_lock ) ;
pthread_mutex_lock ( & s . rotate_lock ) ;
s . should_close + = 1 ;
s . should_close + = 1 ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
while ( s . should_close > 0 & & s . should_close < s . num_encoder ) {
while ( s . should_close > 0 & & s . should_close < s . num_encoder & & ! do_exit ) { usleep ( 1000 ) ; }
// printf("%d waiting for others to reach close, %d/%d \n", my_idx, s.should_close, s.num_encoder);
s . cv . wait ( lk ) ;
}
pthread_mutex_lock ( & s . rotate_lock ) ;
pthread_mutex_lock ( & s . rotate_lock ) ;
if ( s . should_close = = s . num_encoder ) {
s . should_close = s . should_close = = s . num_encoder ? 1 - s . num_encoder : s . should_close + 1 ;
s . should_close = 1 - s . num_encoder ;
} else {
s . should_close + = 1 ;
}
encoder_close ( & encoder ) ;
encoder_close ( & encoder ) ;
encoder_open ( & encoder , encoder . next_path ) ;
encoder_open ( & encoder , encoder . next_path ) ;
encoder . segment = encoder . next_segment ;
encoder . segment = encoder . next_segment ;
@ -276,13 +356,18 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) {
s . finish_close + = 1 ;
s . finish_close + = 1 ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
pthread_mutex_unlock ( & s . rotate_lock ) ;
while ( s . finish_close > 0 & & s . finish_close < s . num_encoder ) {
while ( s . finish_close > 0 & & s . finish_close < s . num_encoder & & ! do_exit ) { usleep ( 1000 ) ; }
// printf("%d waiting for others to actually close, %d/%d \n", my_idx, s.finish_close, s.num_encoder);
s . cv . wait ( lk ) ;
}
s . finish_close = 0 ;
s . finish_close = 0 ;
rotate_state - > finish_rotate ( ) ;
}
}
}
}
rotate_state - > setStreamFrameId ( extra . frame_id ) ;
uint8_t * y = ( uint8_t * ) buf - > addr ;
uint8_t * u = y + ( buf_info . width * buf_info . height ) ;
uint8_t * v = u + ( buf_info . width / 2 ) * ( buf_info . height / 2 ) ;
{
{
// encode hevc
// encode hevc
int out_segment = - 1 ;
int out_segment = - 1 ;
@ -302,11 +387,12 @@ void encoder_thread(bool is_streaming, bool raw_clips, int cam_idx) {
MessageBuilder msg ;
MessageBuilder msg ;
auto eidx = msg . initEvent ( ) . initEncodeIdx ( ) ;
auto eidx = msg . initEvent ( ) . initEncodeIdx ( ) ;
eidx . setFrameId ( extra . frame_id ) ;
eidx . setFrameId ( extra . frame_id ) ;
# ifdef QCOM2
# ifdef QCOM2
eidx . setType ( cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) ;
eidx . setType ( cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) ;
# else
# else
eidx . setType ( cam_idx = = CAM_IDX_DCAM ? cereal : : EncodeIndex : : Type : : FRONT : cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) ;
eidx . setType ( cam_idx = = LOG_CAMERA_ID_DCAMERA ? cereal : : EncodeIndex : : Type : : FRONT : cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) ;
# endif
# endif
eidx . setEncodeId ( cnt ) ;
eidx . setEncodeId ( cnt ) ;
eidx . setSegmentNum ( out_segment ) ;
eidx . setSegmentNum ( out_segment ) ;
eidx . setSegmentId ( out_id ) ;
eidx . setSegmentId ( out_id ) ;
@ -533,6 +619,10 @@ int main(int argc, char** argv) {
if ( getenv ( " LOGGERD_TEST " ) ) {
if ( getenv ( " LOGGERD_TEST " ) ) {
segment_length = atoi ( getenv ( " LOGGERD_SEGMENT_LENGTH " ) ) ;
segment_length = atoi ( getenv ( " LOGGERD_SEGMENT_LENGTH " ) ) ;
}
}
bool record_front = true ;
# ifndef QCOM2
record_front = read_db_bool ( " RecordFront " ) ;
# endif
setpriority ( PRIO_PROCESS , 0 , - 12 ) ;
setpriority ( PRIO_PROCESS , 0 , - 12 ) ;
@ -546,7 +636,6 @@ int main(int argc, char** argv) {
// subscribe to all services
// subscribe to all services
SubSocket * frame_sock = NULL ;
std : : vector < SubSocket * > socks ;
std : : vector < SubSocket * > socks ;
std : : map < SubSocket * , int > qlog_counter ;
std : : map < SubSocket * , int > qlog_counter ;
@ -558,12 +647,11 @@ int main(int argc, char** argv) {
if ( it . should_log ) {
if ( it . should_log ) {
SubSocket * sock = SubSocket : : create ( s . ctx , name ) ;
SubSocket * sock = SubSocket : : create ( s . ctx , name ) ;
assert ( sock ! = NULL ) ;
assert ( sock ! = NULL ) ;
poller - > registerSocket ( sock ) ;
poller - > registerSocket ( sock ) ;
socks . push_back ( sock ) ;
socks . push_back ( sock ) ;
if ( name = = " frame " ) {
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) {
frame_sock = sock ;
if ( name = = cameras_logged [ cid ] . frame_packet_name ) { s . rotate_state [ cid ] . fpkt_ sock = sock ; }
}
}
qlog_counter [ sock ] = ( it . decimation = = - 1 ) ? - 1 : 0 ;
qlog_counter [ sock ] = ( it . decimation = = - 1 ) ? - 1 : 0 ;
@ -571,7 +659,6 @@ int main(int argc, char** argv) {
}
}
}
}
{
{
auto words = gen_init_data ( ) ;
auto words = gen_init_data ( ) ;
auto bytes = words . asBytes ( ) ;
auto bytes = words . asBytes ( ) ;
@ -588,14 +675,6 @@ int main(int argc, char** argv) {
is_logging = false ;
is_logging = false ;
}
}
if ( is_logging ) {
err = logger_next ( & s . logger , LOG_ROOT , s . segment_path , sizeof ( s . segment_path ) , & s . rotate_segment ) ;
assert ( err = = 0 ) ;
LOGW ( " logging to %s " , s . segment_path ) ;
}
double start_ts = seconds_since_boot ( ) ;
double last_rotate_ts = start_ts ;
s . rotate_seq_id = 0 ;
s . rotate_seq_id = 0 ;
s . should_close = 0 ;
s . should_close = 0 ;
s . finish_close = 0 ;
s . finish_close = 0 ;
@ -603,74 +682,124 @@ int main(int argc, char** argv) {
pthread_mutex_init ( & s . rotate_lock , NULL ) ;
pthread_mutex_init ( & s . rotate_lock , NULL ) ;
# ifndef DISABLE_ENCODER
# ifndef DISABLE_ENCODER
// rear camera
// rear camera
std : : thread encoder_thread_handle ( encoder_thread , is_streaming , false , CAM_IDX_FCAM ) ;
std : : thread encoder_thread_handle ( encoder_thread , & s . rotate_state [ LOG_CAMERA_ID_FCAMERA ] , is_streaming , false , LOG_CAMERA_ID_FCAMERA ) ;
// front camera
// front camera
std : : thread front_encoder_thread_handle ( encoder_thread , false , false , CAM_IDX_DCAM ) ;
std : : thread front_encoder_thread_handle ;
if ( record_front ) {
front_encoder_thread_handle = std : : thread ( encoder_thread , & s . rotate_state [ LOG_CAMERA_ID_DCAMERA ] , false , false , LOG_CAMERA_ID_DCAMERA ) ;
}
# ifdef QCOM2
# ifdef QCOM2
// wide camera
// wide camera
std : : thread wide_encoder_thread_handle ( encoder_thread , false , false , CAM_IDX _ECAM ) ;
std : : thread wide_encoder_thread_handle ( encoder_thread , & s . rotate_state [ LOG_CAMERA_ID_ECAMERA ] , false , false , LOG_ CAMERA _ID_ECAMERA ) ;
# endif
# endif
# endif
# endif
uint64_t msg_count = 0 ;
uint64_t msg_count = 0 ;
uint64_t bytes_count = 0 ;
uint64_t bytes_count = 0 ;
kj : : Array < capnp : : word > buf = kj : : heapArray < capnp : : word > ( 1024 ) ;
double start_ts = seconds_since_boot ( ) ;
double last_rotate_tms = millis_since_boot ( ) ;
double last_camera_seen_tms = millis_since_boot ( ) ;
uint32_t last_seen_log_frame_id [ LOG_CAMERA_ID_MAX - 1 ] = { 0 } ;
uint32_t last_seen_log_frame_id_max = 0 ;
while ( ! do_exit ) {
while ( ! do_exit ) {
for ( auto sock : poller - > poll ( 100 * 1000 ) ) {
for ( auto sock : poller - > poll ( 100 * 1000 ) ) {
Message * last_msg = nullptr ;
while ( true ) {
while ( true ) {
Message * msg = sock - > receive ( true ) ;
Message * msg = sock - > receive ( true ) ;
if ( msg = = NULL ) {
if ( msg = = NULL ) {
break ;
break ;
}
}
delete last_msg ;
last_msg = msg ;
uint8_t * data = ( uint8_t * ) msg - > getData ( ) ;
logger_log ( & s . logger , ( uint8_t * ) msg - > getData ( ) , msg - > getSize ( ) , qlog_counter [ sock ] = = 0 ) ;
size_t len = msg - > getSize ( ) ;
if ( sock = = frame_sock ) {
// track camera frames to sync to encoder
auto amsg = kj : : heapArray < capnp : : word > ( ( len / sizeof ( capnp : : word ) ) + 1 ) ;
memcpy ( amsg . begin ( ) , data , len ) ;
capnp : : FlatArrayMessageReader cmsg ( amsg ) ;
cereal : : Event : : Reader event = cmsg . getRoot < cereal : : Event > ( ) ;
if ( event . isFrame ( ) ) {
std : : unique_lock < std : : mutex > lk ( s . lock ) ;
s . last_frame_id = event . getFrame ( ) . getFrameId ( ) ;
lk . unlock ( ) ;
s . cv . notify_all ( ) ;
}
}
logger_log ( & s . logger , data , len , qlog_counter [ sock ] = = 0 ) ;
delete msg ;
if ( qlog_counter [ sock ] ! = - 1 ) {
if ( qlog_counter [ sock ] ! = - 1 ) {
//printf("%p: %d/%d\n", socks[i], qlog_counter[socks[i]], qlog_freqs[socks[i]]);
//printf("%p: %d/%d\n", socks[i], qlog_counter[socks[i]], qlog_freqs[socks[i]]);
qlog_counter [ sock ] + + ;
qlog_counter [ sock ] + + ;
qlog_counter [ sock ] % = qlog_freqs [ sock ] ;
qlog_counter [ sock ] % = qlog_freqs [ sock ] ;
}
}
bytes_count + = msg - > getSize ( ) ;
bytes_count + = len ;
msg_count + + ;
msg_count + + ;
}
}
if ( last_msg ) {
int fpkt_id = - 1 ;
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) {
if ( sock = = s . rotate_state [ cid ] . fpkt_sock ) { fpkt_id = cid ; break ; }
}
if ( fpkt_id > = 0 ) {
// track camera frames to sync to encoder
// only process last frame
const uint8_t * data = ( uint8_t * ) last_msg - > getData ( ) ;
const size_t len = last_msg - > getSize ( ) ;
const size_t size = len / sizeof ( capnp : : word ) + 1 ;
if ( buf . size ( ) < size ) {
buf = kj : : heapArray < capnp : : word > ( size ) ;
}
memcpy ( buf . begin ( ) , data , len ) ;
capnp : : FlatArrayMessageReader cmsg ( buf ) ;
cereal : : Event : : Reader event = cmsg . getRoot < cereal : : Event > ( ) ;
if ( fpkt_id = = LOG_CAMERA_ID_FCAMERA ) {
s . rotate_state [ fpkt_id ] . setLogFrameId ( event . getFrame ( ) . getFrameId ( ) ) ;
} else if ( fpkt_id = = LOG_CAMERA_ID_DCAMERA ) {
s . rotate_state [ fpkt_id ] . setLogFrameId ( event . getFrontFrame ( ) . getFrameId ( ) ) ;
} else if ( fpkt_id = = LOG_CAMERA_ID_ECAMERA ) {
s . rotate_state [ fpkt_id ] . setLogFrameId ( event . getWideFrame ( ) . getFrameId ( ) ) ;
}
last_seen_log_frame_id [ fpkt_id ] = s . rotate_state [ fpkt_id ] . log_frame_id ;
last_seen_log_frame_id_max = fmax ( last_seen_log_frame_id_max , s . rotate_state [ fpkt_id ] . log_frame_id ) ;
last_camera_seen_tms = millis_since_boot ( ) ;
}
delete last_msg ;
}
}
}
double ts = seconds_since_boot ( ) ;
double ts = seconds_since_boot ( ) ;
if ( ts - last_rotate_ts > segment_length ) {
double tms = millis_since_boot ( ) ;
// rotate the log
bool new_segment = false ;
last_rotate_ts + = segment_length ;
if ( s . logger . part > - 1 ) {
new_segment = true ;
if ( tms - last_camera_seen_tms < = NO_CAMERA_PATIENCE ) {
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) {
// this *should* be redundant on tici since all camera frames are synced
new_segment & = ( ( ( s . rotate_state [ cid ] . stream_frame_id > = s . rotate_state [ cid ] . last_rotate_frame_id + segment_length * MAIN_FPS ) & &
( ! s . rotate_state [ cid ] . should_rotate ) ) | |
( ! s . rotate_state [ cid ] . enabled ) ) ;
if ( last_seen_log_frame_id [ cid ] + 2 < last_seen_log_frame_id_max ) { LOGW ( " camera %d lags behind " , cid ) ; }
# ifndef QCOM2
break ; // only look at fcamera frame id if not QCOM2
# endif
}
} else {
new_segment & = tms - last_rotate_tms > segment_length * 1000 ;
if ( new_segment ) { LOGW ( " no camera packet seen. auto rotated " ) ; }
}
} else if ( s . logger . part = = - 1 ) {
// always starts first segment immediately
new_segment = true ;
}
std : : lock_guard < std : : mutex > guard ( s . lock ) ;
if ( new_segment ) {
s . rotate_last_frame_id = s . last_frame_id ;
pthread_mutex_lock ( & s . rotate_lock ) ;
last_rotate_tms = millis_since_boot ( ) ;
// rotate the log
if ( is_logging ) {
if ( is_logging ) {
err = logger_next ( & s . logger , LOG_ROOT , s . segment_path , sizeof ( s . segment_path ) , & s . rotate_segment ) ;
err = logger_next ( & s . logger , LOG_ROOT , s . segment_path , sizeof ( s . segment_path ) , & s . rotate_segment ) ;
assert ( err = = 0 ) ;
assert ( err = = 0 ) ;
if ( s . logger . part = = 0 ) { LOGW ( " logging to %s " , s . segment_path ) ; }
LOGW ( " rotated to %s " , s . segment_path ) ;
LOGW ( " rotated to %s " , s . segment_path ) ;
}
}
// rotate the encoders
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) { s . rotate_state [ cid ] . rotate ( ) ; }
pthread_mutex_unlock ( & s . rotate_lock ) ;
}
}
if ( ( msg_count % 1000 ) = = 0 ) {
if ( ( msg_count % 1000 ) = = 0 ) {
@ -679,14 +808,15 @@ int main(int argc, char** argv) {
}
}
LOGW ( " joining threads " ) ;
LOGW ( " joining threads " ) ;
s . cv . notify_all ( ) ;
for ( int cid = 0 ; cid < = MAX_CAM_IDX ; cid + + ) { s . rotate_state [ cid ] . cancelWait ( ) ; }
# ifndef DISABLE_ENCODER
# ifndef DISABLE_ENCODER
# ifdef QCOM2
# ifdef QCOM2
wide_encoder_thread_handle . join ( ) ;
wide_encoder_thread_handle . join ( ) ;
# endif
# endif
front_encoder_thread_handle . join ( ) ;
if ( record_front ) {
front_encoder_thread_handle . join ( ) ;
}
encoder_thread_handle . join ( ) ;
encoder_thread_handle . join ( ) ;
LOGW ( " encoder joined " ) ;
LOGW ( " encoder joined " ) ;
# endif
# endif