Add more logging to athena threads (#21086)

* add more logging to athena threads

* add debug to local proxy

* log athena method calls

* data is a json string
old-commit-hash: a065eb6942
commatwo_master
Willem Melching 4 years ago committed by GitHub
parent 345d9abe7c
commit a8c569f2da
  1. 29
      selfdrive/athena/athenad.py

@ -53,12 +53,12 @@ def handle_long_poll(ws):
end_event = threading.Event()
threads = [
threading.Thread(target=ws_recv, args=(ws, end_event)),
threading.Thread(target=ws_send, args=(ws, end_event)),
threading.Thread(target=upload_handler, args=(end_event,)),
threading.Thread(target=log_handler, args=(end_event,)),
threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'),
threading.Thread(target=ws_send, args=(ws, end_event), name='wc_send'),
threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
threading.Thread(target=log_handler, args=(end_event,), name='log_handler'),
] + [
threading.Thread(target=jsonrpc_handler, args=(end_event,))
threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}')
for x in range(HANDLER_THREADS)
]
@ -72,14 +72,17 @@ def handle_long_poll(ws):
raise
finally:
for thread in threads:
cloudlog.debug(f"athena.joining {thread.name}")
thread.join()
def jsonrpc_handler(end_event):
dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event)
while not end_event.is_set():
try:
data = recv_queue.get(timeout=1)
if "method" in data:
cloudlog.debug(f"athena.jsonrpc_handler.call_method {data}")
response = JSONRPCResponseManager.handle(data, dispatcher)
send_queue.put_nowait(response.json)
elif "result" in data and "id" in data:
@ -211,6 +214,8 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):
if local_port not in LOCAL_PORT_WHITELIST:
raise Exception("Requested local port not whitelisted")
cloudlog.debug("athena.startLocalProxy.starting")
params = Params()
dongle_id = params.get("DongleId").decode('utf8')
identity_token = Api(dongle_id).get_token()
@ -231,6 +236,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):
for thread in threads:
thread.start()
cloudlog.debug("athena.startLocalProxy.started")
return {"success": 1}
except Exception as e:
cloudlog.exception("athenad.startLocalProxy.exception")
@ -303,7 +309,6 @@ def log_handler(end_event):
log_files = []
last_scan = 0
log_retries = 0
while not end_event.is_set():
try:
try:
@ -316,6 +321,7 @@ def log_handler(end_event):
setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME)
except OSError:
pass # file could be deleted by log rotation
except queue.Empty:
pass
@ -346,13 +352,9 @@ def log_handler(end_event):
log_send_queue.put_nowait(json.dumps(jsonrpc))
except OSError:
pass # file could be deleted by log rotation
log_retries = 0
except Exception:
cloudlog.exception("athena.log_handler.exception")
log_retries += 1
if log_retries != 0:
time.sleep(backoff(log_retries))
def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
@ -366,8 +368,11 @@ def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
cloudlog.exception("athenad.ws_proxy_recv.exception")
break
cloudlog.debug("athena.ws_proxy_recv closing sockets")
ssock.close()
local_sock.close()
cloudlog.debug("athena.ws_proxy_recv done closing sockets")
end_event.set()
@ -391,7 +396,9 @@ def ws_proxy_send(ws, local_sock, signal_sock, end_event):
cloudlog.exception("athenad.ws_proxy_send.exception")
end_event.set()
cloudlog.debug("athena.ws_proxy_send closing sockets")
signal_sock.close()
cloudlog.debug("athena.ws_proxy_send done closing sockets")
def ws_recv(ws, end_event):

Loading…
Cancel
Save