feat(athenad): priority uploads (#34856)

Today, Firehose uploads and user-requested uploads from Connect are treated the same. This is not ideal behavior if the user wants to upload routes immediately for a bug report and the queue is full of uploads for Firehose. The workaround is to clear the queue and retry the upload from Connect.

This PR adds an optional `priority` to requested file uploads in `athenad`. By default, all requests get a low priority (`DEFAULT_UPLOAD_PRIORITY = 99`; a higher number means a lower priority). However, if the caller wishes to mark their uploads as "more important" by passing a smaller number, then the upload queue will prioritize those requests when uploading.

The only caveat to this PR is that we won't reorder files that are already being uploaded. Most connections are quick enough to finish the in-flight uploads (at most 4 files) before the new high-priority items are polled from the queue.

Closes https://github.com/commaai/openpilot/issues/34836

**Verification**

Added a test case that inserts upload tasks with differing priorities. Polling the queue then returns the items in ascending order of priority number (smallest to largest, i.e. most important first).

PR to mark Connect uploads as high priority: https://github.com/commaai/connect/pull/557

---------

Co-authored-by: Cameron Clough <cameronjclough@gmail.com>
pull/34908/head
Trey Moen 1 month ago committed by GitHub
parent 2fca845153
commit e785026a98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 23
      system/athena/athenad.py
  2. 22
      system/athena/tests/test_athenad.py

@ -16,7 +16,7 @@ import threading
import time import time
from dataclasses import asdict, dataclass, replace from dataclasses import asdict, dataclass, replace
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial, total_ordering
from queue import Queue from queue import Queue
from typing import cast from typing import cast
from collections.abc import Callable from collections.abc import Callable
@ -53,6 +53,7 @@ MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
MAX_AGE = 31 * 24 * 3600 # seconds MAX_AGE = 31 * 24 * 3600 # seconds
WS_FRAME_SIZE = 4096 WS_FRAME_SIZE = 4096
DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds DEVICE_STATE_UPDATE_INTERVAL = 1.0 # in seconds
DEFAULT_UPLOAD_PRIORITY = 99 # higher number = lower priority
NetworkType = log.DeviceState.NetworkType NetworkType = log.DeviceState.NetworkType
@ -68,13 +69,15 @@ class UploadFile:
url: str url: str
headers: dict[str, str] headers: dict[str, str]
allow_cellular: bool allow_cellular: bool
priority: int = DEFAULT_UPLOAD_PRIORITY
@classmethod @classmethod
def from_dict(cls, d: dict) -> UploadFile: def from_dict(cls, d: dict) -> UploadFile:
return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False)) return cls(d.get("fn", ""), d.get("url", ""), d.get("headers", {}), d.get("allow_cellular", False), d.get("priority", DEFAULT_UPLOAD_PRIORITY))
@dataclass @dataclass
@total_ordering
class UploadItem: class UploadItem:
path: str path: str
url: str url: str
@ -85,17 +88,28 @@ class UploadItem:
current: bool = False current: bool = False
progress: float = 0 progress: float = 0
allow_cellular: bool = False allow_cellular: bool = False
priority: int = DEFAULT_UPLOAD_PRIORITY
@classmethod @classmethod
def from_dict(cls, d: dict) -> UploadItem: def from_dict(cls, d: dict) -> UploadItem:
return cls(d["path"], d["url"], d["headers"], d["created_at"], d["id"], d["retry_count"], d["current"], return cls(d["path"], d["url"], d["headers"], d["created_at"], d["id"], d["retry_count"], d["current"],
d["progress"], d["allow_cellular"]) d["progress"], d["allow_cellular"], d["priority"])
# Ordering hook: an UploadItem sorts before another when its priority number is
# smaller (a smaller number means a more important upload).
# Only `priority` is compared, so this comparison defines no order between two
# items that share the same priority.
def __lt__(self, other):
# Decline to compare against foreign types so Python can try the reflected
# operation on `other` instead of raising from here.
if not isinstance(other, UploadItem):
return NotImplemented
return self.priority < other.priority
# Equality hook: two UploadItems compare equal when their priorities match.
# NOTE(review): no other field (path, url, id, ...) takes part in the comparison,
# so distinct uploads with the same priority are `==` — confirm that no caller
# relies on `==` / `in` to identify one specific item.
def __eq__(self, other):
# Decline to compare against foreign types; Python then falls back to its
# default handling for the other operand.
if not isinstance(other, UploadItem):
return NotImplemented
return self.priority == other.priority
dispatcher["echo"] = lambda s: s dispatcher["echo"] = lambda s: s
recv_queue: Queue[str] = queue.Queue() recv_queue: Queue[str] = queue.Queue()
send_queue: Queue[str] = queue.Queue() send_queue: Queue[str] = queue.Queue()
upload_queue: Queue[UploadItem] = queue.Queue() upload_queue: Queue[UploadItem] = queue.PriorityQueue()
low_priority_send_queue: Queue[str] = queue.Queue() low_priority_send_queue: Queue[str] = queue.Queue()
log_recv_queue: Queue[str] = queue.Queue() log_recv_queue: Queue[str] = queue.Queue()
cancelled_uploads: set[str] = set() cancelled_uploads: set[str] = set()
@ -398,6 +412,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
created_at=int(time.time() * 1000), created_at=int(time.time() * 1000),
id=None, id=None,
allow_cellular=file.allow_cellular, allow_cellular=file.allow_cellular,
priority=file.priority,
) )
upload_id = hashlib.sha1(str(item).encode()).hexdigest() upload_id = hashlib.sha1(str(item).encode()).hexdigest()
item = replace(item, id=upload_id) item = replace(item, id=upload_id)

@ -76,7 +76,7 @@ class TestAthenadMethods:
self.params.put(k, v) self.params.put(k, v)
self.params.put_bool("GsmMetered", True) self.params.put_bool("GsmMetered", True)
athenad.upload_queue = queue.Queue() athenad.upload_queue = queue.PriorityQueue()
athenad.cur_upload_items.clear() athenad.cur_upload_items.clear()
athenad.cancelled_uploads.clear() athenad.cancelled_uploads.clear()
@ -321,6 +321,26 @@ class TestAthenadMethods:
assert len(items) == 1 assert len(items) == 1
assert items[0]['current'] assert items[0]['current']
# Checks that athenad.upload_queue hands items back in ascending priority number,
# regardless of the order in which they were inserted.
# NOTE(review): despite the name, this drains the queue directly with
# get_nowait() and does not call a list-upload-queue API.
def test_list_upload_queue_priority(self):
# Deliberately unsorted so that insertion order differs from priority order.
priorities = (25, 50, 99, 75, 0)
for i in priorities:
fn = f'qlog_{i}.zst'
# presumably creates the file for the test and returns its path — verify against the fixture helper
fp = self._create_file(fn)
item = athenad.UploadItem(
path=fp,
url=f"http://localhost:44444/{fn}",
headers={},
created_at=int(time.time()*1000),
id='',
allow_cellular=True,
priority=i
)
athenad.upload_queue.put_nowait(item)
# Items must come out smallest priority number first.
for i in sorted(priorities):
assert athenad.upload_queue.get_nowait().priority == i
def test_list_upload_queue(self): def test_list_upload_queue(self):
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True) created_at=int(time.time()*1000), id='id', allow_cellular=True)

Loading…
Cancel
Save