Tools: Storage API (#1161)
* filereader
* support URLs in filereader, logreader
* unused
* use route files api; add auth file
* Implement browser auth
* Update readme, fix up cache paths
* Add tests, clear token on 401
* Factor out URLFile
* space
old-commit-hash: c4af05868b
commatwo_master
parent
fa287f6a8c
commit
738c3cca79
12 changed files with 336 additions and 203 deletions
@ -0,0 +1,106 @@ |
||||
import os |
||||
import sys |
||||
import time |
||||
import tempfile |
||||
import threading |
||||
import urllib.parse |
||||
import pycurl |
||||
import hashlib |
||||
from io import BytesIO |
||||
from tenacity import retry, wait_random_exponential, stop_after_attempt |
||||
from common.file_helpers import mkdirs_exists_ok, atomic_write_in_dir |
||||
|
||||
class URLFile(object): |
||||
_tlocal = threading.local() |
||||
|
||||
def __init__(self, url, debug=False): |
||||
self._url = url |
||||
self._pos = 0 |
||||
self._local_file = None |
||||
self._debug = debug |
||||
|
||||
try: |
||||
self._curl = self._tlocal.curl |
||||
except AttributeError: |
||||
self._curl = self._tlocal.curl = pycurl.Curl() |
||||
|
||||
def __enter__(self): |
||||
return self |
||||
|
||||
def __exit__(self, type, value, traceback): |
||||
if self._local_file is not None: |
||||
os.remove(self._local_file.name) |
||||
self._local_file.close() |
||||
self._local_file = None |
||||
|
||||
@retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True) |
||||
def read(self, ll=None): |
||||
if ll is None: |
||||
trange = 'bytes=%d-' % self._pos |
||||
else: |
||||
trange = 'bytes=%d-%d' % (self._pos, self._pos+ll-1) |
||||
|
||||
dats = BytesIO() |
||||
c = self._curl |
||||
c.setopt(pycurl.URL, self._url) |
||||
c.setopt(pycurl.WRITEDATA, dats) |
||||
c.setopt(pycurl.NOSIGNAL, 1) |
||||
c.setopt(pycurl.TIMEOUT_MS, 500000) |
||||
c.setopt(pycurl.HTTPHEADER, ["Range: " + trange, "Connection: keep-alive"]) |
||||
c.setopt(pycurl.FOLLOWLOCATION, True) |
||||
|
||||
if self._debug: |
||||
print("downloading", self._url) |
||||
def header(x): |
||||
if b'MISS' in x: |
||||
print(x.strip()) |
||||
c.setopt(pycurl.HEADERFUNCTION, header) |
||||
def test(debug_type, debug_msg): |
||||
print(" debug(%d): %s" % (debug_type, debug_msg.strip())) |
||||
c.setopt(pycurl.VERBOSE, 1) |
||||
c.setopt(pycurl.DEBUGFUNCTION, test) |
||||
t1 = time.time() |
||||
|
||||
c.perform() |
||||
|
||||
if self._debug: |
||||
t2 = time.time() |
||||
if t2-t1 > 0.1: |
||||
print("get %s %r %.f slow" % (self._url, trange, t2-t1)) |
||||
|
||||
response_code = c.getinfo(pycurl.RESPONSE_CODE) |
||||
if response_code == 416: # Requested Range Not Satisfiable |
||||
return "" |
||||
if response_code != 206 and response_code != 200: |
||||
raise Exception("Error {}: {}".format(response_code, repr(dats.getvalue())[:500])) |
||||
|
||||
ret = dats.getvalue() |
||||
self._pos += len(ret) |
||||
return ret |
||||
|
||||
def seek(self, pos): |
||||
self._pos = pos |
||||
|
||||
@property |
||||
def name(self): |
||||
"""Returns a local path to file with the URLFile's contents. |
||||
|
||||
This can be used to interface with modules that require local files. |
||||
""" |
||||
if self._local_file is None: |
||||
_, ext = os.path.splitext(urllib.parse.urlparse(self._url).path) |
||||
local_fd, local_path = tempfile.mkstemp(suffix=ext) |
||||
try: |
||||
os.write(local_fd, self.read()) |
||||
local_file = open(local_path, "rb") |
||||
except: |
||||
os.remove(local_path) |
||||
raise |
||||
finally: |
||||
os.close(local_fd) |
||||
|
||||
self._local_file = local_file |
||||
self.read = self._local_file.read |
||||
self.seek = self._local_file.seek |
||||
|
||||
return self._local_file.name |
@ -0,0 +1,37 @@ |
||||
import sys |
||||
import os |
||||
import requests |
||||
from tools.lib.auth_config import clear_token |
||||
API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com') |
||||
|
||||
class CommaApi(): |
||||
def __init__(self, token=None): |
||||
self.session = requests.Session() |
||||
self.session.headers['User-agent'] = 'OpenpilotTools' |
||||
if token: |
||||
self.session.headers['Authorization'] = 'JWT ' + token |
||||
|
||||
def request(self, method, endpoint, **kwargs): |
||||
resp = self.session.request(method, API_HOST + '/' + endpoint, **kwargs) |
||||
resp_json = resp.json() |
||||
if isinstance(resp_json, dict) and resp_json.get('error'): |
||||
if resp.status_code == 401: |
||||
clear_token() |
||||
raise UnauthorizedError('Unauthorized. Authenticate with tools/lib/auth.py') |
||||
|
||||
e = APIError(str(resp.status_code) + ":" + resp_json.get('description', str(resp_json['error']))) |
||||
e.status_code = resp.status_code |
||||
raise e |
||||
return resp_json |
||||
|
||||
def get(self, endpoint, **kwargs): |
||||
return self.request('GET', endpoint, **kwargs) |
||||
|
||||
def post(self, endpoint, **kwargs): |
||||
return self.request('POST', endpoint, **kwargs) |
||||
|
||||
class APIError(Exception): |
||||
pass |
||||
|
||||
class UnauthorizedError(Exception): |
||||
pass |
@ -0,0 +1,73 @@ |
||||
#!/usr/bin/env python3 |
||||
|
||||
import json |
||||
import os |
||||
import sys |
||||
import webbrowser |
||||
from http.server import HTTPServer, BaseHTTPRequestHandler |
||||
from urllib.parse import urlencode, parse_qs |
||||
from common.file_helpers import mkdirs_exists_ok |
||||
from tools.lib.api import CommaApi, APIError |
||||
from tools.lib.auth_config import set_token |
||||
|
||||
class ClientRedirectServer(HTTPServer): |
||||
query_params = {} |
||||
|
||||
class ClientRedirectHandler(BaseHTTPRequestHandler): |
||||
def do_GET(self): |
||||
if not self.path.startswith('/auth_redirect'): |
||||
self.send_response(204) |
||||
return |
||||
|
||||
query = self.path.split('?', 1)[-1] |
||||
query = parse_qs(query, keep_blank_values=True) |
||||
self.server.query_params = query |
||||
|
||||
self.send_response(200) |
||||
self.send_header('Content-type', 'text/plain') |
||||
self.end_headers() |
||||
self.wfile.write(b'Return to the CLI to continue') |
||||
|
||||
def log_message(self, format, *args): |
||||
pass # this prevent http server from dumping messages to stdout |
||||
|
||||
def auth_redirect_link(port): |
||||
redirect_uri = f'http://localhost:{port}/auth_redirect' |
||||
params = { |
||||
'type': 'web_server', |
||||
'client_id': '45471411055-ornt4svd2miog6dnopve7qtmh5mnu6id.apps.googleusercontent.com', |
||||
'redirect_uri': redirect_uri, |
||||
'response_type': 'code', |
||||
'scope': 'https://www.googleapis.com/auth/userinfo.email', |
||||
'prompt': 'select_account', |
||||
} |
||||
|
||||
return (redirect_uri, 'https://accounts.google.com/o/oauth2/auth?' + urlencode(params)) |
||||
|
||||
def login(): |
||||
port = 9090 |
||||
redirect_uri, oauth_uri = auth_redirect_link(port) |
||||
|
||||
web_server = ClientRedirectServer(('localhost', port), ClientRedirectHandler) |
||||
webbrowser.open(oauth_uri, new=2) |
||||
|
||||
while True: |
||||
web_server.handle_request() |
||||
if 'code' in web_server.query_params: |
||||
code = web_server.query_params['code'] |
||||
break |
||||
elif 'error' in web_server.query_params: |
||||
print('Authentication Error: "%s". Description: "%s" ' % ( |
||||
web_server.query_params['error'], |
||||
web_server.query_params.get('error_description')), file=sys.stderr) |
||||
break |
||||
|
||||
try: |
||||
auth_resp = CommaApi().post('v2/auth/', data={'code': code, 'redirect_uri': redirect_uri}) |
||||
set_token(auth_resp['access_token']) |
||||
print('Authenticated') |
||||
except APIError as e: |
||||
print(f'Authentication Error: {e}', file=sys.stderr) |
||||
|
||||
if __name__ == '__main__': |
||||
login() |
@ -0,0 +1,24 @@ |
||||
import json |
||||
import os |
||||
from common.file_helpers import mkdirs_exists_ok |
||||
|
||||
class MissingAuthConfigError(Exception): |
||||
pass |
||||
|
||||
CONFIG_DIR = os.path.expanduser('~/.comma') |
||||
mkdirs_exists_ok(CONFIG_DIR) |
||||
|
||||
def get_token(): |
||||
try: |
||||
with open(os.path.join(CONFIG_DIR, 'auth.json')) as f: |
||||
auth = json.load(f) |
||||
return auth['access_token'] |
||||
except: |
||||
raise MissingAuthConfigError('Authenticate with tools/lib/auth.py') |
||||
|
||||
def set_token(token): |
||||
with open(os.path.join(CONFIG_DIR, 'auth.json'), 'w') as f: |
||||
json.dump({'access_token': token}, f) |
||||
|
||||
def clear_token(): |
||||
os.unlink(os.path.join(CONFIG_DIR, 'auth.json')) |
@ -1,3 +1,7 @@ |
||||
def FileReader(fn): |
||||
return open(fn, 'rb') |
||||
from common.url_file import URLFile |
||||
|
||||
def FileReader(fn, debug=False): |
||||
if fn.startswith("http://") or fn.startswith("https://"): |
||||
return URLFile(fn, debug=debug) |
||||
else: |
||||
return open(fn, "rb") |
||||
|
@ -1,111 +0,0 @@ |
||||
from cereal import log as capnp_log |
||||
|
||||
def write_can_to_msg(data, src, msg): |
||||
if not isinstance(data[0], Sequence): |
||||
data = [data] |
||||
|
||||
can_msgs = msg.init('can', len(data)) |
||||
for i, d in enumerate(data): |
||||
if d[0] < 0: continue # ios bug |
||||
cc = can_msgs[i] |
||||
cc.address = d[0] |
||||
cc.busTime = 0 |
||||
cc.dat = hex_to_str(d[2]) |
||||
if len(d) == 4: |
||||
cc.src = d[3] |
||||
cc.busTime = d[1] |
||||
else: |
||||
cc.src = src |
||||
|
||||
def convert_old_pkt_to_new(old_pkt): |
||||
m, d = old_pkt |
||||
msg = capnp_log.Event.new_message() |
||||
|
||||
if len(m) == 3: |
||||
_, pid, t = m |
||||
msg.logMonoTime = t |
||||
else: |
||||
t, pid = m |
||||
msg.logMonoTime = int(t * 1e9) |
||||
|
||||
last_velodyne_time = None |
||||
|
||||
if pid == PID_OBD: |
||||
write_can_to_msg(d, 0, msg) |
||||
elif pid == PID_CAM: |
||||
frame = msg.init('frame') |
||||
frame.frameId = d[0] |
||||
frame.timestampEof = msg.logMonoTime |
||||
# iOS |
||||
elif pid == PID_IGPS: |
||||
loc = msg.init('gpsLocation') |
||||
loc.latitude = d[0] |
||||
loc.longitude = d[1] |
||||
loc.speed = d[2] |
||||
loc.timestamp = int(m[0]*1000.0) # on iOS, first number is wall time in seconds |
||||
loc.flags = 1 | 4 # has latitude, longitude, and speed. |
||||
elif pid == PID_IMOTION: |
||||
user_acceleration = d[:3] |
||||
gravity = d[3:6] |
||||
|
||||
# iOS separates gravity from linear acceleration, so we recombine them. |
||||
# Apple appears to use this constant for the conversion. |
||||
g = -9.8 |
||||
acceleration = [g*(a + b) for a, b in zip(user_acceleration, gravity)] |
||||
|
||||
accel_event = msg.init('sensorEvents', 1)[0] |
||||
accel_event.acceleration.v = acceleration |
||||
# android |
||||
elif pid == PID_GPS: |
||||
if len(d) <= 6 or d[-1] == "gps": |
||||
loc = msg.init('gpsLocation') |
||||
loc.latitude = d[0] |
||||
loc.longitude = d[1] |
||||
loc.speed = d[2] |
||||
if len(d) > 6: |
||||
loc.timestamp = d[6] |
||||
loc.flags = 1 | 4 # has latitude, longitude, and speed. |
||||
elif pid == PID_ACCEL: |
||||
val = d[2] if type(d[2]) != type(0.0) else d |
||||
accel_event = msg.init('sensorEvents', 1)[0] |
||||
accel_event.acceleration.v = val |
||||
elif pid == PID_GYRO: |
||||
val = d[2] if type(d[2]) != type(0.0) else d |
||||
gyro_event = msg.init('sensorEvents', 1)[0] |
||||
gyro_event.init('gyro').v = val |
||||
elif pid == PID_LIDAR: |
||||
lid = msg.init('lidarPts') |
||||
lid.idx = d[3] |
||||
elif pid == PID_APPLANIX: |
||||
loc = msg.init('liveLocation') |
||||
loc.status = d[18] |
||||
|
||||
loc.lat, loc.lon, loc.alt = d[0:3] |
||||
loc.vNED = d[3:6] |
||||
|
||||
loc.roll = d[6] |
||||
loc.pitch = d[7] |
||||
loc.heading = d[8] |
||||
|
||||
loc.wanderAngle = d[9] |
||||
loc.trackAngle = d[10] |
||||
|
||||
loc.speed = d[11] |
||||
|
||||
loc.gyro = d[12:15] |
||||
loc.accel = d[15:18] |
||||
elif pid == PID_IBAROMETER: |
||||
pressure_event = msg.init('sensorEvents', 1)[0] |
||||
_, pressure = d[0:2] |
||||
pressure_event.init('pressure').v = [pressure] # Kilopascals |
||||
elif pid == PID_IINIT and len(d) == 4: |
||||
init_event = msg.init('initData') |
||||
init_event.deviceType = capnp_log.InitData.DeviceType.chffrIos |
||||
|
||||
build_info = init_event.init('iosBuildInfo') |
||||
build_info.appVersion = d[0] |
||||
build_info.appBuild = int(d[1]) |
||||
build_info.osVersion = d[2] |
||||
build_info.deviceModel = d[3] |
||||
|
||||
return msg.as_reader() |
Loading…
Reference in new issue