|
|
@ -163,8 +163,6 @@ def upload_handler(end_event: threading.Event) -> None: |
|
|
|
sm = messaging.SubMaster(['deviceState']) |
|
|
|
sm = messaging.SubMaster(['deviceState']) |
|
|
|
tid = threading.get_ident() |
|
|
|
tid = threading.get_ident() |
|
|
|
|
|
|
|
|
|
|
|
cellular_unmetered = Params().get_bool("CellularUnmetered") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while not end_event.is_set(): |
|
|
|
while not end_event.is_set(): |
|
|
|
cur_upload_items[tid] = None |
|
|
|
cur_upload_items[tid] = None |
|
|
|
|
|
|
|
|
|
|
@ -181,46 +179,45 @@ def upload_handler(end_event: threading.Event) -> None: |
|
|
|
cloudlog.event("athena.upload_handler.expired", item=cur_upload_items[tid], error=True) |
|
|
|
cloudlog.event("athena.upload_handler.expired", item=cur_upload_items[tid], error=True) |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
# Check if uploading over cell is allowed |
|
|
|
# Check if uploading over metered connection is allowed |
|
|
|
sm.update(0) |
|
|
|
sm.update(0) |
|
|
|
cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet] |
|
|
|
metered = sm['deviceState'].networkMetered |
|
|
|
if cell and (not cur_upload_items[tid].allow_cellular) and (not cellular_unmetered): |
|
|
|
network_type = sm['deviceState'].networkType.raw |
|
|
|
|
|
|
|
if metered and (not cur_upload_items[tid].allow_cellular): |
|
|
|
retry_upload(tid, end_event, False) |
|
|
|
retry_upload(tid, end_event, False) |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
try: |
|
|
|
def cb(sz, cur): |
|
|
|
def cb(sz, cur): |
|
|
|
# Abort transfer if connection changed to cell after starting upload |
|
|
|
# Abort transfer if connection changed to metered after starting upload |
|
|
|
sm.update(0) |
|
|
|
sm.update(0) |
|
|
|
cell = sm['deviceState'].networkType not in [NetworkType.wifi, NetworkType.ethernet] |
|
|
|
metered = sm['deviceState'].networkMetered |
|
|
|
if cell and (not cur_upload_items[tid].allow_cellular) and (not cellular_unmetered): |
|
|
|
if metered and (not cur_upload_items[tid].allow_cellular): |
|
|
|
raise AbortTransferException |
|
|
|
raise AbortTransferException |
|
|
|
|
|
|
|
|
|
|
|
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) |
|
|
|
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
network_type = sm['deviceState'].networkType.raw |
|
|
|
|
|
|
|
fn = cur_upload_items[tid].path |
|
|
|
fn = cur_upload_items[tid].path |
|
|
|
try: |
|
|
|
try: |
|
|
|
sz = os.path.getsize(fn) |
|
|
|
sz = os.path.getsize(fn) |
|
|
|
except OSError: |
|
|
|
except OSError: |
|
|
|
sz = -1 |
|
|
|
sz = -1 |
|
|
|
|
|
|
|
|
|
|
|
cloudlog.event("athena.upload_handler.upload_start", fn=fn, sz=sz, network_type=network_type) |
|
|
|
cloudlog.event("athena.upload_handler.upload_start", fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
response = _do_upload(cur_upload_items[tid], cb) |
|
|
|
response = _do_upload(cur_upload_items[tid], cb) |
|
|
|
|
|
|
|
|
|
|
|
if response.status_code not in (200, 201, 403, 412): |
|
|
|
if response.status_code not in (200, 201, 403, 412): |
|
|
|
cloudlog.event("athena.upload_handler.retry", status_code=response.status_code, fn=fn, sz=sz, network_type=network_type) |
|
|
|
cloudlog.event("athena.upload_handler.retry", status_code=response.status_code, fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
retry_upload(tid, end_event) |
|
|
|
retry_upload(tid, end_event) |
|
|
|
else: |
|
|
|
else: |
|
|
|
cloudlog.event("athena.upload_handler.success", fn=fn, sz=sz, network_type=network_type) |
|
|
|
cloudlog.event("athena.upload_handler.success", fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
|
|
|
|
|
|
|
|
UploadQueueCache.cache(upload_queue) |
|
|
|
UploadQueueCache.cache(upload_queue) |
|
|
|
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError): |
|
|
|
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError): |
|
|
|
cloudlog.event("athena.upload_handler.timeout", fn=fn, sz=sz, network_type=network_type) |
|
|
|
cloudlog.event("athena.upload_handler.timeout", fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
retry_upload(tid, end_event) |
|
|
|
retry_upload(tid, end_event) |
|
|
|
except AbortTransferException: |
|
|
|
except AbortTransferException: |
|
|
|
cloudlog.event("athena.upload_handler.abort", fn=fn, sz=sz, network_type=network_type) |
|
|
|
cloudlog.event("athena.upload_handler.abort", fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
retry_upload(tid, end_event, False) |
|
|
|
retry_upload(tid, end_event, False) |
|
|
|
|
|
|
|
|
|
|
|
except queue.Empty: |
|
|
|
except queue.Empty: |
|
|
@ -459,6 +456,12 @@ def getNetworkType(): |
|
|
|
return HARDWARE.get_network_type() |
|
|
|
return HARDWARE.get_network_type() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
|
|
|
|
def getNetworkMetered(): |
|
|
|
|
|
|
|
network_type = HARDWARE.get_network_type() |
|
|
|
|
|
|
|
return HARDWARE.get_network_metered(network_type) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dispatcher.add_method |
|
|
|
@dispatcher.add_method |
|
|
|
def getNetworks(): |
|
|
|
def getNetworks(): |
|
|
|
return HARDWARE.get_networks() |
|
|
|
return HARDWARE.get_networks() |
|
|
|