raylib: implement cell settings (#36204)

* get vibing

* simplify

* vibing is bad

* simplify

* fix that

* now update

* clean up

* last two

* cell is UpdateUnsaved so we don't need to disable

* we only need actions

* we only need actions

* sort

* stuff

* dont deactivate

* clean up

* clean up

* more

* ipv4 fwd

* warns

* fixz

* rm

* clean up

* one return point

* format

* top
pull/36195/merge
Shane Smiskol 2 days ago committed by GitHub
parent 1fbec6f601
commit 33f01084d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 117
      system/ui/lib/wifi_manager.py
  2. 80
      system/ui/widgets/network.py

@ -2,6 +2,7 @@ import atexit
import threading
import time
import uuid
import subprocess
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
@ -23,7 +24,7 @@ from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_80
NM_802_11_AP_FLAGS_PRIVACY, NM_802_11_AP_FLAGS_WPS,
NM_PATH, NM_IFACE, NM_ACCESS_POINT_IFACE, NM_SETTINGS_PATH,
NM_SETTINGS_IFACE, NM_CONNECTION_IFACE, NM_DEVICE_IFACE,
NM_DEVICE_TYPE_WIFI, NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT,
NM_DEVICE_TYPE_WIFI, NM_DEVICE_TYPE_MODEM, NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT,
NM_DEVICE_STATE_REASON_NEW_ACTIVATION, NM_ACTIVE_CONNECTION_IFACE,
NM_IP4_CONFIG_IFACE, NMDeviceState)
@ -142,6 +143,8 @@ class WifiManager:
self._ipv4_address: str = ""
self._current_network_metered: MeteredType = MeteredType.UNKNOWN
self._tethering_password: str = ""
self._ipv4_forward = False
self._last_network_update: float = 0.0
self._callback_queue: list[Callable] = []
@ -277,25 +280,24 @@ class WifiManager:
def _wait_for_wifi_device(self):
while not self._exit:
device_path = self._get_wifi_device()
device_path = self._get_adapter(NM_DEVICE_TYPE_WIFI)
if device_path is not None:
self._wifi_device = device_path
break
time.sleep(1)
def _get_wifi_device(self) -> str | None:
if self._wifi_device is not None:
return self._wifi_device
def _get_adapter(self, adapter_type: int) -> str | None:
# Return the first NetworkManager device path matching adapter_type
try:
device_paths = self._router_main.send_and_get_reply(new_method_call(self._nm, 'GetDevices')).body[0]
for device_path in device_paths:
dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_DEVICE_IFACE)
dev_type = self._router_main.send_and_get_reply(Properties(dev_addr).get('DeviceType')).body[0][1]
if dev_type == NM_DEVICE_TYPE_WIFI:
self._wifi_device = device_path
break
return self._wifi_device
if dev_type == adapter_type:
return str(device_path)
except Exception as e:
cloudlog.exception(f"Error getting adapter type {adapter_type}: {e}")
return None
def _get_connections(self) -> dict[str, str]:
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
@ -500,10 +502,18 @@ class WifiManager:
return str(secrets['802-11-wireless-security'].get('psk', ('s', ''))[1])
def set_ipv4_forward(self, enabled: bool):
self._ipv4_forward = enabled
def set_tethering_active(self, active: bool):
def worker():
if active:
self.activate_connection(self._tethering_ssid, block=True)
if not self._ipv4_forward:
time.sleep(5)
cloudlog.warning("net.ipv4.ip_forward = 0")
subprocess.run(["sudo", "sysctl", "net.ipv4.ip_forward=0"], check=False)
else:
self._deactivate_connection(self._tethering_ssid)
@ -645,6 +655,89 @@ class WifiManager:
def __del__(self):
self.stop()
def update_gsm_settings(self, roaming: bool, apn: str, metered: bool):
"""Update GSM settings for cellular connection"""
def worker():
try:
lte_connection_path = self._get_lte_connection_path()
if not lte_connection_path:
cloudlog.warning("No LTE connection found")
return
settings = self._get_connection_settings(lte_connection_path)
if len(settings) == 0:
cloudlog.warning(f"Failed to get connection settings for {lte_connection_path}")
return
# Ensure dicts exist
if 'gsm' not in settings:
settings['gsm'] = {}
if 'connection' not in settings:
settings['connection'] = {}
changes = False
auto_config = apn == ""
if settings['gsm'].get('auto-config', ('b', False))[1] != auto_config:
cloudlog.warning(f'Changing gsm.auto-config to {auto_config}')
settings['gsm']['auto-config'] = ('b', auto_config)
changes = True
if settings['gsm'].get('apn', ('s', ''))[1] != apn:
cloudlog.warning(f'Changing gsm.apn to {apn}')
settings['gsm']['apn'] = ('s', apn)
changes = True
if settings['gsm'].get('home-only', ('b', False))[1] == roaming:
cloudlog.warning(f'Changing gsm.home-only to {not roaming}')
settings['gsm']['home-only'] = ('b', not roaming)
changes = True
# Unknown means NetworkManager decides
metered_int = int(MeteredType.UNKNOWN if metered else MeteredType.NO)
if settings['connection'].get('metered', ('i', 0))[1] != metered_int:
cloudlog.warning(f'Changing connection.metered to {metered_int}')
settings['connection']['metered'] = ('i', metered_int)
changes = True
if changes:
# Update the connection settings (temporary update)
conn_addr = DBusAddress(lte_connection_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'UpdateUnsaved', 'a{sa{sv}}', (settings,)))
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to update GSM settings: {reply}")
return
self._activate_modem_connection(lte_connection_path)
except Exception as e:
cloudlog.exception(f"Error updating GSM settings: {e}")
threading.Thread(target=worker, daemon=True).start()
def _get_lte_connection_path(self) -> str | None:
try:
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
known_connections = self._router_main.send_and_get_reply(new_method_call(settings_addr, 'ListConnections')).body[0]
for conn_path in known_connections:
settings = self._get_connection_settings(conn_path)
if settings and settings.get('connection', {}).get('id', ('s', ''))[1] == 'lte':
return str(conn_path)
except Exception as e:
cloudlog.exception(f"Error finding LTE connection: {e}")
return None
def _activate_modem_connection(self, connection_path: str):
try:
modem_device = self._get_adapter(NM_DEVICE_TYPE_MODEM)
if modem_device and connection_path:
self._router_main.send_and_get_reply(new_method_call(self._nm, 'ActivateConnection', 'ooo', (connection_path, modem_device, "/")))
except Exception as e:
cloudlog.exception(f"Error activating modem connection: {e}")
def stop(self):
if not self._exit:
self._exit = True

@ -4,6 +4,7 @@ from typing import cast
import pyray as rl
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.params import Params
from openpilot.system.ui.lib.application import gui_app, DEFAULT_FPS
from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel
from openpilot.system.ui.lib.wifi_manager import WifiManager, SecurityType, Network, MeteredType
@ -13,7 +14,9 @@ from openpilot.system.ui.widgets.confirm_dialog import ConfirmDialog
from openpilot.system.ui.widgets.keyboard import Keyboard
from openpilot.system.ui.widgets.label import TextAlignment, gui_label
from openpilot.system.ui.widgets.scroller import Scroller
from openpilot.system.ui.widgets.list_view import text_item, button_item, ListItem, ToggleAction, MultipleButtonAction, ButtonAction
from openpilot.system.ui.widgets.list_view import ButtonAction, ListItem, MultipleButtonAction, ToggleAction, button_item, text_item
from openpilot.selfdrive.ui.ui_state import ui_state
from openpilot.selfdrive.ui.lib.prime_state import PrimeType
NM_DEVICE_STATE_NEED_AUTH = 60
MIN_PASSWORD_LENGTH = 8
@ -114,34 +117,54 @@ class AdvancedNetworkSettings(Widget):
super().__init__()
self._wifi_manager = wifi_manager
self._wifi_manager.set_callbacks(networks_updated=self._on_network_updated)
self._params = Params()
self._keyboard = Keyboard(max_text_size=MAX_PASSWORD_LENGTH, min_text_size=MIN_PASSWORD_LENGTH, show_password_toggle=True)
# Tethering
self._tethering_action = ToggleAction(initial_state=False)
self._tethering_btn = ListItem(title="Enable Tethering", action_item=self._tethering_action, callback=self._toggle_tethering)
tethering_btn = ListItem(title="Enable Tethering", action_item=self._tethering_action, callback=self._toggle_tethering)
# Tethering Password
# Edit tethering password
self._tethering_password_action = ButtonAction(text="EDIT")
self._tethering_password_btn = ListItem(title="Tethering Password", action_item=self._tethering_password_action, callback=self._edit_tethering_password)
tethering_password_btn = ListItem(title="Tethering Password", action_item=self._tethering_password_action, callback=self._edit_tethering_password)
# TODO: Roaming toggle, edit APN settings, and cellular metered toggle
# Roaming toggle
roaming_enabled = self._params.get_bool("GsmRoaming")
self._roaming_action = ToggleAction(initial_state=roaming_enabled)
self._roaming_btn = ListItem(title="Enable Roaming", action_item=self._roaming_action, callback=self._toggle_roaming)
# Metered
# Cellular metered toggle
cellular_metered = self._params.get_bool("GsmMetered")
self._cellular_metered_action = ToggleAction(initial_state=cellular_metered)
self._cellular_metered_btn = ListItem(title="Cellular Metered", description="Prevent large data uploads when on a metered cellular connection",
action_item=self._cellular_metered_action, callback=self._toggle_cellular_metered)
# APN setting
self._apn_btn = button_item("APN Setting", "EDIT", callback=self._edit_apn)
# Wi-Fi metered toggle
self._wifi_metered_action = MultipleButtonAction(["default", "metered", "unmetered"], 255, 0, callback=self._toggle_wifi_metered)
self._wifi_metered_btn = ListItem(title="Wi-Fi Network Metered", description="Prevent large data uploads when on a metered Wi-Fi connection",
wifi_metered_btn = ListItem(title="Wi-Fi Network Metered", description="Prevent large data uploads when on a metered Wi-Fi connection",
action_item=self._wifi_metered_action)
items: list[Widget] = [
self._tethering_btn,
self._tethering_password_btn,
tethering_btn,
tethering_password_btn,
text_item("IP Address", lambda: self._wifi_manager.ipv4_address),
self._wifi_metered_btn,
self._roaming_btn,
self._apn_btn,
self._cellular_metered_btn,
wifi_metered_btn,
button_item("Hidden Network", "CONNECT", callback=self._connect_to_hidden_network),
]
self._scroller = Scroller(items, line_separator=True, spacing=0)
# Set initial config
metered = self._params.get_bool("GsmMetered")
self._wifi_manager.update_gsm_settings(roaming_enabled, self._params.get("GsmApn") or "", metered)
def _on_network_updated(self, networks: list[Network]):
self._tethering_action.set_enabled(True)
self._tethering_action.set_state(self._wifi_manager.is_tethering_active())
@ -162,6 +185,35 @@ class AdvancedNetworkSettings(Widget):
self._wifi_metered_action.set_enabled(False)
self._wifi_manager.set_tethering_active(checked)
def _toggle_roaming(self):
roaming_state = self._roaming_action.state
self._params.put_bool("GsmRoaming", roaming_state)
self._wifi_manager.update_gsm_settings(roaming_state, self._params.get("GsmApn") or "", self._params.get_bool("GsmMetered"))
def _edit_apn(self):
def update_apn(result):
if result != 1:
return
apn = self._keyboard.text.strip()
if apn == "":
self._params.remove("GsmApn")
else:
self._params.put("GsmApn", apn)
self._wifi_manager.update_gsm_settings(self._params.get_bool("GsmRoaming"), apn, self._params.get_bool("GsmMetered"))
current_apn = self._params.get("GsmApn") or ""
self._keyboard.reset(min_text_size=0)
self._keyboard.set_title("Enter APN", "leave blank for automatic configuration")
self._keyboard.set_text(current_apn)
gui_app.set_modal_overlay(self._keyboard, update_apn)
def _toggle_cellular_metered(self):
metered = self._cellular_metered_action.state
self._params.put_bool("GsmMetered", metered)
self._wifi_manager.update_gsm_settings(self._params.get_bool("GsmRoaming"), self._params.get("GsmApn") or "", metered)
def _toggle_wifi_metered(self, metered):
metered_type = {0: MeteredType.UNKNOWN, 1: MeteredType.YES, 2: MeteredType.NO}.get(metered, MeteredType.UNKNOWN)
self._wifi_metered_action.set_enabled(False)
@ -207,6 +259,14 @@ class AdvancedNetworkSettings(Widget):
self._keyboard.set_text(self._wifi_manager.tethering_password)
gui_app.set_modal_overlay(self._keyboard, update_password)
def _update_state(self):
# If not using prime SIM, show GSM settings and enable IPv4 forwarding
show_cell_settings = ui_state.prime_state.get_type() in (PrimeType.NONE, PrimeType.LITE)
self._wifi_manager.set_ipv4_forward(show_cell_settings)
self._roaming_btn.set_visible(show_cell_settings)
self._apn_btn.set_visible(show_cell_settings)
self._cellular_metered_btn.set_visible(show_cell_settings)
def _render(self, _):
self._scroller.render(self._rect)

Loading…
Cancel
Save