athenad: retry failed and timed out uploads (#21745)

* retry failed uploads

* test cleanup

* update comment

* also catch SSL error

* use defaults

* sleep in chunks
old-commit-hash: d5b6746ac5
commatwo_master
Willem Melching 4 years ago committed by GitHub
parent 799931cf21
commit dd824ffe34
  1. 20
      selfdrive/athena/athenad.py
  2. 56
      selfdrive/athena/tests/test_athenad.py

@ -39,6 +39,9 @@ LOG_ATTR_NAME = 'user.upload'
LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder)
RECONNECT_TIMEOUT_S = 70
RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
dispatcher["echo"] = lambda s: s
recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue()
@ -46,7 +49,7 @@ upload_queue: Any = queue.Queue()
log_send_queue: Any = queue.Queue()
log_recv_queue: Any = queue.Queue()
cancelled_uploads: Any = set()
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id'])
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count'], defaults=(0,))
def handle_long_poll(ws):
@ -103,7 +106,20 @@ def upload_handler(end_event):
if item.id in cancelled_uploads:
cancelled_uploads.remove(item.id)
continue
_do_upload(item)
try:
_do_upload(item)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
cloudlog.warning(f"athena.upload_handler.retry {e} {item}")
if item.retry_count < MAX_RETRY_COUNT:
item = item._replace(retry_count=item.retry_count + 1)
upload_queue.put_nowait(item)
for _ in range(RETRY_DELAY):
time.sleep(1)
if end_event.is_set():
break
except queue.Empty:
pass
except Exception:

@ -16,7 +16,7 @@ from websocket._exceptions import WebSocketConnectionClosedException
from selfdrive import swaglog
from selfdrive.athena import athenad
from selfdrive.athena.athenad import dispatcher
from selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher
from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server
from cereal import messaging
@ -30,6 +30,12 @@ class TestAthenadMethods(unittest.TestCase):
athenad.Api = MockApi
athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT])
def wait_for_upload(self):
now = time.time()
while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0:
break
def test_echo(self):
assert dispatcher["echo"]("bob") == "bob"
@ -105,12 +111,43 @@ class TestAthenadMethods(unittest.TestCase):
athenad.upload_queue.put_nowait(item)
try:
time.sleep(1) # give it time to process to prevent shutdown before upload completes
now = time.time()
while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0:
break
self.wait_for_upload()
time.sleep(0.1)
# TODO: verify that upload actually succeeded
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
athenad.upload_queue = queue.Queue()
os.unlink(fn)
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item_no_retry)
self.wait_for_upload()
time.sleep(0.1)
# Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0)
athenad.upload_queue.put_nowait(item)
self.wait_for_upload()
time.sleep(0.1)
# Check that upload item was put back in the queue with incremented retry count
self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
finally:
end_event.set()
athenad.upload_queue = queue.Queue()
@ -127,10 +164,9 @@ class TestAthenadMethods(unittest.TestCase):
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
now = time.time()
while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0 and len(athenad.cancelled_uploads) == 0:
break
self.wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0)
self.assertEqual(len(athenad.cancelled_uploads), 0)
finally:

Loading…
Cancel
Save