diff --git a/selfdrive/ui/layouts/settings/settings.py b/selfdrive/ui/layouts/settings/settings.py index 2d03b60448..fb9387f7b4 100644 --- a/selfdrive/ui/layouts/settings/settings.py +++ b/selfdrive/ui/layouts/settings/settings.py @@ -8,6 +8,7 @@ from openpilot.selfdrive.ui.layouts.settings.firehose import FirehoseLayout from openpilot.selfdrive.ui.layouts.settings.software import SoftwareLayout from openpilot.selfdrive.ui.layouts.settings.toggles import TogglesLayout from openpilot.system.ui.lib.wifi_manager import WifiManagerWrapper +from openpilot.system.ui.lib.wifi_manager_v2 import WifiManager from openpilot.system.ui.lib.application import gui_app, FontWeight, MousePos from openpilot.system.ui.lib.text_measure import measure_text_cached from openpilot.system.ui.widgets.network import WifiManagerUI @@ -58,6 +59,8 @@ class SettingsLayout(Widget): self.wifi_manager = WifiManagerWrapper() self.wifi_ui = WifiManagerUI(self.wifi_manager) + self.wifi_manager2 = WifiManager() + self._panels = { PanelType.DEVICE: PanelInfo("Device", DeviceLayout()), PanelType.NETWORK: PanelInfo("Network", self.wifi_ui), @@ -79,6 +82,8 @@ class SettingsLayout(Widget): self._close_callback = on_close def _render(self, rect: rl.Rectangle): + print(self.wifi_manager2.get_aps()) + # Calculate layout sidebar_rect = rl.Rectangle(rect.x, rect.y, SIDEBAR_WIDTH, rect.height) panel_rect = rl.Rectangle(rect.x + SIDEBAR_WIDTH, rect.y, rect.width - SIDEBAR_WIDTH, rect.height) diff --git a/system/ui/lib/wifi_manager_v2.py b/system/ui/lib/wifi_manager_v2.py new file mode 100644 index 0000000000..cb1d95a31a --- /dev/null +++ b/system/ui/lib/wifi_manager_v2.py @@ -0,0 +1,236 @@ +import asyncio +import concurrent.futures +import copy +from collections import defaultdict +import dbus +import threading +import time +import uuid +from collections.abc import Callable +from dataclasses import dataclass +from enum import IntEnum +from typing import TypeVar + +from dbus_next.aio import MessageBus +from dbus_next import BusType, Variant, Message +from dbus_next.errors import DBusError +from dbus_next.constants import MessageType + +try: + from openpilot.common.params import Params +except ImportError: + # Params/Cythonized modules are not available in zipapp + Params = None +from openpilot.common.swaglog import cloudlog + +T = TypeVar("T") + +# NetworkManager constants +NM = "org.freedesktop.NetworkManager" +NM_PATH = '/org/freedesktop/NetworkManager' +NM_IFACE = 'org.freedesktop.NetworkManager' +NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings' +NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings' +NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection' +NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' +NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' +NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" + +NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 + +NM_DEVICE_TYPE_WIFI = 2 + +TETHERING_IP_ADDRESS = "192.168.43.1" +DEFAULT_TETHERING_PASSWORD = "swagswagcomma" + + +# NetworkManager device states +class NMDeviceState(IntEnum): + DISCONNECTED = 30 + PREPARE = 40 + NEED_AUTH = 60 + IP_CONFIG = 70 + ACTIVATED = 100 + + +class SecurityType(IntEnum): + OPEN = 0 + WPA = 1 + WPA2 = 2 + WPA3 = 3 + UNSUPPORTED = 4 + + +@dataclass(frozen=True) +class Network: + ssid: str + strength: int + is_connected: bool + # security_type: SecurityType + # network_path: dbus.ObjectPath + # bssid: str + # is_saved: bool = False + # saved_path: str + + +@dataclass(frozen=True) +class AccessPoint: + ssid: str + bssid: str + strength: int + is_connected: bool + flags: int + wpa_flags: int + rsn_flags: int + ap_path: dbus.ObjectPath + + @classmethod + def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": + ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace") + bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")) + strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength")) + flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags")) + wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags")) + rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags")) + is_connected = ap_path == active_ap_path + + return cls( + ssid=ssid, + bssid=bssid, + strength=strength, + is_connected=is_connected, + flags=flags, + wpa_flags=wpa_flags, + rsn_flags=rsn_flags, + ap_path=ap_path, + ) + + +@dataclass +class WifiManagerCallbacks: + need_auth: Callable[[str], None] | None = None + activated: Callable[[], None] | None = None + forgotten: Callable[[str], None] | None = None + networks_updated: Callable[[list[Network]], None] | None = None + connection_failed: Callable[[str, str], None] | None = None # Added for error feedback + + +class WifiManager: + def __init__(self): + self._networks = [] # a network can be comprised of multiple APs + self._active = True # used to not run this cycle when not in settings + self._running = True + + # DBus and NetworkManager setup + self._bus = dbus.SystemBus() + self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE) + self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) + + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + self._thread.join() + + def _run(self): + while True: + if self._active: + self._update_networks() + + if not self._running: + break + + time.sleep(1) + + def set_active(self, active: bool): + self._active = active + + def _get_wifi_device(self) -> dbus.ObjectPath | None: + # TODO: cache if slow + t = time.monotonic() + device_paths = self._nm.GetDevices() + + wifi_device = None + for device_path in device_paths: + dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) + dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType") + + if dev_type == NM_DEVICE_TYPE_WIFI: + wifi_device = device_path + break + + print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}") + return wifi_device + + def _update_networks(self): + # TODO: only run this function on scan complete! + self._get_wifi_device() + print('UPDATING NETWORKS!!!!') + device_paths = self._nm.GetDevices() + print(f'DEVICE PATHS: {device_paths}') + + all_networks: dict[str, Network] = {} + + device_path = self._get_wifi_device() + if device_path is None: + cloudlog.warning("No WiFi device found") + return + + wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE) + dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) + active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint") + + print('active', active_ap_path) + + aps: dict[str, list[AccessPoint]] = {} + + for ap_path in wifi_iface.GetAllAccessPoints(): + # TODO: ap_path to AccessPoint function, or actually dataclass has function to do this? + ap_props = dbus.Interface(self._bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE) + + + # ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace") + # bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")) + # strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength")) + # flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags")) + # wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags")) + # rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags")) + # is_connected = ap_path == active_ap_path + # + + ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path) + if ap.ssid not in aps: + aps[ap.ssid] = [] + aps[ap.ssid].append(ap) + print('ap:', ap) + # + # aps[ssid].append(AccessPoint( + # ssid=ssid, + # bssid=bssid, + # strength=strength, + # is_connected=is_connected, + # flags=flags, + # wpa_flags=wpa_flags, + # rsn_flags=rsn_flags, + # ap_path=ap_path, + # )) + + networks: dict[str, Network] = {} + for ssid, ap_list in aps.items(): + # we only want to show the strongest AP for each SSID + strongest_ap = max(ap_list, key=lambda ap: ap.strength) + + is_connected = any(ap.ap_path == active_ap_path for ap in ap_list) + print('ssid', ssid, is_connected) + + networks[ssid] = Network( + ssid=ssid, + strength=strongest_ap.strength, + is_connected=is_connected, + ) + + print('networks:', networks) + + def get_networks(self): + return self._networks