diff --git a/system/ui/lib/scroll_panel.py b/system/ui/lib/scroll_panel.py index 0dc757ff6f..bd54f739ab 100644 --- a/system/ui/lib/scroll_panel.py +++ b/system/ui/lib/scroll_panel.py @@ -4,6 +4,7 @@ from enum import IntEnum MOUSE_WHEEL_SCROLL_SPEED = 30 INERTIA_FRICTION = 0.95 # The rate at which the inertia slows down MIN_VELOCITY = 0.1 # Minimum velocity before stopping the inertia +DRAG_THRESHOLD = 5 # Pixels of movement to consider it a drag, not a click class ScrollState(IntEnum): @@ -16,39 +17,52 @@ class GuiScrollPanel: def __init__(self, show_vertical_scroll_bar: bool = False): self._scroll_state: ScrollState = ScrollState.IDLE self._last_mouse_y: float = 0.0 + self._start_mouse_y: float = 0.0 # Track initial mouse position for drag detection self._offset = rl.Vector2(0, 0) self._view = rl.Rectangle(0, 0, 0, 0) self._show_vertical_scroll_bar: bool = show_vertical_scroll_bar self._velocity_y = 0.0 # Velocity for inertia + self._is_dragging = False # Flag to indicate if drag occurred def handle_scroll(self, bounds: rl.Rectangle, content: rl.Rectangle) -> rl.Vector2: mouse_pos = rl.get_mouse_position() # Handle dragging logic - if rl.check_collision_point_rec(mouse_pos, bounds) and rl.is_mouse_button_pressed(rl.MouseButton.MOUSE_BUTTON_LEFT): + if rl.check_collision_point_rec(mouse_pos, bounds) and rl.is_mouse_button_pressed( + rl.MouseButton.MOUSE_BUTTON_LEFT + ): if self._scroll_state == ScrollState.IDLE: self._scroll_state = ScrollState.DRAGGING_CONTENT if self._show_vertical_scroll_bar: - scrollbar_width = rl.gui_get_style(rl.GuiControl.LISTVIEW, rl.GuiListViewProperty.SCROLLBAR_WIDTH) + scrollbar_width = rl.gui_get_style( + rl.GuiControl.LISTVIEW, rl.GuiListViewProperty.SCROLLBAR_WIDTH + ) scrollbar_x = bounds.x + bounds.width - scrollbar_width if mouse_pos.x >= scrollbar_x: self._scroll_state = ScrollState.DRAGGING_SCROLLBAR self._last_mouse_y = mouse_pos.y + self._start_mouse_y = mouse_pos.y # Record starting position self._velocity_y = 0.0 # Reset velocity when drag starts + self._is_dragging = False # Reset dragging flag if self._scroll_state != ScrollState.IDLE: if rl.is_mouse_button_down(rl.MouseButton.MOUSE_BUTTON_LEFT): delta_y = mouse_pos.y - self._last_mouse_y + # Check if movement exceeds drag threshold + total_drag = abs(mouse_pos.y - self._start_mouse_y) + if total_drag > DRAG_THRESHOLD: + self._is_dragging = True + if self._scroll_state == ScrollState.DRAGGING_CONTENT: self._offset.y += delta_y - else: - delta_y = -delta_y + else: # DRAGGING_SCROLLBAR + delta_y = -delta_y # Invert for scrollbar self._last_mouse_y = mouse_pos.y self._velocity_y = delta_y # Update velocity during drag - else: + elif rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT): self._scroll_state = ScrollState.IDLE # Handle mouse wheel scrolling @@ -73,3 +87,10 @@ class GuiScrollPanel: self._offset.y = max(min(self._offset.y, 0), -max_scroll_y) return self._offset + + def is_click_valid(self) -> bool: + return ( + self._scroll_state == ScrollState.IDLE + and not self._is_dragging + and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT) + ) diff --git a/system/ui/lib/wifi_manager.py b/system/ui/lib/wifi_manager.py index 618e590103..b85812eb98 100644 --- a/system/ui/lib/wifi_manager.py +++ b/system/ui/lib/wifi_manager.py @@ -33,7 +33,8 @@ class SecurityType(IntEnum): OPEN = 0 WPA = 1 WPA2 = 2 - UNSUPPORTED = 3 + WPA3 = 3 + UNSUPPORTED = 4 @dataclass @@ -53,13 +54,13 @@ class WifiManager: self.bus: MessageBus | None = None self.device_path: str | None = None self.device_proxy = None - self.saved_connections: dict[str, str] = dict() + self.saved_connections: dict[str, str] = {} self.active_ap_path: str = '' self.scan_task: asyncio.Task | None = None self.running: bool = True self.need_auth_callback = None - async def connect(self): + async def connect(self) -> None: """Connect to the DBus system bus.""" try: self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() @@ -81,27 +82,30 @@ class WifiManager: self.running = False if self.scan_task: self.scan_task.cancel() - await self.scan_task + try: + await self.scan_task + except asyncio.CancelledError: + pass if self.bus: await self.bus.disconnect() - async def request_scan(self): + async def request_scan(self) -> None: try: interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) await interface.call_request_scan({}) except DBusError as e: - cloudlog.warning(f"Scan request failed: {e}") + cloudlog.warning(f"Scan request failed: {str(e)}") - async def get_active_access_point(self): + async def get_active_access_point(self) -> str: try: props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') return ap_path.value except DBusError as e: - cloudlog.error(f"Error fetching active access point: {e}") + cloudlog.error(f"Error fetching active access point: {str(e)}") return '' - async def forgot_connection(self, ssid: str) -> bool: + async def forget_connection(self, ssid: str) -> bool: path = self.saved_connections.get(ssid) if not path: return False @@ -113,17 +117,22 @@ class WifiManager: except DBusError as e: cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") return False - except Exception as e: - cloudlog.error(f"Unexpected error while deleting connection for SSID: {ssid}: {e}") - return False - async def activate_connection(self, ssid: str) -> None: + async def activate_connection(self, ssid: str) -> bool: connection_path = self.saved_connections.get(ssid) - if connection_path: + if not connection_path: + return False + try: nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) await nm_iface.call_activate_connection(connection_path, self.device_path, '/') + return True + except DBusError as e: + cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}") + return False - async def connect_to_network(self, ssid: str, password: str = None, is_hidden: bool = False): + async def connect_to_network( + self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False + ) -> None: """Connect to a selected WiFi network.""" try: # settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) @@ -143,8 +152,8 @@ class WifiManager: 'ipv6': {'method': Variant('s', 'ignore')}, } - # if bssid: - # connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) + if bssid: + connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) if password: connection['802-11-wireless-security'] = { @@ -205,9 +214,7 @@ class WifiManager: await self._add_match_rule(rule) # Set up signal handlers - self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed( - self._on_properties_changed - ) + self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed) self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) @@ -240,7 +247,7 @@ class WifiManager: def _on_connection_removed(self, path: str) -> None: """Callback for ConnectionRemoved signal.""" print(f"Connection removed: {path}") - for ssid, p in self.saved_connections.items(): + for ssid, p in list(self.saved_connections.items()): if path == p: del self.saved_connections[ssid] break @@ -326,51 +333,51 @@ class WifiManager: async def _get_connection_settings(self, path): """Fetch connection settings for a specific connection path.""" - connection_proxy = await self.bus.introspect(NM, path) - connection = self.bus.get_proxy_object(NM, path, connection_proxy) - settings = connection.get_interface(NM_CONNECTION_IFACE) - return await settings.call_get_settings() + try: + connection_proxy = await self.bus.introspect(NM, path) + connection = self.bus.get_proxy_object(NM, path, connection_proxy) + settings = connection.get_interface(NM_CONNECTION_IFACE) + return await settings.call_get_settings() + except DBusError as e: + cloudlog.error(f"Failed to get settings for {path}: {str(e)}") + return {} async def _process_chunk(self, paths_chunk): """Process a chunk of connection paths.""" tasks = [self._get_connection_settings(path) for path in paths_chunk] - results = await asyncio.gather(*tasks) - return results + return await asyncio.gather(*tasks, return_exceptions=True) - async def _get_saved_connections(self): - settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) - connection_paths = await settings_iface.call_list_connections() - - saved_ssids: dict[str, str] = {} - batch_size = 120 - for i in range(0, len(connection_paths), batch_size): - chunk = connection_paths[i : i + batch_size] - results = await self._process_chunk(chunk) - - # Loop through the results and filter Wi-Fi connections - for path, config in zip(chunk, results, strict=True): - if '802-11-wireless' in config: - saved_ssids[self._extract_ssid(config)] = path - - return saved_ssids + async def _get_saved_connections(self) -> dict[str, str]: + try: + settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) + connection_paths = await settings_iface.call_list_connections() + saved_ssids: dict[str, str] = {} + batch_size = 20 + for i in range(0, len(connection_paths), batch_size): + chunk = connection_paths[i : i + batch_size] + results = await self._process_chunk(chunk) + for path, config in zip(chunk, results, strict=True): + if isinstance(config, dict) and '802-11-wireless' in config: + if ssid := self._extract_ssid(config): + saved_ssids[ssid] = path + return saved_ssids + except DBusError as e: + cloudlog.error(f"Error fetching saved connections: {str(e)}") + return {} async def _get_interface(self, bus_name: str, path: str, name: str): introspection = await self.bus.introspect(bus_name, path) proxy = self.bus.get_proxy_object(bus_name, path, introspection) return proxy.get_interface(name) - def _get_security_type(self, flags, wpa_flags, rsn_flags): - """Helper function to determine the security type of a network.""" - if flags == 0: + def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: + """Determine the security type based on flags.""" + if flags == 0 and not (wpa_flags or rsn_flags): return SecurityType.OPEN - if wpa_flags: - return SecurityType.WPA - if rsn_flags: + if rsn_flags & 0x200: # SAE (WPA3 Personal) + return SecurityType.WPA3 + if rsn_flags: # RSN indicates WPA2 or higher return SecurityType.WPA2 - else: - return SecurityType.UNSUPPORTED - - async def _get_interface(self, bus_name: str, path: str, name: str): - introspection = await self.bus.introspect(bus_name, path) - proxy = self.bus.get_proxy_object(bus_name, path, introspection) - return proxy.get_interface(name) + if wpa_flags: # WPA flags indicate WPA + return SecurityType.WPA + return SecurityType.UNSUPPORTED diff --git a/system/ui/widgets/network.py b/system/ui/widgets/network.py index 2aae1830b3..1aab7dceb6 100644 --- a/system/ui/widgets/network.py +++ b/system/ui/widgets/network.py @@ -69,12 +69,11 @@ class WifiManagerUI: self._draw_network_list(rect) def _draw_network_list(self, rect: rl.Rectangle): - content_rect = rl.Rectangle( - rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * self.item_height - ) + content_rect = rl.Rectangle(rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * self.item_height) offset = self.scroll_panel.handle_scroll(rect, content_rect) + clicked = self.scroll_panel.is_click_valid() + rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) - clicked = offset.y < 10 and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT) for i, network in enumerate(self.wifi_manager.networks): y_offset = i * self.item_height + offset.y item_rect = rl.Rectangle(rect.x, y_offset, rect.width, self.item_height) @@ -83,27 +82,19 @@ class WifiManagerUI: self._draw_network_item(item_rect, network, clicked) if i < len(self.wifi_manager.networks) - 1: line_y = int(item_rect.y + item_rect.height - 1) - rl.draw_line( - int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY - ) + rl.draw_line(int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY) rl.end_scissor_mode() def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool): label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, self.item_height) - state_rect = rl.Rectangle( - rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, self.item_height - ) + state_rect = rl.Rectangle(rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, self.item_height) gui_label(label_rect, network.ssid, 55) if network.is_connected and self.current_action == ActionState.NONE: rl.gui_label(state_rect, "Connected") - elif ( - self.current_action == "Connecting" - and self._selected_network - and self._selected_network.ssid == network.ssid - ): + elif self.current_action == "Connecting" and self._selected_network and self._selected_network.ssid == network.ssid: rl.gui_label(state_rect, "CONNECTING...") # If the network is saved, show the "Forget" button @@ -114,7 +105,7 @@ class WifiManagerUI: self.btn_width, 80, ) - if gui_button(forget_btn_rect, "Forget") and self.current_action == ActionState.NONE: + if gui_button(forget_btn_rect, "Forget") and self.current_action == ActionState.NONE and clicked: self._selected_network = network self.current_action = ActionState.SHOW_FORGOT_CONFIRM @@ -131,7 +122,7 @@ class WifiManagerUI: async def forgot_network(self): self.current_action = ActionState.FORGETTING - await self.wifi_manager.forgot_connection(self._selected_network.ssid) + await self.wifi_manager.forget_connection(self._selected_network.ssid) self.current_action = ActionState.NONE async def connect_to_network(self, password=''):