diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index b55ce54ac0..5740c8f647 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -53,12 +53,12 @@ def handle_long_poll(ws): end_event = threading.Event() threads = [ - threading.Thread(target=ws_recv, args=(ws, end_event)), - threading.Thread(target=ws_send, args=(ws, end_event)), - threading.Thread(target=upload_handler, args=(end_event,)), - threading.Thread(target=log_handler, args=(end_event,)), + threading.Thread(target=ws_recv, args=(ws, end_event), name='ws_recv'), + threading.Thread(target=ws_send, args=(ws, end_event), name='wc_send'), + threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'), + threading.Thread(target=log_handler, args=(end_event,), name='log_handler'), ] + [ - threading.Thread(target=jsonrpc_handler, args=(end_event,)) + threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}') for x in range(HANDLER_THREADS) ] @@ -72,14 +72,17 @@ def handle_long_poll(ws): raise finally: for thread in threads: + cloudlog.debug(f"athena.joining {thread.name}") thread.join() + def jsonrpc_handler(end_event): dispatcher["startLocalProxy"] = partial(startLocalProxy, end_event) while not end_event.is_set(): try: data = recv_queue.get(timeout=1) if "method" in data: + cloudlog.debug(f"athena.jsonrpc_handler.call_method {data}") response = JSONRPCResponseManager.handle(data, dispatcher) send_queue.put_nowait(response.json) elif "result" in data and "id" in data: @@ -211,6 +214,8 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port): if local_port not in LOCAL_PORT_WHITELIST: raise Exception("Requested local port not whitelisted") + cloudlog.debug("athena.startLocalProxy.starting") + params = Params() dongle_id = params.get("DongleId").decode('utf8') identity_token = Api(dongle_id).get_token() @@ -231,6 +236,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port): for thread in threads: thread.start() + cloudlog.debug("athena.startLocalProxy.started") return {"success": 1} except Exception as e: cloudlog.exception("athenad.startLocalProxy.exception") @@ -303,7 +309,6 @@ def log_handler(end_event): log_files = [] last_scan = 0 - log_retries = 0 while not end_event.is_set(): try: try: @@ -315,7 +320,8 @@ def log_handler(end_event): try: setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) except OSError: - pass # file could be deleted by log rotation + pass # file could be deleted by log rotation + except queue.Empty: pass @@ -345,14 +351,10 @@ def log_handler(end_event): } log_send_queue.put_nowait(json.dumps(jsonrpc)) except OSError: - pass # file could be deleted by log rotation - log_retries = 0 + pass # file could be deleted by log rotation + except Exception: cloudlog.exception("athena.log_handler.exception") - log_retries += 1 - - if log_retries != 0: - time.sleep(backoff(log_retries)) def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event): @@ -366,8 +368,11 @@ def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event): cloudlog.exception("athenad.ws_proxy_recv.exception") break + cloudlog.debug("athena.ws_proxy_recv closing sockets") ssock.close() local_sock.close() + cloudlog.debug("athena.ws_proxy_recv done closing sockets") + end_event.set() @@ -391,7 +396,9 @@ def ws_proxy_send(ws, local_sock, signal_sock, end_event): cloudlog.exception("athenad.ws_proxy_send.exception") end_event.set() + cloudlog.debug("athena.ws_proxy_send closing sockets") signal_sock.close() + cloudlog.debug("athena.ws_proxy_send done closing sockets") def ws_recv(ws, end_event):