openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
8.7 KiB

import os
import pathlib
import shutil
import signal
import stat
import subprocess
import tempfile
import time
import unittest
from unittest import mock
import pytest
from openpilot.selfdrive.manager.process import ManagerProcess
from openpilot.selfdrive.test.helpers import processes_context
from openpilot.common.params import Params
from openpilot.system.updated.common import get_consistent_flag
def run(args, **kwargs):
return subprocess.check_output(args, **kwargs)
def update_release(directory, name, version, agnos_version, release_notes):
with open(directory / "RELEASES.md", "w") as f:
f.write(release_notes)
(directory / "common").mkdir(exist_ok=True)
with open(directory / "common" / "version.h", "w") as f:
f.write(f'#define COMMA_VERSION "{version}"')
launch_env = directory / "launch_env.sh"
with open(launch_env, "w") as f:
f.write(f'export AGNOS_VERSION="{agnos_version}"')
st = os.stat(launch_env)
os.chmod(launch_env, st.st_mode | stat.S_IEXEC)
test_symlink = directory / "test_symlink"
if not os.path.exists(str(test_symlink)):
os.symlink("common/version.h", test_symlink)
def get_version(path: str) -> str:
with open(os.path.join(path, "common", "version.h")) as f:
return f.read().split('"')[1]
@pytest.mark.slow # TODO: can we test overlayfs in GHA?
class BaseUpdateTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
if "Base" in cls.__name__:
raise unittest.SkipTest
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
run(["sudo", "mount", "-t", "tmpfs", "tmpfs", self.tmpdir]) # overlayfs doesn't work inside of docker unless this is a tmpfs
self.mock_update_path = pathlib.Path(self.tmpdir)
self.params = Params()
self.basedir = self.mock_update_path / "openpilot"
self.basedir.mkdir()
self.staging_root = self.mock_update_path / "safe_staging"
self.staging_root.mkdir()
self.remote_dir = self.mock_update_path / "remote"
self.remote_dir.mkdir()
mock.patch("openpilot.common.basedir.BASEDIR", self.basedir).start()
os.environ["UPDATER_STAGING_ROOT"] = str(self.staging_root)
os.environ["UPDATER_LOCK_FILE"] = str(self.mock_update_path / "safe_staging_overlay.lock")
self.MOCK_RELEASES = {
"release3": ("0.1.2", "1.2", "0.1.2 release notes"),
"master": ("0.1.3", "1.2", "0.1.3 release notes"),
}
def set_target_branch(self, branch):
self.params.put("UpdaterTargetBranch", branch)
def setup_basedir_release(self, release):
self.params = Params()
self.set_target_branch(release)
def update_remote_release(self, release):
raise NotImplementedError("")
def setup_remote_release(self, release):
raise NotImplementedError("")
def additional_context(self):
raise NotImplementedError("")
def tearDown(self):
mock.patch.stopall()
try:
run(["sudo", "umount", "-l", str(self.staging_root / "merged")])
run(["sudo", "umount", "-l", self.tmpdir])
shutil.rmtree(self.tmpdir)
except Exception:
print("cleanup failed...")
def wait_for_condition(self, condition, timeout=12):
start = time.monotonic()
while True:
waited = time.monotonic() - start
if condition():
print(f"waited {waited}s for condition ")
return waited
if waited > timeout:
raise TimeoutError("timed out waiting for condition")
time.sleep(1)
def _test_finalized_update(self, branch, version, agnos_version, release_notes):
self.assertEqual(get_version(str(self.staging_root / "finalized")), version)
self.assertEqual(get_consistent_flag(str(self.staging_root / "finalized")), True)
self.assertTrue(os.access(str(self.staging_root / "finalized" / "launch_env.sh"), os.X_OK))
with open(self.staging_root / "finalized" / "test_symlink") as f:
self.assertIn(version, f.read())
class ParamsBaseUpdateTest(BaseUpdateTest):
def _test_finalized_update(self, branch, version, agnos_version, release_notes):
self.assertTrue(self.params.get("UpdaterNewDescription", encoding="utf-8").startswith(f"{version} / {branch}"))
self.assertEqual(self.params.get("UpdaterNewReleaseNotes", encoding="utf-8"), f"<p>{release_notes}</p>\n")
super()._test_finalized_update(branch, version, agnos_version, release_notes)
def send_check_for_updates_signal(self, updated: ManagerProcess):
updated.signal(signal.SIGUSR1.value)
def send_download_signal(self, updated: ManagerProcess):
updated.signal(signal.SIGHUP.value)
def _test_params(self, branch, fetch_available, update_available):
self.assertEqual(self.params.get("UpdaterTargetBranch", encoding="utf-8"), branch)
self.assertEqual(self.params.get_bool("UpdaterFetchAvailable"), fetch_available)
self.assertEqual(self.params.get_bool("UpdateAvailable"), update_available)
def wait_for_idle(self):
self.wait_for_condition(lambda: self.params.get("UpdaterState", encoding="utf-8") == "idle")
def wait_for_failed(self):
self.wait_for_condition(lambda: self.params.get("UpdateFailedCount", encoding="utf-8") is not None and \
int(self.params.get("UpdateFailedCount", encoding="utf-8")) > 0)
def wait_for_fetch_available(self):
self.wait_for_condition(lambda: self.params.get_bool("UpdaterFetchAvailable"))
def wait_for_update_available(self):
self.wait_for_condition(lambda: self.params.get_bool("UpdateAvailable"))
def test_no_update(self):
# Start on release3, ensure we don't fetch any updates
self.setup_remote_release("release3")
self.setup_basedir_release("release3")
with self.additional_context(), processes_context(["updated"]) as [updated]:
self._test_params("release3", False, False)
self.wait_for_idle()
self._test_params("release3", False, False)
self.send_check_for_updates_signal(updated)
self.wait_for_idle()
self._test_params("release3", False, False)
def test_new_release(self):
# Start on release3, simulate a release3 commit, ensure we fetch that update properly
self.setup_remote_release("release3")
self.setup_basedir_release("release3")
with self.additional_context(), processes_context(["updated"]) as [updated]:
self._test_params("release3", False, False)
self.wait_for_idle()
self._test_params("release3", False, False)
self.MOCK_RELEASES["release3"] = ("0.1.3", "1.2", "0.1.3 release notes")
self.update_remote_release("release3")
self.send_check_for_updates_signal(updated)
self.wait_for_fetch_available()
self._test_params("release3", True, False)
self.send_download_signal(updated)
self.wait_for_update_available()
self._test_params("release3", False, True)
self._test_finalized_update("release3", *self.MOCK_RELEASES["release3"])
def test_switch_branches(self):
# Start on release3, request to switch to master manually, ensure we switched
self.setup_remote_release("release3")
self.setup_remote_release("master")
self.setup_basedir_release("release3")
with self.additional_context(), processes_context(["updated"]) as [updated]:
self._test_params("release3", False, False)
self.wait_for_idle()
self._test_params("release3", False, False)
self.set_target_branch("master")
self.send_check_for_updates_signal(updated)
self.wait_for_fetch_available()
self._test_params("master", True, False)
self.send_download_signal(updated)
self.wait_for_update_available()
self._test_params("master", False, True)
self._test_finalized_update("master", *self.MOCK_RELEASES["master"])
def test_agnos_update(self):
# Start on release3, push an update with an agnos change
self.setup_remote_release("release3")
self.setup_basedir_release("release3")
with self.additional_context(), \
mock.patch("openpilot.system.hardware.AGNOS", "True"), \
mock.patch("openpilot.system.hardware.tici.hardware.Tici.get_os_version", "1.2"), \
mock.patch("openpilot.system.hardware.tici.agnos.get_target_slot_number"), \
mock.patch("openpilot.system.hardware.tici.agnos.flash_agnos_update"), \
processes_context(["updated"]) as [updated]:
self._test_params("release3", False, False)
self.wait_for_idle()
self._test_params("release3", False, False)
self.MOCK_RELEASES["release3"] = ("0.1.3", "1.3", "0.1.3 release notes")
self.update_remote_release("release3")
self.send_check_for_updates_signal(updated)
self.wait_for_fetch_available()
self._test_params("release3", True, False)
self.send_download_signal(updated)
self.wait_for_update_available()
self._test_params("release3", False, True)
self._test_finalized_update("release3", *self.MOCK_RELEASES["release3"])