athena: specify network type for file uploads (#23687)

* athena: specify network type for file uploads

* add comment

* catch abort transfer

* fix tests

* put athena upload args in dict

* fix defaults

Co-authored-by: Joost Wooning <jwooning@gmail.com>
pull/23735/head
Willem Melching 3 years ago committed by GitHub
parent 8effa2d878
commit e9153fdb4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 54
      selfdrive/athena/athenad.py
  2. 14
      selfdrive/athena/tests/test_athenad.py

@ -22,6 +22,7 @@ from jsonrpc import JSONRPCResponseManager, dispatcher
from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection from websocket import ABNF, WebSocketTimeoutException, WebSocketException, create_connection
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import log
from cereal.services import service_list from cereal.services import service_list
from common.api import Api from common.api import Api
from common.file_helpers import CallbackReader from common.file_helpers import CallbackReader
@ -47,6 +48,8 @@ RETRY_DELAY = 10 # seconds
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
WS_FRAME_SIZE = 4096 WS_FRAME_SIZE = 4096
NetworkType = log.DeviceState.NetworkType
dispatcher["echo"] = lambda s: s dispatcher["echo"] = lambda s: s
recv_queue: Any = queue.Queue() recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue() send_queue: Any = queue.Queue()
@ -54,10 +57,13 @@ upload_queue: Any = queue.Queue()
low_priority_send_queue: Any = queue.Queue() low_priority_send_queue: Any = queue.Queue()
log_recv_queue: Any = queue.Queue() log_recv_queue: Any = queue.Queue()
cancelled_uploads: Any = set() cancelled_uploads: Any = set()
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0)) UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress', 'allow_cellular'], defaults=(0, False, 0, False))
cur_upload_items: Dict[int, Any] = {} cur_upload_items: Dict[int, Any] = {}
class AbortTransferException(Exception):
pass
class UploadQueueCache(): class UploadQueueCache():
params = Params() params = Params()
@ -129,11 +135,13 @@ def jsonrpc_handler(end_event):
send_queue.put_nowait(json.dumps({"error": str(e)})) send_queue.put_nowait(json.dumps({"error": str(e)}))
def retry_upload(tid: int, end_event: threading.Event) -> None: def retry_upload(tid: int, end_event: threading.Event, increase_count: bool = True) -> None:
if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT: if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT:
item = cur_upload_items[tid] item = cur_upload_items[tid]
new_retry_count = item.retry_count + 1 if increase_count else item.retry_count
item = item._replace( item = item._replace(
retry_count=item.retry_count + 1, retry_count=new_retry_count,
progress=0, progress=0,
current=False current=False
) )
@ -149,6 +157,7 @@ def retry_upload(tid: int, end_event: threading.Event) -> None:
def upload_handler(end_event: threading.Event) -> None: def upload_handler(end_event: threading.Event) -> None:
sm = messaging.SubMaster(['deviceState'])
tid = threading.get_ident() tid = threading.get_ident()
while not end_event.is_set(): while not end_event.is_set():
@ -161,8 +170,23 @@ def upload_handler(end_event: threading.Event) -> None:
cancelled_uploads.remove(cur_upload_items[tid].id) cancelled_uploads.remove(cur_upload_items[tid].id)
continue continue
# TODO: remove item if too old
# Check if uploading over cell is allowed
sm.update(0)
cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet]
if cell and (not cur_upload_items[tid].allow_cellular):
retry_upload(tid, end_event, False)
continue
try: try:
def cb(sz, cur): def cb(sz, cur):
# Abort transfer if connection changed to cell after starting upload
sm.update(0)
cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet]
if cell and (not cur_upload_items[tid].allow_cellular):
raise AbortTransferException
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)
response = _do_upload(cur_upload_items[tid], cb) response = _do_upload(cur_upload_items[tid], cb)
@ -172,8 +196,10 @@ def upload_handler(end_event: threading.Event) -> None:
UploadQueueCache.cache(upload_queue) UploadQueueCache.cache(upload_queue)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}") cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")
retry_upload(tid, end_event) retry_upload(tid, end_event)
except AbortTransferException:
cloudlog.warning(f"athena.upload_handler.abort {cur_upload_items[tid]}")
retry_upload(tid, end_event, False)
except queue.Empty: except queue.Empty:
pass pass
@ -274,15 +300,20 @@ def reboot():
@dispatcher.add_method @dispatcher.add_method
def uploadFileToUrl(fn, url, headers): def uploadFileToUrl(fn, url, headers):
return uploadFilesToUrls([[fn, url, headers]]) return uploadFilesToUrls([{
"fn": fn,
"url": url,
"headers": headers,
}])
@dispatcher.add_method @dispatcher.add_method
def uploadFilesToUrls(files_data): def uploadFilesToUrls(files_data):
items = [] items = []
failed = [] failed = []
for fn, url, headers in files_data: for file in files_data:
if len(fn) == 0 or fn[0] == '/' or '..' in fn: fn = file.get('fn', '')
if len(fn) == 0 or fn[0] == '/' or '..' in fn or 'url' not in file:
failed.append(fn) failed.append(fn)
continue continue
path = os.path.join(ROOT, fn) path = os.path.join(ROOT, fn)
@ -290,7 +321,14 @@ def uploadFilesToUrls(files_data):
failed.append(fn) failed.append(fn)
continue continue
item = UploadItem(path=path, url=url, headers=headers, created_at=int(time.time() * 1000), id=None) item = UploadItem(
path=path,
url=file['url'],
headers=file.get('headers', {}),
created_at=int(time.time() * 1000),
id=None,
allow_cellular=file.get('allow_cellular', False),
)
upload_id = hashlib.sha1(str(item).encode()).hexdigest() upload_id = hashlib.sha1(str(item).encode()).hexdigest()
item = item._replace(id=upload_id) item = item._replace(id=upload_id)
upload_queue.put_nowait(item) upload_queue.put_nowait(item)

@ -150,7 +150,7 @@ class TestAthenadMethods(unittest.TestCase):
def test_upload_handler(self, host): def test_upload_handler(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch() Path(fn).touch()
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event() end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
@ -173,7 +173,7 @@ class TestAthenadMethods(unittest.TestCase):
mock_put.return_value.status_code = status mock_put.return_value.status_code = status
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch() Path(fn).touch()
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event() end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
@ -187,7 +187,7 @@ class TestAthenadMethods(unittest.TestCase):
self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0) self.assertEqual(athenad.upload_queue.qsize(), 1 if retry else 0)
finally: finally:
end_event.set() end_event.set()
if retry: if retry:
self.assertEqual(athenad.upload_queue.get().retry_count, 1) self.assertEqual(athenad.upload_queue.get().retry_count, 1)
@ -195,7 +195,7 @@ class TestAthenadMethods(unittest.TestCase):
"""When an upload times out or fails to connect it should be placed back in the queue""" """When an upload times out or fails to connect it should be placed back in the queue"""
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch() Path(fn).touch()
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT) item_no_retry = item._replace(retry_count=MAX_RETRY_COUNT)
end_event = threading.Event() end_event = threading.Event()
@ -222,7 +222,7 @@ class TestAthenadMethods(unittest.TestCase):
end_event.set() end_event.set()
def test_cancelUpload(self): def test_cancelUpload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
dispatcher["cancelUpload"](item.id) dispatcher["cancelUpload"](item.id)
@ -248,7 +248,7 @@ class TestAthenadMethods(unittest.TestCase):
def test_listUploadQueueCurrent(self, host): def test_listUploadQueueCurrent(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch() Path(fn).touch()
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
end_event = threading.Event() end_event = threading.Event()
thread = threading.Thread(target=athenad.upload_handler, args=(end_event,)) thread = threading.Thread(target=athenad.upload_handler, args=(end_event,))
@ -266,7 +266,7 @@ class TestAthenadMethods(unittest.TestCase):
end_event.set() end_event.set()
def test_listUploadQueue(self): def test_listUploadQueue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item) athenad.upload_queue.put_nowait(item)
items = dispatcher["listUploadQueue"]() items = dispatcher["listUploadQueue"]()

Loading…
Cancel
Save