add drag detection

deanlee 2 months ago committed by Cameron Clough
parent f5f14f2994
commit 976b6f3201
  1. 31
      system/ui/lib/scroll_panel.py
  2. 119
      system/ui/lib/wifi_manager.py
  3. 25
      system/ui/widgets/network.py

@ -4,6 +4,7 @@ from enum import IntEnum
MOUSE_WHEEL_SCROLL_SPEED = 30 MOUSE_WHEEL_SCROLL_SPEED = 30
INERTIA_FRICTION = 0.95 # The rate at which the inertia slows down INERTIA_FRICTION = 0.95 # The rate at which the inertia slows down
MIN_VELOCITY = 0.1 # Minimum velocity before stopping the inertia MIN_VELOCITY = 0.1 # Minimum velocity before stopping the inertia
DRAG_THRESHOLD = 5 # Pixels of movement to consider it a drag, not a click
class ScrollState(IntEnum): class ScrollState(IntEnum):
@ -16,39 +17,52 @@ class GuiScrollPanel:
def __init__(self, show_vertical_scroll_bar: bool = False): def __init__(self, show_vertical_scroll_bar: bool = False):
self._scroll_state: ScrollState = ScrollState.IDLE self._scroll_state: ScrollState = ScrollState.IDLE
self._last_mouse_y: float = 0.0 self._last_mouse_y: float = 0.0
self._start_mouse_y: float = 0.0 # Track initial mouse position for drag detection
self._offset = rl.Vector2(0, 0) self._offset = rl.Vector2(0, 0)
self._view = rl.Rectangle(0, 0, 0, 0) self._view = rl.Rectangle(0, 0, 0, 0)
self._show_vertical_scroll_bar: bool = show_vertical_scroll_bar self._show_vertical_scroll_bar: bool = show_vertical_scroll_bar
self._velocity_y = 0.0 # Velocity for inertia self._velocity_y = 0.0 # Velocity for inertia
self._is_dragging = False # Flag to indicate if drag occurred
def handle_scroll(self, bounds: rl.Rectangle, content: rl.Rectangle) -> rl.Vector2: def handle_scroll(self, bounds: rl.Rectangle, content: rl.Rectangle) -> rl.Vector2:
mouse_pos = rl.get_mouse_position() mouse_pos = rl.get_mouse_position()
# Handle dragging logic # Handle dragging logic
if rl.check_collision_point_rec(mouse_pos, bounds) and rl.is_mouse_button_pressed(rl.MouseButton.MOUSE_BUTTON_LEFT): if rl.check_collision_point_rec(mouse_pos, bounds) and rl.is_mouse_button_pressed(
rl.MouseButton.MOUSE_BUTTON_LEFT
):
if self._scroll_state == ScrollState.IDLE: if self._scroll_state == ScrollState.IDLE:
self._scroll_state = ScrollState.DRAGGING_CONTENT self._scroll_state = ScrollState.DRAGGING_CONTENT
if self._show_vertical_scroll_bar: if self._show_vertical_scroll_bar:
scrollbar_width = rl.gui_get_style(rl.GuiControl.LISTVIEW, rl.GuiListViewProperty.SCROLLBAR_WIDTH) scrollbar_width = rl.gui_get_style(
rl.GuiControl.LISTVIEW, rl.GuiListViewProperty.SCROLLBAR_WIDTH
)
scrollbar_x = bounds.x + bounds.width - scrollbar_width scrollbar_x = bounds.x + bounds.width - scrollbar_width
if mouse_pos.x >= scrollbar_x: if mouse_pos.x >= scrollbar_x:
self._scroll_state = ScrollState.DRAGGING_SCROLLBAR self._scroll_state = ScrollState.DRAGGING_SCROLLBAR
self._last_mouse_y = mouse_pos.y self._last_mouse_y = mouse_pos.y
self._start_mouse_y = mouse_pos.y # Record starting position
self._velocity_y = 0.0 # Reset velocity when drag starts self._velocity_y = 0.0 # Reset velocity when drag starts
self._is_dragging = False # Reset dragging flag
if self._scroll_state != ScrollState.IDLE: if self._scroll_state != ScrollState.IDLE:
if rl.is_mouse_button_down(rl.MouseButton.MOUSE_BUTTON_LEFT): if rl.is_mouse_button_down(rl.MouseButton.MOUSE_BUTTON_LEFT):
delta_y = mouse_pos.y - self._last_mouse_y delta_y = mouse_pos.y - self._last_mouse_y
# Check if movement exceeds drag threshold
total_drag = abs(mouse_pos.y - self._start_mouse_y)
if total_drag > DRAG_THRESHOLD:
self._is_dragging = True
if self._scroll_state == ScrollState.DRAGGING_CONTENT: if self._scroll_state == ScrollState.DRAGGING_CONTENT:
self._offset.y += delta_y self._offset.y += delta_y
else: else: # DRAGGING_SCROLLBAR
delta_y = -delta_y delta_y = -delta_y # Invert for scrollbar
self._last_mouse_y = mouse_pos.y self._last_mouse_y = mouse_pos.y
self._velocity_y = delta_y # Update velocity during drag self._velocity_y = delta_y # Update velocity during drag
else: elif rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT):
self._scroll_state = ScrollState.IDLE self._scroll_state = ScrollState.IDLE
# Handle mouse wheel scrolling # Handle mouse wheel scrolling
@ -73,3 +87,10 @@ class GuiScrollPanel:
self._offset.y = max(min(self._offset.y, 0), -max_scroll_y) self._offset.y = max(min(self._offset.y, 0), -max_scroll_y)
return self._offset return self._offset
def is_click_valid(self) -> bool:
return (
self._scroll_state == ScrollState.IDLE
and not self._is_dragging
and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT)
)

@ -33,7 +33,8 @@ class SecurityType(IntEnum):
OPEN = 0 OPEN = 0
WPA = 1 WPA = 1
WPA2 = 2 WPA2 = 2
UNSUPPORTED = 3 WPA3 = 3
UNSUPPORTED = 4
@dataclass @dataclass
@ -53,13 +54,13 @@ class WifiManager:
self.bus: MessageBus | None = None self.bus: MessageBus | None = None
self.device_path: str | None = None self.device_path: str | None = None
self.device_proxy = None self.device_proxy = None
self.saved_connections: dict[str, str] = dict() self.saved_connections: dict[str, str] = {}
self.active_ap_path: str = '' self.active_ap_path: str = ''
self.scan_task: asyncio.Task | None = None self.scan_task: asyncio.Task | None = None
self.running: bool = True self.running: bool = True
self.need_auth_callback = None self.need_auth_callback = None
async def connect(self): async def connect(self) -> None:
"""Connect to the DBus system bus.""" """Connect to the DBus system bus."""
try: try:
self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
@ -81,27 +82,30 @@ class WifiManager:
self.running = False self.running = False
if self.scan_task: if self.scan_task:
self.scan_task.cancel() self.scan_task.cancel()
await self.scan_task try:
await self.scan_task
except asyncio.CancelledError:
pass
if self.bus: if self.bus:
await self.bus.disconnect() await self.bus.disconnect()
async def request_scan(self): async def request_scan(self) -> None:
try: try:
interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
await interface.call_request_scan({}) await interface.call_request_scan({})
except DBusError as e: except DBusError as e:
cloudlog.warning(f"Scan request failed: {e}") cloudlog.warning(f"Scan request failed: {str(e)}")
async def get_active_access_point(self): async def get_active_access_point(self) -> str:
try: try:
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE)
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint')
return ap_path.value return ap_path.value
except DBusError as e: except DBusError as e:
cloudlog.error(f"Error fetching active access point: {e}") cloudlog.error(f"Error fetching active access point: {str(e)}")
return '' return ''
async def forgot_connection(self, ssid: str) -> bool: async def forget_connection(self, ssid: str) -> bool:
path = self.saved_connections.get(ssid) path = self.saved_connections.get(ssid)
if not path: if not path:
return False return False
@ -113,17 +117,22 @@ class WifiManager:
except DBusError as e: except DBusError as e:
cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}")
return False return False
except Exception as e:
cloudlog.error(f"Unexpected error while deleting connection for SSID: {ssid}: {e}")
return False
async def activate_connection(self, ssid: str) -> None: async def activate_connection(self, ssid: str) -> bool:
connection_path = self.saved_connections.get(ssid) connection_path = self.saved_connections.get(ssid)
if connection_path: if not connection_path:
return False
try:
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
await nm_iface.call_activate_connection(connection_path, self.device_path, '/') await nm_iface.call_activate_connection(connection_path, self.device_path, '/')
return True
except DBusError as e:
cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}")
return False
async def connect_to_network(self, ssid: str, password: str = None, is_hidden: bool = False): async def connect_to_network(
self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False
) -> None:
"""Connect to a selected WiFi network.""" """Connect to a selected WiFi network."""
try: try:
# settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) # settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
@ -143,8 +152,8 @@ class WifiManager:
'ipv6': {'method': Variant('s', 'ignore')}, 'ipv6': {'method': Variant('s', 'ignore')},
} }
# if bssid: if bssid:
# connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8'))
if password: if password:
connection['802-11-wireless-security'] = { connection['802-11-wireless-security'] = {
@ -205,9 +214,7 @@ class WifiManager:
await self._add_match_rule(rule) await self._add_match_rule(rule)
# Set up signal handlers # Set up signal handlers
self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed( self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed)
self._on_properties_changed
)
self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed)
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
@ -240,7 +247,7 @@ class WifiManager:
def _on_connection_removed(self, path: str) -> None: def _on_connection_removed(self, path: str) -> None:
"""Callback for ConnectionRemoved signal.""" """Callback for ConnectionRemoved signal."""
print(f"Connection removed: {path}") print(f"Connection removed: {path}")
for ssid, p in self.saved_connections.items(): for ssid, p in list(self.saved_connections.items()):
if path == p: if path == p:
del self.saved_connections[ssid] del self.saved_connections[ssid]
break break
@ -326,51 +333,51 @@ class WifiManager:
async def _get_connection_settings(self, path): async def _get_connection_settings(self, path):
"""Fetch connection settings for a specific connection path.""" """Fetch connection settings for a specific connection path."""
connection_proxy = await self.bus.introspect(NM, path) try:
connection = self.bus.get_proxy_object(NM, path, connection_proxy) connection_proxy = await self.bus.introspect(NM, path)
settings = connection.get_interface(NM_CONNECTION_IFACE) connection = self.bus.get_proxy_object(NM, path, connection_proxy)
return await settings.call_get_settings() settings = connection.get_interface(NM_CONNECTION_IFACE)
return await settings.call_get_settings()
except DBusError as e:
cloudlog.error(f"Failed to get settings for {path}: {str(e)}")
return {}
async def _process_chunk(self, paths_chunk): async def _process_chunk(self, paths_chunk):
"""Process a chunk of connection paths.""" """Process a chunk of connection paths."""
tasks = [self._get_connection_settings(path) for path in paths_chunk] tasks = [self._get_connection_settings(path) for path in paths_chunk]
results = await asyncio.gather(*tasks) return await asyncio.gather(*tasks, return_exceptions=True)
return results
async def _get_saved_connections(self): async def _get_saved_connections(self) -> dict[str, str]:
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) try:
connection_paths = await settings_iface.call_list_connections() settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
connection_paths = await settings_iface.call_list_connections()
saved_ssids: dict[str, str] = {} saved_ssids: dict[str, str] = {}
batch_size = 120 batch_size = 20
for i in range(0, len(connection_paths), batch_size): for i in range(0, len(connection_paths), batch_size):
chunk = connection_paths[i : i + batch_size] chunk = connection_paths[i : i + batch_size]
results = await self._process_chunk(chunk) results = await self._process_chunk(chunk)
for path, config in zip(chunk, results, strict=True):
# Loop through the results and filter Wi-Fi connections if isinstance(config, dict) and '802-11-wireless' in config:
for path, config in zip(chunk, results, strict=True): if ssid := self._extract_ssid(config):
if '802-11-wireless' in config: saved_ssids[ssid] = path
saved_ssids[self._extract_ssid(config)] = path return saved_ssids
except DBusError as e:
return saved_ssids cloudlog.error(f"Error fetching saved connections: {str(e)}")
return {}
async def _get_interface(self, bus_name: str, path: str, name: str): async def _get_interface(self, bus_name: str, path: str, name: str):
introspection = await self.bus.introspect(bus_name, path) introspection = await self.bus.introspect(bus_name, path)
proxy = self.bus.get_proxy_object(bus_name, path, introspection) proxy = self.bus.get_proxy_object(bus_name, path, introspection)
return proxy.get_interface(name) return proxy.get_interface(name)
def _get_security_type(self, flags, wpa_flags, rsn_flags): def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType:
"""Helper function to determine the security type of a network.""" """Determine the security type based on flags."""
if flags == 0: if flags == 0 and not (wpa_flags or rsn_flags):
return SecurityType.OPEN return SecurityType.OPEN
if wpa_flags: if rsn_flags & 0x200: # SAE (WPA3 Personal)
return SecurityType.WPA return SecurityType.WPA3
if rsn_flags: if rsn_flags: # RSN indicates WPA2 or higher
return SecurityType.WPA2 return SecurityType.WPA2
else: if wpa_flags: # WPA flags indicate WPA
return SecurityType.UNSUPPORTED return SecurityType.WPA
return SecurityType.UNSUPPORTED
async def _get_interface(self, bus_name: str, path: str, name: str):
introspection = await self.bus.introspect(bus_name, path)
proxy = self.bus.get_proxy_object(bus_name, path, introspection)
return proxy.get_interface(name)

@ -69,12 +69,11 @@ class WifiManagerUI:
self._draw_network_list(rect) self._draw_network_list(rect)
def _draw_network_list(self, rect: rl.Rectangle): def _draw_network_list(self, rect: rl.Rectangle):
content_rect = rl.Rectangle( content_rect = rl.Rectangle(rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * self.item_height)
rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * self.item_height
)
offset = self.scroll_panel.handle_scroll(rect, content_rect) offset = self.scroll_panel.handle_scroll(rect, content_rect)
clicked = self.scroll_panel.is_click_valid()
rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height))
clicked = offset.y < 10 and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT)
for i, network in enumerate(self.wifi_manager.networks): for i, network in enumerate(self.wifi_manager.networks):
y_offset = i * self.item_height + offset.y y_offset = i * self.item_height + offset.y
item_rect = rl.Rectangle(rect.x, y_offset, rect.width, self.item_height) item_rect = rl.Rectangle(rect.x, y_offset, rect.width, self.item_height)
@ -83,27 +82,19 @@ class WifiManagerUI:
self._draw_network_item(item_rect, network, clicked) self._draw_network_item(item_rect, network, clicked)
if i < len(self.wifi_manager.networks) - 1: if i < len(self.wifi_manager.networks) - 1:
line_y = int(item_rect.y + item_rect.height - 1) line_y = int(item_rect.y + item_rect.height - 1)
rl.draw_line( rl.draw_line(int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY)
int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY
)
rl.end_scissor_mode() rl.end_scissor_mode()
def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool): def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool):
label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, self.item_height) label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, self.item_height)
state_rect = rl.Rectangle( state_rect = rl.Rectangle(rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, self.item_height)
rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, self.item_height
)
gui_label(label_rect, network.ssid, 55) gui_label(label_rect, network.ssid, 55)
if network.is_connected and self.current_action == ActionState.NONE: if network.is_connected and self.current_action == ActionState.NONE:
rl.gui_label(state_rect, "Connected") rl.gui_label(state_rect, "Connected")
elif ( elif self.current_action == "Connecting" and self._selected_network and self._selected_network.ssid == network.ssid:
self.current_action == "Connecting"
and self._selected_network
and self._selected_network.ssid == network.ssid
):
rl.gui_label(state_rect, "CONNECTING...") rl.gui_label(state_rect, "CONNECTING...")
# If the network is saved, show the "Forget" button # If the network is saved, show the "Forget" button
@ -114,7 +105,7 @@ class WifiManagerUI:
self.btn_width, self.btn_width,
80, 80,
) )
if gui_button(forget_btn_rect, "Forget") and self.current_action == ActionState.NONE: if gui_button(forget_btn_rect, "Forget") and self.current_action == ActionState.NONE and clicked:
self._selected_network = network self._selected_network = network
self.current_action = ActionState.SHOW_FORGOT_CONFIRM self.current_action = ActionState.SHOW_FORGOT_CONFIRM
@ -131,7 +122,7 @@ class WifiManagerUI:
async def forgot_network(self): async def forgot_network(self):
self.current_action = ActionState.FORGETTING self.current_action = ActionState.FORGETTING
await self.wifi_manager.forgot_connection(self._selected_network.ssid) await self.wifi_manager.forget_connection(self._selected_network.ssid)
self.current_action = ActionState.NONE self.current_action = ActionState.NONE
async def connect_to_network(self, password=''): async def connect_to_network(self, password=''):

Loading…
Cancel
Save