From 5359f6d35464448491e1d036d6970ee67addc821 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Tue, 26 Aug 2025 01:23:59 -0700 Subject: [PATCH] raylib: clean up networking (#36039) * stasj * remove one of many classes * clean up and fix * clean up * stash/draft: oh this is sick * so epic * some clean up * what the fuck, it doesn't even use these * more epic initializers + make it kind of work * so simple, wonder if we should further 2x reduce line count * i've never ever seen this pattern b4, rm * remove bs add niceness * minor organization * set security type and support listing and rming conns * forget and connect * jeepney is actually pretty good, it's 2x faster to get wifi device (0.005s to 0.002s) * temp * do blocking add in worker thread * add jeepney * lets finish with python-dbus first then evaluate - revert jeepney This reverts commit 7de04b11c2285c298bb1ec907782026c795ab207. and * safe wrap * missing * saved connections * set rest of callbacks * skip hidden APs, simplify _running * add state management * either wrong password or disconnected for now * i can't believe we didn't check this... * disable button if unsupported!!! * hide/show event no lag hopefully yayay * fix hide event * remove old wifi manager * cache wifi device path + some clean up * more clean up * more clean up * temp disable blocking prime thread * hackily get device path once * ok * debug * fix open networks * debug * clean up * all threads wait for device path, and user functions dont ever attempt to get, just skip * same place * helper * Revert "helper" This reverts commit e237d9a720915fb6bd67c539123d3e2d9d582ce1. * organize? * Revert "organize?" This reverts commit 3aca3e5d629c947711ade88799febeb3f61eda87. * c word is a bad word * rk monitor debug for now * nothing crazy * improve checkmark responsiveness * when forgetting: this is correct, but feels unresponsive * this feels good * need these two to keep forgetting and activating responsive * sort by connected, strength, then name * handle non-critical race condition * log more * unused * oh jubilee is sick you can block on signals!! * proof of concept to see if works on device whoiops * so sucking fick * ah this is not generic, it's a filter on the return vals * flip around to not drop * oh thank god * fix * stash * atomic replace * clean up * add old to keep track of what's moved over * these are already moved * so much junk * so much junk * more * tethering wasn't used so we can ignore that for now * no params now * rm duplicate imports * not used anymore * move get wifi device over to jeepney! ~no additional lines * request scan w/ keepney * get_conns * _connection_by_ssid_jeepney is 2x faster (0.01 vs 0.02s) * do forget and activate * _update_networks matches! * rm old update_networks * replace connect_to_network, about same time (yes i removed thread call) * no more python-dbus!k * doesn't hurt * AP.from_dbus: actually handle incorrect paths w/ jeep + more efficient single call * properly handle errors * it's jeepney now * less state * using the thread safe router passes a race condition test that conn failed! * bad to copy from old wifimanager * fix conn usage * clean up * curious if locks are lagging * not for now * Revert "curious if locks are lagging" This reverts commit 085dd185b083f5905a4e71ba3e8c0565175e04aa. * clean up _monitor_state * remove tests * clean up dataclasses * sort * lint: okay fine it can be non by virtue of exiting right at the perfect time * some network clean up * some wifi manager clean up * this is handled * stop can be called manually, from deleting wifimanager, or exiting python. some protection * its not mutable anymore * scan on enter * clean up * back * lint * catch dbus fail to connect catch dbus fail to connect --- pyproject.toml | 2 +- selfdrive/ui/layouts/settings/settings.py | 8 +- system/ui/lib/wifi_manager.py | 970 ++++++++-------------- system/ui/setup.py | 5 +- system/ui/updater.py | 5 +- system/ui/widgets/network.py | 58 +- uv.lock | 22 +- 7 files changed, 385 insertions(+), 685 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7103163458..556b78009d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,8 +101,8 @@ dev = [ "av", "azure-identity", "azure-storage-blob", - "dbus-next", "dictdiffer", + "jeepney", "matplotlib", "opencv-python-headless", "parameterized >=0.8, <0.9", diff --git a/selfdrive/ui/layouts/settings/settings.py b/selfdrive/ui/layouts/settings/settings.py index fe35ea094a..a731a9158c 100644 --- a/selfdrive/ui/layouts/settings/settings.py +++ b/selfdrive/ui/layouts/settings/settings.py @@ -9,7 +9,7 @@ from openpilot.selfdrive.ui.layouts.settings.software import SoftwareLayout from openpilot.selfdrive.ui.layouts.settings.toggles import TogglesLayout from openpilot.system.ui.lib.application import gui_app, FontWeight, MousePos from openpilot.system.ui.lib.text_measure import measure_text_cached -from openpilot.system.ui.lib.wifi_manager import WifiManagerWrapper +from openpilot.system.ui.lib.wifi_manager import WifiManager from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.network import WifiManagerUI @@ -54,12 +54,12 @@ class SettingsLayout(Widget): self._current_panel = PanelType.DEVICE # Panel configuration - self.wifi_manager = WifiManagerWrapper() - self.wifi_ui = WifiManagerUI(self.wifi_manager) + wifi_manager = WifiManager() + wifi_manager.set_active(False) self._panels = { PanelType.DEVICE: PanelInfo("Device", DeviceLayout()), - PanelType.NETWORK: PanelInfo("Network", self.wifi_ui), + PanelType.NETWORK: PanelInfo("Network", WifiManagerUI(wifi_manager)), PanelType.TOGGLES: PanelInfo("Toggles", TogglesLayout()), PanelType.SOFTWARE: PanelInfo("Software", SoftwareLayout()), PanelType.FIREHOSE: PanelInfo("Firehose", FirehoseLayout()), diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py index 7bd292fa43..96940c2b3b 100644 --- a/system/ui/lib/wifi_manager.py +++ b/system/ui/lib/wifi_manager.py @@ -1,35 +1,35 @@ -import asyncio -import concurrent.futures -import copy +import atexit import threading import time import uuid from collections.abc import Callable from dataclasses import dataclass from enum import IntEnum -from typing import TypeVar +from typing import Any -from dbus_next.aio import MessageBus -from dbus_next import BusType, Variant, Message -from dbus_next.errors import DBusError -from dbus_next.constants import MessageType +from jeepney import DBusAddress, new_method_call +from jeepney.bus_messages import MatchRule, message_bus +from jeepney.io.blocking import open_dbus_connection as open_dbus_connection_blocking +from jeepney.io.threading import DBusRouter, open_dbus_connection as open_dbus_connection_threading +from jeepney.low_level import MessageType +from jeepney.wrappers import Properties -from openpilot.system.ui.lib.networkmanager import (NM, NM_PATH, NM_IFACE, NM_SETTINGS_PATH, NM_SETTINGS_IFACE, - NM_CONNECTION_IFACE, NM_WIRELESS_IFACE, NM_PROPERTIES_IFACE, - NM_DEVICE_IFACE, NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT, - NMDeviceState) - -try: - from openpilot.common.params import Params -except ImportError: - # Params/Cythonized modules are not available in zipapp - Params = None from openpilot.common.swaglog import cloudlog - -T = TypeVar("T") +from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_802_11_AP_SEC_PAIR_WEP40, + NM_802_11_AP_SEC_PAIR_WEP104, NM_802_11_AP_SEC_GROUP_WEP40, + NM_802_11_AP_SEC_GROUP_WEP104, NM_802_11_AP_SEC_KEY_MGMT_PSK, + NM_802_11_AP_SEC_KEY_MGMT_802_1X, NM_802_11_AP_FLAGS_NONE, + NM_802_11_AP_FLAGS_PRIVACY, NM_802_11_AP_FLAGS_WPS, + NM_PATH, NM_IFACE, NM_ACCESS_POINT_IFACE, NM_SETTINGS_PATH, + NM_SETTINGS_IFACE, NM_CONNECTION_IFACE, NM_DEVICE_IFACE, + NM_DEVICE_TYPE_WIFI, NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT, + NM_DEVICE_STATE_REASON_NEW_ACTIVATION, + NMDeviceState) TETHERING_IP_ADDRESS = "192.168.43.1" DEFAULT_TETHERING_PASSWORD = "swagswagcomma" +SIGNAL_QUEUE_SIZE = 10 +SCAN_PERIOD_SECONDS = 10 class SecurityType(IntEnum): @@ -40,673 +40,377 @@ class SecurityType(IntEnum): UNSUPPORTED = 4 -@dataclass -class NetworkInfo: +def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: + wpa_props = wpa_flags | rsn_flags + + # obtained by looking at flags of networks in the office as reported by an Android phone + supports_wpa = (NM_802_11_AP_SEC_PAIR_WEP40 | NM_802_11_AP_SEC_PAIR_WEP104 | NM_802_11_AP_SEC_GROUP_WEP40 | + NM_802_11_AP_SEC_GROUP_WEP104 | NM_802_11_AP_SEC_KEY_MGMT_PSK) + + if (flags == NM_802_11_AP_FLAGS_NONE) or ((flags & NM_802_11_AP_FLAGS_WPS) and not (wpa_props & supports_wpa)): + return SecurityType.OPEN + elif (flags & NM_802_11_AP_FLAGS_PRIVACY) and (wpa_props & supports_wpa) and not (wpa_props & NM_802_11_AP_SEC_KEY_MGMT_802_1X): + return SecurityType.WPA + else: + cloudlog.warning(f"Unsupported network! flags: {flags}, wpa_flags: {wpa_flags}, rsn_flags: {rsn_flags}") + return SecurityType.UNSUPPORTED + + +@dataclass(frozen=True) +class Network: ssid: str strength: int is_connected: bool security_type: SecurityType - path: str - bssid: str - is_saved: bool = False - # saved_path: str + is_saved: bool + + @classmethod + def from_dbus(cls, ssid: str, aps: list["AccessPoint"], is_saved: bool) -> "Network": + # we only want to show the strongest AP for each Network/SSID + strongest_ap = max(aps, key=lambda ap: ap.strength) + is_connected = any(ap.is_connected for ap in aps) + security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags) + + return cls( + ssid=ssid, + strength=strongest_ap.strength, + is_connected=is_connected and is_saved, + security_type=security_type, + is_saved=is_saved, + ) -@dataclass -class WifiManagerCallbacks: - need_auth: Callable[[str], None] | None = None - activated: Callable[[], None] | None = None - forgotten: Callable[[str], None] | None = None - networks_updated: Callable[[list[NetworkInfo]], None] | None = None - connection_failed: Callable[[str, str], None] | None = None # Added for error feedback +@dataclass(frozen=True) +class AccessPoint: + ssid: str + bssid: str + strength: int + is_connected: bool + flags: int + wpa_flags: int + rsn_flags: int + ap_path: str + + @classmethod + def from_dbus(cls, ap_props: dict[str, tuple[str, Any]], ap_path: str, active_ap_path: str) -> "AccessPoint": + ssid = bytes(ap_props['Ssid'][1]).decode("utf-8", "replace") + bssid = str(ap_props['HwAddress'][1]) + strength = int(ap_props['Strength'][1]) + flags = int(ap_props['Flags'][1]) + wpa_flags = int(ap_props['WpaFlags'][1]) + rsn_flags = int(ap_props['RsnFlags'][1]) + + return cls( + ssid=ssid, + bssid=bssid, + strength=strength, + is_connected=ap_path == active_ap_path, + flags=flags, + wpa_flags=wpa_flags, + rsn_flags=rsn_flags, + ap_path=ap_path, + ) class WifiManager: - def __init__(self, callbacks): - self.callbacks: WifiManagerCallbacks = callbacks - self.networks: list[NetworkInfo] = [] - self.bus: MessageBus = None - self.device_path: str = "" - self.device_proxy = None - self.saved_connections: dict[str, str] = {} - self.active_ap_path: str = "" - self.scan_task: asyncio.Task | None = None - # Set tethering ssid as "weedle" + first 4 characters of a dongle id - self._tethering_ssid = "weedle" - if Params is not None: - dongle_id = Params().get("DongleId") - if dongle_id: - self._tethering_ssid += "-" + dongle_id[:4] - self.running: bool = True - self._current_connection_ssid: str | None = None - - async def connect(self) -> None: - """Connect to the DBus system bus.""" - try: - self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() - while not await self._find_wifi_device(): - await asyncio.sleep(1) - - await self._setup_signals(self.device_path) - self.active_ap_path = await self.get_active_access_point() - await self.add_tethering_connection(self._tethering_ssid, DEFAULT_TETHERING_PASSWORD) - self.saved_connections = await self._get_saved_connections() - self.scan_task = asyncio.create_task(self._periodic_scan()) - except DBusError as e: - cloudlog.error(f"Failed to connect to DBus: {e}") - raise - except Exception as e: - cloudlog.error(f"Unexpected error during connect: {e}") - raise - - async def shutdown(self) -> None: - self.running = False - if self.scan_task: - self.scan_task.cancel() - try: - await self.scan_task - except asyncio.CancelledError: - pass - if self.bus: - self.bus.disconnect() + def __init__(self): + self._networks = [] # a network can be comprised of multiple APs + self._active = True # used to not run when not in settings + self._exit = False - async def _request_scan(self) -> None: + # DBus connections try: - interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) - await interface.call_request_scan({}) - except DBusError as e: - cloudlog.warning(f"Scan request failed: {str(e)}") + self._router_main = DBusRouter(open_dbus_connection_threading(bus="SYSTEM")) # used by scanner / general method calls + self._conn_monitor = open_dbus_connection_blocking(bus="SYSTEM") # used by state monitor thread + self._nm = DBusAddress(NM_PATH, bus_name=NM, interface=NM_IFACE) + except FileNotFoundError: + cloudlog.exception("Failed to connect to system D-Bus") + self._exit = True + + # Store wifi device path + self._wifi_device: str | None = None + + # State + self._connecting_to_ssid: str = "" + self._last_network_update: float = 0.0 + + # Callbacks + # TODO: implement a callback queue to avoid blocking UI thread + self._need_auth: Callable[[str], None] | None = None + self._activated: Callable[[], None] | None = None + self._forgotten: Callable[[str], None] | None = None + self._networks_updated: Callable[[list[Network]], None] | None = None + self._disconnected: Callable[[], None] | None = None + + self._lock = threading.Lock() + + self._scan_thread = threading.Thread(target=self._network_scanner, daemon=True) + self._scan_thread.start() + + self._state_thread = threading.Thread(target=self._monitor_state, daemon=True) + self._state_thread.start() + + atexit.register(self.stop) + + def set_callbacks(self, need_auth: Callable[[str], None], + activated: Callable[[], None] | None, + forgotten: Callable[[str], None], + networks_updated: Callable[[list[Network]], None], + disconnected: Callable[[], None]): + self._need_auth = need_auth + self._activated = activated + self._forgotten = forgotten + self._networks_updated = networks_updated + self._disconnected = disconnected + + def set_active(self, active: bool): + self._active = active + + # Scan immediately if we haven't scanned in a while + if active and time.monotonic() - self._last_network_update > SCAN_PERIOD_SECONDS / 2: + self._last_network_update = 0.0 + + def _monitor_state(self): + device_path = self._wait_for_wifi_device() + if device_path is None: + return - async def get_active_access_point(self): - try: - props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) - ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') - return ap_path.value - except DBusError as e: - cloudlog.error(f"Error fetching active access point: {str(e)}") - return '' - - async def forget_connection(self, ssid: str) -> bool: - path = self.saved_connections.get(ssid) - if not path: - return False + rule = MatchRule( + type="signal", + interface=NM_DEVICE_IFACE, + member="StateChanged", + path=device_path, + ) - try: - nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE) - await nm_iface.call_delete() + # Filter for StateChanged signal + self._conn_monitor.send_and_get_reply(message_bus.AddMatch(rule)) - if self._current_connection_ssid == ssid: - self._current_connection_ssid = None + with self._conn_monitor.filter(rule, bufsize=SIGNAL_QUEUE_SIZE) as q: + while not self._exit: + # TODO: always run, and ensure callbacks don't block UI thread + if not self._active: + time.sleep(1) + continue - if ssid in self.saved_connections: - del self.saved_connections[ssid] + # Block until a matching signal arrives + try: + msg = self._conn_monitor.recv_until_filtered(q, timeout=1) + except TimeoutError: + continue - for network in self.networks: - if network.ssid == ssid: - network.is_saved = False - network.is_connected = False + new_state, previous_state, change_reason = msg.body + + # BAD PASSWORD + if new_state == NMDeviceState.NEED_AUTH and change_reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and len(self._connecting_to_ssid): + self.forget_connection(self._connecting_to_ssid, block=True) + if self._need_auth is not None: + self._need_auth(self._connecting_to_ssid) + self._connecting_to_ssid = "" + + elif new_state == NMDeviceState.ACTIVATED: + if self._activated is not None: + self._update_networks() + self._activated() + self._connecting_to_ssid = "" + + elif new_state == NMDeviceState.DISCONNECTED and change_reason != NM_DEVICE_STATE_REASON_NEW_ACTIVATION: + self._connecting_to_ssid = "" + if self._disconnected is not None: + self._disconnected() + + def _network_scanner(self): + self._wait_for_wifi_device() + + while not self._exit: + if self._active: + if time.monotonic() - self._last_network_update > SCAN_PERIOD_SECONDS: + # Scan for networks every 10 seconds + # TODO: should update when scan is complete (PropertiesChanged), but this is more than good enough for now + self._update_networks() + self._request_scan() + self._last_network_update = time.monotonic() + time.sleep(1 / 2.) + + def _wait_for_wifi_device(self) -> str | None: + with self._lock: + device_path: str | None = None + while not self._exit: + device_path = self._get_wifi_device() + if device_path is not None: break + time.sleep(1) + return device_path - # Notify UI of forgotten connection - if self.callbacks.networks_updated: - self.callbacks.networks_updated(copy.deepcopy(self.networks)) + def _get_wifi_device(self) -> str | None: + if self._wifi_device is not None: + return self._wifi_device - return True - except DBusError as e: - cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") - return False + device_paths = self._router_main.send_and_get_reply(new_method_call(self._nm, 'GetDevices')).body[0] + for device_path in device_paths: + dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE) + dev_type = self._router_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0][1] - async def activate_connection(self, ssid: str) -> bool: - connection_path = self.saved_connections.get(ssid) - if not connection_path: - return False - try: - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - await nm_iface.call_activate_connection(connection_path, self.device_path, "/") - return True - except DBusError as e: - cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}") - return False - - async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None: - """Connect to a selected Wi-Fi network.""" - try: - self._current_connection_ssid = ssid - - if ssid in self.saved_connections: - # Forget old connection if new password provided - if password: - await self.forget_connection(ssid) - await asyncio.sleep(0.2) # NetworkManager delay - else: - # Just activate existing connection - await self.activate_connection(ssid) - return + if dev_type == NM_DEVICE_TYPE_WIFI: + self._wifi_device = device_path + break - connection = { - 'connection': { - 'type': Variant('s', '802-11-wireless'), - 'uuid': Variant('s', str(uuid.uuid4())), - 'id': Variant('s', f'openpilot connection {ssid}'), - 'autoconnect-retries': Variant('i', 0), - }, - '802-11-wireless': { - 'ssid': Variant('ay', ssid.encode('utf-8')), - 'hidden': Variant('b', is_hidden), - 'mode': Variant('s', 'infrastructure'), - }, - 'ipv4': { - 'method': Variant('s', 'auto'), - 'dns-priority': Variant('i', 600), - }, - 'ipv6': {'method': Variant('s', 'ignore')}, - } + return self._wifi_device - if bssid: - connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) + def _get_connections(self) -> list[str]: + settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE) + return list(self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0]) - if password: - connection['802-11-wireless-security'] = { - 'key-mgmt': Variant('s', 'wpa-psk'), - 'auth-alg': Variant('s', 'open'), - 'psk': Variant('s', password), - } + def _connection_by_ssid(self, ssid: str, known_connections: list[str] | None = None) -> str | None: + for conn_path in known_connections or self._get_connections(): + conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE) + reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, "GetSettings")) - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/") - except Exception as e: - self._current_connection_ssid = None - cloudlog.error(f"Error connecting to network: {e}") - # Notify UI of failure - if self.callbacks.connection_failed: - self.callbacks.connection_failed(ssid, str(e)) - - def is_saved(self, ssid: str) -> bool: - return ssid in self.saved_connections - - async def _find_wifi_device(self) -> bool: - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - devices = await nm_iface.get_devices() - - for device_path in devices: - device = await self.bus.introspect(NM, device_path) - device_proxy = self.bus.get_proxy_object(NM, device_path, device) - device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) - device_type = await device_interface.get_device_type() # type: ignore[attr-defined] - if device_type == 2: # Wi-Fi device - self.device_path = device_path - self.device_proxy = device_proxy - return True - - return False - - async def add_tethering_connection(self, ssid: str, password: str = "12345678") -> bool: - """Create a WiFi tethering connection.""" - if len(password) < 8: - print("Tethering password must be at least 8 characters") - return False + # ignore connections removed during iteration (need auth, etc.) + if reply.header.message_type == MessageType.error: + cloudlog.warning(f"Failed to get connection properties for {conn_path}") + continue - try: - # First, check if a hotspot connection already exists - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - connection_paths = await settings_iface.call_list_connections() + settings = reply.body[0] + if "802-11-wireless" in settings and settings['802-11-wireless']['ssid'][1].decode("utf-8", "replace") == ssid: + return conn_path + return None - # Look for an existing hotspot connection - for path in connection_paths: - try: - settings = await self._get_connection_settings(path) - conn_type = settings.get('connection', {}).get('type', Variant('s', '')).value - wifi_mode = settings.get('802-11-wireless', {}).get('mode', Variant('s', '')).value - - if conn_type == '802-11-wireless' and wifi_mode == 'ap': - # Extract the SSID to check - connection_ssid = self._extract_ssid(settings) - if connection_ssid == ssid: - return True - except DBusError: - continue + def connect_to_network(self, ssid: str, password: str): + def worker(): + # Clear all connections that may already exist to the network we are connecting to + self._connecting_to_ssid = ssid + self.forget_connection(ssid, block=True) + + is_hidden = False connection = { 'connection': { - 'id': Variant('s', 'Hotspot'), - 'uuid': Variant('s', str(uuid.uuid4())), - 'type': Variant('s', '802-11-wireless'), - 'interface-name': Variant('s', 'wlan0'), - 'autoconnect': Variant('b', False), + 'type': ('s', '802-11-wireless'), + 'uuid': ('s', str(uuid.uuid4())), + 'id': ('s', f'openpilot connection {ssid}'), + 'autoconnect-retries': ('i', 0), }, '802-11-wireless': { - 'band': Variant('s', 'bg'), - 'mode': Variant('s', 'ap'), - 'ssid': Variant('ay', ssid.encode('utf-8')), - }, - '802-11-wireless-security': { - 'group': Variant('as', ['ccmp']), - 'key-mgmt': Variant('s', 'wpa-psk'), - 'pairwise': Variant('as', ['ccmp']), - 'proto': Variant('as', ['rsn']), - 'psk': Variant('s', password), + 'ssid': ('ay', ssid.encode("utf-8")), + 'hidden': ('b', is_hidden), + 'mode': ('s', 'infrastructure'), }, 'ipv4': { - 'method': Variant('s', 'shared'), - 'address-data': Variant('aa{sv}', [{'address': Variant('s', TETHERING_IP_ADDRESS), 'prefix': Variant('u', 24)}]), - 'gateway': Variant('s', TETHERING_IP_ADDRESS), - 'never-default': Variant('b', True), - }, - 'ipv6': { - 'method': Variant('s', 'ignore'), + 'method': ('s', 'auto'), + 'dns-priority': ('i', 600), }, + 'ipv6': {'method': ('s', 'ignore')}, } - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - new_connection = await settings_iface.call_add_connection(connection) - print(f"Added tethering connection with path: {new_connection}") - return True - except DBusError as e: - print(f"Failed to add tethering connection: {e}") - return False - except Exception as e: - print(f"Unexpected error adding tethering connection: {e}") - return False - - async def get_tethering_password(self) -> str: - """Get the current tethering password.""" - try: - hotspot_path = self.saved_connections.get(self._tethering_ssid) - if hotspot_path: - conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) - secrets = await conn_iface.call_get_secrets('802-11-wireless-security') - if secrets and '802-11-wireless-security' in secrets: - psk = secrets.get('802-11-wireless-security', {}).get('psk', Variant('s', '')).value - return str(psk) if psk is not None else "" - return "" - except DBusError as e: - print(f"Failed to get tethering password: {e}") - return "" - except Exception as e: - print(f"Unexpected error getting tethering password: {e}") - return "" - - async def set_tethering_password(self, password: str) -> bool: - """Set the tethering password.""" - if len(password) < 8: - cloudlog.error("Tethering password must be at least 8 characters") - return False + if password: + connection['802-11-wireless-security'] = { + 'key-mgmt': ('s', 'wpa-psk'), + 'auth-alg': ('s', 'open'), + 'psk': ('s', password), + } - try: - hotspot_path = self.saved_connections.get(self._tethering_ssid) - if not hotspot_path: - print("No hotspot connection found") - return False - - # Update the connection settings with new password - settings = await self._get_connection_settings(hotspot_path) - if '802-11-wireless-security' not in settings: - settings['802-11-wireless-security'] = {} - settings['802-11-wireless-security']['psk'] = Variant('s', password) - - # Apply changes - conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) - await conn_iface.call_update(settings) - - # Check if connection is active and restart if needed - is_active = False - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - active_connections = await nm_iface.get_active_connections() - - for conn_path in active_connections: - props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) - conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') - if conn_id_path.value == hotspot_path: - is_active = True - await nm_iface.call_deactivate_connection(conn_path) - break + settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE) + self._router_main.send_and_get_reply(new_method_call(settings_addr, 'AddConnection', 'a{sa{sv}}', (connection,))) + self.activate_connection(ssid, block=True) + + threading.Thread(target=worker, daemon=True).start() + + def forget_connection(self, ssid: str, block: bool = False): + def worker(): + conn_path = self._connection_by_ssid(ssid) + if conn_path is not None: + conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE) + self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Delete')) + + if self._forgotten is not None: + self._update_networks() + self._forgotten(ssid) + + if block: + worker() + else: + threading.Thread(target=worker, daemon=True).start() + + def activate_connection(self, ssid: str, block: bool = False): + def worker(): + conn_path = self._connection_by_ssid(ssid) + if conn_path is not None: + if self._wifi_device is None: + cloudlog.warning("No WiFi device found") + return - if is_active: - await nm_iface.call_activate_connection(hotspot_path, self.device_path, "/") + self._connecting_to_ssid = ssid + self._router_main.send(new_method_call(self._nm, 'ActivateConnection', 'ooo', + (conn_path, self._wifi_device, "/"))) - print("Tethering password updated successfully") - return True - except DBusError as e: - print(f"Failed to set tethering password: {e}") - return False - except Exception as e: - print(f"Unexpected error setting tethering password: {e}") - return False + if block: + worker() + else: + threading.Thread(target=worker, daemon=True).start() - async def is_tethering_active(self) -> bool: - """Check if tethering is active for the specified SSID.""" - try: - hotspot_path = self.saved_connections.get(self._tethering_ssid) - if not hotspot_path: - return False + def _request_scan(self): + if self._wifi_device is None: + cloudlog.warning("No WiFi device found") + return - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - active_connections = await nm_iface.get_active_connections() + wifi_addr = DBusAddress(self._wifi_device, bus_name=NM, interface=NM_WIRELESS_IFACE) + reply = self._router_main.send_and_get_reply(new_method_call(wifi_addr, 'RequestScan', 'a{sv}', ({},))) - for conn_path in active_connections: - props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) - conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') + if reply.header.message_type == MessageType.error: + cloudlog.warning(f"Failed to request scan: {reply}") - if conn_id_path.value == hotspot_path: - return True + def _update_networks(self): + if self._wifi_device is None: + cloudlog.warning("No WiFi device found") + return - return False - except Exception: - return False + # returns '/' if no active AP + wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE) + active_ap_path = self._router_main.send_and_get_reply(Properties(wifi_addr).get('ActiveAccessPoint')).body[0][1] + ap_paths = self._router_main.send_and_get_reply(new_method_call(wifi_addr, 'GetAllAccessPoints')).body[0] - async def _periodic_scan(self): - while self.running: - try: - await self._request_scan() - await asyncio.sleep(30) - except asyncio.CancelledError: - break - except DBusError as e: - cloudlog.error(f"Scan failed: {e}") - await asyncio.sleep(5) - - async def _setup_signals(self, device_path: str) -> None: - rules = [ - f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'", - f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'", - f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'", - f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'", - ] - for rule in rules: - await self._add_match_rule(rule) - - # Set up signal handlers - self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed) - self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) - - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - settings_iface.on_new_connection(self._on_new_connection) - settings_iface.on_connection_removed(self._on_connection_removed) - - def _on_properties_changed(self, interface: str, changed: dict, invalidated: list): - if interface == NM_WIRELESS_IFACE and 'LastScan' in changed: - asyncio.create_task(self._refresh_networks()) - elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed: - new_ap_path = changed["ActiveAccessPoint"].value - if self.active_ap_path != new_ap_path: - self.active_ap_path = new_ap_path - - def _on_state_changed(self, new_state: int, old_state: int, reason: int): - if new_state == NMDeviceState.ACTIVATED: - if self.callbacks.activated: - self.callbacks.activated() - self._current_connection_ssid = None - asyncio.create_task(self._refresh_networks()) - elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): - for network in self.networks: - network.is_connected = False - - # BAD PASSWORD - if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth: - if self._current_connection_ssid: - asyncio.create_task(self.forget_connection(self._current_connection_ssid)) - self.callbacks.need_auth(self._current_connection_ssid) - else: - # Try to find the network from active_ap_path - for network in self.networks: - if network.path == self.active_ap_path: - asyncio.create_task(self.forget_connection(network.ssid)) - self.callbacks.need_auth(network.ssid) - break - else: - # Couldn't identify the network that needs auth - cloudlog.error("Network needs authentication but couldn't identify which one") - - def _on_new_connection(self, path: str) -> None: - """Callback for NewConnection signal.""" - asyncio.create_task(self._add_saved_connection(path)) - - def _on_connection_removed(self, path: str) -> None: - """Callback for ConnectionRemoved signal.""" - for ssid, p in list(self.saved_connections.items()): - if path == p: - del self.saved_connections[ssid] - - if self.callbacks.forgotten: - self.callbacks.forgotten(ssid) - break + aps: dict[str, list[AccessPoint]] = {} - async def _add_saved_connection(self, path: str) -> None: - """Add a new saved connection to the dictionary.""" - try: - settings = await self._get_connection_settings(path) - if ssid := self._extract_ssid(settings): - self.saved_connections[ssid] = path - except DBusError as e: - cloudlog.error(f"Failed to add connection {path}: {e}") - - def _extract_ssid(self, settings: dict) -> str | None: - """Extract SSID from connection settings.""" - ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value - return bytes(ssid_variant).decode('utf-8') if ssid_variant else None - - async def _add_match_rule(self, rule): - """Add a match rule on the bus.""" - reply = await self.bus.call( - Message( - message_type=MessageType.METHOD_CALL, - destination='org.freedesktop.DBus', - interface="org.freedesktop.DBus", - path='/org/freedesktop/DBus', - member='AddMatch', - signature='s', - body=[rule], - ) - ) + for ap_path in ap_paths: + ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE) + ap_props = self._router_main.send_and_get_reply(Properties(ap_addr).get_all()) - assert reply.message_type == MessageType.METHOD_RETURN - return reply + # some APs have been seen dropping off during iteration + if ap_props.header.message_type == MessageType.error: + cloudlog.warning(f"Failed to get AP properties for {ap_path}") + continue - async def _refresh_networks(self): - """Get a list of available networks via NetworkManager.""" - wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) - access_points = await wifi_iface.get_access_points() - self.active_ap_path = await self.get_active_access_point() - network_dict = {} - for ap_path in access_points: try: - props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) - properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint') - ssid_variant = properties['Ssid'].value - ssid = bytes(ssid_variant).decode('utf-8') - if not ssid: + ap = AccessPoint.from_dbus(ap_props.body[0], ap_path, active_ap_path) + if ap.ssid == "": continue - bssid = properties.get('HwAddress', Variant('s', '')).value - strength = properties['Strength'].value - flags = properties['Flags'].value - wpa_flags = properties['WpaFlags'].value - rsn_flags = properties['RsnFlags'].value - - # May be multiple access points for each SSID. Use first for ssid - # and security type, then update the rest using all APs - if ssid not in network_dict: - network_dict[ssid] = NetworkInfo( - ssid=ssid, - strength=0, - security_type=self._get_security_type(flags, wpa_flags, rsn_flags), - path="", - bssid="", - is_connected=False, - is_saved=ssid in self.saved_connections - ) - - existing_network = network_dict.get(ssid) - if existing_network.strength < strength: - existing_network.strength = strength - existing_network.path = ap_path - existing_network.bssid = bssid - if self.active_ap_path == ap_path: - existing_network.is_connected = self._current_connection_ssid != ssid - - except DBusError as e: - cloudlog.error(f"Error fetching networks: {e}") - except Exception as e: - cloudlog.error({e}) - - self.networks = sorted( - network_dict.values(), - key=lambda network: ( - not network.is_connected, - -network.strength, # Higher signal strength first - network.ssid.lower(), - ), - ) - - if self.callbacks.networks_updated: - self.callbacks.networks_updated(copy.deepcopy(self.networks)) + if ap.ssid not in aps: + aps[ap.ssid] = [] - async def _get_connection_settings(self, path): - """Fetch connection settings for a specific connection path.""" - try: - settings = await self._get_interface(NM, path, NM_CONNECTION_IFACE) - return await settings.call_get_settings() - except DBusError as e: - cloudlog.error(f"Failed to get settings for {path}: {str(e)}") - return {} - - async def _process_chunk(self, paths_chunk): - """Process a chunk of connection paths.""" - tasks = [self._get_connection_settings(path) for path in paths_chunk] - return await asyncio.gather(*tasks, return_exceptions=True) - - async def _get_saved_connections(self) -> dict[str, str]: - try: - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - connection_paths = await settings_iface.call_list_connections() - saved_ssids: dict[str, str] = {} - batch_size = 20 - for i in range(0, len(connection_paths), batch_size): - chunk = connection_paths[i : i + batch_size] - results = await self._process_chunk(chunk) - for path, config in zip(chunk, results, strict=True): - if isinstance(config, dict) and '802-11-wireless' in config: - if ssid := self._extract_ssid(config): - saved_ssids[ssid] = path - return saved_ssids - except DBusError as e: - cloudlog.error(f"Error fetching saved connections: {str(e)}") - return {} - - async def _get_interface(self, bus_name: str, path: str, name: str): - introspection = await self.bus.introspect(bus_name, path) - proxy = self.bus.get_proxy_object(bus_name, path, introspection) - return proxy.get_interface(name) - - def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: - """Determine the security type based on flags.""" - if flags == 0 and not (wpa_flags or rsn_flags): - return SecurityType.OPEN - if rsn_flags & 0x200: # SAE (WPA3 Personal) - # TODO: support WPA3 - return SecurityType.UNSUPPORTED - if rsn_flags: # RSN indicates WPA2 or higher - return SecurityType.WPA2 - if wpa_flags: # WPA flags indicate WPA - return SecurityType.WPA - return SecurityType.UNSUPPORTED - - -class WifiManagerWrapper: - def __init__(self): - self._manager: WifiManager | None = None - self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks() + aps[ap.ssid].append(ap) + except Exception: + # catch all for parsing errors + cloudlog.exception(f"Failed to parse AP properties for {ap_path}") - self._thread = threading.Thread(target=self._run, daemon=True) - self._loop: asyncio.EventLoop | None = None - self._running = False + known_connections = self._get_connections() + networks = [Network.from_dbus(ssid, ap_list, self._connection_by_ssid(ssid, known_connections) is not None) + for ssid, ap_list in aps.items()] + networks.sort(key=lambda n: (-n.is_connected, -n.strength, n.ssid.lower())) + self._networks = networks - def set_callbacks(self, callbacks: WifiManagerCallbacks): - self._callbacks = callbacks + if self._networks_updated is not None: + self._networks_updated(self._networks) - def start(self) -> None: - if not self._running: - self._thread.start() - while self._thread is not None and not self._running: - time.sleep(0.1) + def __del__(self): + self.stop() - def _run(self): - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - - try: - self._manager = WifiManager(self._callbacks) - self._running = True - self._loop.run_forever() - except Exception as e: - cloudlog.error(f"Error in WifiManagerWrapper thread: {e}") - finally: - if self._loop.is_running(): - self._loop.stop() - self._running = False - - def shutdown(self) -> None: - if self._running: - if self._manager is not None and self._loop: - shutdown_future = asyncio.run_coroutine_threadsafe(self._manager.shutdown(), self._loop) - shutdown_future.result(timeout=3.0) - - if self._loop and self._loop.is_running(): - self._loop.call_soon_threadsafe(self._loop.stop) - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=2.0) - self._running = False - - def is_saved(self, ssid: str) -> bool: - """Check if a network is saved.""" - return self._run_coroutine_sync(lambda manager: manager.is_saved(ssid), default=False) - - def connect(self): - """Connect to DBus and start Wi-Fi scanning.""" - if not self._manager: - return - self._run_coroutine(self._manager.connect()) - - def forget_connection(self, ssid: str): - """Forget a saved Wi-Fi connection.""" - if not self._manager: - return - self._run_coroutine(self._manager.forget_connection(ssid)) + def stop(self): + if not self._exit: + self._exit = True + self._scan_thread.join() + self._state_thread.join() - def activate_connection(self, ssid: str): - """Activate an existing Wi-Fi connection.""" - if not self._manager: - return - self._run_coroutine(self._manager.activate_connection(ssid)) - - def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False): - """Connect to a Wi-Fi network.""" - if not self._manager: - return - self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden)) - - def _run_coroutine(self, coro): - """Run a coroutine in the async thread.""" - if not self._running or not self._loop: - cloudlog.error("WifiManager thread is not running") - return - asyncio.run_coroutine_threadsafe(coro, self._loop) - - def _run_coroutine_sync(self, func: Callable[[WifiManager], T], default: T) -> T: - """Run a function synchronously in the async thread.""" - if not self._running or not self._loop or not self._manager: - return default - future = concurrent.futures.Future[T]() - - def wrapper(manager: WifiManager) -> None: - try: - future.set_result(func(manager)) - except Exception as e: - future.set_exception(e) - - try: - self._loop.call_soon_threadsafe(wrapper, self._manager) - return future.result(timeout=1.0) - except Exception as e: - cloudlog.error(f"WifiManagerWrapper property access failed: {e}") - return default + self._router_main.close() + self._router_main.conn.close() + self._conn_monitor.close() diff --git a/system/ui/setup.py b/system/ui/setup.py index 800ca7662c..a985e783be 100755 --- a/system/ui/setup.py +++ b/system/ui/setup.py @@ -19,7 +19,7 @@ from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.button import Button, ButtonStyle, ButtonRadio from openpilot.system.ui.widgets.keyboard import Keyboard from openpilot.system.ui.widgets.label import Label, TextAlignment -from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManagerWrapper +from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManager NetworkType = log.DeviceState.NetworkType @@ -72,8 +72,7 @@ class Setup(Widget): self.download_url = "" self.download_progress = 0 self.download_thread = None - self.wifi_manager = WifiManagerWrapper() - self.wifi_ui = WifiManagerUI(self.wifi_manager) + self.wifi_ui = WifiManagerUI(WifiManager()) self.keyboard = Keyboard() self.selected_radio = None self.warning = gui_app.texture("icons/warning.png", 150, 150) diff --git a/system/ui/updater.py b/system/ui/updater.py index 31799d3628..b3cdc82cf5 100755 --- a/system/ui/updater.py +++ b/system/ui/updater.py @@ -7,7 +7,7 @@ from enum import IntEnum from openpilot.system.hardware import HARDWARE from openpilot.system.ui.lib.application import gui_app, FontWeight -from openpilot.system.ui.lib.wifi_manager import WifiManagerWrapper +from openpilot.system.ui.lib.wifi_manager import WifiManager from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.button import gui_button, ButtonStyle from openpilot.system.ui.widgets.label import gui_text_box, gui_label @@ -43,8 +43,7 @@ class Updater(Widget): self.show_reboot_button = False self.process = None self.update_thread = None - self.wifi_manager = WifiManagerWrapper() - self.wifi_manager_ui = WifiManagerUI(self.wifi_manager) + self.wifi_manager_ui = WifiManagerUI(WifiManager()) def install_update(self): self.current_screen = Screen.PROGRESS diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py index 2ecfd36be7..6f5a00015b 100644 --- a/system/ui/widgets/network.py +++ b/system/ui/widgets/network.py @@ -6,7 +6,7 @@ from typing import cast import pyray as rl from openpilot.system.ui.lib.application import gui_app from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel -from openpilot.system.ui.lib.wifi_manager import NetworkInfo, WifiManagerCallbacks, WifiManagerWrapper, SecurityType +from openpilot.system.ui.lib.wifi_manager import WifiManager, SecurityType, Network from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.button import ButtonStyle, Button from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog @@ -36,34 +36,35 @@ class UIState(IntEnum): class WifiManagerUI(Widget): - def __init__(self, wifi_manager: WifiManagerWrapper): + def __init__(self, wifi_manager: WifiManager): super().__init__() + self.wifi_manager = wifi_manager self.state: UIState = UIState.IDLE - self._state_network: NetworkInfo | None = None # for CONNECTING / NEEDS_AUTH / SHOW_FORGET_CONFIRM / FORGETTING + self._state_network: Network | None = None # for CONNECTING / NEEDS_AUTH / SHOW_FORGET_CONFIRM / FORGETTING self._password_retry: bool = False # for NEEDS_AUTH self.btn_width: int = 200 self.scroll_panel = GuiScrollPanel() self.keyboard = Keyboard(max_text_size=MAX_PASSWORD_LENGTH, min_text_size=MIN_PASSWORD_LENGTH, show_password_toggle=True) self._load_icons() - self._networks: list[NetworkInfo] = [] + self._networks: list[Network] = [] self._networks_buttons: dict[str, Button] = {} self._forget_networks_buttons: dict[str, Button] = {} self._lock = Lock() - self.wifi_manager = wifi_manager self._confirm_dialog = ConfirmDialog("", "Forget", "Cancel") - self.wifi_manager.set_callbacks( - WifiManagerCallbacks( - need_auth=self._on_need_auth, - activated=self._on_activated, - forgotten=self._on_forgotten, - networks_updated=self._on_network_updated, - connection_failed=self._on_connection_failed - ) - ) - self.wifi_manager.start() - self.wifi_manager.connect() + self.wifi_manager.set_callbacks(need_auth=self._on_need_auth, + activated=self._on_activated, + forgotten=self._on_forgotten, + networks_updated=self._on_network_updated, + disconnected=self._on_disconnected) + + def show_event(self): + # start/stop scanning when widget is visible + self.wifi_manager.set_active(True) + + def hide_event(self): + self.wifi_manager.set_active(False) def _load_icons(self): for icon in STRENGTH_ICONS + ["icons/checkmark.png", "icons/circled_slash.png", "icons/lock_closed.png"]: @@ -78,7 +79,7 @@ class WifiManagerUI(Widget): if self.state == UIState.NEEDS_AUTH and self._state_network: self.keyboard.set_title("Wrong password" if self._password_retry else "Enter password", f"for {self._state_network.ssid}") self.keyboard.reset() - gui_app.set_modal_overlay(self.keyboard, lambda result: self._on_password_entered(cast(NetworkInfo, self._state_network), result)) + gui_app.set_modal_overlay(self.keyboard, lambda result: self._on_password_entered(cast(Network, self._state_network), result)) elif self.state == UIState.SHOW_FORGET_CONFIRM and self._state_network: self._confirm_dialog.set_text(f'Forget Wi-Fi Network "{self._state_network.ssid}"?') self._confirm_dialog.reset() @@ -86,7 +87,7 @@ class WifiManagerUI(Widget): else: self._draw_network_list(rect) - def _on_password_entered(self, network: NetworkInfo, result: int): + def _on_password_entered(self, network: Network, result: int): if result == 1: password = self.keyboard.text self.keyboard.clear() @@ -121,7 +122,7 @@ class WifiManagerUI(Widget): rl.end_scissor_mode() - def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool): + def _draw_network_item(self, rect, network: Network, clicked: bool): spacing = 50 ssid_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, ITEM_HEIGHT) signal_icon_rect = rl.Rectangle(rect.x + rect.width - ICON_SIZE, rect.y + (ITEM_HEIGHT - ICON_SIZE) / 2, ICON_SIZE, ICON_SIZE) @@ -174,10 +175,10 @@ class WifiManagerUI(Widget): self.state = UIState.SHOW_FORGET_CONFIRM self._state_network = network - def _draw_status_icon(self, rect, network: NetworkInfo): + def _draw_status_icon(self, rect, network: Network): """Draw the status icon based on network's connection state""" icon_file = None - if network.is_connected: + if network.is_connected and self.state != UIState.CONNECTING: icon_file = "icons/checkmark.png" elif network.security_type == SecurityType.UNSUPPORTED: icon_file = "icons/circled_slash.png" @@ -191,12 +192,12 @@ class WifiManagerUI(Widget): icon_rect = rl.Vector2(rect.x, rect.y + (ICON_SIZE - texture.height) / 2) rl.draw_texture_v(texture, icon_rect, rl.WHITE) - def _draw_signal_strength_icon(self, rect: rl.Rectangle, network: NetworkInfo): + def _draw_signal_strength_icon(self, rect: rl.Rectangle, network: Network): """Draw the Wi-Fi signal strength icon based on network's signal strength""" strength_level = max(0, min(3, round(network.strength / 33.0))) rl.draw_texture_v(gui_app.texture(STRENGTH_ICONS[strength_level], ICON_SIZE, ICON_SIZE), rl.Vector2(rect.x, rect.y), rl.WHITE) - def connect_to_network(self, network: NetworkInfo, password=''): + def connect_to_network(self, network: Network, password=''): self.state = UIState.CONNECTING self._state_network = network if network.is_saved and not password: @@ -204,13 +205,12 @@ class WifiManagerUI(Widget): else: self.wifi_manager.connect_to_network(network.ssid, password) - def forget_network(self, network: NetworkInfo): + def forget_network(self, network: Network): self.state = UIState.FORGETTING self._state_network = network - network.is_saved = False self.wifi_manager.forget_connection(network.ssid) - def _on_network_updated(self, networks: list[NetworkInfo]): + def _on_network_updated(self, networks: list[Network]): with self._lock: self._networks = networks for n in self._networks: @@ -237,7 +237,7 @@ class WifiManagerUI(Widget): if self.state == UIState.FORGETTING: self.state = UIState.IDLE - def _on_connection_failed(self, ssid: str, error: str): + def _on_disconnected(self): with self._lock: if self.state == UIState.CONNECTING: self.state = UIState.IDLE @@ -245,13 +245,11 @@ class WifiManagerUI(Widget): def main(): gui_app.init_window("Wi-Fi Manager") - wifi_manager = WifiManagerWrapper() - wifi_ui = WifiManagerUI(wifi_manager) + wifi_ui = WifiManagerUI(WifiManager()) for _ in gui_app.render(): wifi_ui.render(rl.Rectangle(50, 50, gui_app.width - 100, gui_app.height - 100)) - wifi_manager.shutdown() gui_app.close() diff --git a/uv.lock b/uv.lock index 7010cdfb83..72c3d2f8e2 100644 --- a/uv.lock +++ b/uv.lock @@ -442,15 +442,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/56/c8/46ac27096684f33e27dab749ef43c6b0119c6a0d852971eaefb73256dc4c/cython-3.1.3-py3-none-any.whl", hash = "sha256:d13025b34f72f77bf7f65c1cd628914763e6c285f4deb934314c922b91e6be5a", size = 1225725, upload-time = "2025-08-13T06:19:09.593Z" }, ] -[[package]] -name = "dbus-next" -version = "0.2.3" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/ce/45/6a40fbe886d60a8c26f480e7d12535502b5ba123814b3b9a0b002ebca198/dbus_next-0.2.3.tar.gz", hash = "sha256:f4eae26909332ada528c0a3549dda8d4f088f9b365153952a408e28023a626a5", size = 71112, upload-time = "2021-07-25T22:11:28.398Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/d2/fc/c0a3f4c4eaa5a22fbef91713474666e13d0ea2a69c84532579490a9f2cc8/dbus_next-0.2.3-py3-none-any.whl", hash = "sha256:58948f9aff9db08316734c0be2a120f6dc502124d9642f55e90ac82ffb16a18b", size = 57885, upload-time = "2021-07-25T22:11:25.466Z" }, -] - [[package]] name = "dictdiffer" version = "0.9.0" @@ -690,6 +681,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/15/aa/0aca39a37d3c7eb941ba736ede56d689e7be91cab5d9ca846bde3999eba6/isodate-0.7.2-py3-none-any.whl", hash = "sha256:28009937d8031054830160fce6d409ed342816b543597cece116d966c6d99e15", size = 22320, upload-time = "2024-10-08T23:04:09.501Z" }, ] +[[package]] +name = "jeepney" +version = "0.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7b/6f/357efd7602486741aa73ffc0617fb310a29b588ed0fd69c2399acbb85b0c/jeepney-0.9.0.tar.gz", hash = "sha256:cf0e9e845622b81e4a28df94c40345400256ec608d0e55bb8a3feaa9163f5732", size = 106758, upload-time = "2025-02-27T18:51:01.684Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b2/a3/e137168c9c44d18eff0376253da9f1e9234d0239e0ee230d2fee6cea8e55/jeepney-0.9.0-py3-none-any.whl", hash = "sha256:97e5714520c16fc0a45695e5365a2e11b81ea79bba796e26f9f1d178cb182683", size = 49010, upload-time = "2025-02-27T18:51:00.104Z" }, +] + [[package]] name = "jinja2" version = "3.1.6" @@ -1241,8 +1241,8 @@ dev = [ { name = "av" }, { name = "azure-identity" }, { name = "azure-storage-blob" }, - { name = "dbus-next" }, { name = "dictdiffer" }, + { name = "jeepney" }, { name = "matplotlib" }, { name = "opencv-python-headless" }, { name = "parameterized" }, @@ -1294,11 +1294,11 @@ requires-dist = [ { name = "codespell", marker = "extra == 'testing'" }, { name = "crcmod" }, { name = "cython" }, - { name = "dbus-next", marker = "extra == 'dev'" }, { name = "dictdiffer", marker = "extra == 'dev'" }, { name = "future-fstrings" }, { name = "hypothesis", marker = "extra == 'testing'", specifier = "==6.47.*" }, { name = "inputs" }, + { name = "jeepney", marker = "extra == 'dev'" }, { name = "jinja2", marker = "extra == 'docs'" }, { name = "json-rpc" }, { name = "libusb1" },