athenad: small tests cleanup (#26037)

* test helpers

* create file helper

* clearer

* type

* fix default create path

* static methods
pull/26115/head
Cameron Clough 3 years ago committed by GitHub
parent 1dffbb629a
commit 9e2a1121ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      selfdrive/athena/tests/test_athenad.py

@ -9,6 +9,7 @@ import threading
import queue import queue
import unittest import unittest
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Optional
from multiprocessing import Process from multiprocessing import Process
from pathlib import Path from pathlib import Path
@ -46,12 +47,26 @@ class TestAthenadMethods(unittest.TestCase):
else: else:
os.unlink(p) os.unlink(p)
def wait_for_upload(self):
# *** test helpers ***
@staticmethod
def _wait_for_upload():
now = time.time() now = time.time()
while time.time() - now < 5: while time.time() - now < 5:
if athenad.upload_queue.qsize() == 0: if athenad.upload_queue.qsize() == 0:
break break
@staticmethod
def _create_file(file: str, parent: Optional[str] = None) -> str:
fn = os.path.join(athenad.ROOT if parent is None else parent, file)
os.makedirs(os.path.dirname(fn), exist_ok=True)
Path(fn).touch()
return fn
# *** test cases ***
def test_echo(self): def test_echo(self):
assert dispatcher["echo"]("bob") == "bob" assert dispatcher["echo"]("bob") == "bob"
@ -85,9 +100,7 @@ class TestAthenadMethods(unittest.TestCase):
filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc'] filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
files = [f'{route}--{s}/{f}' for s in segments for f in filenames] files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
for file in files: for file in files:
fn = os.path.join(athenad.ROOT, file) self._create_file(file)
os.makedirs(os.path.dirname(fn), exist_ok=True)
Path(fn).touch()
resp = dispatcher["listDataDirectory"]() resp = dispatcher["listDataDirectory"]()
self.assertTrue(resp, 'list empty!') self.assertTrue(resp, 'list empty!')
@ -121,16 +134,14 @@ class TestAthenadMethods(unittest.TestCase):
self.assertCountEqual(resp, expected) self.assertCountEqual(resp, expected)
def test_strip_bz2_extension(self): def test_strip_bz2_extension(self):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
if fn.endswith('.bz2'): if fn.endswith('.bz2'):
self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4]) self.assertEqual(athenad.strip_bz2_extension(fn), fn[:-4])
@with_http_server @with_http_server
def test_do_upload(self, host): def test_do_upload(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with self.assertRaises(requests.exceptions.ConnectionError): with self.assertRaises(requests.exceptions.ConnectionError):
@ -142,8 +153,7 @@ class TestAthenadMethods(unittest.TestCase):
@with_http_server @with_http_server
def test_uploadFileToUrl(self, host): def test_uploadFileToUrl(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {}) resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
self.assertEqual(resp['enqueued'], 1) self.assertEqual(resp['enqueued'], 1)
@ -154,8 +164,7 @@ class TestAthenadMethods(unittest.TestCase):
@with_http_server @with_http_server
def test_uploadFileToUrl_duplicate(self, host): def test_uploadFileToUrl_duplicate(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') self._create_file('qlog.bz2')
Path(fn).touch()
url1 = f"{host}/qlog.bz2?sig=sig1" url1 = f"{host}/qlog.bz2?sig=sig1"
dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {}) dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {})
@ -172,8 +181,7 @@ class TestAthenadMethods(unittest.TestCase):
@with_http_server @with_http_server
def test_upload_handler(self, host): def test_upload_handler(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event() end_event = threading.Event()
@ -182,7 +190,7 @@ class TestAthenadMethods(unittest.TestCase):
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
try: try:
self.wait_for_upload() self._wait_for_upload()
time.sleep(0.1) time.sleep(0.1)
# TODO: verify that upload actually succeeded # TODO: verify that upload actually succeeded
@ -195,8 +203,7 @@ class TestAthenadMethods(unittest.TestCase):
def test_upload_handler_retry(self, host, mock_put): def test_upload_handler_retry(self, host, mock_put):
for status, retry in ((500, True), (412, False)): for status, retry in ((500, True), (412, False)):
mock_put.return_value.status_code = status mock_put.return_value.status_code = status
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event() end_event = threading.Event()
@ -205,7 +212,7 @@ class TestAthenadMethods(unittest.TestCase):
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
try: try:
self.wait_for_upload() self._wait_for_upload()
time.sleep(0.1) time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
@ -217,8 +224,7 @@ class TestAthenadMethods(unittest.TestCase):
def test_upload_handler_timeout(self): def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue""" """When an upload times out or fails to connect it should be placed back in the queue"""
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT) item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT)
@ -228,14 +234,14 @@ class TestAthenadMethods(unittest.TestCase):
try: try:
athenad.upload_queue.put_nowait(item_no_retry) athenad.upload_queue.put_nowait(item_no_retry)
self.wait_for_upload() self._wait_for_upload()
time.sleep(0.1) time.sleep(0.1)
# Check that upload with retry count exceeded is not put back # Check that upload with retry count exceeded is not put back
self.assertEqual(athenad.upload_queue.qsize(), 0) self.assertEqual(athenad.upload_queue.qsize(), 0)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
self.wait_for_upload() self._wait_for_upload()
time.sleep(0.1) time.sleep(0.1)
# Check that upload item was put back in the queue with incremented retry count # Check that upload item was put back in the queue with incremented retry count
@ -256,7 +262,7 @@ class TestAthenadMethods(unittest.TestCase):
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
thread.start() thread.start()
try: try:
self.wait_for_upload() self._wait_for_upload()
time.sleep(0.1) time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0) self.assertEqual(athenad.upload_queue.qsize(), 0)
@ -269,8 +275,7 @@ class TestAthenadMethods(unittest.TestCase):
ts = int(t_future.strftime("%s")) * 1000 ts = int(t_future.strftime("%s")) * 1000
# Item that would time out if actually uploaded # Item that would time out if actually uploaded
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True) item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
@ -279,7 +284,7 @@ class TestAthenadMethods(unittest.TestCase):
thread.start() thread.start()
try: try:
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
self.wait_for_upload() self._wait_for_upload()
time.sleep(0.1) time.sleep(0.1)
self.assertEqual(athenad.upload_queue.qsize(), 0) self.assertEqual(athenad.upload_queue.qsize(), 0)
@ -292,8 +297,7 @@ class TestAthenadMethods(unittest.TestCase):
@with_http_server @with_http_server
def test_listUploadQueueCurrent(self, host): def test_listUploadQueueCurrent(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = self._create_file('qlog.bz2')
Path(fn).touch()
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event() end_event = threading.Event()
@ -302,7 +306,7 @@ class TestAthenadMethods(unittest.TestCase):
try: try:
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
self.wait_for_upload() self._wait_for_upload()
items = dispatcher["listUploadQueue"]() items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 1) self.assertEqual(len(items), 1)
@ -405,9 +409,9 @@ class TestAthenadMethods(unittest.TestCase):
def test_get_logs_to_send_sorted(self): def test_get_logs_to_send_sorted(self):
fl = list() fl = list()
for i in range(10): for i in range(10):
fn = os.path.join(swaglog.SWAGLOG_DIR, f'swaglog.{i:010}') file = f'swaglog.{i:010}'
Path(fn).touch() self._create_file(file, athenad.SWAGLOG_DIR)
fl.append(os.path.basename(fn)) fl.append(file)
# ensure the list is all logs except most recent # ensure the list is all logs except most recent
sl = athenad.get_logs_to_send_sorted() sl = athenad.get_logs_to_send_sorted()

Loading…
Cancel
Save