|
|
@ -29,7 +29,7 @@ def seed_athena_server(host, port): |
|
|
|
with Timeout(2, 'HTTP Server seeding failed'): |
|
|
|
with Timeout(2, 'HTTP Server seeding failed'): |
|
|
|
while True: |
|
|
|
while True: |
|
|
|
try: |
|
|
|
try: |
|
|
|
requests.put(f'http://{host}:{port}/qlog.bz2', data='', timeout=10) |
|
|
|
requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10) |
|
|
|
break |
|
|
|
break |
|
|
|
except requests.exceptions.ConnectionError: |
|
|
|
except requests.exceptions.ConnectionError: |
|
|
|
time.sleep(0.1) |
|
|
|
time.sleep(0.1) |
|
|
@ -174,54 +174,59 @@ class TestAthenadMethods: |
|
|
|
assert resp, 'list empty!' |
|
|
|
assert resp, 'list empty!' |
|
|
|
assert len(resp) == len(expected) |
|
|
|
assert len(resp) == len(expected) |
|
|
|
|
|
|
|
|
|
|
|
def test_strip_bz2_extension(self): |
|
|
|
def test_strip_extension(self): |
|
|
|
|
|
|
|
# any requested log file with an invalid extension won't return as existing |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
if fn.endswith('.bz2'): |
|
|
|
if fn.endswith('.bz2'): |
|
|
|
assert athenad.strip_bz2_extension(fn) == fn[:-4] |
|
|
|
assert athenad.strip_zst_extension(fn) == fn |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
|
|
|
|
if fn.endswith('.zst'): |
|
|
|
|
|
|
|
assert athenad.strip_zst_extension(fn) == fn[:-4] |
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("compress", [True, False]) |
|
|
|
@pytest.mark.parametrize("compress", [True, False]) |
|
|
|
def test_do_upload(self, host, compress): |
|
|
|
def test_do_upload(self, host, compress): |
|
|
|
# random bytes to ensure rather large object post-compression |
|
|
|
# random bytes to ensure rather large object post-compression |
|
|
|
fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) |
|
|
|
fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) |
|
|
|
|
|
|
|
|
|
|
|
upload_fn = fn + ('.bz2' if compress else '') |
|
|
|
upload_fn = fn + ('.zst' if compress else '') |
|
|
|
item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') |
|
|
|
item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') |
|
|
|
with pytest.raises(requests.exceptions.ConnectionError): |
|
|
|
with pytest.raises(requests.exceptions.ConnectionError): |
|
|
|
athenad._do_upload(item) |
|
|
|
athenad._do_upload(item) |
|
|
|
|
|
|
|
|
|
|
|
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') |
|
|
|
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='') |
|
|
|
resp = athenad._do_upload(item) |
|
|
|
resp = athenad._do_upload(item) |
|
|
|
assert resp.status_code == 201 |
|
|
|
assert resp.status_code == 201 |
|
|
|
|
|
|
|
|
|
|
|
def test_upload_file_to_url(self, host): |
|
|
|
def test_upload_file_to_url(self, host): |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
|
|
|
|
|
|
|
|
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {}) |
|
|
|
resp = dispatcher["uploadFileToUrl"]("qlog.zst", f"{host}/qlog.zst", {}) |
|
|
|
assert resp['enqueued'] == 1 |
|
|
|
assert resp['enqueued'] == 1 |
|
|
|
assert 'failed' not in resp |
|
|
|
assert 'failed' not in resp |
|
|
|
assert {"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}.items() <= resp['items'][0].items() |
|
|
|
assert {"path": fn, "url": f"{host}/qlog.zst", "headers": {}}.items() <= resp['items'][0].items() |
|
|
|
assert resp['items'][0].get('id') is not None |
|
|
|
assert resp['items'][0].get('id') is not None |
|
|
|
assert athenad.upload_queue.qsize() == 1 |
|
|
|
assert athenad.upload_queue.qsize() == 1 |
|
|
|
|
|
|
|
|
|
|
|
def test_upload_file_to_url_duplicate(self, host): |
|
|
|
def test_upload_file_to_url_duplicate(self, host): |
|
|
|
self._create_file('qlog.bz2') |
|
|
|
self._create_file('qlog.zst') |
|
|
|
|
|
|
|
|
|
|
|
url1 = f"{host}/qlog.bz2?sig=sig1" |
|
|
|
url1 = f"{host}/qlog.zst?sig=sig1" |
|
|
|
dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {}) |
|
|
|
dispatcher["uploadFileToUrl"]("qlog.zst", url1, {}) |
|
|
|
|
|
|
|
|
|
|
|
# Upload same file again, but with different signature |
|
|
|
# Upload same file again, but with different signature |
|
|
|
url2 = f"{host}/qlog.bz2?sig=sig2" |
|
|
|
url2 = f"{host}/qlog.zst?sig=sig2" |
|
|
|
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {}) |
|
|
|
resp = dispatcher["uploadFileToUrl"]("qlog.zst", url2, {}) |
|
|
|
assert resp == {'enqueued': 0, 'items': []} |
|
|
|
assert resp == {'enqueued': 0, 'items': []} |
|
|
|
|
|
|
|
|
|
|
|
def test_upload_file_to_url_does_not_exist(self, host): |
|
|
|
def test_upload_file_to_url_does_not_exist(self, host): |
|
|
|
not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {}) |
|
|
|
not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.zst", "http://localhost:1238", {}) |
|
|
|
assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']} |
|
|
|
assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.zst']} |
|
|
|
|
|
|
|
|
|
|
|
@with_upload_handler |
|
|
|
@with_upload_handler |
|
|
|
def test_upload_handler(self, host): |
|
|
|
def test_upload_handler(self, host): |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
|
|
|
|
|
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
self._wait_for_upload() |
|
|
|
self._wait_for_upload() |
|
|
@ -236,8 +241,8 @@ class TestAthenadMethods: |
|
|
|
def test_upload_handler_retry(self, mocker, host, status, retry): |
|
|
|
def test_upload_handler_retry(self, mocker, host, status, retry): |
|
|
|
mock_put = mocker.patch('requests.put') |
|
|
|
mock_put = mocker.patch('requests.put') |
|
|
|
mock_put.return_value.status_code = status |
|
|
|
mock_put.return_value.status_code = status |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
|
|
|
|
|
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
self._wait_for_upload() |
|
|
|
self._wait_for_upload() |
|
|
@ -251,8 +256,8 @@ class TestAthenadMethods: |
|
|
|
@with_upload_handler |
|
|
|
@with_upload_handler |
|
|
|
def test_upload_handler_timeout(self): |
|
|
|
def test_upload_handler_timeout(self): |
|
|
|
"""When an upload times out or fails to connect it should be placed back in the queue""" |
|
|
|
"""When an upload times out or fails to connect it should be placed back in the queue""" |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) |
|
|
|
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) |
|
|
|
|
|
|
|
|
|
|
|
athenad.upload_queue.put_nowait(item_no_retry) |
|
|
|
athenad.upload_queue.put_nowait(item_no_retry) |
|
|
@ -272,7 +277,7 @@ class TestAthenadMethods: |
|
|
|
|
|
|
|
|
|
|
|
@with_upload_handler |
|
|
|
@with_upload_handler |
|
|
|
def test_cancel_upload(self): |
|
|
|
def test_cancel_upload(self): |
|
|
|
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, |
|
|
|
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, |
|
|
|
created_at=int(time.time()*1000), id='id', allow_cellular=True) |
|
|
|
created_at=int(time.time()*1000), id='id', allow_cellular=True) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
dispatcher["cancelUpload"](item.id) |
|
|
|
dispatcher["cancelUpload"](item.id) |
|
|
@ -291,8 +296,8 @@ class TestAthenadMethods: |
|
|
|
ts = int(t_future.strftime("%s")) * 1000 |
|
|
|
ts = int(t_future.strftime("%s")) * 1000 |
|
|
|
|
|
|
|
|
|
|
|
# Item that would time out if actually uploaded |
|
|
|
# Item that would time out if actually uploaded |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True) |
|
|
|
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=ts, id='', allow_cellular=True) |
|
|
|
|
|
|
|
|
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
self._wait_for_upload() |
|
|
|
self._wait_for_upload() |
|
|
@ -306,8 +311,8 @@ class TestAthenadMethods: |
|
|
|
|
|
|
|
|
|
|
|
@with_upload_handler |
|
|
|
@with_upload_handler |
|
|
|
def test_list_upload_queue_current(self, host: str): |
|
|
|
def test_list_upload_queue_current(self, host: str): |
|
|
|
fn = self._create_file('qlog.bz2') |
|
|
|
fn = self._create_file('qlog.zst') |
|
|
|
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) |
|
|
|
|
|
|
|
|
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
self._wait_for_upload() |
|
|
|
self._wait_for_upload() |
|
|
@ -317,7 +322,7 @@ class TestAthenadMethods: |
|
|
|
assert items[0]['current'] |
|
|
|
assert items[0]['current'] |
|
|
|
|
|
|
|
|
|
|
|
def test_list_upload_queue(self): |
|
|
|
def test_list_upload_queue(self): |
|
|
|
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, |
|
|
|
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, |
|
|
|
created_at=int(time.time()*1000), id='id', allow_cellular=True) |
|
|
|
created_at=int(time.time()*1000), id='id', allow_cellular=True) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
athenad.upload_queue.put_nowait(item) |
|
|
|
|
|
|
|
|
|
|
|