diff --git a/.github/workflows/selfdrive_tests.yaml b/.github/workflows/selfdrive_tests.yaml index ebd1526cd4..fe211aa480 100644 --- a/.github/workflows/selfdrive_tests.yaml +++ b/.github/workflows/selfdrive_tests.yaml @@ -214,6 +214,7 @@ jobs: $UNIT_TEST selfdrive/thermald && \ $UNIT_TEST tools/lib/tests && \ ./selfdrive/common/tests/test_util && \ + ./selfdrive/loggerd/tests/test_logger &&\ ./selfdrive/proclogd/tests/test_proclog && \ ./selfdrive/camerad/test/ae_gray_test" - name: Upload coverage to Codecov diff --git a/selfdrive/loggerd/.gitignore b/selfdrive/loggerd/.gitignore index 55081595af..4a8755b956 100644 --- a/selfdrive/loggerd/.gitignore +++ b/selfdrive/loggerd/.gitignore @@ -1 +1,2 @@ loggerd +tests/test_logger diff --git a/selfdrive/loggerd/SConscript b/selfdrive/loggerd/SConscript index 18325c7a49..ca8dc8285b 100644 --- a/selfdrive/loggerd/SConscript +++ b/selfdrive/loggerd/SConscript @@ -26,3 +26,6 @@ if arch == "Darwin": env.Program(src, LIBS=libs) env.Program('bootlog.cc', LIBS=libs) + +if GetOption('test'): + env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=[libs]) diff --git a/selfdrive/loggerd/logger.cc b/selfdrive/loggerd/logger.cc index 9600319382..620171174d 100644 --- a/selfdrive/loggerd/logger.cc +++ b/selfdrive/loggerd/logger.cc @@ -17,7 +17,6 @@ #include #endif -#include "cereal/messaging/messaging.h" #include "selfdrive/common/params.h" #include "selfdrive/common/swaglog.h" #include "selfdrive/common/version.h" @@ -132,14 +131,14 @@ void log_init_data(LoggerState *s) { } -static void log_sentinel(LoggerState *s, cereal::Sentinel::SentinelType type, int signal=0) { +static void lh_log_sentinel(LoggerHandle *h, SentinelType type) { MessageBuilder msg; auto sen = msg.initEvent().initSentinel(); sen.setType(type); - sen.setSignal(signal); + sen.setSignal(h->exit_signal); auto bytes = msg.toBytes(); - logger_log(s, bytes.begin(), bytes.size(), true); + lh_log(h, bytes.begin(), bytes.size(), true); } // ***** logging functions ***** @@ -172,6 +171,8 @@ static LoggerHandle* logger_open(LoggerState *s, const char* root_path) { snprintf(h->log_path, sizeof(h->log_path), "%s/%s.bz2", h->segment_path, s->log_name); snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog.bz2", h->segment_path); snprintf(h->lock_path, sizeof(h->lock_path), "%s.lock", h->log_path); + h->end_sentinel_type = SentinelType::END_OF_SEGMENT; + h->exit_signal = 0; err = logger_mkpath(h->log_path); if (err) return NULL; @@ -194,7 +195,6 @@ int logger_next(LoggerState *s, const char* root_path, char* out_segment_path, size_t out_segment_path_len, int* out_part) { bool is_start_of_route = !s->cur_handle; - if (!is_start_of_route) log_sentinel(s, cereal::Sentinel::SentinelType::END_OF_SEGMENT); pthread_mutex_lock(&s->lock); s->part++; @@ -221,7 +221,7 @@ int logger_next(LoggerState *s, const char* root_path, // write beggining of log metadata log_init_data(s); - log_sentinel(s, is_start_of_route ? cereal::Sentinel::SentinelType::START_OF_ROUTE : cereal::Sentinel::SentinelType::START_OF_SEGMENT); + lh_log_sentinel(s->cur_handle, is_start_of_route ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT); return 0; } @@ -246,11 +246,10 @@ void logger_log(LoggerState *s, uint8_t* data, size_t data_size, bool in_qlog) { } void logger_close(LoggerState *s, ExitHandler *exit_handler) { - int signal = exit_handler == nullptr ? 0 : exit_handler->signal.load(); - log_sentinel(s, cereal::Sentinel::SentinelType::END_OF_ROUTE, signal); - pthread_mutex_lock(&s->lock); if (s->cur_handle) { + s->cur_handle->exit_signal = exit_handler && exit_handler->signal.load(); + s->cur_handle->end_sentinel_type = SentinelType::END_OF_ROUTE; lh_close(s->cur_handle); } pthread_mutex_unlock(&s->lock); @@ -269,6 +268,12 @@ void lh_log(LoggerHandle* h, uint8_t* data, size_t data_size, bool in_qlog) { void lh_close(LoggerHandle* h) { pthread_mutex_lock(&h->lock); assert(h->refcnt > 0); + if (h->refcnt == 1) { + // a very ugly hack. only here can guarantee sentinel is the last msg + pthread_mutex_unlock(&h->lock); + lh_log_sentinel(h, h->end_sentinel_type); + pthread_mutex_lock(&h->lock); + } h->refcnt--; if (h->refcnt == 0) { h->log.reset(nullptr); diff --git a/selfdrive/loggerd/logger.h b/selfdrive/loggerd/logger.h index 96c7a549b3..88d3afd700 100644 --- a/selfdrive/loggerd/logger.h +++ b/selfdrive/loggerd/logger.h @@ -11,6 +11,7 @@ #include #include +#include "cereal/messaging/messaging.h" #include "selfdrive/common/util.h" #include "selfdrive/common/swaglog.h" #include "selfdrive/hardware/hw.h" @@ -56,8 +57,12 @@ class BZFile { BZFILE* bz_file = nullptr; }; +typedef cereal::Sentinel::SentinelType SentinelType; + typedef struct LoggerHandle { pthread_mutex_t lock; + SentinelType end_sentinel_type; + int exit_signal; int refcnt; char segment_path[4096]; char log_path[4096]; diff --git a/selfdrive/loggerd/tests/test_logger.cc b/selfdrive/loggerd/tests/test_logger.cc new file mode 100644 index 0000000000..c621d00c42 --- /dev/null +++ b/selfdrive/loggerd/tests/test_logger.cc @@ -0,0 +1,172 @@ +#include + +#include +#include + +#include + +#include "catch2/catch.hpp" +#include "cereal/messaging/messaging.h" +#include "selfdrive/common/util.h" +#include "selfdrive/loggerd/logger.h" + +typedef cereal::Sentinel::SentinelType SentinelType; + +bool decompressBZ2(std::vector &dest, const char srcData[], size_t srcSize, size_t outputSizeIncrement = 0x100000U) { + bz_stream strm = {}; + int ret = BZ2_bzDecompressInit(&strm, 0, 0); + assert(ret == BZ_OK); + dest.resize(1024 * 1024); + strm.next_in = const_cast(srcData); + strm.avail_in = srcSize; + do { + strm.next_out = (char *)&dest[strm.total_out_lo32]; + strm.avail_out = dest.size() - strm.total_out_lo32; + ret = BZ2_bzDecompress(&strm); + if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) { + dest.resize(dest.size() + outputSizeIncrement); + } + } while (ret == BZ_OK && strm.avail_in > 0); + + BZ2_bzDecompressEnd(&strm); + dest.resize(strm.total_out_lo32); + return ret == BZ_STREAM_END; +} + +void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) { + const std::string segment_path = route_path + "--" + std::to_string(segment); + SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT; + SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT; + + REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock")); + for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) { + const std::string log_file = segment_path + fn; + INFO(log_file); + std::string log_bz2 = util::read_file(log_file); + REQUIRE(log_bz2.size() > 0); + + std::vector log; + bool ret = decompressBZ2(log, log_bz2.data(), log_bz2.size()); + REQUIRE(ret); + + int event_cnt = 0, i = 0; + kj::ArrayPtr words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); + while (words.size() > 0) { + try { + capnp::FlatArrayMessageReader reader(words); + auto event = reader.getRoot(); + words = kj::arrayPtr(reader.getEnd(), words.end()); + if (i == 0) { + REQUIRE(event.which() == cereal::Event::INIT_DATA); + } else if (i == 1) { + REQUIRE(event.which() == cereal::Event::SENTINEL); + REQUIRE(event.getSentinel().getType() == begin_sentinel); + REQUIRE(event.getSentinel().getSignal() == 0); + } else if (words.size() > 0) { + REQUIRE(event.which() == cereal::Event::CLOCKS); + ++event_cnt; + } else { + // the last event must be SENTINEL + REQUIRE(event.which() == cereal::Event::SENTINEL); + REQUIRE(event.getSentinel().getType() == end_sentinel); + REQUIRE(event.getSentinel().getSignal() == (end_sentinel == SentinelType::END_OF_ROUTE ? 1 : 0)); + } + ++i; + } catch (const kj::Exception &ex) { + INFO("failed parse " << i << " excpetion :" << ex.getDescription()); + REQUIRE(0); + break; + } + } + REQUIRE(event_cnt == required_event_cnt); + } +} + +void write_msg(LoggerHandle *logger) { + MessageBuilder msg; + msg.initEvent().initClocks(); + auto bytes = msg.toBytes(); + lh_log(logger, bytes.begin(), bytes.size(), true); +} + +TEST_CASE("logger") { + const std::string log_root = "/tmp/test_logger"; + system(("rm " + log_root + " -rf").c_str()); + + ExitHandler do_exit; + + LoggerState logger = {}; + logger_init(&logger, "rlog", true); + char segment_path[PATH_MAX] = {}; + int segment = -1; + + SECTION("single thread logging & rotation(100 segments, one thread)") { + const int segment_cnt = 100; + for (int i = 0; i < segment_cnt; ++i) { + REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0); + REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.bz2.lock")); + REQUIRE(segment == i); + write_msg(logger.cur_handle); + } + do_exit = true; + do_exit.signal = 1; + logger_close(&logger, &do_exit); + for (int i = 0; i < segment_cnt; ++i) { + verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, 1); + } + } + SECTION("multiple threads logging & rotation(100 segments, 10 threads") { + const int segment_cnt = 100, thread_cnt = 10; + std::atomic event_cnt[segment_cnt] = {}; + std::atomic main_segment = -1; + + auto logging_thread = [&]() -> void { + LoggerHandle *lh = logger_get_handle(&logger); + REQUIRE(lh != nullptr); + int segment = main_segment; + int delayed_cnt = 0; + while (!do_exit) { + // write 2 more messages in the current segment and then rotate to the new segment. + if (main_segment > segment && ++delayed_cnt == 2) { + lh_close(lh); + lh = logger_get_handle(&logger); + segment = main_segment; + delayed_cnt = 0; + } + write_msg(lh); + event_cnt[segment] += 1; + usleep(1); + } + lh_close(lh); + }; + + // start logging + std::vector threads; + for (int i = 0; i < segment_cnt; ++i) { + REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0); + REQUIRE(segment == i); + main_segment = segment; + if (i == 0) { + for (int i = 0; i < thread_cnt; ++i) { + threads.push_back(std::thread(logging_thread)); + } + } + for (int i = 0; i < 100; ++i) { + write_msg(logger.cur_handle); + usleep(1); + } + event_cnt[segment] += 100; + } + + // end logging + for (auto &t : threads) t.join(); + do_exit = true; + do_exit.signal = 1; + logger_close(&logger, &do_exit); + REQUIRE(logger.cur_handle->refcnt == 0); + + for (int i = 0; i < segment_cnt; ++i) { + verify_segment(log_root + "/" + logger.route_name, i, segment_cnt, event_cnt[i]); + } + } +} diff --git a/selfdrive/loggerd/tests/test_runner.cc b/selfdrive/loggerd/tests/test_runner.cc new file mode 100644 index 0000000000..62bf7476a1 --- /dev/null +++ b/selfdrive/loggerd/tests/test_runner.cc @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch2/catch.hpp"