athena: add prefix param to listDataDirectory (#21906)

* athena: add prefix param to listDataDirectory

* only traverse directories that match prefix

* tests
old-commit-hash: 2dff84121e
commatwo_master
Greg Hogan 4 years ago committed by GitHub
parent dad760624a
commit 9357587ace
  1. 24
      selfdrive/athena/athenad.py
  2. 79
      selfdrive/athena/tests/test_athenad.py

@ -192,11 +192,29 @@ def setNavDestination(latitude=0, longitude=0):
return {"success": 1}
@dispatcher.add_method
def listDataDirectory():
files = [os.path.relpath(os.path.join(dp, f), ROOT) for dp, dn, fn in os.walk(ROOT) for f in fn]
def scan_dir(path, prefix):
files = list()
# only walk directories that match the prefix
# (glob and friends traverse entire dir tree)
with os.scandir(path) as i:
for e in i:
rel_path = os.path.relpath(e.path, ROOT)
if e.is_dir(follow_symlinks=False):
# add trailing slash
rel_path = os.path.join(rel_path, '')
# if prefix is a partial dir name, current dir will start with prefix
# if prefix is a partial file name, prefix with start with dir name
if rel_path.startswith(prefix) or prefix.startswith(rel_path):
files.extend(scan_dir(e.path, prefix))
else:
if rel_path.startswith(prefix):
files.append(rel_path)
return files
@dispatcher.add_method
def listDataDirectory(prefix=''):
return scan_dir(ROOT, prefix)
@dispatcher.add_method
def reboot():

@ -2,6 +2,7 @@
import json
import os
import requests
import shutil
import tempfile
import time
import threading
@ -34,6 +35,13 @@ class TestAthenadMethods(unittest.TestCase):
athenad.upload_queue = queue.Queue()
athenad.cur_upload_items.clear()
for i in os.listdir(athenad.ROOT):
p = os.path.join(athenad.ROOT, i)
if os.path.isdir(p):
shutil.rmtree(p)
else:
os.unlink(p)
def wait_for_upload(self):
now = time.time()
while time.time() - now < 5:
@ -67,23 +75,58 @@ class TestAthenadMethods(unittest.TestCase):
p.terminate()
def test_listDataDirectory(self):
print(dispatcher["listDataDirectory"]())
route = '2021-03-29--13-32-47'
segments = [0, 1, 2, 3, 11]
filenames = ['qlog.bz2', 'qcamera.ts', 'rlog.bz2', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
for file in files:
fn = os.path.join(athenad.ROOT, file)
os.makedirs(os.path.dirname(fn), exist_ok=True)
Path(fn).touch()
resp = dispatcher["listDataDirectory"]()
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, files)
resp = dispatcher["listDataDirectory"](f'{route}--123')
self.assertCountEqual(resp, [])
prefix = f'{route}'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
prefix = f'{route}--1'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
prefix = f'{route}--1/'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
prefix = f'{route}--1/q'
expected = filter(lambda f: f.startswith(prefix), files)
resp = dispatcher["listDataDirectory"](prefix)
self.assertTrue(resp, 'list empty!')
self.assertCountEqual(resp, expected)
@with_http_server
def test_do_upload(self, host):
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch()
try:
item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with self.assertRaises(requests.exceptions.ConnectionError):
athenad._do_upload(item)
item = athenad.UploadItem(path=fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with self.assertRaises(requests.exceptions.ConnectionError):
athenad._do_upload(item)
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item)
self.assertEqual(resp.status_code, 201)
finally:
os.unlink(fn)
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item)
self.assertEqual(resp.status_code, 201)
@with_http_server
def test_uploadFileToUrl(self, host):
@ -93,14 +136,11 @@ class TestAthenadMethods(unittest.TestCase):
fn = os.path.join(athenad.ROOT, 'qlog.bz2')
Path(fn).touch()
try:
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
self.assertEqual(resp['enqueued'], 1)
self.assertDictContainsSubset({"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}, resp['item'])
self.assertIsNotNone(resp['item'].get('id'))
self.assertEqual(athenad.upload_queue.qsize(), 1)
finally:
os.unlink(fn)
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
self.assertEqual(resp['enqueued'], 1)
self.assertDictContainsSubset({"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}, resp['item'])
self.assertIsNotNone(resp['item'].get('id'))
self.assertEqual(athenad.upload_queue.qsize(), 1)
@with_http_server
def test_upload_handler(self, host):
@ -121,7 +161,6 @@ class TestAthenadMethods(unittest.TestCase):
self.assertEqual(athenad.upload_queue.qsize(), 0)
finally:
end_event.set()
os.unlink(fn)
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
@ -152,7 +191,6 @@ class TestAthenadMethods(unittest.TestCase):
finally:
end_event.set()
os.unlink(fn)
def test_cancelUpload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')
@ -197,7 +235,6 @@ class TestAthenadMethods(unittest.TestCase):
finally:
end_event.set()
os.unlink(fn)
def test_listUploadQueue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='id')

Loading…
Cancel
Save