diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index d196926611..adfa8673f4 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -56,6 +56,28 @@ UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', ' cur_upload_items = {} +class UploadQueueCache(): + params = Params() + + @staticmethod + def initialize(upload_queue): + try: + upload_queue_json = UploadQueueCache.params.get("AthenadUploadQueue") + if upload_queue_json is not None: + for item in json.loads(upload_queue_json): + upload_queue.put(UploadItem(**item)) + except Exception: + cloudlog.exception("athena.UploadQueueCache.initialize.exception") + + @staticmethod + def cache(upload_queue): + try: + items = [i._asdict() for i in upload_queue.queue if i.id not in cancelled_uploads] + UploadQueueCache.params.put("AthenadUploadQueue", json.dumps(items)) + except Exception: + cloudlog.exception("athena.UploadQueueCache.cache.exception") + + def handle_long_poll(ws): end_event = threading.Event() @@ -111,6 +133,7 @@ def upload_handler(end_event): try: cur_upload_items[tid] = upload_queue.get(timeout=1)._replace(current=True) + if cur_upload_items[tid].id in cancelled_uploads: cancelled_uploads.remove(cur_upload_items[tid].id) continue @@ -120,6 +143,7 @@ def upload_handler(end_event): cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) _do_upload(cur_upload_items[tid], cb) + UploadQueueCache.cache(upload_queue) except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}") @@ -131,6 +155,8 @@ def upload_handler(end_event): current=False ) upload_queue.put_nowait(item) + UploadQueueCache.cache(upload_queue) + cur_upload_items[tid] = None for _ in range(RETRY_DELAY): @@ -248,6 +274,7 @@ def uploadFileToUrl(fn, url, headers): item = item._replace(id=upload_id) upload_queue.put_nowait(item) + UploadQueueCache.cache(upload_queue) return {"enqueued": 1, "item": item._asdict()} @@ -280,8 +307,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port): cloudlog.debug("athena.startLocalProxy.starting") - params = Params() - dongle_id = params.get("DongleId").decode('utf8') + dongle_id = Params().get("DongleId").decode('utf8') identity_token = Api(dongle_id).get_token() ws = create_connection(remote_ws_uri, cookie="jwt=" + identity_token, @@ -525,6 +551,7 @@ def backoff(retries): def main(): params = Params() dongle_id = params.get("DongleId", encoding='utf-8') + UploadQueueCache.initialize(upload_queue) ws_uri = ATHENA_HOST + "/ws/v2/" + dongle_id api = Api(dongle_id) diff --git a/selfdrive/athena/tests/helpers.py b/selfdrive/athena/tests/helpers.py index 831b668297..26655b4a37 100644 --- a/selfdrive/athena/tests/helpers.py +++ b/selfdrive/athena/tests/helpers.py @@ -50,18 +50,28 @@ class MockApi(): class MockParams(): - def __init__(self): - self.params = { - "DongleId": b"0000000000000000", - "GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private" # noqa: E501 - } + default_params = { + "DongleId": b"0000000000000000", + "GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private", # noqa: E501 + "AthenadUploadQueue": '[]' + } + params = default_params.copy() + + @staticmethod + def restore_defaults(): + MockParams.params = MockParams.default_params.copy() def get(self, k, encoding=None): - ret = self.params.get(k) + ret = MockParams.params.get(k) if ret is not None and encoding is not None: ret = ret.decode(encoding) return ret + def put(self, k, v): + if k not in MockParams.params: + raise KeyError(f"key: {k} not in MockParams") + MockParams.params[k] = v + class MockWebsocket(): def __init__(self, recv_queue, send_queue): diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index 5bf34d1bc7..d96e32abc2 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -21,19 +21,22 @@ from selfdrive.athena.athenad import MAX_RETRY_COUNT, dispatcher from selfdrive.athena.tests.helpers import MockWebsocket, MockParams, MockApi, EchoSocket, with_http_server from cereal import messaging + class TestAthenadMethods(unittest.TestCase): @classmethod def setUpClass(cls): cls.SOCKET_PORT = 45454 + athenad.Params = MockParams athenad.ROOT = tempfile.mkdtemp() athenad.SWAGLOG_DIR = swaglog.SWAGLOG_DIR = tempfile.mkdtemp() - athenad.Params = MockParams athenad.Api = MockApi athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT]) def setUp(self): + MockParams.restore_defaults() athenad.upload_queue = queue.Queue() athenad.cur_upload_items.clear() + athenad.cancelled_uploads.clear() for i in os.listdir(athenad.ROOT): p = os.path.join(athenad.ROOT, i) @@ -249,6 +252,26 @@ class TestAthenadMethods(unittest.TestCase): items = dispatcher["listUploadQueue"]() self.assertEqual(len(items), 0) + def test_upload_queue_persistence(self): + item1 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id1') + item2 = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id2') + + athenad.upload_queue.put_nowait(item1) + athenad.upload_queue.put_nowait(item2) + + # Ensure cancelled items are not persisted + athenad.cancelled_uploads.add(item2.id) + + # serialize item + athenad.UploadQueueCache.cache(athenad.upload_queue) + + # deserialize item + athenad.upload_queue.queue.clear() + athenad.UploadQueueCache.initialize(athenad.upload_queue) + + self.assertEqual(athenad.upload_queue.qsize(), 1) + self.assertDictEqual(athenad.upload_queue.queue[-1]._asdict(), item1._asdict()) + @mock.patch('selfdrive.athena.athenad.create_connection') def test_startLocalProxy(self, mock_create_connection): end_event = threading.Event() diff --git a/selfdrive/common/params.cc b/selfdrive/common/params.cc index a405d21609..f0508cace0 100644 --- a/selfdrive/common/params.cc +++ b/selfdrive/common/params.cc @@ -85,6 +85,7 @@ private: std::unordered_map keys = { {"AccessToken", CLEAR_ON_MANAGER_START | DONT_LOG}, {"AthenadPid", PERSISTENT}, + {"AthenadUploadQueue", PERSISTENT}, {"BootedOnroad", CLEAR_ON_MANAGER_START | CLEAR_ON_IGNITION_OFF}, {"CalibrationParams", PERSISTENT}, {"CarBatteryCapacity", PERSISTENT},