openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

481 lines
17 KiB

import asyncio
3 days ago
import threading
import time
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
from dbus_next.aio import MessageBus
from dbus_next import BusType, Variant, Message
from dbus_next.errors import DBusError
from dbus_next.constants import MessageType
from openpilot.common.swaglog import cloudlog
2 months ago
# NetworkManager constants
NM = "org.freedesktop.NetworkManager"
NM_PATH = '/org/freedesktop/NetworkManager'
NM_IFACE = 'org.freedesktop.NetworkManager'
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings'
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings'
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection'
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless'
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties'
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device"
2 months ago
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8
# NetworkManager device states
class NMDeviceState(IntEnum):
DISCONNECTED = 30
PREPARE = 40
NEED_AUTH = 60
IP_CONFIG = 70
ACTIVATED = 100
class SecurityType(IntEnum):
OPEN = 0
WPA = 1
WPA2 = 2
WPA3 = 3
UNSUPPORTED = 4
@dataclass
class NetworkInfo:
ssid: str
strength: int
is_connected: bool
security_type: SecurityType
path: str
bssid: str
# saved_path: str
3 days ago
@dataclass
class WifiManagerCallbacks:
need_auth: Callable[[], None] | None = None
activated: Callable[[], None] | None = None
forgotten: Callable[[], None] | None = None
class WifiManager:
3 days ago
def __init__(self, callbacks: WifiManagerCallbacks):
self.callbacks = callbacks
self.networks: list[NetworkInfo] = []
2 months ago
self.bus: MessageBus = None
3 days ago
self.device_path: str = ""
self.device_proxy = None
self.saved_connections: dict[str, str] = {}
3 days ago
self.active_ap_path: str = ""
self.scan_task: asyncio.Task | None = None
self.running: bool = True
async def connect(self) -> None:
"""Connect to the DBus system bus."""
try:
self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
if not await self._find_wifi_device():
raise ValueError("No Wi-Fi device found")
await self._setup_signals(self.device_path)
self.active_ap_path = await self.get_active_access_point()
self.saved_connections = await self._get_saved_connections()
self.scan_task = asyncio.create_task(self._periodic_scan())
except DBusError as e:
cloudlog.error(f"Failed to connect to DBus: {e}")
raise
except Exception as e:
cloudlog.error(f"Unexpected error during connect: {e}")
raise
async def shutdown(self) -> None:
self.running = False
if self.scan_task:
self.scan_task.cancel()
try:
await self.scan_task
except asyncio.CancelledError:
pass
if self.bus:
await self.bus.disconnect()
async def request_scan(self) -> None:
try:
interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
await interface.call_request_scan({})
except DBusError as e:
cloudlog.warning(f"Scan request failed: {str(e)}")
2 months ago
async def get_active_access_point(self):
try:
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE)
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint')
return ap_path.value
except DBusError as e:
cloudlog.error(f"Error fetching active access point: {str(e)}")
return ''
async def forget_connection(self, ssid: str) -> bool:
path = self.saved_connections.get(ssid)
if not path:
return False
try:
nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE)
await nm_iface.call_delete()
return True
except DBusError as e:
cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}")
return False
async def activate_connection(self, ssid: str) -> bool:
connection_path = self.saved_connections.get(ssid)
if not connection_path:
return False
try:
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
3 days ago
await nm_iface.call_activate_connection(connection_path, self.device_path, "/")
return True
except DBusError as e:
cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}")
return False
3 days ago
async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None:
3 days ago
"""Connect to a selected Wi-Fi network."""
try:
2 months ago
# settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
connection = {
'connection': {
'type': Variant('s', '802-11-wireless'),
'uuid': Variant('s', str(uuid.uuid4())),
'id': Variant('s', ssid),
'autoconnect-retries': Variant('i', 0),
},
'802-11-wireless': {
'ssid': Variant('ay', ssid.encode('utf-8')),
'hidden': Variant('b', is_hidden),
'mode': Variant('s', 'infrastructure'),
},
'ipv4': {'method': Variant('s', 'auto')},
'ipv6': {'method': Variant('s', 'ignore')},
}
if bssid:
connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8'))
if password:
connection['802-11-wireless-security'] = {
'key-mgmt': Variant('s', 'wpa-psk'),
'auth-alg': Variant('s', 'open'),
'psk': Variant('s', password),
}
2 months ago
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
2 months ago
await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/")
2 months ago
# for network in self.networks:
# network.is_connected = True if network.ssid == ssid else False
await self._update_connection_status()
except DBusError as e:
cloudlog.error(f"Error connecting to network: {e}")
2 months ago
def is_saved(self, ssid: str) -> bool:
return ssid in self.saved_connections
async def _find_wifi_device(self) -> bool:
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
devices = await nm_iface.get_devices()
for device_path in devices:
device = await self.bus.introspect(NM, device_path)
device_proxy = self.bus.get_proxy_object(NM, device_path, device)
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE)
2 months ago
device_type = await device_interface.get_device_type() # type: ignore[attr-defined]
3 days ago
if device_type == 2: # Wi-Fi device
self.device_path = device_path
self.device_proxy = device_proxy
return True
return False
async def _periodic_scan(self):
while self.running:
try:
await self.request_scan()
await self._get_available_networks()
await asyncio.sleep(30)
except asyncio.CancelledError:
break
except DBusError as e:
cloudlog.error(f"Scan failed: {e}")
await asyncio.sleep(5)
async def _setup_signals(self, device_path: str) -> None:
rules = [
f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'",
f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'",
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'",
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'",
]
for rule in rules:
await self._add_match_rule(rule)
# Set up signal handlers
self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed)
self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed)
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
settings_iface.on_new_connection(self._on_new_connection)
settings_iface.on_connection_removed(self._on_connection_removed)
def _on_properties_changed(self, interface: str, changed: dict, invalidated: list):
# print("property changed", interface, changed, invalidated)
if 'LastScan' in changed:
asyncio.create_task(self._get_available_networks())
elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed:
self.active_ap_path = changed["ActiveAccessPoint"].value
asyncio.create_task(self._get_available_networks())
def _on_state_changed(self, new_state: int, old_state: int, reason: int):
print(f"State changed: {old_state} -> {new_state}, reason: {reason}")
if new_state == NMDeviceState.ACTIVATED:
3 days ago
if self.callbacks.activated:
self.callbacks.activated()
asyncio.create_task(self._update_connection_status())
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH):
for network in self.networks:
network.is_connected = False
3 days ago
if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth:
self.callbacks.need_auth()
def _on_new_connection(self, path: str) -> None:
"""Callback for NewConnection signal."""
print(f"New connection added: {path}")
asyncio.create_task(self._add_saved_connection(path))
def _on_connection_removed(self, path: str) -> None:
"""Callback for ConnectionRemoved signal."""
print(f"Connection removed: {path}")
for ssid, p in list(self.saved_connections.items()):
if path == p:
del self.saved_connections[ssid]
break
async def _add_saved_connection(self, path: str) -> None:
"""Add a new saved connection to the dictionary."""
try:
settings = await self._get_connection_settings(path)
if ssid := self._extract_ssid(settings):
self.saved_connections[ssid] = path
except DBusError as e:
cloudlog.error(f"Failed to add connection {path}: {e}")
def _extract_ssid(self, settings: dict) -> str | None:
"""Extract SSID from connection settings."""
ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value
return ''.join(chr(b) for b in ssid_variant) if ssid_variant else None
async def _update_connection_status(self):
self.active_ap_path = await self.get_active_access_point()
await self._get_available_networks()
async def _add_match_rule(self, rule):
3 days ago
"""Add a match rule on the bus."""
reply = await self.bus.call(
Message(
message_type=MessageType.METHOD_CALL,
destination='org.freedesktop.DBus',
interface="org.freedesktop.DBus",
path='/org/freedesktop/DBus',
member='AddMatch',
signature='s',
body=[rule],
)
)
assert reply.message_type == MessageType.METHOD_RETURN
return reply
async def _get_available_networks(self):
"""Get a list of available networks via NetworkManager."""
wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
access_points = await wifi_iface.get_access_points()
2 months ago
network_dict = {}
for ap_path in access_points:
try:
props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE)
properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint')
ssid_variant = properties['Ssid'].value
ssid = ''.join(chr(byte) for byte in ssid_variant)
if not ssid:
continue
bssid = properties.get('HwAddress', Variant('s', '')).value
2 months ago
strength = properties['Strength'].value
flags = properties['Flags'].value
wpa_flags = properties['WpaFlags'].value
rsn_flags = properties['RsnFlags'].value
2 months ago
existing_network = network_dict.get(ssid)
if not existing_network or ((not existing_network.bssid and bssid) or (existing_network.strength < strength)):
network_dict[ssid] = NetworkInfo(
ssid=ssid,
2 months ago
strength=strength,
security_type=self._get_security_type(flags, wpa_flags, rsn_flags),
path=ap_path,
bssid=bssid,
is_connected=self.active_ap_path == ap_path,
)
2 months ago
except DBusError as e:
cloudlog.error(f"Error fetching networks: {e}")
except Exception as e:
cloudlog.error({e})
self.networks = sorted(
2 months ago
network_dict.values(),
key=lambda network: (
not network.is_connected,
-network.strength, # Higher signal strength first
network.ssid.lower(),
),
)
async def _get_connection_settings(self, path):
"""Fetch connection settings for a specific connection path."""
try:
connection_proxy = await self.bus.introspect(NM, path)
connection = self.bus.get_proxy_object(NM, path, connection_proxy)
settings = connection.get_interface(NM_CONNECTION_IFACE)
return await settings.call_get_settings()
except DBusError as e:
cloudlog.error(f"Failed to get settings for {path}: {str(e)}")
return {}
async def _process_chunk(self, paths_chunk):
"""Process a chunk of connection paths."""
tasks = [self._get_connection_settings(path) for path in paths_chunk]
return await asyncio.gather(*tasks, return_exceptions=True)
async def _get_saved_connections(self) -> dict[str, str]:
try:
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
connection_paths = await settings_iface.call_list_connections()
saved_ssids: dict[str, str] = {}
batch_size = 20
for i in range(0, len(connection_paths), batch_size):
chunk = connection_paths[i : i + batch_size]
results = await self._process_chunk(chunk)
for path, config in zip(chunk, results, strict=True):
if isinstance(config, dict) and '802-11-wireless' in config:
if ssid := self._extract_ssid(config):
saved_ssids[ssid] = path
return saved_ssids
except DBusError as e:
cloudlog.error(f"Error fetching saved connections: {str(e)}")
return {}
async def _get_interface(self, bus_name: str, path: str, name: str):
introspection = await self.bus.introspect(bus_name, path)
proxy = self.bus.get_proxy_object(bus_name, path, introspection)
return proxy.get_interface(name)
def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType:
"""Determine the security type based on flags."""
if flags == 0 and not (wpa_flags or rsn_flags):
return SecurityType.OPEN
if rsn_flags & 0x200: # SAE (WPA3 Personal)
return SecurityType.WPA3
if rsn_flags: # RSN indicates WPA2 or higher
return SecurityType.WPA2
if wpa_flags: # WPA flags indicate WPA
return SecurityType.WPA
return SecurityType.UNSUPPORTED
3 days ago
class WifiManagerWrapper:
def __init__(self):
self._manager: WifiManager | None = None
self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks()
self._loop = None
self._running = False
self._lock = threading.RLock()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
while self._thread is not None and not self._running:
time.sleep(0.1)
@property
def callbacks(self) -> WifiManagerCallbacks:
return self._callbacks
@callbacks.setter
def callbacks(self, callbacks: WifiManagerCallbacks):
with self._lock:
self._callbacks = callbacks
def _run(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._manager = WifiManager(self._callbacks)
self._running = True
self._loop.run_forever()
except Exception as e:
cloudlog.error(f"Error in WifiManagerWrapper thread: {e}")
finally:
if self._loop.is_running():
self._loop.stop()
self._running = False
def shutdown(self):
if self._running:
self._run_coroutine(self._manager.shutdown())
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
self._running = False
@property
def networks(self) -> list[NetworkInfo]:
"""Get the current list of networks (thread-safe)."""
with self._lock:
return self._manager.networks if self._manager else []
def is_saved(self, ssid: str) -> bool:
"""Check if a network is saved (thread-safe)."""
with self._lock:
return self._manager.is_saved(ssid) if self._manager else False
def connect(self):
"""Connect to DBus and start Wi-Fi scanning."""
self._run_coroutine(self._manager.connect())
def request_scan(self):
"""Request a scan for Wi-Fi networks."""
self._run_coroutine(self._manager.request_scan())
def forget_connection(self, ssid: str):
"""Forget a saved Wi-Fi connection."""
self._run_coroutine(self._manager.forget_connection(ssid))
def activate_connection(self, ssid: str):
"""Activate an existing Wi-Fi connection."""
self._run_coroutine(self._manager.activate_connection(ssid))
def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False):
"""Connect to a Wi-Fi network."""
self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden))
def _run_coroutine(self, coro):
"""Run a coroutine in the async thread."""
if not self._running or not self._loop:
cloudlog.error("WifiManager thread is not running")
return
asyncio.run_coroutine_threadsafe(coro, self._loop)