diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index 5740c8f647..28ddbb1998 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -85,7 +85,7 @@ def jsonrpc_handler(end_event): cloudlog.debug(f"athena.jsonrpc_handler.call_method {data}") response = JSONRPCResponseManager.handle(data, dispatcher) send_queue.put_nowait(response.json) - elif "result" in data and "id" in data: + elif "id" in data and ("result" in data or "error" in data): log_recv_queue.put_nowait(data) else: raise Exception("not a valid request or response") @@ -311,47 +311,55 @@ def log_handler(end_event): last_scan = 0 while not end_event.is_set(): try: - try: - result = json.loads(log_recv_queue.get(timeout=1)) - log_success = result.get("success") - log_entry = result.get("id") - log_path = os.path.join(SWAGLOG_DIR, log_entry) - if log_entry and log_success: - try: - setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) - except OSError: - pass # file could be deleted by log rotation - - except queue.Empty: - pass - curr_scan = sec_since_boot() if curr_scan - last_scan > 10: log_files = get_logs_to_send_sorted() last_scan = curr_scan - # never send last log file because it is the active log - # and only send one log file at a time (most recent first) - if not len(log_files) or not log_send_queue.empty(): - continue - - log_entry = log_files.pop() - try: - curr_time = int(time.time()) - log_path = os.path.join(SWAGLOG_DIR, log_entry) - setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) - with open(log_path, "r") as f: - jsonrpc = { - "method": "forwardLogs", - "params": { - "logs": f.read() - }, - "jsonrpc": "2.0", - "id": log_entry - } - log_send_queue.put_nowait(json.dumps(jsonrpc)) - except OSError: - pass # file could be deleted by log rotation + # send one log + curr_log = None + if len(log_files) > 0: + log_entry = log_files.pop() + cloudlog.debug(f"athena.log_handler.forward_request {log_entry}") + try: + curr_time = int(time.time()) + log_path = os.path.join(SWAGLOG_DIR, log_entry) + setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) + with open(log_path, "r") as f: + jsonrpc = { + "method": "forwardLogs", + "params": { + "logs": f.read() + }, + "jsonrpc": "2.0", + "id": log_entry + } + log_send_queue.put_nowait(json.dumps(jsonrpc)) + curr_log = log_entry + except OSError: + pass # file could be deleted by log rotation + + # wait for response up to ~100 seconds + # always read queue at least once to process any old responses that arrive + for _ in range(100): + if end_event.is_set(): + break + try: + log_resp = json.loads(log_recv_queue.get(timeout=1)) + log_entry = log_resp.get("id") + log_success = "result" in log_resp and log_resp["result"].get("success") + cloudlog.debug(f"athena.log_handler.forward_response {log_entry} {log_success}") + if log_entry and log_success: + log_path = os.path.join(SWAGLOG_DIR, log_entry) + try: + setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) + except OSError: + pass # file could be deleted by log rotation + if curr_log == log_entry: + break + except queue.Empty: + if curr_log is None: + break except Exception: cloudlog.exception("athena.log_handler.exception")