delay bz2 compression from logging to uploading (#24392)

* remove log_name

* log without compression

* fix tests

* remove extension for bootlog

* another test fix

* uploader compresses

* also compress in athena

* only compress qlog

* more generic check in do_upload

* fix bootlog compression

* lower loggerd cpu usage

* dont link against bz2

* set core affinity to little cluster

* handle old files
Willem Melching · commit 77a6f3d034 · parent 061b18805e
14 changed files (lines changed in parentheses):

  selfdrive/athena/athenad.py (41)
  selfdrive/athena/tests/test_athenad.py (3)
  selfdrive/loggerd/SConscript (4)
  selfdrive/loggerd/bootlog.cc (4)
  selfdrive/loggerd/logger.cc (11)
  selfdrive/loggerd/logger.h (32)
  selfdrive/loggerd/loggerd.cc (2)
  selfdrive/loggerd/tests/test_encoder.py (2)
  selfdrive/loggerd/tests/test_logger.cc (10)
  selfdrive/loggerd/tests/test_loggerd.py (6)
  selfdrive/loggerd/tests/test_uploader.py (13)
  selfdrive/loggerd/uploader.py (21)
  selfdrive/test/process_replay/regen.py (6)
  selfdrive/test/test_onroad.py (10)

selfdrive/athena/athenad.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 import base64
+import bz2
 import hashlib
 import io
 import json
@@ -30,7 +31,7 @@ from common.api import Api
 from common.basedir import PERSIST
 from common.file_helpers import CallbackReader
 from common.params import Params
-from common.realtime import sec_since_boot
+from common.realtime import sec_since_boot, set_core_affinity
 from selfdrive.hardware import HARDWARE, PC, TICI
 from selfdrive.loggerd.config import ROOT
 from selfdrive.loggerd.xattr_cache import getxattr, setxattr
@@ -64,6 +65,13 @@ UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', '
 cur_upload_items: Dict[int, Any] = {}


+def strip_bz2_extension(fn):
+  if fn.endswith('.bz2'):
+    return fn[:-4]
+  return fn
+
+
 class AbortTransferException(Exception):
   pass
@@ -227,14 +235,29 @@ def upload_handler(end_event: threading.Event) -> None:
 def _do_upload(upload_item, callback=None):
-  with open(upload_item.path, "rb") as f:
-    size = os.fstat(f.fileno()).st_size
+  path = upload_item.path
+  compress = False
+
+  # If file does not exist, but does exist without the .bz2 extension we will compress on the fly
+  if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
+    path = strip_bz2_extension(path)
+    compress = True
+
+  with open(path, "rb") as f:
+    if compress:
+      cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
+      data = bz2.compress(f.read())
+      size = len(data)
+      data = io.BytesIO(data)
+    else:
+      size = os.fstat(f.fileno()).st_size
+      data = f

     if callback:
-      f = CallbackReader(f, callback, size)
+      data = CallbackReader(data, callback, size)

     return requests.put(upload_item.url,
-                        data=f,
+                        data=data,
                         headers={**upload_item.headers, 'Content-Length': str(size)},
                         timeout=30)
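The hunk above is the core of the Athena-side change: files are still queued and requested under their `.bz2` names, but when only the uncompressed log exists on disk, `_do_upload` compresses it in memory right before the PUT. A minimal standalone sketch of the same pattern (the `upload_file` helper and its signature are illustrative, not part of this diff):

import bz2
import os

import requests


def strip_bz2_extension(fn):
  return fn[:-4] if fn.endswith('.bz2') else fn


def upload_file(url, path, headers=None):
  # The upload was requested under a ".bz2" name; if only the raw log is
  # on disk, fall back to it and compress the payload in memory.
  real_path = path
  compress = False
  if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
    real_path = strip_bz2_extension(path)
    compress = True

  with open(real_path, "rb") as f:
    payload = f.read()
  if compress:
    payload = bz2.compress(payload)

  hdrs = dict(headers or {})
  hdrs['Content-Length'] = str(len(payload))
  return requests.put(url, data=payload, headers=hdrs, timeout=30)

Note the explicit Content-Length: once the payload has been compressed in memory its size no longer matches the on-disk file, so it has to be recomputed from the compressed bytes.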
@@ -335,8 +358,9 @@ def uploadFilesToUrls(files_data):
     if len(fn) == 0 or fn[0] == '/' or '..' in fn or 'url' not in file:
       failed.append(fn)
       continue
+
     path = os.path.join(ROOT, fn)
-    if not os.path.exists(path):
+    if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
       failed.append(fn)
       continue
@@ -680,6 +704,11 @@ def backoff(retries):
 def main():
+  try:
+    set_core_affinity([0, 1, 2, 3])
+  except Exception:
+    cloudlog.exception("failed to set core affinity")
+
   params = Params()
   dongle_id = params.get("DongleId", encoding='utf-8')
   UploadQueueCache.initialize(upload_queue)
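Both athenad (above) and the uploader (below) now pin themselves to CPUs 0-3 so that the bz2 work moved out of loggerd runs on the little cluster, per the "set core affinity to little cluster" commit. `common.realtime.set_core_affinity` itself is not part of this diff; a minimal sketch of what such a helper can look like on Linux, assuming it simply wraps `os.sched_setaffinity`:

import os


def set_core_affinity(cores):
  # Pin the calling process (pid 0 means "this process") to the given CPU ids.
  # Assumed shape of common.realtime.set_core_affinity; the real helper may differ.
  os.sched_setaffinity(0, cores)


# keep compression off the performance cores
set_core_affinity([0, 1, 2, 3])

Wrapping the call in try/except, as both daemons do, keeps them usable on PCs or in containers where pinning to those core ids may fail.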

selfdrive/athena/tests/test_athenad.py

@@ -81,7 +81,8 @@ class TestAthenadMethods(unittest.TestCase):
   def test_listDataDirectory(self):
     route = '2021-03-29--13-32-47'
     segments = [0, 1, 2, 3, 11]
-    filenames = ['qlog.bz2', 'qcamera.ts', 'rlog.bz2', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
+    filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
+
     files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
     for file in files:
       fn = os.path.join(athenad.ROOT, file)

selfdrive/loggerd/SConscript

@@ -3,7 +3,7 @@ Import('env', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
 libs = [common, cereal, messaging, visionipc,
         'zmq', 'capnp', 'kj', 'z',
         'avformat', 'avcodec', 'swscale', 'avutil',
-        'yuv', 'bz2', 'OpenCL', 'pthread']
+        'yuv', 'OpenCL', 'pthread']

 src = ['logger.cc', 'loggerd.cc', 'video_writer.cc', 'remote_encoder.cc']
 if arch == "larch64":
@@ -25,4 +25,4 @@ if arch == "larch64":
 env.Program('bootlog.cc', LIBS=libs)

 if GetOption('test'):
-  env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_loggerd.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=libs + ['curl', 'crypto'])
+  env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_loggerd.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto'])

selfdrive/loggerd/bootlog.cc

@@ -49,14 +49,14 @@ static kj::Array<capnp::word> build_boot_log() {
 }

 int main(int argc, char** argv) {
-  const std::string path = LOG_ROOT + "/boot/" + logger_get_route_name() + ".bz2";
+  const std::string path = LOG_ROOT + "/boot/" + logger_get_route_name();
   LOGW("bootlog to %s", path.c_str());

   // Open bootlog
   bool r = util::create_directories(LOG_ROOT + "/boot/", 0775);
   assert(r);

-  BZFile bz_file(path.c_str());
+  RawFile bz_file(path.c_str());

   // Write initdata
   bz_file.write(logger_build_init_data().asBytes());

selfdrive/loggerd/logger.cc

@@ -109,13 +109,12 @@ static void lh_log_sentinel(LoggerHandle *h, SentinelType type) {
 // ***** logging functions *****

-void logger_init(LoggerState *s, const char* log_name, bool has_qlog) {
+void logger_init(LoggerState *s, bool has_qlog) {
   pthread_mutex_init(&s->lock, NULL);

   s->part = -1;
   s->has_qlog = has_qlog;
   s->route_name = logger_get_route_name();
-  snprintf(s->log_name, sizeof(s->log_name), "%s", log_name);
   s->init_data = logger_build_init_data();
 }
@@ -132,8 +131,8 @@ static LoggerHandle* logger_open(LoggerState *s, const char* root_path) {
   snprintf(h->segment_path, sizeof(h->segment_path),
            "%s/%s--%d", root_path, s->route_name.c_str(), s->part);

-  snprintf(h->log_path, sizeof(h->log_path), "%s/%s.bz2", h->segment_path, s->log_name);
-  snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog.bz2", h->segment_path);
+  snprintf(h->log_path, sizeof(h->log_path), "%s/rlog", h->segment_path);
+  snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog", h->segment_path);
   snprintf(h->lock_path, sizeof(h->lock_path), "%s.lock", h->log_path);
   h->end_sentinel_type = SentinelType::END_OF_SEGMENT;
   h->exit_signal = 0;
@@ -144,9 +143,9 @@ static LoggerHandle* logger_open(LoggerState *s, const char* root_path) {
   if (lock_file == NULL) return NULL;
   fclose(lock_file);

-  h->log = std::make_unique<BZFile>(h->log_path);
+  h->log = std::make_unique<RawFile>(h->log_path);
   if (s->has_qlog) {
-    h->q_log = std::make_unique<BZFile>(h->qlog_path);
+    h->q_log = std::make_unique<RawFile>(h->qlog_path);
   }

   pthread_mutex_init(&h->lock, NULL);

selfdrive/loggerd/logger.h

@@ -7,7 +7,6 @@
 #include <cstdio>
 #include <memory>

-#include <bzlib.h>
 #include <capnp/serialize.h>
 #include <kj/array.h>
@@ -20,42 +19,25 @@ const std::string LOG_ROOT = Path::log_root();
 #define LOGGER_MAX_HANDLES 16

-class BZFile {
+class RawFile {
  public:
-  BZFile(const char* path) {
+  RawFile(const char* path) {
     file = util::safe_fopen(path, "wb");
     assert(file != nullptr);
-    int bzerror;
-    bz_file = BZ2_bzWriteOpen(&bzerror, file, 9, 0, 30);
-    assert(bzerror == BZ_OK);
   }
-  ~BZFile() {
-    int bzerror;
-    BZ2_bzWriteClose(&bzerror, bz_file, 0, nullptr, nullptr);
-    if (bzerror != BZ_OK) {
-      LOGE("BZ2_bzWriteClose error, bzerror=%d", bzerror);
-    }
+  ~RawFile() {
     util::safe_fflush(file);
     int err = fclose(file);
     assert(err == 0);
   }
   inline void write(void* data, size_t size) {
-    int bzerror;
-    do {
-      BZ2_bzWrite(&bzerror, bz_file, data, size);
-    } while (bzerror == BZ_IO_ERROR && errno == EINTR);
-
-    if (bzerror != BZ_OK && !error_logged) {
-      LOGE("BZ2_bzWrite error, bzerror=%d", bzerror);
-      error_logged = true;
-    }
+    int written = util::safe_fwrite(data, 1, size, file);
+    assert(written == size);
   }
   inline void write(kj::ArrayPtr<capnp::byte> array) { write(array.begin(), array.size()); }

  private:
-  bool error_logged = false;
   FILE* file = nullptr;
-  BZFILE* bz_file = nullptr;
 };

 typedef cereal::Sentinel::SentinelType SentinelType;
@@ -69,7 +51,7 @@ typedef struct LoggerHandle {
   char log_path[4096];
   char qlog_path[4096];
   char lock_path[4096];
-  std::unique_ptr<BZFile> log, q_log;
+  std::unique_ptr<RawFile> log, q_log;
 } LoggerHandle;

 typedef struct LoggerState {
@@ -86,7 +68,7 @@ typedef struct LoggerState {
 kj::Array<capnp::word> logger_build_init_data();
 std::string logger_get_route_name();
-void logger_init(LoggerState *s, const char* log_name, bool has_qlog);
+void logger_init(LoggerState *s, bool has_qlog);
 int logger_next(LoggerState *s, const char* root_path,
                 char* out_segment_path, size_t out_segment_path_len,
                 int* out_part);

selfdrive/loggerd/loggerd.cc

@@ -217,7 +217,7 @@ void loggerd_thread() {
   LoggerdState s;

   // init logger
-  logger_init(&s.logger, "rlog", true);
+  logger_init(&s.logger, true);
   logger_rotate(&s);
   Params().put("CurrentRoute", s.logger.route_name);

selfdrive/loggerd/tests/test_encoder.py

@@ -112,7 +112,7 @@ class TestEncoder(unittest.TestCase):
         # Check encodeIdx
         if encode_idx_name is not None:
-          rlog_path = f"{route_prefix_path}--{i}/rlog.bz2"
+          rlog_path = f"{route_prefix_path}--{i}/rlog"
           msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name]
           encode_msgs = [getattr(m, encode_idx_name) for m in msgs]

selfdrive/loggerd/tests/test_logger.cc

@@ -18,10 +18,10 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
   SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT;
   SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT;

-  REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock"));
-  for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) {
+  REQUIRE(!util::file_exists(segment_path + "/rlog.lock"));
+  for (const char *fn : {"/rlog", "/qlog"}) {
     const std::string log_file = segment_path + fn;
-    std::string log = decompressBZ2(util::read_file(log_file));
+    std::string log = util::read_file(log_file);
     REQUIRE(!log.empty());
     int event_cnt = 0, i = 0;
     kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
@@ -70,7 +70,7 @@ TEST_CASE("logger") {
   ExitHandler do_exit;
   LoggerState logger = {};
-  logger_init(&logger, "rlog", true);
+  logger_init(&logger, true);

   char segment_path[PATH_MAX] = {};
   int segment = -1;
@@ -78,7 +78,7 @@ TEST_CASE("logger") {
   const int segment_cnt = 100;
   for (int i = 0; i < segment_cnt; ++i) {
     REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0);
-    REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.bz2.lock"));
+    REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.lock"));
     REQUIRE(segment == i);
     write_msg(logger.cur_handle);
   }

selfdrive/loggerd/tests/test_loggerd.py

@@ -108,7 +108,7 @@ class TestLoggerd(unittest.TestCase):
     os.environ["LOGGERD_TEST"] = "1"
     Params().put("RecordFront", "1")

-    expected_files = {"rlog.bz2", "qlog.bz2", "qcamera.ts", "fcamera.hevc", "dcamera.hevc"}
+    expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc"}
     streams = [(VisionStreamType.VISION_STREAM_ROAD, tici_f_frame_size if TICI else eon_f_frame_size, "roadCameraState"),
                (VisionStreamType.VISION_STREAM_DRIVER, tici_d_frame_size if TICI else eon_d_frame_size, "driverCameraState")]
     if TICI:
@@ -208,7 +208,7 @@ class TestLoggerd(unittest.TestCase):
       time.sleep(1)
     managed_processes["loggerd"].stop()

-    qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.bz2")
+    qlog_path = os.path.join(self._get_latest_log_dir(), "qlog")
     lr = list(LogReader(qlog_path))

     # check initData and sentinel
@@ -254,7 +254,7 @@ class TestLoggerd(unittest.TestCase):
       time.sleep(2)
     managed_processes["loggerd"].stop()

-    lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.bz2")))
+    lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog")))

     # check initData and sentinel
     self._check_init_data(lr)

selfdrive/loggerd/tests/test_uploader.py

@@ -54,11 +54,11 @@ class TestUploader(UploaderTestCase):
   def gen_files(self, lock=False, boot=True):
     f_paths = list()
-    for t in ["qlog.bz2", "rlog.bz2", "dcamera.hevc", "fcamera.hevc"]:
+    for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
       f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock))

     if boot:
-      f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}.bz2", 1, lock=lock))
+      f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock))
     return f_paths

   def gen_order(self, seg1, seg2, boot=True):
@@ -84,7 +84,7 @@ class TestUploader(UploaderTestCase):
     self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
     self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
     for f_path in exp_order:
-      self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
+      self.assertTrue(getxattr(os.path.join(self.root, f_path.replace('.bz2', '')), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")

     self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@@ -103,7 +103,7 @@ class TestUploader(UploaderTestCase):
     self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore")
     self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice")
     for f_path in exp_order:
-      self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not ignored")
+      self.assertTrue(getxattr(os.path.join(self.root, f_path.replace('.bz2', '')), uploader.UPLOAD_ATTR_NAME), "All files not ignored")

     self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order")
@@ -128,7 +128,7 @@ class TestUploader(UploaderTestCase):
     self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
    self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
     for f_path in exp_order:
-      self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
+      self.assertTrue(getxattr(os.path.join(self.root, f_path.replace('.bz2', '')), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")

     self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@@ -143,8 +143,7 @@ class TestUploader(UploaderTestCase):
     self.join_thread()

     for f_path in f_paths:
-      self.assertFalse(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "File upload when locked")
-
+      self.assertFalse(getxattr(f_path.replace('.bz2', ''), uploader.UPLOAD_ATTR_NAME), "File upload when locked")

   def test_clear_locks_on_startup(self):
     f_paths = self.gen_files(lock=True, boot=False)

selfdrive/loggerd/uploader.py

@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+import bz2
 import json
 import os
 import random
@@ -12,6 +13,7 @@ from cereal import log
 import cereal.messaging as messaging
 from common.api import Api
 from common.params import Params
+from common.realtime import set_core_affinity
 from selfdrive.hardware import TICI
 from selfdrive.loggerd.xattr_cache import getxattr, setxattr
 from selfdrive.loggerd.config import ROOT
@@ -69,7 +71,7 @@ class Uploader():
     self.last_filename = ""

     self.immediate_folders = ["crash/", "boot/"]
-    self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1}
+    self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}

   def get_upload_sort(self, name):
     if name in self.immediate_priority:
@@ -149,7 +151,12 @@ class Uploader():
         self.last_resp = FakeResponse()
       else:
         with open(fn, "rb") as f:
-          self.last_resp = requests.put(url, data=f, headers=headers, timeout=10)
+          data = f.read()
+
+        if key.endswith('.bz2') and not fn.endswith('.bz2'):
+          data = bz2.compress(data)
+
+        self.last_resp = requests.put(url, data=data, headers=headers, timeout=10)
     except Exception as e:
       self.last_exc = (e, traceback.format_exc())
       raise
@@ -212,7 +219,13 @@ class Uploader():
     us.lastFilename = self.last_filename

     return msg


 def uploader_fn(exit_event):
+  try:
+    set_core_affinity([0, 1, 2, 3])
+  except Exception:
+    cloudlog.exception("failed to set core affinity")
+
   clear_locks(ROOT)

   params = Params()
@@ -247,6 +260,10 @@ def uploader_fn(exit_event):
     key, fn = d

+    # qlogs and bootlogs need to be compressed before uploading
+    if key.endswith('qlog') or (key.startswith('boot/') and not key.endswith('.bz2')):
+      key += ".bz2"
+
     success = uploader.upload(key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
     if success:
       backoff = 0.1
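Taken together, the uploader keeps the on-disk names raw (`qlog`, bootlogs without an extension) while the remote key gains a `.bz2` suffix, and the payload is only compressed when those two disagree, so already-compressed files from old segments pass through untouched. A compact sketch of that decision, with `upload_key_for` and `put_log` as illustrative names rather than functions from this diff:

import bz2

import requests


def upload_key_for(key):
  # qlogs and bootlogs are stored uncompressed but uploaded as .bz2;
  # keys that already end in .bz2 (old segments) are left alone.
  if key.endswith('qlog') or (key.startswith('boot/') and not key.endswith('.bz2')):
    return key + '.bz2'
  return key


def put_log(url, key, fn, headers=None):
  with open(fn, "rb") as f:
    data = f.read()
  # compress only when the remote name says .bz2 but the local file is raw
  if key.endswith('.bz2') and not fn.endswith('.bz2'):
    data = bz2.compress(data)
  return requests.put(url, data=data, headers=headers or {}, timeout=10)

This split between the on-disk name and the upload key is also why the test_uploader changes above check xattrs on `f_path.replace('.bz2', '')`: the upload marker is written to the uncompressed file that actually exists on disk.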

selfdrive/test/process_replay/regen.py

@@ -256,7 +256,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA):
   segment = params.get("CurrentRoute", encoding='utf-8') + "--0"
   seg_path = os.path.join(outdir, segment)
   # check to make sure openpilot is engaged in the route
-  if not check_enabled(LogReader(os.path.join(seg_path, "rlog.bz2"))):
+  if not check_enabled(LogReader(os.path.join(seg_path, "rlog"))):
     raise Exception(f"Route never enabled: {segment}")

   return seg_path
@@ -268,11 +268,11 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=False):
     lr = LogReader(r.log_paths()[args.seg])
     fr = FrameReader(r.camera_paths()[args.seg])
   else:
-    lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog.bz2")
+    lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog")
     fr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc")

   rpath = regen_segment(lr, {'roadCameraState': fr})

-  lr = LogReader(os.path.join(rpath, 'rlog.bz2'))
+  lr = LogReader(os.path.join(rpath, 'rlog'))
   controls_state_active = [m.controlsState.active for m in lr if m.which() == 'controlsState']
   assert any(controls_state_active), "Segment did not engage"

selfdrive/test/test_onroad.py

@@ -22,7 +22,7 @@ from tools.lib.logreader import LogReader
 # Baseline CPU usage by process
 PROCS = {
   "selfdrive.controls.controlsd": 31.0,
-  "./loggerd": 70.0,
+  "./loggerd": 50.0,
   "./camerad": 26.0,
   "./locationd": 9.1,
   "selfdrive.controls.plannerd": 11.7,
@@ -108,9 +108,9 @@ class TestOnroad(unittest.TestCase):
   @classmethod
   def setUpClass(cls):
     if "DEBUG" in os.environ:
-      segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog.bz2")), Path(ROOT).iterdir())
+      segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog")), Path(ROOT).iterdir())
       segs = sorted(segs, key=lambda x: x.stat().st_mtime)
-      cls.lr = list(LogReader(os.path.join(segs[-1], "rlog.bz2")))
+      cls.lr = list(LogReader(os.path.join(segs[-1], "rlog")))
       return

     # setup env
@@ -160,10 +160,10 @@ class TestOnroad(unittest.TestCase):
       if proc.wait(60) is None:
         proc.kill()

-    cls.lrs = [list(LogReader(os.path.join(str(s), "rlog.bz2"))) for s in cls.segments]
+    cls.lrs = [list(LogReader(os.path.join(str(s), "rlog"))) for s in cls.segments]

     # use the second segment by default as it's the first full segment
-    cls.lr = list(LogReader(os.path.join(str(cls.segments[1]), "rlog.bz2")))
+    cls.lr = list(LogReader(os.path.join(str(cls.segments[1]), "rlog")))

   def test_cloudlog_size(self):
     msgs = [m for m in self.lr if m.which() == 'logMessage']
