import os import pathlib import shutil import signal import stat import subprocess import tempfile import time import unittest from unittest import mock import pytest from openpilot.selfdrive.manager.process import ManagerProcess from openpilot.selfdrive.test.helpers import processes_context from openpilot.common.params import Params def run(args, **kwargs): return subprocess.run(args, **kwargs, check=True) def update_release(directory, name, version, agnos_version, release_notes): with open(directory / "RELEASES.md", "w") as f: f.write(release_notes) (directory / "common").mkdir(exist_ok=True) with open(directory / "common" / "version.h", "w") as f: f.write(f'#define COMMA_VERSION "{version}"') launch_env = directory / "launch_env.sh" with open(launch_env, "w") as f: f.write(f'export AGNOS_VERSION="{agnos_version}"') st = os.stat(launch_env) os.chmod(launch_env, st.st_mode | stat.S_IEXEC) test_symlink = directory / "test_symlink" if not os.path.exists(str(test_symlink)): os.symlink("common/version.h", test_symlink) def get_version(path: str) -> str: with open(os.path.join(path, "common", "version.h")) as f: return f.read().split('"')[1] def get_consistent_flag(path: str) -> bool: consistent_file = pathlib.Path(os.path.join(path, ".overlay_consistent")) return consistent_file.is_file() @pytest.mark.slow # TODO: can we test overlayfs in GHA? class BaseUpdateTest(unittest.TestCase): @classmethod def setUpClass(cls): if "Base" in cls.__name__: raise unittest.SkipTest def setUp(self): self.tmpdir = tempfile.mkdtemp() run(["sudo", "mount", "-t", "tmpfs", "tmpfs", self.tmpdir]) # overlayfs doesn't work inside of docker unless this is a tmpfs self.mock_update_path = pathlib.Path(self.tmpdir) self.params = Params() self.basedir = self.mock_update_path / "openpilot" self.basedir.mkdir() self.staging_root = self.mock_update_path / "safe_staging" self.staging_root.mkdir() self.remote_dir = self.mock_update_path / "remote" self.remote_dir.mkdir() mock.patch("openpilot.common.basedir.BASEDIR", self.basedir).start() os.environ["UPDATER_STAGING_ROOT"] = str(self.staging_root) os.environ["UPDATER_LOCK_FILE"] = str(self.mock_update_path / "safe_staging_overlay.lock") self.MOCK_RELEASES = { "release3": ("0.1.2", "1.2", "0.1.2 release notes"), "master": ("0.1.3", "1.2", "0.1.3 release notes"), } def set_target_branch(self, branch): self.params.put("UpdaterTargetBranch", branch) def setup_basedir_release(self, release): self.params = Params() self.set_target_branch(release) def update_remote_release(self, release): raise NotImplementedError("") def setup_remote_release(self, release): raise NotImplementedError("") def additional_context(self): raise NotImplementedError("") def tearDown(self): mock.patch.stopall() try: run(["sudo", "umount", "-l", str(self.staging_root / "merged")]) run(["sudo", "umount", "-l", self.tmpdir]) shutil.rmtree(self.tmpdir) except Exception: print("cleanup failed...") def send_check_for_updates_signal(self, updated: ManagerProcess): updated.signal(signal.SIGUSR1.value) def send_download_signal(self, updated: ManagerProcess): updated.signal(signal.SIGHUP.value) def _test_params(self, branch, fetch_available, update_available): self.assertEqual(self.params.get("UpdaterTargetBranch", encoding="utf-8"), branch) self.assertEqual(self.params.get_bool("UpdaterFetchAvailable"), fetch_available) self.assertEqual(self.params.get_bool("UpdateAvailable"), update_available) def _test_finalized_update(self, branch, version, agnos_version, release_notes): self.assertTrue(self.params.get("UpdaterNewDescription", encoding="utf-8").startswith(f"{version} / {branch}")) self.assertEqual(self.params.get("UpdaterNewReleaseNotes", encoding="utf-8"), f"

{release_notes}

\n") self.assertEqual(get_version(str(self.staging_root / "finalized")), version) self.assertEqual(get_consistent_flag(str(self.staging_root / "finalized")), True) self.assertTrue(os.access(str(self.staging_root / "finalized" / "launch_env.sh"), os.X_OK)) with open(self.staging_root / "finalized" / "test_symlink") as f: self.assertIn(version, f.read()) def wait_for_condition(self, condition, timeout=12): start = time.monotonic() while True: waited = time.monotonic() - start if condition(): print(f"waited {waited}s for condition ") return waited if waited > timeout: raise TimeoutError("timed out waiting for condition") time.sleep(1) def wait_for_idle(self): self.wait_for_condition(lambda: self.params.get("UpdaterState", encoding="utf-8") == "idle") def wait_for_fetch_available(self): self.wait_for_condition(lambda: self.params.get_bool("UpdaterFetchAvailable")) def wait_for_update_available(self): self.wait_for_condition(lambda: self.params.get_bool("UpdateAvailable")) def test_no_update(self): # Start on release3, ensure we don't fetch any updates self.setup_remote_release("release3") self.setup_basedir_release("release3") with self.additional_context(), processes_context(["updated"]) as [updated]: self._test_params("release3", False, False) self.wait_for_idle() self._test_params("release3", False, False) self.send_check_for_updates_signal(updated) self.wait_for_idle() self._test_params("release3", False, False) def test_new_release(self): # Start on release3, simulate a release3 commit, ensure we fetch that update properly self.setup_remote_release("release3") self.setup_basedir_release("release3") with self.additional_context(), processes_context(["updated"]) as [updated]: self._test_params("release3", False, False) self.wait_for_idle() self._test_params("release3", False, False) self.MOCK_RELEASES["release3"] = ("0.1.3", "1.2", "0.1.3 release notes") self.update_remote_release("release3") self.send_check_for_updates_signal(updated) self.wait_for_fetch_available() self._test_params("release3", True, False) self.send_download_signal(updated) self.wait_for_update_available() self._test_params("release3", False, True) self._test_finalized_update("release3", *self.MOCK_RELEASES["release3"]) def test_switch_branches(self): # Start on release3, request to switch to master manually, ensure we switched self.setup_remote_release("release3") self.setup_remote_release("master") self.setup_basedir_release("release3") with self.additional_context(), processes_context(["updated"]) as [updated]: self._test_params("release3", False, False) self.wait_for_idle() self._test_params("release3", False, False) self.set_target_branch("master") self.send_check_for_updates_signal(updated) self.wait_for_fetch_available() self._test_params("master", True, False) self.send_download_signal(updated) self.wait_for_update_available() self._test_params("master", False, True) self._test_finalized_update("master", *self.MOCK_RELEASES["master"]) def test_agnos_update(self): # Start on release3, push an update with an agnos change self.setup_remote_release("release3") self.setup_basedir_release("release3") with self.additional_context(), \ mock.patch("openpilot.system.hardware.AGNOS", "True"), \ mock.patch("openpilot.system.hardware.tici.hardware.Tici.get_os_version", "1.2"), \ mock.patch("openpilot.system.hardware.tici.agnos.get_target_slot_number"), \ mock.patch("openpilot.system.hardware.tici.agnos.flash_agnos_update"), \ processes_context(["updated"]) as [updated]: self._test_params("release3", False, False) self.wait_for_idle() self._test_params("release3", False, False) self.MOCK_RELEASES["release3"] = ("0.1.3", "1.3", "0.1.3 release notes") self.update_remote_release("release3") self.send_check_for_updates_signal(updated) self.wait_for_fetch_available() self._test_params("release3", True, False) self.send_download_signal(updated) self.wait_for_update_available() self._test_params("release3", False, True) self._test_finalized_update("release3", *self.MOCK_RELEASES["release3"])