openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

256 lines
8.3 KiB

from collections import namedtuple
import capnp
import pathlib
import shutil
import sys
import jinja2
import matplotlib.pyplot as plt
import numpy as np
import os
import pywinctl
import time
from cereal import messaging, log
from msgq.visionipc import VisionIpcServer, VisionStreamType
from cereal.messaging import SubMaster, PubMaster
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.prefix import OpenpilotPrefix
from openpilot.common.transformations.camera import CameraConfig, DEVICE_CAMERAS
from openpilot.selfdrive.controls.lib.alertmanager import set_offroad_alert
from openpilot.selfdrive.test.helpers import with_processes
from openpilot.selfdrive.test.process_replay.migration import migrate_selfdriveState
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.framereader import FrameReader
from openpilot.tools.lib.route import Route
# Pause, in seconds, used after UI interactions and before screenshots.
UI_DELAY = 0.5  # may be slower on CI?

# Route that create_screenshots() loads log messages and camera frames from.
TEST_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"

# (stream type, camera config, raw frame bytes) triples; filled in by create_screenshots().
STREAMS: list[tuple[VisionStreamType, CameraConfig, bytes]] = []

# Alert names that setup_offorad_alert() switches on.
OFFROAD_ALERTS = ['Offroad_StorageMissing', 'Offroad_IsTakingSnapshot']

# Service name -> message builder. Every entry starts as None and is populated
# from the route's log by create_screenshots(). Key order is preserved.
DATA: dict[str, capnp.lib.capnp._DynamicStructBuilder] = {
    service: None
    for service in (
        "carParams", "deviceState", "pandaStates", "controlsState", "selfdriveState",
        "liveCalibration", "modelV2", "radarState", "driverMonitoringState", "carState",
        "driverStateV2", "roadCameraState", "wideRoadCameraState", "driverCameraState",
    )
}
def setup_common(click, pm: PubMaster):
    """Apply the baseline state shared by the scenario helpers.

    Stores a fixed ``DongleId`` param and publishes the ``deviceState``
    message held in ``DATA``.

    Args:
        click: UI click callback; not used here, accepted so every scenario
            helper has the same signature.
        pm: publisher used to send the message.
    """
    params = Params()
    params.put("DongleId", "123456789012345")
    pm.send('deviceState', DATA['deviceState'])
def setup_homescreen(click, pm: PubMaster):
    """Scenario: only the common baseline setup, with no further interaction."""
    setup_common(click, pm)
def setup_settings_device(click, pm: PubMaster):
    """Scenario: common baseline setup followed by one click at (100, 100).

    NOTE(review): judging by the function name, that click presumably opens
    the settings screen on its Device page -- confirm against the UI layout.
    """
    setup_common(click, pm)
    click(100, 100)
def setup_onroad(click, pm: PubMaster):
    """Scenario: feed the UI recorded messages and camera frames.

    After the common setup, a ``VisionIpcServer`` named "camerad" is created
    with buffers for every stream in ``STREAMS``. Then, for 20 rounds, every
    populated entry of ``DATA`` is published, one frame per stream is sent,
    and the function sleeps for 50 ms.

    Args:
        click: UI click callback, forwarded to ``setup_common``.
        pm: publisher used for all messages.
    """
    setup_common(click, pm)

    server = VisionIpcServer("camerad")
    for stream, camera, _ in STREAMS:
        server.create_buffers(stream, 5, False, camera.width, camera.height)
    server.start_listener()

    # The frame counter runs 1..20 and is passed for all three trailing
    # arguments of send().
    for frame_id in range(1, 21):
        for service, msg in DATA.items():
            if not msg:
                continue
            # presumably lets the same builder be serialized again on the
            # next round -- TODO confirm against pycapnp
            msg.clear_write_flag()
            pm.send(service, msg)

        for stream, _, frame in STREAMS:
            server.send(stream, frame, frame_id, frame_id, frame_id)

        time.sleep(0.05)
def setup_onroad_wide(click, pm: PubMaster):
    """Scenario: onroad with ``experimentalMode`` enabled and ``vEgo`` set to 1.

    NOTE: both fields are written into the shared ``DATA`` builders and are
    not restored afterwards, so they stay set for scenarios that run later.
    """
    selfdrive_state = DATA['selfdriveState'].selfdriveState
    selfdrive_state.experimentalMode = True

    car_state = DATA["carState"].carState
    car_state.vEgo = 1

    setup_onroad(click, pm)
def setup_onroad_sidebar(click, pm: PubMaster):
    """Scenario: onroad setup followed by one click at (500, 500).

    NOTE(review): judging by the function name, the click presumably brings
    up the sidebar -- confirm against the UI.
    """
    setup_onroad(click, pm)
    click(500, 500)
def setup_onroad_wide_sidebar(click, pm: PubMaster):
    """Scenario: the ``setup_onroad_wide`` setup followed by one click at (500, 500).

    NOTE(review): judging by the function name, the click presumably brings
    up the sidebar -- confirm against the UI.
    """
    setup_onroad_wide(click, pm)
    click(500, 500)
def setup_body(click, pm: PubMaster):
    """Scenario: onroad with ``carParams``/``carState`` edited to describe a "BODY".

    Sets ``carName`` to "BODY" and ``notCar`` to True, marks the car state as
    charging with a fuel gauge of 50.0, then runs the onroad setup.

    NOTE: the edits are made on the shared ``DATA`` builders and are not
    reverted afterwards.
    """
    car_params = DATA['carParams'].carParams
    car_params.carName = "BODY"
    car_params.notCar = True

    car_state = DATA['carState'].carState
    car_state.charging = True
    car_state.fuelGauge = 50.0

    setup_onroad(click, pm)
def setup_driver_camera(click, pm: PubMaster):
    """Scenario: settings setup, a click at (1950, 435), then onroad data with ``started`` off.

    ``deviceState.started`` in the shared ``DATA`` is forced to False while
    ``setup_onroad`` publishes, then set back to True.

    NOTE(review): the click at (1950, 435) presumably opens the driver camera
    view from the Device settings page -- confirm against the UI layout.

    Args:
        click: UI click callback.
        pm: publisher used for all messages.
    """
    setup_settings_device(click, pm)
    click(1950, 435)

    DATA['deviceState'].deviceState.started = False
    try:
        setup_onroad(click, pm)
    finally:
        # Restore the shared builder even if publishing fails, so the
        # temporary override cannot leak into other users of DATA.
        DATA['deviceState'].deviceState.started = True
def setup_onroad_alert(click, pm: PubMaster, text1, text2, size, status=log.SelfdriveState.AlertStatus.normal):
    """Scenario: onroad setup, then publish a ``selfdriveState`` carrying an alert.

    Args:
        click: UI click callback, forwarded to ``setup_onroad``.
        pm: publisher used for all messages.
        text1: value for ``alertText1``.
        text2: value for ``alertText2``.
        size: value for ``alertSize``.
        status: value for ``alertStatus``; defaults to ``AlertStatus.normal``.
    """
    print(f'setup onroad alert, size: {size}')
    setup_onroad(click, pm)

    # A fresh message is built here; the recorded one in DATA is left untouched.
    msg = messaging.new_message('selfdriveState')
    alert_state = msg.selfdriveState
    alert_state.alertText1 = text1
    alert_state.alertText2 = text2
    alert_state.alertSize = size
    alert_state.alertStatus = status
    alert_state.alertType = "test_onroad_alert"
    pm.send('selfdriveState', msg)
def setup_onroad_alert_small(click, pm: PubMaster):
    """Scenario: onroad alert with ``AlertSize.small`` and a single line of text."""
    setup_onroad_alert(
        click,
        pm,
        'This is a small alert message',
        '',
        log.SelfdriveState.AlertSize.small,
    )
def setup_onroad_alert_mid(click, pm: PubMaster):
    """Scenario: onroad alert with ``AlertSize.mid`` and two lines of text."""
    setup_onroad_alert(
        click,
        pm,
        'Medium Alert',
        'This is a medium alert message',
        log.SelfdriveState.AlertSize.mid,
    )
def setup_onroad_alert_full(click, pm: PubMaster):
    """Scenario: onroad alert with ``AlertSize.full`` and two lines of text."""
    setup_onroad_alert(
        click,
        pm,
        'Full Alert',
        'This is a full alert message',
        log.SelfdriveState.AlertSize.full,
    )
def setup_offorad_alert(click, pm: PubMaster):
    """Scenario: switch on every alert in ``OFFROAD_ALERTS`` and refresh the UI.

    NOTE: the function name is spelled "offorad"; it is kept as-is because
    ``CASES`` refers to it by this name.
    """
    setup_common(click, pm)

    for alert_name in OFFROAD_ALERTS:
        set_offroad_alert(alert_name, True)

    # Toggle between settings and home to refresh the offroad alert widget
    setup_settings_device(click, pm)
    click(240, 216)
def setup_update_available(click, pm: PubMaster):
    """Scenario: flag an available update and store its release notes.

    Sets the ``UpdateAvailable`` param and copies the first blank-line-
    delimited section of ``RELEASES.md`` (plus a trailing newline) into
    ``UpdaterNewReleaseNotes``. It then runs the settings setup and clicks at
    (240, 216), the same sequence the offroad-alert scenario uses to toggle
    between settings and home.

    Args:
        click: UI click callback.
        pm: publisher used for all messages.
    """
    setup_common(click, pm)

    params = Params()
    params.put_bool("UpdateAvailable", True)

    release_notes_path = os.path.join(BASEDIR, "RELEASES.md")
    # Decode explicitly as UTF-8 so the result does not depend on the
    # locale's default encoding.
    with open(release_notes_path, encoding="utf-8") as file:
        # Keep only the text before the first blank line.
        release_notes = file.read().split('\n\n', 1)[0]
    params.put("UpdaterNewReleaseNotes", release_notes + "\n")

    setup_settings_device(click, pm)
    click(240, 216)
# Scenario name -> setup function. create_screenshots() runs these in order,
# and the name becomes the screenshot's file stem.
CASES = {
    # offroad screens
    "homescreen": setup_homescreen,
    "settings_device": setup_settings_device,
    # onroad views
    "onroad": setup_onroad,
    "onroad_sidebar": setup_onroad_sidebar,
    "onroad_alert_small": setup_onroad_alert_small,
    "onroad_alert_mid": setup_onroad_alert_mid,
    "onroad_alert_full": setup_onroad_alert_full,
    "onroad_wide": setup_onroad_wide,
    "onroad_wide_sidebar": setup_onroad_wide_sidebar,
    "driver_camera": setup_driver_camera,
    "body": setup_body,
    # offroad notifications
    "offroad_alert": setup_offorad_alert,
    "update_available": setup_update_available,
}
# Directory containing this script; template.html is read from here.
TEST_DIR = pathlib.Path(__file__).parent

# Report root (deleted and recreated by create_screenshots()) and the
# sub-directory that receives one PNG per scenario.
TEST_OUTPUT_DIR = TEST_DIR / "report_1"
SCREENSHOTS_DIR = TEST_OUTPUT_DIR / "screenshots"
class TestUI:
    """Drives the UI through one scenario and saves a screenshot of the result."""

    def __init__(self):
        os.environ["SCALE"] = "1"
        # Register a placeholder under "mouseinfo" so a later import of that
        # name is served from sys.modules instead of loading the real package.
        # NOTE(review): presumably aimed at pyautogui's own import of it -- confirm.
        sys.modules["mouseinfo"] = False

    def setup(self):
        """Open messaging sockets, wait for the UI, and locate its window.

        Blocks until ``uiDebug`` is reported valid. If the window lookup
        fails for any reason, a 2160x1080 box at the top-left corner is
        assumed instead.
        """
        self.sm = SubMaster(["uiDebug"])
        self.pm = PubMaster(list(DATA))

        while not self.sm.valid["uiDebug"]:
            self.sm.update(1)
        time.sleep(UI_DELAY)  # wait a bit more for the UI to start rendering

        try:
            self.ui = pywinctl.getWindowsWithTitle("ui")[0]
        except Exception as e:
            print(f"failed to find ui window, assuming that it's in the top left (for Xvfb) {e}")
            bounding_box = namedtuple("bb", ["left", "top", "width", "height"])
            self.ui = bounding_box(0, 0, 2160, 1080)

    def screenshot(self):
        """Capture the UI window region and return it as a numpy array.

        Asserts that the captured image is exactly 2160x1080.
        """
        import pyautogui

        window = self.ui
        capture = pyautogui.screenshot(region=(window.left, window.top, window.width, window.height))
        assert capture.width == 2160
        assert capture.height == 1080
        pixels = np.array(capture)
        capture.close()
        return pixels

    def click(self, x, y, *args, **kwargs):
        """Click at window-relative ``(x, y)``, then pause for ``UI_DELAY`` seconds.

        Extra positional and keyword arguments are forwarded to
        ``pyautogui.click``.
        """
        import pyautogui

        screen_x = self.ui.left + x
        screen_y = self.ui.top + y
        pyautogui.click(screen_x, screen_y, *args, **kwargs)
        time.sleep(UI_DELAY)  # give enough time for the UI to react

    @with_processes(["ui"])
    def test_ui(self, name, setup_case):
        """Run one scenario and save its screenshot as ``<name>.png``.

        Args:
            name: scenario name, used as the PNG file stem in ``SCREENSHOTS_DIR``.
            setup_case: callable invoked as ``setup_case(click, pm)`` to put
                the UI into the desired state.
        """
        self.setup()
        setup_case(self.click, self.pm)
        time.sleep(UI_DELAY)  # wait a bit more for the UI to finish rendering
        pixels = self.screenshot()
        plt.imsave(SCREENSHOTS_DIR / f"{name}.png", pixels)
def create_html_report():
    """Render ``template.html`` into ``index.html`` inside ``TEST_OUTPUT_DIR``.

    The template receives ``cases``: a dict sorted by screenshot stem, where
    each value is a ``(path relative to TEST_OUTPUT_DIR, "reference.png")``
    tuple, with one entry per PNG found in ``SCREENSHOTS_DIR``.
    """
    output_file = TEST_OUTPUT_DIR / "index.html"

    # Read and write explicitly as UTF-8 so the report does not depend on
    # the locale's default encoding.
    with open(TEST_DIR / "template.html", encoding="utf-8") as template_file:
        template = jinja2.Template(template_file.read())

    cases = {
        png.stem: (str(png.relative_to(TEST_OUTPUT_DIR)), "reference.png")
        for png in SCREENSHOTS_DIR.glob("*.png")
    }
    cases = dict(sorted(cases.items()))

    with open(output_file, "w", encoding="utf-8") as report:
        report.write(template.render(cases=cases))
def create_screenshots():
    """Load data from ``TEST_ROUTE`` and write one screenshot per entry in ``CASES``.

    Steps:
      1. Recreate the output directories from scratch.
      2. Fill ``DATA`` from segment 2 of the route's qlog.
      3. Append frame 0 of the road, wide road and driver cameras (NV12) to
         ``STREAMS``.
      4. Run every scenario inside an ``OpenpilotPrefix`` context.
    """
    if TEST_OUTPUT_DIR.exists():
        shutil.rmtree(TEST_OUTPUT_DIR)
    SCREENSHOTS_DIR.mkdir(parents=True)

    route = Route(TEST_ROUTE)
    segnum = 2

    lr = LogReader(route.qlog_paths()[segnum])
    DATA['carParams'] = next((msg.as_builder() for msg in lr if msg.which() == 'carParams'), None)
    for msg in migrate_selfdriveState(lr):
        service = msg.which()
        if service in DATA:
            DATA[service] = msg.as_builder()

        # Stop reading once every service has a message.
        if all(DATA.values()):
            break

    cam = DEVICE_CAMERAS[("tici", "ar0231")]
    stream_sources = (
        (VisionStreamType.VISION_STREAM_ROAD, route.camera_paths, cam.fcam),
        (VisionStreamType.VISION_STREAM_WIDE_ROAD, route.ecamera_paths, cam.ecam),
        (VisionStreamType.VISION_STREAM_DRIVER, route.dcamera_paths, cam.dcam),
    )
    for stream_type, list_paths, camera_config in stream_sources:
        frame = FrameReader(list_paths()[segnum]).get(0, pix_fmt="nv12")[0]
        STREAMS.append((stream_type, camera_config, frame.flatten().tobytes()))

    runner = TestUI()

    with OpenpilotPrefix():
        for name, setup in CASES.items():
            runner.test_ui(name, setup)
if __name__ == "__main__":
    # Screenshots come first: the report is built from the PNGs that step writes.
    print("creating test screenshots")
    create_screenshots()

    print("creating html report")
    create_html_report()