diff --git a/selfdrive/athena/athenad.py b/selfdrive/athena/athenad.py index dc5778f948..27fcc41eb8 100755 --- a/selfdrive/athena/athenad.py +++ b/selfdrive/athena/athenad.py @@ -22,6 +22,7 @@ from jsonrpc import JSONRPCResponseManager, dispatcher from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection import cereal.messaging as messaging +from cereal import log from cereal.services import service_list from common.api import Api from common.file_helpers import CallbackReader @@ -47,6 +48,8 @@ RETRY_DELAY = 10 # seconds MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately WS_FRAME_SIZE = 4096 +NetworkType = log.DeviceState.NetworkType + dispatcher["echo"] = lambda s: s recv_queue: Any = queue.Queue() send_queue: Any = queue.Queue() @@ -54,10 +57,13 @@ upload_queue: Any = queue.Queue() low_priority_send_queue: Any = queue.Queue() log_recv_queue: Any = queue.Queue() cancelled_uploads: Any = set() -UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0)) +UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress', 'allow_cellular'], defaults=(0, False, 0, False)) cur_upload_items: Dict[int, Any] = {} +class AbortTransferException(Exception): + pass + class UploadQueueCache(): params = Params() @@ -129,11 +135,13 @@ def jsonrpc_handler(end_event): send_queue.put_nowait(json.dumps({"error": str(e)})) -def retry_upload(tid: int, end_event: threading.Event) -> None: +def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = True) -> None: if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT: item = cur_upload_items[tid] + new_retry_count = item.retry_count + 1 if increase_count else item.retry_count + item = item._replace( - retry_count=item.retry_count + 1, + retry_count=new_retry_count, progress=0, current=False ) @@ -149,6 +157,7 @@ def retry_upload(tid: int, end_event: threading.Event) -> None: def upload_handler(end_event: threading.Event) -> None: + sm = messaging.SubMaster(['deviceState']) tid = threading.get_ident() while not end_event.is_set(): @@ -161,8 +170,23 @@ def upload_handler(end_event: threading.Event) -> None: cancelled_uploads.remove(cur_upload_items[tid].id) continue + # TODO: remove item if too old + + # Check if uploading over cell is allowed + sm.update(0) + cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet] + if cell and (not cur_upload_items[tid].allow_cellular): + retry_upload(tid, end_event, False) + continue + try: def cb(sz, cur): + # Abort transfer if connection changed to cell after starting upload + sm.update(0) + cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet] + if cell and (not cur_upload_items[tid].allow_cellular): + raise AbortTransferException + cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) response = _do_upload(cur_upload_items[tid], cb) @@ -172,8 +196,10 @@ def upload_handler(end_event: threading.Event) -> None: UploadQueueCache.cache(upload_queue) except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}") - retry_upload(tid, end_event) + except AbortTransferException: + cloudlog.warning(f"athena.upload_handler.abort {cur_upload_items[tid]}") + retry_upload(tid, end_event, False) except queue.Empty: pass @@ -274,15 +300,20 @@ def reboot(): @dispatcher.add_method def uploadFileToUrl(fn, url, headers): - return uploadFilesToUrls([[fn, url, headers]]) + return uploadFilesToUrls([{ + "fn": fn, + "url": url, + "headers": headers, + }]) @dispatcher.add_method def uploadFilesToUrls(files_data): items = [] failed = [] - for fn, url, headers in files_data: - if len(fn) == 0 or fn[0] == '/' or '..' in fn: + for file in files_data: + fn = file.get('fn', '') + if len(fn) == 0 or fn[0] == '/' or '..' in fn or 'url' not in file: failed.append(fn) continue path = os.path.join(ROOT, fn) @@ -290,7 +321,14 @@ def uploadFilesToUrls(files_data): failed.append(fn) continue - item = UploadItem(path=path, url=url, headers=headers, created_at=int(time.time() * 1000), id=None) + item = UploadItem( + path=path, + url=file['url'], + headers=file.get('headers', {}), + created_at=int(time.time() * 1000), + id=None, + allow_cellular=file.get('allow_cellular', False), + ) upload_id = hashlib.sha1(str(item).encode()).hexdigest() item = item._replace(id=upload_id) upload_queue.put_nowait(item) diff --git a/selfdrive/athena/tests/test_athenad.py b/selfdrive/athena/tests/test_athenad.py index fb5936dde1..d59876f7ab 100755 --- a/selfdrive/athena/tests/test_athenad.py +++ b/selfdrive/athena/tests/test_athenad.py @@ -150,7 +150,7 @@ class TestAthenadMethods(unittest.TestCase): def test_upload_handler(self, host): fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) end_event = threading.Event() thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) @@ -173,7 +173,7 @@ class TestAthenadMethods(unittest.TestCase): mock_put.return_value.status_code = status fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) end_event = threading.Event() thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) @@ -187,7 +187,7 @@ class TestAthenadMethods(unittest.TestCase): self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) finally: end_event.set() - + if retry: self.assertEqual(athenad.upload_queue.get().retry_count, 1) @@ -195,7 +195,7 @@ class TestAthenadMethods(unittest.TestCase): """When an upload times out or fails to connect it should be placed back in the queue""" fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() - item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT) end_event = threading.Event() @@ -222,7 +222,7 @@ class TestAthenadMethods(unittest.TestCase): end_event.set() def test_cancelUpload(self): - item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') + item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True) athenad.upload_queue.put_nowait(item) dispatcher["cancelUpload"](item.id) @@ -248,7 +248,7 @@ class TestAthenadMethods(unittest.TestCase): def test_listUploadQueueCurrent(self, host): fn = os.path.join(athenad.ROOT, 'qlog.bz2') Path(fn).touch() - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) end_event = threading.Event() thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) @@ -266,7 +266,7 @@ class TestAthenadMethods(unittest.TestCase): end_event.set() def test_listUploadQueue(self): - item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') + item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True) athenad.upload_queue.put_nowait(item) items = dispatcher["listUploadQueue"]()