test_athenad: create with_upload_handler decorator (#31250)

cleaner
old-commit-hash: 5c85925e9b
chrysler-long2
Justin Newberry 1 year ago committed by GitHub
parent 19b192faf3
commit ab63a1b1ef
  1. 143
      selfdrive/athena/tests/test_athenad.py

@ -1,5 +1,5 @@
#!/usr/bin/env python3
from functools import partial
from functools import partial, wraps
import json
import multiprocessing
import os
@ -42,6 +42,20 @@ def seed_athena_server(host, port):
with_mock_athena = partial(with_http_server, handler=HTTPRequestHandler, setup=seed_athena_server)
def with_upload_handler(func):
@wraps(func)
def wrapper(*args, **kwargs):
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
return func(*args, **kwargs)
finally:
end_event.set()
thread.join()
return wrapper
class TestAthenadMethods(unittest.TestCase):
@classmethod
def setUpClass(cls):
@ -209,77 +223,59 @@ class TestAthenadMethods(unittest.TestCase):
self.assertEqual(not_exists_resp, {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']})
@with_mock_athena
@with_upload_handler
def test_upload_handler(self, host):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
athenad.upload_queue.put_nowait(item)
try:
self._wait_for_upload()
time.sleep(0.1)
self._wait_for_upload()
time.sleep(0.1)
# TODO: verify that upload actually succeeded
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
# TODO: verify that upload actually succeeded
self.assertEqual(athenad.upload_queue.qsize(), 0)
@parameterized.expand([(500, True), (412, False)])
@with_mock_athena
@mock.patch('requests.put')
def test_upload_handler_retry(self, host, mock_put):
for status, retry in ((500, True), (412, False)):
mock_put.return_value.status_code = status
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
@with_upload_handler
def test_upload_handler_retry(self, status, retry, mock_put, host):
mock_put.return_value.status_code = status
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
try:
self._wait_for_upload()
time.sleep(0.1)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
finally:
end_event.set()
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
if retry:
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
if retry:
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
@with_upload_handler
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item_no_retry)
self._wait_for_upload()
time.sleep(0.1)
# Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0)
athenad.upload_queue.put_nowait(item_no_retry)
self._wait_for_upload()
time.sleep(0.1)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)
# Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0)
# Check that upload item was put back in the queue with incremented retry count
self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)
finally:
end_event.set()
# Check that upload item was put back in the queue with incremented retry count
self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.get().retry_count, 1)
@with_upload_handler
def test_cancelUpload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True)
@ -288,18 +284,13 @@ class TestAthenadMethods(unittest.TestCase):
self.assertIn(item.id, athenad.cancelled_uploads)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
self._wait_for_upload()
time.sleep(0.1)
self._wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0)
self.assertEqual(len(athenad.cancelled_uploads), 0)
finally:
end_event.set()
self.assertEqual(athenad.upload_queue.qsize(), 0)
self.assertEqual(len(athenad.cancelled_uploads), 0)
@with_upload_handler
def test_cancelExpiry(self):
t_future = datetime.now() - timedelta(days=40)
ts = int(t_future.strftime("%s")) * 1000
@ -308,42 +299,28 @@ class TestAthenadMethods(unittest.TestCase):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
self.assertEqual(athenad.upload_queue.qsize(), 0)
def test_listUploadQueueEmpty(self):
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 0)
@with_http_server
@with_upload_handler
def test_listUploadQueueCurrent(self, host: str):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start()
try:
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 1)
self.assertTrue(items[0]['current'])
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
finally:
end_event.set()
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 1)
self.assertTrue(items[0]['current'])
def test_listUploadQueue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},

Loading…
Cancel
Save