loggerd: c++ LoggerState (#25869)

* c++ LoggerState

* trigger ci

* trigger ci

* merge master

---------

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: 3527c1da67
chrysler-long2
Dean Lee 1 year ago committed by GitHub
parent e11869d4c9
commit 00e5f37d2a
  1. 182
      system/loggerd/logger.cc
  2. 62
      system/loggerd/logger.h
  3. 51
      system/loggerd/loggerd.cc
  4. 101
      system/loggerd/tests/test_logger.cc

@ -1,21 +1,7 @@
#include "system/loggerd/logger.h"
#include <sys/stat.h>
#include <unistd.h>
#include <ftw.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fstream>
#include <iostream>
#include <map>
#include <streambuf>
#include <string>
#include <vector>
#include "common/params.h"
@ -108,159 +94,51 @@ std::string logger_get_route_name() {
return route_name;
}
void log_init_data(LoggerState *s) {
auto bytes = s->init_data.asBytes();
logger_log(s, bytes.begin(), bytes.size(), s->has_qlog);
}
static void lh_log_sentinel(LoggerHandle *h, SentinelType type) {
static void log_sentinel(LoggerState *log, SentinelType type, int eixt_signal = 0) {
MessageBuilder msg;
auto sen = msg.initEvent().initSentinel();
sen.setType(type);
sen.setSignal(h->exit_signal);
auto bytes = msg.toBytes();
lh_log(h, bytes.begin(), bytes.size(), true);
sen.setSignal(eixt_signal);
log->write(msg.toBytes(), true);
}
// ***** logging functions *****
void logger_init(LoggerState *s, bool has_qlog) {
pthread_mutex_init(&s->lock, NULL);
s->part = -1;
s->has_qlog = has_qlog;
s->route_name = logger_get_route_name();
s->init_data = logger_build_init_data();
LoggerState::LoggerState(const std::string &log_root) {
route_name = logger_get_route_name();
route_path = log_root + "/" + route_name;
init_data = logger_build_init_data();
}
static LoggerHandle* logger_open(LoggerState *s, const char* root_path) {
LoggerHandle *h = NULL;
for (int i=0; i<LOGGER_MAX_HANDLES; i++) {
if (s->handles[i].refcnt == 0) {
h = &s->handles[i];
break;
}
LoggerState::~LoggerState() {
if (rlog) {
log_sentinel(this, SentinelType::END_OF_ROUTE, exit_signal);
std::remove(lock_file.c_str());
}
assert(h);
snprintf(h->segment_path, sizeof(h->segment_path),
"%s/%s--%d", root_path, s->route_name.c_str(), s->part);
snprintf(h->log_path, sizeof(h->log_path), "%s/rlog", h->segment_path);
snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog", h->segment_path);
snprintf(h->lock_path, sizeof(h->lock_path), "%s.lock", h->log_path);
h->end_sentinel_type = SentinelType::END_OF_SEGMENT;
h->exit_signal = 0;
if (!util::create_directories(h->segment_path, 0775)) return nullptr;
FILE* lock_file = fopen(h->lock_path, "wb");
if (lock_file == NULL) return NULL;
fclose(lock_file);
h->log = std::make_unique<RawFile>(h->log_path);
if (s->has_qlog) {
h->q_log = std::make_unique<RawFile>(h->qlog_path);
}
pthread_mutex_init(&h->lock, NULL);
h->refcnt++;
return h;
}
int logger_next(LoggerState *s, const char* root_path,
char* out_segment_path, size_t out_segment_path_len,
int* out_part) {
bool is_start_of_route = !s->cur_handle;
pthread_mutex_lock(&s->lock);
s->part++;
LoggerHandle* next_h = logger_open(s, root_path);
if (!next_h) {
pthread_mutex_unlock(&s->lock);
return -1;
bool LoggerState::next() {
if (rlog) {
log_sentinel(this, SentinelType::END_OF_SEGMENT);
std::remove(lock_file.c_str());
}
if (s->cur_handle) {
lh_close(s->cur_handle);
}
s->cur_handle = next_h;
segment_path = route_path + "--" + std::to_string(++part);
bool ret = util::create_directories(segment_path, 0775);
assert(ret == true);
if (out_segment_path) {
snprintf(out_segment_path, out_segment_path_len, "%s", next_h->segment_path);
}
if (out_part) {
*out_part = s->part;
}
const std::string rlog_path = segment_path + "/rlog";
lock_file = rlog_path + ".lock";
std::ofstream{lock_file};
pthread_mutex_unlock(&s->lock);
// write beginning of log metadata
log_init_data(s);
lh_log_sentinel(s->cur_handle, is_start_of_route ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT);
return 0;
}
rlog.reset(new RawFile(rlog_path));
qlog.reset(new RawFile(segment_path + "/qlog"));
LoggerHandle* logger_get_handle(LoggerState *s) {
pthread_mutex_lock(&s->lock);
LoggerHandle* h = s->cur_handle;
if (h) {
pthread_mutex_lock(&h->lock);
h->refcnt++;
pthread_mutex_unlock(&h->lock);
}
pthread_mutex_unlock(&s->lock);
return h;
}
void logger_log(LoggerState *s, uint8_t* data, size_t data_size, bool in_qlog) {
pthread_mutex_lock(&s->lock);
if (s->cur_handle) {
lh_log(s->cur_handle, data, data_size, in_qlog);
}
pthread_mutex_unlock(&s->lock);
// log init data & sentinel type.
write(init_data.asBytes(), true);
log_sentinel(this, part > 0 ? SentinelType::START_OF_SEGMENT : SentinelType::START_OF_ROUTE);
return true;
}
void logger_close(LoggerState *s, ExitHandler *exit_handler) {
pthread_mutex_lock(&s->lock);
if (s->cur_handle) {
s->cur_handle->exit_signal = exit_handler && exit_handler->signal.load();
s->cur_handle->end_sentinel_type = SentinelType::END_OF_ROUTE;
lh_close(s->cur_handle);
}
pthread_mutex_unlock(&s->lock);
}
void lh_log(LoggerHandle* h, uint8_t* data, size_t data_size, bool in_qlog) {
pthread_mutex_lock(&h->lock);
assert(h->refcnt > 0);
h->log->write(data, data_size);
if (in_qlog && h->q_log) {
h->q_log->write(data, data_size);
}
pthread_mutex_unlock(&h->lock);
}
void lh_close(LoggerHandle* h) {
pthread_mutex_lock(&h->lock);
assert(h->refcnt > 0);
if (h->refcnt == 1) {
// a very ugly hack. only here can guarantee sentinel is the last msg
pthread_mutex_unlock(&h->lock);
lh_log_sentinel(h, h->end_sentinel_type);
pthread_mutex_lock(&h->lock);
}
h->refcnt--;
if (h->refcnt == 0) {
h->log.reset(nullptr);
h->q_log.reset(nullptr);
unlink(h->lock_path);
pthread_mutex_unlock(&h->lock);
pthread_mutex_destroy(&h->lock);
return;
}
pthread_mutex_unlock(&h->lock);
void LoggerState::write(uint8_t* data, size_t size, bool in_qlog) {
rlog->write(data, size);
if (in_qlog) qlog->write(data, size);
}

@ -1,27 +1,17 @@
#pragma once
#include <pthread.h>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <capnp/serialize.h>
#include <kj/array.h>
#include "cereal/messaging/messaging.h"
#include "common/util.h"
#include "common/swaglog.h"
#include "system/hardware/hw.h"
#define LOGGER_MAX_HANDLES 16
class RawFile {
public:
RawFile(const char* path) {
file = util::safe_fopen(path, "wb");
RawFile(const std::string &path) {
file = util::safe_fopen(path.c_str(), "wb");
assert(file != nullptr);
}
~RawFile() {
@ -41,39 +31,25 @@ class RawFile {
typedef cereal::Sentinel::SentinelType SentinelType;
typedef struct LoggerHandle {
pthread_mutex_t lock;
SentinelType end_sentinel_type;
int exit_signal;
int refcnt;
char segment_path[4096];
char log_path[4096];
char qlog_path[4096];
char lock_path[4096];
std::unique_ptr<RawFile> log, q_log;
} LoggerHandle;
typedef struct LoggerState {
pthread_mutex_t lock;
int part;
class LoggerState {
public:
LoggerState(const std::string& log_root = Path::log_root());
~LoggerState();
bool next();
void write(uint8_t* data, size_t size, bool in_qlog);
inline int segment() const { return part; }
inline const std::string& segmentPath() const { return segment_path; }
inline const std::string& routeName() const { return route_name; }
inline void write(kj::ArrayPtr<kj::byte> bytes, bool in_qlog) { write(bytes.begin(), bytes.size(), in_qlog); }
inline void setExitSignal(int signal) { exit_signal = signal; }
protected:
int part = -1, exit_signal = 0;
std::string route_path, route_name, segment_path, lock_file;
kj::Array<capnp::word> init_data;
std::string route_name;
char log_name[64];
bool has_qlog;
LoggerHandle handles[LOGGER_MAX_HANDLES];
LoggerHandle* cur_handle;
} LoggerState;
std::unique_ptr<RawFile> rlog, qlog;
};
kj::Array<capnp::word> logger_build_init_data();
std::string logger_get_route_name();
void logger_init(LoggerState *s, bool has_qlog);
int logger_next(LoggerState *s, const char* root_path,
char* out_segment_path, size_t out_segment_path_len,
int* out_part);
LoggerHandle* logger_get_handle(LoggerState *s);
void logger_close(LoggerState *s, ExitHandler *exit_handler=nullptr);
void logger_log(LoggerState *s, uint8_t* data, size_t data_size, bool in_qlog);
void lh_log(LoggerHandle* h, uint8_t* data, size_t data_size, bool in_qlog);
void lh_close(LoggerHandle* h);

@ -13,9 +13,7 @@
ExitHandler do_exit;
struct LoggerdState {
LoggerState logger = {};
char segment_path[4096];
std::atomic<int> rotate_segment;
LoggerState logger;
std::atomic<double> last_camera_seen_tms;
std::atomic<int> ready_to_rotate; // count of encoders ready to rotate
int max_waiting = 0;
@ -23,13 +21,11 @@ struct LoggerdState {
};
void logger_rotate(LoggerdState *s) {
int segment = -1;
int err = logger_next(&s->logger, Path::log_root().c_str(), s->segment_path, sizeof(s->segment_path), &segment);
assert(err == 0);
s->rotate_segment = segment;
bool ret =s->logger.next();
assert(ret);
s->ready_to_rotate = 0;
s->last_rotate_tms = millis_since_boot();
LOGW((s->logger.part == 0) ? "logging to %s" : "rotated to %s", s->segment_path);
LOGW((s->logger.segment() == 0) ? "logging to %s" : "rotated to %s", s->logger.segmentPath().c_str());
}
void rotate_if_needed(LoggerdState *s) {
@ -85,16 +81,16 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
}
int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset;
if (offset_segment_num == s->rotate_segment) {
if (offset_segment_num == s->logger.segment()) {
// loggerd is now on the segment that matches this packet
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if (re.current_segment != s->rotate_segment) {
if (re.current_segment != s->logger.segment()) {
if (re.recording) {
re.writer.reset();
re.recording = false;
}
re.current_segment = s->rotate_segment;
re.current_segment = s->logger.segment();
re.marked_ready_to_rotate = false;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
@ -117,7 +113,7 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
// if we aren't actually recording, don't create the writer
if (encoder_info.record) {
assert(encoder_info.filename != NULL);
re.writer.reset(new VideoWriter(s->segment_path,
re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(),
encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C,
encoder_info.frame_width, encoder_info.frame_height, encoder_info.fps, idx.getType()));
// write the header
@ -149,28 +145,28 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
evt.setLogMonoTime(event.getLogMonoTime());
(evt.*(encoder_info.set_encode_idx_func))(idx);
auto new_msg = bmsg.toBytes();
logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog?
s->logger.write((uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog?
bytes_count += new_msg.size();
// free the message, we used it
delete msg;
} else if (offset_segment_num > s->rotate_segment) {
} else if (offset_segment_num > s->logger.segment()) {
// encoderd packet has a newer segment, this means encoderd has rolled over
if (!re.marked_ready_to_rotate) {
re.marked_ready_to_rotate = true;
++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s",
s->rotate_segment.load(), offset_segment_num,
s->logger.segment(), offset_segment_num,
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
}
// queue up all the new segment messages, they go in after the rotate
re.q.push_back(msg);
} else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->rotate_segment:%d re.encoderd_segment_offset:%d",
name.c_str(), idx.getSegmentNum(), s->rotate_segment.load(), re.encoderd_segment_offset);
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d",
name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset);
// free the message, it's useless. this should never happen
// actually, this can happen if you restart encoderd
re.encoderd_segment_offset = -s->rotate_segment.load();
re.encoderd_segment_offset = -s->logger.segment();
delete msg;
}
@ -179,19 +175,19 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
void handle_user_flag(LoggerdState *s) {
static int prev_segment = -1;
if (s->rotate_segment == prev_segment) return;
if (s->logger.segment() == prev_segment) return;
LOGW("preserving %s", s->segment_path);
LOGW("preserving %s", s->logger.segmentPath().c_str());
#ifdef __APPLE__
int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0);
int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0);
#else
int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0);
int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0);
#endif
if (ret) {
LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->segment_path, strerror(errno));
LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->logger.segmentPath().c_str(), strerror(errno));
}
prev_segment = s->rotate_segment.load();
prev_segment = s->logger.segment();
}
void loggerd_thread() {
@ -228,9 +224,8 @@ void loggerd_thread() {
LoggerdState s;
// init logger
logger_init(&s.logger, true);
logger_rotate(&s);
Params().put("CurrentRoute", s.logger.route_name);
Params().put("CurrentRoute", s.logger.routeName());
std::map<std::string, EncoderInfo> encoder_infos_dict;
for (const auto &cam : cameras_logged) {
@ -261,7 +256,7 @@ void loggerd_thread() {
s.last_camera_seen_tms = millis_since_boot();
bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]);
} else {
logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog);
s.logger.write((uint8_t *)msg->getData(), msg->getSize(), in_qlog);
bytes_count += msg->getSize();
delete msg;
}
@ -283,7 +278,7 @@ void loggerd_thread() {
}
LOGW("closing logger");
logger_close(&s.logger, &do_exit);
s.logger.setExitSignal(do_exit.signal);
if (do_exit.power_failure) {
LOGE("power failure");

@ -1,16 +1,5 @@
#include <sys/stat.h>
#include <climits>
#include <condition_variable>
#include <sstream>
#include <thread>
#include <utility>
#include "catch2/catch.hpp"
#include "cereal/messaging/messaging.h"
#include "common/util.h"
#include "system/loggerd/logger.h"
#include "tools/replay/util.h"
typedef cereal::Sentinel::SentinelType SentinelType;
@ -57,91 +46,29 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
}
}
void write_msg(LoggerHandle *logger) {
void write_msg(LoggerState *logger) {
MessageBuilder msg;
msg.initEvent().initClocks();
auto bytes = msg.toBytes();
lh_log(logger, bytes.begin(), bytes.size(), true);
logger->write(msg.toBytes(), true);
}
TEST_CASE("logger") {
const int segment_cnt = 100;
const std::string log_root = "/tmp/test_logger";
system(("rm " + log_root + " -rf").c_str());
ExitHandler do_exit;
LoggerState logger = {};
logger_init(&logger, true);
char segment_path[PATH_MAX] = {};
int segment = -1;
SECTION("single thread logging & rotation(100 segments, one thread)") {
const int segment_cnt = 100;
std::string route_name;
{
LoggerState logger(log_root);
route_name = logger.routeName();
for (int i = 0; i < segment_cnt; ++i) {
REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0);
REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.lock"));
REQUIRE(segment == i);
write_msg(logger.cur_handle);
}
do_exit = true;
do_exit.signal = 1;
logger_close(&logger, &do_exit);
for (int i = 0; i < segment_cnt; ++i) {
verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, 1);
REQUIRE(logger.next());
REQUIRE(util::file_exists(logger.segmentPath() + "/rlog.lock"));
REQUIRE(logger.segment() == i);
write_msg(&logger);
}
logger.setExitSignal(1);
}
SECTION("multiple threads logging & rotation(100 segments, 10 threads") {
const int segment_cnt = 100, thread_cnt = 10;
std::atomic<int> event_cnt[segment_cnt] = {};
std::atomic<int> main_segment = -1;
auto logging_thread = [&]() -> void {
LoggerHandle *lh = logger_get_handle(&logger);
assert(lh != nullptr);
int segment = main_segment;
int delayed_cnt = 0;
while (!do_exit) {
// write 2 more messages in the current segment and then rotate to the new segment.
if (main_segment > segment && ++delayed_cnt == 2) {
lh_close(lh);
lh = logger_get_handle(&logger);
segment = main_segment;
delayed_cnt = 0;
}
write_msg(lh);
event_cnt[segment] += 1;
usleep(1);
}
lh_close(lh);
};
// start logging
std::vector<std::thread> threads;
for (int i = 0; i < segment_cnt; ++i) {
REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0);
REQUIRE(segment == i);
main_segment = segment;
if (i == 0) {
for (int j = 0; j < thread_cnt; ++j) {
threads.push_back(std::thread(logging_thread));
}
}
for (int j = 0; j < 100; ++j) {
write_msg(logger.cur_handle);
usleep(1);
}
event_cnt[segment] += 100;
}
// end logging
for (auto &t : threads) t.join();
do_exit = true;
do_exit.signal = 1;
logger_close(&logger, &do_exit);
REQUIRE(logger.cur_handle->refcnt == 0);
for (int i = 0; i < segment_cnt; ++i) {
verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, event_cnt[i]);
}
for (int i = 0; i < segment_cnt; ++i) {
verify_segment(log_root + "/" + route_name, i, segment_cnt, 1);
}
}

Loading…
Cancel
Save