diff --git a/system/loggerd/deleter.py b/system/loggerd/deleter.py index 5fb2b9eb41..5e7b31f583 100644 --- a/system/loggerd/deleter.py +++ b/system/loggerd/deleter.py @@ -2,15 +2,48 @@ import os import shutil import threading +from typing import List + from system.swaglog import cloudlog from system.loggerd.config import ROOT, get_available_bytes, get_available_percent from system.loggerd.uploader import listdir_by_creation +from system.loggerd.xattr_cache import getxattr MIN_BYTES = 5 * 1024 * 1024 * 1024 MIN_PERCENT = 10 DELETE_LAST = ['boot', 'crash'] +PRESERVE_ATTR_NAME = 'user.preserve' +PRESERVE_ATTR_VALUE = b'1' +PRESERVE_COUNT = 5 + + +def has_preserve_xattr(d: str) -> bool: + return getxattr(os.path.join(ROOT, d), PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE + + +def get_preserved_segments(dirs_by_creation: List[str]) -> List[str]: + preserved = [] + for n, d in enumerate(filter(has_preserve_xattr, reversed(dirs_by_creation))): + if n == PRESERVE_COUNT: + break + date_str, _, seg_str = d.rpartition("--") + + # ignore non-segment directories + if not date_str: + continue + try: + seg_num = int(seg_str) + except ValueError: + continue + + # preserve segment and its prior + preserved.append(d) + preserved.append(f"{date_str}--{seg_num - 1}") + + return preserved + def deleter_thread(exit_event): while not exit_event.is_set(): @@ -18,9 +51,13 @@ def deleter_thread(exit_event): out_of_percent = get_available_percent(default=MIN_PERCENT + 1) < MIN_PERCENT if out_of_percent or out_of_bytes: + dirs = listdir_by_creation(ROOT) + + # skip deleting most recent N preserved segments (and their prior segment) + preserved_dirs = get_preserved_segments(dirs) + # remove the earliest directory we can - dirs = sorted(listdir_by_creation(ROOT), key=lambda x: x in DELETE_LAST) - for delete_dir in dirs: + for delete_dir in sorted(dirs, key=lambda d: (d in DELETE_LAST, d in preserved_dirs)): delete_path = os.path.join(ROOT, delete_dir) if any(name.endswith(".lock") for name in os.listdir(delete_path)): diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index d1d9596e02..ced9595896 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -1,3 +1,5 @@ +#include + #include #include "system/loggerd/encoder/encoder.h" @@ -170,6 +172,19 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct return bytes_count; } +void handle_user_flag(LoggerdState *s) { + LOGW("preserving %s", s->segment_path); + +#ifdef __APPLE__ + int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0); +#else + int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0); +#endif + if (ret) { + LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->segment_path, strerror(errno)); + } +} + void loggerd_thread() { // setup messaging typedef struct QlogState { @@ -228,6 +243,10 @@ void loggerd_thread() { while (!do_exit && (msg = sock->receive(true))) { const bool in_qlog = qs.freq != -1 && (qs.counter++ % qs.freq == 0); + if (qs.name == "userFlag") { + handle_user_flag(&s); + } + if (qs.encoder) { s.last_camera_seen_tms = millis_since_boot(); bytes_count += handle_encoder_msg(&s, msg, qs.name, remote_encoders[sock], encoder_infos_dict[qs.name]); diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index e648c0e38a..4100f12f8d 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -24,6 +24,9 @@ const int MAIN_BITRATE = 10000000; const bool LOGGERD_TEST = getenv("LOGGERD_TEST"); const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60; +constexpr char PRESERVE_ATTR_NAME[] = "user.preserve"; +constexpr char PRESERVE_ATTR_VALUE = '1'; + class EncoderInfo { public: const char *publish_name; diff --git a/system/loggerd/tests/loggerd_tests_common.py b/system/loggerd/tests/loggerd_tests_common.py index 6d1303ca6c..7d71516dfe 100644 --- a/system/loggerd/tests/loggerd_tests_common.py +++ b/system/loggerd/tests/loggerd_tests_common.py @@ -7,10 +7,12 @@ import unittest from pathlib import Path from typing import Optional +import system.loggerd.deleter as deleter import system.loggerd.uploader as uploader +from system.loggerd.xattr_cache import setxattr -def create_random_file(file_path: Path, size_mb: float, lock: bool = False, xattr: Optional[bytes] = None) -> None: +def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: Optional[bytes] = None) -> None: file_path.parent.mkdir(parents=True, exist_ok=True) if lock: @@ -25,8 +27,8 @@ def create_random_file(file_path: Path, size_mb: float, lock: bool = False, xatt for _ in range(chunks): f.write(data) - if xattr is not None: - uploader.setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, xattr) + if upload_xattr is not None: + setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr) class MockResponse(): def __init__(self, text, status_code): @@ -105,8 +107,11 @@ class UploaderTestCase(unittest.TestCase): raise def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, - xattr: Optional[bytes] = None) -> Path: + upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: file_path = self.root / f_dir / fn - create_random_file(file_path, size_mb, lock, xattr) + create_random_file(file_path, size_mb, lock, upload_xattr) + + if preserve_xattr is not None: + setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr) return file_path diff --git a/system/loggerd/tests/test_deleter.py b/system/loggerd/tests/test_deleter.py index 596545cdeb..9474b30f82 100755 --- a/system/loggerd/tests/test_deleter.py +++ b/system/loggerd/tests/test_deleter.py @@ -3,9 +3,11 @@ import time import threading import unittest from collections import namedtuple +from pathlib import Path +from typing import Sequence -from common.timeout import Timeout, TimeoutException import system.loggerd.deleter as deleter +from common.timeout import Timeout, TimeoutException from system.loggerd.tests.loggerd_tests_common import UploaderTestCase Stats = namedtuple("Stats", ['f_bavail', 'f_blocks', 'f_frsize']) @@ -37,30 +39,59 @@ class TestDeleter(UploaderTestCase): self.start_thread() - with Timeout(5, "Timeout waiting for file to be deleted"): - while f_path.exists(): - time.sleep(0.01) - self.join_thread() - - self.assertFalse(f_path.exists(), "File not deleted") + try: + with Timeout(2, "Timeout waiting for file to be deleted"): + while f_path.exists(): + time.sleep(0.01) + finally: + self.join_thread() - def test_delete_files_in_create_order(self): - f_path_1 = self.make_file_with_data(self.seg_dir, self.f_type) - time.sleep(1) - self.seg_num += 1 - self.seg_dir = self.seg_format.format(self.seg_num) - f_path_2 = self.make_file_with_data(self.seg_dir, self.f_type) + def assertDeleteOrder(self, f_paths: Sequence[Path], timeout: int = 5) -> None: + deleted_order = [] self.start_thread() + try: + with Timeout(timeout, "Timeout waiting for files to be deleted"): + while True: + for f in f_paths: + if not f.exists() and f not in deleted_order: + deleted_order.append(f) + if len(deleted_order) == len(f_paths): + break + time.sleep(0.01) + except TimeoutException: + print("Not deleted:", [f for f in f_paths if f not in deleted_order]) + raise + finally: + self.join_thread() - with Timeout(5, "Timeout waiting for file to be deleted"): - while f_path_1.exists() and f_path_2.exists(): - time.sleep(0.01) - - self.join_thread() - - self.assertFalse(f_path_1.exists(), "Older file not deleted") - self.assertTrue(f_path_2.exists(), "Newer file deleted before older file") + self.assertEqual(deleted_order, f_paths, "Files not deleted in expected order") + + def test_delete_order(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(1), self.f_type), + self.make_file_with_data(self.seg_format2.format(0), self.f_type), + ]) + + def test_delete_many_preserved(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(1), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE), + self.make_file_with_data(self.seg_format.format(2), self.f_type), + ] + [ + self.make_file_with_data(self.seg_format2.format(i), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE) + for i in range(5) + ]) + + def test_delete_last(self): + self.assertDeleteOrder([ + self.make_file_with_data(self.seg_format.format(1), self.f_type), + self.make_file_with_data(self.seg_format2.format(0), self.f_type), + self.make_file_with_data(self.seg_format.format(0), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE), + self.make_file_with_data("boot", self.seg_format[:-4]), + self.make_file_with_data("crash", self.seg_format2[:-4]), + ]) def test_no_delete_when_available_space(self): f_path = self.make_file_with_data(self.seg_dir, self.f_type) @@ -70,15 +101,10 @@ class TestDeleter(UploaderTestCase): self.fake_stats = Stats(f_bavail=available, f_blocks=10, f_frsize=block_size) self.start_thread() - - try: - with Timeout(2, "Timeout waiting for file to be deleted"): - while f_path.exists(): - time.sleep(0.01) - except TimeoutException: - pass - finally: - self.join_thread() + start_time = time.monotonic() + while f_path.exists() and time.monotonic() - start_time < 2: + time.sleep(0.01) + self.join_thread() self.assertTrue(f_path.exists(), "File deleted with available space") @@ -86,15 +112,10 @@ class TestDeleter(UploaderTestCase): f_path = self.make_file_with_data(self.seg_dir, self.f_type, lock=True) self.start_thread() - - try: - with Timeout(2, "Timeout waiting for file to be deleted"): - while f_path.exists(): - time.sleep(0.01) - except TimeoutException: - pass - finally: - self.join_thread() + start_time = time.monotonic() + while f_path.exists() and time.monotonic() - start_time < 2: + time.sleep(0.01) + self.join_thread() self.assertTrue(f_path.exists(), "File deleted when locked") diff --git a/system/loggerd/tests/test_loggerd.py b/system/loggerd/tests/test_loggerd.py index a2166016e0..7365b256d2 100755 --- a/system/loggerd/tests/test_loggerd.py +++ b/system/loggerd/tests/test_loggerd.py @@ -8,6 +8,7 @@ import time import unittest from collections import defaultdict from pathlib import Path +from typing import Dict, List import cereal.messaging as messaging from cereal import log @@ -16,6 +17,8 @@ from common.basedir import BASEDIR from common.params import Params from common.timeout import Timeout from system.loggerd.config import ROOT +from system.loggerd.xattr_cache import getxattr +from system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE from selfdrive.manager.process_config import managed_processes from system.version import get_version from tools.lib.logreader import LogReader @@ -71,6 +74,30 @@ class TestLoggerd(unittest.TestCase): end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment self.assertTrue(msgs[-1].sentinel.type == end_type) + def _publish_random_messages(self, services: List[str]) -> Dict[str, list]: + pm = messaging.PubMaster(services) + + managed_processes["loggerd"].start() + for s in services: + self.assertTrue(pm.wait_for_readers_to_update(s, timeout=5)) + + sent_msgs = defaultdict(list) + for _ in range(random.randint(2, 10) * 100): + for s in services: + try: + m = messaging.new_message(s) + except Exception: + m = messaging.new_message(s, random.randint(2, 10)) + pm.send(s, m) + sent_msgs[s].append(m) + time.sleep(0.01) + + for s in services: + self.assertTrue(pm.wait_for_readers_to_update(s, timeout=5)) + managed_processes["loggerd"].stop() + + return sent_msgs + def test_init_data_values(self): os.environ["CLEAN"] = random.choice(["0", "1"]) @@ -193,29 +220,7 @@ class TestLoggerd(unittest.TestCase): services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \ random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services)))) - - pm = messaging.PubMaster(services) - - # sleep enough for the first poll to time out - # TODO: fix loggerd bug dropping the msgs from the first poll - managed_processes["loggerd"].start() - for s in services: - while not pm.all_readers_updated(s): - time.sleep(0.1) - - sent_msgs = defaultdict(list) - for _ in range(random.randint(2, 10) * 100): - for s in services: - try: - m = messaging.new_message(s) - except Exception: - m = messaging.new_message(s, random.randint(2, 10)) - pm.send(s, m) - sent_msgs[s].append(m) - time.sleep(0.01) - - time.sleep(1) - managed_processes["loggerd"].stop() + sent_msgs = self._publish_random_messages(services) qlog_path = os.path.join(self._get_latest_log_dir(), "qlog") lr = list(LogReader(qlog_path)) @@ -241,27 +246,7 @@ class TestLoggerd(unittest.TestCase): def test_rlog(self): services = random.sample(CEREAL_SERVICES, random.randint(5, 10)) - pm = messaging.PubMaster(services) - - # sleep enough for the first poll to time out - # TODO: fix loggerd bug dropping the msgs from the first poll - managed_processes["loggerd"].start() - for s in services: - while not pm.all_readers_updated(s): - time.sleep(0.1) - - sent_msgs = defaultdict(list) - for _ in range(random.randint(2, 10) * 100): - for s in services: - try: - m = messaging.new_message(s) - except Exception: - m = messaging.new_message(s, random.randint(2, 10)) - pm.send(s, m) - sent_msgs[s].append(m) - - time.sleep(2) - managed_processes["loggerd"].stop() + sent_msgs = self._publish_random_messages(services) lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog"))) @@ -276,6 +261,20 @@ class TestLoggerd(unittest.TestCase): sent.clear_write_flag() self.assertEqual(sent.to_bytes(), m.as_builder().to_bytes()) + def test_preserving_flagged_segments(self): + services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"} + self._publish_random_messages(services) + + segment_dir = self._get_latest_log_dir() + self.assertEqual(getxattr(segment_dir, PRESERVE_ATTR_NAME), PRESERVE_ATTR_VALUE) + + def test_not_preserving_unflagged_segments(self): + services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"} + self._publish_random_messages(services) + + segment_dir = self._get_latest_log_dir() + self.assertIsNone(getxattr(segment_dir, PRESERVE_ATTR_NAME)) + if __name__ == "__main__": unittest.main() diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py index 9346b770a9..580d1efae2 100755 --- a/system/loggerd/tests/test_uploader.py +++ b/system/loggerd/tests/test_uploader.py @@ -55,10 +55,10 @@ class TestUploader(UploaderTestCase): def gen_files(self, lock=False, xattr: Optional[bytes] = None, boot=True) -> List[Path]: f_paths = [] for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]: - f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, xattr=xattr)) + f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr)) if boot: - f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, xattr=xattr)) + f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, upload_xattr=xattr)) return f_paths def gen_order(self, seg1: List[int], seg2: List[int], boot=True) -> List[str]: