|
|
|
@ -63,22 +63,6 @@ class Network: |
|
|
|
|
security_type: SecurityType |
|
|
|
|
is_saved: bool |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def from_dbus_jeepney(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network": |
|
|
|
|
# we only want to show the strongest AP for each Network/SSID |
|
|
|
|
strongest_ap = max(aps, key=lambda ap: ap.strength) |
|
|
|
|
|
|
|
|
|
is_connected = any(ap.ap_path == active_ap_path for ap in aps) # TODO: just any is_connected aps! |
|
|
|
|
security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags) |
|
|
|
|
|
|
|
|
|
return cls( |
|
|
|
|
ssid=ssid, |
|
|
|
|
strength=strongest_ap.strength, |
|
|
|
|
is_connected=is_connected and is_saved, |
|
|
|
|
security_type=security_type, |
|
|
|
|
is_saved=is_saved, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def from_dbus(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network": |
|
|
|
|
# we only want to show the strongest AP for each Network/SSID |
|
|
|
@ -108,7 +92,7 @@ class AccessPoint: |
|
|
|
|
ap_path: dbus.ObjectPath |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def from_dbus_jeepney(cls, conn: DBusConnection, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": |
|
|
|
|
def from_dbus(cls, conn: DBusConnection, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": |
|
|
|
|
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE) |
|
|
|
|
|
|
|
|
|
ssid = bytes(conn.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace") |
|
|
|
@ -129,26 +113,6 @@ class AccessPoint: |
|
|
|
|
ap_path=ap_path, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": |
|
|
|
|
ssid = bytes(ap_props.Get(NM_ACCESS_POINT_IFACE, "Ssid")).decode("utf-8", "replace") |
|
|
|
|
bssid = str(ap_props.Get(NM_ACCESS_POINT_IFACE, "HwAddress")) |
|
|
|
|
strength = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Strength")) |
|
|
|
|
flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Flags")) |
|
|
|
|
wpa_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "WpaFlags")) |
|
|
|
|
rsn_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "RsnFlags")) |
|
|
|
|
|
|
|
|
|
return cls( |
|
|
|
|
ssid=ssid, |
|
|
|
|
bssid=bssid, |
|
|
|
|
strength=strength, |
|
|
|
|
is_connected=ap_path == active_ap_path, |
|
|
|
|
flags=flags, |
|
|
|
|
wpa_flags=wpa_flags, |
|
|
|
|
rsn_flags=rsn_flags, |
|
|
|
|
ap_path=ap_path, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WifiManager: |
|
|
|
|
def __init__(self): |
|
|
|
@ -203,7 +167,6 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
atexit.register(self.stop) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _tmp_init(self): |
|
|
|
|
self._wait_for_wifi_device() |
|
|
|
|
self._update_networks_old() |
|
|
|
@ -483,44 +446,6 @@ class WifiManager: |
|
|
|
|
except DBusErrorResponse: |
|
|
|
|
cloudlog.exception("Failed to request scan") |
|
|
|
|
|
|
|
|
|
def _update_networks_old(self): |
|
|
|
|
print('UPDATING NETWORKS!!!!') |
|
|
|
|
|
|
|
|
|
if self._wifi_device is None: |
|
|
|
|
cloudlog.warning("No WiFi device found") |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
wifi_iface = dbus.Interface(self._main_bus.get_object(NM, self._wifi_device), NM_WIRELESS_IFACE) |
|
|
|
|
dev_props = dbus.Interface(self._main_bus.get_object(NM, self._wifi_device), NM_PROPERTIES_IFACE) |
|
|
|
|
active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint") |
|
|
|
|
|
|
|
|
|
aps: dict[str, list[AccessPoint]] = {} |
|
|
|
|
|
|
|
|
|
for ap_path in wifi_iface.GetAllAccessPoints(): |
|
|
|
|
ap_props = dbus.Interface(self._main_bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE) |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path) |
|
|
|
|
if ap.ssid == "": |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
if ap.ssid not in aps: |
|
|
|
|
aps[ap.ssid] = [] |
|
|
|
|
|
|
|
|
|
aps[ap.ssid].append(ap) |
|
|
|
|
except dbus.exceptions.DBusException: |
|
|
|
|
# some APs have been seen dropping off during iteration |
|
|
|
|
cloudlog.exception(f"Failed to get AP properties for {ap_path}") |
|
|
|
|
|
|
|
|
|
known_connections = self._get_connections() |
|
|
|
|
networks = [Network.from_dbus(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None) |
|
|
|
|
for ssid, ap_list in aps.items()] |
|
|
|
|
networks.sort(key=lambda n: (-n.is_connected, -n.strength, n.ssid.lower())) |
|
|
|
|
self._networks = networks |
|
|
|
|
|
|
|
|
|
if self._networks_updated is not None: |
|
|
|
|
self._networks_updated(self._networks) |
|
|
|
|
|
|
|
|
|
def _update_networks(self): |
|
|
|
|
print('UPDATING NETWORKS!!!!') |
|
|
|
|
|
|
|
|
@ -528,25 +453,16 @@ class WifiManager: |
|
|
|
|
cloudlog.warning("No WiFi device found") |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
# wifi_iface = dbus.Interface(self._main_bus.get_object(NM, self._wifi_device), NM_WIRELESS_IFACE) |
|
|
|
|
# dev_props = dbus.Interface(self._main_bus.get_object(NM, self._wifi_device), NM_PROPERTIES_IFACE) |
|
|
|
|
# active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint") |
|
|
|
|
|
|
|
|
|
# jeepney: |
|
|
|
|
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE) |
|
|
|
|
|
|
|
|
|
# returns '/' if no active AP |
|
|
|
|
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE) |
|
|
|
|
active_ap_path = self._conn_main.send_and_get_reply(Properties(wifi_addr).get('ActiveAccessPoint')).body[0][1] |
|
|
|
|
print('got active_ap_path', active_ap_path) |
|
|
|
|
|
|
|
|
|
ap_paths = self._conn_main.send_and_get_reply(new_method_call(wifi_addr, 'GetAllAccessPoints')).body[0] |
|
|
|
|
print('ap paths', ap_paths) |
|
|
|
|
|
|
|
|
|
aps: dict[str, list[AccessPoint]] = {} |
|
|
|
|
|
|
|
|
|
for ap_path in ap_paths: |
|
|
|
|
# try: |
|
|
|
|
ap = AccessPoint.from_dbus_jeepney(self._conn_main, ap_path, active_ap_path) |
|
|
|
|
ap = AccessPoint.from_dbus(self._conn_main, ap_path, active_ap_path) |
|
|
|
|
if ap.ssid == "": |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
@ -560,7 +476,7 @@ class WifiManager: |
|
|
|
|
# cloudlog.exception(f"Failed to get AP properties for {ap_path}") |
|
|
|
|
|
|
|
|
|
known_connections = self._get_connections() |
|
|
|
|
networks = [Network.from_dbus_jeepney(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None) |
|
|
|
|
networks = [Network.from_dbus(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None) |
|
|
|
|
for ssid, ap_list in aps.items()] |
|
|
|
|
networks.sort(key=lambda n: (-n.is_connected, -n.strength, n.ssid.lower())) |
|
|
|
|
self._networks = networks |
|
|
|
|