import asyncio import threading import time import uuid from collections.abc import Callable from dataclasses import dataclass from enum import IntEnum from dbus_next.aio import MessageBus from dbus_next import BusType, Variant, Message from dbus_next.errors import DBusError from dbus_next.constants import MessageType from openpilot.common.swaglog import cloudlog # NetworkManager constants NM = "org.freedesktop.NetworkManager" NM_PATH = '/org/freedesktop/NetworkManager' NM_IFACE = 'org.freedesktop.NetworkManager' NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings' NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings' NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection' NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 # NetworkManager device states class NMDeviceState(IntEnum): DISCONNECTED = 30 PREPARE = 40 NEED_AUTH = 60 IP_CONFIG = 70 ACTIVATED = 100 class SecurityType(IntEnum): OPEN = 0 WPA = 1 WPA2 = 2 WPA3 = 3 UNSUPPORTED = 4 @dataclass class NetworkInfo: ssid: str strength: int is_connected: bool security_type: SecurityType path: str bssid: str # saved_path: str @dataclass class WifiManagerCallbacks: need_auth: Callable[[], None] | None = None activated: Callable[[], None] | None = None forgotten: Callable[[], None] | None = None class WifiManager: def __init__(self, callbacks: WifiManagerCallbacks): self.callbacks = callbacks self.networks: list[NetworkInfo] = [] self.bus: MessageBus = None self.device_path: str = "" self.device_proxy = None self.saved_connections: dict[str, str] = {} self.active_ap_path: str = "" self.scan_task: asyncio.Task | None = None self.running: bool = True async def connect(self) -> None: """Connect to the DBus system bus.""" try: self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() if not await self._find_wifi_device(): raise ValueError("No Wi-Fi device found") await self._setup_signals(self.device_path) self.active_ap_path = await self.get_active_access_point() self.saved_connections = await self._get_saved_connections() self.scan_task = asyncio.create_task(self._periodic_scan()) except DBusError as e: cloudlog.error(f"Failed to connect to DBus: {e}") raise except Exception as e: cloudlog.error(f"Unexpected error during connect: {e}") raise async def shutdown(self) -> None: self.running = False if self.scan_task: self.scan_task.cancel() try: await self.scan_task except asyncio.CancelledError: pass if self.bus: await self.bus.disconnect() async def request_scan(self) -> None: try: interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) await interface.call_request_scan({}) except DBusError as e: cloudlog.warning(f"Scan request failed: {str(e)}") async def get_active_access_point(self): try: props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') return ap_path.value except DBusError as e: cloudlog.error(f"Error fetching active access point: {str(e)}") return '' async def forget_connection(self, ssid: str) -> bool: path = self.saved_connections.get(ssid) if not path: return False try: nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE) await nm_iface.call_delete() return True except DBusError as e: cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") return False async def activate_connection(self, ssid: str) -> bool: connection_path = self.saved_connections.get(ssid) if not connection_path: return False try: nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) await nm_iface.call_activate_connection(connection_path, self.device_path, "/") return True except DBusError as e: cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}") return False async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None: """Connect to a selected Wi-Fi network.""" try: connection = { 'connection': { 'type': Variant('s', '802-11-wireless'), 'uuid': Variant('s', str(uuid.uuid4())), 'id': Variant('s', ssid), 'autoconnect-retries': Variant('i', 0), }, '802-11-wireless': { 'ssid': Variant('ay', ssid.encode('utf-8')), 'hidden': Variant('b', is_hidden), 'mode': Variant('s', 'infrastructure'), }, 'ipv4': {'method': Variant('s', 'auto')}, 'ipv6': {'method': Variant('s', 'ignore')}, } if bssid: connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) if password: connection['802-11-wireless-security'] = { 'key-mgmt': Variant('s', 'wpa-psk'), 'auth-alg': Variant('s', 'open'), 'psk': Variant('s', password), } nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/") await self._update_connection_status() except DBusError as e: cloudlog.error(f"Error connecting to network: {e}") def is_saved(self, ssid: str) -> bool: return ssid in self.saved_connections async def _find_wifi_device(self) -> bool: nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) devices = await nm_iface.get_devices() for device_path in devices: device = await self.bus.introspect(NM, device_path) device_proxy = self.bus.get_proxy_object(NM, device_path, device) device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) device_type = await device_interface.get_device_type() # type: ignore[attr-defined] if device_type == 2: # Wi-Fi device self.device_path = device_path self.device_proxy = device_proxy return True return False async def _periodic_scan(self): while self.running: try: await self.request_scan() await self._get_available_networks() await asyncio.sleep(30) except asyncio.CancelledError: break except DBusError as e: cloudlog.error(f"Scan failed: {e}") await asyncio.sleep(5) async def _setup_signals(self, device_path: str) -> None: rules = [ f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'", f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'", f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'", f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'", ] for rule in rules: await self._add_match_rule(rule) # Set up signal handlers self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed) self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) settings_iface.on_new_connection(self._on_new_connection) settings_iface.on_connection_removed(self._on_connection_removed) def _on_properties_changed(self, interface: str, changed: dict, invalidated: list): # print("property changed", interface, changed, invalidated) if 'LastScan' in changed: asyncio.create_task(self._get_available_networks()) elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed: self.active_ap_path = changed["ActiveAccessPoint"].value asyncio.create_task(self._get_available_networks()) def _on_state_changed(self, new_state: int, old_state: int, reason: int): print(f"State changed: {old_state} -> {new_state}, reason: {reason}") if new_state == NMDeviceState.ACTIVATED: if self.callbacks.activated: self.callbacks.activated() asyncio.create_task(self._update_connection_status()) elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): for network in self.networks: network.is_connected = False if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth: self.callbacks.need_auth() def _on_new_connection(self, path: str) -> None: """Callback for NewConnection signal.""" print(f"New connection added: {path}") asyncio.create_task(self._add_saved_connection(path)) def _on_connection_removed(self, path: str) -> None: """Callback for ConnectionRemoved signal.""" print(f"Connection removed: {path}") for ssid, p in list(self.saved_connections.items()): if path == p: del self.saved_connections[ssid] break async def _add_saved_connection(self, path: str) -> None: """Add a new saved connection to the dictionary.""" try: settings = await self._get_connection_settings(path) if ssid := self._extract_ssid(settings): self.saved_connections[ssid] = path except DBusError as e: cloudlog.error(f"Failed to add connection {path}: {e}") def _extract_ssid(self, settings: dict) -> str | None: """Extract SSID from connection settings.""" ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value return ''.join(chr(b) for b in ssid_variant) if ssid_variant else None async def _update_connection_status(self): self.active_ap_path = await self.get_active_access_point() await self._get_available_networks() async def _add_match_rule(self, rule): """Add a match rule on the bus.""" reply = await self.bus.call( Message( message_type=MessageType.METHOD_CALL, destination='org.freedesktop.DBus', interface="org.freedesktop.DBus", path='/org/freedesktop/DBus', member='AddMatch', signature='s', body=[rule], ) ) assert reply.message_type == MessageType.METHOD_RETURN return reply async def _get_available_networks(self): """Get a list of available networks via NetworkManager.""" wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) access_points = await wifi_iface.get_access_points() network_dict = {} for ap_path in access_points: try: props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint') ssid_variant = properties['Ssid'].value ssid = ''.join(chr(byte) for byte in ssid_variant) if not ssid: continue bssid = properties.get('HwAddress', Variant('s', '')).value strength = properties['Strength'].value flags = properties['Flags'].value wpa_flags = properties['WpaFlags'].value rsn_flags = properties['RsnFlags'].value existing_network = network_dict.get(ssid) if not existing_network or ((not existing_network.bssid and bssid) or (existing_network.strength < strength)): network_dict[ssid] = NetworkInfo( ssid=ssid, strength=strength, security_type=self._get_security_type(flags, wpa_flags, rsn_flags), path=ap_path, bssid=bssid, is_connected=self.active_ap_path == ap_path, ) except DBusError as e: cloudlog.error(f"Error fetching networks: {e}") except Exception as e: cloudlog.error({e}) self.networks = sorted( network_dict.values(), key=lambda network: ( not network.is_connected, -network.strength, # Higher signal strength first network.ssid.lower(), ), ) async def _get_connection_settings(self, path): """Fetch connection settings for a specific connection path.""" try: connection_proxy = await self.bus.introspect(NM, path) connection = self.bus.get_proxy_object(NM, path, connection_proxy) settings = connection.get_interface(NM_CONNECTION_IFACE) return await settings.call_get_settings() except DBusError as e: cloudlog.error(f"Failed to get settings for {path}: {str(e)}") return {} async def _process_chunk(self, paths_chunk): """Process a chunk of connection paths.""" tasks = [self._get_connection_settings(path) for path in paths_chunk] return await asyncio.gather(*tasks, return_exceptions=True) async def _get_saved_connections(self) -> dict[str, str]: try: settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) connection_paths = await settings_iface.call_list_connections() saved_ssids: dict[str, str] = {} batch_size = 20 for i in range(0, len(connection_paths), batch_size): chunk = connection_paths[i : i + batch_size] results = await self._process_chunk(chunk) for path, config in zip(chunk, results, strict=True): if isinstance(config, dict) and '802-11-wireless' in config: if ssid := self._extract_ssid(config): saved_ssids[ssid] = path return saved_ssids except DBusError as e: cloudlog.error(f"Error fetching saved connections: {str(e)}") return {} async def _get_interface(self, bus_name: str, path: str, name: str): introspection = await self.bus.introspect(bus_name, path) proxy = self.bus.get_proxy_object(bus_name, path, introspection) return proxy.get_interface(name) def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: """Determine the security type based on flags.""" if flags == 0 and not (wpa_flags or rsn_flags): return SecurityType.OPEN if rsn_flags & 0x200: # SAE (WPA3 Personal) return SecurityType.WPA3 if rsn_flags: # RSN indicates WPA2 or higher return SecurityType.WPA2 if wpa_flags: # WPA flags indicate WPA return SecurityType.WPA return SecurityType.UNSUPPORTED class WifiManagerWrapper: def __init__(self): self._manager: WifiManager | None = None self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks() self._loop = None self._running = False self._lock = threading.RLock() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() while self._thread is not None and not self._running: time.sleep(0.1) @property def callbacks(self) -> WifiManagerCallbacks: return self._callbacks @callbacks.setter def callbacks(self, callbacks: WifiManagerCallbacks): with self._lock: self._callbacks = callbacks def _run(self): self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) try: self._manager = WifiManager(self._callbacks) self._running = True self._loop.run_forever() except Exception as e: cloudlog.error(f"Error in WifiManagerWrapper thread: {e}") finally: if self._loop.is_running(): self._loop.stop() self._running = False def shutdown(self): if self._running: self._run_coroutine(self._manager.shutdown()) if self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) if self._thread and self._thread.is_alive(): self._thread.join(timeout=2.0) self._running = False @property def networks(self) -> list[NetworkInfo]: """Get the current list of networks (thread-safe).""" with self._lock: return self._manager.networks if self._manager else [] def is_saved(self, ssid: str) -> bool: """Check if a network is saved (thread-safe).""" with self._lock: return self._manager.is_saved(ssid) if self._manager else False def connect(self): """Connect to DBus and start Wi-Fi scanning.""" self._run_coroutine(self._manager.connect()) def request_scan(self): """Request a scan for Wi-Fi networks.""" self._run_coroutine(self._manager.request_scan()) def forget_connection(self, ssid: str): """Forget a saved Wi-Fi connection.""" self._run_coroutine(self._manager.forget_connection(ssid)) def activate_connection(self, ssid: str): """Activate an existing Wi-Fi connection.""" self._run_coroutine(self._manager.activate_connection(ssid)) def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False): """Connect to a Wi-Fi network.""" self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden)) def _run_coroutine(self, coro): """Run a coroutine in the async thread.""" if not self._running or not self._loop: cloudlog.error("WifiManager thread is not running") return asyncio.run_coroutine_threadsafe(coro, self._loop)