more helpers for e2e testing updated (#32128)

casync test prereq
pull/32064/head^2
Justin Newberry 1 year ago committed by GitHub
parent 1788ecf7b1
commit 3c9bf992a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 12
      selfdrive/test/helpers.py
  2. 56
      selfdrive/updated/tests/test_base.py
  3. 4
      selfdrive/updated/tests/test_git.py
  4. 16
      system/updated/common.py

@ -118,6 +118,18 @@ def with_http_server(func, handler=http.server.BaseHTTPRequestHandler, setup=Non
def DirectoryHttpServer(directory) -> type[http.server.SimpleHTTPRequestHandler]:
# creates an http server that serves files from directory
class Handler(http.server.SimpleHTTPRequestHandler):
API_NO_RESPONSE = False
API_BAD_RESPONSE = False
def do_GET(self):
if self.API_NO_RESPONSE:
return
if self.API_BAD_RESPONSE:
self.send_response(500, "")
return
super().do_GET()
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(directory), **kwargs)

@ -15,10 +15,11 @@ from openpilot.selfdrive.manager.process import ManagerProcess
from openpilot.selfdrive.test.helpers import processes_context
from openpilot.common.params import Params
from openpilot.system.updated.common import get_consistent_flag
def run(args, **kwargs):
return subprocess.run(args, **kwargs, check=True)
return subprocess.check_output(args, **kwargs)
def update_release(directory, name, version, agnos_version, release_notes):
@ -47,11 +48,6 @@ def get_version(path: str) -> str:
return f.read().split('"')[1]
def get_consistent_flag(path: str) -> bool:
consistent_file = pathlib.Path(os.path.join(path, ".overlay_consistent"))
return consistent_file.is_file()
@pytest.mark.slow # TODO: can we test overlayfs in GHA?
class BaseUpdateTest(unittest.TestCase):
@classmethod
@ -112,20 +108,20 @@ class BaseUpdateTest(unittest.TestCase):
except Exception:
print("cleanup failed...")
def send_check_for_updates_signal(self, updated: ManagerProcess):
updated.signal(signal.SIGUSR1.value)
def wait_for_condition(self, condition, timeout=12):
start = time.monotonic()
while True:
waited = time.monotonic() - start
if condition():
print(f"waited {waited}s for condition ")
return waited
def send_download_signal(self, updated: ManagerProcess):
updated.signal(signal.SIGHUP.value)
if waited > timeout:
raise TimeoutError("timed out waiting for condition")
def _test_params(self, branch, fetch_available, update_available):
self.assertEqual(self.params.get("UpdaterTargetBranch", encoding="utf-8"), branch)
self.assertEqual(self.params.get_bool("UpdaterFetchAvailable"), fetch_available)
self.assertEqual(self.params.get_bool("UpdateAvailable"), update_available)
time.sleep(1)
def _test_finalized_update(self, branch, version, agnos_version, release_notes):
self.assertTrue(self.params.get("UpdaterNewDescription", encoding="utf-8").startswith(f"{version} / {branch}"))
self.assertEqual(self.params.get("UpdaterNewReleaseNotes", encoding="utf-8"), f"<p>{release_notes}</p>\n")
self.assertEqual(get_version(str(self.staging_root / "finalized")), version)
self.assertEqual(get_consistent_flag(str(self.staging_root / "finalized")), True)
self.assertTrue(os.access(str(self.staging_root / "finalized" / "launch_env.sh"), os.X_OK))
@ -133,22 +129,30 @@ class BaseUpdateTest(unittest.TestCase):
with open(self.staging_root / "finalized" / "test_symlink") as f:
self.assertIn(version, f.read())
def wait_for_condition(self, condition, timeout=12):
start = time.monotonic()
while True:
waited = time.monotonic() - start
if condition():
print(f"waited {waited}s for condition ")
return waited
class ParamsBaseUpdateTest(BaseUpdateTest):
def _test_finalized_update(self, branch, version, agnos_version, release_notes):
self.assertTrue(self.params.get("UpdaterNewDescription", encoding="utf-8").startswith(f"{version} / {branch}"))
self.assertEqual(self.params.get("UpdaterNewReleaseNotes", encoding="utf-8"), f"<p>{release_notes}</p>\n")
super()._test_finalized_update(branch, version, agnos_version, release_notes)
if waited > timeout:
raise TimeoutError("timed out waiting for condition")
def send_check_for_updates_signal(self, updated: ManagerProcess):
updated.signal(signal.SIGUSR1.value)
time.sleep(1)
def send_download_signal(self, updated: ManagerProcess):
updated.signal(signal.SIGHUP.value)
def _test_params(self, branch, fetch_available, update_available):
self.assertEqual(self.params.get("UpdaterTargetBranch", encoding="utf-8"), branch)
self.assertEqual(self.params.get_bool("UpdaterFetchAvailable"), fetch_available)
self.assertEqual(self.params.get_bool("UpdateAvailable"), update_available)
def wait_for_idle(self):
self.wait_for_condition(lambda: self.params.get("UpdaterState", encoding="utf-8") == "idle")
def wait_for_failed(self):
self.wait_for_condition(lambda: self.params.get("UpdateFailedCount", encoding="utf-8") is not None and \
int(self.params.get("UpdateFailedCount", encoding="utf-8")) > 0)
def wait_for_fetch_available(self):
self.wait_for_condition(lambda: self.params.get_bool("UpdaterFetchAvailable"))

@ -1,8 +1,8 @@
import contextlib
from openpilot.selfdrive.updated.tests.test_base import BaseUpdateTest, run, update_release
from openpilot.selfdrive.updated.tests.test_base import ParamsBaseUpdateTest, run, update_release
class TestUpdateDGitStrategy(BaseUpdateTest):
class TestUpdateDGitStrategy(ParamsBaseUpdateTest):
def update_remote_release(self, release):
update_release(self.remote_dir, release, *self.MOCK_RELEASES[release])
run(["git", "add", "."], cwd=self.remote_dir)

@ -0,0 +1,16 @@
import os
import pathlib
def get_consistent_flag(path: str) -> bool:
consistent_file = pathlib.Path(os.path.join(path, ".overlay_consistent"))
return consistent_file.is_file()
def set_consistent_flag(path: str, consistent: bool) -> None:
os.sync()
consistent_file = pathlib.Path(os.path.join(path, ".overlay_consistent"))
if consistent:
consistent_file.touch()
elif not consistent:
consistent_file.unlink(missing_ok=True)
os.sync()
Loading…
Cancel
Save