openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

285 lines
7.8 KiB

#!/usr/bin/env python
import os
import time
import stat
import json
import random
import ctypes
import inspect
import requests
import traceback
import threading
import subprocess
from collections import Counter
from selfdrive.swaglog import cloudlog
from selfdrive.loggerd.config import ROOT
from common.params import Params
from common.api import api_get
fake_upload = os.getenv("FAKEUPLOAD") is not None
def raise_on_thread(t, exctype):
for ctid, tobj in threading._active.items():
if tobj is t:
tid = ctid
break
else:
raise Exception("Could not find thread")
'''Raises an exception in the threads with id tid'''
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# "if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
def listdir_with_creation_date(d):
lst = os.listdir(d)
for fn in lst:
try:
st = os.stat(os.path.join(d, fn))
ctime = st[stat.ST_CTIME]
yield (ctime, fn)
except OSError:
cloudlog.exception("listdir_with_creation_date: stat failed?")
yield (None, fn)
def listdir_by_creation_date(d):
times_and_paths = list(listdir_with_creation_date(d))
return [path for _, path in sorted(times_and_paths)]
def clear_locks(root):
for logname in os.listdir(root):
path = os.path.join(root, logname)
try:
for fname in os.listdir(path):
if fname.endswith(".lock"):
os.unlink(os.path.join(path, fname))
except OSError:
cloudlog.exception("clear_locks failed")
def is_on_wifi():
# ConnectivityManager.getActiveNetworkInfo()
result = subprocess.check_output(["service", "call", "connectivity", "2"]).strip().split("\n")
data = ''.join(''.join(w.decode("hex")[::-1] for w in l[14:49].split()) for l in result[1:])
return "\x00".join("WIFI") in data
class Uploader(object):
def __init__(self, dongle_id, access_token, root):
self.dongle_id = dongle_id
self.access_token = access_token
self.root = root
self.upload_thread = None
self.last_resp = None
self.last_exc = None
def clean_dirs(self):
try:
for logname in os.listdir(self.root):
path = os.path.join(self.root, logname)
# remove empty directories
if not os.listdir(path):
os.rmdir(path)
except OSError:
cloudlog.exception("clean_dirs failed")
def gen_upload_files(self):
if not os.path.isdir(self.root):
return
for logname in listdir_by_creation_date(self.root):
path = os.path.join(self.root, logname)
names = os.listdir(path)
if any(name.endswith(".lock") for name in names):
continue
for name in names:
key = os.path.join(logname, name)
fn = os.path.join(path, name)
yield (name, key, fn)
def get_data_stats(self):
name_counts = Counter()
total_size = 0
for name, key, fn in self.gen_upload_files():
name_counts[name] += 1
total_size += os.stat(fn).st_size
return dict(name_counts), total_size
def next_file_to_upload(self, with_video):
# try to upload log files first
for name, key, fn in self.gen_upload_files():
if name in ["rlog", "rlog.bz2"]:
return (key, fn, 0)
if with_video:
# then upload compressed camera file
for name, key, fn in self.gen_upload_files():
if name in ["fcamera.hevc"]:
return (key, fn, 1)
# then upload other files
for name, key, fn in self.gen_upload_files():
if not name.endswith('.lock') and not name.endswith(".tmp"):
return (key, fn, 1)
return None
def do_upload(self, key, fn):
try:
url_resp = api_get("v1.1/"+self.dongle_id+"/upload_url/", timeout=2, path=key, access_token=self.access_token)
url_resp_json = json.loads(url_resp.text)
url = url_resp_json['url']
headers = url_resp_json['headers']
cloudlog.info("upload_url v1.1 %s %s", url, str(headers))
if fake_upload:
cloudlog.info("*** WARNING, THIS IS A FAKE UPLOAD TO %s ***" % url)
class FakeResponse(object):
def __init__(self):
self.status_code = 200
self.last_resp = FakeResponse()
else:
with open(fn, "rb") as f:
self.last_resp = requests.put(url, data=f, headers=headers)
except Exception as e:
self.last_exc = (e, traceback.format_exc())
raise
def normal_upload(self, key, fn):
self.last_resp = None
self.last_exc = None
try:
self.do_upload(key, fn)
except Exception:
pass
return self.last_resp
def killable_upload(self, key, fn):
self.last_resp = None
self.last_exc = None
self.upload_thread = threading.Thread(target=lambda: self.do_upload(key, fn))
self.upload_thread.start()
self.upload_thread.join()
self.upload_thread = None
return self.last_resp
def abort_upload(self):
thread = self.upload_thread
if thread is None:
return
if not thread.is_alive():
return
raise_on_thread(thread, SystemExit)
thread.join()
def upload(self, key, fn):
# write out the bz2 compress
if fn.endswith("log"):
ext = ".bz2"
cloudlog.info("compressing %r to %r", fn, fn+ext)
if os.system("nice -n 19 bzip2 -c %s > %s.tmp && mv %s.tmp %s%s && rm %s" % (fn, fn, fn, fn, ext, fn)) != 0:
cloudlog.exception("upload: bzip2 compression failed")
return False
# assuming file is named properly
key += ext
fn += ext
try:
sz = os.path.getsize(fn)
except OSError:
cloudlog.exception("upload: getsize failed")
return False
cloudlog.event("upload", key=key, fn=fn, sz=sz)
cloudlog.info("checking %r with size %r", key, sz)
if sz == 0:
# can't upload files of 0 size
os.unlink(fn) # delete the file
success = True
else:
cloudlog.info("uploading %r", fn)
# stat = self.killable_upload(key, fn)
stat = self.normal_upload(key, fn)
if stat is not None and stat.status_code in (200, 201):
cloudlog.event("upload_success", key=key, fn=fn, sz=sz)
os.unlink(fn) # delete the file
success = True
else:
cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz)
success = False
self.clean_dirs()
return success
def uploader_fn(exit_event):
cloudlog.info("uploader_fn")
params = Params()
dongle_id, access_token = params.get("DongleId"), params.get("AccessToken")
if dongle_id is None or access_token is None:
cloudlog.info("uploader MISSING DONGLE_ID or ACCESS_TOKEN")
raise Exception("uploader can't start without dongle id and access token")
uploader = Uploader(dongle_id, access_token, ROOT)
while True:
upload_video = (params.get("IsUploadVideoOverCellularEnabled") != "0") or is_on_wifi()
backoff = 0.1
while True:
if exit_event.is_set():
return
d = uploader.next_file_to_upload(upload_video)
if d is None:
break
key, fn, _ = d
cloudlog.info("to upload %r", d)
success = uploader.upload(key, fn)
if success:
backoff = 0.1
else:
cloudlog.info("backoff %r", backoff)
time.sleep(backoff + random.uniform(0, backoff))
backoff = min(backoff*2, 120)
cloudlog.info("upload done, success=%r", success)
time.sleep(5)
def main(gctx=None):
uploader_fn(threading.Event())
if __name__ == "__main__":
main()