openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

345 lines
15 KiB

#!/usr/bin/env python3
import os
import re
import threading
import time
import urllib.request
from enum import IntEnum
import pyray as rl
from cereal import log
from openpilot.system.hardware import HARDWARE
from openpilot.system.ui.lib.application import gui_app, FontWeight
from openpilot.system.ui.widgets import Widget
from openpilot.system.ui.widgets.button import Button, ButtonStyle, ButtonRadio
from openpilot.system.ui.widgets.keyboard import Keyboard
from openpilot.system.ui.widgets.label import gui_label, gui_text_box
from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManagerWrapper
NetworkType = log.DeviceState.NetworkType
MARGIN = 50
TITLE_FONT_SIZE = 116
TITLE_FONT_WEIGHT = FontWeight.MEDIUM
NEXT_BUTTON_WIDTH = 310
BODY_FONT_SIZE = 96
BUTTON_HEIGHT = 160
BUTTON_SPACING = 50
OPENPILOT_URL = "https://openpilot.comma.ai"
USER_AGENT = f"AGNOSSetup-{HARDWARE.get_os_version()}"
class SetupState(IntEnum):
LOW_VOLTAGE = 0
GETTING_STARTED = 1
NETWORK_SETUP = 2
SOFTWARE_SELECTION = 3
CUSTOM_URL = 4
DOWNLOADING = 5
DOWNLOAD_FAILED = 6
class Setup(Widget):
def __init__(self):
super().__init__()
self.state = SetupState.GETTING_STARTED
self.network_check_thread = None
self.network_connected = threading.Event()
self.wifi_connected = threading.Event()
self.stop_network_check_thread = threading.Event()
self.failed_url = ""
self.failed_reason = ""
self.download_url = ""
self.download_progress = 0
self.download_thread = None
self.wifi_manager = WifiManagerWrapper()
self.wifi_ui = WifiManagerUI(self.wifi_manager)
self.keyboard = Keyboard()
self.selected_radio = None
self.warning = gui_app.texture("icons/warning.png", 150, 150)
self.checkmark = gui_app.texture("icons/circled_check.png", 100, 100)
self._low_voltage_continue_button = Button("Continue", self._low_voltage_continue_button_callback)
self._low_voltage_poweroff_button = Button("Power Off", HARDWARE.shutdown)
self._getting_started_button = Button("", self._getting_started_button_callback, button_style=ButtonStyle.PRIMARY, border_radius=0)
self._software_selection_openpilot_button = ButtonRadio("openpilot", self.checkmark, font_size=BODY_FONT_SIZE, text_padding=80)
self._software_selection_custom_software_button = ButtonRadio("Custom Software", self.checkmark, font_size=BODY_FONT_SIZE, text_padding=80)
self._software_selection_continue_button = Button("Continue", self._software_selection_continue_button_callback,
button_style=ButtonStyle.PRIMARY, enabled=False)
self._software_selection_back_button = Button("Back", self._software_selection_back_button_callback)
self._download_failed_reboot_button = Button("Reboot device", HARDWARE.reboot)
self._download_failed_startover_button = Button("Start over", self._download_failed_startover_button_callback, button_style=ButtonStyle.PRIMARY)
self._network_setup_back_button = Button("Back", self._network_setup_back_button_callback)
self._network_setup_continue_button = Button("Waiting for internet", self._network_setup_continue_button_callback,
button_style=ButtonStyle.PRIMARY, enabled=False)
try:
with open("/sys/class/hwmon/hwmon1/in1_input") as f:
voltage = float(f.read().strip()) / 1000.0
if voltage < 7:
self.state = SetupState.LOW_VOLTAGE
except (FileNotFoundError, ValueError):
self.state = SetupState.LOW_VOLTAGE
def _render(self, rect: rl.Rectangle):
if self.state == SetupState.LOW_VOLTAGE:
self.render_low_voltage(rect)
elif self.state == SetupState.GETTING_STARTED:
self.render_getting_started(rect)
elif self.state == SetupState.NETWORK_SETUP:
self.render_network_setup(rect)
elif self.state == SetupState.SOFTWARE_SELECTION:
self.render_software_selection(rect)
elif self.state == SetupState.CUSTOM_URL:
self.render_custom_url()
elif self.state == SetupState.DOWNLOADING:
self.render_downloading(rect)
elif self.state == SetupState.DOWNLOAD_FAILED:
self.render_download_failed(rect)
def _low_voltage_continue_button_callback(self):
self.state = SetupState.GETTING_STARTED
def _getting_started_button_callback(self):
self.state = SetupState.NETWORK_SETUP
self.start_network_check()
def _software_selection_back_button_callback(self):
self.state = SetupState.NETWORK_SETUP
def _software_selection_continue_button_callback(self):
if self._software_selection_openpilot_button.selected:
self.download(OPENPILOT_URL)
else:
self.state = SetupState.CUSTOM_URL
def _download_failed_startover_button_callback(self):
self.state = SetupState.GETTING_STARTED
def _network_setup_back_button_callback(self):
self.state = SetupState.GETTING_STARTED
def _network_setup_continue_button_callback(self):
self.state = SetupState.SOFTWARE_SELECTION
self.stop_network_check_thread.set()
def render_low_voltage(self, rect: rl.Rectangle):
rl.draw_texture(self.warning, int(rect.x + 150), int(rect.y + 110), rl.WHITE)
title_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100, rect.width - 500 - 150, TITLE_FONT_SIZE)
gui_label(title_rect, "WARNING: Low Voltage", TITLE_FONT_SIZE, rl.Color(255, 89, 79, 255), FontWeight.MEDIUM)
body_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100 + TITLE_FONT_SIZE + 25, rect.width - 500 - 150, BODY_FONT_SIZE * 3)
gui_text_box(body_rect, "Power your device in a car with a harness or proceed at your own risk.", BODY_FONT_SIZE)
button_width = (rect.width - MARGIN * 3) / 2
button_y = rect.height - MARGIN - BUTTON_HEIGHT
self._low_voltage_poweroff_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
self._low_voltage_continue_button.render(rl.Rectangle(rect.x + MARGIN * 2 + button_width, button_y, button_width, BUTTON_HEIGHT))
def render_getting_started(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + 165, rect.y + 280, rect.width - 265, TITLE_FONT_SIZE)
gui_label(title_rect, "Getting Started", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
desc_rect = rl.Rectangle(rect.x + 165, rect.y + 280 + TITLE_FONT_SIZE + 90, rect.width - 500, BODY_FONT_SIZE * 3)
gui_text_box(desc_rect, "Before we get on the road, let's finish installation and cover some details.", BODY_FONT_SIZE)
btn_rect = rl.Rectangle(rect.width - NEXT_BUTTON_WIDTH, 0, NEXT_BUTTON_WIDTH, rect.height)
self._getting_started_button.render(btn_rect)
triangle = gui_app.texture("images/button_continue_triangle.png", 54, int(btn_rect.height))
rl.draw_texture_v(triangle, rl.Vector2(btn_rect.x + btn_rect.width / 2 - triangle.width / 2, btn_rect.height / 2 - triangle.height / 2), rl.WHITE)
def check_network_connectivity(self):
while not self.stop_network_check_thread.is_set():
if self.state == SetupState.NETWORK_SETUP:
try:
urllib.request.urlopen(OPENPILOT_URL, timeout=2)
self.network_connected.set()
if HARDWARE.get_network_type() == NetworkType.wifi:
self.wifi_connected.set()
else:
self.wifi_connected.clear()
except Exception:
self.network_connected.clear()
time.sleep(1)
def start_network_check(self):
if self.network_check_thread is None or not self.network_check_thread.is_alive():
self.network_check_thread = threading.Thread(target=self.check_network_connectivity, daemon=True)
self.network_check_thread.start()
def close(self):
if self.network_check_thread is not None:
self.stop_network_check_thread.set()
self.network_check_thread.join()
def render_network_setup(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
gui_label(title_rect, "Connect to Wi-Fi", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
wifi_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN + 25, rect.width - MARGIN * 2,
rect.height - TITLE_FONT_SIZE - 25 - BUTTON_HEIGHT - MARGIN * 3)
rl.draw_rectangle_rounded(wifi_rect, 0.05, 10, rl.Color(51, 51, 51, 255))
wifi_content_rect = rl.Rectangle(wifi_rect.x + MARGIN, wifi_rect.y, wifi_rect.width - MARGIN * 2, wifi_rect.height)
self.wifi_ui.render(wifi_content_rect)
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
button_y = rect.height - BUTTON_HEIGHT - MARGIN
self._network_setup_back_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
# Check network connectivity status
continue_enabled = self.network_connected.is_set()
self._network_setup_continue_button.enabled = continue_enabled
continue_text = ("Continue" if self.wifi_connected.is_set() else "Continue without Wi-Fi") if continue_enabled else "Waiting for internet"
self._network_setup_continue_button._text = continue_text
self._network_setup_continue_button.render(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT))
def render_software_selection(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
gui_label(title_rect, "Choose Software to Use", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
radio_height = 230
radio_spacing = 30
self._software_selection_continue_button.enabled = False
openpilot_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2, rect.width - MARGIN * 2, radio_height)
self._software_selection_openpilot_button.render(openpilot_rect)
if self._software_selection_openpilot_button.selected:
self._software_selection_continue_button.enabled = True
self._software_selection_custom_software_button.selected = False
custom_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2 + radio_height + radio_spacing, rect.width - MARGIN * 2, radio_height)
self._software_selection_custom_software_button.render(custom_rect)
if self._software_selection_custom_software_button.selected:
self._software_selection_continue_button.enabled = True
self._software_selection_openpilot_button.selected = False
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
button_y = rect.height - BUTTON_HEIGHT - MARGIN
self._software_selection_back_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
self._software_selection_continue_button.render(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT))
def render_downloading(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x, rect.y + rect.height / 2 - TITLE_FONT_SIZE / 2, rect.width, TITLE_FONT_SIZE)
gui_label(title_rect, "Downloading...", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
def render_download_failed(self, rect: rl.Rectangle):
title_rect = rl.Rectangle(rect.x + 117, rect.y + 185, rect.width - 117, TITLE_FONT_SIZE)
gui_label(title_rect, "Download Failed", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
url_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67, rect.width - 117 - 100, 64)
gui_label(url_rect, self.failed_url, 64, font_weight=FontWeight.NORMAL)
error_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67 + 64 + 48,
rect.width - 117 - 100, rect.height - 185 + TITLE_FONT_SIZE + 67 + 64 + 48 - BUTTON_HEIGHT - MARGIN * 2)
gui_text_box(error_rect, self.failed_reason, BODY_FONT_SIZE)
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
button_y = rect.height - BUTTON_HEIGHT - MARGIN
self._download_failed_reboot_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
self._download_failed_startover_button.render(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT))
def render_custom_url(self):
def handle_keyboard_result(result):
# Enter pressed
if result == 1:
url = self.keyboard.text
self.keyboard.clear()
if url:
self.download(url)
# Cancel pressed
elif result == 0:
self.state = SetupState.SOFTWARE_SELECTION
self.keyboard.reset()
self.keyboard.set_title("Enter URL", "for Custom Software")
gui_app.set_modal_overlay(self.keyboard, callback=handle_keyboard_result)
def download(self, url: str):
# autocomplete incomplete URLs
if re.match("^([^/.]+)/([^/]+)$", url):
url = f"https://installer.comma.ai/{url}"
self.download_url = url
self.state = SetupState.DOWNLOADING
self.download_thread = threading.Thread(target=self._download_thread, daemon=True)
self.download_thread.start()
def _download_thread(self):
try:
import tempfile
_, tmpfile = tempfile.mkstemp(prefix="installer_")
headers = {"User-Agent": USER_AGENT, "X-openpilot-serial": HARDWARE.get_serial()}
req = urllib.request.Request(self.download_url, headers=headers)
with open(tmpfile, 'wb') as f, urllib.request.urlopen(req, timeout=30) as response:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
block_size = 8192
while True:
buffer = response.read(block_size)
if not buffer:
break
downloaded += len(buffer)
f.write(buffer)
if total_size:
self.download_progress = int(downloaded * 100 / total_size)
is_elf = False
with open(tmpfile, 'rb') as f:
header = f.read(4)
is_elf = header == b'\x7fELF'
if not is_elf:
self.download_failed(self.download_url, "No custom software found at this URL.")
return
os.rename(tmpfile, "/tmp/installer")
os.chmod("/tmp/installer", 0o755)
with open("/tmp/installer_url", "w") as f:
f.write(self.download_url)
gui_app.request_close()
except Exception:
error_msg = "Ensure the entered URL is valid, and the device's internet connection is good."
self.download_failed(self.download_url, error_msg)
def download_failed(self, url: str, reason: str):
self.failed_url = url
self.failed_reason = reason
self.state = SetupState.DOWNLOAD_FAILED
def main():
try:
gui_app.init_window("Setup")
setup = Setup()
for _ in gui_app.render():
setup.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
setup.close()
except Exception as e:
print(f"Setup error: {e}")
finally:
gui_app.close()
if __name__ == "__main__":
main()