pull/35744/head
Shane Smiskol 1 week ago
parent 81427fdbf0
commit d96a162403
  1. 7
      system/athena/athenad.py
  2. 23
      system/athena/tests/test_athenad.py

@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# ruff: noqa: TID251
from __future__ import annotations from __future__ import annotations
import base64 import base64
@ -426,7 +427,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
path=path, path=path,
url=file.url, url=file.url,
headers=file.headers, headers=file.headers,
created_at=int(time.time() * 1000), # noqa: TID251 created_at=int(time.time() * 1000),
id=None, id=None,
allow_cellular=file.allow_cellular, allow_cellular=file.allow_cellular,
priority=file.priority, priority=file.priority,
@ -580,7 +581,7 @@ def takeSnapshot() -> str | dict[str, str] | None:
def get_logs_to_send_sorted() -> list[str]: def get_logs_to_send_sorted() -> list[str]:
# TODO: scan once then use inotify to detect file creation/deletion # TODO: scan once then use inotify to detect file creation/deletion
curr_time = int(time.time()) # noqa: TID251 curr_time = int(time.time())
logs = [] logs = []
for log_entry in os.listdir(Paths.swaglog_root()): for log_entry in os.listdir(Paths.swaglog_root()):
log_path = os.path.join(Paths.swaglog_root(), log_entry) log_path = os.path.join(Paths.swaglog_root(), log_entry)
@ -617,7 +618,7 @@ def log_handler(end_event: threading.Event) -> None:
log_entry = log_files.pop() # newest log file log_entry = log_files.pop() # newest log file
cloudlog.debug(f"athena.log_handler.forward_request {log_entry}") cloudlog.debug(f"athena.log_handler.forward_request {log_entry}")
try: try:
curr_time = int(time.time()) # noqa: TID251 curr_time = int(time.time())
log_path = os.path.join(Paths.swaglog_root(), log_entry) log_path = os.path.join(Paths.swaglog_root(), log_entry)
setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder)) setxattr(log_path, LOG_ATTR_NAME, int.to_bytes(curr_time, 4, sys.byteorder))
with open(log_path) as f: with open(log_path) as f:

@ -1,3 +1,4 @@
# ruff: noqa: TID251
import pytest import pytest
from functools import wraps from functools import wraps
import json import json
@ -190,11 +191,11 @@ class TestAthenadMethods:
fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) fn = self._create_file('qlog', data=os.urandom(10000 * 1024))
upload_fn = fn + ('.zst' if compress else '') upload_fn = fn + ('.zst' if compress else '')
item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') # noqa: TID251 item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with pytest.raises(requests.exceptions.ConnectionError): with pytest.raises(requests.exceptions.ConnectionError):
athenad._do_upload(item) athenad._do_upload(item)
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='') # noqa: TID251 item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item) resp = athenad._do_upload(item)
assert resp.status_code == 201 assert resp.status_code == 201
@ -226,7 +227,7 @@ class TestAthenadMethods:
@with_upload_handler @with_upload_handler
def test_upload_handler(self, host): def test_upload_handler(self, host):
fn = self._create_file('qlog.zst') fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) # noqa: TID251 item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
self._wait_for_upload() self._wait_for_upload()
@ -242,7 +243,7 @@ class TestAthenadMethods:
mock_put = mocker.patch('openpilot.system.athena.athenad.UPLOAD_SESS.put') mock_put = mocker.patch('openpilot.system.athena.athenad.UPLOAD_SESS.put')
mock_put.return_value.__enter__.return_value.status_code = status mock_put.return_value.__enter__.return_value.status_code = status
fn = self._create_file('qlog.zst') fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) # noqa: TID251 item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
self._wait_for_upload() self._wait_for_upload()
@ -257,7 +258,7 @@ class TestAthenadMethods:
def test_upload_handler_timeout(self): def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue""" """When an upload times out or fails to connect it should be placed back in the queue"""
fn = self._create_file('qlog.zst') fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) # noqa: TID251 item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)
athenad.upload_queue.put_nowait(item_no_retry) athenad.upload_queue.put_nowait(item_no_retry)
@ -278,7 +279,7 @@ class TestAthenadMethods:
@with_upload_handler @with_upload_handler
def test_cancel_upload(self): def test_cancel_upload(self):
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True) # noqa: TID251 created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
dispatcher["cancelUpload"](item.id) dispatcher["cancelUpload"](item.id)
@ -312,7 +313,7 @@ class TestAthenadMethods:
@with_upload_handler @with_upload_handler
def test_list_upload_queue_current(self, host: str): def test_list_upload_queue_current(self, host: str):
fn = self._create_file('qlog.zst') fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) # noqa: TID251 item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
self._wait_for_upload() self._wait_for_upload()
@ -331,7 +332,7 @@ class TestAthenadMethods:
path=fp, path=fp,
url=f"http://localhost:44444/{fn}", url=f"http://localhost:44444/{fn}",
headers={}, headers={},
created_at=int(time.time()*1000), # noqa: TID251 created_at=int(time.time()*1000),
id='', id='',
allow_cellular=True, allow_cellular=True,
priority=i priority=i
@ -343,7 +344,7 @@ class TestAthenadMethods:
def test_list_upload_queue(self): def test_list_upload_queue(self):
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True) # noqa: TID251 created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
items = dispatcher["listUploadQueue"]() items = dispatcher["listUploadQueue"]()
@ -356,8 +357,8 @@ class TestAthenadMethods:
assert len(items) == 0 assert len(items) == 0
def test_upload_queue_persistence(self): def test_upload_queue_persistence(self):
item1 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id1') # noqa: TID251 item1 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id1')
item2 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id2') # noqa: TID251 item2 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id2')
athenad.upload_queue.put_nowait(item1) athenad.upload_queue.put_nowait(item1)
athenad.upload_queue.put_nowait(item2) athenad.upload_queue.put_nowait(item2)

Loading…
Cancel
Save