From dd824ffe34d16244a3f51919677e5cb9bc883ff0 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Thu, 29 Jul 2021 11:13:59 +0200 Subject: [PATCH] athenad: retry failed and timed out uploads (#21745) * retry failed uploads * test cleanup * update comment * also catch SSL error * use defaults * sleep in chunks old-commit-hash: d5b6746ac55ec2947a7e7d14c12035168b73505d --- selfdrive/athena/athenad.py | 20 ++++++++- selfdrive/athena/tests/test_athenad.py | 56 +++++++++++++++++++++----- 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index 33eba2dab6..86751fad86 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -39,6 +39,9 @@ LOG_ATTR_NAME = 'user.upload' LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder) RECONNECT_TIMEOUT_S = 70 +RETRY_DELAY = 10 # seconds +MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately + dispatcher["echo"] = lambda s: s recv_queue: Any = queue.Queue() send_queue: Any = queue.Queue() @@ -46,7 +49,7 @@ upload_queue: Any = queue.Queue() log_send_queue: Any = queue.Queue() log_recv_queue: Any = queue.Queue() cancelled_uploads: Any = set() -UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id']) +UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count'], defaults=(0,)) def handle_long_poll(ws): @@ -103,7 +106,20 @@ def upload_handler(end_event): if item.id in cancelled_uploads: cancelled_uploads.remove(item.id) continue - _do_upload(item) + + try: + _do_upload(item) + except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: + cloudlog.warning(f"athena.upload_handler.retry {e} {item}") + + if item.retry_count < MAX_RETRY_COUNT: + item = item._replace(retry_count=item.retry_count + 1) + upload_queue.put_nowait(item) + + for _ in range(RETRY_DELAY): + time.sleep(1) + if end_event.is_set(): + break except queue.Empty: pass except Exception: diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index be6c631a0c..67069f136f 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -16,7 +16,7 @@ from websocket._exceptions import WebSocketConnectionClosedException from selfdrive import swaglog from selfdrive.athena import athenad -from selfdrive.athena.athenad import dispatcher +from selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server from cereal import messaging @@ -30,6 +30,12 @@ class TestAthenadMethods(unittest.TestCase): athenad.Api = MockApi athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT]) + def wait_for_upload(self): + now = time.time() + while time.time() - now < 5: + if athenad.upload_queue.qsize() == 0: + break + def test_echo(self): assert dispatcher["echo"]("bob") == "bob" @@ -105,12 +111,43 @@ class TestAthenadMethods(unittest.TestCase): athenad.upload_queue.put_nowait(item) try: - time.sleep(1) # give it time to process to prevent shutdown before upload completes - now = time.time() - while time.time() - now < 5: - if athenad.upload_queue.qsize() == 0: - break + self.wait_for_upload() + time.sleep(0.1) + + # TODO: verify that upload actually succeeded + self.assertEqual(athenad.upload_queue.qsize(), 0) + finally: + end_event.set() + athenad.upload_queue = queue.Queue() + os.unlink(fn) + + def test_upload_handler_timeout(self): + """When an upload times out or fails to connect it should be placed back in the queue""" + fn = os.path.join(athenad.ROOT, 'qlog.bz2') + Path(fn).touch() + item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT) + + end_event = threading.Event() + thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) + thread.start() + + try: + athenad.upload_queue.put_nowait(item_no_retry) + self.wait_for_upload() + time.sleep(0.1) + + # Check that upload with retry count exceeded is not put back self.assertEqual(athenad.upload_queue.qsize(), 0) + + athenad.upload_queue.put_nowait(item) + self.wait_for_upload() + time.sleep(0.1) + + # Check that upload item was put back in the queue with incremented retry count + self.assertEqual(athenad.upload_queue.qsize(), 1) + self.assertEqual(athenad.upload_queue.get().retry_count, 1) + finally: end_event.set() athenad.upload_queue = queue.Queue() @@ -127,10 +164,9 @@ class TestAthenadMethods(unittest.TestCase): thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread.start() try: - now = time.time() - while time.time() - now < 5: - if athenad.upload_queue.qsize() == 0 and len(athenad.cancelled_uploads) == 0: - break + self.wait_for_upload() + time.sleep(0.1) + self.assertEqual(athenad.upload_queue.qsize(), 0) self.assertEqual(len(athenad.cancelled_uploads), 0) finally: