From e785026a98a8745057a560742fa030c4df14c55a Mon Sep 17 00:00:00 2001 From: Trey Moen <50057480+greatgitsby@users.noreply.github.com> Date: Wed, 19 Mar 2025 12:22:54 -0700 Subject: [PATCH] feat(athenad): priority uploads (#34856) Today, Firehose uploads and user-requested uploads from Connect are treated the same. This is not ideal behavior if the user wants to upload routes immediately for a bug report and the queue is full of uploads for Firehose. The workaround is to clear the queue and retry the upload from Connect. This PR adds an optional integer `priority` to requested file uploads in `athenad`. By default, all requests get `DEFAULT_UPLOAD_PRIORITY` (99); a higher number means a lower priority. If the caller marks their uploads as more important by passing a smaller number, the upload queue will serve those requests first. The only caveat to this PR is that we won't reorder files currently being uploaded. Most connections are quick enough to finish the (at most 4) in-flight uploads before the new high-priority items are polled from the queue. Closes https://github.com/commaai/openpilot/issues/34836 **Verification** Added a test case that inserts upload tasks with differing priorities. Polling the queue produces items in the correct order (smallest priority number to largest). 
PR to mark Connect uploads as high priority: https://github.com/commaai/connect/pull/557 --------- Co-authored-by: Cameron Clough --- system/athena/athenad.py | 23 +++++++++++++++++++---- system/athena/tests/test_athenad.py | 22 +++++++++++++++++++++- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index b36bdb103d..5813bc90a5 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -16,7 +16,7 @@ import threading import time from dataclasses import asdict, dataclass, replace from datetime import datetime -from functools import partial +from functools import partial, total_ordering from queue import Queue from typing import cast from collections.abc import Callable @@ -53,6 +53,7 @@ MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately MAX_AGE = 31 * 24 * 3600 # seconds WS_FRAME_SIZE = 4096 DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds +DEFAULT_UPLOAD_PRIORITY = 99 # higher number = lower priority NetworkType = log.DeviceState.NetworkType @@ -68,13 +69,15 @@ class UploadFile: url: str headers: dict[str, str] allow_cellular: bool + priority: int = DEFAULT_UPLOAD_PRIORITY @classmethod def from_dict(cls, d: dict) -> UploadFile: - return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False)) + return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False), d.get("priority", DEFAULT_UPLOAD_PRIORITY)) @dataclass +@total_ordering class UploadItem: path: str url: str @@ -85,17 +88,28 @@ class UploadItem: current: bool = False progress: float = 0 allow_cellular: bool = False + priority: int = DEFAULT_UPLOAD_PRIORITY @classmethod def from_dict(cls, d: dict) -> UploadItem: return cls(d["path"], d["url"], d["headers"], d["created_at"], d["id"], d["retry_count"], d["current"], - d["progress"], d["allow_cellular"]) + d["progress"], d["allow_cellular"], d["priority"]) + + def __lt__(self, other): + if 
not isinstance(other, UploadItem): + return NotImplemented + return self.priority < other.priority + + def __eq__(self, other): + if not isinstance(other, UploadItem): + return NotImplemented + return self.priority == other.priority dispatcher["echo"] = lambda s: s recv_queue: Queue[str] = queue.Queue() send_queue: Queue[str] = queue.Queue() -upload_queue: Queue[UploadItem] = queue.Queue() +upload_queue: Queue[UploadItem] = queue.PriorityQueue() low_priority_send_queue: Queue[str] = queue.Queue() log_recv_queue: Queue[str] = queue.Queue() cancelled_uploads: set[str] = set() @@ -398,6 +412,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo created_at=int(time.time() * 1000), id=None, allow_cellular=file.allow_cellular, + priority=file.priority, ) upload_id = hashlib.sha1(str(item).encode()).hexdigest() item = replace(item, id=upload_id) diff --git a/system/athena/tests/test_athenad.py b/system/athena/tests/test_athenad.py index e16e73a7ea..dd82325815 100644 --- a/system/athena/tests/test_athenad.py +++ b/system/athena/tests/test_athenad.py @@ -76,7 +76,7 @@ class TestAthenadMethods: self.params.put(k, v) self.params.put_bool("GsmMetered", True) - athenad.upload_queue = queue.Queue() + athenad.upload_queue = queue.PriorityQueue() athenad.cur_upload_items.clear() athenad.cancelled_uploads.clear() @@ -321,6 +321,26 @@ class TestAthenadMethods: assert len(items) == 1 assert items[0]['current'] + def test_list_upload_queue_priority(self): + priorities = (25, 50, 99, 75, 0) + + for i in priorities: + fn = f'qlog_{i}.zst' + fp = self._create_file(fn) + item = athenad.UploadItem( + path=fp, + url=f"http://localhost:44444/{fn}", + headers={}, + created_at=int(time.time()*1000), + id='', + allow_cellular=True, + priority=i + ) + athenad.upload_queue.put_nowait(item) + + for i in sorted(priorities): + assert athenad.upload_queue.get_nowait().priority == i + def test_list_upload_queue(self): item = athenad.UploadItem(path="qlog.zst", 
url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True)