#!/usr/bin/env python3
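"""Upload daemon for files written by loggerd.

Walks the segment directories under the log root, picks qlogs, qcams, and
crash/boot logs in priority order, requests a presigned upload URL from the
comma API, and PUTs each file (zstd-compressing uncompressed logs on the fly).
Uploaded, empty, or oversized files are tagged with an xattr so they are only
handled once; failures back off exponentially and metered connections are
throttled.
"""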
import io
import json
import os
import random
import requests
import threading
import time
import traceback
import datetime
import zstandard as zstd
from collections.abc import Iterator

from cereal import log
import cereal.messaging as messaging
from openpilot.common.api import Api
from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.common.swaglog import cloudlog

NetworkType = log.DeviceState.NetworkType

# fully uploaded files are tagged with this xattr so they are never uploaded twice
UPLOAD_ATTR_NAME = 'user.upload'
UPLOAD_ATTR_VALUE = b'1'

MAX_UPLOAD_SIZES = {
  "qlog": 25*1e6,  # can't be too restrictive here since we use qlogs to find
                   # bugs, including ones that can cause massive log sizes
  "qcam": 5*1e6,
}
LOG_COMPRESSION_LEVEL = 10  # little benefit up to level 15. level ~17 is a small step change

# note: any non-empty value (even "0") is truthy; set UPLOADER_SLEEP="" to disable sleeping
allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
force_wifi = os.getenv("FORCEWIFI") is not None
fake_upload = os.getenv("FAKEUPLOAD") is not None


# minimal stand-ins for requests' request/response objects, returned when FAKEUPLOAD
# is set so the upload flow can be exercised without any network traffic
class FakeRequest:
  def __init__(self):
    self.headers = {"Content-Length": "0"}


class FakeResponse:
  def __init__(self):
    self.status_code = 200
    self.request = FakeRequest()


def get_directory_sort(d: str) -> list[str]:
  # ensure old format is sorted sooner
  o = ["0", ] if d.startswith("2024-") else ["1", ]
  return o + [s.rjust(10, '0') for s in d.rsplit('--', 1)]

def listdir_by_creation(d: str) -> list[str]:
  if not os.path.isdir(d):
    return []

  try:
    paths = [f for f in os.listdir(d) if os.path.isdir(os.path.join(d, f))]
    paths = sorted(paths, key=get_directory_sort)
    return paths
  except OSError:
    cloudlog.exception("listdir_by_creation failed")
    return []

def clear_locks(root: str) -> None:
  # remove stale .lock files left over from previous runs so those segments aren't skipped
  for logdir in os.listdir(root):
    path = os.path.join(root, logdir)
    try:
      for fname in os.listdir(path):
        if fname.endswith(".lock"):
          os.unlink(os.path.join(path, fname))
    except OSError:
      cloudlog.exception("clear_locks failed")


class Uploader:
  def __init__(self, dongle_id: str, root: str):
    self.dongle_id = dongle_id
    self.api = Api(dongle_id)
    self.root = root

    self.params = Params()

    # stats for last successfully uploaded file
    self.last_filename = ""

    self.immediate_folders = ["crash/", "boot/"]
    self.immediate_priority = {"qlog": 0, "qlog.zst": 0, "qcamera.ts": 1}

  def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]:
    r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8")
    requested_routes = [] if r is None else r.split(",")

    for logdir in listdir_by_creation(self.root):
      path = os.path.join(self.root, logdir)
      try:
        names = os.listdir(path)
      except OSError:
        continue

      if any(name.endswith(".lock") for name in names):
        continue

      for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)):
        key = os.path.join(logdir, name)
        fn = os.path.join(path, name)
        # skip files already uploaded
        try:
          ctime = os.path.getctime(fn)
          is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
        except OSError:
          cloudlog.event("uploader_getxattr_failed", key=key, fn=fn)
          # deleter could have deleted, so skip
          continue
        if is_uploaded:
          continue

        # limit uploading on metered connections
        if metered:
          dt = datetime.timedelta(hours=12)
          if logdir in self.immediate_folders and (datetime.datetime.now() - datetime.datetime.fromtimestamp(ctime)) < dt:
            continue

          if name == "qcamera.ts" and not any(logdir.startswith(r.split('|')[-1]) for r in requested_routes):
            continue

        yield name, key, fn

  def next_file_to_upload(self, metered: bool) -> tuple[str, str, str] | None:
    upload_files = list(self.list_upload_files(metered))

    # crash and boot logs are uploaded before anything else
    for name, key, fn in upload_files:
      if any(f in fn for f in self.immediate_folders):
        return name, key, fn

    for name, key, fn in upload_files:
      if name in self.immediate_priority:
        return name, key, fn

    return None

  def do_upload(self, key: str, fn: str):
    url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token())
    if url_resp.status_code == 412:
      return url_resp

    url_resp_json = json.loads(url_resp.text)
    url = url_resp_json['url']
    headers = url_resp_json['headers']
    cloudlog.debug("upload_url v1.4 %s %s", url, str(headers))

    if fake_upload:
      return FakeResponse()

    with open(fn, "rb") as f:
      content = f.read()
      if key.endswith('.zst') and not fn.endswith('.zst'):
        content = zstd.compress(content, LOG_COMPRESSION_LEVEL)

    with io.BytesIO(content) as data:
      return requests.put(url, data=data, headers=headers, timeout=10)

  def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool:
    try:
      sz = os.path.getsize(fn)
    except OSError:
      cloudlog.exception("upload: getsize failed")
      return False

    cloudlog.event("upload_start", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered)

    last_exc = None
    if sz == 0:
      # tag files of 0 size as uploaded
      success = True
    elif name in MAX_UPLOAD_SIZES and sz > MAX_UPLOAD_SIZES[name]:
      cloudlog.event("uploader_too_large", key=key, fn=fn, sz=sz)
      success = True
    else:
      start_time = time.monotonic()

      stat = None
      try:
        stat = self.do_upload(key, fn)
      except Exception as e:
        last_exc = (e, traceback.format_exc())

      # these status codes won't get better on a retry, so treat them like success and move on
      if stat is not None and stat.status_code in (200, 201, 401, 403, 412):
        self.last_filename = fn
        dt = time.monotonic() - start_time
        if stat.status_code == 412:
          cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered)
        else:
          content_length = int(stat.request.headers.get("Content-Length", 0))
          speed = (content_length / 1e6) / dt
          cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length,
                         network_type=network_type, metered=metered, speed=speed)
        success = True
      else:
        success = False
        cloudlog.event("upload_failed", stat=stat, exc=last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered)

    if success:
      # tag file as uploaded
      try:
        setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE)
      except OSError:
        cloudlog.event("uploader_setxattr_failed", exc=last_exc, key=key, fn=fn, sz=sz)

    return success

  def step(self, network_type: int, metered: bool) -> bool | None:
    d = self.next_file_to_upload(metered)
    if d is None:
      return None

    name, key, fn = d

    # qlogs and bootlogs need to be compressed before uploading
    if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zst')):
      key += ".zst"

    return self.upload(name, key, fn, network_type, metered)


def main(exit_event: threading.Event | None = None) -> None:
  if exit_event is None:
    exit_event = threading.Event()

  try:
    set_core_affinity([0, 1, 2, 3])
  except Exception:
    cloudlog.exception("failed to set core affinity")

  clear_locks(Paths.log_root())

  params = Params()
  dongle_id = params.get("DongleId", encoding='utf8')

  if dongle_id is None:
    cloudlog.info("uploader missing dongle_id")
    raise Exception("uploader can't start without dongle id")

  sm = messaging.SubMaster(['deviceState'])
  uploader = Uploader(dongle_id, Paths.log_root())

  backoff = 0.1
  while not exit_event.is_set():
    sm.update(0)
    offroad = params.get_bool("IsOffroad")
    network_type = sm['deviceState'].networkType if not force_wifi else NetworkType.wifi
    if network_type == NetworkType.none:
      if allow_sleep:
        time.sleep(60 if offroad else 5)
      continue

    success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
    if success is None:
      backoff = 60 if offroad else 5
    elif success:
      backoff = 0.1
    else:
      cloudlog.info("upload backoff %r", backoff)
      backoff = min(backoff*2, 120)
    if allow_sleep:
      time.sleep(backoff + random.uniform(0, backoff))


if __name__ == "__main__":
  main()