|
|
@ -1,6 +1,5 @@ |
|
|
|
import sys |
|
|
|
import sys |
|
|
|
import atexit |
|
|
|
import atexit |
|
|
|
import dbus |
|
|
|
|
|
|
|
import threading |
|
|
|
import threading |
|
|
|
import time |
|
|
|
import time |
|
|
|
import uuid |
|
|
|
import uuid |
|
|
@ -64,7 +63,7 @@ class Network: |
|
|
|
is_saved: bool |
|
|
|
is_saved: bool |
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
@classmethod |
|
|
|
def from_dbus(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network": |
|
|
|
def from_dbus(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: str, is_saved: bool) -> "Network": |
|
|
|
# we only want to show the strongest AP for each Network/SSID |
|
|
|
# we only want to show the strongest AP for each Network/SSID |
|
|
|
strongest_ap = max(aps, key=lambda ap: ap.strength) |
|
|
|
strongest_ap = max(aps, key=lambda ap: ap.strength) |
|
|
|
|
|
|
|
|
|
|
@ -89,10 +88,10 @@ class AccessPoint: |
|
|
|
flags: int |
|
|
|
flags: int |
|
|
|
wpa_flags: int |
|
|
|
wpa_flags: int |
|
|
|
rsn_flags: int |
|
|
|
rsn_flags: int |
|
|
|
ap_path: dbus.ObjectPath |
|
|
|
ap_path: str |
|
|
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
@classmethod |
|
|
|
def from_dbus(cls, conn: DBusConnection, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": |
|
|
|
def from_dbus(cls, conn: DBusConnection, ap_path: str, active_ap_path: str) -> "AccessPoint": |
|
|
|
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE) |
|
|
|
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE) |
|
|
|
|
|
|
|
|
|
|
|
ssid = bytes(conn.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace") |
|
|
|
ssid = bytes(conn.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace") |
|
|
@ -120,12 +119,7 @@ class WifiManager: |
|
|
|
self._active = True # used to not run when not in settings |
|
|
|
self._active = True # used to not run when not in settings |
|
|
|
self._exit = False |
|
|
|
self._exit = False |
|
|
|
|
|
|
|
|
|
|
|
# DBus and NetworkManager setup |
|
|
|
# DBus connections |
|
|
|
self._main_bus = dbus.SystemBus() |
|
|
|
|
|
|
|
self._nm = dbus.Interface(self._main_bus.get_object(NM, NM_PATH), NM_IFACE) |
|
|
|
|
|
|
|
self._props = dbus.Interface(self._main_bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Jeepney DBus connections |
|
|
|
|
|
|
|
# TODO: can we use one? or will the signal blocking not work properly? |
|
|
|
# TODO: can we use one? or will the signal blocking not work properly? |
|
|
|
# TODO: chadder is saying we should lock the main connection since jeepney doesn't provide any multithreaded guarantees |
|
|
|
# TODO: chadder is saying we should lock the main connection since jeepney doesn't provide any multithreaded guarantees |
|
|
|
# TODO: which seems correct, router might be what we want instead: https://jeepney.readthedocs.io/en/latest/api/threading.html |
|
|
|
# TODO: which seems correct, router might be what we want instead: https://jeepney.readthedocs.io/en/latest/api/threading.html |
|
|
@ -495,9 +489,8 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
def stop(self): |
|
|
|
self._exit = True |
|
|
|
self._exit = True |
|
|
|
# self._scan_thread.join() |
|
|
|
self._scan_thread.join() |
|
|
|
# self._state_thread.join() |
|
|
|
self._state_thread.join() |
|
|
|
self._main_bus.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._conn_main.close() |
|
|
|
self._conn_main.close() |
|
|
|
self._conn_monitor.close() |
|
|
|
self._conn_monitor.close() |
|
|
|