diff --git a/system/athena/athenad.py b/system/athena/athenad.py index b36bdb103d..5813bc90a5 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -16,7 +16,7 @@ import threading import time from dataclasses import asdict, dataclass, replace from datetime import datetime -from functools import partial +from functools import partial, total_ordering from queue import Queue from typing import cast from collections.abc import Callable @@ -53,6 +53,7 @@ MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately MAX_AGE = 31 * 24 * 3600 # seconds WS_FRAME_SIZE = 4096 DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds +DEFAULT_UPLOAD_PRIORITY = 99 # higher number = lower priority NetworkType = log.DeviceState.NetworkType @@ -68,13 +69,15 @@ class UploadFile: url: str headers: dict[str, str] allow_cellular: bool + priority: int = DEFAULT_UPLOAD_PRIORITY @classmethod def from_dict(cls, d: dict) -> UploadFile: - return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False)) + return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False), d.get("priority", DEFAULT_UPLOAD_PRIORITY)) @dataclass +@total_ordering class UploadItem: path: str url: str @@ -85,17 +88,28 @@ class UploadItem: current: bool = False progress: float = 0 allow_cellular: bool = False + priority: int = DEFAULT_UPLOAD_PRIORITY @classmethod def from_dict(cls, d: dict) -> UploadItem: return cls(d["path"], d["url"], d["headers"], d["created_at"], d["id"], d["retry_count"], d["current"], - d["progress"], d["allow_cellular"]) + d["progress"], d["allow_cellular"], d["priority"]) + + def __lt__(self, other): + if not isinstance(other, UploadItem): + return NotImplemented + return self.priority < other.priority + + def __eq__(self, other): + if not isinstance(other, UploadItem): + return NotImplemented + return self.priority == other.priority dispatcher["echo"] = lambda s: s recv_queue: Queue[str] = queue.Queue() send_queue: Queue[str] = queue.Queue() -upload_queue: Queue[UploadItem] = queue.Queue() +upload_queue: Queue[UploadItem] = queue.PriorityQueue() low_priority_send_queue: Queue[str] = queue.Queue() log_recv_queue: Queue[str] = queue.Queue() cancelled_uploads: set[str] = set() @@ -398,6 +412,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo created_at=int(time.time() * 1000), id=None, allow_cellular=file.allow_cellular, + priority=file.priority, ) upload_id = hashlib.sha1(str(item).encode()).hexdigest() item = replace(item, id=upload_id) diff --git a/system/athena/tests/test_athenad.py b/system/athena/tests/test_athenad.py index e16e73a7ea..dd82325815 100644 --- a/system/athena/tests/test_athenad.py +++ b/system/athena/tests/test_athenad.py @@ -76,7 +76,7 @@ class TestAthenadMethods: self.params.put(k, v) self.params.put_bool("GsmMetered", True) - athenad.upload_queue = queue.Queue() + athenad.upload_queue = queue.PriorityQueue() athenad.cur_upload_items.clear() athenad.cancelled_uploads.clear() @@ -321,6 +321,26 @@ class TestAthenadMethods: assert len(items) == 1 assert items[0]['current'] + def test_list_upload_queue_priority(self): + priorities = (25, 50, 99, 75, 0) + + for i in priorities: + fn = f'qlog_{i}.zst' + fp = self._create_file(fn) + item = athenad.UploadItem( + path=fp, + url=f"http://localhost:44444/{fn}", + headers={}, + created_at=int(time.time()*1000), + id='', + allow_cellular=True, + priority=i + ) + athenad.upload_queue.put_nowait(item) + + for i in sorted(priorities): + assert athenad.upload_queue.get_nowait().priority == i + def test_list_upload_queue(self): item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True)