diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 9c7582a260..2fecab1b1b 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -from functools import partial +from functools import partial, wraps import json import multiprocessing import os @@ -42,6 +42,20 @@ def seed_athena_server(host, port): with_mock_athena = partial(with_http_server, handler=HTTPRequestHandler, setup=seed_athena_server) +def with_upload_handler(func): + @wraps(func) + def wrapper(*args, **kwargs): + end_event = threading.Event() + thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) + thread.start() + try: + return func(*args, **kwargs) + finally: + end_event.set() + thread.join() + return wrapper + + class TestAthenadMethods(unittest.TestCase): @classmethod def setUpClass(cls): @@ -209,77 +223,59 @@ class TestAthenadMethods(unittest.TestCase): self.assertEqual(not_exists_resp, {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']}) @with_mock_athena + @with_upload_handler def test_upload_handler(self, host): fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) - end_event = threading.Event() - thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) - thread.start() - athenad.upload_queue.put_nowait(item) - try: - self._wait_for_upload() - time.sleep(0.1) + self._wait_for_upload() + time.sleep(0.1) - # TODO: verify that upload actually succeeded - self.assertEqual(athenad.upload_queue.qsize(), 0) - finally: - end_event.set() + # TODO: verify that upload actually succeeded + self.assertEqual(athenad.upload_queue.qsize(), 0) + @parameterized.expand([(500, True), (412, False)]) @with_mock_athena @mock.patch('requests.put') - def test_upload_handler_retry(self, host, mock_put): - for status, retry in ((500, True), (412, False)): - mock_put.return_value.status_code = status - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) - - end_event = threading.Event() - thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) - thread.start() + @with_upload_handler + def test_upload_handler_retry(self, status, retry, mock_put, host): + mock_put.return_value.status_code = status + fn = self._create_file('qlog.bz2') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) - athenad.upload_queue.put_nowait(item) - try: - self._wait_for_upload() - time.sleep(0.1) + athenad.upload_queue.put_nowait(item) + self._wait_for_upload() + time.sleep(0.1) - self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) - finally: - end_event.set() + self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) - if retry: - self.assertEqual(athenad.upload_queue.get().retry_count, 1) + if retry: + self.assertEqual(athenad.upload_queue.get().retry_count, 1) + @with_upload_handler def test_upload_handler_timeout(self): """When an upload times out or fails to connect it should be placed back in the queue""" fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) - end_event = threading.Event() - thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) - thread.start() - - try: - athenad.upload_queue.put_nowait(item_no_retry) - self._wait_for_upload() - time.sleep(0.1) - - # Check that upload with retry count exceeded is not put back - self.assertEqual(athenad.upload_queue.qsize(), 0) + athenad.upload_queue.put_nowait(item_no_retry) + self._wait_for_upload() + time.sleep(0.1) - athenad.upload_queue.put_nowait(item) - self._wait_for_upload() - time.sleep(0.1) + # Check that upload with retry count exceeded is not put back + self.assertEqual(athenad.upload_queue.qsize(), 0) - # Check that upload item was put back in the queue with incremented retry count - self.assertEqual(athenad.upload_queue.qsize(), 1) - self.assertEqual(athenad.upload_queue.get().retry_count, 1) + athenad.upload_queue.put_nowait(item) + self._wait_for_upload() + time.sleep(0.1) - finally: - end_event.set() + # Check that upload item was put back in the queue with incremented retry count + self.assertEqual(athenad.upload_queue.qsize(), 1) + self.assertEqual(athenad.upload_queue.get().retry_count, 1) + @with_upload_handler def test_cancelUpload(self): item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True) @@ -288,18 +284,13 @@ class TestAthenadMethods(unittest.TestCase): self.assertIn(item.id, athenad.cancelled_uploads) - end_event = threading.Event() - thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) - thread.start() - try: - self._wait_for_upload() - time.sleep(0.1) + self._wait_for_upload() + time.sleep(0.1) - self.assertEqual(athenad.upload_queue.qsize(), 0) - self.assertEqual(len(athenad.cancelled_uploads), 0) - finally: - end_event.set() + self.assertEqual(athenad.upload_queue.qsize(), 0) + self.assertEqual(len(athenad.cancelled_uploads), 0) + @with_upload_handler def test_cancelExpiry(self): t_future = datetime.now() - timedelta(days=40) ts = int(t_future.strftime("%s")) * 1000 @@ -308,42 +299,28 @@ class TestAthenadMethods(unittest.TestCase): fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True) + athenad.upload_queue.put_nowait(item) + self._wait_for_upload() + time.sleep(0.1) - end_event = threading.Event() - thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) - thread.start() - try: - athenad.upload_queue.put_nowait(item) - self._wait_for_upload() - time.sleep(0.1) - - self.assertEqual(athenad.upload_queue.qsize(), 0) - finally: - end_event.set() + self.assertEqual(athenad.upload_queue.qsize(), 0) def test_listUploadQueueEmpty(self): items = dispatcher["listUploadQueue"]() self.assertEqual(len(items), 0) @with_http_server + @with_upload_handler def test_listUploadQueueCurrent(self, host: str): fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) - end_event = threading.Event() - thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) - thread.start() - - try: - athenad.upload_queue.put_nowait(item) - self._wait_for_upload() - - items = dispatcher["listUploadQueue"]() - self.assertEqual(len(items), 1) - self.assertTrue(items[0]['current']) + athenad.upload_queue.put_nowait(item) + self._wait_for_upload() - finally: - end_event.set() + items = dispatcher["listUploadQueue"]() + self.assertEqual(len(items), 1) + self.assertTrue(items[0]['current']) def test_listUploadQueue(self): item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},