System: cleanup paths to use a common class (#29816)

* use OP prefix for logmessage

* cleanup paths too

* cleanup the paths too

* add hw.py to release

* fix those issues

* fix unittests

* fix unittests

* fix unittests

* do swaglog_ipc properly across all the files

* fix that

* fix swaglog in c++

* review suggestions
old-commit-hash: bfe990b112
test-msgs
Justin Newberry 2 years ago committed by GitHub
parent 2716ef3a23
commit c7029ea15a
  1. 5
      common/prefix.py
  2. 2
      common/swaglog.cc
  3. 3
      common/tests/test_swaglog.cc
  4. 6
      common/util.h
  5. 1
      release/files_common
  6. 18
      selfdrive/athena/athenad.py
  7. 13
      selfdrive/athena/tests/test_athenad.py
  8. 3
      selfdrive/test/helpers.py
  9. 12
      selfdrive/test/test_onroad.py
  10. 4
      selfdrive/tombstoned.py
  11. 32
      system/hardware/hw.h
  12. 29
      system/hardware/hw.py
  13. 12
      system/loggerd/config.py
  14. 10
      system/loggerd/deleter.py
  15. 5
      system/loggerd/tests/fill.py
  16. 15
      system/loggerd/tests/loggerd_tests_common.py
  17. 1
      system/loggerd/tests/test_deleter.py
  18. 12
      system/loggerd/tests/test_encoder.py
  19. 4
      system/loggerd/tests/test_loggerd.py
  20. 9
      system/loggerd/tests/test_uploader.py
  21. 4
      system/loggerd/tools/mark_all_uploaded.py
  22. 6
      system/loggerd/uploader.py
  23. 4
      system/logmessaged.py
  24. 14
      system/swaglog.py
  25. 17
      system/tests/test_logmessaged.py

@ -5,10 +5,11 @@ import uuid
from typing import Optional from typing import Optional
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.system.hardware.hw import Paths
class OpenpilotPrefix: class OpenpilotPrefix:
def __init__(self, prefix: Optional[str] = None, clean_dirs_on_exit: bool = True): def __init__(self, prefix: Optional[str] = None, clean_dirs_on_exit: bool = True):
self.prefix = prefix if prefix else str(uuid.uuid4()) self.prefix = prefix if prefix else str(uuid.uuid4().hex[0:15])
self.msgq_path = os.path.join('/dev/shm', self.prefix) self.msgq_path = os.path.join('/dev/shm', self.prefix)
self.clean_dirs_on_exit = clean_dirs_on_exit self.clean_dirs_on_exit = clean_dirs_on_exit
@ -18,6 +19,7 @@ class OpenpilotPrefix:
os.mkdir(self.msgq_path) os.mkdir(self.msgq_path)
except FileExistsError: except FileExistsError:
pass pass
os.makedirs(Paths.log_root(), exist_ok=True)
return self return self
@ -36,3 +38,4 @@ class OpenpilotPrefix:
shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True) shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True)
os.remove(symlink_path) os.remove(symlink_path)
shutil.rmtree(self.msgq_path, ignore_errors=True) shutil.rmtree(self.msgq_path, ignore_errors=True)
shutil.rmtree(Paths.log_root(), ignore_errors=True)

@ -20,7 +20,7 @@
class SwaglogState : public LogState { class SwaglogState : public LogState {
public: public:
SwaglogState() : LogState("ipc:///tmp/logmessage") {} SwaglogState() : LogState(Path::swaglog_ipc().c_str()) {}
json11::Json::object ctx_j; json11::Json::object ctx_j;

@ -9,7 +9,6 @@
#include "system/hardware/hw.h" #include "system/hardware/hw.h"
#include "third_party/json11/json11.hpp" #include "third_party/json11/json11.hpp"
const char *SWAGLOG_ADDR = "ipc:///tmp/logmessage";
std::string daemon_name = "testy"; std::string daemon_name = "testy";
std::string dongle_id = "test_dongle_id"; std::string dongle_id = "test_dongle_id";
int LINE_NO = 0; int LINE_NO = 0;
@ -25,7 +24,7 @@ void log_thread(int thread_id, int msg_cnt) {
void recv_log(int thread_cnt, int thread_msg_cnt) { void recv_log(int thread_cnt, int thread_msg_cnt) {
void *zctx = zmq_ctx_new(); void *zctx = zmq_ctx_new();
void *sock = zmq_socket(zctx, ZMQ_PULL); void *sock = zmq_socket(zctx, ZMQ_PULL);
zmq_bind(sock, SWAGLOG_ADDR); zmq_bind(sock, Path::swaglog_ipc().c_str());
std::vector<int> thread_msgs(thread_cnt); std::vector<int> thread_msgs(thread_cnt);
int total_count = 0; int total_count = 0;

@ -188,9 +188,9 @@ class LogState {
void *zctx = nullptr; void *zctx = nullptr;
void *sock = nullptr; void *sock = nullptr;
int print_level; int print_level;
const char* endpoint; std::string endpoint;
LogState(const char* _endpoint) { LogState(std::string _endpoint) {
endpoint = _endpoint; endpoint = _endpoint;
} }
@ -202,7 +202,7 @@ class LogState {
int timeout = 100; int timeout = 100;
zmq_setsockopt(sock, ZMQ_LINGER, &timeout, sizeof(timeout)); zmq_setsockopt(sock, ZMQ_LINGER, &timeout, sizeof(timeout));
zmq_connect(sock, endpoint); zmq_connect(sock, endpoint.c_str());
initialized = true; initialized = true;
} }

@ -209,6 +209,7 @@ system/hardware/__init__.py
system/hardware/base.h system/hardware/base.h
system/hardware/base.py system/hardware/base.py
system/hardware/hw.h system/hardware/hw.h
system/hardware/hw.py
system/hardware/tici/__init__.py system/hardware/tici/__init__.py
system/hardware/tici/hardware.h system/hardware/tici/hardware.h
system/hardware/tici/hardware.py system/hardware/tici/hardware.py

@ -36,11 +36,11 @@ from openpilot.common.file_helpers import CallbackReader
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import HARDWARE, PC, AGNOS from openpilot.system.hardware import HARDWARE, PC, AGNOS
from openpilot.system.loggerd.config import ROOT
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.selfdrive.statsd import STATS_DIR from openpilot.selfdrive.statsd import STATS_DIR
from openpilot.system.swaglog import SWAGLOG_DIR, cloudlog from openpilot.system.swaglog import cloudlog
from openpilot.system.version import get_commit, get_origin, get_short_branch, get_version from openpilot.system.version import get_commit, get_origin, get_short_branch, get_version
from selfdrive.hardware.hw import Paths
# TODO: use socket constant when mypy recognizes this as a valid attribute # TODO: use socket constant when mypy recognizes this as a valid attribute
@ -352,7 +352,7 @@ def scan_dir(path: str, prefix: str) -> List[str]:
# (glob and friends traverse entire dir tree) # (glob and friends traverse entire dir tree)
with os.scandir(path) as i: with os.scandir(path) as i:
for e in i: for e in i:
rel_path = os.path.relpath(e.path, ROOT) rel_path = os.path.relpath(e.path, Paths.log_root())
if e.is_dir(follow_symlinks=False): if e.is_dir(follow_symlinks=False):
# add trailing slash # add trailing slash
rel_path = os.path.join(rel_path, '') rel_path = os.path.join(rel_path, '')
@ -367,7 +367,7 @@ def scan_dir(path: str, prefix: str) -> List[str]:
@dispatcher.add_method @dispatcher.add_method
def listDataDirectory(prefix='') -> List[str]: def listDataDirectory(prefix='') -> List[str]:
return scan_dir(ROOT, prefix) return scan_dir(Paths.log_root(), prefix)
@dispatcher.add_method @dispatcher.add_method
@ -408,7 +408,7 @@ def uploadFilesToUrls(files_data: List[UploadFileDict]) -> UploadFilesToUrlRespo
failed.append(file.fn) failed.append(file.fn)
continue continue
path = os.path.join(ROOT, file.fn) path = os.path.join(Paths.log_root(), file.fn)
if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)): if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
failed.append(file.fn) failed.append(file.fn)
continue continue
@ -572,8 +572,8 @@ def get_logs_to_send_sorted() -> List[str]:
# TODO: scan once then use inotify to detect file creation/deletion # TODO: scan once then use inotify to detect file creation/deletion
curr_time = int(time.time()) curr_time = int(time.time())
logs = [] logs = []
for log_entry in os.listdir(SWAGLOG_DIR): for log_entry in os.listdir(Paths.swaglog_root()):
log_path = os.path.join(SWAGLOG_DIR, log_entry) log_path = os.path.join(Paths.swaglog_root(), log_entry)
time_sent = 0 time_sent = 0
try: try:
value = getxattr(log_path, LOG_ATTR_NAME) value = getxattr(log_path, LOG_ATTR_NAME)
@ -608,7 +608,7 @@ def log_handler(end_event: threading.Event) -> None:
cloudlog.debug(f"athena.log_handler.forward_request {log_entry}") cloudlog.debug(f"athena.log_handler.forward_request {log_entry}")
try: try:
curr_time = int(time.time()) curr_time = int(time.time())
log_path = os.path.join(SWAGLOG_DIR, log_entry) log_path = os.path.join(Paths.swaglog_root(), log_entry)
setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
with open(log_path) as f: with open(log_path) as f:
jsonrpc = { jsonrpc = {
@ -635,7 +635,7 @@ def log_handler(end_event: threading.Event) -> None:
log_success = "result" in log_resp and log_resp["result"].get("success") log_success = "result" in log_resp and log_resp["result"].get("success")
cloudlog.debug(f"athena.log_handler.forward_response {log_entry} {log_success}") cloudlog.debug(f"athena.log_handler.forward_response {log_entry} {log_success}")
if log_entry and log_success: if log_entry and log_success:
log_path = os.path.join(SWAGLOG_DIR, log_entry) log_path = os.path.join(Paths.swaglog_root(), log_entry)
try: try:
setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
except OSError: except OSError:

@ -3,7 +3,6 @@ import json
import os import os
import requests import requests
import shutil import shutil
import tempfile
import time import time
import threading import threading
import queue import queue
@ -18,11 +17,11 @@ from unittest import mock
from websocket import ABNF from websocket import ABNF
from websocket._exceptions import WebSocketConnectionClosedException from websocket._exceptions import WebSocketConnectionClosedException
from openpilot.system import swaglog
from openpilot.selfdrive.athena import athenad from openpilot.selfdrive.athena import athenad
from openpilot.selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher from openpilot.selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher
from openpilot.selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server from openpilot.selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server
from cereal import messaging from cereal import messaging
from selfdrive.hardware.hw import Paths
class TestAthenadMethods(unittest.TestCase): class TestAthenadMethods(unittest.TestCase):
@ -30,8 +29,6 @@ class TestAthenadMethods(unittest.TestCase):
def setUpClass(cls): def setUpClass(cls):
cls.SOCKET_PORT = 45454 cls.SOCKET_PORT = 45454
athenad.Params = MockParams athenad.Params = MockParams
athenad.ROOT = tempfile.mkdtemp()
athenad.SWAGLOG_DIR = swaglog.SWAGLOG_DIR = tempfile.mkdtemp()
athenad.Api = MockApi athenad.Api = MockApi
athenad.LOCAL_PORT_WHITELIST = {cls.SOCKET_PORT} athenad.LOCAL_PORT_WHITELIST = {cls.SOCKET_PORT}
@ -41,8 +38,8 @@ class TestAthenadMethods(unittest.TestCase):
athenad.cur_upload_items.clear() athenad.cur_upload_items.clear()
athenad.cancelled_uploads.clear() athenad.cancelled_uploads.clear()
for i in os.listdir(athenad.ROOT): for i in os.listdir(Paths.log_root()):
p = os.path.join(athenad.ROOT, i) p = os.path.join(Paths.log_root(), i)
if os.path.isdir(p): if os.path.isdir(p):
shutil.rmtree(p) shutil.rmtree(p)
else: else:
@ -61,7 +58,7 @@ class TestAthenadMethods(unittest.TestCase):
@staticmethod @staticmethod
def _create_file(file: str, parent: Optional[str] = None) -> str: def _create_file(file: str, parent: Optional[str] = None) -> str:
fn = os.path.join(athenad.ROOT if parent is None else parent, file) fn = os.path.join(Paths.log_root() if parent is None else parent, file)
os.makedirs(os.path.dirname(fn), exist_ok=True) os.makedirs(os.path.dirname(fn), exist_ok=True)
Path(fn).touch() Path(fn).touch()
return fn return fn
@ -418,7 +415,7 @@ class TestAthenadMethods(unittest.TestCase):
fl = list() fl = list()
for i in range(10): for i in range(10):
file = f'swaglog.{i:010}' file = f'swaglog.{i:010}'
self._create_file(file, athenad.SWAGLOG_DIR) self._create_file(file, Paths.swaglog_root())
fl.append(file) fl.append(file)
# ensure the list is all logs except most recent # ensure the list is all logs except most recent

@ -111,7 +111,4 @@ def string_context(context):
temporary_dir = temporary_mock_dir([], "temp_dir") temporary_dir = temporary_mock_dir([], "temp_dir")
temporary_cache_dir = temporary_mock_dir("openpilot.tools.lib.url_file.CACHE_DIR") temporary_cache_dir = temporary_mock_dir("openpilot.tools.lib.url_file.CACHE_DIR")
temporary_swaglog_dir = temporary_mock_dir("openpilot.system.swaglog.SWAGLOG_DIR", "temp_dir")
temporary_laikad_downloads_dir = temporary_mock_dir("openpilot.selfdrive.locationd.laikad.DOWNLOADS_CACHE_FOLDER") temporary_laikad_downloads_dir = temporary_mock_dir("openpilot.selfdrive.locationd.laikad.DOWNLOADS_CACHE_FOLDER")
temporary_swaglog_ipc = temporary_mock_dir(["openpilot.system.swaglog.SWAGLOG_IPC", "system.logmessaged.SWAGLOG_IPC"],
generator=string_context("/tmp/test_swaglog_ipc"))

@ -19,8 +19,8 @@ from openpilot.common.timeout import Timeout
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.selfdrive.controls.lib.events import EVENTS, ET from openpilot.selfdrive.controls.lib.events import EVENTS, ET
from openpilot.system.hardware import HARDWARE from openpilot.system.hardware import HARDWARE
from openpilot.system.loggerd.config import ROOT
from openpilot.selfdrive.test.helpers import set_params_enabled, release_only from openpilot.selfdrive.test.helpers import set_params_enabled, release_only
from openpilot.system.hardware.hw import Paths
from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.logreader import LogReader
# Baseline CPU usage by process # Baseline CPU usage by process
@ -102,7 +102,7 @@ class TestOnroad(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
if "DEBUG" in os.environ: if "DEBUG" in os.environ:
segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog")), Path(ROOT).iterdir()) segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog")), Path(Paths.log_root()).iterdir())
segs = sorted(segs, key=lambda x: x.stat().st_mtime) segs = sorted(segs, key=lambda x: x.stat().st_mtime)
print(segs[-3]) print(segs[-3])
cls.lr = list(LogReader(os.path.join(segs[-3], "rlog"))) cls.lr = list(LogReader(os.path.join(segs[-3], "rlog")))
@ -115,8 +115,8 @@ class TestOnroad(unittest.TestCase):
params.remove("CurrentRoute") params.remove("CurrentRoute")
set_params_enabled() set_params_enabled()
os.environ['TESTING_CLOSET'] = '1' os.environ['TESTING_CLOSET'] = '1'
if os.path.exists(ROOT): if os.path.exists(Paths.log_root()):
shutil.rmtree(ROOT) shutil.rmtree(Paths.log_root())
os.system("rm /dev/shm/*") os.system("rm /dev/shm/*")
# Make sure athena isn't running # Make sure athena isn't running
@ -143,8 +143,8 @@ class TestOnroad(unittest.TestCase):
while len(cls.segments) < 3: while len(cls.segments) < 3:
segs = set() segs = set()
if Path(ROOT).exists(): if Path(Paths.log_root()).exists():
segs = set(Path(ROOT).glob(f"{route}--*")) segs = set(Path(Paths.log_root()).glob(f"{route}--*"))
cls.segments = sorted(segs, key=lambda s: int(str(s).rsplit('--')[-1])) cls.segments = sorted(segs, key=lambda s: int(str(s).rsplit('--')[-1]))
time.sleep(2) time.sleep(2)

@ -10,8 +10,8 @@ import glob
from typing import NoReturn from typing import NoReturn
from openpilot.common.file_helpers import mkdirs_exists_ok from openpilot.common.file_helpers import mkdirs_exists_ok
from openpilot.system.loggerd.config import ROOT
import openpilot.selfdrive.sentry as sentry import openpilot.selfdrive.sentry as sentry
from openpilot.system.hardware.hw import Paths
from openpilot.system.swaglog import cloudlog from openpilot.system.swaglog import cloudlog
from openpilot.system.version import get_commit from openpilot.system.version import get_commit
@ -130,7 +130,7 @@ def report_tombstone_apport(fn):
new_fn = f"{date}_{get_commit(default='nocommit')[:8]}_{safe_fn(clean_path)}"[:MAX_TOMBSTONE_FN_LEN] new_fn = f"{date}_{get_commit(default='nocommit')[:8]}_{safe_fn(clean_path)}"[:MAX_TOMBSTONE_FN_LEN]
crashlog_dir = os.path.join(ROOT, "crash") crashlog_dir = os.path.join(Paths.log_root(), "crash")
mkdirs_exists_ok(crashlog_dir) mkdirs_exists_ok(crashlog_dir)
# Files could be on different filesystems, copy, then delete # Files could be on different filesystems, copy, then delete

@ -14,16 +14,30 @@
#endif #endif
namespace Path { namespace Path {
inline std::string log_root() { inline std::string openpilot_prefix() {
return util::getenv("OPENPILOT_PREFIX", "");
}
inline std::string comma_home() {
return util::getenv("HOME") + "/.comma" + Path::openpilot_prefix();
}
inline std::string log_root() {
if (const char *env = getenv("LOG_ROOT")) { if (const char *env = getenv("LOG_ROOT")) {
return env; return env;
} }
return Hardware::PC() ? util::getenv("HOME") + "/.comma/media/0/realdata" : "/data/media/0/realdata"; return Hardware::PC() ? Path::comma_home() + "/media/0/realdata" : "/data/media/0/realdata";
} }
inline std::string params() {
return Hardware::PC() ? util::getenv("PARAMS_ROOT", util::getenv("HOME") + "/.comma/params") : "/data/params"; inline std::string params() {
} return Hardware::PC() ? util::getenv("PARAMS_ROOT", Path::comma_home() + "/params") : "/data/params";
inline std::string rsa_file() { }
return Hardware::PC() ? util::getenv("HOME") + "/.comma/persist/comma/id_rsa" : "/persist/comma/id_rsa";
} inline std::string rsa_file() {
return Hardware::PC() ? Path::comma_home() + "/persist/comma/id_rsa" : "/persist/comma/id_rsa";
}
inline std::string swaglog_ipc() {
return "ipc:///tmp/logmessage" + Path::openpilot_prefix();
}
} // namespace Path } // namespace Path

@ -0,0 +1,29 @@
import os
from pathlib import Path
from openpilot.selfdrive.hardware import PC
class Paths:
@staticmethod
def comma_home() -> str:
return os.path.join(str(Path.home()), ".comma" + os.environ.get("OPENPILOT_PREFIX", ""))
@staticmethod
def log_root() -> str:
if os.environ.get('LOG_ROOT', False):
return os.environ['LOG_ROOT']
elif PC:
return str(Path(Paths.comma_home()) / "media" / "0" / "realdata")
else:
return '/data/media/0/realdata/'
@staticmethod
def swaglog_root() -> str:
if PC:
return os.path.join(Paths.comma_home(), "log")
else:
return "/data/log/"
@staticmethod
def swaglog_ipc() -> str:
return "ipc:///tmp/logmessage" + os.environ.get("OPENPILOT_PREFIX", "")

@ -1,13 +1,7 @@
import os import os
from pathlib import Path from pathlib import Path
from openpilot.system.hardware import PC from openpilot.system.hardware import PC
from openpilot.selfdrive.hardware.hw import Paths
if os.environ.get('LOG_ROOT', False):
ROOT = os.environ['LOG_ROOT']
elif PC:
ROOT = str(Path.home() / ".comma" / "media" / "0" / "realdata")
else:
ROOT = '/data/media/0/realdata/'
CAMERA_FPS = 20 CAMERA_FPS = 20
@ -23,7 +17,7 @@ STATS_FLUSH_TIME_S = 60
def get_available_percent(default=None): def get_available_percent(default=None):
try: try:
statvfs = os.statvfs(ROOT) statvfs = os.statvfs(Paths.log_root())
available_percent = 100.0 * statvfs.f_bavail / statvfs.f_blocks available_percent = 100.0 * statvfs.f_bavail / statvfs.f_blocks
except OSError: except OSError:
available_percent = default available_percent = default
@ -33,7 +27,7 @@ def get_available_percent(default=None):
def get_available_bytes(default=None): def get_available_bytes(default=None):
try: try:
statvfs = os.statvfs(ROOT) statvfs = os.statvfs(Paths.log_root())
available_bytes = statvfs.f_bavail * statvfs.f_frsize available_bytes = statvfs.f_bavail * statvfs.f_frsize
except OSError: except OSError:
available_bytes = default available_bytes = default

@ -3,9 +3,9 @@ import os
import shutil import shutil
import threading import threading
from typing import List from typing import List
from openpilot.system.hardware.hw import Paths
from openpilot.system.swaglog import cloudlog from openpilot.system.swaglog import cloudlog
from openpilot.system.loggerd.config import ROOT, get_available_bytes, get_available_percent from openpilot.system.loggerd.config import get_available_bytes, get_available_percent
from openpilot.system.loggerd.uploader import listdir_by_creation from openpilot.system.loggerd.uploader import listdir_by_creation
from openpilot.system.loggerd.xattr_cache import getxattr from openpilot.system.loggerd.xattr_cache import getxattr
@ -20,7 +20,7 @@ PRESERVE_COUNT = 5
def has_preserve_xattr(d: str) -> bool: def has_preserve_xattr(d: str) -> bool:
return getxattr(os.path.join(ROOT, d), PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE return getxattr(os.path.join(Paths.log_root(), d), PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE
def get_preserved_segments(dirs_by_creation: List[str]) -> List[str]: def get_preserved_segments(dirs_by_creation: List[str]) -> List[str]:
@ -51,14 +51,14 @@ def deleter_thread(exit_event):
out_of_percent = get_available_percent(default=MIN_PERCENT + 1) < MIN_PERCENT out_of_percent = get_available_percent(default=MIN_PERCENT + 1) < MIN_PERCENT
if out_of_percent or out_of_bytes: if out_of_percent or out_of_bytes:
dirs = listdir_by_creation(ROOT) dirs = listdir_by_creation(Paths.log_root())
# skip deleting most recent N preserved segments (and their prior segment) # skip deleting most recent N preserved segments (and their prior segment)
preserved_dirs = get_preserved_segments(dirs) preserved_dirs = get_preserved_segments(dirs)
# remove the earliest directory we can # remove the earliest directory we can
for delete_dir in sorted(dirs, key=lambda d: (d in DELETE_LAST, d in preserved_dirs)): for delete_dir in sorted(dirs, key=lambda d: (d in DELETE_LAST, d in preserved_dirs)):
delete_path = os.path.join(ROOT, delete_dir) delete_path = os.path.join(Paths.log_root(), delete_dir)
if any(name.endswith(".lock") for name in os.listdir(delete_path)): if any(name.endswith(".lock") for name in os.listdir(delete_path)):
continue continue

@ -3,7 +3,8 @@
from pathlib import Path from pathlib import Path
from openpilot.system.loggerd.config import ROOT, get_available_percent from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.config import get_available_percent
from openpilot.system.loggerd.tests.loggerd_tests_common import create_random_file from openpilot.system.loggerd.tests.loggerd_tests_common import create_random_file
@ -11,7 +12,7 @@ if __name__ == "__main__":
segment_idx = 0 segment_idx = 0
while True: while True:
seg_name = f"1970-01-01--00-00-00--{segment_idx}" seg_name = f"1970-01-01--00-00-00--{segment_idx}"
seg_path = Path(ROOT) / seg_name seg_path = Path(Paths.log_root()) / seg_name
print(seg_path) print(seg_path)

@ -1,11 +1,9 @@
import os import os
import errno
import shutil
import random import random
import tempfile
import unittest import unittest
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from openpilot.system.hardware.hw import Paths
import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.deleter as deleter
import openpilot.system.loggerd.uploader as uploader import openpilot.system.loggerd.uploader as uploader
@ -87,8 +85,6 @@ class UploaderTestCase(unittest.TestCase):
uploader.Api = MockApiIgnore uploader.Api = MockApiIgnore
def setUp(self): def setUp(self):
self.root = Path(tempfile.mkdtemp())
uploader.ROOT = str(self.root) # Monkey patch root dir
uploader.Api = MockApi uploader.Api = MockApi
uploader.Params = MockParams uploader.Params = MockParams
uploader.fake_upload = True uploader.fake_upload = True
@ -99,16 +95,9 @@ class UploaderTestCase(unittest.TestCase):
self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_format2 = "2019-05-18--11-22-33--{}"
self.seg_dir = self.seg_format.format(self.seg_num) self.seg_dir = self.seg_format.format(self.seg_num)
def tearDown(self):
try:
shutil.rmtree(self.root)
except OSError as e:
if e.errno != errno.ENOENT:
raise
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
file_path = self.root / f_dir / fn file_path = Path(Paths.log_root()) / f_dir / fn
create_random_file(file_path, size_mb, lock, upload_xattr) create_random_file(file_path, size_mb, lock, upload_xattr)
if preserve_xattr is not None: if preserve_xattr is not None:

@ -22,7 +22,6 @@ class TestDeleter(UploaderTestCase):
super().setUp() super().setUp()
self.fake_stats = Stats(f_bavail=0, f_blocks=10, f_frsize=4096) self.fake_stats = Stats(f_bavail=0, f_blocks=10, f_frsize=4096)
deleter.os.statvfs = self.fake_statvfs deleter.os.statvfs = self.fake_statvfs
deleter.ROOT = str(self.root)
def start_thread(self): def start_thread(self):
self.end_event = threading.Event() self.end_event = threading.Event()

@ -14,9 +14,9 @@ from tqdm import trange
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.timeout import Timeout from openpilot.common.timeout import Timeout
from openpilot.system.hardware import TICI from openpilot.system.hardware import TICI
from openpilot.system.loggerd.config import ROOT
from openpilot.selfdrive.manager.process_config import managed_processes from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.hardware.hw import Paths
SEGMENT_LENGTH = 2 SEGMENT_LENGTH = 2
FULL_SIZE = 2507572 FULL_SIZE = 2507572
@ -48,12 +48,12 @@ class TestEncoder(unittest.TestCase):
self._clear_logs() self._clear_logs()
def _clear_logs(self): def _clear_logs(self):
if os.path.exists(ROOT): if os.path.exists(Paths.log_root()):
shutil.rmtree(ROOT) shutil.rmtree(Paths.log_root())
def _get_latest_segment_path(self): def _get_latest_segment_path(self):
last_route = sorted(Path(ROOT).iterdir())[-1] last_route = sorted(Path(Paths.log_root()).iterdir())[-1]
return os.path.join(ROOT, last_route) return os.path.join(Paths.log_root(), last_route)
# TODO: this should run faster than real time # TODO: this should run faster than real time
@parameterized.expand([(True, ), (False, )]) @parameterized.expand([(True, ), (False, )])
@ -146,7 +146,7 @@ class TestEncoder(unittest.TestCase):
for i in trange(num_segments): for i in trange(num_segments):
# poll for next segment # poll for next segment
with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
while Path(f"{route_prefix_path}--{i+1}") not in Path(ROOT).iterdir(): while Path(f"{route_prefix_path}--{i+1}") not in Path(Paths.log_root()).iterdir():
time.sleep(0.1) time.sleep(0.1)
check_seg(i) check_seg(i)
finally: finally:

@ -16,7 +16,7 @@ from cereal.services import service_list
from openpilot.common.basedir import BASEDIR from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.timeout import Timeout from openpilot.common.timeout import Timeout
from openpilot.system.loggerd.config import ROOT from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import getxattr from openpilot.system.loggerd.xattr_cache import getxattr
from openpilot.system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE from openpilot.system.loggerd.deleter import PRESERVE_ATTR_NAME, PRESERVE_ATTR_VALUE
from openpilot.selfdrive.manager.process_config import managed_processes from openpilot.selfdrive.manager.process_config import managed_processes
@ -33,7 +33,7 @@ CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in service_list
class TestLoggerd(unittest.TestCase): class TestLoggerd(unittest.TestCase):
def _get_latest_log_dir(self): def _get_latest_log_dir(self):
log_dirs = sorted(Path(ROOT).iterdir(), key=lambda f: f.stat().st_mtime) log_dirs = sorted(Path(Paths.log_root()).iterdir(), key=lambda f: f.stat().st_mtime)
return log_dirs[-1] return log_dirs[-1]
def _get_log_dir(self, x): def _get_log_dir(self, x):

@ -7,6 +7,7 @@ import logging
import json import json
from pathlib import Path from pathlib import Path
from typing import List, Optional from typing import List, Optional
from openpilot.system.hardware.hw import Paths
from openpilot.system.swaglog import cloudlog from openpilot.system.swaglog import cloudlog
from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE
@ -84,7 +85,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order: for f_path in exp_order:
self.assertEqual(os.getxattr((self.root / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@ -102,7 +103,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order: for f_path in exp_order:
self.assertEqual(os.getxattr((self.root / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@ -121,7 +122,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore") self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore")
self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice") self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice")
for f_path in exp_order: for f_path in exp_order:
self.assertEqual(os.getxattr((self.root / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not ignored") self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not ignored")
self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order") self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order")
@ -146,7 +147,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload") self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice") self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order: for f_path in exp_order:
self.assertEqual(os.getxattr((self.root / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded") self.assertEqual(os.getxattr((Path(Paths.log_root()) / f_path).with_suffix(""), UPLOAD_ATTR_NAME), UPLOAD_ATTR_VALUE, "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order") self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")

@ -1,8 +1,8 @@
import os import os
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.uploader import UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.uploader import UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE
from openpilot.system.loggerd.config import ROOT for folder in os.walk(Paths.log_root()):
for folder in os.walk(ROOT):
for file1 in folder[2]: for file1 in folder[2]:
full_path = os.path.join(folder[0], file1) full_path = os.path.join(folder[0], file1)
os.setxattr(full_path, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) os.setxattr(full_path, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE)

@ -17,8 +17,8 @@ from openpilot.common.api import Api
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import TICI from openpilot.system.hardware import TICI
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.system.loggerd.config import ROOT
from openpilot.system.swaglog import cloudlog from openpilot.system.swaglog import cloudlog
NetworkType = log.DeviceState.NetworkType NetworkType = log.DeviceState.NetworkType
@ -244,7 +244,7 @@ def uploader_fn(exit_event: threading.Event) -> None:
except Exception: except Exception:
cloudlog.exception("failed to set core affinity") cloudlog.exception("failed to set core affinity")
clear_locks(ROOT) clear_locks(Paths.log_root())
params = Params() params = Params()
dongle_id = params.get("DongleId", encoding='utf8') dongle_id = params.get("DongleId", encoding='utf8')
@ -258,7 +258,7 @@ def uploader_fn(exit_event: threading.Event) -> None:
sm = messaging.SubMaster(['deviceState']) sm = messaging.SubMaster(['deviceState'])
pm = messaging.PubMaster(['uploaderState']) pm = messaging.PubMaster(['uploaderState'])
uploader = Uploader(dongle_id, ROOT) uploader = Uploader(dongle_id, Paths.log_root())
backoff = 0.1 backoff = 0.1
while not exit_event.is_set(): while not exit_event.is_set():

@ -4,8 +4,8 @@ from typing import NoReturn
import cereal.messaging as messaging import cereal.messaging as messaging
from openpilot.common.logging_extra import SwagLogFileFormatter from openpilot.common.logging_extra import SwagLogFileFormatter
from openpilot.system.hardware.hw import Paths
from openpilot.system.swaglog import get_file_handler from openpilot.system.swaglog import get_file_handler
from system.swaglog import SWAGLOG_IPC
def main() -> NoReturn: def main() -> NoReturn:
@ -15,7 +15,7 @@ def main() -> NoReturn:
ctx = zmq.Context.instance() ctx = zmq.Context.instance()
sock = ctx.socket(zmq.PULL) sock = ctx.socket(zmq.PULL)
sock.bind(f"ipc://{SWAGLOG_IPC}") sock.bind(Paths.swaglog_ipc())
# and we publish them # and we publish them
log_message_sock = messaging.pub_sock('logMessage') log_message_sock = messaging.pub_sock('logMessage')

@ -8,18 +8,12 @@ from logging.handlers import BaseRotatingHandler
import zmq import zmq
from openpilot.common.logging_extra import SwagLogger, SwagFormatter, SwagLogFileFormatter from openpilot.common.logging_extra import SwagLogger, SwagFormatter, SwagLogFileFormatter
from openpilot.system.hardware import PC from system.hardware.hw import Paths
if PC:
SWAGLOG_DIR = os.path.join(str(Path.home()), ".comma", "log")
else:
SWAGLOG_DIR = "/data/log/"
SWAGLOG_IPC = "/tmp/logmessage"
def get_file_handler(): def get_file_handler():
Path(SWAGLOG_DIR).mkdir(parents=True, exist_ok=True) Path(Paths.swaglog_root()).mkdir(parents=True, exist_ok=True)
base_filename = os.path.join(SWAGLOG_DIR, "swaglog") base_filename = os.path.join(Paths.swaglog_root(), "swaglog")
handler = SwaglogRotatingFileHandler(base_filename) handler = SwaglogRotatingFileHandler(base_filename)
return handler return handler
@ -91,7 +85,7 @@ class UnixDomainSocketHandler(logging.Handler):
self.zctx = zmq.Context() self.zctx = zmq.Context()
self.sock = self.zctx.socket(zmq.PUSH) self.sock = self.zctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, 10) self.sock.setsockopt(zmq.LINGER, 10)
self.sock.connect(f"ipc://{SWAGLOG_IPC}") self.sock.connect(Paths.swaglog_ipc())
self.pid = os.getpid() self.pid = os.getpid()
def emit(self, record): def emit(self, record):

@ -6,17 +6,16 @@ import unittest
import cereal.messaging as messaging import cereal.messaging as messaging
from openpilot.selfdrive.manager.process_config import managed_processes from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.system.hardware.hw import Paths
from openpilot.system.swaglog import cloudlog, ipchandler from openpilot.system.swaglog import cloudlog, ipchandler
from openpilot.selfdrive.test.helpers import temporary_swaglog_dir, temporary_swaglog_ipc
class TestLogmessaged(unittest.TestCase): class TestLogmessaged(unittest.TestCase):
def _setup(self, temp_dir): def setUp(self):
# clear the IPC buffer in case some other tests used cloudlog and filled it # clear the IPC buffer in case some other tests used cloudlog and filled it
ipchandler.close() ipchandler.close()
ipchandler.connect() ipchandler.connect()
self.temp_dir = temp_dir
managed_processes['logmessaged'].start() managed_processes['logmessaged'].start()
self.sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False) self.sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False)
self.error_sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False) self.error_sock = messaging.sub_sock("logMessage", timeout=1000, conflate=False)
@ -32,12 +31,9 @@ class TestLogmessaged(unittest.TestCase):
managed_processes['logmessaged'].stop(block=True) managed_processes['logmessaged'].stop(block=True)
def _get_log_files(self): def _get_log_files(self):
return list(glob.glob(os.path.join(self.temp_dir, "swaglog.*"))) return list(glob.glob(os.path.join(Paths.swaglog_root(), "swaglog.*")))
@temporary_swaglog_dir def test_simple_log(self):
@temporary_swaglog_ipc
def test_simple_log(self, temp_dir):
self._setup(temp_dir)
msgs = [f"abc {i}" for i in range(10)] msgs = [f"abc {i}" for i in range(10)]
for m in msgs: for m in msgs:
cloudlog.error(m) cloudlog.error(m)
@ -46,10 +42,7 @@ class TestLogmessaged(unittest.TestCase):
assert len(m) == len(msgs) assert len(m) == len(msgs)
assert len(self._get_log_files()) >= 1 assert len(self._get_log_files()) >= 1
@temporary_swaglog_dir def test_big_log(self):
@temporary_swaglog_ipc
def test_big_log(self, temp_dir):
self._setup(temp_dir)
n = 10 n = 10
msg = "a"*3*1024*1024 msg = "a"*3*1024*1024
for _ in range(n): for _ in range(n):

Loading…
Cancel
Save