publish uploader state (#21580)

* uploader state

* add time

* bump

* populate whole packet

* bump cereal

* revert

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: 6b69032807
commatwo_master
Adeeb Shihadeh 4 years ago committed by GitHub
parent 590be23e33
commit 14a441a314
  1. 2
      cereal
  2. 52
      selfdrive/loggerd/uploader.py

@ -1 +1 @@
Subproject commit 95d7c948526faaa5900f37647c1302bde2d99922
Subproject commit b87d81bfaa2bf730f4088e791c3fce6bcdac0eda

@ -60,6 +60,16 @@ class Uploader():
self.last_resp = None
self.last_exc = None
self.raw_size = 0
self.raw_count = 0
self.immediate_size = 0
self.immediate_count = 0
# stats for last successfully uploaded file
self.last_time = 0
self.last_speed = 0
self.last_filename = ""
self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1}
self.high_priority = {"rlog.bz2": 0, "fcamera.hevc": 1, "dcamera.hevc": 2, "ecamera.hevc": 3}
@ -71,15 +81,22 @@ class Uploader():
return self.high_priority[name] + 100
return 1000
def gen_upload_files(self):
def list_upload_files(self):
if not os.path.isdir(self.root):
return
self.raw_size = 0
self.raw_count = 0
self.immediate_size = 0
self.immediate_count = 0
for logname in listdir_by_creation(self.root):
path = os.path.join(self.root, logname)
try:
names = os.listdir(path)
except OSError:
continue
if any(name.endswith(".lock") for name in names):
continue
@ -94,10 +111,21 @@ class Uploader():
is_uploaded = True # deleter could have deleted
if is_uploaded:
continue
try:
if name in self.immediate_priority:
self.immediate_count += 1
self.immediate_size += os.path.getsize(fn)
else:
self.raw_count += 1
self.raw_size += os.path.getsize(fn)
except OSError:
pass
yield (name, key, fn)
def next_file_to_upload(self, with_raw):
upload_files = list(self.gen_upload_files())
upload_files = list(self.list_upload_files())
# try to upload qlog files first
for name, key, fn in upload_files:
@ -174,6 +202,7 @@ class Uploader():
cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz)
success = True
else:
start_time = time.monotonic()
cloudlog.debug("uploading %r", fn)
stat = self.normal_upload(key, fn)
if stat is not None and stat.status_code in (200, 201, 403, 412):
@ -183,6 +212,10 @@ class Uploader():
setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE)
except OSError:
cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz)
self.last_filename = fn
self.last_time = time.monotonic() - start_time
self.last_speed = (sz / 1e6) / self.last_time
success = True
else:
cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz, debug=True)
@ -190,6 +223,18 @@ class Uploader():
return success
def get_msg(self):
msg = messaging.new_message("uploaderState")
us = msg.uploaderState
us.rawQueueSize = int(self.raw_size / 1e6)
us.rawQueueCount = self.raw_count
us.immediateQueueSize = int(self.immediate_size / 1e6)
us.immediateQueueCount = self.immediate_count
us.lastTime = self.last_time
us.lastSpeed = self.last_speed
us.lastFilename = self.last_filename
return msg
def uploader_fn(exit_event):
params = Params()
dongle_id = params.get("DongleId", encoding='utf8')
@ -202,6 +247,7 @@ def uploader_fn(exit_event):
cloudlog.warning("NVME not mounted")
sm = messaging.SubMaster(['deviceState'])
pm = messaging.PubMaster(['uploaderState'])
uploader = Uploader(dongle_id, ROOT)
backoff = 0.1
@ -233,6 +279,8 @@ def uploader_fn(exit_event):
cloudlog.info("upload backoff %r", backoff)
time.sleep(backoff + random.uniform(0, backoff))
backoff = min(backoff*2, 120)
pm.send("uploaderState", uploader.get_msg())
cloudlog.info("upload done, success=%r", success)
def main():

Loading…
Cancel
Save