diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 7f511eecf6..5e86a2e821 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -9,6 +9,7 @@ import threading import queue import unittest from datetime import datetime, timedelta +from typing import Optional from multiprocessing import Process from pathlib import Path @@ -46,12 +47,26 @@ class TestAthenadMethods(unittest.TestCase): else: os.unlink(p) - def wait_for_upload(self): + + # *** test helpers *** + + @staticmethod + def _wait_for_upload(): now = time.time() while time.time() - now < 5: if athenad.upload_queue.qsize() == 0: break + @staticmethod + def _create_file(file: str, parent: Optional[str] = None) -> str: + fn = os.path.join(athenad.ROOT if parent is None else parent, file) + os.makedirs(os.path.dirname(fn), exist_ok=True) + Path(fn).touch() + return fn + + + # *** test cases *** + def test_echo(self): assert dispatcher["echo"]("bob") == "bob" @@ -85,9 +100,7 @@ class TestAthenadMethods(unittest.TestCase): filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc'] files = [f'{route}--{s}/{f}' for s in segments for f in filenames] for file in files: - fn = os.path.join(athenad.ROOT, file) - os.makedirs(os.path.dirname(fn), exist_ok=True) - Path(fn).touch() + self._create_file(file) resp = dispatcher["listDataDirectory"]() self.assertTrue(resp, 'list empty!') @@ -121,16 +134,14 @@ class TestAthenadMethods(unittest.TestCase): self.assertCountEqual(resp, expected) def test_strip_bz2_extension(self): - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') if fn.endswith('.bz2'): self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4]) @with_http_server def test_do_upload(self, host): - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') with self.assertRaises(requests.exceptions.ConnectionError): @@ -142,8 +153,7 @@ class TestAthenadMethods(unittest.TestCase): @with_http_server def test_uploadFileToUrl(self, host): - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {}) self.assertEqual(resp['enqueued'], 1) @@ -154,8 +164,7 @@ class TestAthenadMethods(unittest.TestCase): @with_http_server def test_uploadFileToUrl_duplicate(self, host): - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + self._create_file('qlog.bz2') url1 = f"{host}/qlog.bz2?sig=sig1" dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {}) @@ -172,8 +181,7 @@ class TestAthenadMethods(unittest.TestCase): @with_http_server def test_upload_handler(self, host): - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) end_event = threading.Event() @@ -182,7 +190,7 @@ class TestAthenadMethods(unittest.TestCase): athenad.upload_queue.put_nowait(item) try: - self.wait_for_upload() + self._wait_for_upload() time.sleep(0.1) # TODO: verify that upload actually succeeded @@ -195,8 +203,7 @@ class TestAthenadMethods(unittest.TestCase): def test_upload_handler_retry(self, host, mock_put): for status, retry in ((500, True), (412, False)): mock_put.return_value.status_code = status - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) end_event = threading.Event() @@ -205,7 +212,7 @@ class TestAthenadMethods(unittest.TestCase): athenad.upload_queue.put_nowait(item) try: - self.wait_for_upload() + self._wait_for_upload() time.sleep(0.1) self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) @@ -217,8 +224,7 @@ class TestAthenadMethods(unittest.TestCase): def test_upload_handler_timeout(self): """When an upload times out or fails to connect it should be placed back in the queue""" - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT) @@ -228,14 +234,14 @@ class TestAthenadMethods(unittest.TestCase): try: athenad.upload_queue.put_nowait(item_no_retry) - self.wait_for_upload() + self._wait_for_upload() time.sleep(0.1) # Check that upload with retry count exceeded is not put back self.assertEqual(athenad.upload_queue.qsize(), 0) athenad.upload_queue.put_nowait(item) - self.wait_for_upload() + self._wait_for_upload() time.sleep(0.1) # Check that upload item was put back in the queue with incremented retry count @@ -256,7 +262,7 @@ class TestAthenadMethods(unittest.TestCase): thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread.start() try: - self.wait_for_upload() + self._wait_for_upload() time.sleep(0.1) self.assertEqual(athenad.upload_queue.qsize(), 0) @@ -269,8 +275,7 @@ class TestAthenadMethods(unittest.TestCase): ts = int(t_future.strftime("%s")) * 1000 # Item that would time out if actually uploaded - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True) @@ -279,7 +284,7 @@ class TestAthenadMethods(unittest.TestCase): thread.start() try: athenad.upload_queue.put_nowait(item) - self.wait_for_upload() + self._wait_for_upload() time.sleep(0.1) self.assertEqual(athenad.upload_queue.qsize(), 0) @@ -292,8 +297,7 @@ class TestAthenadMethods(unittest.TestCase): @with_http_server def test_listUploadQueueCurrent(self, host): - fn = os.path.join(athenad.ROOT, 'qlog.bz2') - Path(fn).touch() + fn = self._create_file('qlog.bz2') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) end_event = threading.Event() @@ -302,7 +306,7 @@ class TestAthenadMethods(unittest.TestCase): try: athenad.upload_queue.put_nowait(item) - self.wait_for_upload() + self._wait_for_upload() items = dispatcher["listUploadQueue"]() self.assertEqual(len(items), 1) @@ -405,9 +409,9 @@ class TestAthenadMethods(unittest.TestCase): def test_get_logs_to_send_sorted(self): fl = list() for i in range(10): - fn = os.path.join(swaglog.SWAGLOG_DIR, f'swaglog.{i:010}') - Path(fn).touch() - fl.append(os.path.basename(fn)) + file = f'swaglog.{i:010}' + self._create_file(file, athenad.SWAGLOG_DIR) + fl.append(file) # ensure the list is all logs except most recent sl = athenad.get_logs_to_send_sorted()