athena: persist upload queue to disk (#22659)

* revert submodules

* lowercase

* addressed comments

* add test for cancelled uploads

* formatting

* catch all exceptions

* handle empty param

Co-authored-by: Willem Melching <willem.melching@gmail.com>
Devin Leamy committed 3 years ago (via GitHub)
commit 8cb83b29a6, parent b745a14ff7
Files changed:
  1. selfdrive/athena/athenad.py (31 changes)
  2. selfdrive/athena/tests/helpers.py (22 changes)
  3. selfdrive/athena/tests/test_athenad.py (25 changes)
  4. selfdrive/common/params.cc (1 change)
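In short, the change serializes the in-memory upload queue to JSON under a single persistent param ("AthenadUploadQueue") and rebuilds the queue from that JSON when athenad starts. A rough, self-contained sketch of the round-trip (not the actual openpilot code; FakeParams and Item are illustrative stand-ins, and the real UploadItem has more fields than shown here):

import json
import queue
from collections import namedtuple

# Stand-in for the real UploadItem namedtuple (field list abbreviated).
Item = namedtuple('Item', ['path', 'url', 'id'])

class FakeParams:
  # Stand-in for openpilot's Params: a process-wide key/value store.
  storage = {}
  def get(self, k):
    return FakeParams.storage.get(k)
  def put(self, k, v):
    FakeParams.storage[k] = v

def cache(q, params):
  # Persist every queued item as a list of dicts under one key.
  params.put("AthenadUploadQueue", json.dumps([i._asdict() for i in q.queue]))

def initialize(q, params):
  # Rebuild the namedtuples from the stored JSON on startup.
  raw = params.get("AthenadUploadQueue")
  if raw is not None:
    for d in json.loads(raw):
      q.put(Item(**d))

q = queue.Queue()
q.put(Item("/tmp/example_segment.bz2", "https://example.com/upload", "id1"))
cache(q, FakeParams())              # before shutdown

restored = queue.Queue()
initialize(restored, FakeParams())  # after restart
assert restored.queue[0].path == "/tmp/example_segment.bz2"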

selfdrive/athena/athenad.py

@@ -56,6 +56,28 @@ UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', '
cur_upload_items = {}


class UploadQueueCache():
  params = Params()

  @staticmethod
  def initialize(upload_queue):
    try:
      upload_queue_json = UploadQueueCache.params.get("AthenadUploadQueue")
      if upload_queue_json is not None:
        for item in json.loads(upload_queue_json):
          upload_queue.put(UploadItem(**item))
    except Exception:
      cloudlog.exception("athena.UploadQueueCache.initialize.exception")

  @staticmethod
  def cache(upload_queue):
    try:
      items = [i._asdict() for i in upload_queue.queue if i.id not in cancelled_uploads]
      UploadQueueCache.params.put("AthenadUploadQueue", json.dumps(items))
    except Exception:
      cloudlog.exception("athena.UploadQueueCache.cache.exception")


def handle_long_poll(ws):
  end_event = threading.Event()
@@ -111,6 +133,7 @@ def upload_handler(end_event):
    try:
      cur_upload_items[tid] = upload_queue.get(timeout=1)._replace(current=True)

      if cur_upload_items[tid].id in cancelled_uploads:
        cancelled_uploads.remove(cur_upload_items[tid].id)
        continue
@@ -120,6 +143,7 @@ def upload_handler(end_event):
        cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)

      _do_upload(cur_upload_items[tid], cb)
      UploadQueueCache.cache(upload_queue)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
      cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")
@@ -131,6 +155,8 @@ def upload_handler(end_event):
          current=False
        )
        upload_queue.put_nowait(item)
        UploadQueueCache.cache(upload_queue)

        cur_upload_items[tid] = None

        for _ in range(RETRY_DELAY):
@@ -248,6 +274,7 @@ def uploadFileToUrl(fn, url, headers):
  item = item._replace(id=upload_id)

  upload_queue.put_nowait(item)
  UploadQueueCache.cache(upload_queue)

  return {"enqueued": 1, "item": item._asdict()}
@@ -280,8 +307,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):
    cloudlog.debug("athena.startLocalProxy.starting")

    params = Params()
    dongle_id = params.get("DongleId").decode('utf8')
    dongle_id = Params().get("DongleId").decode('utf8')
    identity_token = Api(dongle_id).get_token()
    ws = create_connection(remote_ws_uri,
                           cookie="jwt=" + identity_token,
@@ -525,6 +551,7 @@ def backoff(retries):
def main():
  params = Params()
  dongle_id = params.get("DongleId", encoding='utf-8')
  UploadQueueCache.initialize(upload_queue)

  ws_uri = ATHENA_HOST + "/ws/v2/" + dongle_id
  api = Api(dongle_id)
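One of the commit-message bullets is "catch all exceptions": both initialize() and cache() wrap their work in a broad try/except and only log, so a corrupted or unreadable "AthenadUploadQueue" value degrades to an empty queue instead of keeping athenad from starting. A minimal sketch of that failure mode, with a plain dict standing in for Params:

import json
import logging

# Hypothetical on-disk state whose cached value has been corrupted.
store = {"AthenadUploadQueue": b"not valid json"}

def restore(pending):
  try:
    raw = store.get("AthenadUploadQueue")
    if raw is not None:
      for item in json.loads(raw):
        pending.append(item)
  except Exception:
    # Mirrors the cloudlog.exception(...) call: log and carry on.
    logging.exception("upload queue restore failed")

pending = []
restore(pending)
assert pending == []  # a corrupt cache is ignored rather than crashing startup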

selfdrive/athena/tests/helpers.py

@@ -50,18 +50,28 @@ class MockApi():
class MockParams():
  def __init__(self):
    self.params = {
      "DongleId": b"0000000000000000",
      "GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private" # noqa: E501
    }
  default_params = {
    "DongleId": b"0000000000000000",
    "GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private", # noqa: E501
    "AthenadUploadQueue": '[]'
  }
  params = default_params.copy()

  @staticmethod
  def restore_defaults():
    MockParams.params = MockParams.default_params.copy()

  def get(self, k, encoding=None):
    ret = self.params.get(k)
    ret = MockParams.params.get(k)
    if ret is not None and encoding is not None:
      ret = ret.decode(encoding)
    return ret

  def put(self, k, v):
    if k not in MockParams.params:
      raise KeyError(f"key: {k} not in MockParams")
    MockParams.params[k] = v


class MockWebsocket():
  def __init__(self, recv_queue, send_queue):
selfdrive/athena/tests/test_athenad.py

@@ -21,19 +21,22 @@ from selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher
from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server
from cereal import messaging


class TestAthenadMethods(unittest.TestCase):
  @classmethod
  def setUpClass(cls):
    cls.SOCKET_PORT = 45454
    athenad.Params = MockParams
    athenad.ROOT = tempfile.mkdtemp()
    athenad.SWAGLOG_DIR = swaglog.SWAGLOG_DIR = tempfile.mkdtemp()
    athenad.Params = MockParams
    athenad.Api = MockApi
    athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT])

  def setUp(self):
    MockParams.restore_defaults()
    athenad.upload_queue = queue.Queue()
    athenad.cur_upload_items.clear()
    athenad.cancelled_uploads.clear()

    for i in os.listdir(athenad.ROOT):
      p = os.path.join(athenad.ROOT, i)
@@ -249,6 +252,26 @@ class TestAthenadMethods(unittest.TestCase):
    items = dispatcher["listUploadQueue"]()
    self.assertEqual(len(items), 0)

  def test_upload_queue_persistence(self):
    item1 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id1')
    item2 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id2')

    athenad.upload_queue.put_nowait(item1)
    athenad.upload_queue.put_nowait(item2)

    # Ensure cancelled items are not persisted
    athenad.cancelled_uploads.add(item2.id)

    # serialize item
    athenad.UploadQueueCache.cache(athenad.upload_queue)

    # deserialize item
    athenad.upload_queue.queue.clear()
    athenad.UploadQueueCache.initialize(athenad.upload_queue)

    self.assertEqual(athenad.upload_queue.qsize(), 1)
    self.assertDictEqual(athenad.upload_queue.queue[-1]._asdict(), item1._asdict())

  @mock.patch('selfdrive.athena.athenad.create_connection')
  def test_startLocalProxy(self, mock_create_connection):
    end_event = threading.Event()
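The helpers.py and params.cc changes are two halves of the same requirement: MockParams.put() rejects keys it has not been seeded with, mirroring the fact that real param keys must be declared in the params.cc map (next hunk), which is why "AthenadUploadQueue" is added in both places before cache() can write to it. A small standalone illustration of that strict-put behavior (hypothetical StrictStore, not code from the diff):

class StrictStore:
  # Only keys registered up front may be written, like MockParams.put().
  def __init__(self, known):
    self.data = dict(known)
  def put(self, k, v):
    if k not in self.data:
      raise KeyError(f"key: {k} not registered")
    self.data[k] = v

store = StrictStore({"AthenadUploadQueue": "[]"})
store.put("AthenadUploadQueue", "[]")   # ok: key is registered
try:
  store.put("SomeUnknownKey", "x")
except KeyError as e:
  print(e)                              # unknown keys are rejected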

selfdrive/common/params.cc

@@ -85,6 +85,7 @@ private:
  std::unordered_map<std::string, uint32_t> keys = {
      {"AccessToken", CLEAR_ON_MANAGER_START | DONT_LOG},
      {"AthenadPid", PERSISTENT},
      {"AthenadUploadQueue", PERSISTENT},
      {"BootedOnroad", CLEAR_ON_MANAGER_START | CLEAR_ON_IGNITION_OFF},
      {"CalibrationParams", PERSISTENT},
      {"CarBatteryCapacity", PERSISTENT},