diff --git a/selfdrive/test/test_updated.py b/selfdrive/test/test_updated.py
new file mode 100755
index 0000000000..08eca5bd23
--- /dev/null
+++ b/selfdrive/test/test_updated.py
@@ -0,0 +1,256 @@
+#!/usr/bin/env python3
+import datetime
+import os
+import time
+import tempfile
+import unittest
+import shutil
+import signal
+import subprocess
+import random
+
+from common.basedir import BASEDIR
+from common.params import Params
+
+
+class TestUpdater(unittest.TestCase):
+
+  def setUp(self):
+    self.updated_proc = None
+
+    self.tmp_dir = tempfile.TemporaryDirectory()
+    org_dir = os.path.join(self.tmp_dir.name, "commaai")
+
+    self.basedir = os.path.join(org_dir, "openpilot")
+    self.git_remote_dir = os.path.join(org_dir, "openpilot_remote")
+    self.staging_dir = os.path.join(org_dir, "safe_staging")
+    for d in [org_dir, self.basedir, self.git_remote_dir, self.staging_dir]:
+      os.mkdir(d)
+
+    self.upper_dir = os.path.join(self.staging_dir, "upper")
+    self.merged_dir = os.path.join(self.staging_dir, "merged")
+    self.finalized_dir = os.path.join(self.staging_dir, "finalized")
+
+    # setup local submodule remotes
+    submodules = subprocess.check_output("git submodule --quiet foreach 'echo $name'",
+                                         shell=True, cwd=BASEDIR, encoding='utf8').split()
+    for s in submodules:
+      sub_path = os.path.join(org_dir, s.split("_repo")[0])
+      self._run(f"git clone {s} {sub_path}.git", cwd=BASEDIR)
+
+    # setup two git repos, a remote and one we'll run updated in
+    self._run([
+      f"git clone {BASEDIR} {self.git_remote_dir}",
+      f"git clone {self.git_remote_dir} {self.basedir}",
+      f"cd {self.basedir} && git submodule init && git submodule update",
+      f"cd {self.basedir} && scons -j{os.cpu_count()} cereal"
+    ])
+
+    self.params = Params(db=os.path.join(self.basedir, "persist/params"))
+    self.params.clear_all()
+    os.sync()
+
+  def tearDown(self):
+    try:
+      if self.updated_proc is not None:
+        self.updated_proc.terminate()
+        self.updated_proc.wait(30)
+    except Exception as e:
+      print(e)
+    self.tmp_dir.cleanup()
+
+
+  # *** test helpers ***
+
+
+  def _run(self, cmd, cwd=None):
+    if not isinstance(cmd, list):
+      cmd = (cmd,)
+
+    for c in cmd:
+      subprocess.check_output(c, cwd=cwd, shell=True)
+
+  def _get_updated_proc(self):
+    os.environ["PYTHONPATH"] = self.basedir
+    os.environ["GIT_AUTHOR_NAME"] = "testy tester"
+    os.environ["GIT_COMMITTER_NAME"] = "testy tester"
+    os.environ["GIT_AUTHOR_EMAIL"] = "testy@tester.test"
+    os.environ["GIT_COMMITTER_EMAIL"] = "testy@tester.test"
+    os.environ["UPDATER_TEST_IP"] = "localhost"
+    os.environ["UPDATER_LOCK_FILE"] = os.path.join(self.tmp_dir.name, "updater.lock")
+    os.environ["UPDATER_STAGING_ROOT"] = self.staging_dir
+    updated_path = os.path.join(self.basedir, "selfdrive/updated.py")
+    return subprocess.Popen(updated_path, env=os.environ)
+
+  def _start_updater(self, offroad=True, nosleep=False):
+    self.params.put("IsOffroad", "1" if offroad else "0")
+    self.updated_proc = self._get_updated_proc()
+    if not nosleep:
+      time.sleep(1)
+
+  def _update_now(self):
+    self.updated_proc.send_signal(signal.SIGHUP)
+
+  # TODO: this should be implemented in params
+  def _read_param(self, key, timeout=1):
+    ret = None
+    start_time = time.monotonic()
+    while ret is None:
+      ret = self.params.get(key, encoding='utf8')
+      if time.monotonic() - start_time > timeout:
+        break
+      time.sleep(0.01)
+    return ret
+
+  def _wait_for_update(self, timeout=30, clear_param=False):
+    if clear_param:
+      self.params.delete("LastUpdateTime")
+
+    self._update_now()
+    t = self._read_param("LastUpdateTime", timeout=timeout)
+    if t is None:
+      raise Exception("timed out waiting for update to complete")
+
+  def _make_commit(self):
+    all_dirs, all_files = [], []
+    for root, dirs, files in os.walk(self.git_remote_dir):
+      if ".git" in root:
+        continue
+      for d in dirs:
+        all_dirs.append(os.path.join(root, d))
+      for f in files:
+        all_files.append(os.path.join(root, f))
+
+    # make a new dir and some new files
+    new_dir = os.path.join(self.git_remote_dir, "this_is_a_new_dir")
+    os.mkdir(new_dir)
+    for _ in range(random.randrange(5, 30)):
+      for d in (new_dir, random.choice(all_dirs)):
+        with tempfile.NamedTemporaryFile(dir=d, delete=False) as f:
+          f.write(os.urandom(random.randrange(1, 1000000)))
+
+    # modify some files
+    for f in random.sample(all_files, random.randrange(5, 50)):
+      with open(f, "w+") as ff:
+        txt = ff.readlines()
+        ff.seek(0)
+        for line in txt:
+          ff.write(line[::-1])
+
+    # remove some files
+    for f in random.sample(all_files, random.randrange(5, 50)):
+      os.remove(f)
+
+    # remove some dirs
+    for d in random.sample(all_dirs, random.randrange(1, 10)):
+      shutil.rmtree(d)
+
+    # commit the changes
+    self._run([
+      "git add -A",
+      "git commit -m 'an update'",
+    ], cwd=self.git_remote_dir)
+
+  def _check_update_state(self, update_available):
+    # make sure LastUpdateTime is recent
+    t = self._read_param("LastUpdateTime")
+    last_update_time = datetime.datetime.fromisoformat(t)
+    td = datetime.datetime.utcnow() - last_update_time
+    self.assertLess(td.total_seconds(), 10)
+    self.params.delete("LastUpdateTime")
+
+    # wait a bit for the rest of the params to be written
+    time.sleep(0.1)
+
+    # check params
+    update = self._read_param("UpdateAvailable")
+    self.assertEqual(update == "1", update_available, f"UpdateAvailable: {repr(update)}")
+    self.assertEqual(self._read_param("UpdateFailedCount"), "0")
+
+    # TODO: check that the finalized update actually matches remote
+    # check the .overlay_init and .overlay_consistent flags
+    self.assertTrue(os.path.isfile(os.path.join(self.basedir, ".overlay_init")))
+    self.assertEqual(os.path.isfile(os.path.join(self.finalized_dir, ".overlay_consistent")), update_available)
+
+
+  # *** test cases ***
+
+
+  # Run updated for 100 cycles with no update
+  def test_no_update(self):
+    self._start_updater()
+    for _ in range(100):
+      self._wait_for_update(clear_param=True)
+      self._check_update_state(False)
+
+  # Let the updater run with no update for a cycle, then write an update
+  def test_update(self):
+    self._start_updater()
+
+    # run for a cycle with no update
+    self._wait_for_update(clear_param=True)
+    self._check_update_state(False)
+
+    # write an update to our remote
+    self._make_commit()
+
+    # run for a cycle to get the update
+    self._wait_for_update(timeout=60, clear_param=True)
+    self._check_update_state(True)
+
+    # run another cycle with no update
+    self._wait_for_update(clear_param=True)
+    self._check_update_state(True)
+
+  # Let the updater run for 10 cycles, and write an update every cycle
+  @unittest.skip("need to make this faster")
+  def test_update_loop(self):
+    self._start_updater()
+
+    # run for a cycle with no update
+    self._wait_for_update(clear_param=True)
+    for _ in range(10):
+      time.sleep(0.5)
+      self._make_commit()
+      self._wait_for_update(timeout=90, clear_param=True)
+      self._check_update_state(True)
+
+  # Test overlay re-creation after tracking a new file in basedir's git
+  def test_overlay_reinit(self):
+    self._start_updater()
+
+    overlay_init_fn = os.path.join(self.basedir, ".overlay_init")
+
+    # run for a cycle with no update
+    self._wait_for_update(clear_param=True)
+    self.params.delete("LastUpdateTime")
+    first_mtime = os.path.getmtime(overlay_init_fn)
+
+    # touch a file in the basedir
+    self._run("touch new_file && git add new_file", cwd=self.basedir)
+
+    # run another cycle, should have a new mtime
+    self._wait_for_update(clear_param=True)
+    second_mtime = os.path.getmtime(overlay_init_fn)
+    self.assertTrue(first_mtime != second_mtime)
+
+    # run another cycle, mtime should be same as last cycle
+    self._wait_for_update(clear_param=True)
+    new_mtime = os.path.getmtime(overlay_init_fn)
+    self.assertTrue(second_mtime == new_mtime)
+
+  # Make sure updated exits if another instance is running
+  def test_multiple_instances(self):
+    # start updated and let it run for a cycle
+    self._start_updater()
+    time.sleep(1)
+    self._wait_for_update(clear_param=True)
+
+    # start another instance
+    second_updated = self._get_updated_proc()
+    ret_code = second_updated.wait(timeout=5)
+    self.assertTrue(ret_code is not None)
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/selfdrive/updated.py b/selfdrive/updated.py
index 21c0ec9806..59ec7d8802 100755
--- a/selfdrive/updated.py
+++ b/selfdrive/updated.py
@@ -28,16 +28,18 @@ import subprocess
 import psutil
 import shutil
 import signal
-from pathlib import Path
 import fcntl
 import threading
 from cffi import FFI
+from pathlib import Path
 
 from common.basedir import BASEDIR
 from common.params import Params
 from selfdrive.swaglog import cloudlog
 
-STAGING_ROOT = "/data/safe_staging"
+TEST_IP = os.getenv("UPDATER_TEST_IP", "8.8.8.8")
+LOCK_FILE = os.getenv("UPDATER_LOCK_FILE", "/tmp/safe_staging_overlay.lock")
+STAGING_ROOT = os.getenv("UPDATER_STAGING_ROOT", "/data/safe_staging")
 
 OVERLAY_UPPER = os.path.join(STAGING_ROOT, "upper")
 OVERLAY_METADATA = os.path.join(STAGING_ROOT, "metadata")
@@ -81,20 +83,13 @@ def run(cmd, cwd=None):
   return subprocess.check_output(cmd, cwd=cwd, stderr=subprocess.STDOUT, encoding='utf8')
 
 
-def remove_consistent_flag():
+def set_consistent_flag(consistent):
   os.system("sync")
   consistent_file = Path(os.path.join(FINALIZED, ".overlay_consistent"))
-  try:
+  if consistent:
+    consistent_file.touch()
+  elif not consistent and consistent_file.exists():
     consistent_file.unlink()
-  except FileNotFoundError:
-    pass
-  os.system("sync")
-
-
-def set_consistent_flag():
-  consistent_file = Path(os.path.join(FINALIZED, ".overlay_consistent"))
-  os.system("sync")
-  consistent_file.touch()
   os.system("sync")
 
 
@@ -150,7 +145,7 @@ def init_ovfs():
   cloudlog.info("preparing new safe staging area")
   Params().put("UpdateAvailable", "0")
 
-  remove_consistent_flag()
+  set_consistent_flag(False)
   dismount_ovfs()
 
   if os.path.isdir(STAGING_ROOT):
@@ -158,6 +153,7 @@
 
   for dirname in [STAGING_ROOT, OVERLAY_UPPER, OVERLAY_METADATA, OVERLAY_MERGED, FINALIZED]:
     os.mkdir(dirname, 0o755)
+
   if not os.lstat(BASEDIR).st_dev == os.lstat(OVERLAY_MERGED).st_dev:
     raise RuntimeError("base and overlay merge directories are on different filesystems; not valid for overlay FS!")
 
@@ -216,13 +212,13 @@
 
     # Un-set the validity flag to prevent the finalized tree from being
    # activated later if the finalize step is interrupted
-    remove_consistent_flag()
+    set_consistent_flag(False)
 
     finalize_from_ovfs()
 
     # Make sure the validity flag lands on disk LAST, only when the local git
     # repo and OP install are in a consistent state.
-    set_consistent_flag()
+    set_consistent_flag(True)
 
     cloudlog.info("update successful!")
   else:
@@ -232,8 +228,6 @@
 
 
 def main():
-  update_failed_count = 0
-  overlay_init_done = False
   params = Params()
 
   if params.get("DisableUpdates") == b"1":
@@ -247,7 +241,7 @@
   if psutil.LINUX:
     p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
 
-  ov_lock_fd = open('/tmp/safe_staging_overlay.lock', 'w')
+  ov_lock_fd = open(LOCK_FILE, 'w')
   try:
     fcntl.flock(ov_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
   except IOError:
@@ -257,33 +251,34 @@
 
   wait_helper = WaitTimeHelper()
   wait_helper.sleep(30)
 
+  update_failed_count = 0
+  overlay_initialized = False
   while not wait_helper.shutdown:
     update_failed_count += 1
     wait_helper.ready_event.clear()
 
     # Check for internet every 30s
     time_wrong = datetime.datetime.utcnow().year < 2019
-    ping_failed = os.system("git ls-remote --tags --quiet") != 0
+    ping_failed = os.system(f"ping -W 4 -c 1 {TEST_IP}") != 0
     if ping_failed or time_wrong:
       wait_helper.sleep(30)
       continue
 
     # Attempt an update
     try:
-      # If the git directory has modifcations after we created the overlay
-      # we need to recreate the overlay
-      if overlay_init_done:
+      # Re-create the overlay if BASEDIR/.git has changed since we created the overlay
+      if overlay_initialized:
         overlay_init_fn = os.path.join(BASEDIR, ".overlay_init")
         git_dir_path = os.path.join(BASEDIR, ".git")
         new_files = run(["find", git_dir_path, "-newer", overlay_init_fn])
         if len(new_files.splitlines()):
           cloudlog.info(".git directory changed, recreating overlay")
-          overlay_init_done = False
+          overlay_initialized = False
 
-      if not overlay_init_done:
+      if not overlay_initialized:
         init_ovfs()
-        overlay_init_done = True
+        overlay_initialized = True
 
       if params.get("IsOffroad") == b"1":
         attempt_update()
@@ -298,11 +293,13 @@
         output=e.output,
         returncode=e.returncode
       )
-      overlay_init_done = False
+      overlay_initialized = False
     except Exception:
       cloudlog.exception("uncaught updated exception, shouldn't happen")
 
     params.put("UpdateFailedCount", str(update_failed_count))
+
+    # Wait 10 minutes between update attempts
    wait_helper.sleep(60*10)
 
   # We've been signaled to shut down
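
Usage note (not part of the patch above): while iterating on updated.py, the new suite can also be driven by hand with unittest. The snippet below is only a sketch and assumes an openpilot checkout with the repo root on PYTHONPATH and git/scons available on the host, mirroring what setUp() expects.

#!/usr/bin/env python3
# Minimal sketch: run the updater tests directly (assumes the openpilot repo root is on PYTHONPATH).
import unittest

from selfdrive.test.test_updated import TestUpdater

if __name__ == "__main__":
  # Loads every TestUpdater case; use TestLoader().loadTestsFromName("test_update", TestUpdater) to narrow it down.
  suite = unittest.TestLoader().loadTestsFromTestCase(TestUpdater)
  unittest.TextTestRunner(verbosity=2).run(suite)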