athena: add prefix param to listDataDirectory (#21906)

* athena: add prefix param to listDataDirectory

* only traverse directories that match prefix

* tests
pull/21924/head
Greg Hogan 4 years ago committed by GitHub
parent 28823917ea
commit 2dff84121e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      selfdrive/athena/athenad.py
  2. 79
      selfdrive/athena/tests/test_athenad.py

@ -192,11 +192,29 @@ def setNavDestination(latitude=0, longitude=0):
return {"success": 1} return {"success": 1}
@dispatcher.add_method def scan_dir(path, prefix):
def listDataDirectory(): files = list()
files = [os.path.relpath(os.path.join(dp, f), ROOT) for dp, dn, fn in os.walk(ROOT) for f in fn] # only walk directories that match the prefix
# (glob and friends traverse entire dir tree)
with os.scandir(path) as i:
for e in i:
rel_path = os.path.relpath(e.path, ROOT)
if e.is_dir(follow_symlinks=False):
# add trailing slash
rel_path = os.path.join(rel_path, '')
# if prefix is a partial dir name, current dir will start with prefix
# if prefix is a partial file name, prefix with start with dir name
if rel_path.startswith(prefix) or prefix.startswith(rel_path):
files.extend(scan_dir(e.path, prefix))
else:
if rel_path.startswith(prefix):
files.append(rel_path)
return files return files
@dispatcher.add_method
def listDataDirectory(prefix=''):
return scan_dir(ROOT, prefix)
@dispatcher.add_method @dispatcher.add_method
def reboot(): def reboot():

@ -2,6 +2,7 @@
import json import json
import os import os
import requests import requests
import shutil
import tempfile import tempfile
import time import time
import threading import threading
@ -34,6 +35,13 @@ class TestAthenadMethods(unittest.TestCase):
athenad.upload_queue = queue.Queue() athenad.upload_queue = queue.Queue()
athenad.cur_upload_items.clear() athenad.cur_upload_items.clear()
for i in os.listdir(athenad.ROOT):
p = os.path.join(athenad.ROOT, i)
if os.path.isdir(p):
shutil.rmtree(p)
else:
os.unlink(p)
def wait_for_upload(self): def wait_for_upload(self):
now = time.time() now = time.time()
while time.time() - now < 5: while time.time() - now < 5:
@ -67,23 +75,58 @@ class TestAthenadMethods(unittest.TestCase):
p.terminate() p.terminate()
def test_listDataDirectory(self): def test_listDataDirectory(self):
print(dispatcher["listDataDirectory"]()) route = '2021-03-29--13-32-47'
segments = [0, 1, 2, 3, 11]
filenames = ['qlog.bz2', 'qcamera.ts', 'rlog.bz2', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
for file in files:
fn = os.path.join(athenad.ROOT, file)
os.makedirs(os.path.dirname(fn), exist_ok=True)
Path(fn).touch()
resp = dispatcher["listDataDirectory"]()
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, files)
resp = dispatcher["listDataDirectory"](f'{route}--123')
self.assertCountEqual(resp, [])
prefix = f'{route}'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
prefix = f'{route}--1'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
prefix = f'{route}--1/'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
prefix = f'{route}--1/q'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
@with_http_server @with_http_server
def test_do_upload(self, host): def test_do_upload(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch() Path(fn).touch()
try: item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') with self.assertRaises(requests.exceptions.ConnectionError):
with self.assertRaises(requests.exceptions.ConnectionError): athenad._do_upload(item)
athenad._do_upload(item)
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item) resp = athenad._do_upload(item)
self.assertEqual(resp.status_code, 201) self.assertEqual(resp.status_code, 201)
finally:
os.unlink(fn)
@with_http_server @with_http_server
def test_uploadFileToUrl(self, host): def test_uploadFileToUrl(self, host):
@ -93,14 +136,11 @@ class TestAthenadMethods(unittest.TestCase):
fn = os.path.join(athenad.ROOT, 'qlog.bz2') fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch() Path(fn).touch()
try: resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {}) self.assertEqual(resp['enqueued'], 1)
self.assertEqual(resp['enqueued'], 1) self.assertDictContainsSubset({"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}, resp['item'])
self.assertDictContainsSubset({"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}, resp['item']) self.assertIsNotNone(resp['item'].get('id'))
self.assertIsNotNone(resp['item'].get('id')) self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertEqual(athenad.upload_queue.qsize(), 1)
finally:
os.unlink(fn)
@with_http_server @with_http_server
def test_upload_handler(self, host): def test_upload_handler(self, host):
@ -121,7 +161,6 @@ class TestAthenadMethods(unittest.TestCase):
self.assertEqual(athenad.upload_queue.qsize(), 0) self.assertEqual(athenad.upload_queue.qsize(), 0)
finally: finally:
end_event.set() end_event.set()
os.unlink(fn)
def test_upload_handler_timeout(self): def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue""" """When an upload times out or fails to connect it should be placed back in the queue"""
@ -152,7 +191,6 @@ class TestAthenadMethods(unittest.TestCase):
finally: finally:
end_event.set() end_event.set()
os.unlink(fn)
def test_cancelUpload(self): def test_cancelUpload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')
@ -197,7 +235,6 @@ class TestAthenadMethods(unittest.TestCase):
finally: finally:
end_event.set() end_event.set()
os.unlink(fn)
def test_listUploadQueue(self): def test_listUploadQueue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id') item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')

Loading…
Cancel
Save