You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
6.0 KiB
208 lines
6.0 KiB
import asyncio
|
|
import concurrent.futures
|
|
import copy
|
|
from collections import defaultdict
|
|
import dbus
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
from enum import IntEnum
|
|
from typing import TypeVar
|
|
|
|
from dbus_next.aio import MessageBus
|
|
from dbus_next import BusType, Variant, Message
|
|
from dbus_next.errors import DBusError
|
|
from dbus_next.constants import MessageType
|
|
|
|
try:
|
|
from openpilot.common.params import Params
|
|
except ImportError:
|
|
# Params/Cythonized modules are not available in zipapp
|
|
Params = None
|
|
from openpilot.common.swaglog import cloudlog
|
|
|
|
T = TypeVar("T")
|
|
|
|
# NetworkManager constants
|
|
NM = "org.freedesktop.NetworkManager"
|
|
NM_PATH = '/org/freedesktop/NetworkManager'
|
|
NM_IFACE = 'org.freedesktop.NetworkManager'
|
|
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings'
|
|
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings'
|
|
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection'
|
|
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless'
|
|
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties'
|
|
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device"
|
|
|
|
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8
|
|
|
|
NM_DEVICE_TYPE_WIFI = 2
|
|
|
|
TETHERING_IP_ADDRESS = "192.168.43.1"
|
|
DEFAULT_TETHERING_PASSWORD = "swagswagcomma"
|
|
|
|
|
|
# NetworkManager device states
|
|
class NMDeviceState(IntEnum):
|
|
DISCONNECTED = 30
|
|
PREPARE = 40
|
|
NEED_AUTH = 60
|
|
IP_CONFIG = 70
|
|
ACTIVATED = 100
|
|
|
|
|
|
class SecurityType(IntEnum):
|
|
OPEN = 0
|
|
WPA = 1
|
|
WPA2 = 2
|
|
WPA3 = 3
|
|
UNSUPPORTED = 4
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Network:
|
|
ssid: str
|
|
strength: int
|
|
is_connected: bool
|
|
# security_type: SecurityType
|
|
# network_path: dbus.ObjectPath
|
|
# bssid: str
|
|
# is_saved: bool = False
|
|
# saved_path: str
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AccessPoint:
|
|
ssid: str
|
|
bssid: str
|
|
strength: int
|
|
is_connected: bool
|
|
flags: int
|
|
wpa_flags: int
|
|
rsn_flags: int
|
|
ap_path: dbus.ObjectPath
|
|
|
|
@classmethod
|
|
def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint":
|
|
ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace")
|
|
bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress"))
|
|
strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength"))
|
|
flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags"))
|
|
wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags"))
|
|
rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags"))
|
|
is_connected = ap_path == active_ap_path
|
|
|
|
return cls(
|
|
ssid=ssid,
|
|
bssid=bssid,
|
|
strength=strength,
|
|
is_connected=is_connected,
|
|
flags=flags,
|
|
wpa_flags=wpa_flags,
|
|
rsn_flags=rsn_flags,
|
|
ap_path=ap_path,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class WifiManagerCallbacks:
|
|
need_auth: Callable[[str], None] | None = None
|
|
activated: Callable[[], None] | None = None
|
|
forgotten: Callable[[str], None] | None = None
|
|
networks_updated: Callable[[list[Network]], None] | None = None
|
|
connection_failed: Callable[[str, str], None] | None = None # Added for error feedback
|
|
|
|
|
|
class WifiManager:
|
|
def __init__(self):
|
|
self._networks = [] # a network can be comprised of multiple APs
|
|
self._active = True # used to not run this cycle when not in settings
|
|
self._running = True
|
|
|
|
# DBus and NetworkManager setup
|
|
self._bus = dbus.SystemBus()
|
|
self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE)
|
|
self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE)
|
|
|
|
self._thread = threading.Thread(target=self._run, daemon=True)
|
|
self._thread.start()
|
|
|
|
def stop(self):
|
|
self._running = False
|
|
self._thread.join()
|
|
|
|
def _run(self):
|
|
while True:
|
|
if self._active:
|
|
self._update_networks()
|
|
|
|
if not self._running:
|
|
break
|
|
|
|
time.sleep(1)
|
|
|
|
def set_active(self, active: bool):
|
|
self._active = active
|
|
|
|
def _get_wifi_device(self) -> dbus.ObjectPath | None:
|
|
# TODO: cache if slow
|
|
t = time.monotonic()
|
|
device_paths = self._nm.GetDevices()
|
|
# print(f'DEVICE PATHS: {device_paths}')
|
|
|
|
wifi_device = None
|
|
for device_path in device_paths:
|
|
dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE)
|
|
dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType")
|
|
|
|
if dev_type == NM_DEVICE_TYPE_WIFI:
|
|
wifi_device = device_path
|
|
break
|
|
|
|
print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}")
|
|
return wifi_device
|
|
|
|
def _update_networks(self):
|
|
# TODO: only run this function on scan complete!
|
|
print('UPDATING NETWORKS!!!!')
|
|
|
|
device_path = self._get_wifi_device()
|
|
if device_path is None:
|
|
cloudlog.warning("No WiFi device found")
|
|
return
|
|
|
|
wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE)
|
|
dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE)
|
|
active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint")
|
|
|
|
aps: dict[str, list[AccessPoint]] = {}
|
|
|
|
for ap_path in wifi_iface.GetAllAccessPoints():
|
|
ap_props = dbus.Interface(self._bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE)
|
|
|
|
ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path)
|
|
if ap.ssid not in aps:
|
|
aps[ap.ssid] = []
|
|
|
|
aps[ap.ssid].append(ap)
|
|
|
|
networks = []
|
|
for ssid, ap_list in aps.items():
|
|
# we only want to show the strongest AP for each SSID
|
|
strongest_ap = max(ap_list, key=lambda ap: ap.strength)
|
|
|
|
is_connected = any(ap.ap_path == active_ap_path for ap in ap_list)
|
|
|
|
networks.append(Network(
|
|
ssid=ssid,
|
|
strength=strongest_ap.strength,
|
|
is_connected=is_connected,
|
|
))
|
|
|
|
# TODO: lock this? i don't think so since this is atomic replace right?!!?!
|
|
self._networks = networks
|
|
|
|
def get_networks(self):
|
|
return self._networks
|
|
|