parent
09f2c86945
commit
0e93076253
2 changed files with 241 additions and 0 deletions
@ -0,0 +1,236 @@ |
|||||||
|
import asyncio |
||||||
|
import concurrent.futures |
||||||
|
import copy |
||||||
|
from collections import defaultdict |
||||||
|
import dbus |
||||||
|
import threading |
||||||
|
import time |
||||||
|
import uuid |
||||||
|
from collections.abc import Callable |
||||||
|
from dataclasses import dataclass |
||||||
|
from enum import IntEnum |
||||||
|
from typing import TypeVar |
||||||
|
|
||||||
|
from dbus_next.aio import MessageBus |
||||||
|
from dbus_next import BusType, Variant, Message |
||||||
|
from dbus_next.errors import DBusError |
||||||
|
from dbus_next.constants import MessageType |
||||||
|
|
||||||
|
try: |
||||||
|
from openpilot.common.params import Params |
||||||
|
except ImportError: |
||||||
|
# Params/Cythonized modules are not available in zipapp |
||||||
|
Params = None |
||||||
|
from openpilot.common.swaglog import cloudlog |
||||||
|
|
||||||
|
T = TypeVar("T") |
||||||
|
|
||||||
|
# NetworkManager constants |
||||||
|
NM = "org.freedesktop.NetworkManager" |
||||||
|
NM_PATH = '/org/freedesktop/NetworkManager' |
||||||
|
NM_IFACE = 'org.freedesktop.NetworkManager' |
||||||
|
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings' |
||||||
|
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings' |
||||||
|
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection' |
||||||
|
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' |
||||||
|
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' |
||||||
|
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" |
||||||
|
|
||||||
|
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 |
||||||
|
|
||||||
|
NM_DEVICE_TYPE_WIFI = 2 |
||||||
|
|
||||||
|
TETHERING_IP_ADDRESS = "192.168.43.1" |
||||||
|
DEFAULT_TETHERING_PASSWORD = "swagswagcomma" |
||||||
|
|
||||||
|
|
||||||
|
# NetworkManager device states |
||||||
|
class NMDeviceState(IntEnum): |
||||||
|
DISCONNECTED = 30 |
||||||
|
PREPARE = 40 |
||||||
|
NEED_AUTH = 60 |
||||||
|
IP_CONFIG = 70 |
||||||
|
ACTIVATED = 100 |
||||||
|
|
||||||
|
|
||||||
|
class SecurityType(IntEnum): |
||||||
|
OPEN = 0 |
||||||
|
WPA = 1 |
||||||
|
WPA2 = 2 |
||||||
|
WPA3 = 3 |
||||||
|
UNSUPPORTED = 4 |
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True) |
||||||
|
class Network: |
||||||
|
ssid: str |
||||||
|
strength: int |
||||||
|
is_connected: bool |
||||||
|
# security_type: SecurityType |
||||||
|
# network_path: dbus.ObjectPath |
||||||
|
# bssid: str |
||||||
|
# is_saved: bool = False |
||||||
|
# saved_path: str |
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True) |
||||||
|
class AccessPoint: |
||||||
|
ssid: str |
||||||
|
bssid: str |
||||||
|
strength: int |
||||||
|
is_connected: bool |
||||||
|
flags: int |
||||||
|
wpa_flags: int |
||||||
|
rsn_flags: int |
||||||
|
ap_path: dbus.ObjectPath |
||||||
|
|
||||||
|
@classmethod |
||||||
|
def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint": |
||||||
|
ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace") |
||||||
|
bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")) |
||||||
|
strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength")) |
||||||
|
flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags")) |
||||||
|
wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags")) |
||||||
|
rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags")) |
||||||
|
is_connected = ap_path == active_ap_path |
||||||
|
|
||||||
|
return cls( |
||||||
|
ssid=ssid, |
||||||
|
bssid=bssid, |
||||||
|
strength=strength, |
||||||
|
is_connected=is_connected, |
||||||
|
flags=flags, |
||||||
|
wpa_flags=wpa_flags, |
||||||
|
rsn_flags=rsn_flags, |
||||||
|
ap_path=ap_path, |
||||||
|
) |
||||||
|
|
||||||
|
|
||||||
|
@dataclass |
||||||
|
class WifiManagerCallbacks: |
||||||
|
need_auth: Callable[[str], None] | None = None |
||||||
|
activated: Callable[[], None] | None = None |
||||||
|
forgotten: Callable[[str], None] | None = None |
||||||
|
networks_updated: Callable[[list[Network]], None] | None = None |
||||||
|
connection_failed: Callable[[str, str], None] | None = None # Added for error feedback |
||||||
|
|
||||||
|
|
||||||
|
class WifiManager: |
||||||
|
def __init__(self): |
||||||
|
self._networks = [] # a network can be comprised of multiple APs |
||||||
|
self._active = True # used to not run this cycle when not in settings |
||||||
|
self._running = True |
||||||
|
|
||||||
|
# DBus and NetworkManager setup |
||||||
|
self._bus = dbus.SystemBus() |
||||||
|
self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE) |
||||||
|
self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) |
||||||
|
|
||||||
|
self._thread = threading.Thread(target=self._run, daemon=True) |
||||||
|
self._thread.start() |
||||||
|
|
||||||
|
def stop(self): |
||||||
|
self._running = False |
||||||
|
self._thread.join() |
||||||
|
|
||||||
|
def _run(self): |
||||||
|
while True: |
||||||
|
if self._active: |
||||||
|
self._update_networks() |
||||||
|
|
||||||
|
if not self._running: |
||||||
|
break |
||||||
|
|
||||||
|
time.sleep(1) |
||||||
|
|
||||||
|
def set_active(self, active: bool): |
||||||
|
self._active = active |
||||||
|
|
||||||
|
def _get_wifi_device(self) -> dbus.ObjectPath | None: |
||||||
|
# TODO: cache if slow |
||||||
|
t = time.monotonic() |
||||||
|
device_paths = self._nm.GetDevices() |
||||||
|
|
||||||
|
wifi_device = None |
||||||
|
for device_path in device_paths: |
||||||
|
dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) |
||||||
|
dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType") |
||||||
|
|
||||||
|
if dev_type == NM_DEVICE_TYPE_WIFI: |
||||||
|
wifi_device = device_path |
||||||
|
break |
||||||
|
|
||||||
|
print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}") |
||||||
|
return wifi_device |
||||||
|
|
||||||
|
def _update_networks(self): |
||||||
|
# TODO: only run this function on scan complete! |
||||||
|
self._get_wifi_device() |
||||||
|
print('UPDATING NETWORKS!!!!') |
||||||
|
device_paths = self._nm.GetDevices() |
||||||
|
print(f'DEVICE PATHS: {device_paths}') |
||||||
|
|
||||||
|
all_networks: dict[str, Network] = {} |
||||||
|
|
||||||
|
device_path = self._get_wifi_device() |
||||||
|
if device_path is None: |
||||||
|
cloudlog.warning("No WiFi device found") |
||||||
|
return |
||||||
|
|
||||||
|
wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE) |
||||||
|
dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE) |
||||||
|
active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint") |
||||||
|
|
||||||
|
print('active', active_ap_path) |
||||||
|
|
||||||
|
aps: dict[str, list[AccessPoint]] = {} |
||||||
|
|
||||||
|
for ap_path in wifi_iface.GetAllAccessPoints(): |
||||||
|
# TODO: ap_path to AccessPoint function, or actually dataclass has function to do this? |
||||||
|
ap_props = dbus.Interface(self._bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE) |
||||||
|
|
||||||
|
|
||||||
|
# ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace") |
||||||
|
# bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress")) |
||||||
|
# strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength")) |
||||||
|
# flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags")) |
||||||
|
# wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags")) |
||||||
|
# rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags")) |
||||||
|
# is_connected = ap_path == active_ap_path |
||||||
|
# |
||||||
|
|
||||||
|
ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path) |
||||||
|
if ap.ssid not in aps: |
||||||
|
aps[ap.ssid] = [] |
||||||
|
aps[ap.ssid].append(ap) |
||||||
|
print('ap:', ap) |
||||||
|
# |
||||||
|
# aps[ssid].append(AccessPoint( |
||||||
|
# ssid=ssid, |
||||||
|
# bssid=bssid, |
||||||
|
# strength=strength, |
||||||
|
# is_connected=is_connected, |
||||||
|
# flags=flags, |
||||||
|
# wpa_flags=wpa_flags, |
||||||
|
# rsn_flags=rsn_flags, |
||||||
|
# ap_path=ap_path, |
||||||
|
# )) |
||||||
|
|
||||||
|
networks: dict[str, Network] = {} |
||||||
|
for ssid, ap_list in aps.items(): |
||||||
|
# we only want to show the strongest AP for each SSID |
||||||
|
strongest_ap = max(ap_list, key=lambda ap: ap.strength) |
||||||
|
|
||||||
|
is_connected = any(ap.ap_path == active_ap_path for ap in ap_list) |
||||||
|
print('ssid', ssid, is_connected) |
||||||
|
|
||||||
|
networks[ssid] = Network( |
||||||
|
ssid=ssid, |
||||||
|
strength=strongest_ap.strength, |
||||||
|
is_connected=is_connected, |
||||||
|
) |
||||||
|
|
||||||
|
print('networks:', networks) |
||||||
|
|
||||||
|
def get_networks(self): |
||||||
|
return self._networks |
Loading…
Reference in new issue