From 3b94e6f92f922135d05d3d72c150f5d3b65819b0 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Sun, 11 May 2025 00:59:42 +0800 Subject: [PATCH] system/ui: add tethering support to WifiManager (#35167) add tethering support to WifiManager --- system/ui/lib/wifi_manager.py | 160 ++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py index 990e1d14a1..ff18092364 100644 --- a/system/ui/lib/wifi_manager.py +++ b/system/ui/lib/wifi_manager.py @@ -12,6 +12,7 @@ from dbus_next.aio import MessageBus from dbus_next import BusType, Variant, Message from dbus_next.errors import DBusError from dbus_next.constants import MessageType +from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog T = TypeVar("T") @@ -29,6 +30,9 @@ NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 +TETHERING_IP_ADDRESS = "192.168.43.1" +DEFAULT_TETHERING_PASSWORD = "12345678" + # NetworkManager device states class NMDeviceState(IntEnum): DISCONNECTED = 30 @@ -72,6 +76,7 @@ class WifiManager: self.saved_connections: dict[str, str] = {} self.active_ap_path: str = "" self.scan_task: asyncio.Task | None = None + self._tethering_ssid = "weedle-" + Params().get("DongleId", encoding="utf-8") self.running: bool = True async def connect(self) -> None: @@ -83,6 +88,7 @@ class WifiManager: await self._setup_signals(self.device_path) self.active_ap_path = await self.get_active_access_point() + await self.add_tethering_connection(self._tethering_ssid, DEFAULT_TETHERING_PASSWORD) self.saved_connections = await self._get_saved_connections() self.scan_task = asyncio.create_task(self._periodic_scan()) except DBusError as e: @@ -199,6 +205,160 @@ class WifiManager: return False + async def add_tethering_connection(self, ssid: str, password: str = "12345678") -> bool: + """Create a WiFi tethering connection.""" + if len(password) < 8: + print("Tethering password must be at least 8 characters") + return False + + try: + # First, check if a hotspot connection already exists + settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) + connection_paths = await settings_iface.call_list_connections() + + # Look for an existing hotspot connection + for path in connection_paths: + try: + settings = await self._get_connection_settings(path) + conn_type = settings.get('connection', {}).get('type', Variant('s', '')).value + wifi_mode = settings.get('802-11-wireless', {}).get('mode', Variant('s', '')).value + + if conn_type == '802-11-wireless' and wifi_mode == 'ap': + # Extract the SSID to check + connection_ssid = self._extract_ssid(settings) + if connection_ssid == ssid: + return True + except DBusError: + continue + + connection = { + 'connection': { + 'id': Variant('s', 'Hotspot'), + 'uuid': Variant('s', str(uuid.uuid4())), + 'type': Variant('s', '802-11-wireless'), + 'interface-name': Variant('s', 'wlan0'), + 'autoconnect': Variant('b', False), + }, + '802-11-wireless': { + 'band': Variant('s', 'bg'), + 'mode': Variant('s', 'ap'), + 'ssid': Variant('ay', ssid.encode('utf-8')), + }, + '802-11-wireless-security': { + 'group': Variant('as', ['ccmp']), + 'key-mgmt': Variant('s', 'wpa-psk'), + 'pairwise': Variant('as', ['ccmp']), + 'proto': Variant('as', ['rsn']), + 'psk': Variant('s', password), + }, + 'ipv4': { + 'method': Variant('s', 'shared'), + 'address-data': Variant('aa{sv}', [{'address': Variant('s', TETHERING_IP_ADDRESS), 'prefix': Variant('u', 24)}]), + 'gateway': Variant('s', TETHERING_IP_ADDRESS), + 'never-default': Variant('b', True), + }, + 'ipv6': { + 'method': Variant('s', 'ignore'), + }, + } + + settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) + new_connection = await settings_iface.call_add_connection(connection) + print(f"Added tethering connection with path: {new_connection}") + return True + except DBusError as e: + print(f"Failed to add tethering connection: {e}") + return False + except Exception as e: + print(f"Unexpected error adding tethering connection: {e}") + return False + + async def get_tethering_password(self) -> str: + """Get the current tethering password.""" + try: + hotspot_path = self.saved_connections.get(self._tethering_ssid) + if hotspot_path: + conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) + secrets = await conn_iface.call_get_secrets('802-11-wireless-security') + if secrets and '802-11-wireless-security' in secrets: + psk = secrets.get('802-11-wireless-security', {}).get('psk', Variant('s', '')).value + return str(psk) if psk is not None else "" + return "" + except DBusError as e: + print(f"Failed to get tethering password: {e}") + return "" + except Exception as e: + print(f"Unexpected error getting tethering password: {e}") + return "" + + async def set_tethering_password(self, password: str) -> bool: + """Set the tethering password.""" + if len(password) < 8: + cloudlog.error("Tethering password must be at least 8 characters") + return False + + try: + hotspot_path = self.saved_connections.get(self._tethering_ssid) + if not hotspot_path: + print("No hotspot connection found") + return False + + # Update the connection settings with new password + settings = await self._get_connection_settings(hotspot_path) + if '802-11-wireless-security' not in settings: + settings['802-11-wireless-security'] = {} + settings['802-11-wireless-security']['psk'] = Variant('s', password) + + # Apply changes + conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) + await conn_iface.call_update(settings) + + # Check if connection is active and restart if needed + is_active = False + nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) + active_connections = await nm_iface.get_active_connections() + + for conn_path in active_connections: + props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) + conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') + if conn_id_path.value == hotspot_path: + is_active = True + await nm_iface.call_deactivate_connection(conn_path) + break + + if is_active: + await nm_iface.call_activate_connection(hotspot_path, self.device_path, "/") + + print("Tethering password updated successfully") + return True + except DBusError as e: + print(f"Failed to set tethering password: {e}") + return False + except Exception as e: + print(f"Unexpected error setting tethering password: {e}") + return False + + async def is_tethering_active(self) -> bool: + """Check if tethering is active for the specified SSID.""" + try: + hotspot_path = self.saved_connections.get(self._tethering_ssid) + if not hotspot_path: + return False + + nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) + active_connections = await nm_iface.get_active_connections() + + for conn_path in active_connections: + props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) + conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') + + if conn_id_path.value == hotspot_path: + return True + + return False + except Exception: + return False + async def _periodic_scan(self): while self.running: try: