tici loggerd fixes (#19622)

* fix encoder exit when encoder is uninitialized

* cleanup

* more cleanup

* more cleanup

* little more

* ptr

Co-authored-by: Comma Device <device@comma.ai>
pull/19670/head
Adeeb Shihadeh 4 years ago committed by GitHub
parent 8e6b8cdd4c
commit 54679cea58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      selfdrive/loggerd/encoder.c
  2. 209
      selfdrive/loggerd/loggerd.cc

@ -650,8 +650,8 @@ void encoder_close(EncoderState *s) {
fclose(s->of);
}
unlink(s->lock_path);
s->open = false;
}
s->open = false;
pthread_mutex_unlock(&s->lock);
}

@ -15,7 +15,6 @@
#include <string>
#include <iostream>
#include <fstream>
#include <fstream>
#include <streambuf>
#include <thread>
#include <mutex>
@ -41,8 +40,7 @@
#include "services.h"
#if !(defined(QCOM) || defined(QCOM2))
// no encoder on PC
#define DISABLE_ENCODER
#define DISABLE_ENCODER // TODO: loggerd for PC
#endif
#ifndef DISABLE_ENCODER
@ -106,9 +104,10 @@ LogCameraInfo cameras_logged[LOG_CAMERA_ID_MAX] = {
#endif
},
};
#define SEGMENT_LENGTH 60
#define LOG_ROOT "/data/media/0/realdata"
constexpr int SEGMENT_LENGTH = 60;
const char* LOG_ROOT = "/data/media/0/realdata";
namespace {
@ -127,7 +126,7 @@ static void set_do_exit(int sig) {
do_exit = 1;
}
static bool file_exists (const std::string& fn) {
static bool file_exists(const std::string& fn) {
std::ifstream f(fn);
return f.good();
}
@ -199,34 +198,17 @@ struct LoggerdState {
LoggerdState s;
#ifndef DISABLE_ENCODER
void encoder_thread(RotateState *rotate_state, int cam_idx) {
void encoder_thread(int cam_idx) {
assert(cam_idx < LOG_CAMERA_ID_MAX-1);
switch (cam_idx) {
case LOG_CAMERA_ID_DCAMERA: {
LOGW("recording front camera");
set_thread_name("FrontCameraEncoder");
break;
}
case LOG_CAMERA_ID_FCAMERA: {
set_thread_name("RearCameraEncoder");
break;
}
case LOG_CAMERA_ID_ECAMERA: {
set_thread_name("WideCameraEncoder");
break;
}
default: {
LOGE("unexpected camera index provided");
assert(false);
}
}
LogCameraInfo &cam_info = cameras_logged[cam_idx];
set_thread_name(cam_info.filename);
VisionStream stream;
RotateState &rotate_state = s.rotate_state[cam_idx];
rotate_state.enabled = true;
bool encoder_inited = false;
EncoderState encoder;
EncoderState encoder_alt;
bool has_encoder_alt = cameras_logged[cam_idx].has_qcamera;
std::vector<EncoderState*> encoders;
int encoder_segment = -1;
int cnt = 0;
@ -239,34 +221,28 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) {
while (!do_exit) {
VisionStreamBufs buf_info;
int err = visionstream_init(&stream, cameras_logged[cam_idx].stream_type, false, &buf_info);
int err = visionstream_init(&stream, cam_info.stream_type, false, &buf_info);
if (err != 0) {
LOGD("visionstream connect fail");
util::sleep_for(100);
continue;
}
if (!encoder_inited) {
if (encoders.empty()) {
LOGD("encoder init %dx%d", buf_info.width, buf_info.height);
encoder_init(&encoder, cameras_logged[cam_idx].filename,
buf_info.width,
buf_info.height,
cameras_logged[cam_idx].fps,
cameras_logged[cam_idx].bitrate,
cameras_logged[cam_idx].is_h265,
cameras_logged[cam_idx].downscale);
if (has_encoder_alt) {
encoder_init(&encoder_alt, cameras_logged[LOG_CAMERA_ID_QCAMERA].filename,
cameras_logged[LOG_CAMERA_ID_QCAMERA].frame_width,
cameras_logged[LOG_CAMERA_ID_QCAMERA].frame_height,
cameras_logged[LOG_CAMERA_ID_QCAMERA].fps,
cameras_logged[LOG_CAMERA_ID_QCAMERA].bitrate,
cameras_logged[LOG_CAMERA_ID_QCAMERA].is_h265,
cameras_logged[LOG_CAMERA_ID_QCAMERA].downscale);
}
encoder_inited = true;
// main encoder
encoders.push_back(new EncoderState());
encoder_init(encoders[0], cam_info.filename, buf_info.width, buf_info.height,
cam_info.fps, cam_info.bitrate, cam_info.is_h265, cam_info.downscale);
// qcamera encoder
if (cam_info.has_qcamera) {
LogCameraInfo &qcam_info = cameras_logged[LOG_CAMERA_ID_QCAMERA];
encoders.push_back(new EncoderState());
encoder_init(encoders[1], qcam_info.filename, qcam_info.frame_width, qcam_info.frame_height,
qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, qcam_info.downscale);
}
}
while (!do_exit) {
@ -288,23 +264,25 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) {
pthread_mutex_unlock(&s.rotate_lock);
// wait if camera pkt id is older than stream
rotate_state->waitLogThread();
rotate_state.waitLogThread();
if (do_exit) break;
// rotate the encoder if the logger is on a newer segment
if (rotate_state->should_rotate) {
if (!rotate_state->initialized) {
rotate_state->last_rotate_frame_id = extra.frame_id - 1;
rotate_state->initialized = true;
if (rotate_state.should_rotate) {
if (!rotate_state.initialized) {
rotate_state.last_rotate_frame_id = extra.frame_id - 1;
rotate_state.initialized = true;
}
while (s.rotate_seq_id != my_idx && !do_exit) { usleep(1000); }
LOGW("camera %d rotate encoder to %s.", cam_idx, s.segment_path);
encoder_rotate(&encoder, s.segment_path, s.rotate_segment);
s.rotate_seq_id = (my_idx + 1) % s.num_encoder;
if (has_encoder_alt) {
encoder_rotate(&encoder_alt, s.segment_path, s.rotate_segment);
for (auto &e : encoders) {
encoder_rotate(e, s.segment_path, s.rotate_segment);
}
s.rotate_seq_id = (my_idx + 1) % s.num_encoder;
encoder_segment = s.rotate_segment;
if (lh) {
lh_close(lh);
@ -319,28 +297,25 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) {
pthread_mutex_lock(&s.rotate_lock);
s.should_close = s.should_close == s.num_encoder ? 1 - s.num_encoder : s.should_close + 1;
encoder_close(&encoder);
encoder_open(&encoder, encoder.next_path);
encoder.segment = encoder.next_segment;
encoder.rotating = false;
if (has_encoder_alt) {
encoder_close(&encoder_alt);
encoder_open(&encoder_alt, encoder_alt.next_path);
encoder_alt.segment = encoder_alt.next_segment;
encoder_alt.rotating = false;
for (auto &e : encoders) {
encoder_close(e);
encoder_open(e, e->next_path);
e->segment = e->next_segment;
e->rotating = false;
}
s.finish_close += 1;
pthread_mutex_unlock(&s.rotate_lock);
while(s.finish_close > 0 && s.finish_close < s.num_encoder && !do_exit) { usleep(1000); }
s.finish_close = 0;
rotate_state->finish_rotate();
rotate_state.finish_rotate();
}
}
rotate_state->setStreamFrameId(extra.frame_id);
rotate_state.setStreamFrameId(extra.frame_id);
uint8_t *y = (uint8_t*)buf->addr;
uint8_t *u = y + (buf_info.width*buf_info.height);
@ -348,14 +323,12 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) {
{
// encode hevc
int out_segment = -1;
int out_id = encoder_encode_frame(&encoder,
y, u, v,
int out_id = encoder_encode_frame(encoders[0], y, u, v,
buf_info.width, buf_info.height,
&out_segment, &extra);
if (has_encoder_alt) {
if (encoders.size() > 1) {
int out_segment_alt = -1;
encoder_encode_frame(&encoder_alt,
y, u, v,
encoder_encode_frame(encoders[1], y, u, v,
buf_info.width, buf_info.height,
&out_segment_alt, &extra);
}
@ -378,8 +351,8 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) {
eidx.setSegmentNum(out_segment);
eidx.setSegmentId(out_id);
auto bytes = msg.toBytes();
if (lh) {
auto bytes = msg.toBytes();
lh_log(lh, bytes.begin(), bytes.size(), false);
}
}
@ -395,16 +368,11 @@ void encoder_thread(RotateState *rotate_state, int cam_idx) {
visionstream_destroy(&stream);
}
if (encoder_inited) {
LOG("encoder destroy");
encoder_close(&encoder);
encoder_destroy(&encoder);
}
if (has_encoder_alt) {
LOG("encoder alt destroy");
encoder_close(&encoder_alt);
encoder_destroy(&encoder_alt);
LOG("encoder destroy");
for(auto &e : encoders) {
encoder_close(e);
encoder_destroy(e);
delete e;
}
}
#endif
@ -576,18 +544,17 @@ int main(int argc, char** argv) {
signal(SIGINT, (sighandler_t)set_do_exit);
signal(SIGTERM, (sighandler_t)set_do_exit);
s.ctx = Context::create();
Poller * poller = Poller::create();
// subscribe to all services
std::vector<SubSocket*> socks;
typedef struct QlogState {
int counter, freq;
} QlogState;
std::map<SubSocket*, QlogState> qlog_states;
// setup messaging
std::vector<SubSocket*> socks;
s.ctx = Context::create();
Poller * poller = Poller::create();
for (const auto& it : services) {
std::string name = it.name;
@ -605,32 +572,31 @@ int main(int argc, char** argv) {
}
}
// init logger
{
auto words = gen_init_data();
auto bytes = words.asBytes();
logger_init(&s.logger, "rlog", bytes.begin(), bytes.size(), true);
}
// init encoders
s.rotate_seq_id = 0;
s.should_close = 0;
s.finish_close = 0;
s.num_encoder = 0;
pthread_mutex_init(&s.rotate_lock, NULL);
std::vector<std::thread> encoder_threads;
#ifndef DISABLE_ENCODER
// rear camera
std::thread encoder_thread_handle(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_FCAMERA], LOG_CAMERA_ID_FCAMERA);
s.rotate_state[LOG_CAMERA_ID_FCAMERA].enabled = true;
// front camera
std::thread front_encoder_thread_handle;
encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_FCAMERA));
if (record_front) {
front_encoder_thread_handle = std::thread(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_DCAMERA], LOG_CAMERA_ID_DCAMERA);
s.rotate_state[LOG_CAMERA_ID_DCAMERA].enabled = true;
encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_DCAMERA));
}
#ifdef QCOM2
// wide camera
std::thread wide_encoder_thread_handle(encoder_thread, &s.rotate_state[LOG_CAMERA_ID_ECAMERA], LOG_CAMERA_ID_ECAMERA);
s.rotate_state[LOG_CAMERA_ID_ECAMERA].enabled = true;
#endif
#ifdef QCOM2
encoder_threads.push_back(std::thread(encoder_thread, LOG_CAMERA_ID_ECAMERA));
#endif
#endif
uint64_t msg_count = 0;
@ -641,11 +607,11 @@ int main(int argc, char** argv) {
double last_rotate_tms = millis_since_boot();
double last_camera_seen_tms = millis_since_boot();
while (!do_exit) {
for (auto sock : poller->poll(100 * 1000)) {
Message * last_msg = nullptr;
while (true) {
for (auto sock : poller->poll(100 * 1000)) {
Message * last_msg = nullptr;
while (!do_exit) {
Message * msg = sock->receive(true);
if (msg == NULL){
if (!msg){
break;
}
delete last_msg;
@ -739,28 +705,17 @@ int main(int argc, char** argv) {
}
}
LOGW("joining threads");
for (int cid=0;cid<=MAX_CAM_IDX;cid++) { s.rotate_state[cid].cancelWait(); }
#ifndef DISABLE_ENCODER
#ifdef QCOM2
wide_encoder_thread_handle.join();
#endif
if (record_front) {
front_encoder_thread_handle.join();
}
encoder_thread_handle.join();
LOGW("encoder joined");
#endif
LOGW("closing encoders");
for (auto &r : s.rotate_state) r.cancelWait();
for (auto &t : encoder_threads) t.join();
LOGW("closing logger");
logger_close(&s.logger);
LOGW("logger closed");
for (auto s : socks){
delete s;
}
// messaging cleanup
for (auto sock : socks) delete sock;
delete poller;
delete s.ctx;
return 0;
}

Loading…
Cancel
Save