def _strip_deprecated_nested(value):
    """Strip DEPRECATED keys from any dicts reachable through *value*."""
    if isinstance(value, dict):
        strip_deprecated_keys(value)
    elif isinstance(value, (list, tuple)):
        # Containers of dicts are walked too; the dicts themselves are
        # mutated in place, so tuples are fine here.
        for item in value:
            _strip_deprecated_nested(item)


def strip_deprecated_keys(d):
    """Remove, in place, every key that ends in ``DEPRECATED``.

    The walk is recursive: nested dicts are cleaned wherever they appear,
    including inside lists/tuples and under non-string keys.

    Args:
        d: the dict to clean. It is mutated.

    Returns:
        The same dict object *d*, for call chaining.
    """
    # Iterate over a snapshot of the keys because entries are deleted
    # while walking.
    for k in list(d):
        if isinstance(k, str) and k.endswith('DEPRECATED'):
            del d[k]
        else:
            _strip_deprecated_nested(d[k])
    return d
log.info() creates 'msg' -> 'msg$s' + # log.event() creates 'msg.health.logMonoTime' -> 'msg.health.logMonoTime$i' + # because overlapping namespace 'msg' caused problems + if isinstance(v, (str, bytes)): + k += "$s" + elif isinstance(v, float): + k += "$f" + elif isinstance(v, bool): + k += "$b" + elif isinstance(v, int): + k += "$i" + elif isinstance(v, dict): + nv = {} + for ik, iv in v.items(): + ik, iv = self.fix_kv(ik, iv) + nv[ik] = iv + v = nv + elif isinstance(v, list): + k += "$a" + return k, v + + def format(self, record): + if isinstance(record, str): + v = json.loads(record) + else: + v = self.format_dict(record) + + mk, mv = self.fix_kv('msg', v['msg']) + del v['msg'] + v[mk] = mv + v['id'] = uuid.uuid4().hex + + return json_robust_dumps(v) + class SwagErrorFilter(logging.Filter): def filter(self, record): return record.levelno < logging.ERROR diff --git a/release/files_common b/release/files_common index 4cf62be815..71fca93bc0 100644 --- a/release/files_common +++ b/release/files_common @@ -29,6 +29,7 @@ common/params_pyx.pyx common/xattr.py common/profiler.py common/basedir.py +common/dict_helpers.py common/filter_simple.py common/stat_live.py common/spinner.py diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index c3645ed8d6..b5d4860553 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -4,6 +4,7 @@ import hashlib import io import json import os +import sys import queue import random import select @@ -24,18 +25,24 @@ from common.api import Api from common.basedir import PERSIST from common.params import Params from common.realtime import sec_since_boot -from selfdrive.hardware import HARDWARE +from selfdrive.hardware import HARDWARE, PC from selfdrive.loggerd.config import ROOT -from selfdrive.swaglog import cloudlog +from selfdrive.loggerd.xattr_cache import getxattr, setxattr +from selfdrive.swaglog import cloudlog, SWAGLOG_DIR ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') 
def jsonrpc_handler(end_event):
    """Route raw websocket messages from ``recv_queue`` until *end_event* is set.

    Messages are classified by a substring test on the raw payload:

    * anything mentioning ``method`` is treated as a JSON-RPC request, run
      through ``dispatcher`` and its serialized response is queued on
      ``send_queue``;
    * anything mentioning both ``result`` and ``id`` is treated as the reply
      to one of our own requests and handed, still serialized, to
      ``log_recv_queue``.

    Args:
        end_event: ``threading.Event``-like object; the loop exits once it
            is set. It is also bound into the ``startLocalProxy`` method.
    """
    dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event)
    while not end_event.is_set():
        try:
            data = recv_queue.get(timeout=1)
            # "params" is optional in JSON-RPC 2.0, so only "method" marks
            # a request; requiring both silently dropped param-less calls.
            if "method" in data:
                response = JSONRPCResponseManager.handle(data, dispatcher)
                # NOTE(review): handle() yields no response object for
                # notifications (requests without an id) -- nothing to send.
                if response is not None:
                    send_queue.put_nowait(response.json)
            elif "result" in data and "id" in data:
                log_recv_queue.put_nowait(data)
            else:
                # Leave a trace instead of dropping the message silently.
                cloudlog.error("athena jsonrpc handler ignored unrecognized message")
        except queue.Empty:
            pass
        except Exception as e:
            cloudlog.exception("athena jsonrpc handler failed")
            send_queue.put_nowait(json.dumps({"error": str(e)}))
def get_logs_to_send_sorted():
    """Return the swaglog file names that still need uploading, sorted ascending.

    A file qualifies when its ``LOG_ATTR_NAME`` xattr is missing/unreadable,
    or holds a send time more than one hour old (the send is assumed to have
    failed and the response lost). Files marked with
    ``LOG_ATTR_VALUE_MAX_UNIX_TIME`` never qualify, because their age is
    always negative.

    The last name in sort order is dropped: it is taken to be the active log
    file that is still being written.
    """
    # TODO: scan once then use inotify to detect file creation/deletion
    curr_time = int(time.time())
    logs = []
    for log_entry in os.listdir(SWAGLOG_DIR):
        log_path = os.path.join(SWAGLOG_DIR, log_entry)
        try:
            time_sent = int.from_bytes(getxattr(log_path, LOG_ATTR_NAME), sys.byteorder)
        except (ValueError, TypeError):
            time_sent = 0
        # assume send failed and we lost the response if sent more than one hour ago
        if not time_sent or curr_time - time_sent > 3600:
            logs.append(log_entry)
    # os.listdir() order is arbitrary, so sort BEFORE dropping the last
    # entry; slicing first would discard a random file instead of the
    # most recent (active) one.
    return sorted(logs)[:-1]


def log_handler(end_event):
    """Forward finished swaglog files over the websocket until *end_event* is set.

    Each loop iteration:

    1. waits up to one second for a reply on ``log_recv_queue`` and, when the
       server acknowledged a file, marks it as uploaded via its xattr;
    2. rescans the log directory at most every 10 seconds;
    3. if nothing is already waiting in ``log_send_queue``, stamps the next
       file with the current time and queues a ``forwardLogs`` request whose
       ``id`` is the file name.

    Unexpected errors are logged and followed by a ``backoff`` sleep. The
    function returns immediately when ``PC`` is true.
    """
    if PC:
        return

    log_files = []
    last_scan = 0
    log_retries = 0
    while not end_event.is_set():
        try:
            try:
                log_resp = json.loads(log_recv_queue.get(timeout=1))
                log_entry = log_resp.get("id")
                # The acknowledgement is nested: {"result": {"success": 1}, "id": ...}
                result = log_resp.get("result")
                log_success = isinstance(result, dict) and result.get("success")
                # Build the path only once the id is known to be a usable
                # file name; a missing or non-string id must not raise.
                if isinstance(log_entry, str) and log_entry and log_success:
                    log_path = os.path.join(SWAGLOG_DIR, log_entry)
                    try:
                        setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
                    except OSError:
                        pass  # file could be deleted by log rotation
            except queue.Empty:
                pass

            curr_scan = sec_since_boot()
            if curr_scan - last_scan > 10:
                log_files = get_logs_to_send_sorted()
                last_scan = curr_scan

            # never send last log file because it is the active log
            # and only send one log file at a time (most recent first)
            if not log_files or not log_send_queue.empty():
                continue

            log_entry = log_files.pop()
            try:
                curr_time = int(time.time())
                log_path = os.path.join(SWAGLOG_DIR, log_entry)
                # Stamp the send time first so the file is not picked up
                # again for an hour while the reply is pending.
                setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
                with open(log_path, "r") as f:
                    jsonrpc = {
                        "method": "forwardLogs",
                        "params": {
                            "logs": f.read()
                        },
                        "jsonrpc": "2.0",
                        "id": log_entry
                    }
                    log_send_queue.put_nowait(json.dumps(jsonrpc))
            except OSError:
                pass  # file could be deleted by log rotation
            log_retries = 0
        except Exception:
            cloudlog.exception("athena.log_handler.exception")
            log_retries += 1

        if log_retries != 0:
            time.sleep(backoff(log_retries))
threading.Thread(target=athenad.jsonrpc_handler, args=(end_event,)) thread.daemon = True thread.start() - athenad.payload_queue.put_nowait(json.dumps({"method": "echo", "params": ["hello"], "jsonrpc": "2.0", "id": 0})) try: - resp = athenad.response_queue.get(timeout=3) - self.assertDictEqual(resp.data, {'result': 'hello', 'id': 0, 'jsonrpc': '2.0'}) + athenad.recv_queue.put_nowait(json.dumps({"method": "echo", "params": ["hello"], "jsonrpc": "2.0", "id": 0})) + resp = athenad.send_queue.get(timeout=3) + self.assertDictEqual(json.loads(resp), {'result': 'hello', 'id': 0, 'jsonrpc': '2.0'}) + + athenad.recv_queue.put_nowait(json.dumps({'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'})) + resp = athenad.log_recv_queue.get(timeout=3) + self.assertDictEqual(json.loads(resp), {'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'}) finally: end_event.set() thread.join() diff --git a/selfdrive/logmessaged.py b/selfdrive/logmessaged.py index 953f3d477f..17ef42cd61 100755 --- a/selfdrive/logmessaged.py +++ b/selfdrive/logmessaged.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 import zmq import cereal.messaging as messaging -from selfdrive.swaglog import get_le_handler +from common.logging_extra import SwagLogFileFormatter +from selfdrive.swaglog import get_file_handler def main(): - le_handler = get_le_handler() - le_level = 20 # logging.INFO + log_handler = get_file_handler() + log_handler.setFormatter(SwagLogFileFormatter(None)) + log_level = 20 # logging.INFO ctx = zmq.Context().instance() sock = ctx.socket(zmq.PULL) @@ -17,19 +19,14 @@ def main(): while True: dat = b''.join(sock.recv_multipart()) - dat = dat.decode('utf8') - - levelnum = ord(dat[0]) - dat = dat[1:] - - if levelnum >= le_level: - # push to logentries - # TODO: push to athena instead - le_handler.emit_raw(dat) + level = dat[0] + record = dat[1:].decode("utf-8") + if level >= log_level: + log_handler.emit(record) # then we publish them msg = messaging.new_message() - msg.logMessage = dat + msg.logMessage = 
record pub_sock.send(msg.to_bytes()) diff --git a/selfdrive/manager/build.py b/selfdrive/manager/build.py index 4c12b3a318..9589cbee59 100755 --- a/selfdrive/manager/build.py +++ b/selfdrive/manager/build.py @@ -10,7 +10,7 @@ import textwrap from common.basedir import BASEDIR from common.spinner import Spinner from common.text_window import TextWindow -from selfdrive.swaglog import add_logentries_handler, cloudlog +from selfdrive.swaglog import cloudlog, add_file_handler from selfdrive.version import dirty TOTAL_SCONS_NODES = 1225 @@ -70,7 +70,7 @@ def build(spinner, dirty=False): errors = [line.decode('utf8', 'replace') for line in compile_output if any([err in line for err in [b'error: ', b'not found, needed by target']])] error_s = "\n".join(errors) - add_logentries_handler(cloudlog) + add_file_handler(cloudlog) cloudlog.error("scons build failed\n" + error_s) # Show TextWindow diff --git a/selfdrive/manager/manager.py b/selfdrive/manager/manager.py index 9cb667d048..393b513939 100755 --- a/selfdrive/manager/manager.py +++ b/selfdrive/manager/manager.py @@ -16,7 +16,7 @@ from selfdrive.manager.helpers import unblock_stdout from selfdrive.manager.process import ensure_running from selfdrive.manager.process_config import managed_processes from selfdrive.registration import register -from selfdrive.swaglog import add_logentries_handler, cloudlog +from selfdrive.swaglog import cloudlog, add_file_handler from selfdrive.version import dirty, version @@ -183,7 +183,7 @@ if __name__ == "__main__": try: main() except Exception: - add_logentries_handler(cloudlog) + add_file_handler(cloudlog) cloudlog.exception("Manager failed to start") # Show last 3 lines of traceback diff --git a/selfdrive/swaglog.py b/selfdrive/swaglog.py index 48447527c6..beacd5b87a 100644 --- a/selfdrive/swaglog.py +++ b/selfdrive/swaglog.py @@ -1,19 +1,72 @@ import os +from pathlib import Path import logging +from logging.handlers import BaseRotatingHandler -from logentries import LogentriesHandler 
class SwaglogRotatingFileHandler(BaseRotatingHandler):
    """File handler that rotates by age and size into numbered files.

    Every rollover opens a fresh file named ``<base_filename>.<index>``,
    where the index is zero-padded to 10 digits and continues from the
    highest index found on disk. At most ``backup_count`` files are kept;
    the oldest are deleted first.

    Args:
        base_filename: path prefix shared by all log files.
        interval: maximum age of a file in seconds (``<= 0`` disables).
        max_bytes: maximum size of a file in bytes (``<= 0`` disables).
        backup_count: number of files to keep (``<= 0`` keeps everything).
        encoding: text encoding passed to ``open``.
    """

    def __init__(self, base_filename, interval=60, max_bytes=1024*256, backup_count=2500, encoding=None):
        super().__init__(base_filename, mode="a", encoding=encoding, delay=True)
        self.base_filename = base_filename
        self.interval = interval  # seconds
        self.max_bytes = max_bytes
        self.backup_count = backup_count
        # log_files is kept NEWEST FIRST: _open() inserts at the front and
        # pruning pops from the tail. get_existing_logfiles() returns
        # oldest first, so reverse it; otherwise the first prune after a
        # restart deletes the newest pre-existing files.
        self.log_files = self.get_existing_logfiles()[::-1]
        log_indexes = [f.split(".")[-1] for f in self.log_files]
        self.last_file_idx = max([int(i) for i in log_indexes if i.isdigit()] or [-1])
        self.last_rollover = None
        self.doRollover()

    def _open(self):
        """Open the next numbered log file and record it as the newest."""
        self.last_rollover = sec_since_boot()
        self.last_file_idx += 1
        next_filename = f"{self.base_filename}.{self.last_file_idx:010}"
        stream = open(next_filename, self.mode, encoding=self.encoding)
        self.log_files.insert(0, next_filename)
        return stream

    def get_existing_logfiles(self):
        """Return paths of regular files starting with ``base_filename``, sorted ascending."""
        base_dir = os.path.dirname(self.base_filename)
        candidates = (os.path.join(base_dir, fn) for fn in os.listdir(base_dir))
        return sorted(
            fp for fp in candidates
            if fp.startswith(self.base_filename) and os.path.isfile(fp)
        )

    def shouldRollover(self, record):
        """Return True when the current file is too large or too old.

        *record* is ignored; only stream position and elapsed time matter.
        """
        # The stream can be None (e.g. after close()); skip the size check
        # then instead of raising AttributeError.
        size_exceeded = (self.max_bytes > 0 and self.stream is not None
                         and self.stream.tell() >= self.max_bytes)
        time_exceeded = self.interval > 0 and self.last_rollover + self.interval <= sec_since_boot()
        return size_exceeded or time_exceeded

    def doRollover(self):
        """Close the current file, open the next one and prune old files."""
        if self.stream:
            self.stream.close()
        self.stream = self._open()
        self._prune_backups()

    def _prune_backups(self):
        """Delete the oldest files until at most ``backup_count`` remain."""
        if self.backup_count <= 0:
            return
        while len(self.log_files) > self.backup_count:
            to_delete = self.log_files.pop()  # tail == oldest
            try:
                os.remove(to_delete)
            except FileNotFoundError:
                pass  # just being safe, should always exist
+ """ + handler = get_file_handler() + handler.setFormatter(SwagLogFileFormatter(log)) log.addHandler(handler) @@ -53,4 +108,5 @@ log.setLevel(logging.DEBUG) outhandler = logging.StreamHandler() log.addHandler(outhandler) -log.addHandler(LogMessageHandler(SwagFormatter(log))) +# logs are sent through IPC before writing to disk to prevent disk I/O blocking +log.addHandler(UnixDomainSocketHandler(SwagFormatter(log))) diff --git a/selfdrive/thermald/thermald.py b/selfdrive/thermald/thermald.py index 01268346d0..6240d14e47 100755 --- a/selfdrive/thermald/thermald.py +++ b/selfdrive/thermald/thermald.py @@ -13,6 +13,7 @@ from common.filter_simple import FirstOrderFilter from common.numpy_fast import clip, interp from common.params import Params from common.realtime import DT_TRML, sec_since_boot +from common.dict_helpers import strip_deprecated_keys from selfdrive.controls.lib.alertmanager import set_offroad_alert from selfdrive.hardware import EON, TICI, HARDWARE from selfdrive.loggerd.config import get_available_percent @@ -417,9 +418,9 @@ def thermald_thread(): location = messaging.recv_sock(location_sock) cloudlog.event("STATUS_PACKET", count=count, - pandaState=(pandaState.to_dict() if pandaState else None), - location=(location.gpsLocationExternal.to_dict() if location else None), - deviceState=msg.to_dict()) + pandaState=(strip_deprecated_keys(pandaState.to_dict()) if pandaState else None), + location=(strip_deprecated_keys(location.gpsLocationExternal.to_dict()) if location else None), + deviceState=strip_deprecated_keys(msg.to_dict())) count += 1