openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

346 lines
14 KiB

#!/usr/bin/env python3
import os
import re
import threading
import time
import urllib.request
from enum import IntEnum
import pyray as rl
from cereal import log
from openpilot.system.hardware import HARDWARE
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.lib.button import gui_button, ButtonStyle
from openpilot.system.ui.lib.label import gui_label, gui_text_box
from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManagerWrapper
from openpilot.system.ui.widgets.keyboard import Keyboard
NetworkType = log.DeviceState.NetworkType
MARGIN = 50
TITLE_FONT_SIZE = 116
TITLE_FONT_WEIGHT = FontWeight.MEDIUM
NEXT_BUTTON_WIDTH = 310
BODY_FONT_SIZE = 96
BUTTON_HEIGHT = 160
BUTTON_SPACING = 50
OPENPILOT_URL = "https://openpilot.comma.ai"
USER_AGENT = f"AGNOSSetup-{HARDWARE.get_os_version()}"
class SetupState(IntEnum):
LOW_VOLTAGE = 0
GETTING_STARTED = 1
NETWORK_SETUP = 2
SOFTWARE_SELECTION = 3
CUSTOM_URL = 4
DOWNLOADING = 5
DOWNLOAD_FAILED = 6
class Setup:
def __init__(self):
self.state = SetupState.GETTING_STARTED
self.network_check_thread = None
self.network_connected = threading.Event()
self.wifi_connected = threading.Event()
self.stop_network_check_thread = threading.Event()
self.failed_url = ""
self.failed_reason = ""
self.download_url = ""
self.download_progress = 0
self.download_thread = None
self.wifi_manager = WifiManagerWrapper()
self.wifi_ui = WifiManagerUI(self.wifi_manager)
self.keyboard = Keyboard()
self.selected_radio = None
self.warning = gui_app.texture("icons/warning.png", 150, 150)
self.checkmark = gui_app.texture("icons/circled_check.png", 100, 100)
try:
with open("/sys/class/hwmon/hwmon1/in1_input") as f:
voltage = float(f.read().strip()) / 1000.0
if voltage < 7:
self.state = SetupState.LOW_VOLTAGE
except (FileNotFoundError, ValueError):
self.state = SetupState.LOW_VOLTAGE
def render(self, rect: rl.Rectangle):
if self.state == SetupState.LOW_VOLTAGE:
self.render_low_voltage(rect)
elif self.state == SetupState.GETTING_STARTED:
self.render_getting_started(rect)
elif self.state == SetupState.NETWORK_SETUP:
self.render_network_setup(rect)
elif self.state == SetupState.SOFTWARE_SELECTION:
self.render_software_selection(rect)
elif self.state == SetupState.CUSTOM_URL:
self.render_custom_url()
elif self.state == SetupState.DOWNLOADING:
self.render_downloading(rect)
elif self.state == SetupState.DOWNLOAD_FAILED:
self.render_download_failed(rect)
def render_low_voltage(self, rect: rl.Rectangle):
rl.draw_texture(self.warning, int(rect.x + 150), int(rect.y + 110), rl.WHITE)
title_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100, rect.width - 500 - 150, TITLE_FONT_SIZE)
gui_label(title_rect, "WARNING: Low Voltage", TITLE_FONT_SIZE, rl.Color(255, 89, 79, 255), FontWeight.MEDIUM)
body_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100 + TITLE_FONT_SIZE + 25, rect.width - 500 - 150, BODY_FONT_SIZE * 3)
gui_text_box(body_rect, "Power your device in a car with a harness or proceed at your own risk.", BODY_FONT_SIZE)
button_width = (rect.width - MARGIN * 3) / 2
button_y = rect.height - MARGIN - BUTTON_HEIGHT
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Power off"):
HARDWARE.shutdown()
if gui_button(rl.Rectangle(rect.x + MARGIN * 2 + button_width, button_y, button_width, BUTTON_HEIGHT), "Continue"):
self.state = SetupState.GETTING_STARTED
def render_getting_started(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + 165, rect.y + 280, rect.width - 265, TITLE_FONT_SIZE)
gui_label(title_rect, "Getting Started", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
desc_rect = rl.Rectangle(rect.x + 165, rect.y + 280 + TITLE_FONT_SIZE + 90, rect.width - 500, BODY_FONT_SIZE * 3)
gui_text_box(desc_rect, "Before we get on the road, let's finish installation and cover some details.", BODY_FONT_SIZE)
btn_rect = rl.Rectangle(rect.width - NEXT_BUTTON_WIDTH, 0, NEXT_BUTTON_WIDTH, rect.height)
ret = gui_button(btn_rect, "", button_style=ButtonStyle.PRIMARY, border_radius=0)
triangle = gui_app.texture("images/button_continue_triangle.png", 54, int(btn_rect.height))
rl.draw_texture_v(triangle, rl.Vector2(btn_rect.x + btn_rect.width / 2 - triangle.width / 2, btn_rect.height / 2 - triangle.height / 2), rl.WHITE)
if ret:
self.state = SetupState.NETWORK_SETUP
self.wifi_manager.request_scan()
self.start_network_check()
def check_network_connectivity(self):
while not self.stop_network_check_thread.is_set():
if self.state == SetupState.NETWORK_SETUP:
try:
urllib.request.urlopen("https://google.com", timeout=2)
self.network_connected.set()
if HARDWARE.get_network_type() == NetworkType.wifi:
self.wifi_connected.set()
else:
self.wifi_connected.clear()
except Exception:
self.network_connected.clear()
time.sleep(1)
def start_network_check(self):
if self.network_check_thread is None or not self.network_check_thread.is_alive():
self.network_check_thread = threading.Thread(target=self.check_network_connectivity, daemon=True)
self.network_check_thread.start()
def close(self):
if self.network_check_thread is not None:
self.stop_network_check_thread.set()
self.network_check_thread.join()
def render_network_setup(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
gui_label(title_rect, "Connect to Wi-Fi", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
wifi_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN + 25, rect.width - MARGIN * 2,
rect.height - TITLE_FONT_SIZE - 25 - BUTTON_HEIGHT - MARGIN * 3)
rl.draw_rectangle_rounded(wifi_rect, 0.05, 10, rl.Color(51, 51, 51, 255))
wifi_content_rect = rl.Rectangle(wifi_rect.x + MARGIN, wifi_rect.y, wifi_rect.width - MARGIN * 2, wifi_rect.height)
self.wifi_ui.render(wifi_content_rect)
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
button_y = rect.height - BUTTON_HEIGHT - MARGIN
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Back"):
self.state = SetupState.GETTING_STARTED
# Check network connectivity status
continue_enabled = self.network_connected.is_set()
continue_text = ("Continue" if self.wifi_connected.is_set() else "Continue without Wi-Fi") if continue_enabled else "Waiting for internet"
if gui_button(
rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT),
continue_text,
button_style=ButtonStyle.PRIMARY if continue_enabled else ButtonStyle.NORMAL,
is_enabled=continue_enabled,
):
self.state = SetupState.SOFTWARE_SELECTION
self.stop_network_check_thread.set()
def render_software_selection(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
gui_label(title_rect, "Choose Software to Install", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
radio_height = 230
radio_spacing = 30
openpilot_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2, rect.width - MARGIN * 2, radio_height)
openpilot_selected = self.selected_radio == "openpilot"
rl.draw_rectangle_rounded(openpilot_rect, 0.1, 10, rl.Color(70, 91, 234, 255) if openpilot_selected else rl.Color(79, 79, 79, 255))
gui_label(rl.Rectangle(openpilot_rect.x + 100, openpilot_rect.y, openpilot_rect.width - 200, radio_height), "openpilot", BODY_FONT_SIZE)
if openpilot_selected:
checkmark_pos = rl.Vector2(openpilot_rect.x + openpilot_rect.width - 100 - self.checkmark.width,
openpilot_rect.y + radio_height / 2 - self.checkmark.height / 2)
rl.draw_texture_v(self.checkmark, checkmark_pos, rl.WHITE)
custom_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2 + radio_height + radio_spacing, rect.width - MARGIN * 2, radio_height)
custom_selected = self.selected_radio == "custom"
rl.draw_rectangle_rounded(custom_rect, 0.1, 10, rl.Color(70, 91, 234, 255) if custom_selected else rl.Color(79, 79, 79, 255))
gui_label(rl.Rectangle(custom_rect.x + 100, custom_rect.y, custom_rect.width - 200, radio_height), "Custom Software", BODY_FONT_SIZE)
if custom_selected:
checkmark_pos = rl.Vector2(custom_rect.x + custom_rect.width - 100 - self.checkmark.width, custom_rect.y + radio_height / 2 - self.checkmark.height / 2)
rl.draw_texture_v(self.checkmark, checkmark_pos, rl.WHITE)
mouse_pos = rl.get_mouse_position()
if rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT):
if rl.check_collision_point_rec(mouse_pos, openpilot_rect):
self.selected_radio = "openpilot"
elif rl.check_collision_point_rec(mouse_pos, custom_rect):
self.selected_radio = "custom"
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
button_y = rect.height - BUTTON_HEIGHT - MARGIN
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Back"):
self.state = SetupState.NETWORK_SETUP
continue_enabled = self.selected_radio is not None
if gui_button(
rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT),
"Continue",
button_style=ButtonStyle.PRIMARY,
is_enabled=continue_enabled,
):
if continue_enabled:
if self.selected_radio == "openpilot":
self.download(OPENPILOT_URL)
else:
self.state = SetupState.CUSTOM_URL
def render_downloading(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x, rect.y + rect.height / 2 - TITLE_FONT_SIZE / 2, rect.width, TITLE_FONT_SIZE)
gui_label(title_rect, "Downloading...", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
def render_download_failed(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + 117, rect.y + 185, rect.width - 117, TITLE_FONT_SIZE)
gui_label(title_rect, "Download Failed", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
url_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67, rect.width - 117 - 100, 64)
gui_label(url_rect, self.failed_url, 64, font_weight=FontWeight.NORMAL)
error_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67 + 64 + 48,
rect.width - 117 - 100, rect.height - 185 + TITLE_FONT_SIZE + 67 + 64 + 48 - BUTTON_HEIGHT - MARGIN * 2)
gui_text_box(error_rect, self.failed_reason, BODY_FONT_SIZE)
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
button_y = rect.height - BUTTON_HEIGHT - MARGIN
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Reboot device"):
HARDWARE.reboot()
if gui_button(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT), "Start over",
button_style=ButtonStyle.PRIMARY):
self.state = SetupState.GETTING_STARTED
def render_custom_url(self):
result = self.keyboard.render("Enter URL", "for Custom Software")
# Enter pressed
if result == 1:
url = self.keyboard.text
self.keyboard.clear()
if url:
self.download(url)
# Cancel pressed
elif result == 0:
self.state = SetupState.SOFTWARE_SELECTION
def download(self, url: str):
# autocomplete incomplete URLs
if re.match("^([^/.]+)/([^/]+)$", url):
url = f"https://installer.comma.ai/{url}"
self.download_url = url
self.state = SetupState.DOWNLOADING
self.download_thread = threading.Thread(target=self._download_thread, daemon=True)
self.download_thread.start()
def _download_thread(self):
try:
import tempfile
_, tmpfile = tempfile.mkstemp(prefix="installer_")
headers = {"User-Agent": USER_AGENT, "X-openpilot-serial": HARDWARE.get_serial()}
req = urllib.request.Request(self.download_url, headers=headers)
with open(tmpfile, 'wb') as f, urllib.request.urlopen(req, timeout=30) as response:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
block_size = 8192
while True:
buffer = response.read(block_size)
if not buffer:
break
downloaded += len(buffer)
f.write(buffer)
if total_size:
self.download_progress = int(downloaded * 100 / total_size)
is_elf = False
with open(tmpfile, 'rb') as f:
header = f.read(4)
is_elf = header == b'\x7fELF'
if not is_elf:
self.download_failed(self.download_url, "No custom software found at this URL.")
return
os.rename(tmpfile, "/tmp/installer")
os.chmod("/tmp/installer", 0o755)
with open("/tmp/installer_url", "w") as f:
f.write(self.download_url)
gui_app.request_close()
except Exception:
error_msg = "Ensure the entered URL is valid, and the device's internet connection is good."
self.download_failed(self.download_url, error_msg)
def download_failed(self, url: str, reason: str):
self.failed_url = url
self.failed_reason = reason
self.state = SetupState.DOWNLOAD_FAILED
def main():
try:
gui_app.init_window("Setup")
setup = Setup()
for _ in gui_app.render():
setup.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
setup.close()
except Exception as e:
print(f"Setup error: {e}")
finally:
gui_app.close()
if __name__ == "__main__":
main()