deleter: preserve flagged segments (#28814)

old-commit-hash: 98a30f10f4
beeps
Cameron Clough 2 years ago committed by GitHub
parent a7b32acd4b
commit b120aec2f5
  1. 41
      system/loggerd/deleter.py
  2. 19
      system/loggerd/loggerd.cc
  3. 3
      system/loggerd/loggerd.h
  4. 15
      system/loggerd/tests/loggerd_tests_common.py
  5. 99
      system/loggerd/tests/test_deleter.py
  6. 87
      system/loggerd/tests/test_loggerd.py
  7. 4
      system/loggerd/tests/test_uploader.py

@ -2,15 +2,48 @@
import os
import shutil
import threading
from typing import List
from system.swaglog import cloudlog
from system.loggerd.config import ROOT, get_available_bytes, get_available_percent
from system.loggerd.uploader import listdir_by_creation
from system.loggerd.xattr_cache import getxattr
# Free-space thresholds: deleter_thread starts removing old directories when
# available space drops below either limit.
MIN_BYTES = 5 * 1024 * 1024 * 1024
MIN_PERCENT = 10

# Directories only deleted after every regular segment is gone.
DELETE_LAST = ['boot', 'crash']

# xattr set on a segment directory by loggerd when the user flags it;
# must stay in sync with PRESERVE_ATTR_NAME/PRESERVE_ATTR_VALUE in loggerd.h.
PRESERVE_ATTR_NAME = 'user.preserve'
PRESERVE_ATTR_VALUE = b'1'
# at most this many flagged segments (newest first) are protected from deletion
PRESERVE_COUNT = 5
def has_preserve_xattr(d: str) -> bool:
  """Return True when segment dir *d* (relative to ROOT) carries the preserve xattr."""
  path = os.path.join(ROOT, d)
  return getxattr(path, PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE
def get_preserved_segments(dirs_by_creation: List[str]) -> List[str]:
  """Return the directory names the deleter must not remove.

  Scans *dirs_by_creation* newest-first for dirs flagged with the preserve
  xattr, keeping at most PRESERVE_COUNT flagged segments.  For each flagged
  segment both the segment itself and its predecessor are returned, so the
  moments leading up to the flag survive too.
  """
  keep: List[str] = []
  flagged = filter(has_preserve_xattr, reversed(dirs_by_creation))
  for count, dirname in enumerate(flagged):
    if count == PRESERVE_COUNT:
      break

    route, _, seg_str = dirname.rpartition("--")
    # rpartition yields an empty prefix when "--" is absent: not a segment dir
    if not route:
      continue
    try:
      seg_num = int(seg_str)
    except ValueError:
      continue

    keep.append(dirname)
    keep.append(f"{route}--{seg_num - 1}")
  return keep
def deleter_thread(exit_event):
while not exit_event.is_set():
@ -18,9 +51,13 @@ def deleter_thread(exit_event):
out_of_percent = get_available_percent(default=MIN_PERCENT + 1) < MIN_PERCENT
if out_of_percent or out_of_bytes:
dirs = listdir_by_creation(ROOT)
# skip deleting most recent N preserved segments (and their prior segment)
preserved_dirs = get_preserved_segments(dirs)
# remove the earliest directory we can
dirs = sorted(listdir_by_creation(ROOT), key=lambda x: x in DELETE_LAST)
for delete_dir in dirs:
for delete_dir in sorted(dirs, key=lambda d: (d in DELETE_LAST, d in preserved_dirs)):
delete_path = os.path.join(ROOT, delete_dir)
if any(name.endswith(".lock") for name in os.listdir(delete_path)):

@ -1,3 +1,5 @@
#include <sys/xattr.h>
#include <unordered_map>
#include "system/loggerd/encoder/encoder.h"
@ -170,6 +172,19 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct
return bytes_count;
}
// Invoked when a userFlag message arrives: tag the current segment directory
// with the preserve xattr so deleter.py skips it when freeing disk space.
void handle_user_flag(LoggerdState *s) {
  LOGW("preserving %s", s->segment_path);
// macOS setxattr() takes extra (position, options) args vs Linux's flags-only signature
#ifdef __APPLE__
  int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0, 0);
#else
  int ret = setxattr(s->segment_path, PRESERVE_ATTR_NAME, &PRESERVE_ATTR_VALUE, 1, 0);
#endif
  if (ret) {
    // non-fatal: log and carry on; the segment simply won't be preserved
    LOGE("setxattr %s failed for %s: %s", PRESERVE_ATTR_NAME, s->segment_path, strerror(errno));
  }
}
void loggerd_thread() {
// setup messaging
typedef struct QlogState {
@ -228,6 +243,10 @@ void loggerd_thread() {
while (!do_exit && (msg = sock->receive(true))) {
const bool in_qlog = qs.freq != -1 && (qs.counter++ % qs.freq == 0);
if (qs.name == "userFlag") {
handle_user_flag(&s);
}
if (qs.encoder) {
s.last_camera_seen_tms = millis_since_boot();
bytes_count += handle_encoder_msg(&s, msg, qs.name, remote_encoders[sock], encoder_infos_dict[qs.name]);

@ -24,6 +24,9 @@ const int MAIN_BITRATE = 10000000;
const bool LOGGERD_TEST = getenv("LOGGERD_TEST");
const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60;
// xattr marking a user-flagged segment dir; must stay in sync with
// PRESERVE_ATTR_NAME/PRESERVE_ATTR_VALUE in system/loggerd/deleter.py
constexpr char PRESERVE_ATTR_NAME[] = "user.preserve";
constexpr char PRESERVE_ATTR_VALUE = '1';
class EncoderInfo {
public:
const char *publish_name;

@ -7,10 +7,12 @@ import unittest
from pathlib import Path
from typing import Optional
import system.loggerd.deleter as deleter
import system.loggerd.uploader as uploader
from system.loggerd.xattr_cache import setxattr
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, xattr: Optional[bytes] = None) -> None:
def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: Optional[bytes] = None) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
if lock:
@ -25,8 +27,8 @@ def create_random_file(file_path: Path, size_mb: float, lock: bool = False, xatt
for _ in range(chunks):
f.write(data)
if xattr is not None:
uploader.setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, xattr)
if upload_xattr is not None:
setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr)
class MockResponse():
def __init__(self, text, status_code):
@ -105,8 +107,11 @@ class UploaderTestCase(unittest.TestCase):
raise
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
xattr: Optional[bytes] = None) -> Path:
upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
file_path = self.root / f_dir / fn
create_random_file(file_path, size_mb, lock, xattr)
create_random_file(file_path, size_mb, lock, upload_xattr)
if preserve_xattr is not None:
setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr)
return file_path

@ -3,9 +3,11 @@ import time
import threading
import unittest
from collections import namedtuple
from pathlib import Path
from typing import Sequence
from common.timeout import Timeout, TimeoutException
import system.loggerd.deleter as deleter
from common.timeout import Timeout, TimeoutException
from system.loggerd.tests.loggerd_tests_common import UploaderTestCase
Stats = namedtuple("Stats", ['f_bavail', 'f_blocks', 'f_frsize'])
@ -37,30 +39,59 @@ class TestDeleter(UploaderTestCase):
self.start_thread()
with Timeout(5, "Timeout waiting for file to be deleted"):
while f_path.exists():
time.sleep(0.01)
self.join_thread()
self.assertFalse(f_path.exists(), "File not deleted")
try:
with Timeout(2, "Timeout waiting for file to be deleted"):
while f_path.exists():
time.sleep(0.01)
finally:
self.join_thread()
def test_delete_files_in_create_order(self):
f_path_1 = self.make_file_with_data(self.seg_dir, self.f_type)
time.sleep(1)
self.seg_num += 1
self.seg_dir = self.seg_format.format(self.seg_num)
f_path_2 = self.make_file_with_data(self.seg_dir, self.f_type)
def assertDeleteOrder(self, f_paths: Sequence[Path], timeout: int = 5) -> None:
deleted_order = []
self.start_thread()
try:
with Timeout(timeout, "Timeout waiting for files to be deleted"):
while True:
for f in f_paths:
if not f.exists() and f not in deleted_order:
deleted_order.append(f)
if len(deleted_order) == len(f_paths):
break
time.sleep(0.01)
except TimeoutException:
print("Not deleted:", [f for f in f_paths if f not in deleted_order])
raise
finally:
self.join_thread()
with Timeout(5, "Timeout waiting for file to be deleted"):
while f_path_1.exists() and f_path_2.exists():
time.sleep(0.01)
self.join_thread()
self.assertFalse(f_path_1.exists(), "Older file not deleted")
self.assertTrue(f_path_2.exists(), "Newer file deleted before older file")
self.assertEqual(deleted_order, f_paths, "Files not deleted in expected order")
def test_delete_order(self):
  # Baseline: with no preserve xattrs and nothing in DELETE_LAST, files are
  # deleted strictly oldest-first (creation order).
  # NOTE(review): seg_format/seg_format2 presumably name two distinct routes —
  # confirm against UploaderTestCase.
  self.assertDeleteOrder([
    self.make_file_with_data(self.seg_format.format(0), self.f_type),
    self.make_file_with_data(self.seg_format.format(1), self.f_type),
    self.make_file_with_data(self.seg_format2.format(0), self.f_type),
  ])
def test_delete_many_preserved(self):
  # With more flagged segments than PRESERVE_COUNT (5), only the 5 newest
  # flagged ones are protected: segment 1 of the first route is flagged but
  # falls outside the quota (the second route's 5 flagged segments fill it),
  # so it is still deleted in plain creation order.
  self.assertDeleteOrder([
    self.make_file_with_data(self.seg_format.format(0), self.f_type),
    self.make_file_with_data(self.seg_format.format(1), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE),
    self.make_file_with_data(self.seg_format.format(2), self.f_type),
  ] + [
    self.make_file_with_data(self.seg_format2.format(i), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE)
    for i in range(5)
  ])
def test_delete_last(self):
  # Expected order: unflagged segments first (creation order), then the
  # preserved (flagged) segment, and boot/crash dirs (DELETE_LAST) last of all.
  self.assertDeleteOrder([
    self.make_file_with_data(self.seg_format.format(1), self.f_type),
    self.make_file_with_data(self.seg_format2.format(0), self.f_type),
    self.make_file_with_data(self.seg_format.format(0), self.f_type, preserve_xattr=deleter.PRESERVE_ATTR_VALUE),
    self.make_file_with_data("boot", self.seg_format[:-4]),
    self.make_file_with_data("crash", self.seg_format2[:-4]),
  ])
def test_no_delete_when_available_space(self):
f_path = self.make_file_with_data(self.seg_dir, self.f_type)
@ -70,15 +101,10 @@ class TestDeleter(UploaderTestCase):
self.fake_stats = Stats(f_bavail=available, f_blocks=10, f_frsize=block_size)
self.start_thread()
try:
with Timeout(2, "Timeout waiting for file to be deleted"):
while f_path.exists():
time.sleep(0.01)
except TimeoutException:
pass
finally:
self.join_thread()
start_time = time.monotonic()
while f_path.exists() and time.monotonic() - start_time < 2:
time.sleep(0.01)
self.join_thread()
self.assertTrue(f_path.exists(), "File deleted with available space")
@ -86,15 +112,10 @@ class TestDeleter(UploaderTestCase):
f_path = self.make_file_with_data(self.seg_dir, self.f_type, lock=True)
self.start_thread()
try:
with Timeout(2, "Timeout waiting for file to be deleted"):
while f_path.exists():
time.sleep(0.01)
except TimeoutException:
pass
finally:
self.join_thread()
start_time = time.monotonic()
while f_path.exists() and time.monotonic() - start_time < 2:
time.sleep(0.01)
self.join_thread()
self.assertTrue(f_path.exists(), "File deleted when locked")

@ -8,6 +8,7 @@ import time
import unittest
from collections import defaultdict
from pathlib import Path
from typing import Dict, List
import cereal.messaging as messaging
from cereal import log
@ -16,6 +17,8 @@ from common.basedir import BASEDIR
from common.params import Params
from common.timeout import Timeout
from system.loggerd.config import ROOT
from system.loggerd.xattr_cache import getxattr
from system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE
from selfdrive.manager.process_config import managed_processes
from system.version import get_version
from tools.lib.logreader import LogReader
@ -71,6 +74,30 @@ class TestLoggerd(unittest.TestCase):
end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment
self.assertTrue(msgs[-1].sentinel.type == end_type)
def _publish_random_messages(self, services: List[str]) -> Dict[str, list]:
  """Start loggerd, publish a random burst of messages on each service,
  then stop loggerd.  Returns all sent messages keyed by service name."""
  pub = messaging.PubMaster(services)

  managed_processes["loggerd"].start()
  # wait until loggerd is subscribed before publishing anything
  for service in services:
    self.assertTrue(pub.wait_for_readers_to_update(service, timeout=5))

  sent_msgs: Dict[str, list] = defaultdict(list)
  num_rounds = random.randint(2, 10) * 100
  for _ in range(num_rounds):
    for service in services:
      try:
        msg = messaging.new_message(service)
      except Exception:
        # some services require an explicit size argument (new_message raises otherwise)
        msg = messaging.new_message(service, random.randint(2, 10))
      pub.send(service, msg)
      sent_msgs[service].append(msg)
    time.sleep(0.01)

  # ensure loggerd has consumed everything before shutting it down
  for service in services:
    self.assertTrue(pub.wait_for_readers_to_update(service, timeout=5))
  managed_processes["loggerd"].stop()

  return sent_msgs
def test_init_data_values(self):
os.environ["CLEAN"] = random.choice(["0", "1"])
@ -193,29 +220,7 @@ class TestLoggerd(unittest.TestCase):
services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \
random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services))))
pm = messaging.PubMaster(services)
# sleep enough for the first poll to time out
# TODO: fix loggerd bug dropping the msgs from the first poll
managed_processes["loggerd"].start()
for s in services:
while not pm.all_readers_updated(s):
time.sleep(0.1)
sent_msgs = defaultdict(list)
for _ in range(random.randint(2, 10) * 100):
for s in services:
try:
m = messaging.new_message(s)
except Exception:
m = messaging.new_message(s, random.randint(2, 10))
pm.send(s, m)
sent_msgs[s].append(m)
time.sleep(0.01)
time.sleep(1)
managed_processes["loggerd"].stop()
sent_msgs = self._publish_random_messages(services)
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog")
lr = list(LogReader(qlog_path))
@ -241,27 +246,7 @@ class TestLoggerd(unittest.TestCase):
def test_rlog(self):
services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
pm = messaging.PubMaster(services)
# sleep enough for the first poll to time out
# TODO: fix loggerd bug dropping the msgs from the first poll
managed_processes["loggerd"].start()
for s in services:
while not pm.all_readers_updated(s):
time.sleep(0.1)
sent_msgs = defaultdict(list)
for _ in range(random.randint(2, 10) * 100):
for s in services:
try:
m = messaging.new_message(s)
except Exception:
m = messaging.new_message(s, random.randint(2, 10))
pm.send(s, m)
sent_msgs[s].append(m)
time.sleep(2)
managed_processes["loggerd"].stop()
sent_msgs = self._publish_random_messages(services)
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog")))
@ -276,6 +261,20 @@ class TestLoggerd(unittest.TestCase):
sent.clear_write_flag()
self.assertEqual(sent.to_bytes(), m.as_builder().to_bytes())
def test_preserving_flagged_segments(self):
  # Publishing a userFlag message should make loggerd tag the segment
  # directory with the preserve xattr.
  services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"}
  self._publish_random_messages(services)

  segment_dir = self._get_latest_log_dir()
  self.assertEqual(getxattr(segment_dir, PRESERVE_ATTR_NAME), PRESERVE_ATTR_VALUE)
def test_not_preserving_unflagged_segments(self):
  # Without any userFlag message, the segment dir must not carry the
  # preserve xattr.
  services = set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"}
  self._publish_random_messages(services)

  segment_dir = self._get_latest_log_dir()
  self.assertIsNone(getxattr(segment_dir, PRESERVE_ATTR_NAME))
if __name__ == "__main__":
unittest.main()

@ -55,10 +55,10 @@ class TestUploader(UploaderTestCase):
def gen_files(self, lock=False, xattr: Optional[bytes] = None, boot=True) -> List[Path]:
f_paths = []
for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, xattr=xattr))
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock, upload_xattr=xattr))
if boot:
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, xattr=xattr))
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock, upload_xattr=xattr))
return f_paths
def gen_order(self, seg1: List[int], seg2: List[int], boot=True) -> List[str]:

Loading…
Cancel
Save