diff --git a/selfdrive/ui/layouts/settings/settings.py b/selfdrive/ui/layouts/settings/settings.py index 89e39562b5..84836c1cc0 100644 --- a/selfdrive/ui/layouts/settings/settings.py +++ b/selfdrive/ui/layouts/settings/settings.py @@ -7,8 +7,7 @@ from openpilot.selfdrive.ui.layouts.settings.device import DeviceLayout from openpilot.selfdrive.ui.layouts.settings.firehose import FirehoseLayout from openpilot.selfdrive.ui.layouts.settings.software import SoftwareLayout from openpilot.selfdrive.ui.layouts.settings.toggles import TogglesLayout -from openpilot.system.ui.lib.wifi_manager import WifiManagerWrapper -from openpilot.system.ui.lib.wifi_manager_v2 import WifiManager +from openpilot.system.ui.lib.wifi_manager import WifiManager from openpilot.system.ui.lib.application import gui_app, FontWeight, MousePos from openpilot.system.ui.lib.text_measure import measure_text_cached from openpilot.system.ui.widgets.network import WifiManagerUI diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py index 4cb741bc95..ca00d9a222 100644 --- a/system/ui/lib/wifi_manager.py +++ b/system/ui/lib/wifi_manager.py @@ -1,54 +1,26 @@ -import asyncio -import concurrent.futures +import atexit import copy +import dbus import threading import time import uuid from collections.abc import Callable from dataclasses import dataclass from enum import IntEnum -from typing import TypeVar -from dbus_next.aio import MessageBus -from dbus_next import BusType, Variant, Message -from dbus_next.errors import DBusError -from dbus_next.constants import MessageType +from openpilot.common.swaglog import cloudlog +from openpilot.system.ui.lib.networkmanager import * try: from openpilot.common.params import Params except ImportError: # Params/Cythonized modules are not available in zipapp Params = None -from openpilot.common.swaglog import cloudlog - -T = TypeVar("T") - -# NetworkManager constants -NM = "org.freedesktop.NetworkManager" -NM_PATH = '/org/freedesktop/NetworkManager' -NM_IFACE = 'org.freedesktop.NetworkManager' -NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings' -NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings' -NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection' -NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' -NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' -NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" - -NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 TETHERING_IP_ADDRESS = "192.168.43.1" DEFAULT_TETHERING_PASSWORD = "swagswagcomma" -# NetworkManager device states -class NMDeviceState(IntEnum): - DISCONNECTED = 30 - PREPARE = 40 - NEED_AUTH = 60 - IP_CONFIG = 70 - ACTIVATED = 100 - - class SecurityType(IntEnum): OPEN = 0 WPA = 1 @@ -57,673 +29,362 @@ class SecurityType(IntEnum): UNSUPPORTED = 4 -@dataclass -class NetworkInfo: +def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: + wpa_props = wpa_flags | rsn_flags + + # obtained by looking at flags of networks in the office as reported by an Android phone + supports_wpa = NM_802_11_AP_SEC_PAIR_WEP40 | NM_802_11_AP_SEC_PAIR_WEP104 | NM_802_11_AP_SEC_GROUP_WEP40 | NM_802_11_AP_SEC_GROUP_WEP104 | NM_802_11_AP_SEC_KEY_MGMT_PSK; + + if (flags == NM_802_11_AP_FLAGS_NONE) or ((flags & NM_802_11_AP_FLAGS_WPS) and not (wpa_props & supports_wpa)): + return SecurityType.OPEN + elif (flags & NM_802_11_AP_FLAGS_PRIVACY) and (wpa_props & supports_wpa) and not (wpa_props & NM_802_11_AP_SEC_KEY_MGMT_802_1X): + return SecurityType.WPA + else: + cloudlog.warning(f"Unsupported network! flags: {flags}, wpa_flags: {wpa_flags}, rsn_flags: {rsn_flags}") + return SecurityType.UNSUPPORTED + + +@dataclass(frozen=True) +class Network: ssid: str strength: int is_connected: bool security_type: SecurityType - path: str - bssid: str - is_saved: bool = False - # saved_path: str + is_saved: bool + + @classmethod + def from_dbus(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network": + # we only want to show the strongest AP for each Network/SSID + strongest_ap = max(aps, key=lambda ap: ap.strength) + + is_connected = any(ap.ap_path == active_ap_path for ap in aps) # TODO: just any is_connected aps! + security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags) + + return cls( + ssid=ssid, + strength=strongest_ap.strength, + is_connected=is_connected, + security_type=security_type, + is_saved=is_saved, + ) -@dataclass -class WifiManagerCallbacks: - need_auth: Callable[[str], None] | None = None - activated: Callable[[], None] | None = None - forgotten: Callable[[str], None] | None = None - networks_updated: Callable[[list[NetworkInfo]], None] | None = None - connection_failed: Callable[[str, str], None] | None = None # Added for error feedback +@dataclass(frozen=True) +class AccessPoint: + ssid: str + bssid: str + strength: int + is_connected: bool + flags: int + wpa_flags: int + rsn_flags: int + ap_path: dbus.ObjectPath + + @classmethod + def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": + ssid = bytes(ap_props.Get(NM_ACCESS_POINT_IFACE, "Ssid")).decode("utf-8", "replace") + bssid = str(ap_props.Get(NM_ACCESS_POINT_IFACE, "HwAddress")) + strength = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Strength")) + flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Flags")) + wpa_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "WpaFlags")) + rsn_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "RsnFlags")) + + return cls( + ssid=ssid, + bssid=bssid, + strength=strength, + is_connected=ap_path == active_ap_path, + flags=flags, + wpa_flags=wpa_flags, + rsn_flags=rsn_flags, + ap_path=ap_path, + ) class WifiManager: - def __init__(self, callbacks): - self.callbacks: WifiManagerCallbacks = callbacks - self.networks: list[NetworkInfo] = [] - self.bus: MessageBus = None - self.device_path: str = "" - self.device_proxy = None - self.saved_connections: dict[str, str] = {} - self.active_ap_path: str = "" - self.scan_task: asyncio.Task | None = None - # Set tethering ssid as "weedle" + first 4 characters of a dongle id - self._tethering_ssid = "weedle" - if Params is not None: - dongle_id = Params().get("DongleId") - if dongle_id: - self._tethering_ssid += "-" + dongle_id[:4] - self.running: bool = True - self._current_connection_ssid: str | None = None - - async def connect(self) -> None: - """Connect to the DBus system bus.""" - try: - self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() - while not await self._find_wifi_device(): - await asyncio.sleep(1) - - await self._setup_signals(self.device_path) - self.active_ap_path = await self.get_active_access_point() - await self.add_tethering_connection(self._tethering_ssid, DEFAULT_TETHERING_PASSWORD) - self.saved_connections = await self._get_saved_connections() - self.scan_task = asyncio.create_task(self._periodic_scan()) - except DBusError as e: - cloudlog.error(f"Failed to connect to DBus: {e}") - raise - except Exception as e: - cloudlog.error(f"Unexpected error during connect: {e}") - raise - - async def shutdown(self) -> None: - self.running = False - if self.scan_task: - self.scan_task.cancel() - try: - await self.scan_task - except asyncio.CancelledError: - pass - if self.bus: - self.bus.disconnect() + def __init__(self): + self._networks = [] # a network can be comprised of multiple APs + self._active = True # used to not run this cycle when not in settings + self._running = True + + # DBus and NetworkManager setup + self._bus = dbus.SystemBus() + self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE) + self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) + + self._bus2 = dbus.SystemBus(private=True) + + # State + self._connecting_to_ssid: str = "" + + # Callbacks + # TODO: some of these are called from threads, either: + # 1. make sure this is fine + # 2. add callback event list that user can call from main thread to get callbacks safely + self._need_auth: Callable[[str], None] | None = None + self._activated: Callable[[], None] | None = None + self._forgotten: Callable[[str], None] | None = None + self._networks_updated: Callable[[list[Network]], None] | None = None + self._disconnected: Callable[[], None] | None = None - async def _request_scan(self) -> None: - try: - interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) - await interface.call_request_scan({}) - except DBusError as e: - cloudlog.warning(f"Scan request failed: {str(e)}") + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() - async def get_active_access_point(self): - try: - props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) - ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') - return ap_path.value - except DBusError as e: - cloudlog.error(f"Error fetching active access point: {str(e)}") - return '' - - async def forget_connection(self, ssid: str) -> bool: - path = self.saved_connections.get(ssid) - if not path: - return False + self._state_thread = threading.Thread(target=self._monitor_state, daemon=True) + self._state_thread.start() - try: - nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE) - await nm_iface.call_delete() - - if self._current_connection_ssid == ssid: - self._current_connection_ssid = None - - if ssid in self.saved_connections: - del self.saved_connections[ssid] - - for network in self.networks: - if network.ssid == ssid: - network.is_saved = False - network.is_connected = False - break - - # Notify UI of forgotten connection - if self.callbacks.networks_updated: - self.callbacks.networks_updated(copy.deepcopy(self.networks)) - - return True - except DBusError as e: - cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") - return False - - async def activate_connection(self, ssid: str) -> bool: - connection_path = self.saved_connections.get(ssid) - if not connection_path: - return False - try: - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - await nm_iface.call_activate_connection(connection_path, self.device_path, "/") - return True - except DBusError as e: - cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}") - return False - - async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None: - """Connect to a selected Wi-Fi network.""" - try: - self._current_connection_ssid = ssid - - if ssid in self.saved_connections: - # Forget old connection if new password provided - if password: - await self.forget_connection(ssid) - await asyncio.sleep(0.2) # NetworkManager delay - else: - # Just activate existing connection - await self.activate_connection(ssid) - return + atexit.register(self.stop) - connection = { - 'connection': { - 'type': Variant('s', '802-11-wireless'), - 'uuid': Variant('s', str(uuid.uuid4())), - 'id': Variant('s', f'openpilot connection {ssid}'), - 'autoconnect-retries': Variant('i', 0), - }, - '802-11-wireless': { - 'ssid': Variant('ay', ssid.encode('utf-8')), - 'hidden': Variant('b', is_hidden), - 'mode': Variant('s', 'infrastructure'), - }, - 'ipv4': { - 'method': Variant('s', 'auto'), - 'dns-priority': Variant('i', 600), - }, - 'ipv6': {'method': Variant('s', 'ignore')}, - } + def __del__(self): + self.stop() - if bssid: - connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) + def stop(self): + self._running = False + self._thread.join() + self._state_thread.join() + self._bus.close() + self._bus2.close() + + def _monitor_state(self): + prev_state = -1 + + device_path = self._get_wifi_device() + props_dev = dbus.Interface(self._bus2.get_object(NM, device_path), NM_PROPERTIES_IFACE) + _props = dbus.Interface(self._bus2.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) + + while self._running: + if self._active: + print('moritring state ACTivE!!1') + dev_state = int(props_dev.Get(NM_DEVICE_IFACE, "State")) + state_reason = props_dev.Get(NM_DEVICE_IFACE, "StateReason") # (u state, u reason) + reason = int(state_reason[1]) if isinstance(state_reason, (list, tuple)) and len(state_reason) == 2 else 0 + + if dev_state != prev_state: + print(f" WiFi device state change: {dev_state}, reason: {reason}") + if dev_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self._connecting_to_ssid: + print('------ NEED AUTH - SUPPLICANT DISCONNECT') + self.forget_connection(self._connecting_to_ssid, block=True) + if self._need_auth is not None: + self._need_auth(self._connecting_to_ssid) + self._connecting_to_ssid = "" + elif dev_state == NMDeviceState.ACTIVATED: + print('------ ACTIVATED') + if self._activated is not None: + self._activated() + self._connecting_to_ssid = "" + elif dev_state == NMDeviceState.DISCONNECTED: + print('------ DISCONNECTED') + self._connecting_to_ssid = "" + if self._disconnected is not None: + self._disconnected() + + print() + + if self._connecting_to_ssid: + print(' CONNECTING', self._connecting_to_ssid) + + prev_state = dev_state + + time.sleep(1 / 2.) + + def set_callbacks(self, need_auth: Callable[[str], None], + activated: Callable[[], None] | None, + forgotten: Callable[[str], None], + networks_updated: Callable[[list[Network]], None], + disconnected: Callable[[], None]): + self._need_auth = need_auth + self._activated = activated + self._forgotten = forgotten + self._networks_updated = networks_updated + self._disconnected = disconnected - if password: - connection['802-11-wireless-security'] = { - 'key-mgmt': Variant('s', 'wpa-psk'), - 'auth-alg': Variant('s', 'open'), - 'psk': Variant('s', password), - } + def _run(self): + i = 0 + while self._running: + if self._active: + print('we;re acti!!!!!!!!!!!!') + # Scan for networks every 5 seconds + if i % 5 == 0: + # TODO: should watch when scan is complete, but this is more than good enough for now + self._update_networks() + self._request_scan() + + i += 1 + time.sleep(1) + + def set_active(self, active: bool): + print('SETTING ACTIVE', active) + self._active = active + + def _get_wifi_device(self) -> dbus.ObjectPath | None: + # TODO: cache if slow + t = time.monotonic() + device_paths = self._nm.GetDevices() + # print(f'DEVICE PATHS: {device_paths}') + + wifi_device = None + for device_path in device_paths: + dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) + dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType") + + if dev_type == NM_DEVICE_TYPE_WIFI: + wifi_device = device_path + break - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/") - except Exception as e: - self._current_connection_ssid = None - cloudlog.error(f"Error connecting to network: {e}") - # Notify UI of failure - if self.callbacks.connection_failed: - self.callbacks.connection_failed(ssid, str(e)) - - def is_saved(self, ssid: str) -> bool: - return ssid in self.saved_connections - - async def _find_wifi_device(self) -> bool: - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - devices = await nm_iface.get_devices() - - for device_path in devices: - device = await self.bus.introspect(NM, device_path) - device_proxy = self.bus.get_proxy_object(NM, device_path, device) - device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) - device_type = await device_interface.get_device_type() # type: ignore[attr-defined] - if device_type == 2: # Wi-Fi device - self.device_path = device_path - self.device_proxy = device_proxy - return True - - return False - - async def add_tethering_connection(self, ssid: str, password: str = "12345678") -> bool: - """Create a WiFi tethering connection.""" - if len(password) < 8: - print("Tethering password must be at least 8 characters") - return False + # print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}") + return wifi_device - try: - # First, check if a hotspot connection already exists - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - connection_paths = await settings_iface.call_list_connections() - - # Look for an existing hotspot connection - for path in connection_paths: - try: - settings = await self._get_connection_settings(path) - conn_type = settings.get('connection', {}).get('type', Variant('s', '')).value - wifi_mode = settings.get('802-11-wireless', {}).get('mode', Variant('s', '')).value - - if conn_type == '802-11-wireless' and wifi_mode == 'ap': - # Extract the SSID to check - connection_ssid = self._extract_ssid(settings) - if connection_ssid == ssid: - return True - except DBusError: - continue + def connect_to_network(self, ssid: str, password: str | None): + def worker(): + t = time.monotonic() + + # Clear all connections that may already exist to the network we are connecting + self._connecting_to_ssid = ssid + self.forget_connection(ssid, block=True) + + is_hidden = False connection = { 'connection': { - 'id': Variant('s', 'Hotspot'), - 'uuid': Variant('s', str(uuid.uuid4())), - 'type': Variant('s', '802-11-wireless'), - 'interface-name': Variant('s', 'wlan0'), - 'autoconnect': Variant('b', False), + 'type': '802-11-wireless', + 'uuid': str(uuid.uuid4()), + 'id': f'openpilot connection {ssid}', + 'autoconnect-retries': 0, }, '802-11-wireless': { - 'band': Variant('s', 'bg'), - 'mode': Variant('s', 'ap'), - 'ssid': Variant('ay', ssid.encode('utf-8')), - }, - '802-11-wireless-security': { - 'group': Variant('as', ['ccmp']), - 'key-mgmt': Variant('s', 'wpa-psk'), - 'pairwise': Variant('as', ['ccmp']), - 'proto': Variant('as', ['rsn']), - 'psk': Variant('s', password), + 'ssid': dbus.ByteArray(ssid.encode("utf-8")), + 'hidden': is_hidden, + 'mode': 'infrastructure', }, 'ipv4': { - 'method': Variant('s', 'shared'), - 'address-data': Variant('aa{sv}', [{'address': Variant('s', TETHERING_IP_ADDRESS), 'prefix': Variant('u', 24)}]), - 'gateway': Variant('s', TETHERING_IP_ADDRESS), - 'never-default': Variant('b', True), - }, - 'ipv6': { - 'method': Variant('s', 'ignore'), + 'method': 'auto', + 'dns-priority': 600, }, + 'ipv6': {'method': 'ignore'}, } - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - new_connection = await settings_iface.call_add_connection(connection) - print(f"Added tethering connection with path: {new_connection}") - return True - except DBusError as e: - print(f"Failed to add tethering connection: {e}") - return False - except Exception as e: - print(f"Unexpected error adding tethering connection: {e}") - return False - - async def get_tethering_password(self) -> str: - """Get the current tethering password.""" - try: - hotspot_path = self.saved_connections.get(self._tethering_ssid) - if hotspot_path: - conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) - secrets = await conn_iface.call_get_secrets('802-11-wireless-security') - if secrets and '802-11-wireless-security' in secrets: - psk = secrets.get('802-11-wireless-security', {}).get('psk', Variant('s', '')).value - return str(psk) if psk is not None else "" - return "" - except DBusError as e: - print(f"Failed to get tethering password: {e}") - return "" - except Exception as e: - print(f"Unexpected error getting tethering password: {e}") - return "" - - async def set_tethering_password(self, password: str) -> bool: - """Set the tethering password.""" - if len(password) < 8: - cloudlog.error("Tethering password must be at least 8 characters") - return False - - try: - hotspot_path = self.saved_connections.get(self._tethering_ssid) - if not hotspot_path: - print("No hotspot connection found") - return False - - # Update the connection settings with new password - settings = await self._get_connection_settings(hotspot_path) - if '802-11-wireless-security' not in settings: - settings['802-11-wireless-security'] = {} - settings['802-11-wireless-security']['psk'] = Variant('s', password) - - # Apply changes - conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) - await conn_iface.call_update(settings) - - # Check if connection is active and restart if needed - is_active = False - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - active_connections = await nm_iface.get_active_connections() - - for conn_path in active_connections: - props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) - conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') - if conn_id_path.value == hotspot_path: - is_active = True - await nm_iface.call_deactivate_connection(conn_path) - break - - if is_active: - await nm_iface.call_activate_connection(hotspot_path, self.device_path, "/") - - print("Tethering password updated successfully") - return True - except DBusError as e: - print(f"Failed to set tethering password: {e}") - return False - except Exception as e: - print(f"Unexpected error setting tethering password: {e}") - return False - - async def is_tethering_active(self) -> bool: - """Check if tethering is active for the specified SSID.""" - try: - hotspot_path = self.saved_connections.get(self._tethering_ssid) - if not hotspot_path: - return False - - nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) - active_connections = await nm_iface.get_active_connections() - - for conn_path in active_connections: - props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) - conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') - - if conn_id_path.value == hotspot_path: - return True - - return False - except Exception: - return False - - async def _periodic_scan(self): - while self.running: - try: - await self._request_scan() - await asyncio.sleep(30) - except asyncio.CancelledError: - break - except DBusError as e: - cloudlog.error(f"Scan failed: {e}") - await asyncio.sleep(5) - - async def _setup_signals(self, device_path: str) -> None: - rules = [ - f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'", - f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'", - f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'", - f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'", - ] - for rule in rules: - await self._add_match_rule(rule) - - # Set up signal handlers - self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed) - self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) - - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - settings_iface.on_new_connection(self._on_new_connection) - settings_iface.on_connection_removed(self._on_connection_removed) - - def _on_properties_changed(self, interface: str, changed: dict, invalidated: list): - if interface == NM_WIRELESS_IFACE and 'LastScan' in changed: - asyncio.create_task(self._refresh_networks()) - elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed: - new_ap_path = changed["ActiveAccessPoint"].value - if self.active_ap_path != new_ap_path: - self.active_ap_path = new_ap_path - - def _on_state_changed(self, new_state: int, old_state: int, reason: int): - if new_state == NMDeviceState.ACTIVATED: - if self.callbacks.activated: - self.callbacks.activated() - self._current_connection_ssid = None - asyncio.create_task(self._refresh_networks()) - elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): - for network in self.networks: - network.is_connected = False - - # BAD PASSWORD - if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth: - if self._current_connection_ssid: - asyncio.create_task(self.forget_connection(self._current_connection_ssid)) - self.callbacks.need_auth(self._current_connection_ssid) - else: - # Try to find the network from active_ap_path - for network in self.networks: - if network.path == self.active_ap_path: - asyncio.create_task(self.forget_connection(network.ssid)) - self.callbacks.need_auth(network.ssid) - break - else: - # Couldn't identify the network that needs auth - cloudlog.error("Network needs authentication but couldn't identify which one") - - def _on_new_connection(self, path: str) -> None: - """Callback for NewConnection signal.""" - asyncio.create_task(self._add_saved_connection(path)) - - def _on_connection_removed(self, path: str) -> None: - """Callback for ConnectionRemoved signal.""" - for ssid, p in list(self.saved_connections.items()): - if path == p: - del self.saved_connections[ssid] - - if self.callbacks.forgotten: - self.callbacks.forgotten(ssid) - break + if password is not None: + connection['802-11-wireless-security'] = { + 'key-mgmt': 'wpa-psk', + 'auth-alg': 'open', + 'psk': password, + } - async def _add_saved_connection(self, path: str) -> None: - """Add a new saved connection to the dictionary.""" - try: - settings = await self._get_connection_settings(path) - if ssid := self._extract_ssid(settings): - self.saved_connections[ssid] = path - except DBusError as e: - cloudlog.error(f"Failed to add connection {path}: {e}") - - def _extract_ssid(self, settings: dict) -> str | None: - """Extract SSID from connection settings.""" - ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value - return bytes(ssid_variant).decode('utf-8') if ssid_variant else None - - async def _add_match_rule(self, rule): - """Add a match rule on the bus.""" - reply = await self.bus.call( - Message( - message_type=MessageType.METHOD_CALL, - destination='org.freedesktop.DBus', - interface="org.freedesktop.DBus", - path='/org/freedesktop/DBus', - member='AddMatch', - signature='s', - body=[rule], + settings = dbus.Interface( + self._bus.get_object(NM, NM_SETTINGS_PATH), + NM_SETTINGS_IFACE ) - ) - assert reply.message_type == MessageType.METHOD_RETURN - return reply + conn_path = settings.AddConnection(connection) - async def _refresh_networks(self): - """Get a list of available networks via NetworkManager.""" - wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) - access_points = await wifi_iface.get_access_points() - self.active_ap_path = await self.get_active_access_point() - network_dict = {} - for ap_path in access_points: - try: - props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) - properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint') - ssid_variant = properties['Ssid'].value - ssid = bytes(ssid_variant).decode('utf-8') - if not ssid: - continue + print('Added connection', conn_path) - bssid = properties.get('HwAddress', Variant('s', '')).value - strength = properties['Strength'].value - flags = properties['Flags'].value - wpa_flags = properties['WpaFlags'].value - rsn_flags = properties['RsnFlags'].value - - # May be multiple access points for each SSID. Use first for ssid - # and security type, then update the rest using all APs - if ssid not in network_dict: - network_dict[ssid] = NetworkInfo( - ssid=ssid, - strength=0, - security_type=self._get_security_type(flags, wpa_flags, rsn_flags), - path="", - bssid="", - is_connected=False, - is_saved=ssid in self.saved_connections - ) - - existing_network = network_dict.get(ssid) - if existing_network.strength < strength: - existing_network.strength = strength - existing_network.path = ap_path - existing_network.bssid = bssid - if self.active_ap_path == ap_path: - existing_network.is_connected = self._current_connection_ssid != ssid - - except DBusError as e: - cloudlog.error(f"Error fetching networks: {e}") - except Exception as e: - cloudlog.error({e}) - - self.networks = sorted( - network_dict.values(), - key=lambda network: ( - not network.is_connected, - -network.strength, # Higher signal strength first - network.ssid.lower(), - ), - ) + print(f'Connecting to network took {time.monotonic() - t}s') - if self.callbacks.networks_updated: - self.callbacks.networks_updated(copy.deepcopy(self.networks)) + self.activate_connection(ssid) - async def _get_connection_settings(self, path): - """Fetch connection settings for a specific connection path.""" - try: - settings = await self._get_interface(NM, path, NM_CONNECTION_IFACE) - return await settings.call_get_settings() - except DBusError as e: - cloudlog.error(f"Failed to get settings for {path}: {str(e)}") - return {} - - async def _process_chunk(self, paths_chunk): - """Process a chunk of connection paths.""" - tasks = [self._get_connection_settings(path) for path in paths_chunk] - return await asyncio.gather(*tasks, return_exceptions=True) - - async def _get_saved_connections(self) -> dict[str, str]: - try: - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - connection_paths = await settings_iface.call_list_connections() - saved_ssids: dict[str, str] = {} - batch_size = 20 - for i in range(0, len(connection_paths), batch_size): - chunk = connection_paths[i : i + batch_size] - results = await self._process_chunk(chunk) - for path, config in zip(chunk, results, strict=True): - if isinstance(config, dict) and '802-11-wireless' in config: - if ssid := self._extract_ssid(config): - saved_ssids[ssid] = path - return saved_ssids - except DBusError as e: - cloudlog.error(f"Error fetching saved connections: {str(e)}") - return {} - - async def _get_interface(self, bus_name: str, path: str, name: str): - introspection = await self.bus.introspect(bus_name, path) - proxy = self.bus.get_proxy_object(bus_name, path, introspection) - return proxy.get_interface(name) - - def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: - """Determine the security type based on flags.""" - if flags == 0 and not (wpa_flags or rsn_flags): - return SecurityType.OPEN - if rsn_flags & 0x200: # SAE (WPA3 Personal) - # TODO: support WPA3 - return SecurityType.UNSUPPORTED - if rsn_flags: # RSN indicates WPA2 or higher - return SecurityType.WPA2 - if wpa_flags: # WPA flags indicate WPA - return SecurityType.WPA - return SecurityType.UNSUPPORTED + threading.Thread(target=worker, daemon=True).start() + def _get_connections(self) -> list[dbus.ObjectPath]: + settings_iface = dbus.Interface(self._bus.get_object(NM, NM_SETTINGS_PATH), NM_SETTINGS_IFACE) + return settings_iface.ListConnections() -class WifiManagerWrapper: - def __init__(self): - self._manager: WifiManager | None = None - self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks() + def _connection_by_ssid(self, ssid: str, known_connections: list[dbus.ObjectPath] | None = None) -> dbus.ObjectPath | None: + for conn_path in known_connections or self._get_connections(): + conn_props = dbus.Interface(self._bus.get_object(NM, conn_path), NM_CONNECTION_IFACE) + settings = conn_props.GetSettings() + if "802-11-wireless" in settings and bytes(settings["802-11-wireless"]["ssid"]).decode("utf-8", "replace") == ssid: + return conn_path + return None - self._thread = threading.Thread(target=self._run, daemon=True) - self._loop: asyncio.EventLoop | None = None - self._running = False - - def set_callbacks(self, callbacks: WifiManagerCallbacks): - self._callbacks = callbacks + def forget_connection(self, ssid: str, block: bool = False): + def worker(): + t = time.monotonic() + conn_path = self._connection_by_ssid(ssid) + print(f'Finding connection by SSID took {time.monotonic() - t}s: {conn_path}') + if conn_path is not None: + conn_iface = dbus.Interface(self._bus.get_object(NM, conn_path), NM_CONNECTION_IFACE) + conn_iface.Delete() + print(f'Forgetting connection took {time.monotonic() - t}s') + if self._forgotten is not None: + self._forgotten(ssid) - def start(self) -> None: - if not self._running: - self._thread.start() - while self._thread is not None and not self._running: - time.sleep(0.1) + # TODO: make a helper when it makes sense + if block: + worker() + else: + threading.Thread(target=worker, daemon=True).start() - def _run(self): - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) + def activate_connection(self, ssid: str): + t = time.monotonic() + conn_path = self._connection_by_ssid(ssid) + if conn_path is not None: + self._connecting_to_ssid = ssid + device_path = self._get_wifi_device() + if device_path is None: + cloudlog.warning("No WiFi device found") + return + + print(f'Activating connection to {ssid}') + self._nm.ActivateConnection(conn_path, device_path, dbus.ObjectPath("/")) + print(f"Activated connection in {time.monotonic() - t}s") + # FIXME: deadlock issue with ui + # if self._activated is not None: + # self._activated() + + def _request_scan(self): + device_path = self._get_wifi_device() + if device_path is None: + cloudlog.warning("No WiFi device found") + return + wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE) try: - self._manager = WifiManager(self._callbacks) - self._running = True - self._loop.run_forever() - except Exception as e: - cloudlog.error(f"Error in WifiManagerWrapper thread: {e}") - finally: - if self._loop.is_running(): - self._loop.stop() - self._running = False - - def shutdown(self) -> None: - if self._running: - if self._manager is not None and self._loop: - shutdown_future = asyncio.run_coroutine_threadsafe(self._manager.shutdown(), self._loop) - shutdown_future.result(timeout=3.0) - - if self._loop and self._loop.is_running(): - self._loop.call_soon_threadsafe(self._loop.stop) - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=2.0) - self._running = False - - def is_saved(self, ssid: str) -> bool: - """Check if a network is saved.""" - return self._run_coroutine_sync(lambda manager: manager.is_saved(ssid), default=False) - - def connect(self): - """Connect to DBus and start Wi-Fi scanning.""" - if not self._manager: + wifi_iface.RequestScan({}) + print('Requested scan') + except dbus.exceptions.DBusException as e: + # TODO: copilot is wrong, this never happens + if "org.freedesktop.NetworkManager.Device.Error.AlreadyScanning" in str(e): + print('Already scanning, skipping') + else: + cloudlog.exception("Failed to request scan") + + def _update_networks(self): + # TODO: only run this function on scan complete! + print('UPDATING NETWORKS!!!!') + + device_path = self._get_wifi_device() + if device_path is None: + cloudlog.warning("No WiFi device found") return - self._run_coroutine(self._manager.connect()) - def forget_connection(self, ssid: str): - """Forget a saved Wi-Fi connection.""" - if not self._manager: - return - self._run_coroutine(self._manager.forget_connection(ssid)) + wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE) + dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) + active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint") - def activate_connection(self, ssid: str): - """Activate an existing Wi-Fi connection.""" - if not self._manager: - return - self._run_coroutine(self._manager.activate_connection(ssid)) + aps: dict[str, list[AccessPoint]] = {} - def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False): - """Connect to a Wi-Fi network.""" - if not self._manager: - return - self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden)) + for ap_path in wifi_iface.GetAllAccessPoints(): + ap_props = dbus.Interface(self._bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE) - def _run_coroutine(self, coro): - """Run a coroutine in the async thread.""" - if not self._running or not self._loop: - cloudlog.error("WifiManager thread is not running") - return - asyncio.run_coroutine_threadsafe(coro, self._loop) + try: + ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path) + if ap.ssid == "": + continue - def _run_coroutine_sync(self, func: Callable[[WifiManager], T], default: T) -> T: - """Run a function synchronously in the async thread.""" - if not self._running or not self._loop or not self._manager: - return default - future = concurrent.futures.Future[T]() + if ap.ssid not in aps: + aps[ap.ssid] = [] - def wrapper(manager: WifiManager) -> None: - try: - future.set_result(func(manager)) - except Exception as e: - future.set_exception(e) + aps[ap.ssid].append(ap) + except dbus.exceptions.DBusException: + # some APs have been seen dropping off during iteration + cloudlog.exception(f"Failed to get AP properties for {ap_path}") - try: - self._loop.call_soon_threadsafe(wrapper, self._manager) - return future.result(timeout=1.0) - except Exception as e: - cloudlog.error(f"WifiManagerWrapper property access failed: {e}") - return default + known_connections = self._get_connections() + self._networks = [Network.from_dbus(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None) + for ssid, ap_list in aps.items()] + if self._networks_updated is not None: + self._networks_updated(self._networks) + + def get_networks(self): + return self._networks diff --git a/system/ui/lib/wifi_manager_v2.py b/system/ui/lib/wifi_manager_v2.py deleted file mode 100644 index ca00d9a222..0000000000 --- a/system/ui/lib/wifi_manager_v2.py +++ /dev/null @@ -1,390 +0,0 @@ -import atexit -import copy -import dbus -import threading -import time -import uuid -from collections.abc import Callable -from dataclasses import dataclass -from enum import IntEnum - -from openpilot.common.swaglog import cloudlog -from openpilot.system.ui.lib.networkmanager import * - -try: - from openpilot.common.params import Params -except ImportError: - # Params/Cythonized modules are not available in zipapp - Params = None - -TETHERING_IP_ADDRESS = "192.168.43.1" -DEFAULT_TETHERING_PASSWORD = "swagswagcomma" - - -class SecurityType(IntEnum): - OPEN = 0 - WPA = 1 - WPA2 = 2 - WPA3 = 3 - UNSUPPORTED = 4 - - -def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: - wpa_props = wpa_flags | rsn_flags - - # obtained by looking at flags of networks in the office as reported by an Android phone - supports_wpa = NM_802_11_AP_SEC_PAIR_WEP40 | NM_802_11_AP_SEC_PAIR_WEP104 | NM_802_11_AP_SEC_GROUP_WEP40 | NM_802_11_AP_SEC_GROUP_WEP104 | NM_802_11_AP_SEC_KEY_MGMT_PSK; - - if (flags == NM_802_11_AP_FLAGS_NONE) or ((flags & NM_802_11_AP_FLAGS_WPS) and not (wpa_props & supports_wpa)): - return SecurityType.OPEN - elif (flags & NM_802_11_AP_FLAGS_PRIVACY) and (wpa_props & supports_wpa) and not (wpa_props & NM_802_11_AP_SEC_KEY_MGMT_802_1X): - return SecurityType.WPA - else: - cloudlog.warning(f"Unsupported network! flags: {flags}, wpa_flags: {wpa_flags}, rsn_flags: {rsn_flags}") - return SecurityType.UNSUPPORTED - - -@dataclass(frozen=True) -class Network: - ssid: str - strength: int - is_connected: bool - security_type: SecurityType - is_saved: bool - - @classmethod - def from_dbus(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network": - # we only want to show the strongest AP for each Network/SSID - strongest_ap = max(aps, key=lambda ap: ap.strength) - - is_connected = any(ap.ap_path == active_ap_path for ap in aps) # TODO: just any is_connected aps! - security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags) - - return cls( - ssid=ssid, - strength=strongest_ap.strength, - is_connected=is_connected, - security_type=security_type, - is_saved=is_saved, - ) - - -@dataclass(frozen=True) -class AccessPoint: - ssid: str - bssid: str - strength: int - is_connected: bool - flags: int - wpa_flags: int - rsn_flags: int - ap_path: dbus.ObjectPath - - @classmethod - def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": - ssid = bytes(ap_props.Get(NM_ACCESS_POINT_IFACE, "Ssid")).decode("utf-8", "replace") - bssid = str(ap_props.Get(NM_ACCESS_POINT_IFACE, "HwAddress")) - strength = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Strength")) - flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Flags")) - wpa_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "WpaFlags")) - rsn_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "RsnFlags")) - - return cls( - ssid=ssid, - bssid=bssid, - strength=strength, - is_connected=ap_path == active_ap_path, - flags=flags, - wpa_flags=wpa_flags, - rsn_flags=rsn_flags, - ap_path=ap_path, - ) - - -class WifiManager: - def __init__(self): - self._networks = [] # a network can be comprised of multiple APs - self._active = True # used to not run this cycle when not in settings - self._running = True - - # DBus and NetworkManager setup - self._bus = dbus.SystemBus() - self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE) - self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) - - self._bus2 = dbus.SystemBus(private=True) - - # State - self._connecting_to_ssid: str = "" - - # Callbacks - # TODO: some of these are called from threads, either: - # 1. make sure this is fine - # 2. add callback event list that user can call from main thread to get callbacks safely - self._need_auth: Callable[[str], None] | None = None - self._activated: Callable[[], None] | None = None - self._forgotten: Callable[[str], None] | None = None - self._networks_updated: Callable[[list[Network]], None] | None = None - self._disconnected: Callable[[], None] | None = None - - self._thread = threading.Thread(target=self._run, daemon=True) - self._thread.start() - - self._state_thread = threading.Thread(target=self._monitor_state, daemon=True) - self._state_thread.start() - - atexit.register(self.stop) - - def __del__(self): - self.stop() - - def stop(self): - self._running = False - self._thread.join() - self._state_thread.join() - self._bus.close() - self._bus2.close() - - def _monitor_state(self): - prev_state = -1 - - device_path = self._get_wifi_device() - props_dev = dbus.Interface(self._bus2.get_object(NM, device_path), NM_PROPERTIES_IFACE) - _props = dbus.Interface(self._bus2.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) - - while self._running: - if self._active: - print('moritring state ACTivE!!1') - dev_state = int(props_dev.Get(NM_DEVICE_IFACE, "State")) - state_reason = props_dev.Get(NM_DEVICE_IFACE, "StateReason") # (u state, u reason) - reason = int(state_reason[1]) if isinstance(state_reason, (list, tuple)) and len(state_reason) == 2 else 0 - - if dev_state != prev_state: - print(f" WiFi device state change: {dev_state}, reason: {reason}") - if dev_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self._connecting_to_ssid: - print('------ NEED AUTH - SUPPLICANT DISCONNECT') - self.forget_connection(self._connecting_to_ssid, block=True) - if self._need_auth is not None: - self._need_auth(self._connecting_to_ssid) - self._connecting_to_ssid = "" - elif dev_state == NMDeviceState.ACTIVATED: - print('------ ACTIVATED') - if self._activated is not None: - self._activated() - self._connecting_to_ssid = "" - elif dev_state == NMDeviceState.DISCONNECTED: - print('------ DISCONNECTED') - self._connecting_to_ssid = "" - if self._disconnected is not None: - self._disconnected() - - print() - - if self._connecting_to_ssid: - print(' CONNECTING', self._connecting_to_ssid) - - prev_state = dev_state - - time.sleep(1 / 2.) - - def set_callbacks(self, need_auth: Callable[[str], None], - activated: Callable[[], None] | None, - forgotten: Callable[[str], None], - networks_updated: Callable[[list[Network]], None], - disconnected: Callable[[], None]): - self._need_auth = need_auth - self._activated = activated - self._forgotten = forgotten - self._networks_updated = networks_updated - self._disconnected = disconnected - - def _run(self): - i = 0 - while self._running: - if self._active: - print('we;re acti!!!!!!!!!!!!') - # Scan for networks every 5 seconds - if i % 5 == 0: - # TODO: should watch when scan is complete, but this is more than good enough for now - self._update_networks() - self._request_scan() - - i += 1 - time.sleep(1) - - def set_active(self, active: bool): - print('SETTING ACTIVE', active) - self._active = active - - def _get_wifi_device(self) -> dbus.ObjectPath | None: - # TODO: cache if slow - t = time.monotonic() - device_paths = self._nm.GetDevices() - # print(f'DEVICE PATHS: {device_paths}') - - wifi_device = None - for device_path in device_paths: - dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) - dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType") - - if dev_type == NM_DEVICE_TYPE_WIFI: - wifi_device = device_path - break - - # print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}") - return wifi_device - - def connect_to_network(self, ssid: str, password: str | None): - def worker(): - t = time.monotonic() - - # Clear all connections that may already exist to the network we are connecting - self._connecting_to_ssid = ssid - self.forget_connection(ssid, block=True) - - is_hidden = False - - connection = { - 'connection': { - 'type': '802-11-wireless', - 'uuid': str(uuid.uuid4()), - 'id': f'openpilot connection {ssid}', - 'autoconnect-retries': 0, - }, - '802-11-wireless': { - 'ssid': dbus.ByteArray(ssid.encode("utf-8")), - 'hidden': is_hidden, - 'mode': 'infrastructure', - }, - 'ipv4': { - 'method': 'auto', - 'dns-priority': 600, - }, - 'ipv6': {'method': 'ignore'}, - } - - if password is not None: - connection['802-11-wireless-security'] = { - 'key-mgmt': 'wpa-psk', - 'auth-alg': 'open', - 'psk': password, - } - - settings = dbus.Interface( - self._bus.get_object(NM, NM_SETTINGS_PATH), - NM_SETTINGS_IFACE - ) - - conn_path = settings.AddConnection(connection) - - print('Added connection', conn_path) - - print(f'Connecting to network took {time.monotonic() - t}s') - - self.activate_connection(ssid) - - threading.Thread(target=worker, daemon=True).start() - - def _get_connections(self) -> list[dbus.ObjectPath]: - settings_iface = dbus.Interface(self._bus.get_object(NM, NM_SETTINGS_PATH), NM_SETTINGS_IFACE) - return settings_iface.ListConnections() - - def _connection_by_ssid(self, ssid: str, known_connections: list[dbus.ObjectPath] | None = None) -> dbus.ObjectPath | None: - for conn_path in known_connections or self._get_connections(): - conn_props = dbus.Interface(self._bus.get_object(NM, conn_path), NM_CONNECTION_IFACE) - settings = conn_props.GetSettings() - if "802-11-wireless" in settings and bytes(settings["802-11-wireless"]["ssid"]).decode("utf-8", "replace") == ssid: - return conn_path - return None - - def forget_connection(self, ssid: str, block: bool = False): - def worker(): - t = time.monotonic() - conn_path = self._connection_by_ssid(ssid) - print(f'Finding connection by SSID took {time.monotonic() - t}s: {conn_path}') - if conn_path is not None: - conn_iface = dbus.Interface(self._bus.get_object(NM, conn_path), NM_CONNECTION_IFACE) - conn_iface.Delete() - print(f'Forgetting connection took {time.monotonic() - t}s') - if self._forgotten is not None: - self._forgotten(ssid) - - # TODO: make a helper when it makes sense - if block: - worker() - else: - threading.Thread(target=worker, daemon=True).start() - - def activate_connection(self, ssid: str): - t = time.monotonic() - conn_path = self._connection_by_ssid(ssid) - if conn_path is not None: - self._connecting_to_ssid = ssid - device_path = self._get_wifi_device() - if device_path is None: - cloudlog.warning("No WiFi device found") - return - - print(f'Activating connection to {ssid}') - self._nm.ActivateConnection(conn_path, device_path, dbus.ObjectPath("/")) - print(f"Activated connection in {time.monotonic() - t}s") - # FIXME: deadlock issue with ui - # if self._activated is not None: - # self._activated() - - def _request_scan(self): - device_path = self._get_wifi_device() - if device_path is None: - cloudlog.warning("No WiFi device found") - return - - wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE) - try: - wifi_iface.RequestScan({}) - print('Requested scan') - except dbus.exceptions.DBusException as e: - # TODO: copilot is wrong, this never happens - if "org.freedesktop.NetworkManager.Device.Error.AlreadyScanning" in str(e): - print('Already scanning, skipping') - else: - cloudlog.exception("Failed to request scan") - - def _update_networks(self): - # TODO: only run this function on scan complete! - print('UPDATING NETWORKS!!!!') - - device_path = self._get_wifi_device() - if device_path is None: - cloudlog.warning("No WiFi device found") - return - - wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE) - dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) - active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint") - - aps: dict[str, list[AccessPoint]] = {} - - for ap_path in wifi_iface.GetAllAccessPoints(): - ap_props = dbus.Interface(self._bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE) - - try: - ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path) - if ap.ssid == "": - continue - - if ap.ssid not in aps: - aps[ap.ssid] = [] - - aps[ap.ssid].append(ap) - except dbus.exceptions.DBusException: - # some APs have been seen dropping off during iteration - cloudlog.exception(f"Failed to get AP properties for {ap_path}") - - known_connections = self._get_connections() - self._networks = [Network.from_dbus(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None) - for ssid, ap_list in aps.items()] - if self._networks_updated is not None: - self._networks_updated(self._networks) - - def get_networks(self): - return self._networks diff --git a/system/ui/setup.py b/system/ui/setup.py index 800ca7662c..8f783639f7 100755 --- a/system/ui/setup.py +++ b/system/ui/setup.py @@ -19,7 +19,7 @@ from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.button import Button, ButtonStyle, ButtonRadio from openpilot.system.ui.widgets.keyboard import Keyboard from openpilot.system.ui.widgets.label import Label, TextAlignment -from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManagerWrapper +from openpilot.system.ui.widgets.network import WifiManagerUI, WifiManager NetworkType = log.DeviceState.NetworkType @@ -72,7 +72,7 @@ class Setup(Widget): self.download_url = "" self.download_progress = 0 self.download_thread = None - self.wifi_manager = WifiManagerWrapper() + self.wifi_manager = WifiManager() self.wifi_ui = WifiManagerUI(self.wifi_manager) self.keyboard = Keyboard() self.selected_radio = None diff --git a/system/ui/updater.py b/system/ui/updater.py index 31799d3628..731e766f8c 100755 --- a/system/ui/updater.py +++ b/system/ui/updater.py @@ -7,7 +7,7 @@ from enum import IntEnum from openpilot.system.hardware import HARDWARE from openpilot.system.ui.lib.application import gui_app, FontWeight -from openpilot.system.ui.lib.wifi_manager import WifiManagerWrapper +from openpilot.system.ui.lib.wifi_manager import WifiManager from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.button import gui_button, ButtonStyle from openpilot.system.ui.widgets.label import gui_text_box, gui_label @@ -43,7 +43,7 @@ class Updater(Widget): self.show_reboot_button = False self.process = None self.update_thread = None - self.wifi_manager = WifiManagerWrapper() + self.wifi_manager = WifiManager() self.wifi_manager_ui = WifiManagerUI(self.wifi_manager) def install_update(self): diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py index 4b06296daf..7b76f3092a 100644 --- a/system/ui/widgets/network.py +++ b/system/ui/widgets/network.py @@ -6,8 +6,7 @@ from typing import Literal import pyray as rl from openpilot.system.ui.lib.application import gui_app from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel -from openpilot.system.ui.lib.wifi_manager import NetworkInfo, WifiManagerCallbacks, WifiManagerWrapper, SecurityType -from openpilot.system.ui.lib.wifi_manager_v2 import WifiManager, Network +from openpilot.system.ui.lib.wifi_manager import WifiManager, SecurityType, Network from openpilot.system.ui.widgets import Widget from openpilot.system.ui.widgets.button import ButtonStyle, Button, TextAlignment from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog @@ -35,26 +34,26 @@ class StateIdle: @dataclass class StateConnecting: - network: NetworkInfo + network: Network action: Literal["connecting"] = "connecting" @dataclass class StateNeedsAuth: - network: NetworkInfo + network: Network retry: bool action: Literal["needs_auth"] = "needs_auth" @dataclass class StateShowForgetConfirm: - network: NetworkInfo + network: Network action: Literal["show_forget_confirm"] = "show_forget_confirm" @dataclass class StateForgetting: - network: NetworkInfo + network: Network action: Literal["forgetting"] = "forgetting" @@ -109,7 +108,7 @@ class WifiManagerUI(Widget): case _: self._draw_network_list(rect) - def _on_password_entered(self, network: NetworkInfo, result: int): + def _on_password_entered(self, network: Network, result: int): if result == 1: password = self.keyboard.text self.keyboard.clear() @@ -144,7 +143,7 @@ class WifiManagerUI(Widget): rl.end_scissor_mode() - def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool): + def _draw_network_item(self, rect, network: Network, clicked: bool): spacing = 50 ssid_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, ITEM_HEIGHT) signal_icon_rect = rl.Rectangle(rect.x + rect.width - ICON_SIZE, rect.y + (ITEM_HEIGHT - ICON_SIZE) / 2, ICON_SIZE, ICON_SIZE) @@ -197,7 +196,7 @@ class WifiManagerUI(Widget): if self.scroll_panel.is_touch_valid(): self.state = StateShowForgetConfirm(network) - def _draw_status_icon(self, rect, network: NetworkInfo): + def _draw_status_icon(self, rect, network: Network): """Draw the status icon based on network's connection state""" icon_file = None if network.is_connected: @@ -214,19 +213,19 @@ class WifiManagerUI(Widget): icon_rect = rl.Vector2(rect.x, rect.y + (ICON_SIZE - texture.height) / 2) rl.draw_texture_v(texture, icon_rect, rl.WHITE) - def _draw_signal_strength_icon(self, rect: rl.Rectangle, network: NetworkInfo): + def _draw_signal_strength_icon(self, rect: rl.Rectangle, network: Network): """Draw the Wi-Fi signal strength icon based on network's signal strength""" strength_level = max(0, min(3, round(network.strength / 33.0))) rl.draw_texture_v(gui_app.texture(STRENGTH_ICONS[strength_level], ICON_SIZE, ICON_SIZE), rl.Vector2(rect.x, rect.y), rl.WHITE) - def connect_to_network(self, network: NetworkInfo, password=''): + def connect_to_network(self, network: Network, password=''): self.state = StateConnecting(network) if network.is_saved and not password: self.wifi_manager.activate_connection(network.ssid) else: self.wifi_manager.connect_to_network(network.ssid, password) - def forget_network(self, network: NetworkInfo): + def forget_network(self, network: Network): self.state = StateForgetting(network) # network.is_saved = False self.wifi_manager.forget_connection(network.ssid)