openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

214 lines
6.1 KiB

import asyncio
import concurrent.futures
import copy
from collections import defaultdict
import dbus
import threading
import time
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
from typing import TypeVar
from dbus_next.aio import MessageBus
from dbus_next import BusType, Variant, Message
from dbus_next.errors import DBusError
from dbus_next.constants import MessageType
try:
from openpilot.common.params import Params
except ImportError:
# Params/Cythonized modules are not available in zipapp
Params = None
from openpilot.common.swaglog import cloudlog
T = TypeVar("T")
# NetworkManager constants
NM = "org.freedesktop.NetworkManager"
NM_PATH = '/org/freedesktop/NetworkManager'
NM_IFACE = 'org.freedesktop.NetworkManager'
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings'
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings'
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection'
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless'
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties'
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device"
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8
NM_DEVICE_TYPE_WIFI = 2
TETHERING_IP_ADDRESS = "192.168.43.1"
DEFAULT_TETHERING_PASSWORD = "swagswagcomma"
# NetworkManager device states
class NMDeviceState(IntEnum):
DISCONNECTED = 30
PREPARE = 40
NEED_AUTH = 60
IP_CONFIG = 70
ACTIVATED = 100
class SecurityType(IntEnum):
OPEN = 0
WPA = 1
WPA2 = 2
WPA3 = 3
UNSUPPORTED = 4
@dataclass(frozen=True)
class Network:
ssid: str
strength: int
is_connected: bool
# security_type: SecurityType
# network_path: dbus.ObjectPath
# bssid: str
# is_saved: bool = False
# saved_path: str
@dataclass(frozen=True)
class AccessPoint:
ssid: str
bssid: str
strength: int
is_connected: bool
flags: int
wpa_flags: int
rsn_flags: int
ap_path: dbus.ObjectPath
@classmethod
def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint":
ssid = bytes(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid")).decode("utf-8", "replace")
bssid = str(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress"))
strength = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Strength"))
flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "Flags"))
wpa_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "WpaFlags"))
rsn_flags = int(ap_props.Get("org.freedesktop.NetworkManager.AccessPoint", "RsnFlags"))
is_connected = ap_path == active_ap_path
return cls(
ssid=ssid,
bssid=bssid,
strength=strength,
is_connected=is_connected,
flags=flags,
wpa_flags=wpa_flags,
rsn_flags=rsn_flags,
ap_path=ap_path,
)
@dataclass
class WifiManagerCallbacks:
need_auth: Callable[[str], None] | None = None
activated: Callable[[], None] | None = None
forgotten: Callable[[str], None] | None = None
networks_updated: Callable[[list[Network]], None] | None = None
connection_failed: Callable[[str, str], None] | None = None # Added for error feedback
class WifiManager:
def __init__(self):
self._networks = [] # a network can be comprised of multiple APs
self._active = True # used to not run this cycle when not in settings
self._running = True
# DBus and NetworkManager setup
self._bus = dbus.SystemBus()
self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE)
self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE)
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._running = False
self._thread.join()
def _run(self):
while True:
if self._active:
self._update_networks()
if not self._running:
break
time.sleep(1)
def set_active(self, active: bool):
self._active = active
def _get_wifi_device(self) -> dbus.ObjectPath | None:
# TODO: cache if slow
t = time.monotonic()
device_paths = self._nm.GetDevices()
wifi_device = None
for device_path in device_paths:
dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE)
dev_type = dev_props.Get(NM_DEVICE_IFACE, "DeviceType")
if dev_type == NM_DEVICE_TYPE_WIFI:
wifi_device = device_path
break
print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}")
return wifi_device
def _update_networks(self):
# TODO: only run this function on scan complete!
self._get_wifi_device()
print('UPDATING NETWORKS!!!!')
device_paths = self._nm.GetDevices()
print(f'DEVICE PATHS: {device_paths}')
all_networks: dict[str, Network] = {}
device_path = self._get_wifi_device()
if device_path is None:
cloudlog.warning("No WiFi device found")
return
wifi_iface = dbus.Interface(self._bus.get_object(NM, device_path), NM_WIRELESS_IFACE)
dev_props = dbus.Interface(self._bus.get_object(NM, device_path), NM_PROPERTIES_IFACE)
active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint")
print('active', active_ap_path)
aps: dict[str, list[AccessPoint]] = {}
for ap_path in wifi_iface.GetAllAccessPoints():
ap_props = dbus.Interface(self._bus.get_object(NM, ap_path), NM_PROPERTIES_IFACE)
ap = AccessPoint.from_dbus(ap_props, ap_path, active_ap_path)
if ap.ssid not in aps:
aps[ap.ssid] = []
aps[ap.ssid].append(ap)
print('ap:', ap)
networks: dict[str, Network] = {}
for ssid, ap_list in aps.items():
# we only want to show the strongest AP for each SSID
strongest_ap = max(ap_list, key=lambda ap: ap.strength)
is_connected = any(ap.ap_path == active_ap_path for ap in ap_list)
print('ssid', ssid, is_connected)
networks[ssid] = Network(
ssid=ssid,
strength=strongest_ap.strength,
is_connected=is_connected,
)
print('networks:', networks)
def get_networks(self):
return self._networks