log to file and send through athena (#20250)

* log to file and send through athena

* rename logging level

* pass thru log formatter

* logMessage is TEXT

* send queue always strings

* switch to xattr and lower priority queue

* enable cloud logging for devices

* time or size based log rotation

* basename -> dirname

* remove HARDWARE.get_cloudlog_enabled

* fix errors

* fix another exception

* xattrs need to be bytes

* sending works

* cleanup files at start

* add id and adjust formatting

* do not send active log file

* better names

* separate log formatters

* fix formatter super init

* fix log file order

* ensure file always has file formatter

* i see why there was no formatter

* apply same formatting to cpp log msgs

* apply same formatting to cpp log msgs

* update queue names in tests

* strip deprecated keys in STATUS_PACKET

* strip DEPRECATED from dict recursively

* athena log queue test

* instanceof instead of type

* isinstance instead of type

* use super

* remove logentries

* last_scan param unused

* comment about special log msg attr names

* add dict_helpers.py to release files

* use monotonic time and counter for log rotation

* update for adjusted log file naming

* use monotonic clock for tracking last log file scan
old-commit-hash: 3d48bd934d
branch: commatwo_master
author: Greg Hogan (committed by GitHub)
parent: f1e9dda1ec
commit: 183d9f98a1
13 changed files (lines changed):

  1. Pipfile (4)
  2. Pipfile.lock (4)
  3. common/dict_helpers.py (9)
  4. common/logging_extra.py (41)
  5. release/files_common (1)
  6. selfdrive/athena/athenad.py (113)
  7. selfdrive/athena/tests/helpers.py (2)
  8. selfdrive/athena/tests/test_athenad.py (11)
  9. selfdrive/logmessaged.py (23)
  10. selfdrive/manager/build.py (4)
  11. selfdrive/manager/manager.py (4)
  12. selfdrive/swaglog.py (82)
  13. selfdrive/thermald/thermald.py (7)

Pipfile

@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:a6f7dbffcf05f234ff043da28079ef4747d1c7e0ce3240a646ceacb5ef6fc5e9
-size 1999
+oid sha256:d3354b5ea4f83ca0341a98695a1e513665397cf960c81038f010929584982496
+size 1886

Pipfile.lock (generated)

@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:d4ac0578553fd40f90df291ebfac29a9447955ead9c70d21b16c35e0f0c467d2
-size 199443
+oid sha256:013025b078fa735d537faaa172ead548652b456afbd89b631df25fe18360d7d3
+size 202630

common/dict_helpers.py (new)

@@ -0,0 +1,9 @@
+# remove all keys that end in DEPRECATED
+def strip_deprecated_keys(d):
+  for k in list(d.keys()):
+    if isinstance(k, str):
+      if k.endswith('DEPRECATED'):
+        d.pop(k)
+      elif isinstance(d[k], dict):
+        strip_deprecated_keys(d[k])
+  return d
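
A quick usage sketch of strip_deprecated_keys (the input dict here is hypothetical, not from this diff); the recursion strips suffixed keys at every nesting level and mutates the dict in place:

  state = {
    "voltage": 12.4,
    "usbOnlineDEPRECATED": True,
    "cpu": {"tempDEPRECATED": 52.0, "usage": 0.3},
  }
  print(strip_deprecated_keys(state))
  # {'voltage': 12.4, 'cpu': {'usage': 0.3}}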

common/logging_extra.py

@@ -3,6 +3,7 @@ import os
 import sys
 import copy
 import json
+import uuid
 import socket
 import logging
 import traceback

@@ -62,8 +63,48 @@ class SwagFormatter(logging.Formatter):
     return record_dict

   def format(self, record):
     if self.swaglogger is None:
       raise Exception("must set swaglogger before calling format()")

     return json_robust_dumps(self.format_dict(record))

+class SwagLogFileFormatter(SwagFormatter):
+  def fix_kv(self, k, v):
+    # append type to names to preserve legacy naming in logs
+    # avoids overlapping key namespaces with different types
+    # e.g. log.info() creates 'msg' -> 'msg$s'
+    #      log.event() creates 'msg.health.logMonoTime' -> 'msg.health.logMonoTime$i'
+    #      because overlapping namespace 'msg' caused problems
+    if isinstance(v, (str, bytes)):
+      k += "$s"
+    elif isinstance(v, float):
+      k += "$f"
+    elif isinstance(v, bool):
+      k += "$b"
+    elif isinstance(v, int):
+      k += "$i"
+    elif isinstance(v, dict):
+      nv = {}
+      for ik, iv in v.items():
+        ik, iv = self.fix_kv(ik, iv)
+        nv[ik] = iv
+      v = nv
+    elif isinstance(v, list):
+      k += "$a"
+    return k, v
+
+  def format(self, record):
+    if isinstance(record, str):
+      v = json.loads(record)
+    else:
+      v = self.format_dict(record)
+    mk, mv = self.fix_kv('msg', v['msg'])
+    del v['msg']
+    v[mk] = mv
+    v['id'] = uuid.uuid4().hex
+    return json_robust_dumps(v)

 class SwagErrorFilter(logging.Filter):
   def filter(self, record):
     return record.levelno < logging.ERROR
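
For illustration, a minimal sketch of what fix_kv produces (passing None for the swaglogger is fine here because fix_kv never touches it):

  fmt = SwagLogFileFormatter(None)
  k, v = fmt.fix_kv('msg', {'health': {'logMonoTime': 123, 'started': True}})
  # k == 'msg' (dict-valued keys are not suffixed; only their leaves are)
  # v == {'health': {'logMonoTime$i': 123, 'started$b': True}}
  # note bool is tested before int, so True becomes '$b', not '$i'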

release/files_common

@@ -29,6 +29,7 @@ common/params_pyx.pyx
 common/xattr.py
 common/profiler.py
 common/basedir.py
+common/dict_helpers.py
 common/filter_simple.py
 common/stat_live.py
 common/spinner.py

selfdrive/athena/athenad.py

@@ -4,6 +4,7 @@ import hashlib
 import io
 import json
 import os
+import sys
 import queue
 import random
 import select

@@ -24,18 +25,24 @@ from common.api import Api
 from common.basedir import PERSIST
 from common.params import Params
 from common.realtime import sec_since_boot
-from selfdrive.hardware import HARDWARE
+from selfdrive.hardware import HARDWARE, PC
 from selfdrive.loggerd.config import ROOT
-from selfdrive.swaglog import cloudlog
+from selfdrive.loggerd.xattr_cache import getxattr, setxattr
+from selfdrive.swaglog import cloudlog, SWAGLOG_DIR

 ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
 HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
 LOCAL_PORT_WHITELIST = set([8022])

+LOG_ATTR_NAME = 'user.upload'
+LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
+
 dispatcher["echo"] = lambda s: s
-payload_queue: Any = queue.Queue()
-response_queue: Any = queue.Queue()
+recv_queue: Any = queue.Queue()
+send_queue: Any = queue.Queue()
 upload_queue: Any = queue.Queue()
+log_send_queue: Any = queue.Queue()
+log_recv_queue: Any = queue.Queue()
 cancelled_uploads: Any = set()

 UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id'])

@@ -46,7 +53,8 @@ def handle_long_poll(ws):
   threads = [
     threading.Thread(target=ws_recv, args=(ws, end_event)),
     threading.Thread(target=ws_send, args=(ws, end_event)),
-    threading.Thread(target=upload_handler, args=(end_event,))
+    threading.Thread(target=upload_handler, args=(end_event,)),
+    threading.Thread(target=log_handler, args=(end_event,)),
   ] + [
     threading.Thread(target=jsonrpc_handler, args=(end_event,))
     for x in range(HANDLER_THREADS)

@@ -64,19 +72,21 @@ def handle_long_poll(ws):
     for thread in threads:
       thread.join()

 def jsonrpc_handler(end_event):
   dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event)
   while not end_event.is_set():
     try:
-      data = payload_queue.get(timeout=1)
+      data = recv_queue.get(timeout=1)
       if "method" in data and "params" in data:
         response = JSONRPCResponseManager.handle(data, dispatcher)
-        response_queue.put_nowait(response)
+        send_queue.put_nowait(response.json)
+      elif "result" in data and "id" in data:
+        log_recv_queue.put_nowait(data)
     except queue.Empty:
       pass
     except Exception as e:
       cloudlog.exception("athena jsonrpc handler failed")
-      response_queue.put_nowait(json.dumps({"error": str(e)}))
+      send_queue.put_nowait(json.dumps({"error": str(e)}))

 def upload_handler(end_event):

@@ -244,6 +254,82 @@ def takeSnapshot():
     raise Exception("not available while camerad is started")

+def get_logs_to_send_sorted():
+  # TODO: scan once then use inotify to detect file creation/deletion
+  curr_time = int(time.time())
+  logs = []
+  for log_entry in os.listdir(SWAGLOG_DIR):
+    log_path = os.path.join(SWAGLOG_DIR, log_entry)
+    try:
+      time_sent = int.from_bytes(getxattr(log_path, LOG_ATTR_NAME), sys.byteorder)
+    except (ValueError, TypeError):
+      time_sent = 0
+    # assume send failed and we lost the response if sent more than one hour ago
+    if not time_sent or curr_time - time_sent > 3600:
+      logs.append(log_entry)
+  # return logs in order they should be sent
+  # excluding most recent (active) log file
+  return sorted(logs[:-1])
+
+def log_handler(end_event):
+  if PC:
+    return
+
+  log_files = []
+  last_scan = 0
+  log_retries = 0
+  while not end_event.is_set():
+    try:
+      try:
+        result = json.loads(log_recv_queue.get(timeout=1))
+        log_success = result.get("success")
+        log_entry = result.get("id")
+        log_path = os.path.join(SWAGLOG_DIR, log_entry)
+        if log_entry and log_success:
+          try:
+            setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
+          except OSError:
+            pass  # file could be deleted by log rotation
+      except queue.Empty:
+        pass
+
+      curr_scan = sec_since_boot()
+      if curr_scan - last_scan > 10:
+        log_files = get_logs_to_send_sorted()
+        last_scan = curr_scan
+
+      # never send last log file because it is the active log
+      # and only send one log file at a time (most recent first)
+      if not len(log_files) or not log_send_queue.empty():
+        continue
+
+      log_entry = log_files.pop()
+      try:
+        curr_time = int(time.time())
+        log_path = os.path.join(SWAGLOG_DIR, log_entry)
+        setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
+        with open(log_path, "r") as f:
+          jsonrpc = {
+            "method": "forwardLogs",
+            "params": {
+              "logs": f.read()
+            },
+            "jsonrpc": "2.0",
+            "id": log_entry
+          }
+          log_send_queue.put_nowait(json.dumps(jsonrpc))
+      except OSError:
+        pass  # file could be deleted by log rotation
+
+      log_retries = 0
+    except Exception:
+      cloudlog.exception("athena.log_handler.exception")
+      log_retries += 1
+
+    if log_retries != 0:
+      time.sleep(backoff(log_retries))

 def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
   while not (end_event.is_set() or global_end_event.is_set()):
     try:

@@ -290,7 +376,7 @@ def ws_recv(ws, end_event):
       if opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
         if opcode == ABNF.OPCODE_TEXT:
           data = data.decode("utf-8")
-        payload_queue.put_nowait(data)
+        recv_queue.put_nowait(data)
      elif opcode == ABNF.OPCODE_PING:
        Params().put("LastAthenaPingTime", str(int(sec_since_boot() * 1e9)))
    except WebSocketTimeoutException:

@@ -303,8 +389,11 @@ def ws_recv(ws, end_event):
 def ws_send(ws, end_event):
   while not end_event.is_set():
     try:
-      response = response_queue.get(timeout=1)
-      ws.send(response.json)
+      try:
+        data = send_queue.get_nowait()
+      except queue.Empty:
+        data = log_send_queue.get(timeout=1)
+      ws.send(data)
     except queue.Empty:
       pass
     except Exception:
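
As a worked example of the xattr bookkeeping above (a standalone sketch, not part of the diff): sending writes the current unix time into the file's user.upload attribute as 4 native-endian bytes, the periodic scan reads it back, and a confirmed response pins it to the 32-bit maximum so the file is never rescanned:

  import sys, time

  curr_time = int(time.time())
  marker = int.to_bytes(curr_time, 4, sys.byteorder)         # what setxattr stores on send
  assert int.from_bytes(marker, sys.byteorder) == curr_time  # what the scan reads back

  # LOG_ATTR_VALUE_MAX_UNIX_TIME encodes 2147483647 == 2**31 - 1,
  # so curr_time - time_sent > 3600 can never trigger a resend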

selfdrive/athena/tests/helpers.py

@@ -91,7 +91,7 @@ def with_http_server(func):
     p.start()
     time.sleep(0.1)
-    with Timeout(2):
+    with Timeout(2, 'HTTP Server seeding failed'):
       while True:
         try:
           requests.put(f'http://{host}:{port}/qlog.bz2', data='')

selfdrive/athena/tests/test_athenad.py

@@ -103,6 +103,7 @@ class TestAthenadMethods(unittest.TestCase):
     athenad.upload_queue.put_nowait(item)
     try:
+      time.sleep(1)  # give it time to process to prevent shutdown before upload completes
       now = time.time()
       while time.time() - now < 5:
         if athenad.upload_queue.qsize() == 0:

@@ -178,10 +179,14 @@ class TestAthenadMethods(unittest.TestCase):
     thread = threading.Thread(target=athenad.jsonrpc_handler, args=(end_event,))
     thread.daemon = True
     thread.start()
-    athenad.payload_queue.put_nowait(json.dumps({"method": "echo", "params": ["hello"], "jsonrpc": "2.0", "id": 0}))
     try:
-      resp = athenad.response_queue.get(timeout=3)
-      self.assertDictEqual(resp.data, {'result': 'hello', 'id': 0, 'jsonrpc': '2.0'})
+      athenad.recv_queue.put_nowait(json.dumps({"method": "echo", "params": ["hello"], "jsonrpc": "2.0", "id": 0}))
+      resp = athenad.send_queue.get(timeout=3)
+      self.assertDictEqual(json.loads(resp), {'result': 'hello', 'id': 0, 'jsonrpc': '2.0'})
+      athenad.recv_queue.put_nowait(json.dumps({'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'}))
+      resp = athenad.log_recv_queue.get(timeout=3)
+      self.assertDictEqual(json.loads(resp), {'result': {'success': 1}, 'id': 0, 'jsonrpc': '2.0'})
     finally:
       end_event.set()
       thread.join()

selfdrive/logmessaged.py

@@ -1,12 +1,14 @@
 #!/usr/bin/env python3
 import zmq

 import cereal.messaging as messaging
-from selfdrive.swaglog import get_le_handler
+from common.logging_extra import SwagLogFileFormatter
+from selfdrive.swaglog import get_file_handler

 def main():
-  le_handler = get_le_handler()
-  le_level = 20  # logging.INFO
+  log_handler = get_file_handler()
+  log_handler.setFormatter(SwagLogFileFormatter(None))
+  log_level = 20  # logging.INFO

   ctx = zmq.Context().instance()
   sock = ctx.socket(zmq.PULL)

@@ -17,19 +19,14 @@ def main():
   while True:
     dat = b''.join(sock.recv_multipart())
-    dat = dat.decode('utf8')
-    levelnum = ord(dat[0])
-    dat = dat[1:]
-    if levelnum >= le_level:
-      # push to logentries
-      # TODO: push to athena instead
-      le_handler.emit_raw(dat)
+    level = dat[0]
+    record = dat[1:].decode("utf-8")
+    if level >= log_level:
+      log_handler.emit(record)

     # then we publish them
     msg = messaging.new_message()
-    msg.logMessage = dat
+    msg.logMessage = record
     pub_sock.send(msg.to_bytes())
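
The wire framing this loop assumes is one log-level byte followed by the UTF-8 JSON record (the sending side lives in swaglog's socket handler; the sender below is a hypothetical sketch for illustration):

  import json

  level = 30  # logging.WARNING
  record = json.dumps({"msg": "hello"})
  dat = bytes([level]) + record.encode("utf-8")

  # receiver side, as in the loop above
  assert dat[0] == level
  assert json.loads(dat[1:].decode("utf-8")) == {"msg": "hello"}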

selfdrive/manager/build.py

@@ -10,7 +10,7 @@ import textwrap
 from common.basedir import BASEDIR
 from common.spinner import Spinner
 from common.text_window import TextWindow
-from selfdrive.swaglog import add_logentries_handler, cloudlog
+from selfdrive.swaglog import cloudlog, add_file_handler
 from selfdrive.version import dirty

 TOTAL_SCONS_NODES = 1225

@@ -70,7 +70,7 @@ def build(spinner, dirty=False):
       errors = [line.decode('utf8', 'replace') for line in compile_output
                 if any([err in line for err in [b'error: ', b'not found, needed by target']])]
       error_s = "\n".join(errors)
-      add_logentries_handler(cloudlog)
+      add_file_handler(cloudlog)
       cloudlog.error("scons build failed\n" + error_s)

       # Show TextWindow

selfdrive/manager/manager.py

@@ -16,7 +16,7 @@ from selfdrive.manager.helpers import unblock_stdout
 from selfdrive.manager.process import ensure_running
 from selfdrive.manager.process_config import managed_processes
 from selfdrive.registration import register
-from selfdrive.swaglog import add_logentries_handler, cloudlog
+from selfdrive.swaglog import cloudlog, add_file_handler
 from selfdrive.version import dirty, version

@@ -183,7 +183,7 @@ if __name__ == "__main__":
   try:
     main()
   except Exception:
-    add_logentries_handler(cloudlog)
+    add_file_handler(cloudlog)
     cloudlog.exception("Manager failed to start")

     # Show last 3 lines of traceback

selfdrive/swaglog.py

@@ -1,19 +1,72 @@
 import os
+from pathlib import Path
 import logging
+from logging.handlers import BaseRotatingHandler
-from logentries import LogentriesHandler
 import zmq

-from common.logging_extra import SwagLogger, SwagFormatter
+from common.logging_extra import SwagLogger, SwagFormatter, SwagLogFileFormatter
+from common.realtime import sec_since_boot
+from selfdrive.hardware import PC

+if PC:
+  SWAGLOG_DIR = os.path.join(str(Path.home()), ".comma", "log")
+else:
+  SWAGLOG_DIR = "/data/log/"

-def get_le_handler():
-  # setup logentries. we forward log messages to it
-  le_token = "e8549616-0798-4d7e-a2ca-2513ae81fa17"
-  return LogentriesHandler(le_token, use_tls=False, verbose=False)
+def get_file_handler():
+  Path(SWAGLOG_DIR).mkdir(parents=True, exist_ok=True)
+  base_filename = os.path.join(SWAGLOG_DIR, "swaglog")
+  handler = SwaglogRotatingFileHandler(base_filename)
+  return handler

+class SwaglogRotatingFileHandler(BaseRotatingHandler):
+  def __init__(self, base_filename, interval=60, max_bytes=1024*256, backup_count=2500, encoding=None):
+    super().__init__(base_filename, mode="a", encoding=encoding, delay=True)
+    self.base_filename = base_filename
+    self.interval = interval  # seconds
+    self.max_bytes = max_bytes
+    self.backup_count = backup_count
+    self.log_files = self.get_existing_logfiles()
+    log_indexes = [f.split(".")[-1] for f in self.log_files]
+    self.last_file_idx = max([int(i) for i in log_indexes if i.isdigit()] or [-1])
+    self.last_rollover = None
+    self.doRollover()

-class LogMessageHandler(logging.Handler):
+  def _open(self):
+    self.last_rollover = sec_since_boot()
+    self.last_file_idx += 1
+    next_filename = f"{self.base_filename}.{self.last_file_idx:010}"
+    stream = open(next_filename, self.mode, encoding=self.encoding)
+    self.log_files.insert(0, next_filename)
+    return stream

+  def get_existing_logfiles(self):
+    log_files = list()
+    base_dir = os.path.dirname(self.base_filename)
+    for fn in os.listdir(base_dir):
+      fp = os.path.join(base_dir, fn)
+      if fp.startswith(self.base_filename) and os.path.isfile(fp):
+        log_files.append(fp)
+    return sorted(log_files)

+  def shouldRollover(self, record):
+    size_exceeded = self.max_bytes > 0 and self.stream.tell() >= self.max_bytes
+    time_exceeded = self.interval > 0 and self.last_rollover + self.interval <= sec_since_boot()
+    return size_exceeded or time_exceeded

+  def doRollover(self):
+    if self.stream:
+      self.stream.close()
+    self.stream = self._open()

+    if self.backup_count > 0:
+      while len(self.log_files) > self.backup_count:
+        to_delete = self.log_files.pop()
+        if os.path.exists(to_delete):  # just being safe, should always exist
+          os.remove(to_delete)

+class UnixDomainSocketHandler(logging.Handler):
   def __init__(self, formatter):
     logging.Handler.__init__(self)
     self.setFormatter(formatter)

@@ -40,11 +93,13 @@ class LogMessageHandler(logging.Handler):
       pass

-def add_logentries_handler(log):
-  """Function to add the logentries handler to swaglog.
-  This can be used to send logs when logmessaged is not running."""
-  handler = get_le_handler()
-  handler.setFormatter(SwagFormatter(log))
+def add_file_handler(log):
+  """
+  Function to add the file log handler to swaglog.
+  This can be used to store logs when logmessaged is not running.
+  """
+  handler = get_file_handler()
+  handler.setFormatter(SwagLogFileFormatter(log))
   log.addHandler(handler)

@@ -53,4 +108,5 @@ log.setLevel(logging.DEBUG)
 outhandler = logging.StreamHandler()
 log.addHandler(outhandler)
-log.addHandler(LogMessageHandler(SwagFormatter(log)))
+# logs are sent through IPC before writing to disk to prevent disk I/O blocking
+log.addHandler(UnixDomainSocketHandler(SwagFormatter(log)))
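
A minimal usage sketch of the rotating handler (scratch directory assumed); _open() names files with a zero-padded counter, so rotation produces swaglog.0000000000, swaglog.0000000001, and so on:

  import logging, os, tempfile

  d = tempfile.mkdtemp()
  handler = SwaglogRotatingFileHandler(os.path.join(d, "swaglog"))
  log = logging.getLogger("demo")
  log.addHandler(handler)
  log.error("hello")    # written to swaglog.0000000000
  handler.doRollover()  # force a rotation; normally size or interval triggers it
  print(sorted(os.listdir(d)))  # ['swaglog.0000000000', 'swaglog.0000000001']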

selfdrive/thermald/thermald.py

@@ -13,6 +13,7 @@ from common.filter_simple import FirstOrderFilter
 from common.numpy_fast import clip, interp
 from common.params import Params
 from common.realtime import DT_TRML, sec_since_boot
+from common.dict_helpers import strip_deprecated_keys
 from selfdrive.controls.lib.alertmanager import set_offroad_alert
 from selfdrive.hardware import EON, TICI, HARDWARE
 from selfdrive.loggerd.config import get_available_percent

@@ -417,9 +418,9 @@ def thermald_thread():
       location = messaging.recv_sock(location_sock)
       cloudlog.event("STATUS_PACKET",
                      count=count,
-                     pandaState=(pandaState.to_dict() if pandaState else None),
-                     location=(location.gpsLocationExternal.to_dict() if location else None),
-                     deviceState=msg.to_dict())
+                     pandaState=(strip_deprecated_keys(pandaState.to_dict()) if pandaState else None),
+                     location=(strip_deprecated_keys(location.gpsLocationExternal.to_dict()) if location else None),
+                     deviceState=strip_deprecated_keys(msg.to_dict()))
     count += 1
