You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
345 lines
14 KiB
345 lines
14 KiB
6 days ago
|
#!/usr/bin/env python3
|
||
|
import os
|
||
|
import re
|
||
|
import threading
|
||
|
import time
|
||
|
import urllib.request
|
||
|
from enum import IntEnum
|
||
|
import pyray as rl
|
||
|
|
||
|
from cereal import log
|
||
|
from openpilot.system.hardware import HARDWARE
|
||
|
from openpilot.system.ui.lib.application import gui_app, FontWeight
|
||
|
from openpilot.system.ui.lib.button import gui_button, ButtonStyle
|
||
|
from openpilot.system.ui.lib.label import gui_label, gui_text_box
|
||
|
from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManagerWrapper
|
||
|
from openpilot.system.ui.widgets.keyboard import Keyboard
|
||
|
|
||
|
NetworkType = log.DeviceState.NetworkType
|
||
|
|
||
|
MARGIN = 50
|
||
|
TITLE_FONT_SIZE = 116
|
||
|
TITLE_FONT_WEIGHT = FontWeight.MEDIUM
|
||
|
NEXT_BUTTON_WIDTH = 310
|
||
|
BODY_FONT_SIZE = 96
|
||
|
BUTTON_HEIGHT = 160
|
||
|
BUTTON_SPACING = 50
|
||
|
|
||
|
OPENPILOT_URL = "https://openpilot.comma.ai"
|
||
|
USER_AGENT = f"AGNOSSetup-{HARDWARE.get_os_version()}"
|
||
|
|
||
|
|
||
|
class SetupState(IntEnum):
|
||
|
LOW_VOLTAGE = 0
|
||
|
GETTING_STARTED = 1
|
||
|
NETWORK_SETUP = 2
|
||
|
SOFTWARE_SELECTION = 3
|
||
|
CUSTOM_URL = 4
|
||
|
DOWNLOADING = 5
|
||
|
DOWNLOAD_FAILED = 6
|
||
|
|
||
|
|
||
|
class Setup:
|
||
|
def __init__(self):
|
||
|
self.state = SetupState.GETTING_STARTED
|
||
|
self.network_check_thread = None
|
||
|
self.network_connected = threading.Event()
|
||
|
self.wifi_connected = threading.Event()
|
||
|
self.stop_network_check_thread = threading.Event()
|
||
|
self.failed_url = ""
|
||
|
self.failed_reason = ""
|
||
|
self.download_url = ""
|
||
|
self.download_progress = 0
|
||
|
self.download_thread = None
|
||
|
self.wifi_manager = WifiManagerWrapper()
|
||
|
self.wifi_ui = WifiManagerUI(self.wifi_manager)
|
||
|
self.keyboard = Keyboard()
|
||
|
self.selected_radio = None
|
||
|
|
||
|
self.warning = gui_app.texture("icons/warning.png", 150, 150)
|
||
|
self.checkmark = gui_app.texture("icons/circled_check.png", 100, 100)
|
||
|
|
||
|
try:
|
||
|
with open("/sys/class/hwmon/hwmon1/in1_input") as f:
|
||
|
voltage = float(f.read().strip()) / 1000.0
|
||
|
if voltage < 7:
|
||
|
self.state = SetupState.LOW_VOLTAGE
|
||
|
except (FileNotFoundError, ValueError):
|
||
|
self.state = SetupState.LOW_VOLTAGE
|
||
|
|
||
|
def render(self, rect: rl.Rectangle):
|
||
|
if self.state == SetupState.LOW_VOLTAGE:
|
||
|
self.render_low_voltage(rect)
|
||
|
elif self.state == SetupState.GETTING_STARTED:
|
||
|
self.render_getting_started(rect)
|
||
|
elif self.state == SetupState.NETWORK_SETUP:
|
||
|
self.render_network_setup(rect)
|
||
|
elif self.state == SetupState.SOFTWARE_SELECTION:
|
||
|
self.render_software_selection(rect)
|
||
|
elif self.state == SetupState.CUSTOM_URL:
|
||
|
self.render_custom_url()
|
||
|
elif self.state == SetupState.DOWNLOADING:
|
||
|
self.render_downloading(rect)
|
||
|
elif self.state == SetupState.DOWNLOAD_FAILED:
|
||
|
self.render_download_failed(rect)
|
||
|
|
||
|
def render_low_voltage(self, rect: rl.Rectangle):
|
||
|
rl.draw_texture(self.warning, int(rect.x + 150), int(rect.y + 110), rl.WHITE)
|
||
|
|
||
|
title_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100, rect.width - 500 - 150, TITLE_FONT_SIZE)
|
||
|
gui_label(title_rect, "WARNING: Low Voltage", TITLE_FONT_SIZE, rl.Color(255, 89, 79, 255), FontWeight.MEDIUM)
|
||
|
|
||
|
body_rect = rl.Rectangle(rect.x + 150, rect.y + 110 + 150 + 100 + TITLE_FONT_SIZE + 25, rect.width - 500 - 150, BODY_FONT_SIZE * 3)
|
||
|
gui_text_box(body_rect, "Power your device in a car with a harness or proceed at your own risk.", BODY_FONT_SIZE)
|
||
|
|
||
|
button_width = (rect.width - MARGIN * 3) / 2
|
||
|
button_y = rect.height - MARGIN - BUTTON_HEIGHT
|
||
|
|
||
|
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Power off"):
|
||
|
HARDWARE.shutdown()
|
||
|
|
||
|
if gui_button(rl.Rectangle(rect.x + MARGIN * 2 + button_width, button_y, button_width, BUTTON_HEIGHT), "Continue"):
|
||
|
self.state = SetupState.GETTING_STARTED
|
||
|
|
||
|
def render_getting_started(self, rect: rl.Rectangle):
|
||
|
title_rect = rl.Rectangle(rect.x + 165, rect.y + 280, rect.width - 265, TITLE_FONT_SIZE)
|
||
|
gui_label(title_rect, "Getting Started", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
|
||
|
|
||
|
desc_rect = rl.Rectangle(rect.x + 165, rect.y + 280 + TITLE_FONT_SIZE + 90, rect.width - 500, BODY_FONT_SIZE * 3)
|
||
|
gui_text_box(desc_rect, "Before we get on the road, let's finish installation and cover some details.", BODY_FONT_SIZE)
|
||
|
|
||
|
btn_rect = rl.Rectangle(rect.width - NEXT_BUTTON_WIDTH, 0, NEXT_BUTTON_WIDTH, rect.height)
|
||
|
|
||
|
ret = gui_button(btn_rect, "", button_style=ButtonStyle.PRIMARY, border_radius=0)
|
||
|
triangle = gui_app.texture("images/button_continue_triangle.png", 54, int(btn_rect.height))
|
||
|
rl.draw_texture_v(triangle, rl.Vector2(btn_rect.x + btn_rect.width / 2 - triangle.width / 2, btn_rect.height / 2 - triangle.height / 2), rl.WHITE)
|
||
|
|
||
|
if ret:
|
||
|
self.state = SetupState.NETWORK_SETUP
|
||
|
self.wifi_manager.request_scan()
|
||
|
self.start_network_check()
|
||
|
|
||
|
def check_network_connectivity(self):
|
||
|
while not self.stop_network_check_thread.is_set():
|
||
|
if self.state == SetupState.NETWORK_SETUP:
|
||
|
try:
|
||
|
urllib.request.urlopen("https://google.com", timeout=2)
|
||
|
self.network_connected.set()
|
||
|
if HARDWARE.get_network_type() == NetworkType.wifi:
|
||
|
self.wifi_connected.set()
|
||
|
else:
|
||
|
self.wifi_connected.clear()
|
||
|
except Exception:
|
||
|
self.network_connected.clear()
|
||
|
time.sleep(1)
|
||
|
|
||
|
def start_network_check(self):
|
||
|
if self.network_check_thread is None or not self.network_check_thread.is_alive():
|
||
|
self.network_check_thread = threading.Thread(target=self.check_network_connectivity, daemon=True)
|
||
|
self.network_check_thread.start()
|
||
|
|
||
|
def close(self):
|
||
|
if self.network_check_thread is not None:
|
||
|
self.stop_network_check_thread.set()
|
||
|
self.network_check_thread.join()
|
||
|
|
||
|
def render_network_setup(self, rect: rl.Rectangle):
|
||
|
title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
|
||
|
gui_label(title_rect, "Connect to Wi-Fi", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
|
||
|
|
||
|
wifi_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN + 25, rect.width - MARGIN * 2,
|
||
|
rect.height - TITLE_FONT_SIZE - 25 - BUTTON_HEIGHT - MARGIN * 3)
|
||
|
rl.draw_rectangle_rounded(wifi_rect, 0.05, 10, rl.Color(51, 51, 51, 255))
|
||
|
wifi_content_rect = rl.Rectangle(wifi_rect.x + MARGIN, wifi_rect.y, wifi_rect.width - MARGIN * 2, wifi_rect.height)
|
||
|
self.wifi_ui.render(wifi_content_rect)
|
||
|
|
||
|
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
|
||
|
button_y = rect.height - BUTTON_HEIGHT - MARGIN
|
||
|
|
||
|
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Back"):
|
||
|
self.state = SetupState.GETTING_STARTED
|
||
|
|
||
|
# Check network connectivity status
|
||
|
continue_enabled = self.network_connected.is_set()
|
||
|
continue_text = ("Continue" if self.wifi_connected.is_set() else "Continue without Wi-Fi") if continue_enabled else "Waiting for internet"
|
||
|
|
||
|
if gui_button(
|
||
|
rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT),
|
||
|
continue_text,
|
||
|
button_style=ButtonStyle.PRIMARY if continue_enabled else ButtonStyle.NORMAL,
|
||
|
):
|
||
|
self.state = SetupState.SOFTWARE_SELECTION
|
||
|
self.stop_network_check_thread.set()
|
||
|
|
||
|
def render_software_selection(self, rect: rl.Rectangle):
|
||
|
title_rect = rl.Rectangle(rect.x + MARGIN, rect.y + MARGIN, rect.width - MARGIN * 2, TITLE_FONT_SIZE)
|
||
|
gui_label(title_rect, "Choose Software to Install", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
|
||
|
|
||
|
radio_height = 230
|
||
|
radio_spacing = 30
|
||
|
|
||
|
openpilot_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2, rect.width - MARGIN * 2, radio_height)
|
||
|
openpilot_selected = self.selected_radio == "openpilot"
|
||
|
|
||
|
rl.draw_rectangle_rounded(openpilot_rect, 0.1, 10, rl.Color(70, 91, 234, 255) if openpilot_selected else rl.Color(79, 79, 79, 255))
|
||
|
gui_label(rl.Rectangle(openpilot_rect.x + 100, openpilot_rect.y, openpilot_rect.width - 200, radio_height), "openpilot", BODY_FONT_SIZE)
|
||
|
|
||
|
if openpilot_selected:
|
||
|
checkmark_pos = rl.Vector2(openpilot_rect.x + openpilot_rect.width - 100 - self.checkmark.width,
|
||
|
openpilot_rect.y + radio_height / 2 - self.checkmark.height / 2)
|
||
|
rl.draw_texture_v(self.checkmark, checkmark_pos, rl.WHITE)
|
||
|
|
||
|
custom_rect = rl.Rectangle(rect.x + MARGIN, rect.y + TITLE_FONT_SIZE + MARGIN * 2 + radio_height + radio_spacing, rect.width - MARGIN * 2, radio_height)
|
||
|
custom_selected = self.selected_radio == "custom"
|
||
|
|
||
|
rl.draw_rectangle_rounded(custom_rect, 0.1, 10, rl.Color(70, 91, 234, 255) if custom_selected else rl.Color(79, 79, 79, 255))
|
||
|
gui_label(rl.Rectangle(custom_rect.x + 100, custom_rect.y, custom_rect.width - 200, radio_height), "Custom Software", BODY_FONT_SIZE)
|
||
|
|
||
|
if custom_selected:
|
||
|
checkmark_pos = rl.Vector2(custom_rect.x + custom_rect.width - 100 - self.checkmark.width, custom_rect.y + radio_height / 2 - self.checkmark.height / 2)
|
||
|
rl.draw_texture_v(self.checkmark, checkmark_pos, rl.WHITE)
|
||
|
|
||
|
mouse_pos = rl.get_mouse_position()
|
||
|
if rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT):
|
||
|
if rl.check_collision_point_rec(mouse_pos, openpilot_rect):
|
||
|
self.selected_radio = "openpilot"
|
||
|
elif rl.check_collision_point_rec(mouse_pos, custom_rect):
|
||
|
self.selected_radio = "custom"
|
||
|
|
||
|
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
|
||
|
button_y = rect.height - BUTTON_HEIGHT - MARGIN
|
||
|
|
||
|
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Back"):
|
||
|
self.state = SetupState.NETWORK_SETUP
|
||
|
|
||
|
continue_enabled = self.selected_radio is not None
|
||
|
if gui_button(
|
||
|
rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT),
|
||
|
"Continue",
|
||
|
button_style=ButtonStyle.PRIMARY,
|
||
|
is_enabled=continue_enabled,
|
||
|
):
|
||
|
if continue_enabled:
|
||
|
if self.selected_radio == "openpilot":
|
||
|
self.download(OPENPILOT_URL)
|
||
|
else:
|
||
|
self.state = SetupState.CUSTOM_URL
|
||
|
|
||
|
def render_downloading(self, rect: rl.Rectangle):
|
||
|
title_rect = rl.Rectangle(rect.x, rect.y + rect.height / 2 - TITLE_FONT_SIZE / 2, rect.width, TITLE_FONT_SIZE)
|
||
|
gui_label(title_rect, "Downloading...", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
|
||
|
|
||
|
def render_download_failed(self, rect: rl.Rectangle):
|
||
|
title_rect = rl.Rectangle(rect.x + 117, rect.y + 185, rect.width - 117, TITLE_FONT_SIZE)
|
||
|
gui_label(title_rect, "Download Failed", TITLE_FONT_SIZE, font_weight=FontWeight.MEDIUM)
|
||
|
|
||
|
url_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67, rect.width - 117 - 100, 64)
|
||
|
gui_label(url_rect, self.failed_url, 64, font_weight=FontWeight.NORMAL)
|
||
|
|
||
|
error_rect = rl.Rectangle(rect.x + 117, rect.y + 185 + TITLE_FONT_SIZE + 67 + 64 + 48,
|
||
|
rect.width - 117 - 100, rect.height - 185 + TITLE_FONT_SIZE + 67 + 64 + 48 - BUTTON_HEIGHT - MARGIN * 2)
|
||
|
gui_text_box(error_rect, self.failed_reason, BODY_FONT_SIZE)
|
||
|
|
||
|
button_width = (rect.width - BUTTON_SPACING - MARGIN * 2) / 2
|
||
|
button_y = rect.height - BUTTON_HEIGHT - MARGIN
|
||
|
|
||
|
if gui_button(rl.Rectangle(rect.x + MARGIN, button_y, button_width, BUTTON_HEIGHT), "Reboot device"):
|
||
|
HARDWARE.reboot()
|
||
|
|
||
|
if gui_button(rl.Rectangle(rect.x + MARGIN + button_width + BUTTON_SPACING, button_y, button_width, BUTTON_HEIGHT), "Start over",
|
||
|
button_style=ButtonStyle.PRIMARY):
|
||
|
self.state = SetupState.GETTING_STARTED
|
||
|
|
||
|
def render_custom_url(self):
|
||
|
result = self.keyboard.render("Enter URL", "for Custom Software")
|
||
|
|
||
|
# Enter pressed
|
||
|
if result == 1:
|
||
|
url = self.keyboard.text
|
||
|
self.keyboard.clear()
|
||
|
if url:
|
||
|
self.download(url)
|
||
|
|
||
|
# Cancel pressed
|
||
|
elif result == 0:
|
||
|
self.state = SetupState.SOFTWARE_SELECTION
|
||
|
|
||
|
def download(self, url: str):
|
||
|
# autocomplete incomplete URLs
|
||
|
if re.match("^([^/.]+)/([^/]+)$", url):
|
||
|
url = f"https://installer.comma.ai/{url}"
|
||
|
|
||
|
self.download_url = url
|
||
|
self.state = SetupState.DOWNLOADING
|
||
|
|
||
|
self.download_thread = threading.Thread(target=self._download_thread, daemon=True)
|
||
|
self.download_thread.start()
|
||
|
|
||
|
def _download_thread(self):
|
||
|
try:
|
||
|
import tempfile
|
||
|
|
||
|
_, tmpfile = tempfile.mkstemp(prefix="installer_")
|
||
|
|
||
|
headers = {"User-Agent": USER_AGENT, "X-openpilot-serial": HARDWARE.get_serial()}
|
||
|
req = urllib.request.Request(self.download_url, headers=headers)
|
||
|
|
||
|
with open(tmpfile, 'wb') as f, urllib.request.urlopen(req, timeout=30) as response:
|
||
|
total_size = int(response.headers.get('content-length', 0))
|
||
|
downloaded = 0
|
||
|
block_size = 8192
|
||
|
|
||
|
while True:
|
||
|
buffer = response.read(block_size)
|
||
|
if not buffer:
|
||
|
break
|
||
|
|
||
|
downloaded += len(buffer)
|
||
|
f.write(buffer)
|
||
|
|
||
|
if total_size:
|
||
|
self.download_progress = int(downloaded * 100 / total_size)
|
||
|
|
||
|
is_elf = False
|
||
|
with open(tmpfile, 'rb') as f:
|
||
|
header = f.read(4)
|
||
|
is_elf = header == b'\x7fELF'
|
||
|
|
||
|
if not is_elf:
|
||
|
self.download_failed(self.download_url, "No custom software found at this URL.")
|
||
|
return
|
||
|
|
||
|
os.rename(tmpfile, "/tmp/installer")
|
||
|
os.chmod("/tmp/installer", 0o755)
|
||
|
|
||
|
with open("/tmp/installer_url", "w") as f:
|
||
|
f.write(self.download_url)
|
||
|
|
||
|
gui_app.request_close()
|
||
|
|
||
|
except Exception:
|
||
|
error_msg = "Ensure the entered URL is valid, and the device's internet connection is good."
|
||
|
self.download_failed(self.download_url, error_msg)
|
||
|
|
||
|
def download_failed(self, url: str, reason: str):
|
||
|
self.failed_url = url
|
||
|
self.failed_reason = reason
|
||
|
self.state = SetupState.DOWNLOAD_FAILED
|
||
|
|
||
|
|
||
|
def main():
|
||
|
try:
|
||
|
gui_app.init_window("Setup")
|
||
|
setup = Setup()
|
||
|
for _ in gui_app.render():
|
||
|
setup.render(rl.Rectangle(0, 0, gui_app.width, gui_app.height))
|
||
|
setup.close()
|
||
|
except Exception as e:
|
||
|
print(f"Setup error: {e}")
|
||
|
finally:
|
||
|
gui_app.close()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|