diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 78285f018e..55b9476986 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -99,9 +99,9 @@ send_queue: Queue[str] = queue.Queue() upload_queue: Queue[UploadItem] = queue.Queue() low_priority_send_queue: Queue[str] = queue.Queue() log_recv_queue: Queue[str] = queue.Queue() -cancelled_uploads: set[str] = set() cur_upload_items: dict[int, UploadItem | None] = {} +cur_upload_items_lock = threading.Lock() def strip_zst_extension(fn: str) -> str: @@ -129,8 +129,9 @@ class UploadQueueCache: @staticmethod def cache(upload_queue: Queue[UploadItem]) -> None: try: - queue: list[UploadItem | None] = list(upload_queue.queue) - items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)] + with upload_queue.mutex: + items = [asdict(item) for item in upload_queue.queue] + Params().put("AthenadUploadQueue", json.dumps(items)) except Exception: cloudlog.exception("athena.UploadQueueCache.cache.exception") @@ -200,7 +201,8 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr upload_queue.put_nowait(item) UploadQueueCache.cache(upload_queue) - cur_upload_items[tid] = None + with cur_upload_items_lock: + cur_upload_items[tid] = None for _ in range(RETRY_DELAY): time.sleep(1) @@ -219,7 +221,8 @@ def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None: if end_event.is_set(): raise AbortTransferException - cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) + with cur_upload_items_lock: + cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) def upload_handler(end_event: threading.Event) -> None: @@ -227,14 +230,10 @@ def upload_handler(end_event: threading.Event) -> None: tid = threading.get_ident() while not end_event.is_set(): - cur_upload_items[tid] = None - try: - cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) - - if item.id in cancelled_uploads: - cancelled_uploads.remove(item.id) - continue + with cur_upload_items_lock: + cur_upload_items[tid] = None + cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) # Remove item if too old age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000) @@ -413,8 +412,13 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo @dispatcher.add_method def listUploadQueue() -> list[UploadItemDict]: - items = list(upload_queue.queue) + list(cur_upload_items.values()) - return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)] + with upload_queue.mutex: + items = list(upload_queue.queue) + + with cur_upload_items_lock: + items += list(cur_upload_items.values()) + + return [asdict(item) for item in items] @dispatcher.add_method @@ -422,13 +426,14 @@ def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]: if not isinstance(upload_id, list): upload_id = [upload_id] - uploading_ids = {item.id for item in list(upload_queue.queue)} - cancelled_ids = uploading_ids.intersection(upload_id) - if len(cancelled_ids) == 0: - return {"success": 0, "error": "not found"} + with upload_queue.mutex: + remaining_items = [item for item in upload_queue.queue if item.id not in upload_id] + if len(remaining_items) == len(upload_queue.queue): + return {"success": 0, "error": "not found"} - cancelled_uploads.update(cancelled_ids) - return {"success": 1} + upload_queue.queue.clear() + upload_queue.queue.extend(remaining_items) + return {"success": 1} @dispatcher.add_method def setRouteViewed(route: str) -> dict[str, int | str]: diff --git a/system/athena/tests/test_athenad.py b/system/athena/tests/test_athenad.py index a6bfc68930..cee37064b9 100644 --- a/system/athena/tests/test_athenad.py +++ b/system/athena/tests/test_athenad.py @@ -78,7 +78,6 @@ class TestAthenadMethods: athenad.upload_queue = queue.Queue() athenad.cur_upload_items.clear() - athenad.cancelled_uploads.clear() for i in os.listdir(Paths.log_root()): p = os.path.join(Paths.log_root(), i) @@ -282,13 +281,10 @@ class TestAthenadMethods: athenad.upload_queue.put_nowait(item) dispatcher["cancelUpload"](item.id) - assert item.id in athenad.cancelled_uploads - self._wait_for_upload() time.sleep(0.1) assert athenad.upload_queue.qsize() == 0 - assert len(athenad.cancelled_uploads) == 0 @with_upload_handler def test_cancel_expiry(self): @@ -331,7 +327,7 @@ class TestAthenadMethods: assert items[0] == asdict(item) assert not items[0]['current'] - athenad.cancelled_uploads.add(item.id) + dispatcher["cancelUpload"](item.id) items = dispatcher["listUploadQueue"]() assert len(items) == 0 @@ -343,7 +339,7 @@ class TestAthenadMethods: athenad.upload_queue.put_nowait(item2) # Ensure canceled items are not persisted - athenad.cancelled_uploads.add(item2.id) + dispatcher["cancelUpload"](item2.id) # serialize item athenad.UploadQueueCache.cache(athenad.upload_queue)