diff --git a/cereal b/cereal index 95d7c94852..b87d81bfaa 160000 --- a/cereal +++ b/cereal @@ -1 +1 @@ -Subproject commit 95d7c948526faaa5900f37647c1302bde2d99922 +Subproject commit b87d81bfaa2bf730f4088e791c3fce6bcdac0eda diff --git a/selfdrive/loggerd/uploader.py b/selfdrive/loggerd/uploader.py index 3b7d693aff..1cbd815379 100644 --- a/selfdrive/loggerd/uploader.py +++ b/selfdrive/loggerd/uploader.py @@ -60,6 +60,16 @@ class Uploader(): self.last_resp = None self.last_exc = None + self.raw_size = 0 + self.raw_count = 0 + self.immediate_size = 0 + self.immediate_count = 0 + + # stats for last successfully uploaded file + self.last_time = 0 + self.last_speed = 0 + self.last_filename = "" + self.immediate_folders = ["crash/", "boot/"] self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1} self.high_priority = {"rlog.bz2": 0, "fcamera.hevc": 1, "dcamera.hevc": 2, "ecamera.hevc": 3} @@ -71,15 +81,22 @@ class Uploader(): return self.high_priority[name] + 100 return 1000 - def gen_upload_files(self): + def list_upload_files(self): if not os.path.isdir(self.root): return + + self.raw_size = 0 + self.raw_count = 0 + self.immediate_size = 0 + self.immediate_count = 0 + for logname in listdir_by_creation(self.root): path = os.path.join(self.root, logname) try: names = os.listdir(path) except OSError: continue + if any(name.endswith(".lock") for name in names): continue @@ -94,10 +111,21 @@ class Uploader(): is_uploaded = True # deleter could have deleted if is_uploaded: continue + + try: + if name in self.immediate_priority: + self.immediate_count += 1 + self.immediate_size += os.path.getsize(fn) + else: + self.raw_count += 1 + self.raw_size += os.path.getsize(fn) + except OSError: + pass + yield (name, key, fn) def next_file_to_upload(self, with_raw): - upload_files = list(self.gen_upload_files()) + upload_files = list(self.list_upload_files()) # try to upload qlog files first for name, key, fn in upload_files: @@ -174,6 +202,7 @@ class Uploader(): cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) success = True else: + start_time = time.monotonic() cloudlog.debug("uploading %r", fn) stat = self.normal_upload(key, fn) if stat is not None and stat.status_code in (200, 201, 403, 412): @@ -183,6 +212,10 @@ class Uploader(): setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) except OSError: cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) + + self.last_filename = fn + self.last_time = time.monotonic() - start_time + self.last_speed = (sz / 1e6) / self.last_time success = True else: cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz, debug=True) @@ -190,6 +223,18 @@ class Uploader(): return success + def get_msg(self): + msg = messaging.new_message("uploaderState") + us = msg.uploaderState + us.rawQueueSize = int(self.raw_size / 1e6) + us.rawQueueCount = self.raw_count + us.immediateQueueSize = int(self.immediate_size / 1e6) + us.immediateQueueCount = self.immediate_count + us.lastTime = self.last_time + us.lastSpeed = self.last_speed + us.lastFilename = self.last_filename + return msg + def uploader_fn(exit_event): params = Params() dongle_id = params.get("DongleId", encoding='utf8') @@ -202,6 +247,7 @@ def uploader_fn(exit_event): cloudlog.warning("NVME not mounted") sm = messaging.SubMaster(['deviceState']) + pm = messaging.PubMaster(['uploaderState']) uploader = Uploader(dongle_id, ROOT) backoff = 0.1 @@ -233,6 +279,8 @@ def uploader_fn(exit_event): cloudlog.info("upload backoff %r", backoff) time.sleep(backoff + random.uniform(0, backoff)) backoff = min(backoff*2, 120) + + pm.send("uploaderState", uploader.get_msg()) cloudlog.info("upload done, success=%r", success) def main():