from collections import namedtuple import capnp import pathlib import shutil import sys import os import pywinctl import pyautogui import pickle import time from cereal import car, log from msgq.visionipc import VisionIpcServer, VisionStreamType from cereal.messaging import PubMaster, log_from_bytes, sub_sock from openpilot.common.basedir import BASEDIR from openpilot.common.params import Params from openpilot.common.prefix import OpenpilotPrefix from openpilot.common.transformations.camera import CameraConfig, DEVICE_CAMERAS from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert from openpilot.selfdrive.test.helpers import with_processes from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_controlsState, migrate_carState from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.framereader import FrameReader from openpilot.tools.lib.route import Route from openpilot.tools.lib.cache import DEFAULT_CACHE_DIR UI_DELAY = 0.1 # may be slower on CI? TEST_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19" STREAMS: list[tuple[VisionStreamType, CameraConfig, bytes]] = [] OFFROAD_ALERTS = ['Offroad_StorageMissing', 'Offroad_IsTakingSnapshot'] DATA: dict[str, capnp.lib.capnp._DynamicStructBuilder] = dict.fromkeys( ["carParams", "deviceState", "pandaStates", "controlsState", "selfdriveState", "liveCalibration", "modelV2", "radarState", "driverMonitoringState", "carState", "driverStateV2", "roadCameraState", "wideRoadCameraState", "driverCameraState"], None) def setup_homescreen(click, pm: PubMaster): pass def setup_settings_device(click, pm: PubMaster): click(100, 100) def setup_settings_toggles(click, pm: PubMaster): setup_settings_device(click, pm) click(278, 650) time.sleep(UI_DELAY) def setup_settings_software(click, pm: PubMaster): setup_settings_device(click, pm) click(278, 800) time.sleep(UI_DELAY) def setup_settings_developer(click, pm: PubMaster): CP = car.CarParams() CP.experimentalLongitudinalAvailable = True Params().put("CarParamsPersistent", CP.to_bytes()) setup_settings_device(click, pm) click(278, 960) time.sleep(UI_DELAY) def setup_onroad(click, pm: PubMaster): vipc_server = VisionIpcServer("camerad") for stream_type, cam, _ in STREAMS: vipc_server.create_buffers(stream_type, 5, cam.width, cam.height) vipc_server.start_listener() uidebug_received_cnt = 0 packet_id = 0 uidebug_sock = sub_sock('uiDebug') # Condition check for uiDebug processing check_uidebug = DATA['deviceState'].deviceState.started and not DATA['carParams'].carParams.notCar # Loop until 20 'uiDebug' messages are received while uidebug_received_cnt <= 20: for service, data in DATA.items(): if data: data.clear_write_flag() pm.send(service, data) for stream_type, _, image in STREAMS: vipc_server.send(stream_type, image, packet_id, packet_id, packet_id) if check_uidebug: while uidebug_sock.receive(non_blocking=True): uidebug_received_cnt += 1 else: uidebug_received_cnt += 1 packet_id += 1 time.sleep(0.05) def setup_onroad_disengaged(click, pm: PubMaster): DATA['selfdriveState'].selfdriveState.enabled = False setup_onroad(click, pm) DATA['selfdriveState'].selfdriveState.enabled = True def setup_onroad_override(click, pm: PubMaster): DATA['selfdriveState'].selfdriveState.state = log.SelfdriveState.OpenpilotState.overriding setup_onroad(click, pm) DATA['selfdriveState'].selfdriveState.state = log.SelfdriveState.OpenpilotState.enabled def setup_onroad_wide(click, pm: PubMaster): DATA['selfdriveState'].selfdriveState.experimentalMode = True DATA["carState"].carState.vEgo = 1 setup_onroad(click, pm) def setup_onroad_sidebar(click, pm: PubMaster): setup_onroad(click, pm) click(500, 500) setup_onroad(click, pm) def setup_onroad_wide_sidebar(click, pm: PubMaster): setup_onroad_wide(click, pm) click(500, 500) setup_onroad_wide(click, pm) def setup_body(click, pm: PubMaster): DATA['carParams'].carParams.brand = "body" DATA['carParams'].carParams.notCar = True DATA['carState'].carState.charging = True DATA['carState'].carState.fuelGauge = 50.0 setup_onroad(click, pm) def setup_keyboard(click, pm: PubMaster): setup_settings_device(click, pm) click(250, 965) click(1930, 228) def setup_keyboard_uppercase(click, pm: PubMaster): setup_keyboard(click, pm) click(200, 800) def setup_driver_camera(click, pm: PubMaster): setup_settings_device(click, pm) click(1950, 435) DATA['deviceState'].deviceState.started = False setup_onroad(click, pm) DATA['deviceState'].deviceState.started = True def setup_onroad_alert(click, pm: PubMaster, text1, text2, size, status=log.SelfdriveState.AlertStatus.normal): print(f'setup onroad alert, size: {size}') state = DATA['selfdriveState'] origin_state_bytes = state.to_bytes() cs = state.selfdriveState cs.alertText1 = text1 cs.alertText2 = text2 cs.alertSize = size cs.alertStatus = status cs.alertType = "test_onroad_alert" setup_onroad(click, pm) DATA['selfdriveState'] = log_from_bytes(origin_state_bytes).as_builder() def setup_onroad_alert_small(click, pm: PubMaster): setup_onroad_alert(click, pm, 'This is a small alert message', '', log.SelfdriveState.AlertSize.small) def setup_onroad_alert_mid(click, pm: PubMaster): setup_onroad_alert(click, pm, 'Medium Alert', 'This is a medium alert message', log.SelfdriveState.AlertSize.mid) def setup_onroad_alert_full(click, pm: PubMaster): setup_onroad_alert(click, pm, 'Full Alert', 'This is a full alert message', log.SelfdriveState.AlertSize.full) def setup_offroad_alert(click, pm: PubMaster): for alert in OFFROAD_ALERTS: set_offroad_alert(alert, True) # Toggle between settings and home to refresh the offroad alert widget setup_settings_device(click, pm) click(240, 216) def setup_update_available(click, pm: PubMaster): Params().put_bool("UpdateAvailable", True) release_notes_path = os.path.join(BASEDIR, "RELEASES.md") with open(release_notes_path) as file: release_notes = file.read().split('\n\n', 1)[0] Params().put("UpdaterNewReleaseNotes", release_notes + "\n") setup_settings_device(click, pm) click(240, 216) def setup_pair_device(click, pm: PubMaster): click(1950, 435) click(1800, 900) CASES = { "homescreen": setup_homescreen, "prime": setup_homescreen, "pair_device": setup_pair_device, "settings_device": setup_settings_device, "settings_toggles": setup_settings_toggles, "settings_software": setup_settings_software, "settings_developer": setup_settings_developer, "onroad": setup_onroad, "onroad_disengaged": setup_onroad_disengaged, "onroad_override": setup_onroad_override, "onroad_sidebar": setup_onroad_sidebar, "onroad_alert_small": setup_onroad_alert_small, "onroad_alert_mid": setup_onroad_alert_mid, "onroad_alert_full": setup_onroad_alert_full, "onroad_wide": setup_onroad_wide, "onroad_wide_sidebar": setup_onroad_wide_sidebar, "driver_camera": setup_driver_camera, "body": setup_body, "offroad_alert": setup_offroad_alert, "update_available": setup_update_available, "keyboard": setup_keyboard, "keyboard_uppercase": setup_keyboard_uppercase } TEST_DIR = pathlib.Path(__file__).parent TEST_OUTPUT_DIR = TEST_DIR / "report_1" SCREENSHOTS_DIR = TEST_OUTPUT_DIR / "screenshots" class TestUI: def __init__(self): os.environ["SCALE"] = "1" sys.modules["mouseinfo"] = False def setup(self): self.pm = PubMaster(list(DATA.keys())) DATA['deviceState'].deviceState.networkType = log.DeviceState.NetworkType.wifi DATA['deviceState'].deviceState.lastAthenaPingTime = 0 for _ in range(10): self.pm.send('deviceState', DATA['deviceState']) DATA['deviceState'].clear_write_flag() time.sleep(0.05) try: self.ui = pywinctl.getWindowsWithTitle("ui")[0] except Exception as e: print(f"failed to find ui window, assuming that it's in the top left (for Xvfb) {e}") self.ui = namedtuple("bb", ["left", "top", "width", "height"])(0,0,2160,1080) def screenshot(self, name): im = pyautogui.screenshot(SCREENSHOTS_DIR / f"{name}.png", region=(self.ui.left, self.ui.top, self.ui.width, self.ui.height)) assert im.width == 2160 assert im.height == 1080 def click(self, x, y, *args, **kwargs): pyautogui.click(self.ui.left + x, self.ui.top + y, *args, **kwargs) time.sleep(UI_DELAY) # give enough time for the UI to react @with_processes(["ui"]) def test_ui(self, name, setup_case): self.setup() setup_case(self.click, self.pm) self.screenshot(name) def create_screenshots(): if TEST_OUTPUT_DIR.exists(): shutil.rmtree(TEST_OUTPUT_DIR) SCREENSHOTS_DIR.mkdir(parents=True) route = Route(TEST_ROUTE) segnum = 2 lr = LogReader(route.qlog_paths()[segnum]) DATA['carParams'] = next((event.as_builder() for event in lr if event.which() == 'carParams'), None) for event in migrate(lr, [migrate_controlsState, migrate_carState]): if event.which() in DATA: DATA[event.which()] = event.as_builder() if all(DATA.values()): break cam = DEVICE_CAMERAS[("tici", "ar0231")] frames_cache = f'{DEFAULT_CACHE_DIR}/ui_frames' if os.path.isfile(frames_cache): with open(frames_cache, 'rb') as f: frames = pickle.load(f) road_img = frames[0] wide_road_img = frames[1] driver_img = frames[2] else: with open(frames_cache, 'wb') as f: road_img = FrameReader(route.camera_paths()[segnum]).get(0, pix_fmt="nv12")[0] wide_road_img = FrameReader(route.ecamera_paths()[segnum]).get(0, pix_fmt="nv12")[0] driver_img = FrameReader(route.dcamera_paths()[segnum]).get(0, pix_fmt="nv12")[0] pickle.dump([road_img, wide_road_img, driver_img], f) STREAMS.append((VisionStreamType.VISION_STREAM_ROAD, cam.fcam, road_img.flatten().tobytes())) STREAMS.append((VisionStreamType.VISION_STREAM_WIDE_ROAD, cam.ecam, wide_road_img.flatten().tobytes())) STREAMS.append((VisionStreamType.VISION_STREAM_DRIVER, cam.dcam, driver_img.flatten().tobytes())) t = TestUI() for name, setup in CASES.items(): with OpenpilotPrefix(): params = Params() params.put("DongleId", "123456789012345") if name == 'prime': params.put('PrimeType', '1') elif name == 'pair_device': params.put('ApiCache_Device', '{"is_paired":0, "prime_type":-1}') t.test_ui(name, setup) if __name__ == "__main__": print("creating test screenshots") create_screenshots()