Add more logging to athena threads (#21086)

* add more logging to athena threads

* add debug to local proxy

* log athena method calls

* data is a json string
pull/21088/head
Willem Melching 4 years ago committed by GitHub
parent 6a04baffc4
commit a065eb6942
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      selfdrive/athena/athenad.py

@ -53,12 +53,12 @@ def handle_long_poll(ws):
end_event = threading.Event() end_event = threading.Event()
threads = [ threads = [
threading.Thread(target=ws_recv, args=(ws, end_event)), threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'),
threading.Thread(target=ws_send, args=(ws, end_event)), threading.Thread(target=ws_send, args=(ws, end_event), name='wc_send'),
threading.Thread(target=upload_handler, args=(end_event,)), threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
threading.Thread(target=log_handler, args=(end_event,)), threading.Thread(target=log_handler, args=(end_event,), name='log_handler'),
] + [ ] + [
threading.Thread(target=jsonrpc_handler, args=(end_event,)) threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}')
for x in range(HANDLER_THREADS) for x in range(HANDLER_THREADS)
] ]
@ -72,14 +72,17 @@ def handle_long_poll(ws):
raise raise
finally: finally:
for thread in threads: for thread in threads:
cloudlog.debug(f"athena.joining {thread.name}")
thread.join() thread.join()
def jsonrpc_handler(end_event): def jsonrpc_handler(end_event):
dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event) dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event)
while not end_event.is_set(): while not end_event.is_set():
try: try:
data = recv_queue.get(timeout=1) data = recv_queue.get(timeout=1)
if "method" in data: if "method" in data:
cloudlog.debug(f"athena.jsonrpc_handler.call_method {data}")
response = JSONRPCResponseManager.handle(data, dispatcher) response = JSONRPCResponseManager.handle(data, dispatcher)
send_queue.put_nowait(response.json) send_queue.put_nowait(response.json)
elif "result" in data and "id" in data: elif "result" in data and "id" in data:
@ -211,6 +214,8 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):
if local_port not in LOCAL_PORT_WHITELIST: if local_port not in LOCAL_PORT_WHITELIST:
raise Exception("Requested local port not whitelisted") raise Exception("Requested local port not whitelisted")
cloudlog.debug("athena.startLocalProxy.starting")
params = Params() params = Params()
dongle_id = params.get("DongleId").decode('utf8') dongle_id = params.get("DongleId").decode('utf8')
identity_token = Api(dongle_id).get_token() identity_token = Api(dongle_id).get_token()
@ -231,6 +236,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):
for thread in threads: for thread in threads:
thread.start() thread.start()
cloudlog.debug("athena.startLocalProxy.started")
return {"success": 1} return {"success": 1}
except Exception as e: except Exception as e:
cloudlog.exception("athenad.startLocalProxy.exception") cloudlog.exception("athenad.startLocalProxy.exception")
@ -303,7 +309,6 @@ def log_handler(end_event):
log_files = [] log_files = []
last_scan = 0 last_scan = 0
log_retries = 0
while not end_event.is_set(): while not end_event.is_set():
try: try:
try: try:
@ -315,7 +320,8 @@ def log_handler(end_event):
try: try:
setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
except OSError: except OSError:
pass # file could be deleted by log rotation pass # file could be deleted by log rotation
except queue.Empty: except queue.Empty:
pass pass
@ -345,14 +351,10 @@ def log_handler(end_event):
} }
log_send_queue.put_nowait(json.dumps(jsonrpc)) log_send_queue.put_nowait(json.dumps(jsonrpc))
except OSError: except OSError:
pass # file could be deleted by log rotation pass # file could be deleted by log rotation
log_retries = 0
except Exception: except Exception:
cloudlog.exception("athena.log_handler.exception") cloudlog.exception("athena.log_handler.exception")
log_retries += 1
if log_retries != 0:
time.sleep(backoff(log_retries))
def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event): def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
@ -366,8 +368,11 @@ def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
cloudlog.exception("athenad.ws_proxy_recv.exception") cloudlog.exception("athenad.ws_proxy_recv.exception")
break break
cloudlog.debug("athena.ws_proxy_recv closing sockets")
ssock.close() ssock.close()
local_sock.close() local_sock.close()
cloudlog.debug("athena.ws_proxy_recv done closing sockets")
end_event.set() end_event.set()
@ -391,7 +396,9 @@ def ws_proxy_send(ws, local_sock, signal_sock, end_event):
cloudlog.exception("athenad.ws_proxy_send.exception") cloudlog.exception("athenad.ws_proxy_send.exception")
end_event.set() end_event.set()
cloudlog.debug("athena.ws_proxy_send closing sockets")
signal_sock.close() signal_sock.close()
cloudlog.debug("athena.ws_proxy_send done closing sockets")
def ws_recv(ws, end_event): def ws_recv(ws, end_event):

Loading…
Cancel
Save