diff --git a/system/ui/lib/scroll_panel.py b/system/ui/lib/scroll_panel.py index 0dc757ff6f..ca8f427830 100644 --- a/system/ui/lib/scroll_panel.py +++ b/system/ui/lib/scroll_panel.py @@ -4,6 +4,7 @@ from enum import IntEnum MOUSE_WHEEL_SCROLL_SPEED = 30 INERTIA_FRICTION = 0.95 # The rate at which the inertia slows down MIN_VELOCITY = 0.1 # Minimum velocity before stopping the inertia +DRAG_THRESHOLD = 5 # Pixels of movement to consider it a drag, not a click class ScrollState(IntEnum): @@ -16,10 +17,12 @@ class GuiScrollPanel: def __init__(self, show_vertical_scroll_bar: bool = False): self._scroll_state: ScrollState = ScrollState.IDLE self._last_mouse_y: float = 0.0 + self._start_mouse_y: float = 0.0 # Track initial mouse position for drag detection self._offset = rl.Vector2(0, 0) self._view = rl.Rectangle(0, 0, 0, 0) self._show_vertical_scroll_bar: bool = show_vertical_scroll_bar self._velocity_y = 0.0 # Velocity for inertia + self._is_dragging = False # Flag to indicate if drag occurred def handle_scroll(self, bounds: rl.Rectangle, content: rl.Rectangle) -> rl.Vector2: mouse_pos = rl.get_mouse_position() @@ -35,20 +38,27 @@ class GuiScrollPanel: self._scroll_state = ScrollState.DRAGGING_SCROLLBAR self._last_mouse_y = mouse_pos.y + self._start_mouse_y = mouse_pos.y # Record starting position self._velocity_y = 0.0 # Reset velocity when drag starts + self._is_dragging = False # Reset dragging flag if self._scroll_state != ScrollState.IDLE: if rl.is_mouse_button_down(rl.MouseButton.MOUSE_BUTTON_LEFT): delta_y = mouse_pos.y - self._last_mouse_y + # Check if movement exceeds drag threshold + total_drag = abs(mouse_pos.y - self._start_mouse_y) + if total_drag > DRAG_THRESHOLD: + self._is_dragging = True + if self._scroll_state == ScrollState.DRAGGING_CONTENT: self._offset.y += delta_y - else: - delta_y = -delta_y + else: # DRAGGING_SCROLLBAR + delta_y = -delta_y # Invert for scrollbar self._last_mouse_y = mouse_pos.y self._velocity_y = delta_y # Update velocity during drag - else: + elif rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT): self._scroll_state = ScrollState.IDLE # Handle mouse wheel scrolling @@ -73,3 +83,10 @@ class GuiScrollPanel: self._offset.y = max(min(self._offset.y, 0), -max_scroll_y) return self._offset + + def is_click_valid(self) -> bool: + return ( + self._scroll_state == ScrollState.IDLE + and not self._is_dragging + and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT) + ) diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py new file mode 100644 index 0000000000..c6aeaccf70 --- /dev/null +++ b/system/ui/lib/wifi_manager.py @@ -0,0 +1,479 @@ +import asyncio +import threading +import time +import uuid +from collections.abc import Callable +from dataclasses import dataclass +from enum import IntEnum + +from dbus_next.aio import MessageBus +from dbus_next import BusType, Variant, Message +from dbus_next.errors import DBusError +from dbus_next.constants import MessageType +from openpilot.common.swaglog import cloudlog + +# NetworkManager constants +NM = "org.freedesktop.NetworkManager" +NM_PATH = '/org/freedesktop/NetworkManager' +NM_IFACE = 'org.freedesktop.NetworkManager' +NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings' +NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings' +NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection' +NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' +NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' +NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" + +NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 + +# NetworkManager device states +class NMDeviceState(IntEnum): + DISCONNECTED = 30 + PREPARE = 40 + NEED_AUTH = 60 + IP_CONFIG = 70 + ACTIVATED = 100 + +class SecurityType(IntEnum): + OPEN = 0 + WPA = 1 + WPA2 = 2 + WPA3 = 3 + UNSUPPORTED = 4 + +@dataclass +class NetworkInfo: + ssid: str + strength: int + is_connected: bool + security_type: SecurityType + path: str + bssid: str + # saved_path: str + + +@dataclass +class WifiManagerCallbacks: + need_auth: Callable[[], None] | None = None + activated: Callable[[], None] | None = None + forgotten: Callable[[], None] | None = None + + +class WifiManager: + def __init__(self, callbacks): + self.callbacks: WifiManagerCallbacks = callbacks + self.networks: list[NetworkInfo] = [] + self.bus: MessageBus = None + self.device_path: str = "" + self.device_proxy = None + self.saved_connections: dict[str, str] = {} + self.active_ap_path: str = "" + self.scan_task: asyncio.Task | None = None + self.running: bool = True + + async def connect(self) -> None: + """Connect to the DBus system bus.""" + try: + self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + if not await self._find_wifi_device(): + raise ValueError("No Wi-Fi device found") + await self._setup_signals(self.device_path) + + self.active_ap_path = await self.get_active_access_point() + self.saved_connections = await self._get_saved_connections() + self.scan_task = asyncio.create_task(self._periodic_scan()) + except DBusError as e: + cloudlog.error(f"Failed to connect to DBus: {e}") + raise + except Exception as e: + cloudlog.error(f"Unexpected error during connect: {e}") + raise + + async def shutdown(self) -> None: + self.running = False + if self.scan_task: + self.scan_task.cancel() + try: + await self.scan_task + except asyncio.CancelledError: + pass + if self.bus: + await self.bus.disconnect() + + async def request_scan(self) -> None: + try: + interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) + await interface.call_request_scan({}) + except DBusError as e: + cloudlog.warning(f"Scan request failed: {str(e)}") + + async def get_active_access_point(self): + try: + props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) + ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') + return ap_path.value + except DBusError as e: + cloudlog.error(f"Error fetching active access point: {str(e)}") + return '' + + async def forget_connection(self, ssid: str) -> bool: + path = self.saved_connections.get(ssid) + if not path: + return False + + try: + nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE) + await nm_iface.call_delete() + return True + except DBusError as e: + cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") + return False + + async def activate_connection(self, ssid: str) -> bool: + connection_path = self.saved_connections.get(ssid) + if not connection_path: + return False + try: + nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) + await nm_iface.call_activate_connection(connection_path, self.device_path, "/") + return True + except DBusError as e: + cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}") + return False + + async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None: + """Connect to a selected Wi-Fi network.""" + try: + connection = { + 'connection': { + 'type': Variant('s', '802-11-wireless'), + 'uuid': Variant('s', str(uuid.uuid4())), + 'id': Variant('s', ssid), + 'autoconnect-retries': Variant('i', 0), + }, + '802-11-wireless': { + 'ssid': Variant('ay', ssid.encode('utf-8')), + 'hidden': Variant('b', is_hidden), + 'mode': Variant('s', 'infrastructure'), + }, + 'ipv4': {'method': Variant('s', 'auto')}, + 'ipv6': {'method': Variant('s', 'ignore')}, + } + + if bssid: + connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) + + if password: + connection['802-11-wireless-security'] = { + 'key-mgmt': Variant('s', 'wpa-psk'), + 'auth-alg': Variant('s', 'open'), + 'psk': Variant('s', password), + } + + nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) + await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/") + await self._update_connection_status() + + except DBusError as e: + cloudlog.error(f"Error connecting to network: {e}") + + def is_saved(self, ssid: str) -> bool: + return ssid in self.saved_connections + + async def _find_wifi_device(self) -> bool: + nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) + devices = await nm_iface.get_devices() + + for device_path in devices: + device = await self.bus.introspect(NM, device_path) + device_proxy = self.bus.get_proxy_object(NM, device_path, device) + device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) + device_type = await device_interface.get_device_type() # type: ignore[attr-defined] + if device_type == 2: # Wi-Fi device + self.device_path = device_path + self.device_proxy = device_proxy + return True + + return False + + async def _periodic_scan(self): + while self.running: + try: + await self.request_scan() + await self._get_available_networks() + await asyncio.sleep(30) + except asyncio.CancelledError: + break + except DBusError as e: + cloudlog.error(f"Scan failed: {e}") + await asyncio.sleep(5) + + async def _setup_signals(self, device_path: str) -> None: + rules = [ + f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'", + f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'", + f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'", + f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'", + ] + for rule in rules: + await self._add_match_rule(rule) + + # Set up signal handlers + self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed) + self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) + + settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) + settings_iface.on_new_connection(self._on_new_connection) + settings_iface.on_connection_removed(self._on_connection_removed) + + def _on_properties_changed(self, interface: str, changed: dict, invalidated: list): + # print("property changed", interface, changed, invalidated) + if 'LastScan' in changed: + asyncio.create_task(self._get_available_networks()) + elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed: + self.active_ap_path = changed["ActiveAccessPoint"].value + asyncio.create_task(self._get_available_networks()) + + def _on_state_changed(self, new_state: int, old_state: int, reason: int): + print(f"State changed: {old_state} -> {new_state}, reason: {reason}") + if new_state == NMDeviceState.ACTIVATED: + if self.callbacks.activated: + self.callbacks.activated() + asyncio.create_task(self._update_connection_status()) + elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): + for network in self.networks: + network.is_connected = False + if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth: + self.callbacks.need_auth() + + def _on_new_connection(self, path: str) -> None: + """Callback for NewConnection signal.""" + print(f"New connection added: {path}") + asyncio.create_task(self._add_saved_connection(path)) + + def _on_connection_removed(self, path: str) -> None: + """Callback for ConnectionRemoved signal.""" + print(f"Connection removed: {path}") + for ssid, p in list(self.saved_connections.items()): + if path == p: + del self.saved_connections[ssid] + break + + async def _add_saved_connection(self, path: str) -> None: + """Add a new saved connection to the dictionary.""" + try: + settings = await self._get_connection_settings(path) + if ssid := self._extract_ssid(settings): + self.saved_connections[ssid] = path + except DBusError as e: + cloudlog.error(f"Failed to add connection {path}: {e}") + + def _extract_ssid(self, settings: dict) -> str | None: + """Extract SSID from connection settings.""" + ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value + return ''.join(chr(b) for b in ssid_variant) if ssid_variant else None + + async def _update_connection_status(self): + self.active_ap_path = await self.get_active_access_point() + await self._get_available_networks() + + async def _add_match_rule(self, rule): + """Add a match rule on the bus.""" + reply = await self.bus.call( + Message( + message_type=MessageType.METHOD_CALL, + destination='org.freedesktop.DBus', + interface="org.freedesktop.DBus", + path='/org/freedesktop/DBus', + member='AddMatch', + signature='s', + body=[rule], + ) + ) + + assert reply.message_type == MessageType.METHOD_RETURN + return reply + + async def _get_available_networks(self): + """Get a list of available networks via NetworkManager.""" + wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) + access_points = await wifi_iface.get_access_points() + network_dict = {} + for ap_path in access_points: + try: + props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) + properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint') + ssid_variant = properties['Ssid'].value + ssid = ''.join(chr(byte) for byte in ssid_variant) + if not ssid: + continue + + bssid = properties.get('HwAddress', Variant('s', '')).value + strength = properties['Strength'].value + flags = properties['Flags'].value + wpa_flags = properties['WpaFlags'].value + rsn_flags = properties['RsnFlags'].value + existing_network = network_dict.get(ssid) + if not existing_network or ((not existing_network.bssid and bssid) or (existing_network.strength < strength)): + network_dict[ssid] = NetworkInfo( + ssid=ssid, + strength=strength, + security_type=self._get_security_type(flags, wpa_flags, rsn_flags), + path=ap_path, + bssid=bssid, + is_connected=self.active_ap_path == ap_path, + ) + + except DBusError as e: + cloudlog.error(f"Error fetching networks: {e}") + except Exception as e: + cloudlog.error({e}) + + self.networks = sorted( + network_dict.values(), + key=lambda network: ( + not network.is_connected, + -network.strength, # Higher signal strength first + network.ssid.lower(), + ), + ) + + async def _get_connection_settings(self, path): + """Fetch connection settings for a specific connection path.""" + try: + connection_proxy = await self.bus.introspect(NM, path) + connection = self.bus.get_proxy_object(NM, path, connection_proxy) + settings = connection.get_interface(NM_CONNECTION_IFACE) + return await settings.call_get_settings() + except DBusError as e: + cloudlog.error(f"Failed to get settings for {path}: {str(e)}") + return {} + + async def _process_chunk(self, paths_chunk): + """Process a chunk of connection paths.""" + tasks = [self._get_connection_settings(path) for path in paths_chunk] + return await asyncio.gather(*tasks, return_exceptions=True) + + async def _get_saved_connections(self) -> dict[str, str]: + try: + settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) + connection_paths = await settings_iface.call_list_connections() + saved_ssids: dict[str, str] = {} + batch_size = 20 + for i in range(0, len(connection_paths), batch_size): + chunk = connection_paths[i : i + batch_size] + results = await self._process_chunk(chunk) + for path, config in zip(chunk, results, strict=True): + if isinstance(config, dict) and '802-11-wireless' in config: + if ssid := self._extract_ssid(config): + saved_ssids[ssid] = path + return saved_ssids + except DBusError as e: + cloudlog.error(f"Error fetching saved connections: {str(e)}") + return {} + + async def _get_interface(self, bus_name: str, path: str, name: str): + introspection = await self.bus.introspect(bus_name, path) + proxy = self.bus.get_proxy_object(bus_name, path, introspection) + return proxy.get_interface(name) + + def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: + """Determine the security type based on flags.""" + if flags == 0 and not (wpa_flags or rsn_flags): + return SecurityType.OPEN + if rsn_flags & 0x200: # SAE (WPA3 Personal) + return SecurityType.WPA3 + if rsn_flags: # RSN indicates WPA2 or higher + return SecurityType.WPA2 + if wpa_flags: # WPA flags indicate WPA + return SecurityType.WPA + return SecurityType.UNSUPPORTED + + +class WifiManagerWrapper: + def __init__(self): + self._manager: WifiManager | None = None + self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks() + + self._thread = threading.Thread(target=self._run, daemon=True) + self._loop: asyncio.EventLoop | None = None + self._running = False + + def set_callbacks(self, callbacks: WifiManagerCallbacks): + self._callbacks = callbacks + + def start(self) -> None: + if not self._running: + self._thread.start() + while self._thread is not None and not self._running: + time.sleep(0.1) + + def _run(self): + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + try: + self._manager = WifiManager(self._callbacks) + self._running = True + self._loop.run_forever() + except Exception as e: + cloudlog.error(f"Error in WifiManagerWrapper thread: {e}") + finally: + if self._loop.is_running(): + self._loop.stop() + self._running = False + + def shutdown(self) -> None: + if self._running: + if self._manager is not None: + self._run_coroutine(self._manager.shutdown()) + if self._loop and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=2.0) + self._running = False + + @property + def networks(self) -> list[NetworkInfo]: + """Get the current list of networks.""" + return self._manager.networks if self._manager else [] + + def is_saved(self, ssid: str) -> bool: + """Check if a network is saved.""" + return self._manager.is_saved(ssid) if self._manager else False + + def connect(self): + """Connect to DBus and start Wi-Fi scanning.""" + if not self._manager: + return + self._run_coroutine(self._manager.connect()) + + def request_scan(self): + """Request a scan for Wi-Fi networks.""" + if not self._manager: + return + self._run_coroutine(self._manager.request_scan()) + + def forget_connection(self, ssid: str): + """Forget a saved Wi-Fi connection.""" + if not self._manager: + return + self._run_coroutine(self._manager.forget_connection(ssid)) + + def activate_connection(self, ssid: str): + """Activate an existing Wi-Fi connection.""" + if not self._manager: + return + self._run_coroutine(self._manager.activate_connection(ssid)) + + def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False): + """Connect to a Wi-Fi network.""" + if not self._manager: + return + self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden)) + + def _run_coroutine(self, coro): + """Run a coroutine in the async thread.""" + if not self._running or not self._loop: + cloudlog.error("WifiManager thread is not running") + return + asyncio.run_coroutine_threadsafe(coro, self._loop) diff --git a/system/ui/widgets/keyboard.py b/system/ui/widgets/keyboard.py index 4d1ad1b2cd..0c2fe65bd7 100644 --- a/system/ui/widgets/keyboard.py +++ b/system/ui/widgets/keyboard.py @@ -44,25 +44,30 @@ keyboard_layouts = { class Keyboard: def __init__(self, max_text_size: int = 255): self._layout = keyboard_layouts["lowercase"] - self._input_text = "" self._max_text_size = max_text_size + self._string_pointer = rl.ffi.new("char[]", max_text_size) + self._input_text: str = '' + self._clear() @property - def text(self) -> str: - return self._input_text - - def clear(self): - self._input_text = "" + def text(self): + result = rl.ffi.string(self._string_pointer).decode('utf-8') + self._clear() + return result def render(self, rect, title, sub_title): gui_label(rl.Rectangle(rect.x, rect.y, rect.width, 95), title, 90) gui_label(rl.Rectangle(rect.x, rect.y + 95, rect.width, 60), sub_title, 55, rl.GRAY) if gui_button(rl.Rectangle(rect.x + rect.width - 300, rect.y, 300, 100), "Cancel"): - return -1 + self._clear() + return 0 # Text box for input - rl.gui_text_box(rl.Rectangle(rect.x, rect.y + 160, rect.width, 100), self._input_text, self._max_text_size, True) - + self._sync_string_pointer() + rl.gui_text_box( + rl.Rectangle(rect.x, rect.y + 160, rect.width, 100), self._string_pointer, self._max_text_size, True + ) + self._input_text = rl.ffi.string(self._string_pointer).decode('utf-8') h_space, v_space = 15, 15 row_y_start = rect.y + 300 # Starting Y position for the first row key_height = (rect.height - 300 - 3 * v_space) / 4 @@ -77,7 +82,11 @@ class Keyboard: if i > 0: start_x += h_space - new_width = (key_width * 3 + h_space * 2) if key == SPACE_KEY else (key_width * 2 + h_space if key == ENTER_KEY else key_width) + new_width = ( + (key_width * 3 + h_space * 2) + if key == SPACE_KEY + else (key_width * 2 + h_space if key == ENTER_KEY else key_width) + ) key_rect = rl.Rectangle(start_x, row_y_start + row * (key_height + v_space), new_width, key_height) start_x += new_width @@ -87,7 +96,7 @@ class Keyboard: else: self.handle_key_press(key) - return 0 + return -1 def handle_key_press(self, key): if key in (SHIFT_DOWN_KEY, ABC_KEY): @@ -102,3 +111,14 @@ class Keyboard: self._input_text = self._input_text[:-1] elif key != BACKSPACE_KEY and len(self._input_text) < self._max_text_size: self._input_text += key + + def _clear(self): + self._input_text = '' + self._string_pointer[0] = b'\0' + + def _sync_string_pointer(self): + """Sync the C-string pointer with the internal Python string.""" + encoded = self._input_text.encode("utf-8")[:self._max_text_size - 1] # Leave room for null terminator + buffer = rl.ffi.buffer(self._string_pointer) + buffer[:len(encoded)] = encoded + self._string_pointer[len(encoded)] = b'\0' # Null terminate diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py new file mode 100644 index 0000000000..47318e4a3a --- /dev/null +++ b/system/ui/widgets/network.py @@ -0,0 +1,171 @@ +from dataclasses import dataclass +from typing import Literal + +import pyray as rl +from openpilot.system.ui.lib.wifi_manager import NetworkInfo, WifiManagerCallbacks, WifiManagerWrapper +from openpilot.system.ui.lib.application import gui_app +from openpilot.system.ui.lib.button import gui_button +from openpilot.system.ui.lib.label import gui_label +from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel +from openpilot.system.ui.widgets.keyboard import Keyboard +from openpilot.system.ui.widgets.confirm_dialog import confirm_dialog + +NM_DEVICE_STATE_NEED_AUTH = 60 +ITEM_HEIGHT = 160 + + +@dataclass +class StateIdle: + action: Literal["idle"] = "idle" + +@dataclass +class StateConnecting: + network: NetworkInfo + action: Literal["connecting"] = "connecting" + +@dataclass +class StateNeedsAuth: + network: NetworkInfo + action: Literal["needs_auth"] = "needs_auth" + +@dataclass +class StateShowForgetConfirm: + network: NetworkInfo + action: Literal["show_forget_confirm"] = "show_forget_confirm" + +@dataclass +class StateForgetting: + network: NetworkInfo + action: Literal["forgetting"] = "forgetting" + +UIState = StateIdle | StateConnecting | StateNeedsAuth | StateShowForgetConfirm | StateForgetting + + +class WifiManagerUI: + def __init__(self, wifi_manager: WifiManagerWrapper): + self.state: UIState = StateIdle() + self.btn_width = 200 + self.scroll_panel = GuiScrollPanel() + self.keyboard = Keyboard() + + self.wifi_manager = wifi_manager + self.wifi_manager.set_callbacks(WifiManagerCallbacks(self._on_need_auth, self._on_activated, self._on_forgotten)) + self.wifi_manager.start() + self.wifi_manager.connect() + + def render(self, rect: rl.Rectangle): + if not self.wifi_manager.networks: + gui_label(rect, "Scanning Wi-Fi networks...", 72, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) + return + + match self.state: + case StateNeedsAuth(network): + result = self.keyboard.render(rect, "Enter password", f"for {network.ssid}") + if result == 1: + self.connect_to_network(network, self.keyboard.text) + elif result == 0: + self.state = StateIdle() + + case StateShowForgetConfirm(network): + result = confirm_dialog(rect, f'Forget Wi-Fi Network "{network.ssid}"?', "Forget") + if result == 1: + self.forget_network(network) + elif result == 0: + self.state = StateIdle() + + case _: + self._draw_network_list(rect) + + def _draw_network_list(self, rect: rl.Rectangle): + content_rect = rl.Rectangle(rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * ITEM_HEIGHT) + offset = self.scroll_panel.handle_scroll(rect, content_rect) + clicked = self.scroll_panel.is_click_valid() + + rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) + for i, network in enumerate(self.wifi_manager.networks): + y_offset = rect.y + i * ITEM_HEIGHT + offset.y + item_rect = rl.Rectangle(rect.x, y_offset, rect.width, ITEM_HEIGHT) + if not rl.check_collision_recs(item_rect, rect): + continue + + self._draw_network_item(item_rect, network, clicked) + if i < len(self.wifi_manager.networks) - 1: + line_y = int(item_rect.y + item_rect.height - 1) + rl.draw_line(int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY) + + rl.end_scissor_mode() + + def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool): + label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, ITEM_HEIGHT) + state_rect = rl.Rectangle(rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, ITEM_HEIGHT) + + gui_label(label_rect, network.ssid, 55) + + status_text = "" + if network.is_connected: + status_text = "Connected" + match self.state: + case StateConnecting(network=connecting): + if connecting.ssid == network.ssid: + status_text = "CONNECTING..." + case StateForgetting(network=forgetting): + if forgetting.ssid == network.ssid: + status_text = "FORGETTING..." + if status_text: + rl.gui_label(state_rect, status_text) + + # If the network is saved, show the "Forget" button + if self.wifi_manager.is_saved(network.ssid): + forget_btn_rect = rl.Rectangle( + rect.x + rect.width - self.btn_width, + rect.y + (ITEM_HEIGHT - 80) / 2, + self.btn_width, + 80, + ) + if isinstance(self.state, StateIdle) and gui_button(forget_btn_rect, "Forget") and clicked: + self.state = StateShowForgetConfirm(network) + + if isinstance(self.state, StateIdle) and rl.check_collision_point_rec(rl.get_mouse_position(), label_rect) and clicked: + if not self.wifi_manager.is_saved(network.ssid): + self.state = StateNeedsAuth(network) + else: + self.connect_to_network(network) + + def connect_to_network(self, network: NetworkInfo, password=''): + if self.wifi_manager.is_saved(network.ssid) and not password: + self.wifi_manager.activate_connection(network.ssid) + else: + self.wifi_manager.connect_to_network(network.ssid, password) + + def forget_network(self, network: NetworkInfo): + self.state = StateForgetting(network) + self.wifi_manager.forget_connection(network.ssid) + + def _on_need_auth(self): + match self.state: + case StateConnecting(network): + self.state = StateNeedsAuth(network) + + def _on_activated(self): + if isinstance(self.state, StateConnecting): + self.state = StateIdle() + + def _on_forgotten(self): + if isinstance(self.state, StateForgetting): + self.state = StateIdle() + + +def main(): + gui_app.init_window("Wi-Fi Manager") + wifi_manager = WifiManagerWrapper() + wifi_ui = WifiManagerUI(wifi_manager) + + for _ in gui_app.render(): + wifi_ui.render(rl.Rectangle(50, 50, gui_app.width - 100, gui_app.height - 100)) + + wifi_manager.shutdown() + gui_app.close() + + +if __name__ == "__main__": + main()