You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							347 lines
						
					
					
						
							15 KiB
						
					
					
				
			
		
		
	
	
							347 lines
						
					
					
						
							15 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import os
 | |
| import re
 | |
| import threading
 | |
| import time
 | |
| import urllib.request
 | |
| from enum import IntEnum
 | |
| import pyray as rl
 | |
| 
 | |
| from cereal import log
 | |
| from openpilot.system.hardware import HARDWARE
 | |
| from openpilot.system.ui.lib.application import gui_app, FontWeight
 | |
| from openpilot.system.ui.widgets import Widget
 | |
| from openpilot.system.ui.widgets.button import Button, ButtonStyle, ButtonRadio
 | |
| from openpilot.system.ui.widgets.keyboard import Keyboard
 | |
| from openpilot.system.ui.widgets.label import gui_label, gui_text_box
 | |
| from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManagerWrapper
 | |
| 
 | |
| NetworkType = log.DeviceState.NetworkType
 | |
| 
 | |
| MARGIN = 50
 | |
| TITLE_FONT_SIZE = 116
 | |
| TITLE_FONT_WEIGHT = FontWeight.MEDIUM
 | |
| NEXT_BUTTON_WIDTH = 310
 | |
| BODY_FONT_SIZE = 96
 | |
| BUTTON_HEIGHT = 160
 | |
| BUTTON_SPACING = 50
 | |
| 
 | |
| OPENPILOT_URL = "https://openpilot.comma.ai"
 | |
| USER_AGENT = f"AGNOSSetup-{HARDWARE.get_os_version()}"
 | |
| 
 | |
| 
 | |
| class SetupState(IntEnum):
 | |
|   LOW_VOLTAGE = 0
 | |
|   GETTING_STARTED = 1
 | |
|   NETWORK_SETUP = 2
 | |
|   SOFTWARE_SELECTION = 3
 | |
|   CUSTOM_URL = 4
 | |
|   DOWNLOADING = 5
 | |
|   DOWNLOAD_FAILED = 6
 | |
| 
 | |
| 
 | |
| class Setup(Widget):
 | |
|   def __init__(self):
 | |
|     super().__init__()
 | |
|     self.state = SetupState.GETTING_STARTED
 | |
|     self.network_check_thread = None
 | |
|     self.network_connected = threading.Event()
 | |
|     self.wifi_connected = threading.Event()
 | |
|     self.stop_network_check_thread = threading.Event()
 | |
|     self.failed_url = ""
 | |
|     self.failed_reason = ""
 | |
|     self.download_url = ""
 | |
|     self.download_progress = 0
 | |
|     self.download_thread = None
 | |
|     self.wifi_manager = WifiManagerWrapper()
 | |
|     self.wifi_ui = WifiManagerUI(self.wifi_manager)
 | |
|     self.keyboard = Keyboard()
 | |
|     self.selected_radio = None
 | |
|     self.warning = gui_app.texture("icons/warning.png", 150, 150)
 | |
|     self.checkmark = gui_app.texture("icons/circled_check.png", 100, 100)
 | |
|     self._low_voltage_continue_button = Button("Continue", self._low_voltage_continue_button_callback)
 | |
|     self._low_voltage_poweroff_button = Button("Power Off", HARDWARE.shutdown)
 | |
|     self._getting_started_button = Button("", self._getting_started_button_callback, button_style=ButtonStyle.PRIMARY, border_radius=0)
 | |
|     self._software_selection_openpilot_button = ButtonRadio("openpilot", self.checkmark, font_size=BODY_FONT_SIZE, text_padding=80)
 | |
|     self._software_selection_custom_software_button = ButtonRadio("Custom Software", self.checkmark, font_size=BODY_FONT_SIZE, text_padding=80)
 | |
|     self._software_selection_continue_button = Button("Continue", self._software_selection_continue_button_callback,
 | |
|                                                       button_style=ButtonStyle.PRIMARY, enabled=False)
 | |
|     self._software_selection_back_button = Button("Back", self._software_selection_back_button_callback)
 | |
|     self._download_failed_reboot_button = Button("Reboot device", HARDWARE.reboot)
 | |
|     self._download_failed_startover_button = Button("Start over", self._download_failed_startover_button_callback, button_style=ButtonStyle.PRIMARY)
 | |
|     self._network_setup_back_button = Button("Back", self._network_setup_back_button_callback)
 | |
|     self._network_setup_continue_button = Button("Waiting for internet", self._network_setup_continue_button_callback,
 | |
|                                                  button_style=ButtonStyle.PRIMARY, enabled=False)
 | |
| 
 | |
| 
 | |
|     try:
 | |
|       with open("/sys/class/hwmon/hwmon1/in1_input") as f:
 | |
|         voltage = float(f.read().strip()) / 1000.0
 | |
|         if voltage < 7:
 | |
|           self.state = SetupState.LOW_VOLTAGE
 | |
|     except (FileNotFoundError, ValueError):
 | |
|       self.state = SetupState.LOW_VOLTAGE
 | |
| 
 | |
|   def _render(self, rect: rl.Rectangle):
 | |
|     if self.state == SetupState.LOW_VOLTAGE:
 | |
|       self.render_low_voltage(rect)
 | |
|     elif self.state == SetupState.GETTING_STARTED:
 | |
|       self.render_getting_started(rect)
 | |
|     elif self.state == SetupState.NETWORK_SETUP:
 | |
|       self.render_network_setup(rect)
 | |
|     elif self.state == SetupState.SOFTWARE_SELECTION:
 | |
|       self.render_software_selection(rect)
 | |
|     elif self.state == SetupState.CUSTOM_URL:
 | |
|       self.render_custom_url()
 | |
|     elif self.state == SetupState.DOWNLOADING:
 | |
|       self.render_downloading(rect)
 | |
|     elif self.state == SetupState.DOWNLOAD_FAILED:
 | |
|       self.render_download_failed(rect)
 | |
| 
 | |
|   def _low_voltage_continue_button_callback(self):
 | |
|     self.state = SetupState.GETTING_STARTED
 | |
| 
 | |
|   def _getting_started_button_callback(self):
 | |
|     self.state = SetupState.NETWORK_SETUP
 | |
|     self.stop_network_check_thread.clear()
 | |
|     self.start_network_check()
 | |
| 
 | |
|   def _software_selection_back_button_callback(self):
 | |
|     self.state = SetupState.NETWORK_SETUP
 | |
|     self.stop_network_check_thread.clear()
 | |
|     self.start_network_check()
 | |
| 
 | |
|   def _software_selection_continue_button_callback(self):
 | |
|     if self._software_selection_openpilot_button.selected:
 | |
|       self.download(OPENPILOT_URL)
 | |
|     else:
 | |
|       self.state = SetupState.CUSTOM_URL
 | |
| 
 | |
|   def _download_failed_startover_button_callback(self):
 | |
|     self.state = SetupState.GETTING_STARTED
 | |
| 
 | |
|   def _network_setup_back_button_callback(self):
 | |
|     self.state = SetupState.GETTING_STARTED
 | |
| 
 | |
|   def _network_setup_continue_button_callback(self):
 | |
|     self.state = SetupState.SOFTWARE_SELECTION
 | |
|     self.stop_network_check_thread.set()
 | |
| 
 | |
|   def render_low_voltage(self, rect: rl.Rectangle):
 | |
|     rl.draw_texture(self.warning, int(rect.x + 150), int(rect.y + 110), rl.WHITE)
 | |
| 
 | |
|     title_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100, rect.width - 500 - 150, TITLE_FONT_SIZE)
 | |
|     gui_label(title_rect, "WARNING: Low Voltage", TITLE_FONT_SIZE, rl.Color(255, 89, 79, 255), FontWeight.MEDIUM)
 | |
| 
 | |
|     body_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100 + TITLE_FONT_SIZE + 25, rect.width - 500 - 150, BODY_FONT_SIZE * 3)
 | |
|     gui_text_box(body_rect, "Power your device in a car with a harness or proceed at your own risk.", BODY_FONT_SIZE)
 | |
| 
 | |
|     button_width = (rect.width - MARGIN * 3) / 2
 | |
|     button_y = rect.height - MARGIN - BUTTON_HEIGHT
 | |
| 
 | |
|     self._low_voltage_poweroff_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
 | |
|     self._low_voltage_continue_button.render(rl.Rectangle(rect.x + MARGIN * 2 + button_width, button_y, button_width, BUTTON_HEIGHT))
 | |
| 
 | |
|   def render_getting_started(self, rect: rl.Rectangle):
 | |
|     title_rect = rl.Rectangle(rect.x + 165, rect.y + 280, rect.width - 265, TITLE_FONT_SIZE)
 | |
|     gui_label(title_rect, "Getting Started", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
 | |
| 
 | |
|     desc_rect = rl.Rectangle(rect.x + 165, rect.y + 280 + TITLE_FONT_SIZE + 90, rect.width - 500, BODY_FONT_SIZE * 3)
 | |
|     gui_text_box(desc_rect, "Before we get on the road, let's finish installation and cover some details.", BODY_FONT_SIZE)
 | |
| 
 | |
|     btn_rect = rl.Rectangle(rect.width - NEXT_BUTTON_WIDTH, 0, NEXT_BUTTON_WIDTH, rect.height)
 | |
| 
 | |
|     self._getting_started_button.render(btn_rect)
 | |
|     triangle = gui_app.texture("images/button_continue_triangle.png", 54, int(btn_rect.height))
 | |
|     rl.draw_texture_v(triangle, rl.Vector2(btn_rect.x + btn_rect.width / 2 - triangle.width / 2, btn_rect.height / 2 - triangle.height / 2), rl.WHITE)
 | |
| 
 | |
|   def check_network_connectivity(self):
 | |
|     while not self.stop_network_check_thread.is_set():
 | |
|       if self.state == SetupState.NETWORK_SETUP:
 | |
|         try:
 | |
|           urllib.request.urlopen(OPENPILOT_URL, timeout=2)
 | |
|           self.network_connected.set()
 | |
|           if HARDWARE.get_network_type() == NetworkType.wifi:
 | |
|             self.wifi_connected.set()
 | |
|           else:
 | |
|             self.wifi_connected.clear()
 | |
|         except Exception:
 | |
|           self.network_connected.clear()
 | |
|       time.sleep(1)
 | |
| 
 | |
|   def start_network_check(self):
 | |
|     if self.network_check_thread is None or not self.network_check_thread.is_alive():
 | |
|       self.network_check_thread = threading.Thread(target=self.check_network_connectivity, daemon=True)
 | |
|       self.network_check_thread.start()
 | |
| 
 | |
|   def close(self):
 | |
|     if self.network_check_thread is not None:
 | |
|       self.stop_network_check_thread.set()
 | |
|       self.network_check_thread.join()
 | |
| 
 | |
|   def render_network_setup(self, rect: rl.Rectangle):
 | |
|     title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
 | |
|     gui_label(title_rect, "Connect to Wi-Fi", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
 | |
| 
 | |
|     wifi_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN + 25, rect.width - MARGIN * 2,
 | |
|                              rect.height - TITLE_FONT_SIZE - 25 - BUTTON_HEIGHT - MARGIN * 3)
 | |
|     rl.draw_rectangle_rounded(wifi_rect, 0.05, 10, rl.Color(51, 51, 51, 255))
 | |
|     wifi_content_rect = rl.Rectangle(wifi_rect.x + MARGIN, wifi_rect.y, wifi_rect.width - MARGIN * 2, wifi_rect.height)
 | |
|     self.wifi_ui.render(wifi_content_rect)
 | |
| 
 | |
|     button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
 | |
|     button_y = rect.height - BUTTON_HEIGHT - MARGIN
 | |
| 
 | |
|     self._network_setup_back_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
 | |
| 
 | |
|     # Check network connectivity status
 | |
|     continue_enabled = self.network_connected.is_set()
 | |
|     self._network_setup_continue_button.enabled = continue_enabled
 | |
|     continue_text = ("Continue" if self.wifi_connected.is_set() else "Continue without Wi-Fi") if continue_enabled else "Waiting for internet"
 | |
|     self._network_setup_continue_button.set_text(continue_text)
 | |
|     self._network_setup_continue_button.render(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT))
 | |
| 
 | |
|   def render_software_selection(self, rect: rl.Rectangle):
 | |
|     title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
 | |
|     gui_label(title_rect, "Choose Software to Use", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
 | |
| 
 | |
|     radio_height = 230
 | |
|     radio_spacing = 30
 | |
| 
 | |
|     self._software_selection_continue_button.enabled = False
 | |
| 
 | |
|     openpilot_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2, rect.width - MARGIN * 2, radio_height)
 | |
|     self._software_selection_openpilot_button.render(openpilot_rect)
 | |
| 
 | |
|     if self._software_selection_openpilot_button.selected:
 | |
|       self._software_selection_continue_button.enabled = True
 | |
|       self._software_selection_custom_software_button.selected = False
 | |
| 
 | |
|     custom_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2 + radio_height + radio_spacing, rect.width - MARGIN * 2, radio_height)
 | |
|     self._software_selection_custom_software_button.render(custom_rect)
 | |
| 
 | |
|     if self._software_selection_custom_software_button.selected:
 | |
|       self._software_selection_continue_button.enabled = True
 | |
|       self._software_selection_openpilot_button.selected = False
 | |
| 
 | |
|     button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
 | |
|     button_y = rect.height - BUTTON_HEIGHT - MARGIN
 | |
| 
 | |
|     self._software_selection_back_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
 | |
|     self._software_selection_continue_button.render(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT))
 | |
| 
 | |
|   def render_downloading(self, rect: rl.Rectangle):
 | |
|     title_rect = rl.Rectangle(rect.x, rect.y + rect.height / 2 - TITLE_FONT_SIZE / 2, rect.width, TITLE_FONT_SIZE)
 | |
|     gui_label(title_rect, "Downloading...", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
 | |
| 
 | |
|   def render_download_failed(self, rect: rl.Rectangle):
 | |
|     title_rect = rl.Rectangle(rect.x + 117, rect.y + 185, rect.width - 117, TITLE_FONT_SIZE)
 | |
|     gui_label(title_rect, "Download Failed", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
 | |
| 
 | |
|     url_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67, rect.width - 117 - 100, 64)
 | |
|     gui_label(url_rect, self.failed_url, 64, font_weight=FontWeight.NORMAL)
 | |
| 
 | |
|     error_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67 + 64 + 48,
 | |
|                               rect.width - 117 - 100, rect.height - 185 + TITLE_FONT_SIZE + 67 + 64 + 48 - BUTTON_HEIGHT - MARGIN * 2)
 | |
|     gui_text_box(error_rect, self.failed_reason, BODY_FONT_SIZE)
 | |
| 
 | |
|     button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
 | |
|     button_y = rect.height - BUTTON_HEIGHT - MARGIN
 | |
|     self._download_failed_reboot_button.render(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT))
 | |
|     self._download_failed_startover_button.render(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT))
 | |
| 
 | |
|   def render_custom_url(self):
 | |
|     def handle_keyboard_result(result):
 | |
|       # Enter pressed
 | |
|       if result == 1:
 | |
|         url = self.keyboard.text
 | |
|         self.keyboard.clear()
 | |
|         if url:
 | |
|           self.download(url)
 | |
| 
 | |
|       # Cancel pressed
 | |
|       elif result == 0:
 | |
|         self.state = SetupState.SOFTWARE_SELECTION
 | |
| 
 | |
|     self.keyboard.reset()
 | |
|     self.keyboard.set_title("Enter URL", "for Custom Software")
 | |
|     gui_app.set_modal_overlay(self.keyboard, callback=handle_keyboard_result)
 | |
| 
 | |
|   def download(self, url: str):
 | |
|     # autocomplete incomplete URLs
 | |
|     if re.match("^([^/.]+)/([^/]+)$", url):
 | |
|       url = f"https://installer.comma.ai/{url}"
 | |
| 
 | |
|     self.download_url = url
 | |
|     self.state = SetupState.DOWNLOADING
 | |
| 
 | |
|     self.download_thread = threading.Thread(target=self._download_thread, daemon=True)
 | |
|     self.download_thread.start()
 | |
| 
 | |
|   def _download_thread(self):
 | |
|     try:
 | |
|       import tempfile
 | |
| 
 | |
|       _, tmpfile = tempfile.mkstemp(prefix="installer_")
 | |
| 
 | |
|       headers = {"User-Agent": USER_AGENT, "X-openpilot-serial": HARDWARE.get_serial()}
 | |
|       req = urllib.request.Request(self.download_url, headers=headers)
 | |
| 
 | |
|       with open(tmpfile, 'wb') as f, urllib.request.urlopen(req, timeout=30) as response:
 | |
|         total_size = int(response.headers.get('content-length', 0))
 | |
|         downloaded = 0
 | |
|         block_size = 8192
 | |
| 
 | |
|         while True:
 | |
|           buffer = response.read(block_size)
 | |
|           if not buffer:
 | |
|             break
 | |
| 
 | |
|           downloaded += len(buffer)
 | |
|           f.write(buffer)
 | |
| 
 | |
|           if total_size:
 | |
|             self.download_progress = int(downloaded * 100 / total_size)
 | |
| 
 | |
|       is_elf = False
 | |
|       with open(tmpfile, 'rb') as f:
 | |
|         header = f.read(4)
 | |
|         is_elf = header == b'\x7fELF'
 | |
| 
 | |
|       if not is_elf:
 | |
|         self.download_failed(self.download_url, "No custom software found at this URL.")
 | |
|         return
 | |
| 
 | |
|       os.rename(tmpfile, "/tmp/installer")
 | |
|       os.chmod("/tmp/installer", 0o755)
 | |
| 
 | |
|       with open("/tmp/installer_url", "w") as f:
 | |
|         f.write(self.download_url)
 | |
| 
 | |
|       gui_app.request_close()
 | |
| 
 | |
|     except Exception:
 | |
|       error_msg = "Ensure the entered URL is valid, and the device's internet connection is good."
 | |
|       self.download_failed(self.download_url, error_msg)
 | |
| 
 | |
|   def download_failed(self, url: str, reason: str):
 | |
|     self.failed_url = url
 | |
|     self.failed_reason = reason
 | |
|     self.state = SetupState.DOWNLOAD_FAILED
 | |
| 
 | |
| 
 | |
| def main():
 | |
|   try:
 | |
|     gui_app.init_window("Setup")
 | |
|     setup = Setup()
 | |
|     for _ in gui_app.render():
 | |
|       setup.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
 | |
|     setup.close()
 | |
|   except Exception as e:
 | |
|     print(f"Setup error: {e}")
 | |
|   finally:
 | |
|     gui_app.close()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   main()
 | |
| 
 |