diff --git a/system/loggerd/logger.cc b/system/loggerd/logger.cc index 1474552a13..3af2c50aa1 100644 --- a/system/loggerd/logger.cc +++ b/system/loggerd/logger.cc @@ -1,21 +1,7 @@ #include "system/loggerd/logger.h" -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include #include -#include #include -#include -#include #include #include "common/params.h" @@ -108,159 +94,51 @@ std::string logger_get_route_name() { return route_name; } -void log_init_data(LoggerState *s) { - auto bytes = s->init_data.asBytes(); - logger_log(s, bytes.begin(), bytes.size(), s->has_qlog); -} - - -static void lh_log_sentinel(LoggerHandle *h, SentinelType type) { +static void log_sentinel(LoggerState *log, SentinelType type, int eixt_signal = 0) { MessageBuilder msg; auto sen = msg.initEvent().initSentinel(); sen.setType(type); - sen.setSignal(h->exit_signal); - auto bytes = msg.toBytes(); - - lh_log(h, bytes.begin(), bytes.size(), true); + sen.setSignal(eixt_signal); + log->write(msg.toBytes(), true); } -// ***** logging functions ***** - -void logger_init(LoggerState *s, bool has_qlog) { - pthread_mutex_init(&s->lock, NULL); - - s->part = -1; - s->has_qlog = has_qlog; - s->route_name = logger_get_route_name(); - s->init_data = logger_build_init_data(); +LoggerState::LoggerState(const std::string &log_root) { + route_name = logger_get_route_name(); + route_path = log_root + "/" + route_name; + init_data = logger_build_init_data(); } -static LoggerHandle* logger_open(LoggerState *s, const char* root_path) { - LoggerHandle *h = NULL; - for (int i=0; ihandles[i].refcnt == 0) { - h = &s->handles[i]; - break; - } +LoggerState::~LoggerState() { + if (rlog) { + log_sentinel(this, SentinelType::END_OF_ROUTE, exit_signal); + std::remove(lock_file.c_str()); } - assert(h); - - snprintf(h->segment_path, sizeof(h->segment_path), - "%s/%s--%d", root_path, s->route_name.c_str(), s->part); - - snprintf(h->log_path, sizeof(h->log_path), "%s/rlog", h->segment_path); - snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog", h->segment_path); - snprintf(h->lock_path, sizeof(h->lock_path), "%s.lock", h->log_path); - h->end_sentinel_type = SentinelType::END_OF_SEGMENT; - h->exit_signal = 0; - - if (!util::create_directories(h->segment_path, 0775)) return nullptr; - - FILE* lock_file = fopen(h->lock_path, "wb"); - if (lock_file == NULL) return NULL; - fclose(lock_file); - - h->log = std::make_unique(h->log_path); - if (s->has_qlog) { - h->q_log = std::make_unique(h->qlog_path); - } - - pthread_mutex_init(&h->lock, NULL); - h->refcnt++; - return h; } -int logger_next(LoggerState *s, const char* root_path, - char* out_segment_path, size_t out_segment_path_len, - int* out_part) { - bool is_start_of_route = !s->cur_handle; - - pthread_mutex_lock(&s->lock); - s->part++; - - LoggerHandle* next_h = logger_open(s, root_path); - if (!next_h) { - pthread_mutex_unlock(&s->lock); - return -1; +bool LoggerState::next() { + if (rlog) { + log_sentinel(this, SentinelType::END_OF_SEGMENT); + std::remove(lock_file.c_str()); } - if (s->cur_handle) { - lh_close(s->cur_handle); - } - s->cur_handle = next_h; + segment_path = route_path + "--" + std::to_string(++part); + bool ret = util::create_directories(segment_path, 0775); + assert(ret == true); - if (out_segment_path) { - snprintf(out_segment_path, out_segment_path_len, "%s", next_h->segment_path); - } - if (out_part) { - *out_part = s->part; - } + const std::string rlog_path = segment_path + "/rlog"; + lock_file = rlog_path + ".lock"; + std::ofstream{lock_file}; - pthread_mutex_unlock(&s->lock); - - // write beginning of log metadata - log_init_data(s); - lh_log_sentinel(s->cur_handle, is_start_of_route ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT); - return 0; -} + rlog.reset(new RawFile(rlog_path)); + qlog.reset(new RawFile(segment_path + "/qlog")); -LoggerHandle* logger_get_handle(LoggerState *s) { - pthread_mutex_lock(&s->lock); - LoggerHandle* h = s->cur_handle; - if (h) { - pthread_mutex_lock(&h->lock); - h->refcnt++; - pthread_mutex_unlock(&h->lock); - } - pthread_mutex_unlock(&s->lock); - return h; -} - -void logger_log(LoggerState *s, uint8_t* data, size_t data_size, bool in_qlog) { - pthread_mutex_lock(&s->lock); - if (s->cur_handle) { - lh_log(s->cur_handle, data, data_size, in_qlog); - } - pthread_mutex_unlock(&s->lock); + // log init data & sentinel type. + write(init_data.asBytes(), true); + log_sentinel(this, part > 0 ? SentinelType::START_OF_SEGMENT : SentinelType::START_OF_ROUTE); + return true; } -void logger_close(LoggerState *s, ExitHandler *exit_handler) { - pthread_mutex_lock(&s->lock); - if (s->cur_handle) { - s->cur_handle->exit_signal = exit_handler && exit_handler->signal.load(); - s->cur_handle->end_sentinel_type = SentinelType::END_OF_ROUTE; - lh_close(s->cur_handle); - } - pthread_mutex_unlock(&s->lock); -} - -void lh_log(LoggerHandle* h, uint8_t* data, size_t data_size, bool in_qlog) { - pthread_mutex_lock(&h->lock); - assert(h->refcnt > 0); - h->log->write(data, data_size); - if (in_qlog && h->q_log) { - h->q_log->write(data, data_size); - } - pthread_mutex_unlock(&h->lock); -} - -void lh_close(LoggerHandle* h) { - pthread_mutex_lock(&h->lock); - assert(h->refcnt > 0); - if (h->refcnt == 1) { - // a very ugly hack. only here can guarantee sentinel is the last msg - pthread_mutex_unlock(&h->lock); - lh_log_sentinel(h, h->end_sentinel_type); - pthread_mutex_lock(&h->lock); - } - h->refcnt--; - if (h->refcnt == 0) { - h->log.reset(nullptr); - h->q_log.reset(nullptr); - unlink(h->lock_path); - pthread_mutex_unlock(&h->lock); - pthread_mutex_destroy(&h->lock); - return; - } - pthread_mutex_unlock(&h->lock); +void LoggerState::write(uint8_t* data, size_t size, bool in_qlog) { + rlog->write(data, size); + if (in_qlog) qlog->write(data, size); } diff --git a/system/loggerd/logger.h b/system/loggerd/logger.h index 06b11e72f9..76a12b9e87 100644 --- a/system/loggerd/logger.h +++ b/system/loggerd/logger.h @@ -1,27 +1,17 @@ #pragma once -#include - #include -#include -#include #include #include -#include -#include - #include "cereal/messaging/messaging.h" #include "common/util.h" -#include "common/swaglog.h" #include "system/hardware/hw.h" -#define LOGGER_MAX_HANDLES 16 - class RawFile { public: - RawFile(const char* path) { - file = util::safe_fopen(path, "wb"); + RawFile(const std::string &path) { + file = util::safe_fopen(path.c_str(), "wb"); assert(file != nullptr); } ~RawFile() { @@ -41,39 +31,25 @@ class RawFile { typedef cereal::Sentinel::SentinelType SentinelType; -typedef struct LoggerHandle { - pthread_mutex_t lock; - SentinelType end_sentinel_type; - int exit_signal; - int refcnt; - char segment_path[4096]; - char log_path[4096]; - char qlog_path[4096]; - char lock_path[4096]; - std::unique_ptr log, q_log; -} LoggerHandle; -typedef struct LoggerState { - pthread_mutex_t lock; - int part; +class LoggerState { +public: + LoggerState(const std::string& log_root = Path::log_root()); + ~LoggerState(); + bool next(); + void write(uint8_t* data, size_t size, bool in_qlog); + inline int segment() const { return part; } + inline const std::string& segmentPath() const { return segment_path; } + inline const std::string& routeName() const { return route_name; } + inline void write(kj::ArrayPtr bytes, bool in_qlog) { write(bytes.begin(), bytes.size(), in_qlog); } + inline void setExitSignal(int signal) { exit_signal = signal; } + +protected: + int part = -1, exit_signal = 0; + std::string route_path, route_name, segment_path, lock_file; kj::Array init_data; - std::string route_name; - char log_name[64]; - bool has_qlog; - - LoggerHandle handles[LOGGER_MAX_HANDLES]; - LoggerHandle* cur_handle; -} LoggerState; + std::unique_ptr rlog, qlog; +}; kj::Array logger_build_init_data(); std::string logger_get_route_name(); -void logger_init(LoggerState *s, bool has_qlog); -int logger_next(LoggerState *s, const char* root_path, - char* out_segment_path, size_t out_segment_path_len, - int* out_part); -LoggerHandle* logger_get_handle(LoggerState *s); -void logger_close(LoggerState *s, ExitHandler *exit_handler=nullptr); -void logger_log(LoggerState *s, uint8_t* data, size_t data_size, bool in_qlog); - -void lh_log(LoggerHandle* h, uint8_t* data, size_t data_size, bool in_qlog); -void lh_close(LoggerHandle* h); diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index adb24c913c..8d5fcb95ac 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -13,9 +13,7 @@ ExitHandler do_exit; struct LoggerdState { - LoggerState logger = {}; - char segment_path[4096]; - std::atomic rotate_segment; + LoggerState logger; std::atomic last_camera_seen_tms; std::atomic ready_to_rotate; // count of encoders ready to rotate int max_waiting = 0; @@ -23,13 +21,11 @@ struct LoggerdState { }; void logger_rotate(LoggerdState *s) { - int segment = -1; - int err = logger_next(&s->logger, Path::log_root().c_str(), s->segment_path, sizeof(s->segment_path), &segment); - assert(err == 0); - s->rotate_segment = segment; + bool ret =s->logger.next(); + assert(ret); s->ready_to_rotate = 0; s->last_rotate_tms = millis_since_boot(); - LOGW((s->logger.part == 0) ? "logging to %s" : "rotated to %s", s->segment_path); + LOGW((s->logger.segment() == 0) ? "logging to %s" : "rotated to %s", s->logger.segmentPath().c_str()); } void rotate_if_needed(LoggerdState *s) { @@ -85,16 +81,16 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct } int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset; - if (offset_segment_num == s->rotate_segment) { + if (offset_segment_num == s->logger.segment()) { // loggerd is now on the segment that matches this packet // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets - if (re.current_segment != s->rotate_segment) { + if (re.current_segment != s->logger.segment()) { if (re.recording) { re.writer.reset(); re.recording = false; } - re.current_segment = s->rotate_segment; + re.current_segment = s->logger.segment(); re.marked_ready_to_rotate = false; // we are in this segment now, process any queued messages before this one if (!re.q.empty()) { @@ -117,7 +113,7 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct // if we aren't actually recording, don't create the writer if (encoder_info.record) { assert(encoder_info.filename != NULL); - re.writer.reset(new VideoWriter(s->segment_path, + re.writer.reset(new VideoWriter(s->logger.segmentPath().c_str(), encoder_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, encoder_info.frame_width, encoder_info.frame_height, encoder_info.fps, idx.getType())); // write the header @@ -149,28 +145,28 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct evt.setLogMonoTime(event.getLogMonoTime()); (evt.*(encoder_info.set_encode_idx_func))(idx); auto new_msg = bmsg.toBytes(); - logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? + s->logger.write((uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? bytes_count += new_msg.size(); // free the message, we used it delete msg; - } else if (offset_segment_num > s->rotate_segment) { + } else if (offset_segment_num > s->logger.segment()) { // encoderd packet has a newer segment, this means encoderd has rolled over if (!re.marked_ready_to_rotate) { re.marked_ready_to_rotate = true; ++s->ready_to_rotate; LOGD("rotate %d -> %d ready %d/%d for %s", - s->rotate_segment.load(), offset_segment_num, + s->logger.segment(), offset_segment_num, s->ready_to_rotate.load(), s->max_waiting, name.c_str()); } // queue up all the new segment messages, they go in after the rotate re.q.push_back(msg); } else { - LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->rotate_segment:%d re.encoderd_segment_offset:%d", - name.c_str(), idx.getSegmentNum(), s->rotate_segment.load(), re.encoderd_segment_offset); + LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d", + name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset); // free the message, it's useless. this should never happen // actually, this can happen if you restart encoderd - re.encoderd_segment_offset = -s->rotate_segment.load(); + re.encoderd_segment_offset = -s->logger.segment(); delete msg; } @@ -179,19 +175,19 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct void handle_user_flag(LoggerdState *s) { static int prev_segment = -1; - if (s->rotate_segment == prev_segment) return; + if (s->logger.segment() == prev_segment) return; - LOGW("preserving %s", s->segment_path); + LOGW("preserving %s", s->logger.segmentPath().c_str()); #ifdef __APPLE__ - int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0); + int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0); #else - int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0); + int ret = setxattr(s->logger.segmentPath().c_str(), PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0); #endif if (ret) { - LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->segment_path, strerror(errno)); + LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->logger.segmentPath().c_str(), strerror(errno)); } - prev_segment = s->rotate_segment.load(); + prev_segment = s->logger.segment(); } void loggerd_thread() { @@ -228,9 +224,8 @@ void loggerd_thread() { LoggerdState s; // init logger - logger_init(&s.logger, true); logger_rotate(&s); - Params().put("CurrentRoute", s.logger.route_name); + Params().put("CurrentRoute", s.logger.routeName()); std::map encoder_infos_dict; for (const auto &cam : cameras_logged) { @@ -261,7 +256,7 @@ void loggerd_thread() { s.last_camera_seen_tms = millis_since_boot(); bytes_count += handle_encoder_msg(&s, msg, service.name, remote_encoders[sock], encoder_infos_dict[service.name]); } else { - logger_log(&s.logger, (uint8_t *)msg->getData(), msg->getSize(), in_qlog); + s.logger.write((uint8_t *)msg->getData(), msg->getSize(), in_qlog); bytes_count += msg->getSize(); delete msg; } @@ -283,7 +278,7 @@ void loggerd_thread() { } LOGW("closing logger"); - logger_close(&s.logger, &do_exit); + s.logger.setExitSignal(do_exit.signal); if (do_exit.power_failure) { LOGE("power failure"); diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc index 9f815c2189..2dae136e13 100644 --- a/system/loggerd/tests/test_logger.cc +++ b/system/loggerd/tests/test_logger.cc @@ -1,16 +1,5 @@ -#include - -#include -#include -#include -#include -#include - #include "catch2/catch.hpp" -#include "cereal/messaging/messaging.h" -#include "common/util.h" #include "system/loggerd/logger.h" -#include "tools/replay/util.h" typedef cereal::Sentinel::SentinelType SentinelType; @@ -57,91 +46,29 @@ void verify_segment(const std::string &route_path, int segment, int max_segment, } } -void write_msg(LoggerHandle *logger) { +void write_msg(LoggerState *logger) { MessageBuilder msg; msg.initEvent().initClocks(); - auto bytes = msg.toBytes(); - lh_log(logger, bytes.begin(), bytes.size(), true); + logger->write(msg.toBytes(), true); } TEST_CASE("logger") { + const int segment_cnt = 100; const std::string log_root = "/tmp/test_logger"; system(("rm " + log_root + " -rf").c_str()); - - ExitHandler do_exit; - - LoggerState logger = {}; - logger_init(&logger, true); - char segment_path[PATH_MAX] = {}; - int segment = -1; - - SECTION("single thread logging & rotation(100 segments, one thread)") { - const int segment_cnt = 100; + std::string route_name; + { + LoggerState logger(log_root); + route_name = logger.routeName(); for (int i = 0; i < segment_cnt; ++i) { - REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0); - REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.lock")); - REQUIRE(segment == i); - write_msg(logger.cur_handle); - } - do_exit = true; - do_exit.signal = 1; - logger_close(&logger, &do_exit); - for (int i = 0; i < segment_cnt; ++i) { - verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, 1); + REQUIRE(logger.next()); + REQUIRE(util::file_exists(logger.segmentPath() + "/rlog.lock")); + REQUIRE(logger.segment() == i); + write_msg(&logger); } + logger.setExitSignal(1); } - SECTION("multiple threads logging & rotation(100 segments, 10 threads") { - const int segment_cnt = 100, thread_cnt = 10; - std::atomic event_cnt[segment_cnt] = {}; - std::atomic main_segment = -1; - - auto logging_thread = [&]() -> void { - LoggerHandle *lh = logger_get_handle(&logger); - assert(lh != nullptr); - int segment = main_segment; - int delayed_cnt = 0; - while (!do_exit) { - // write 2 more messages in the current segment and then rotate to the new segment. - if (main_segment > segment && ++delayed_cnt == 2) { - lh_close(lh); - lh = logger_get_handle(&logger); - segment = main_segment; - delayed_cnt = 0; - } - write_msg(lh); - event_cnt[segment] += 1; - usleep(1); - } - lh_close(lh); - }; - - // start logging - std::vector threads; - for (int i = 0; i < segment_cnt; ++i) { - REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0); - REQUIRE(segment == i); - main_segment = segment; - if (i == 0) { - for (int j = 0; j < thread_cnt; ++j) { - threads.push_back(std::thread(logging_thread)); - } - } - for (int j = 0; j < 100; ++j) { - write_msg(logger.cur_handle); - usleep(1); - } - event_cnt[segment] += 100; - } - - // end logging - for (auto &t : threads) t.join(); - do_exit = true; - do_exit.signal = 1; - logger_close(&logger, &do_exit); - REQUIRE(logger.cur_handle->refcnt == 0); - - for (int i = 0; i < segment_cnt; ++i) { - verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, event_cnt[i]); - } + for (int i = 0; i < segment_cnt; ++i) { + verify_segment(log_root + "/" + route_name, i, segment_cnt, 1); } }