|
|
@ -19,6 +19,7 @@ NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' |
|
|
|
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' |
|
|
|
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' |
|
|
|
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" |
|
|
|
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 |
|
|
|
|
|
|
|
|
|
|
|
# NetworkManager device states |
|
|
|
# NetworkManager device states |
|
|
|
class NMDeviceState(IntEnum): |
|
|
|
class NMDeviceState(IntEnum): |
|
|
@ -51,14 +52,15 @@ class NetworkInfo: |
|
|
|
class WifiManager: |
|
|
|
class WifiManager: |
|
|
|
def __init__(self): |
|
|
|
def __init__(self): |
|
|
|
self.networks: list[NetworkInfo] = [] |
|
|
|
self.networks: list[NetworkInfo] = [] |
|
|
|
self.bus: MessageBus | None = None |
|
|
|
self.bus: MessageBus = None |
|
|
|
self.device_path: str | None = None |
|
|
|
self.device_path: str = '' |
|
|
|
self.device_proxy = None |
|
|
|
self.device_proxy = None |
|
|
|
self.saved_connections: dict[str, str] = {} |
|
|
|
self.saved_connections: dict[str, str] = {} |
|
|
|
self.active_ap_path: str = '' |
|
|
|
self.active_ap_path: str = '' |
|
|
|
self.scan_task: asyncio.Task | None = None |
|
|
|
self.scan_task: asyncio.Task | None = None |
|
|
|
self.running: bool = True |
|
|
|
self.running: bool = True |
|
|
|
self.need_auth_callback = None |
|
|
|
self.need_auth_callback = None |
|
|
|
|
|
|
|
self.activated_callback = None |
|
|
|
|
|
|
|
|
|
|
|
async def connect(self) -> None: |
|
|
|
async def connect(self) -> None: |
|
|
|
"""Connect to the DBus system bus.""" |
|
|
|
"""Connect to the DBus system bus.""" |
|
|
@ -96,7 +98,7 @@ class WifiManager: |
|
|
|
except DBusError as e: |
|
|
|
except DBusError as e: |
|
|
|
cloudlog.warning(f"Scan request failed: {str(e)}") |
|
|
|
cloudlog.warning(f"Scan request failed: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
async def get_active_access_point(self) -> str: |
|
|
|
async def get_active_access_point(self): |
|
|
|
try: |
|
|
|
try: |
|
|
|
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) |
|
|
|
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) |
|
|
|
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') |
|
|
|
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') |
|
|
@ -183,7 +185,7 @@ class WifiManager: |
|
|
|
device = await self.bus.introspect(NM, device_path) |
|
|
|
device = await self.bus.introspect(NM, device_path) |
|
|
|
device_proxy = self.bus.get_proxy_object(NM, device_path, device) |
|
|
|
device_proxy = self.bus.get_proxy_object(NM, device_path, device) |
|
|
|
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) |
|
|
|
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) |
|
|
|
device_type = await device_interface.get_device_type() |
|
|
|
device_type = await device_interface.get_device_type() # type: ignore[attr-defined] |
|
|
|
if device_type == 2: # WiFi device |
|
|
|
if device_type == 2: # WiFi device |
|
|
|
self.device_path = device_path |
|
|
|
self.device_path = device_path |
|
|
|
self.device_proxy = device_proxy |
|
|
|
self.device_proxy = device_proxy |
|
|
@ -232,11 +234,17 @@ class WifiManager: |
|
|
|
def _on_state_changed(self, new_state: int, old_state: int, reason: int): |
|
|
|
def _on_state_changed(self, new_state: int, old_state: int, reason: int): |
|
|
|
print(f"State changed: {old_state} -> {new_state}, reason: {reason}") |
|
|
|
print(f"State changed: {old_state} -> {new_state}, reason: {reason}") |
|
|
|
if new_state == NMDeviceState.ACTIVATED: |
|
|
|
if new_state == NMDeviceState.ACTIVATED: |
|
|
|
|
|
|
|
if self.activated_callback: |
|
|
|
|
|
|
|
self.activated_callback() |
|
|
|
asyncio.create_task(self._update_connection_status()) |
|
|
|
asyncio.create_task(self._update_connection_status()) |
|
|
|
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): |
|
|
|
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): |
|
|
|
for network in self.networks: |
|
|
|
for network in self.networks: |
|
|
|
network.is_connected = False |
|
|
|
network.is_connected = False |
|
|
|
if new_state == NMDeviceState.NEED_AUTH and self.need_auth_callback: |
|
|
|
if ( |
|
|
|
|
|
|
|
new_state == NMDeviceState.NEED_AUTH |
|
|
|
|
|
|
|
and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT |
|
|
|
|
|
|
|
and self.need_auth_callback |
|
|
|
|
|
|
|
): |
|
|
|
self.need_auth_callback() |
|
|
|
self.need_auth_callback() |
|
|
|
|
|
|
|
|
|
|
|
def _on_new_connection(self, path: str) -> None: |
|
|
|
def _on_new_connection(self, path: str) -> None: |
|
|
@ -289,10 +297,9 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
|
|
async def _get_available_networks(self): |
|
|
|
async def _get_available_networks(self): |
|
|
|
"""Get a list of available networks via NetworkManager.""" |
|
|
|
"""Get a list of available networks via NetworkManager.""" |
|
|
|
networks = [] |
|
|
|
|
|
|
|
wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) |
|
|
|
wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) |
|
|
|
access_points = await wifi_iface.get_access_points() |
|
|
|
access_points = await wifi_iface.get_access_points() |
|
|
|
|
|
|
|
network_dict = {} |
|
|
|
for ap_path in access_points: |
|
|
|
for ap_path in access_points: |
|
|
|
try: |
|
|
|
try: |
|
|
|
props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) |
|
|
|
props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) |
|
|
@ -303,27 +310,28 @@ class WifiManager: |
|
|
|
continue |
|
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
bssid = properties.get('HwAddress', Variant('s', '')).value |
|
|
|
bssid = properties.get('HwAddress', Variant('s', '')).value |
|
|
|
|
|
|
|
strength = properties['Strength'].value |
|
|
|
flags = properties['Flags'].value |
|
|
|
flags = properties['Flags'].value |
|
|
|
wpa_flags = properties['WpaFlags'].value |
|
|
|
wpa_flags = properties['WpaFlags'].value |
|
|
|
rsn_flags = properties['RsnFlags'].value |
|
|
|
rsn_flags = properties['RsnFlags'].value |
|
|
|
|
|
|
|
existing_network = network_dict.get(ssid) |
|
|
|
networks.append( |
|
|
|
if not existing_network or ((not existing_network.bssid and bssid) or (existing_network.strength < strength)): |
|
|
|
NetworkInfo( |
|
|
|
network_dict[ssid] = NetworkInfo( |
|
|
|
ssid=ssid, |
|
|
|
ssid=ssid, |
|
|
|
strength=properties['Strength'].value, |
|
|
|
strength=strength, |
|
|
|
security_type=self._get_security_type(flags, wpa_flags, rsn_flags), |
|
|
|
security_type=self._get_security_type(flags, wpa_flags, rsn_flags), |
|
|
|
path=ap_path, |
|
|
|
path=ap_path, |
|
|
|
bssid=bssid, |
|
|
|
bssid=bssid, |
|
|
|
is_connected=self.active_ap_path == ap_path, |
|
|
|
is_connected=self.active_ap_path == ap_path, |
|
|
|
) |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
except DBusError as e: |
|
|
|
except DBusError as e: |
|
|
|
cloudlog.error(f"Error fetching networks: {e}") |
|
|
|
cloudlog.error(f"Error fetching networks: {e}") |
|
|
|
except Exception as e: |
|
|
|
except Exception as e: |
|
|
|
cloudlog.error({e}) |
|
|
|
cloudlog.error({e}) |
|
|
|
|
|
|
|
|
|
|
|
self.networks = sorted( |
|
|
|
self.networks = sorted( |
|
|
|
networks, |
|
|
|
network_dict.values(), |
|
|
|
key=lambda network: ( |
|
|
|
key=lambda network: ( |
|
|
|
not network.is_connected, |
|
|
|
not network.is_connected, |
|
|
|
-network.strength, # Higher signal strength first |
|
|
|
-network.strength, # Higher signal strength first |
|
|
|