|
|
|
@ -9,6 +9,7 @@ from enum import IntEnum |
|
|
|
|
|
|
|
|
|
from collections import deque |
|
|
|
|
from jeepney import DBusAddress, new_method_call |
|
|
|
|
from jeepney.wrappers import Properties |
|
|
|
|
from jeepney.bus_messages import message_bus, MatchRule |
|
|
|
|
from jeepney.io.blocking import open_dbus_connection |
|
|
|
|
|
|
|
|
@ -120,8 +121,15 @@ class WifiManager: |
|
|
|
|
self._nm = dbus.Interface(self._main_bus.get_object(NM, NM_PATH), NM_IFACE) |
|
|
|
|
self._props = dbus.Interface(self._main_bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) |
|
|
|
|
|
|
|
|
|
# Jeepney DBus connections |
|
|
|
|
# TODO: can we use one? or will the signal blocking not work properly? |
|
|
|
|
self._conn_main = open_dbus_connection(bus="SYSTEM") # used by scanner / general method calls |
|
|
|
|
self._conn_monitor = open_dbus_connection(bus="SYSTEM") # used by state monitor thread |
|
|
|
|
|
|
|
|
|
self._nmj = DBusAddress(NM_PATH, bus_name=NM, interface=NM_IFACE) |
|
|
|
|
|
|
|
|
|
# store wifi device path |
|
|
|
|
self._wifi_device: dbus.ObjectPath | None = None |
|
|
|
|
self._wifi_device: str | None = None |
|
|
|
|
|
|
|
|
|
# State |
|
|
|
|
self._connecting_to_ssid: str = "" |
|
|
|
@ -165,7 +173,7 @@ class WifiManager: |
|
|
|
|
threading.Thread(target=self._update_networks, daemon=True).start() |
|
|
|
|
|
|
|
|
|
def _monitor_state(self): |
|
|
|
|
device_path: dbus.ObjectPath = self._wait_for_wifi_device() |
|
|
|
|
device_path: str = self._wait_for_wifi_device() |
|
|
|
|
|
|
|
|
|
conn = open_dbus_connection(bus="SYSTEM") |
|
|
|
|
|
|
|
|
@ -234,9 +242,9 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
time.sleep(5) |
|
|
|
|
|
|
|
|
|
def _wait_for_wifi_device(self) -> dbus.ObjectPath: |
|
|
|
|
def _wait_for_wifi_device(self) -> str: |
|
|
|
|
with self._lock: |
|
|
|
|
device_path: dbus.ObjectPath | None = None |
|
|
|
|
device_path: str | None = None |
|
|
|
|
while not self._exit: |
|
|
|
|
device_path = self._get_wifi_device() |
|
|
|
|
if device_path is not None: |
|
|
|
@ -244,18 +252,18 @@ class WifiManager: |
|
|
|
|
time.sleep(1) |
|
|
|
|
return device_path |
|
|
|
|
|
|
|
|
|
def _get_wifi_device(self) -> dbus.ObjectPath | None: |
|
|
|
|
def _get_wifi_device(self) -> str | None: |
|
|
|
|
if self._wifi_device is not None: |
|
|
|
|
return self._wifi_device |
|
|
|
|
|
|
|
|
|
t = time.monotonic() |
|
|
|
|
device_paths = self._nm.GetDevices() |
|
|
|
|
device_paths = self._conn_main.send_and_get_reply(new_method_call(self._nmj, 'GetDevices')).body[0] |
|
|
|
|
|
|
|
|
|
for device_path in device_paths: |
|
|
|
|
dev_props = dbus.Interface(self._main_bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) |
|
|
|
|
dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType") |
|
|
|
|
dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE) |
|
|
|
|
dev_type = self._conn_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0] |
|
|
|
|
|
|
|
|
|
if dev_type == NM_DEVICE_TYPE_WIFI: |
|
|
|
|
if dev_type[1] == NM_DEVICE_TYPE_WIFI: |
|
|
|
|
self._wifi_device = device_path |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|