Cameron Clough 3 days ago
parent 6b0ba3dee8
commit 44332eb186
  1. 126
      system/ui/lib/wifi_manager.py
  2. 165
      system/ui/widgets/network.py

@ -1,11 +1,15 @@
import asyncio
import threading
import time
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
from dbus_next.aio import MessageBus
from dbus_next import BusType, Variant, Message
from dbus_next.errors import DBusError
from dbus_next.constants import MessageType
from enum import IntEnum
import uuid
from dataclasses import dataclass
from openpilot.common.swaglog import cloudlog
# NetworkManager constants
@ -21,6 +25,7 @@ NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device"
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8
# NetworkManager device states
class NMDeviceState(IntEnum):
DISCONNECTED = 30
@ -49,8 +54,16 @@ class NetworkInfo:
# saved_path: str
@dataclass
class WifiManagerCallbacks:
need_auth: Callable[[], None] | None = None
activated: Callable[[], None] | None = None
forgotten: Callable[[], None] | None = None
class WifiManager:
def __init__(self):
def __init__(self, callbacks: WifiManagerCallbacks):
self.callbacks = callbacks
self.networks: list[NetworkInfo] = []
self.bus: MessageBus = None
self.device_path: str = ''
@ -59,8 +72,6 @@ class WifiManager:
self.active_ap_path: str = ''
self.scan_task: asyncio.Task | None = None
self.running: bool = True
self.need_auth_callback = None
self.activated_callback = None
async def connect(self) -> None:
"""Connect to the DBus system bus."""
@ -132,9 +143,7 @@ class WifiManager:
cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}")
return False
async def connect_to_network(
self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False
) -> None:
async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None:
"""Connect to a selected Wi-Fi network."""
try:
# settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
@ -234,18 +243,14 @@ class WifiManager:
def _on_state_changed(self, new_state: int, old_state: int, reason: int):
print(f"State changed: {old_state} -> {new_state}, reason: {reason}")
if new_state == NMDeviceState.ACTIVATED:
if self.activated_callback:
self.activated_callback()
if self.callbacks.activated:
self.callbacks.activated()
asyncio.create_task(self._update_connection_status())
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH):
for network in self.networks:
network.is_connected = False
if (
new_state == NMDeviceState.NEED_AUTH
and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT
and self.need_auth_callback
):
self.need_auth_callback()
if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth:
self.callbacks.need_auth()
def _on_new_connection(self, path: str) -> None:
"""Callback for NewConnection signal."""
@ -389,3 +394,90 @@ class WifiManager:
if wpa_flags: # WPA flags indicate WPA
return SecurityType.WPA
return SecurityType.UNSUPPORTED
class WifiManagerWrapper:
def __init__(self):
self._manager: WifiManager | None = None
self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks()
self._loop = None
self._running = False
self._lock = threading.RLock()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
while self._thread is not None and not self._running:
time.sleep(0.1)
@property
def callbacks(self) -> WifiManagerCallbacks:
return self._callbacks
@callbacks.setter
def callbacks(self, callbacks: WifiManagerCallbacks):
with self._lock:
self._callbacks = callbacks
def _run(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._manager = WifiManager(self._callbacks)
self._running = True
self._loop.run_forever()
except Exception as e:
cloudlog.error(f"Error in WifiManagerWrapper thread: {e}")
finally:
if self._loop.is_running():
self._loop.stop()
self._running = False
def shutdown(self):
if self._running:
self._run_coroutine(self._manager.shutdown())
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
self._running = False
@property
def networks(self) -> list[NetworkInfo]:
"""Get the current list of networks (thread-safe)."""
with self._lock:
return self._manager.networks if self._manager else []
def is_saved(self, ssid: str) -> bool:
"""Check if a network is saved (thread-safe)."""
with self._lock:
return self._manager.is_saved(ssid) if self._manager else False
def connect(self):
"""Connect to DBus and start Wi-Fi scanning."""
self._run_coroutine(self._manager.connect())
def request_scan(self):
"""Request a scan for Wi-Fi networks."""
self._run_coroutine(self._manager.request_scan())
def forget_connection(self, ssid: str):
"""Forget a saved Wi-Fi connection."""
self._run_coroutine(self._manager.forget_connection(ssid))
def activate_connection(self, ssid: str):
"""Activate an existing Wi-Fi connection."""
self._run_coroutine(self._manager.activate_connection(ssid))
def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False):
"""Connect to a Wi-Fi network."""
self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden))
def _run_coroutine(self, coro):
"""Run a coroutine in the async thread."""
if not self._running or not self._loop:
cloudlog.error("WifiManager thread is not running")
return
asyncio.run_coroutine_threadsafe(coro, self._loop)

@ -1,7 +1,8 @@
import asyncio
from dataclasses import dataclass
from typing import Literal
import pyray as rl
from enum import IntEnum
from openpilot.system.ui.lib.wifi_manager import WifiManager, NetworkInfo
from openpilot.system.ui.lib.wifi_manager import NetworkInfo, WifiManagerCallbacks, WifiManagerWrapper
from openpilot.system.ui.lib.application import gui_app
from openpilot.system.ui.lib.button import gui_button
from openpilot.system.ui.lib.label import gui_label
@ -12,57 +13,67 @@ from openpilot.system.ui.widgets.confirm_dialog import confirm_dialog
NM_DEVICE_STATE_NEED_AUTH = 60
ITEM_HEIGHT = 160
class ActionState(IntEnum):
NONE = 0
CONNECT = 1
CONNECTING = 2
FORGOT = 3
FORGETTING = 4
NEED_AUTH = 5
SHOW_FORGOT_CONFIRM = 6
@dataclass
class StateIdle:
action: Literal["idle"] = "idle"
@dataclass
class StateConnecting:
network: NetworkInfo
action: Literal["connecting"] = "connecting"
@dataclass
class StateNeedsAuth:
network: NetworkInfo
action: Literal["needs_auth"] = "needs_auth"
@dataclass
class StateShowForgetConfirm:
network: NetworkInfo
action: Literal["show_forget_confirm"] = "show_forget_confirm"
@dataclass
class StateForgetting:
network: NetworkInfo
action: Literal["forgetting"] = "forgetting"
UIState = StateIdle | StateConnecting | StateNeedsAuth | StateShowForgetConfirm | StateForgetting
class WifiManagerUI:
def __init__(self, wifi_manager):
self.wifi_manager = wifi_manager
self.wifi_manager.need_auth_callback = self._on_need_auth
self.wifi_manager.activated_callback = self._on_activated
self._selected_network = None
def __init__(self, wifi_manager: WifiManagerWrapper):
self.state: UIState = StateIdle()
self.btn_width = 200
self.current_action: ActionState = ActionState.NONE
self.scroll_panel = GuiScrollPanel()
self.keyboard = Keyboard()
asyncio.create_task(self._initialize())
async def _initialize(self) -> None:
try:
await self.wifi_manager.connect()
except Exception as e:
print(f"Initialization error: {e}")
self.wifi_manager = wifi_manager
self.wifi_manager.callbacks = WifiManagerCallbacks(self._on_need_auth, self._on_activated, self._on_forgotten)
self.wifi_manager.connect()
def render(self, rect: rl.Rectangle):
if not self.wifi_manager.networks:
gui_label(rect, "Scanning Wi-Fi networks...", 40, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
gui_label(rect, "Scanning Wi-Fi networks...", 72, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER)
return
if self.current_action == ActionState.SHOW_FORGOT_CONFIRM:
result = confirm_dialog(rect, f'Forget Wi-Fi Network "{self._selected_network.ssid}"?', 'Forget')
if result == 1:
asyncio.create_task(self.forgot_network())
elif result == 0:
self.current_action = ActionState.NONE
return
match self.state:
case StateNeedsAuth(network):
result = self.keyboard.render(rect, "Enter password", f"for {network.ssid}")
if result == 1:
self.connect_to_network(network, self.keyboard.text)
elif result == 0:
self.state = StateIdle()
if self.current_action == ActionState.NEED_AUTH:
result = self.keyboard.render(rect, 'Enter password', f'for {self._selected_network.ssid}')
if result == 1:
asyncio.create_task(self.connect_to_network(self.keyboard.text))
elif result == 0:
self.current_action = ActionState.NONE
return
case StateShowForgetConfirm(network):
result = confirm_dialog(rect, f'Forget Wi-Fi Network "{network.ssid}"?', "Forget")
if result == 1:
self.forget_network(network)
elif result == 0:
self.state = StateIdle()
self._draw_network_list(rect)
case _:
self._draw_network_list(rect)
def _draw_network_list(self, rect: rl.Rectangle):
content_rect = rl.Rectangle(rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * ITEM_HEIGHT)
@ -89,10 +100,18 @@ class WifiManagerUI:
gui_label(label_rect, network.ssid, 55)
if network.is_connected and self.current_action == ActionState.NONE:
rl.gui_label(state_rect, "Connected")
elif self.current_action == ActionState.CONNECTING and self._selected_network and self._selected_network.ssid == network.ssid:
rl.gui_label(state_rect, "CONNECTING...")
status_text = ""
if network.is_connected:
status_text = "Connected"
match self.state:
case StateConnecting(network=connecting):
if connecting.ssid == network.ssid:
status_text = "CONNECTING..."
case StateForgetting(network=forgetting):
if forgetting.ssid == network.ssid:
status_text = "FORGETTING..."
if status_text:
rl.gui_label(state_rect, status_text)
# If the network is saved, show the "Forget" button
if self.wifi_manager.is_saved(network.ssid):
@ -102,46 +121,42 @@ class WifiManagerUI:
self.btn_width,
80,
)
if gui_button(forget_btn_rect, "Forget") and self.current_action == ActionState.NONE and clicked:
self._selected_network = network
self.current_action = ActionState.SHOW_FORGOT_CONFIRM
if (
self.current_action == ActionState.NONE
and rl.check_collision_point_rec(rl.get_mouse_position(), label_rect)
and clicked
):
self._selected_network = network
if not self.wifi_manager.is_saved(self._selected_network.ssid):
self.current_action = ActionState.NEED_AUTH
else:
asyncio.create_task(self.connect_to_network())
if isinstance(self.state, StateIdle) and gui_button(forget_btn_rect, "Forget") and clicked:
self.state = StateShowForgetConfirm(network)
async def forgot_network(self):
self.current_action = ActionState.FORGETTING
await self.wifi_manager.forget_connection(self._selected_network.ssid)
self.current_action = ActionState.NONE
if isinstance(self.state, StateIdle) and rl.check_collision_point_rec(rl.get_mouse_position(), label_rect) and clicked:
if not self.wifi_manager.is_saved(network.ssid):
self.state = StateNeedsAuth(network)
else:
self.connect_to_network(network)
async def connect_to_network(self, password=''):
self.current_action = ActionState.CONNECTING
if self.wifi_manager.is_saved(self._selected_network.ssid) and not password:
await self.wifi_manager.activate_connection(self._selected_network.ssid)
def connect_to_network(self, network: NetworkInfo, password=''):
if self.wifi_manager.is_saved(network.ssid) and not password:
self.wifi_manager.activate_connection(network.ssid)
else:
await self.wifi_manager.connect_to_network(self._selected_network.ssid, password)
self.wifi_manager.connect_to_network(network.ssid, password)
def forget_network(self, network: NetworkInfo):
self.state = StateForgetting(network)
self.wifi_manager.forget_connection(network.ssid)
def _on_need_auth(self):
if self.current_action == ActionState.CONNECTING and self._selected_network:
self.current_action = ActionState.NEED_AUTH
match self.state:
case StateConnecting(network):
self.state = StateNeedsAuth(network)
def _on_activated(self):
if self.current_action == ActionState.CONNECTING:
self.current_action = ActionState.NONE
if isinstance(self.state, StateConnecting):
self.state = StateIdle()
def _on_forgotten(self):
if isinstance(self.state, StateForgetting):
self.state = StateIdle()
async def main():
gui_app.init_window("Wifi Manager")
wifi_manager = WifiManager()
def main():
gui_app.init_window("Wi-Fi Manager")
wifi_manager = WifiManagerWrapper()
wifi_ui = WifiManagerUI(wifi_manager)
for _ in gui_app.render():
@ -151,4 +166,4 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())
main()

Loading…
Cancel
Save