|  |  |  | @ -85,7 +85,7 @@ def jsonrpc_handler(end_event): | 
			
		
	
		
			
				
					|  |  |  |  |         cloudlog.debug(f"athena.jsonrpc_handler.call_method {data}") | 
			
		
	
		
			
				
					|  |  |  |  |         response = JSONRPCResponseManager.handle(data, dispatcher) | 
			
		
	
		
			
				
					|  |  |  |  |         send_queue.put_nowait(response.json) | 
			
		
	
		
			
				
					|  |  |  |  |       elif "result" in data and "id" in data: | 
			
		
	
		
			
				
					|  |  |  |  |       elif "id" in data and ("result" in data or "error" in data): | 
			
		
	
		
			
				
					|  |  |  |  |         log_recv_queue.put_nowait(data) | 
			
		
	
		
			
				
					|  |  |  |  |       else: | 
			
		
	
		
			
				
					|  |  |  |  |         raise Exception("not a valid request or response") | 
			
		
	
	
		
			
				
					|  |  |  | @ -311,47 +311,55 @@ def log_handler(end_event): | 
			
		
	
		
			
				
					|  |  |  |  |   last_scan = 0 | 
			
		
	
		
			
				
					|  |  |  |  |   while not end_event.is_set(): | 
			
		
	
		
			
				
					|  |  |  |  |     try: | 
			
		
	
		
			
				
					|  |  |  |  |       try: | 
			
		
	
		
			
				
					|  |  |  |  |         result = json.loads(log_recv_queue.get(timeout=1)) | 
			
		
	
		
			
				
					|  |  |  |  |         log_success = result.get("success") | 
			
		
	
		
			
				
					|  |  |  |  |         log_entry = result.get("id") | 
			
		
	
		
			
				
					|  |  |  |  |         log_path = os.path.join(SWAGLOG_DIR, log_entry) | 
			
		
	
		
			
				
					|  |  |  |  |         if log_entry and log_success: | 
			
		
	
		
			
				
					|  |  |  |  |           try: | 
			
		
	
		
			
				
					|  |  |  |  |             setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) | 
			
		
	
		
			
				
					|  |  |  |  |           except OSError: | 
			
		
	
		
			
				
					|  |  |  |  |             pass  # file could be deleted by log rotation | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       except queue.Empty: | 
			
		
	
		
			
				
					|  |  |  |  |         pass | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       curr_scan = sec_since_boot() | 
			
		
	
		
			
				
					|  |  |  |  |       if curr_scan - last_scan > 10: | 
			
		
	
		
			
				
					|  |  |  |  |         log_files = get_logs_to_send_sorted() | 
			
		
	
		
			
				
					|  |  |  |  |         last_scan = curr_scan | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # never send last log file because it is the active log | 
			
		
	
		
			
				
					|  |  |  |  |       # and only send one log file at a time (most recent first) | 
			
		
	
		
			
				
					|  |  |  |  |       if not len(log_files) or not log_send_queue.empty(): | 
			
		
	
		
			
				
					|  |  |  |  |         continue | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       log_entry = log_files.pop() | 
			
		
	
		
			
				
					|  |  |  |  |       try: | 
			
		
	
		
			
				
					|  |  |  |  |         curr_time = int(time.time()) | 
			
		
	
		
			
				
					|  |  |  |  |         log_path = os.path.join(SWAGLOG_DIR, log_entry) | 
			
		
	
		
			
				
					|  |  |  |  |         setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) | 
			
		
	
		
			
				
					|  |  |  |  |         with open(log_path, "r") as f: | 
			
		
	
		
			
				
					|  |  |  |  |           jsonrpc = { | 
			
		
	
		
			
				
					|  |  |  |  |             "method": "forwardLogs", | 
			
		
	
		
			
				
					|  |  |  |  |             "params": { | 
			
		
	
		
			
				
					|  |  |  |  |               "logs": f.read() | 
			
		
	
		
			
				
					|  |  |  |  |             }, | 
			
		
	
		
			
				
					|  |  |  |  |             "jsonrpc": "2.0", | 
			
		
	
		
			
				
					|  |  |  |  |             "id": log_entry | 
			
		
	
		
			
				
					|  |  |  |  |           } | 
			
		
	
		
			
				
					|  |  |  |  |           log_send_queue.put_nowait(json.dumps(jsonrpc)) | 
			
		
	
		
			
				
					|  |  |  |  |       except OSError: | 
			
		
	
		
			
				
					|  |  |  |  |         pass  # file could be deleted by log rotation | 
			
		
	
		
			
				
					|  |  |  |  |       # send one log | 
			
		
	
		
			
				
					|  |  |  |  |       curr_log = None | 
			
		
	
		
			
				
					|  |  |  |  |       if len(log_files) > 0: | 
			
		
	
		
			
				
					|  |  |  |  |         log_entry = log_files.pop() | 
			
		
	
		
			
				
					|  |  |  |  |         cloudlog.debug(f"athena.log_handler.forward_request {log_entry}") | 
			
		
	
		
			
				
					|  |  |  |  |         try: | 
			
		
	
		
			
				
					|  |  |  |  |           curr_time = int(time.time()) | 
			
		
	
		
			
				
					|  |  |  |  |           log_path = os.path.join(SWAGLOG_DIR, log_entry) | 
			
		
	
		
			
				
					|  |  |  |  |           setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) | 
			
		
	
		
			
				
					|  |  |  |  |           with open(log_path, "r") as f: | 
			
		
	
		
			
				
					|  |  |  |  |             jsonrpc = { | 
			
		
	
		
			
				
					|  |  |  |  |               "method": "forwardLogs", | 
			
		
	
		
			
				
					|  |  |  |  |               "params": { | 
			
		
	
		
			
				
					|  |  |  |  |                 "logs": f.read() | 
			
		
	
		
			
				
					|  |  |  |  |               }, | 
			
		
	
		
			
				
					|  |  |  |  |               "jsonrpc": "2.0", | 
			
		
	
		
			
				
					|  |  |  |  |               "id": log_entry | 
			
		
	
		
			
				
					|  |  |  |  |             } | 
			
		
	
		
			
				
					|  |  |  |  |             log_send_queue.put_nowait(json.dumps(jsonrpc)) | 
			
		
	
		
			
				
					|  |  |  |  |             curr_log = log_entry | 
			
		
	
		
			
				
					|  |  |  |  |         except OSError: | 
			
		
	
		
			
				
					|  |  |  |  |           pass  # file could be deleted by log rotation | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # wait for response up to ~100 seconds | 
			
		
	
		
			
				
					|  |  |  |  |       # always read queue at least once to process any old responses that arrive | 
			
		
	
		
			
				
					|  |  |  |  |       for _ in range(100): | 
			
		
	
		
			
				
					|  |  |  |  |         if end_event.is_set(): | 
			
		
	
		
			
				
					|  |  |  |  |           break | 
			
		
	
		
			
				
					|  |  |  |  |         try: | 
			
		
	
		
			
				
					|  |  |  |  |           log_resp = json.loads(log_recv_queue.get(timeout=1)) | 
			
		
	
		
			
				
					|  |  |  |  |           log_entry = log_resp.get("id") | 
			
		
	
		
			
				
					|  |  |  |  |           log_success = "result" in log_resp and log_resp["result"].get("success") | 
			
		
	
		
			
				
					|  |  |  |  |           cloudlog.debug(f"athena.log_handler.forward_response {log_entry} {log_success}") | 
			
		
	
		
			
				
					|  |  |  |  |           if log_entry and log_success: | 
			
		
	
		
			
				
					|  |  |  |  |             log_path = os.path.join(SWAGLOG_DIR, log_entry) | 
			
		
	
		
			
				
					|  |  |  |  |             try: | 
			
		
	
		
			
				
					|  |  |  |  |               setxattr(log_path, LOG_ATTR_NAME, LOG_ATTR_VALUE_MAX_UNIX_TIME) | 
			
		
	
		
			
				
					|  |  |  |  |             except OSError: | 
			
		
	
		
			
				
					|  |  |  |  |               pass  # file could be deleted by log rotation | 
			
		
	
		
			
				
					|  |  |  |  |           if curr_log == log_entry: | 
			
		
	
		
			
				
					|  |  |  |  |             break | 
			
		
	
		
			
				
					|  |  |  |  |         except queue.Empty: | 
			
		
	
		
			
				
					|  |  |  |  |           if curr_log is None: | 
			
		
	
		
			
				
					|  |  |  |  |             break | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     except Exception: | 
			
		
	
		
			
				
					|  |  |  |  |       cloudlog.exception("athena.log_handler.exception") | 
			
		
	
	
		
			
				
					|  |  |  | 
 |