|
|
|
@ -99,9 +99,9 @@ send_queue: Queue[str] = queue.Queue() |
|
|
|
|
upload_queue: Queue[UploadItem] = queue.Queue() |
|
|
|
|
low_priority_send_queue: Queue[str] = queue.Queue() |
|
|
|
|
log_recv_queue: Queue[str] = queue.Queue() |
|
|
|
|
cancelled_uploads: set[str] = set() |
|
|
|
|
|
|
|
|
|
cur_upload_items: dict[int, UploadItem | None] = {} |
|
|
|
|
cur_upload_items_lock = threading.Lock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def strip_zst_extension(fn: str) -> str: |
|
|
|
@ -129,8 +129,9 @@ class UploadQueueCache: |
|
|
|
|
@staticmethod |
|
|
|
|
def cache(upload_queue: Queue[UploadItem]) -> None: |
|
|
|
|
try: |
|
|
|
|
queue: list[UploadItem | None] = list(upload_queue.queue) |
|
|
|
|
items = [asdict(i) for i in queue if i is not None and (i.id not in cancelled_uploads)] |
|
|
|
|
with upload_queue.mutex: |
|
|
|
|
items = [asdict(item) for item in upload_queue.queue] |
|
|
|
|
|
|
|
|
|
Params().put("AthenadUploadQueue", json.dumps(items)) |
|
|
|
|
except Exception: |
|
|
|
|
cloudlog.exception("athena.UploadQueueCache.cache.exception") |
|
|
|
@ -200,7 +201,8 @@ def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = Tr |
|
|
|
|
upload_queue.put_nowait(item) |
|
|
|
|
UploadQueueCache.cache(upload_queue) |
|
|
|
|
|
|
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
with cur_upload_items_lock: |
|
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
|
|
|
|
|
for _ in range(RETRY_DELAY): |
|
|
|
|
time.sleep(1) |
|
|
|
@ -219,7 +221,8 @@ def cb(sm, item, tid, end_event: threading.Event, sz: int, cur: int) -> None: |
|
|
|
|
if end_event.is_set(): |
|
|
|
|
raise AbortTransferException |
|
|
|
|
|
|
|
|
|
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) |
|
|
|
|
with cur_upload_items_lock: |
|
|
|
|
cur_upload_items[tid] = replace(item, progress=cur / sz if sz else 1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_handler(end_event: threading.Event) -> None: |
|
|
|
@ -227,14 +230,10 @@ def upload_handler(end_event: threading.Event) -> None: |
|
|
|
|
tid = threading.get_ident() |
|
|
|
|
|
|
|
|
|
while not end_event.is_set(): |
|
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) |
|
|
|
|
|
|
|
|
|
if item.id in cancelled_uploads: |
|
|
|
|
cancelled_uploads.remove(item.id) |
|
|
|
|
continue |
|
|
|
|
with cur_upload_items_lock: |
|
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
cur_upload_items[tid] = item = replace(upload_queue.get(timeout=1), current=True) |
|
|
|
|
|
|
|
|
|
# Remove item if too old |
|
|
|
|
age = datetime.now() - datetime.fromtimestamp(item.created_at / 1000) |
|
|
|
@ -413,8 +412,13 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo |
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
|
def listUploadQueue() -> list[UploadItemDict]: |
|
|
|
|
items = list(upload_queue.queue) + list(cur_upload_items.values()) |
|
|
|
|
return [asdict(i) for i in items if (i is not None) and (i.id not in cancelled_uploads)] |
|
|
|
|
with upload_queue.mutex: |
|
|
|
|
items = list(upload_queue.queue) |
|
|
|
|
|
|
|
|
|
with cur_upload_items_lock: |
|
|
|
|
items += list(cur_upload_items.values()) |
|
|
|
|
|
|
|
|
|
return [asdict(item) for item in items] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
@ -422,13 +426,14 @@ def cancelUpload(upload_id: str | list[str]) -> dict[str, int | str]: |
|
|
|
|
if not isinstance(upload_id, list): |
|
|
|
|
upload_id = [upload_id] |
|
|
|
|
|
|
|
|
|
uploading_ids = {item.id for item in list(upload_queue.queue)} |
|
|
|
|
cancelled_ids = uploading_ids.intersection(upload_id) |
|
|
|
|
if len(cancelled_ids) == 0: |
|
|
|
|
return {"success": 0, "error": "not found"} |
|
|
|
|
with upload_queue.mutex: |
|
|
|
|
remaining_items = [item for item in upload_queue.queue if item.id not in upload_id] |
|
|
|
|
if len(remaining_items) == len(upload_queue.queue): |
|
|
|
|
return {"success": 0, "error": "not found"} |
|
|
|
|
|
|
|
|
|
cancelled_uploads.update(cancelled_ids) |
|
|
|
|
return {"success": 1} |
|
|
|
|
upload_queue.queue.clear() |
|
|
|
|
upload_queue.queue.extend(remaining_items) |
|
|
|
|
return {"success": 1} |
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
|
def setRouteViewed(route: str) -> dict[str, int | str]: |
|
|
|
|