diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index a0c6a0c844..275e336846 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -14,7 +14,7 @@ import time import tempfile from collections import namedtuple from functools import partial -from typing import Any +from typing import Any, Dict import requests from jsonrpc import JSONRPCResponseManager, dispatcher @@ -55,7 +55,7 @@ log_recv_queue: Any = queue.Queue() cancelled_uploads: Any = set() UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0)) -cur_upload_items = {} +cur_upload_items: Dict[int, Any] = {} class UploadQueueCache(): @@ -128,7 +128,26 @@ def jsonrpc_handler(end_event): send_queue.put_nowait(json.dumps({"error": str(e)})) -def upload_handler(end_event): +def retry_upload(tid: int, end_event: threading.Event) -> None: + if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT: + item = cur_upload_items[tid] + item = item._replace( + retry_count=item.retry_count + 1, + progress=0, + current=False + ) + upload_queue.put_nowait(item) + UploadQueueCache.cache(upload_queue) + + cur_upload_items[tid] = None + + for _ in range(RETRY_DELAY): + time.sleep(1) + if end_event.is_set(): + break + + +def upload_handler(end_event: threading.Event) -> None: tid = threading.get_ident() while not end_event.is_set(): @@ -145,27 +164,15 @@ def upload_handler(end_event): def cb(sz, cur): cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) - _do_upload(cur_upload_items[tid], cb) + response = _do_upload(cur_upload_items[tid], cb) + if response.status_code not in (200, 201, 403, 412): + cloudlog.warning(f"athena.upload_handler.retry {response.status_code} {cur_upload_items[tid]}") + retry_upload(tid, end_event) UploadQueueCache.cache(upload_queue) except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}") - if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT: - item = cur_upload_items[tid] - item = item._replace( - retry_count=item.retry_count + 1, - progress=0, - current=False - ) - upload_queue.put_nowait(item) - UploadQueueCache.cache(upload_queue) - - cur_upload_items[tid] = None - - for _ in range(RETRY_DELAY): - time.sleep(1) - if end_event.is_set(): - break + retry_upload(tid, end_event) except queue.Empty: pass diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 06d762c180..fb5936dde1 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -166,6 +166,31 @@ class TestAthenadMethods(unittest.TestCase): finally: end_event.set() + @with_http_server + @mock.patch('requests.put') + def test_upload_handler_retry(self, host, mock_put): + for status, retry in ((500, True), (412, False)): + mock_put.return_value.status_code = status + fn = os.path.join(athenad.ROOT, 'qlog.bz2') + Path(fn).touch() + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + + end_event = threading.Event() + thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) + thread.start() + + athenad.upload_queue.put_nowait(item) + try: + self.wait_for_upload() + time.sleep(0.1) + + self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) + finally: + end_event.set() + + if retry: + self.assertEqual(athenad.upload_queue.get().retry_count, 1) + def test_upload_handler_timeout(self): """When an upload times out or fails to connect it should be placed back in the queue""" fn = os.path.join(athenad.ROOT, 'qlog.bz2')