From bd3f4ca7f8dec0604e7aa042a7d6d3b0548f824c Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Thu, 25 Sep 2025 16:55:19 -0700 Subject: [PATCH] add metered options --- system/ui/lib/wifi_manager.py | 69 ++++++++++++++++++++++++++++++++++- system/ui/widgets/network.py | 17 ++++++++- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py index baa90c28a8..1324b70a06 100644 --- a/system/ui/lib/wifi_manager.py +++ b/system/ui/lib/wifi_manager.py @@ -41,6 +41,12 @@ class SecurityType(IntEnum): UNSUPPORTED = 4 +class MeteredType(IntEnum): + UNKNOWN = 0 + YES = 1 + NO = 2 + + def get_security_type(flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: wpa_props = wpa_flags | rsn_flags @@ -115,7 +121,7 @@ class AccessPoint: class WifiManager: def __init__(self): - self._networks = [] # a network can be comprised of multiple APs + self._networks: list[Network] = [] # a network can be comprised of multiple APs self._active = True # used to not run when not in settings self._exit = False @@ -134,6 +140,7 @@ class WifiManager: # State self._connecting_to_ssid: str = "" self._ipv4_address: str = "" + self._current_network_metered: MeteredType = MeteredType.UNKNOWN self._tethering_password: str = "" self._last_network_update: float = 0.0 self._callback_queue: list[Callable] = [] @@ -180,6 +187,10 @@ class WifiManager: def ipv4_address(self) -> str: return self._ipv4_address + @property + def current_network_metered(self) -> MeteredType: + return self._current_network_metered + @property def tethering_password(self) -> str: return self._tethering_password @@ -472,6 +483,62 @@ class WifiManager: threading.Thread(target=worker, daemon=True).start() + def _update_current_network_metered(self) -> None: + if self._wifi_device is None: + cloudlog.warning("No WiFi device found") + return + + self._current_network_metered = MeteredType.UNKNOWN + for active_conn in self._get_active_connections(): + conn_addr = DBusAddress(active_conn, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE) + conn_type = self._router_main.send_and_get_reply(Properties(conn_addr).get('Type')).body[0][1] + + if conn_type == '802-11-wireless': + conn_path = self._router_main.send_and_get_reply(Properties(conn_addr).get('Connection')).body[0][1] + if conn_path == "/": + continue + + settings = self._get_connection_settings(conn_path) + + if len(settings) == 0: + cloudlog.warning(f'Failed to get connection settings for {conn_path}') + continue + + metered_prop = settings['connection'].get('metered', ('i', 0))[1] + if metered_prop == MeteredType.YES: + self._current_network_metered = MeteredType.YES + elif metered_prop == MeteredType.NO: + self._current_network_metered = MeteredType.NO + print('current_network_metered', self._current_network_metered) + return + + def set_current_network_metered(self, metered: MeteredType): + def worker(): + for active_conn in self._get_active_connections(): + conn_addr = DBusAddress(active_conn, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE) + conn_type = self._router_main.send_and_get_reply(Properties(conn_addr).get('Type')).body[0][1] + + if conn_type == '802-11-wireless' and not self.is_tethering_active(): + conn_path = self._router_main.send_and_get_reply(Properties(conn_addr).get('Connection')).body[0][1] + if conn_path == "/": + continue + + settings = self._get_connection_settings(conn_path) + + if len(settings) == 0: + cloudlog.warning(f'Failed to get connection settings for {conn_path}') + return + + settings['connection']['metered'] = ('i', int(metered)) + + conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE) + reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'Update', 'a{sa{sv}}', (settings,))) + if reply.header.message_type == MessageType.error: + cloudlog.warning(f'Failed to update tethering settings: {reply}') + return + + threading.Thread(target=worker, daemon=True).start() + def _request_scan(self): if self._wifi_device is None: cloudlog.warning("No WiFi device found") diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py index 19fb267cc0..dbd72507c6 100644 --- a/system/ui/widgets/network.py +++ b/system/ui/widgets/network.py @@ -3,7 +3,8 @@ from functools import partial from typing import cast import pyray as rl -from openpilot.system.ui.lib.application import gui_app +from openpilot.common.filter_simple import FirstOrderFilter +from openpilot.system.ui.lib.application import gui_app, DEFAULT_FPS from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel from openpilot.system.ui.lib.wifi_manager import WifiManager, SecurityType, Network from openpilot.system.ui.widgets import Widget @@ -46,6 +47,16 @@ class NavButton(Widget): super().__init__() self.text = text self.set_rect(rl.Rectangle(0, 0, 400, 100)) + self._x_pos_filter = FirstOrderFilter(0.0, 0.05, 1 / DEFAULT_FPS, initialized=False) + self._y_pos_filter = FirstOrderFilter(0.0, 0.05, 1 / DEFAULT_FPS, initialized=False) + + def set_position(self, x: float, y: float) -> None: + x = self._x_pos_filter.update(x) + y = self._y_pos_filter.update(y) + changed = (self._rect.x != x or self._rect.y != y) + self._rect.x, self._rect.y = x, y + if changed: + self._update_layout_rects() def _render(self, _): color = rl.Color(74, 74, 74, 255) if self.is_pressed else rl.Color(57, 57, 57, 255) @@ -117,9 +128,13 @@ class AdvancedNetworkSettings(Widget): # # tethering = toggle_item("Enable Tethering", initial_state=wifi_manager # tethering = toggle_item("Enable Tethering", callback=self._wifi_manager)#, initial_state=wifi_manager + # Tethering self._tethering_action = ToggleAction(initial_state=False, enabled=True) self._tethering_btn = ListItem(title="Enable Tethering", action_item=self._tethering_action, callback=self._tethering_toggled) + # Metered + + items = [ self._tethering_btn, button_item("Tethering Password", "EDIT", callback=self._edit_tethering_password),