|
|
|
@ -22,6 +22,7 @@ from websocket import ABNF, WebSocketTimeoutException, WebSocketException, creat |
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
from cereal.services import service_list |
|
|
|
|
from common.api import Api |
|
|
|
|
from common.file_helpers import CallbackReader |
|
|
|
|
from common.basedir import PERSIST |
|
|
|
|
from common.params import Params |
|
|
|
|
from common.realtime import sec_since_boot |
|
|
|
@ -49,7 +50,9 @@ upload_queue: Any = queue.Queue() |
|
|
|
|
log_send_queue: Any = queue.Queue() |
|
|
|
|
log_recv_queue: Any = queue.Queue() |
|
|
|
|
cancelled_uploads: Any = set() |
|
|
|
|
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count'], defaults=(0,)) |
|
|
|
|
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0)) |
|
|
|
|
|
|
|
|
|
cur_upload_items = {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_long_poll(ws): |
|
|
|
@ -100,35 +103,53 @@ def jsonrpc_handler(end_event): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_handler(end_event): |
|
|
|
|
tid = threading.get_ident() |
|
|
|
|
|
|
|
|
|
while not end_event.is_set(): |
|
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
item = upload_queue.get(timeout=1) |
|
|
|
|
if item.id in cancelled_uploads: |
|
|
|
|
cancelled_uploads.remove(item.id) |
|
|
|
|
cur_upload_items[tid] = upload_queue.get(timeout=1)._replace(current=True) |
|
|
|
|
if cur_upload_items[tid].id in cancelled_uploads: |
|
|
|
|
cancelled_uploads.remove(cur_upload_items[tid].id) |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
_do_upload(item) |
|
|
|
|
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: |
|
|
|
|
cloudlog.warning(f"athena.upload_handler.retry {e} {item}") |
|
|
|
|
def cb(sz, cur): |
|
|
|
|
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) |
|
|
|
|
|
|
|
|
|
if item.retry_count < MAX_RETRY_COUNT: |
|
|
|
|
item = item._replace(retry_count=item.retry_count + 1) |
|
|
|
|
_do_upload(cur_upload_items[tid], cb) |
|
|
|
|
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e: |
|
|
|
|
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}") |
|
|
|
|
|
|
|
|
|
if cur_upload_items[tid].retry_count < MAX_RETRY_COUNT: |
|
|
|
|
item = cur_upload_items[tid] |
|
|
|
|
item = item._replace( |
|
|
|
|
retry_count=item.retry_count + 1, |
|
|
|
|
progress=0, |
|
|
|
|
current=False |
|
|
|
|
) |
|
|
|
|
upload_queue.put_nowait(item) |
|
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
|
|
|
|
|
for _ in range(RETRY_DELAY): |
|
|
|
|
time.sleep(1) |
|
|
|
|
if end_event.is_set(): |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
except queue.Empty: |
|
|
|
|
pass |
|
|
|
|
except Exception: |
|
|
|
|
cloudlog.exception("athena.upload_handler.exception") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _do_upload(upload_item): |
|
|
|
|
def _do_upload(upload_item, callback=None): |
|
|
|
|
with open(upload_item.path, "rb") as f: |
|
|
|
|
size = os.fstat(f.fileno()).st_size |
|
|
|
|
|
|
|
|
|
if callback: |
|
|
|
|
f = CallbackReader(f, callback, size) |
|
|
|
|
|
|
|
|
|
return requests.put(upload_item.url, |
|
|
|
|
data=f, |
|
|
|
|
headers={**upload_item.headers, 'Content-Length': str(size)}, |
|
|
|
@ -212,7 +233,8 @@ def uploadFileToUrl(fn, url, headers): |
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
|
def listUploadQueue(): |
|
|
|
|
return [item._asdict() for item in list(upload_queue.queue)] |
|
|
|
|
items = list(upload_queue.queue) + list(cur_upload_items.values()) |
|
|
|
|
return [i._asdict() for i in items if i is not None] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
@ -514,6 +536,8 @@ def main(): |
|
|
|
|
manage_tokens(api) |
|
|
|
|
|
|
|
|
|
conn_retries = 0 |
|
|
|
|
cur_upload_items.clear() |
|
|
|
|
|
|
|
|
|
handle_long_poll(ws) |
|
|
|
|
except (KeyboardInterrupt, SystemExit): |
|
|
|
|
break |
|
|
|
|