|
|
|
@ -28,12 +28,27 @@ class SecurityType(IntEnum): |
|
|
|
|
UNSUPPORTED = 4 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: |
|
|
|
|
wpa_props = wpa_flags | rsn_flags |
|
|
|
|
|
|
|
|
|
# obtained by looking at flags of networks in the office as reported by an Android phone |
|
|
|
|
supports_wpa = NM_802_11_AP_SEC_PAIR_WEP40 | NM_802_11_AP_SEC_PAIR_WEP104 | NM_802_11_AP_SEC_GROUP_WEP40 | NM_802_11_AP_SEC_GROUP_WEP104 | NM_802_11_AP_SEC_KEY_MGMT_PSK; |
|
|
|
|
|
|
|
|
|
if (flags == NM_802_11_AP_FLAGS_NONE) or ((flags & NM_802_11_AP_FLAGS_WPS) and not (wpa_props & supports_wpa)): |
|
|
|
|
return SecurityType.OPEN |
|
|
|
|
elif (flags & NM_802_11_AP_FLAGS_PRIVACY) and (wpa_props & supports_wpa) and not (wpa_props & NM_802_11_AP_SEC_KEY_MGMT_802_1X): |
|
|
|
|
return SecurityType.WPA |
|
|
|
|
else: |
|
|
|
|
cloudlog.warning(f"Unsupported network! flags: {flags}, wpa_flags: {wpa_flags}, rsn_flags: {rsn_flags}") |
|
|
|
|
return SecurityType.UNSUPPORTED |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True) |
|
|
|
|
class Network: |
|
|
|
|
ssid: str |
|
|
|
|
strength: int |
|
|
|
|
is_connected: bool |
|
|
|
|
security_type: SecurityType # TODO |
|
|
|
|
security_type: SecurityType |
|
|
|
|
is_saved: bool = False # TODO |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
@ -41,13 +56,14 @@ class Network: |
|
|
|
|
# we only want to show the strongest AP for each Network/SSID |
|
|
|
|
strongest_ap = max(aps, key=lambda ap: ap.strength) |
|
|
|
|
|
|
|
|
|
is_connected = any(ap.ap_path == active_ap_path for ap in aps) |
|
|
|
|
is_connected = any(ap.ap_path == active_ap_path for ap in aps) # TODO: just any is_connected aps! |
|
|
|
|
security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags) |
|
|
|
|
|
|
|
|
|
return cls( |
|
|
|
|
ssid=ssid, |
|
|
|
|
strength=strongest_ap.strength, |
|
|
|
|
is_connected=is_connected, |
|
|
|
|
security_type=SecurityType.UNSUPPORTED, # TODO |
|
|
|
|
security_type=security_type, |
|
|
|
|
is_saved=False, # TODO |
|
|
|
|
) |
|
|
|
|
|
|
|
|
@ -65,12 +81,12 @@ class AccessPoint: |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": |
|
|
|
|
ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace") |
|
|
|
|
bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")) |
|
|
|
|
strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength")) |
|
|
|
|
flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags")) |
|
|
|
|
wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags")) |
|
|
|
|
rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags")) |
|
|
|
|
ssid = bytes(ap_props.Get(NM_ACCESS_POINT_IFACE, "Ssid")).decode("utf-8", "replace") |
|
|
|
|
bssid = str(ap_props.Get(NM_ACCESS_POINT_IFACE, "HwAddress")) |
|
|
|
|
strength = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Strength")) |
|
|
|
|
flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "Flags")) |
|
|
|
|
wpa_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "WpaFlags")) |
|
|
|
|
rsn_flags = int(ap_props.Get(NM_ACCESS_POINT_IFACE, "RsnFlags")) |
|
|
|
|
|
|
|
|
|
return cls( |
|
|
|
|
ssid=ssid, |
|
|
|
@ -97,12 +113,15 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
# Callbacks |
|
|
|
|
self._networks_updated: Callable[[list[Network]], None] | None = None |
|
|
|
|
self._connection_failed: Callable[[str, str], None] | None = None |
|
|
|
|
|
|
|
|
|
self._thread = threading.Thread(target=self._run, daemon=True) |
|
|
|
|
self._thread.start() |
|
|
|
|
|
|
|
|
|
def set_callbacks(self, networks_updated: Callable[[list[Network]], None]): |
|
|
|
|
def set_callbacks(self, networks_updated: Callable[[list[Network]], None], |
|
|
|
|
connection_failed: Callable[[str, str], None]): |
|
|
|
|
self._networks_updated = networks_updated |
|
|
|
|
self._connection_failed = connection_failed |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self._running = False |
|
|
|
@ -139,6 +158,24 @@ class WifiManager: |
|
|
|
|
# print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}") |
|
|
|
|
return wifi_device |
|
|
|
|
|
|
|
|
|
def connect_to_network(self, ssid: str, password: str | None): |
|
|
|
|
self._forget_connection(ssid) |
|
|
|
|
... |
|
|
|
|
|
|
|
|
|
def _get_connections(self) -> list[dbus.ObjectPath]: |
|
|
|
|
settings_iface = dbus.Interface(self._bus.get_object(NM, NM_SETTINGS_PATH), NM_SETTINGS_IFACE) |
|
|
|
|
return settings_iface.ListConnections() |
|
|
|
|
|
|
|
|
|
def _forget_connection(self, ssid: str): |
|
|
|
|
for conn_path in self._get_connections(): |
|
|
|
|
conn_props = dbus.Interface(self._bus.get_object(NM, conn_path), NM_CONNECTION_IFACE) |
|
|
|
|
settings = conn_props.GetSettings() |
|
|
|
|
if "802-11-wireless" in settings and bytes(settings["802-11-wireless"]["ssid"]).decode("utf-8", "replace") == ssid: |
|
|
|
|
conn_iface = dbus.Interface(self._bus.get_object(NM, conn_path), NM_CONNECTION_IFACE) |
|
|
|
|
conn_iface.Delete() |
|
|
|
|
print('deleted', ssid) |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
def _update_networks(self): |
|
|
|
|
# TODO: only run this function on scan complete! |
|
|
|
|
print('UPDATING NETWORKS!!!!') |
|
|
|
|