From 33f01084d16301025a58a7eba42083c74998eb6a Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 25 Sep 2025 23:44:12 -0700 Subject: [PATCH] raylib: implement cell settings (#36204) * get vibing * simplify * vibing is bad * simplify * fix that * now update * clean up * last two * cell is UpdateUnsaved so we don't need to disable * we only need actions * we only need actions * sort * stuff * dont deactivate * clean up * clean up * more * ipv4 fwd * warns * fixz * rm * clean up * one return point * format * top --- system/ui/lib/wifi_manager.py | 125 +++++++++++++++++++++++++++++----- system/ui/widgets/network.py | 82 +++++++++++++++++++--- 2 files changed, 180 insertions(+), 27 deletions(-) diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py index 3ce5c839e8..e24686566d 100644 --- a/system/ui/lib/wifi_manager.py +++ b/system/ui/lib/wifi_manager.py @@ -2,6 +2,7 @@ import atexit import threading import time import uuid +import subprocess from collections.abc import Callable from dataclasses import dataclass from enum import IntEnum @@ -23,7 +24,7 @@ from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_80 NM_802_11_AP_FLAGS_PRIVACY, NM_802_11_AP_FLAGS_WPS, NM_PATH, NM_IFACE, NM_ACCESS_POINT_IFACE, NM_SETTINGS_PATH, NM_SETTINGS_IFACE, NM_CONNECTION_IFACE, NM_DEVICE_IFACE, - NM_DEVICE_TYPE_WIFI, NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT, + NM_DEVICE_TYPE_WIFI, NM_DEVICE_TYPE_MODEM, NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT, NM_DEVICE_STATE_REASON_NEW_ACTIVATION, NM_ACTIVE_CONNECTION_IFACE, NM_IP4_CONFIG_IFACE, NMDeviceState) @@ -142,6 +143,8 @@ class WifiManager: self._ipv4_address: str = "" self._current_network_metered: MeteredType = MeteredType.UNKNOWN self._tethering_password: str = "" + self._ipv4_forward = False + self._last_network_update: float = 0.0 self._callback_queue: list[Callable] = [] @@ -277,25 +280,24 @@ class WifiManager: def _wait_for_wifi_device(self): while not self._exit: - device_path = self._get_wifi_device() + device_path = self._get_adapter(NM_DEVICE_TYPE_WIFI) if device_path is not None: - break - time.sleep(1) - - def _get_wifi_device(self) -> str | None: - if self._wifi_device is not None: - return self._wifi_device - - device_paths = self._router_main.send_and_get_reply(new_method_call(self._nm, 'GetDevices')).body[0] - for device_path in device_paths: - dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE) - dev_type = self._router_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0][1] - - if dev_type == NM_DEVICE_TYPE_WIFI: self._wifi_device = device_path break + time.sleep(1) - return self._wifi_device + def _get_adapter(self, adapter_type: int) -> str | None: + # Return the first NetworkManager device path matching adapter_type + try: + device_paths = self._router_main.send_and_get_reply(new_method_call(self._nm, 'GetDevices')).body[0] + for device_path in device_paths: + dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE) + dev_type = self._router_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0][1] + if dev_type == adapter_type: + return str(device_path) + except Exception as e: + cloudlog.exception(f"Error getting adapter type {adapter_type}: {e}") + return None def _get_connections(self) -> dict[str, str]: settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE) @@ -500,10 +502,18 @@ class WifiManager: return str(secrets['802-11-wireless-security'].get('psk', ('s', ''))[1]) + def set_ipv4_forward(self, enabled: bool): + self._ipv4_forward = enabled + def set_tethering_active(self, active: bool): def worker(): if active: self.activate_connection(self._tethering_ssid, block=True) + + if not self._ipv4_forward: + time.sleep(5) + cloudlog.warning("net.ipv4.ip_forward = 0") + subprocess.run(["sudo", "sysctl", "net.ipv4.ip_forward=0"], check=False) else: self._deactivate_connection(self._tethering_ssid) @@ -645,6 +655,89 @@ class WifiManager: def __del__(self): self.stop() + def update_gsm_settings(self, roaming: bool, apn: str, metered: bool): + """Update GSM settings for cellular connection""" + + def worker(): + try: + lte_connection_path = self._get_lte_connection_path() + if not lte_connection_path: + cloudlog.warning("No LTE connection found") + return + + settings = self._get_connection_settings(lte_connection_path) + + if len(settings) == 0: + cloudlog.warning(f"Failed to get connection settings for {lte_connection_path}") + return + + # Ensure dicts exist + if 'gsm' not in settings: + settings['gsm'] = {} + if 'connection' not in settings: + settings['connection'] = {} + + changes = False + auto_config = apn == "" + + if settings['gsm'].get('auto-config', ('b', False))[1] != auto_config: + cloudlog.warning(f'Changing gsm.auto-config to {auto_config}') + settings['gsm']['auto-config'] = ('b', auto_config) + changes = True + + if settings['gsm'].get('apn', ('s', ''))[1] != apn: + cloudlog.warning(f'Changing gsm.apn to {apn}') + settings['gsm']['apn'] = ('s', apn) + changes = True + + if settings['gsm'].get('home-only', ('b', False))[1] == roaming: + cloudlog.warning(f'Changing gsm.home-only to {not roaming}') + settings['gsm']['home-only'] = ('b', not roaming) + changes = True + + # Unknown means NetworkManager decides + metered_int = int(MeteredType.UNKNOWN if metered else MeteredType.NO) + if settings['connection'].get('metered', ('i', 0))[1] != metered_int: + cloudlog.warning(f'Changing connection.metered to {metered_int}') + settings['connection']['metered'] = ('i', metered_int) + changes = True + + if changes: + # Update the connection settings (temporary update) + conn_addr = DBusAddress(lte_connection_path, bus_name=NM, interface=NM_CONNECTION_IFACE) + reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'UpdateUnsaved', 'a{sa{sv}}', (settings,))) + + if reply.header.message_type == MessageType.error: + cloudlog.warning(f"Failed to update GSM settings: {reply}") + return + + self._activate_modem_connection(lte_connection_path) + except Exception as e: + cloudlog.exception(f"Error updating GSM settings: {e}") + + threading.Thread(target=worker, daemon=True).start() + + def _get_lte_connection_path(self) -> str | None: + try: + settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE) + known_connections = self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0] + + for conn_path in known_connections: + settings = self._get_connection_settings(conn_path) + if settings and settings.get('connection', {}).get('id', ('s', ''))[1] == 'lte': + return str(conn_path) + except Exception as e: + cloudlog.exception(f"Error finding LTE connection: {e}") + return None + + def _activate_modem_connection(self, connection_path: str): + try: + modem_device = self._get_adapter(NM_DEVICE_TYPE_MODEM) + if modem_device and connection_path: + self._router_main.send_and_get_reply(new_method_call(self._nm, 'ActivateConnection', 'ooo', (connection_path, modem_device, "/"))) + except Exception as e: + cloudlog.exception(f"Error activating modem connection: {e}") + def stop(self): if not self._exit: self._exit = True diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py index 7a7215b48e..119901da40 100644 --- a/system/ui/widgets/network.py +++ b/system/ui/widgets/network.py @@ -4,6 +4,7 @@ from typing import cast import pyray as rl from openpilot.common.filter_simple import FirstOrderFilter +from openpilot.common.params import Params from openpilot.system.ui.lib.application import gui_app, DEFAULT_FPS from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel from openpilot.system.ui.lib.wifi_manager import WifiManager, SecurityType, Network, MeteredType @@ -13,7 +14,9 @@ from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog from openpilot.system.ui.widgets.keyboard import Keyboard from openpilot.system.ui.widgets.label import TextAlignment, gui_label from openpilot.system.ui.widgets.scroller import Scroller -from openpilot.system.ui.widgets.list_view import text_item, button_item, ListItem, ToggleAction, MultipleButtonAction, ButtonAction +from openpilot.system.ui.widgets.list_view import ButtonAction, ListItem, MultipleButtonAction, ToggleAction, button_item, text_item +from openpilot.selfdrive.ui.ui_state import ui_state +from openpilot.selfdrive.ui.lib.prime_state import PrimeType NM_DEVICE_STATE_NEED_AUTH = 60 MIN_PASSWORD_LENGTH = 8 @@ -114,34 +117,54 @@ class AdvancedNetworkSettings(Widget): super().__init__() self._wifi_manager = wifi_manager self._wifi_manager.set_callbacks(networks_updated=self._on_network_updated) + self._params = Params() self._keyboard = Keyboard(max_text_size=MAX_PASSWORD_LENGTH, min_text_size=MIN_PASSWORD_LENGTH, show_password_toggle=True) # Tethering self._tethering_action = ToggleAction(initial_state=False) - self._tethering_btn = ListItem(title="Enable Tethering", action_item=self._tethering_action, callback=self._toggle_tethering) + tethering_btn = ListItem(title="Enable Tethering", action_item=self._tethering_action, callback=self._toggle_tethering) - # Tethering Password + # Edit tethering password self._tethering_password_action = ButtonAction(text="EDIT") - self._tethering_password_btn = ListItem(title="Tethering Password", action_item=self._tethering_password_action, callback=self._edit_tethering_password) + tethering_password_btn = ListItem(title="Tethering Password", action_item=self._tethering_password_action, callback=self._edit_tethering_password) - # TODO: Roaming toggle, edit APN settings, and cellular metered toggle + # Roaming toggle + roaming_enabled = self._params.get_bool("GsmRoaming") + self._roaming_action = ToggleAction(initial_state=roaming_enabled) + self._roaming_btn = ListItem(title="Enable Roaming", action_item=self._roaming_action, callback=self._toggle_roaming) - # Metered + # Cellular metered toggle + cellular_metered = self._params.get_bool("GsmMetered") + self._cellular_metered_action = ToggleAction(initial_state=cellular_metered) + self._cellular_metered_btn = ListItem(title="Cellular Metered", description="Prevent large data uploads when on a metered cellular connection", + action_item=self._cellular_metered_action, callback=self._toggle_cellular_metered) + + # APN setting + self._apn_btn = button_item("APN Setting", "EDIT", callback=self._edit_apn) + + # Wi-Fi metered toggle self._wifi_metered_action = MultipleButtonAction(["default", "metered", "unmetered"], 255, 0, callback=self._toggle_wifi_metered) - self._wifi_metered_btn = ListItem(title="Wi-Fi Network Metered", description="Prevent large data uploads when on a metered Wi-Fi connection", - action_item=self._wifi_metered_action) + wifi_metered_btn = ListItem(title="Wi-Fi Network Metered", description="Prevent large data uploads when on a metered Wi-Fi connection", + action_item=self._wifi_metered_action) items: list[Widget] = [ - self._tethering_btn, - self._tethering_password_btn, + tethering_btn, + tethering_password_btn, text_item("IP Address", lambda: self._wifi_manager.ipv4_address), - self._wifi_metered_btn, + self._roaming_btn, + self._apn_btn, + self._cellular_metered_btn, + wifi_metered_btn, button_item("Hidden Network", "CONNECT", callback=self._connect_to_hidden_network), ] self._scroller = Scroller(items, line_separator=True, spacing=0) + # Set initial config + metered = self._params.get_bool("GsmMetered") + self._wifi_manager.update_gsm_settings(roaming_enabled, self._params.get("GsmApn") or "", metered) + def _on_network_updated(self, networks: list[Network]): self._tethering_action.set_enabled(True) self._tethering_action.set_state(self._wifi_manager.is_tethering_active()) @@ -162,6 +185,35 @@ class AdvancedNetworkSettings(Widget): self._wifi_metered_action.set_enabled(False) self._wifi_manager.set_tethering_active(checked) + def _toggle_roaming(self): + roaming_state = self._roaming_action.state + self._params.put_bool("GsmRoaming", roaming_state) + self._wifi_manager.update_gsm_settings(roaming_state, self._params.get("GsmApn") or "", self._params.get_bool("GsmMetered")) + + def _edit_apn(self): + def update_apn(result): + if result != 1: + return + + apn = self._keyboard.text.strip() + if apn == "": + self._params.remove("GsmApn") + else: + self._params.put("GsmApn", apn) + + self._wifi_manager.update_gsm_settings(self._params.get_bool("GsmRoaming"), apn, self._params.get_bool("GsmMetered")) + + current_apn = self._params.get("GsmApn") or "" + self._keyboard.reset(min_text_size=0) + self._keyboard.set_title("Enter APN", "leave blank for automatic configuration") + self._keyboard.set_text(current_apn) + gui_app.set_modal_overlay(self._keyboard, update_apn) + + def _toggle_cellular_metered(self): + metered = self._cellular_metered_action.state + self._params.put_bool("GsmMetered", metered) + self._wifi_manager.update_gsm_settings(self._params.get_bool("GsmRoaming"), self._params.get("GsmApn") or "", metered) + def _toggle_wifi_metered(self, metered): metered_type = {0: MeteredType.UNKNOWN, 1: MeteredType.YES, 2: MeteredType.NO}.get(metered, MeteredType.UNKNOWN) self._wifi_metered_action.set_enabled(False) @@ -207,6 +259,14 @@ class AdvancedNetworkSettings(Widget): self._keyboard.set_text(self._wifi_manager.tethering_password) gui_app.set_modal_overlay(self._keyboard, update_password) + def _update_state(self): + # If not using prime SIM, show GSM settings and enable IPv4 forwarding + show_cell_settings = ui_state.prime_state.get_type() in (PrimeType.NONE, PrimeType.LITE) + self._wifi_manager.set_ipv4_forward(show_cell_settings) + self._roaming_btn.set_visible(show_cell_settings) + self._apn_btn.set_visible(show_cell_settings) + self._cellular_metered_btn.set_visible(show_cell_settings) + def _render(self, _): self._scroller.render(self._rect)