|
|
@ -8,15 +8,13 @@ import requests |
|
|
|
import threading |
|
|
|
import threading |
|
|
|
import time |
|
|
|
import time |
|
|
|
import traceback |
|
|
|
import traceback |
|
|
|
from pathlib import Path |
|
|
|
from typing import BinaryIO, Iterator, List, Optional, Tuple |
|
|
|
from typing import BinaryIO, Iterator, List, Optional, Tuple, Union |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from cereal import log |
|
|
|
from cereal import log |
|
|
|
import cereal.messaging as messaging |
|
|
|
import cereal.messaging as messaging |
|
|
|
from openpilot.common.api import Api |
|
|
|
from openpilot.common.api import Api |
|
|
|
from openpilot.common.params import Params |
|
|
|
from openpilot.common.params import Params |
|
|
|
from openpilot.common.realtime import set_core_affinity |
|
|
|
from openpilot.common.realtime import set_core_affinity |
|
|
|
from openpilot.system.hardware import TICI |
|
|
|
|
|
|
|
from openpilot.system.hardware.hw import Paths |
|
|
|
from openpilot.system.hardware.hw import Paths |
|
|
|
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr |
|
|
|
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr |
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
@ -43,8 +41,6 @@ class FakeResponse: |
|
|
|
self.request = FakeRequest() |
|
|
|
self.request = FakeRequest() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
UploadResponse = Union[requests.Response, FakeResponse] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_directory_sort(d: str) -> List[str]: |
|
|
|
def get_directory_sort(d: str) -> List[str]: |
|
|
|
return [s.rjust(10, '0') for s in d.rsplit('--', 1)] |
|
|
|
return [s.rjust(10, '0') for s in d.rsplit('--', 1)] |
|
|
|
|
|
|
|
|
|
|
@ -74,32 +70,16 @@ class Uploader: |
|
|
|
self.api = Api(dongle_id) |
|
|
|
self.api = Api(dongle_id) |
|
|
|
self.root = root |
|
|
|
self.root = root |
|
|
|
|
|
|
|
|
|
|
|
self.last_resp: Optional[UploadResponse] = None |
|
|
|
|
|
|
|
self.last_exc: Optional[Tuple[Exception, str]] = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.immediate_size = 0 |
|
|
|
|
|
|
|
self.immediate_count = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# stats for last successfully uploaded file |
|
|
|
# stats for last successfully uploaded file |
|
|
|
self.last_time = 0.0 |
|
|
|
|
|
|
|
self.last_speed = 0.0 |
|
|
|
|
|
|
|
self.last_filename = "" |
|
|
|
self.last_filename = "" |
|
|
|
|
|
|
|
|
|
|
|
self.immediate_folders = ["crash/", "boot/"] |
|
|
|
self.immediate_folders = ["crash/", "boot/"] |
|
|
|
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} |
|
|
|
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} |
|
|
|
|
|
|
|
|
|
|
|
def get_upload_sort(self, name: str) -> int: |
|
|
|
|
|
|
|
if name in self.immediate_priority: |
|
|
|
|
|
|
|
return self.immediate_priority[name] |
|
|
|
|
|
|
|
return 1000 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: |
|
|
|
def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: |
|
|
|
if not os.path.isdir(self.root): |
|
|
|
if not os.path.isdir(self.root): |
|
|
|
return |
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
self.immediate_size = 0 |
|
|
|
|
|
|
|
self.immediate_count = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for logname in listdir_by_creation(self.root): |
|
|
|
for logname in listdir_by_creation(self.root): |
|
|
|
path = os.path.join(self.root, logname) |
|
|
|
path = os.path.join(self.root, logname) |
|
|
|
try: |
|
|
|
try: |
|
|
@ -110,25 +90,18 @@ class Uploader: |
|
|
|
if any(name.endswith(".lock") for name in names): |
|
|
|
if any(name.endswith(".lock") for name in names): |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
for name in sorted(names, key=self.get_upload_sort): |
|
|
|
for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): |
|
|
|
key = os.path.join(logname, name) |
|
|
|
key = os.path.join(logname, name) |
|
|
|
fn = os.path.join(path, name) |
|
|
|
fn = os.path.join(path, name) |
|
|
|
# skip files already uploaded |
|
|
|
# skip files already uploaded |
|
|
|
try: |
|
|
|
try: |
|
|
|
is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE |
|
|
|
is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE |
|
|
|
except OSError: |
|
|
|
except OSError: |
|
|
|
cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn) |
|
|
|
cloudlog.event("uploader_getxattr_failed", key=key, fn=fn) |
|
|
|
is_uploaded = True # deleter could have deleted |
|
|
|
is_uploaded = True # deleter could have deleted |
|
|
|
if is_uploaded: |
|
|
|
if is_uploaded: |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
if name in self.immediate_priority: |
|
|
|
|
|
|
|
self.immediate_count += 1 |
|
|
|
|
|
|
|
self.immediate_size += os.path.getsize(fn) |
|
|
|
|
|
|
|
except OSError: |
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
yield name, key, fn |
|
|
|
yield name, key, fn |
|
|
|
|
|
|
|
|
|
|
|
def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: |
|
|
|
def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: |
|
|
@ -144,45 +117,28 @@ class Uploader: |
|
|
|
|
|
|
|
|
|
|
|
return None |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def do_upload(self, key: str, fn: str) -> None: |
|
|
|
def do_upload(self, key: str, fn: str): |
|
|
|
try: |
|
|
|
url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token()) |
|
|
|
url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token()) |
|
|
|
if url_resp.status_code == 412: |
|
|
|
if url_resp.status_code == 412: |
|
|
|
return url_resp |
|
|
|
self.last_resp = url_resp |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
url_resp_json = json.loads(url_resp.text) |
|
|
|
|
|
|
|
url = url_resp_json['url'] |
|
|
|
|
|
|
|
headers = url_resp_json['headers'] |
|
|
|
|
|
|
|
cloudlog.debug("upload_url v1.4 %s %s", url, str(headers)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if fake_upload: |
|
|
|
|
|
|
|
cloudlog.debug(f"*** WARNING, THIS IS A FAKE UPLOAD TO {url} ***") |
|
|
|
|
|
|
|
self.last_resp = FakeResponse() |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
with open(fn, "rb") as f: |
|
|
|
|
|
|
|
data: BinaryIO |
|
|
|
|
|
|
|
if key.endswith('.bz2') and not fn.endswith('.bz2'): |
|
|
|
|
|
|
|
compressed = bz2.compress(f.read()) |
|
|
|
|
|
|
|
data = io.BytesIO(compressed) |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
data = f |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.last_resp = requests.put(url, data=data, headers=headers, timeout=10) |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
|
|
self.last_exc = (e, traceback.format_exc()) |
|
|
|
|
|
|
|
raise |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def normal_upload(self, key: str, fn: str) -> Optional[UploadResponse]: |
|
|
|
|
|
|
|
self.last_resp = None |
|
|
|
|
|
|
|
self.last_exc = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
url_resp_json = json.loads(url_resp.text) |
|
|
|
self.do_upload(key, fn) |
|
|
|
url = url_resp_json['url'] |
|
|
|
except Exception: |
|
|
|
headers = url_resp_json['headers'] |
|
|
|
pass |
|
|
|
cloudlog.debug("upload_url v1.4 %s %s", url, str(headers)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if fake_upload: |
|
|
|
|
|
|
|
return FakeResponse() |
|
|
|
|
|
|
|
|
|
|
|
return self.last_resp |
|
|
|
with open(fn, "rb") as f: |
|
|
|
|
|
|
|
data: BinaryIO |
|
|
|
|
|
|
|
if key.endswith('.bz2') and not fn.endswith('.bz2'): |
|
|
|
|
|
|
|
compressed = bz2.compress(f.read()) |
|
|
|
|
|
|
|
data = io.BytesIO(compressed) |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
data = f |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return requests.put(url, data=data, headers=headers, timeout=10) |
|
|
|
|
|
|
|
|
|
|
|
def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: |
|
|
|
def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: |
|
|
|
try: |
|
|
|
try: |
|
|
@ -201,44 +157,57 @@ class Uploader: |
|
|
|
success = True |
|
|
|
success = True |
|
|
|
else: |
|
|
|
else: |
|
|
|
start_time = time.monotonic() |
|
|
|
start_time = time.monotonic() |
|
|
|
stat = self.normal_upload(key, fn) |
|
|
|
|
|
|
|
|
|
|
|
stat = None |
|
|
|
|
|
|
|
last_exc = None |
|
|
|
|
|
|
|
try: |
|
|
|
|
|
|
|
stat = self.do_upload(key, fn) |
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
|
|
last_exc = (e, traceback.format_exc()) |
|
|
|
|
|
|
|
|
|
|
|
if stat is not None and stat.status_code in (200, 201, 401, 403, 412): |
|
|
|
if stat is not None and stat.status_code in (200, 201, 401, 403, 412): |
|
|
|
self.last_filename = fn |
|
|
|
self.last_filename = fn |
|
|
|
self.last_time = time.monotonic() - start_time |
|
|
|
dt = time.monotonic() - start_time |
|
|
|
if stat.status_code == 412: |
|
|
|
if stat.status_code == 412: |
|
|
|
self.last_speed = 0 |
|
|
|
|
|
|
|
cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
else: |
|
|
|
else: |
|
|
|
content_length = int(stat.request.headers.get("Content-Length", 0)) |
|
|
|
content_length = int(stat.request.headers.get("Content-Length", 0)) |
|
|
|
self.last_speed = (content_length / 1e6) / self.last_time |
|
|
|
speed = (content_length / 1e6) / dt |
|
|
|
cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length, |
|
|
|
cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length, |
|
|
|
network_type=network_type, metered=metered, speed=self.last_speed) |
|
|
|
network_type=network_type, metered=metered, speed=speed) |
|
|
|
success = True |
|
|
|
success = True |
|
|
|
else: |
|
|
|
else: |
|
|
|
success = False |
|
|
|
success = False |
|
|
|
cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
cloudlog.event("upload_failed", stat=stat, exc=last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) |
|
|
|
|
|
|
|
|
|
|
|
if success: |
|
|
|
if success: |
|
|
|
# tag file as uploaded |
|
|
|
# tag file as uploaded |
|
|
|
try: |
|
|
|
try: |
|
|
|
setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) |
|
|
|
setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) |
|
|
|
except OSError: |
|
|
|
except OSError: |
|
|
|
cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) |
|
|
|
cloudlog.event("uploader_setxattr_failed", exc=last_exc, key=key, fn=fn, sz=sz) |
|
|
|
|
|
|
|
|
|
|
|
return success |
|
|
|
return success |
|
|
|
|
|
|
|
|
|
|
|
def get_msg(self): |
|
|
|
|
|
|
|
msg = messaging.new_message("uploaderState", valid=True) |
|
|
|
def step(self, network_type: int, metered: bool) -> bool: |
|
|
|
us = msg.uploaderState |
|
|
|
d = self.next_file_to_upload() |
|
|
|
us.immediateQueueSize = int(self.immediate_size / 1e6) |
|
|
|
if d is None: |
|
|
|
us.immediateQueueCount = self.immediate_count |
|
|
|
return True |
|
|
|
us.lastTime = self.last_time |
|
|
|
|
|
|
|
us.lastSpeed = self.last_speed |
|
|
|
name, key, fn = d |
|
|
|
us.lastFilename = self.last_filename |
|
|
|
|
|
|
|
return msg |
|
|
|
# qlogs and bootlogs need to be compressed before uploading |
|
|
|
|
|
|
|
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): |
|
|
|
|
|
|
|
key += ".bz2" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return self.upload(name, key, fn, network_type, metered) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def uploader_fn(exit_event: threading.Event) -> None: |
|
|
|
def main(exit_event: Optional[threading.Event] = None) -> None: |
|
|
|
|
|
|
|
if exit_event is None: |
|
|
|
|
|
|
|
exit_event = threading.Event() |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
try: |
|
|
|
set_core_affinity([0, 1, 2, 3]) |
|
|
|
set_core_affinity([0, 1, 2, 3]) |
|
|
|
except Exception: |
|
|
|
except Exception: |
|
|
@ -253,11 +222,7 @@ def uploader_fn(exit_event: threading.Event) -> None: |
|
|
|
cloudlog.info("uploader missing dongle_id") |
|
|
|
cloudlog.info("uploader missing dongle_id") |
|
|
|
raise Exception("uploader can't start without dongle id") |
|
|
|
raise Exception("uploader can't start without dongle id") |
|
|
|
|
|
|
|
|
|
|
|
if TICI and not Path("/data/media").is_mount(): |
|
|
|
|
|
|
|
cloudlog.warning("NVME not mounted") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sm = messaging.SubMaster(['deviceState']) |
|
|
|
sm = messaging.SubMaster(['deviceState']) |
|
|
|
pm = messaging.PubMaster(['uploaderState']) |
|
|
|
|
|
|
|
uploader = Uploader(dongle_id, Paths.log_root()) |
|
|
|
uploader = Uploader(dongle_id, Paths.log_root()) |
|
|
|
|
|
|
|
|
|
|
|
backoff = 0.1 |
|
|
|
backoff = 0.1 |
|
|
@ -270,31 +235,13 @@ def uploader_fn(exit_event: threading.Event) -> None: |
|
|
|
time.sleep(60 if offroad else 5) |
|
|
|
time.sleep(60 if offroad else 5) |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
d = uploader.next_file_to_upload() |
|
|
|
success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) |
|
|
|
if d is None: # Nothing to upload |
|
|
|
|
|
|
|
if allow_sleep: |
|
|
|
|
|
|
|
time.sleep(60 if offroad else 5) |
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
name, key, fn = d |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# qlogs and bootlogs need to be compressed before uploading |
|
|
|
|
|
|
|
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): |
|
|
|
|
|
|
|
key += ".bz2" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
success = uploader.upload(name, key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) |
|
|
|
|
|
|
|
if success: |
|
|
|
if success: |
|
|
|
backoff = 0.1 |
|
|
|
backoff = 0.1 |
|
|
|
elif allow_sleep: |
|
|
|
elif allow_sleep: |
|
|
|
cloudlog.info("upload backoff %r", backoff) |
|
|
|
cloudlog.info("upload backoff %r", backoff) |
|
|
|
time.sleep(backoff + random.uniform(0, backoff)) |
|
|
|
|
|
|
|
backoff = min(backoff*2, 120) |
|
|
|
backoff = min(backoff*2, 120) |
|
|
|
|
|
|
|
time.sleep(backoff + random.uniform(0, backoff)) |
|
|
|
pm.send("uploaderState", uploader.get_msg()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> None: |
|
|
|
|
|
|
|
uploader_fn(threading.Event()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
if __name__ == "__main__": |
|
|
|