openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

145 lines
4.9 KiB

#!/usr/bin/env python3
import hashlib
import http.server
import json
import os
import unittest
import random
import requests
import shutil
import socketserver
import tempfile
import multiprocessing
from pathlib import Path
from selfdrive.hardware.eon.neos import RECOVERY_DEV, NEOSUPDATE_DIR, get_fn, download_file, \
verify_update_ready, download_neos_update
EON_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)))
MANIFEST = os.path.join(EON_DIR, "neos.json")
PORT = 8000
def server_thread(port):
socketserver.TCPServer.allow_reuse_address = True
httpd = socketserver.TCPServer(("", port), http.server.SimpleHTTPRequestHandler)
httpd.serve_forever()
class TestNeosUpdater(unittest.TestCase):
@classmethod
def setUpClass(cls):
# generate a fake manifest
cls.manifest = {}
for i in ('ota', 'recovery'):
with tempfile.NamedTemporaryFile(delete=False, dir=os.getcwd()) as f:
dat = os.urandom(random.randint(1000, 100000))
f.write(dat)
cls.manifest[f"{i}_url"] = f"http://localhost:{PORT}/" + os.path.relpath(f.name)
cls.manifest[F"{i}_hash"] = hashlib.sha256(dat).hexdigest()
if i == "recovery":
cls.manifest["recovery_len"] = len(dat)
with tempfile.NamedTemporaryFile(delete=False, mode='w') as f:
f.write(json.dumps(cls.manifest))
cls.fake_manifest = f.name
@classmethod
def tearDownClass(cls):
os.unlink(cls.fake_manifest)
os.unlink(os.path.basename(cls.manifest['ota_url']))
os.unlink(os.path.basename(cls.manifest['recovery_url']))
def setUp(self):
# server for update files
self.server = multiprocessing.Process(target=server_thread, args=(PORT, ))
self.server.start()
# clean up
if os.path.exists(NEOSUPDATE_DIR):
shutil.rmtree(NEOSUPDATE_DIR)
def tearDown(self):
self.server.kill()
self.server.join(1)
def _corrupt_recovery(self):
with open(RECOVERY_DEV, "wb") as f:
f.write(b'\x00'*100)
def test_manifest(self):
with open(MANIFEST) as f:
m = json.load(f)
assert m['ota_hash'] in m['ota_url']
assert m['recovery_hash'] in m['recovery_url']
assert m['recovery_len'] > 0
for url in (m['ota_url'], m['recovery_url']):
r = requests.head(m['recovery_url'])
r.raise_for_status()
self.assertEqual(r.headers['Content-Type'], "application/octet-stream")
if url == m['recovery_url']:
self.assertEqual(int(r.headers['Content-Length']), m['recovery_len'])
def test_download_hash_check(self):
os.makedirs(NEOSUPDATE_DIR, exist_ok=True)
Path(get_fn(self.manifest['ota_url'])).touch()
with self.assertRaisesRegex(Exception, "failed hash check"):
download_file(self.manifest['ota_url'], get_fn(self.manifest['ota_url']),
self.manifest['ota_hash']+'a', "system")
# should've unlinked after the failed hash check, should succeed now
download_file(self.manifest['ota_url'], get_fn(self.manifest['ota_url']),
self.manifest['ota_hash'], "system")
# TODO: needs an http server that supports Content-Range
#def test_download_resume(self):
# os.makedirs(NEOSUPDATE_DIR, exist_ok=True)
# with open(os.path.basename(self.manifest['ota_url']), "rb") as src, \
# open(get_fn(self.manifest['ota_url']), "wb") as dest:
# l = dest.write(src.read(8192))
# assert l == 8192
# download_file(self.manifest['ota_url'], get_fn(self.manifest['ota_url']),
# self.manifest['ota_hash'], "system")
def test_download_no_internet(self):
self.server.kill()
os.makedirs(NEOSUPDATE_DIR, exist_ok=True)
# fail, no internet
with self.assertRaises(requests.exceptions.ConnectionError):
download_file(self.manifest['ota_url'], get_fn(self.manifest['ota_url']),
self.manifest['ota_hash'], "system")
# already cached, ensure we don't hit the server
shutil.copyfile(os.path.basename(self.manifest['ota_url']), get_fn(self.manifest['ota_url']))
download_file(self.manifest['ota_url'], get_fn(self.manifest['ota_url']),
self.manifest['ota_hash'], "system")
def test_download_update(self):
download_neos_update(self.fake_manifest)
self.assertTrue(verify_update_ready(self.fake_manifest))
def test_verify_update(self):
# good state
download_neos_update(self.fake_manifest)
self.assertTrue(verify_update_ready(self.fake_manifest))
# corrupt recovery
self._corrupt_recovery()
self.assertFalse(verify_update_ready(self.fake_manifest))
# back to good state
download_neos_update(self.fake_manifest)
self.assertTrue(verify_update_ready(self.fake_manifest))
# corrupt ota
self._corrupt_recovery()
with open(os.path.join(NEOSUPDATE_DIR, os.path.basename(self.manifest['ota_url'])), "ab") as f:
f.write(b'\x00')
self.assertFalse(verify_update_ready(self.fake_manifest))
if __name__ == "__main__":
unittest.main()