Updater tests (#1974)
* refactor exit handling
* test update
* more reliable?
* better
* init git in CI
* testy tester
* CI should work
* test overlay reinit
* only one
* still need to fix loop test
* more patience
* more patience in CI
* no ping in CI
* this is cleaner
* need to run these in jenkins
* clean up
* run in jenkins
* fix test file path
* it's a git repo now
* no commit
* reinit
* remove duplicate
* why not git
* la
* git status
* pythonpath
* fix
* no CI fro now
* check overlay consistent
* more tests
* make more changes in the update commit
* sample
* no k
old-commit-hash: fe18a014c7
commatwo_master
parent
1ef178279f
commit
3dccc7fc2e
2 changed files with 280 additions and 27 deletions
@ -0,0 +1,256 @@ |
||||
#!/usr/bin/env python3 |
||||
import datetime |
||||
import os |
||||
import time |
||||
import tempfile |
||||
import unittest |
||||
import shutil |
||||
import signal |
||||
import subprocess |
||||
import random |
||||
|
||||
from common.basedir import BASEDIR |
||||
from common.params import Params |
||||
|
||||
|
||||
class TestUpdater(unittest.TestCase): |
||||
|
||||
def setUp(self): |
||||
self.updated_proc = None |
||||
|
||||
self.tmp_dir = tempfile.TemporaryDirectory() |
||||
org_dir = os.path.join(self.tmp_dir.name, "commaai") |
||||
|
||||
self.basedir = os.path.join(org_dir, "openpilot") |
||||
self.git_remote_dir = os.path.join(org_dir, "openpilot_remote") |
||||
self.staging_dir = os.path.join(org_dir, "safe_staging") |
||||
for d in [org_dir, self.basedir, self.git_remote_dir, self.staging_dir]: |
||||
os.mkdir(d) |
||||
|
||||
self.upper_dir = os.path.join(self.staging_dir, "upper") |
||||
self.merged_dir = os.path.join(self.staging_dir, "merged") |
||||
self.finalized_dir = os.path.join(self.staging_dir, "finalized") |
||||
|
||||
# setup local submodule remotes |
||||
submodules = subprocess.check_output("git submodule --quiet foreach 'echo $name'", |
||||
shell=True, cwd=BASEDIR, encoding='utf8').split() |
||||
for s in submodules: |
||||
sub_path = os.path.join(org_dir, s.split("_repo")[0]) |
||||
self._run(f"git clone {s} {sub_path}.git", cwd=BASEDIR) |
||||
|
||||
# setup two git repos, a remote and one we'll run updated in |
||||
self._run([ |
||||
f"git clone {BASEDIR} {self.git_remote_dir}", |
||||
f"git clone {self.git_remote_dir} {self.basedir}", |
||||
f"cd {self.basedir} && git submodule init && git submodule update", |
||||
f"cd {self.basedir} && scons -j{os.cpu_count()} cereal" |
||||
]) |
||||
|
||||
self.params = Params(db=os.path.join(self.basedir, "persist/params")) |
||||
self.params.clear_all() |
||||
os.sync() |
||||
|
||||
def tearDown(self): |
||||
try: |
||||
if self.updated_proc is not None: |
||||
self.updated_proc.terminate() |
||||
self.updated_proc.wait(30) |
||||
except Exception as e: |
||||
print(e) |
||||
self.tmp_dir.cleanup() |
||||
|
||||
|
||||
# *** test helpers *** |
||||
|
||||
|
||||
def _run(self, cmd, cwd=None): |
||||
if not isinstance(cmd, list): |
||||
cmd = (cmd,) |
||||
|
||||
for c in cmd: |
||||
subprocess.check_output(c, cwd=cwd, shell=True) |
||||
|
||||
def _get_updated_proc(self): |
||||
os.environ["PYTHONPATH"] = self.basedir |
||||
os.environ["GIT_AUTHOR_NAME"] = "testy tester" |
||||
os.environ["GIT_COMMITTER_NAME"] = "testy tester" |
||||
os.environ["GIT_AUTHOR_EMAIL"] = "testy@tester.test" |
||||
os.environ["GIT_COMMITTER_EMAIL"] = "testy@tester.test" |
||||
os.environ["UPDATER_TEST_IP"] = "localhost" |
||||
os.environ["UPDATER_LOCK_FILE"] = os.path.join(self.tmp_dir.name, "updater.lock") |
||||
os.environ["UPDATER_STAGING_ROOT"] = self.staging_dir |
||||
updated_path = os.path.join(self.basedir, "selfdrive/updated.py") |
||||
return subprocess.Popen(updated_path, env=os.environ) |
||||
|
||||
def _start_updater(self, offroad=True, nosleep=False): |
||||
self.params.put("IsOffroad", "1" if offroad else "0") |
||||
self.updated_proc = self._get_updated_proc() |
||||
if not nosleep: |
||||
time.sleep(1) |
||||
|
||||
def _update_now(self): |
||||
self.updated_proc.send_signal(signal.SIGHUP) |
||||
|
||||
# TODO: this should be implemented in params |
||||
def _read_param(self, key, timeout=1): |
||||
ret = None |
||||
start_time = time.monotonic() |
||||
while ret is None: |
||||
ret = self.params.get(key, encoding='utf8') |
||||
if time.monotonic() - start_time > timeout: |
||||
break |
||||
time.sleep(0.01) |
||||
return ret |
||||
|
||||
def _wait_for_update(self, timeout=30, clear_param=False): |
||||
if clear_param: |
||||
self.params.delete("LastUpdateTime") |
||||
|
||||
self._update_now() |
||||
t = self._read_param("LastUpdateTime", timeout=timeout) |
||||
if t is None: |
||||
raise Exception("timed out waiting for update to complate") |
||||
|
||||
def _make_commit(self): |
||||
all_dirs, all_files = [], [] |
||||
for root, dirs, files in os.walk(self.git_remote_dir): |
||||
if ".git" in root: |
||||
continue |
||||
for d in dirs: |
||||
all_dirs.append(os.path.join(root, d)) |
||||
for f in files: |
||||
all_files.append(os.path.join(root, f)) |
||||
|
||||
# make a new dir and some new files |
||||
new_dir = os.path.join(self.git_remote_dir, "this_is_a_new_dir") |
||||
os.mkdir(new_dir) |
||||
for _ in range(random.randrange(5, 30)): |
||||
for d in (new_dir, random.choice(all_dirs)): |
||||
with tempfile.NamedTemporaryFile(dir=d, delete=False) as f: |
||||
f.write(os.urandom(random.randrange(1, 1000000))) |
||||
|
||||
# modify some files |
||||
for f in random.sample(all_files, random.randrange(5, 50)): |
||||
with open(f, "w+") as ff: |
||||
txt = ff.readlines() |
||||
ff.seek(0) |
||||
for line in txt: |
||||
ff.write(line[::-1]) |
||||
|
||||
# remove some files |
||||
for f in random.sample(all_files, random.randrange(5, 50)): |
||||
os.remove(f) |
||||
|
||||
# remove some dirs |
||||
for d in random.sample(all_dirs, random.randrange(1, 10)): |
||||
shutil.rmtree(d) |
||||
|
||||
# commit the changes |
||||
self._run([ |
||||
"git add -A", |
||||
"git commit -m 'an update'", |
||||
], cwd=self.git_remote_dir) |
||||
|
||||
def _check_update_state(self, update_available): |
||||
# make sure LastUpdateTime is recent |
||||
t = self._read_param("LastUpdateTime") |
||||
last_update_time = datetime.datetime.fromisoformat(t) |
||||
td = datetime.datetime.utcnow() - last_update_time |
||||
self.assertLess(td.total_seconds(), 10) |
||||
self.params.delete("LastUpdateTime") |
||||
|
||||
# wait a bit for the rest of the params to be written |
||||
time.sleep(0.1) |
||||
|
||||
# check params |
||||
update = self._read_param("UpdateAvailable") |
||||
self.assertEqual(update == "1", update_available, f"UpdateAvailable: {repr(update)}") |
||||
self.assertEqual(self._read_param("UpdateFailedCount"), "0") |
||||
|
||||
# TODO: check that the finalized update actually matches remote |
||||
# check the .overlay_init and .overlay_consistent flags |
||||
self.assertTrue(os.path.isfile(os.path.join(self.basedir, ".overlay_init"))) |
||||
self.assertEqual(os.path.isfile(os.path.join(self.finalized_dir, ".overlay_consistent")), update_available) |
||||
|
||||
|
||||
# *** test cases *** |
||||
|
||||
|
||||
# Run updated for 100 cycles with no update |
||||
def test_no_update(self): |
||||
self._start_updater() |
||||
for _ in range(100): |
||||
self._wait_for_update(clear_param=True) |
||||
self._check_update_state(False) |
||||
|
||||
# Let the updater run with no update for a cycle, then write an update |
||||
def test_update(self): |
||||
self._start_updater() |
||||
|
||||
# run for a cycle with no update |
||||
self._wait_for_update(clear_param=True) |
||||
self._check_update_state(False) |
||||
|
||||
# write an update to our remote |
||||
self._make_commit() |
||||
|
||||
# run for a cycle to get the update |
||||
self._wait_for_update(timeout=60, clear_param=True) |
||||
self._check_update_state(True) |
||||
|
||||
# run another cycle with no update |
||||
self._wait_for_update(clear_param=True) |
||||
self._check_update_state(True) |
||||
|
||||
# Let the updater run for 10 cycles, and write an update every cycle |
||||
@unittest.skip("need to make this faster") |
||||
def test_update_loop(self): |
||||
self._start_updater() |
||||
|
||||
# run for a cycle with no update |
||||
self._wait_for_update(clear_param=True) |
||||
for _ in range(10): |
||||
time.sleep(0.5) |
||||
self._make_commit() |
||||
self._wait_for_update(timeout=90, clear_param=True) |
||||
self._check_update_state(True) |
||||
|
||||
# Test overlay re-creation after tracking a new file in basedir's git |
||||
def test_overlay_reinit(self): |
||||
self._start_updater() |
||||
|
||||
overlay_init_fn = os.path.join(self.basedir, ".overlay_init") |
||||
|
||||
# run for a cycle with no update |
||||
self._wait_for_update(clear_param=True) |
||||
self.params.delete("LastUpdateTime") |
||||
first_mtime = os.path.getmtime(overlay_init_fn) |
||||
|
||||
# touch a file in the basedir |
||||
self._run("touch new_file && git add new_file", cwd=self.basedir) |
||||
|
||||
# run another cycle, should have a new mtime |
||||
self._wait_for_update(clear_param=True) |
||||
second_mtime = os.path.getmtime(overlay_init_fn) |
||||
self.assertTrue(first_mtime != second_mtime) |
||||
|
||||
# run another cycle, mtime should be same as last cycle |
||||
self._wait_for_update(clear_param=True) |
||||
new_mtime = os.path.getmtime(overlay_init_fn) |
||||
self.assertTrue(second_mtime == new_mtime) |
||||
|
||||
# Make sure updated exits if another instance is running |
||||
def test_multiple_instances(self): |
||||
# start updated and let it run for a cycle |
||||
self._start_updater() |
||||
time.sleep(1) |
||||
self._wait_for_update(clear_param=True) |
||||
|
||||
# start another instance |
||||
second_updated = self._get_updated_proc() |
||||
ret_code = second_updated.wait(timeout=5) |
||||
self.assertTrue(ret_code is not None) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
Loading…
Reference in new issue