python wifi manager

deanlee 2 months ago
parent 5cffaf5bb5
commit 179a8ec007
  1. 104
      system/ui/lib/keyboard.py
  2. 376
      system/ui/lib/wifi_manager.py
  3. 158
      system/ui/network.py

@ -0,0 +1,104 @@
import pyray as rl
from openpilot.system.ui.lib.button import gui_button
from openpilot.system.ui.lib.label import gui_label
# Constants for special keys
BACKSPACE_KEY = "<-"
ENTER_KEY = "Enter"
SPACE_KEY = " "
SHIFT_KEY = ""
SHIFT_DOWN_KEY = ""
NUMERIC_KEY = "123"
SYMBOL_KEY = "#+="
ABC_KEY = "ABC"
# Define keyboard layouts as a dictionary for easier access
keyboard_layouts = {
"lowercase": [
["q", "w", "e", "r", "t", "y", "u", "i", "o", "p"],
["a", "s", "d", "f", "g", "h", "j", "k", "l"],
[SHIFT_KEY, "z", "x", "c", "v", "b", "n", "m", BACKSPACE_KEY],
[NUMERIC_KEY, "/", "-", SPACE_KEY, ".", ENTER_KEY],
],
"uppercase": [
["Q", "W", "E", "R", "T", "Y", "U", "I", "O", "P"],
["A", "S", "D", "F", "G", "H", "J", "K", "L"],
[SHIFT_DOWN_KEY, "Z", "X", "C", "V", "B", "N", "M", BACKSPACE_KEY],
[NUMERIC_KEY, "/", "-", SPACE_KEY, ".", ENTER_KEY],
],
"numbers": [
["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"],
["-", "/", ":", ";", "(", ")", "$", "&", "@", "\""],
[SYMBOL_KEY, ".", ",", "?", "!", "`", BACKSPACE_KEY],
[ABC_KEY, SPACE_KEY, ".", ENTER_KEY],
],
"specials": [
["[", "]", "{", "}", "#", "%", "^", "*", "+", "="],
["_", "\\", "|", "~", "<", ">", "", "£", "¥", ""],
[NUMERIC_KEY, ".", ",", "?", "!", "'", BACKSPACE_KEY],
[ABC_KEY, SPACE_KEY, ".", ENTER_KEY],
],
}
class Keyboard:
def __init__(self, max_text_size: int = 255):
self._layout = keyboard_layouts["lowercase"]
self._input_text = ""
self._max_text_size = max_text_size
@property
def text(self) -> str:
return self._input_text
def clear(self):
self._input_text = ""
def render(self, rect, title, sub_title):
gui_label((rect.x, rect.y, rect.width, 95), title, 90)
gui_label((rect.x, rect.y + 95, rect.width, 60), sub_title, 55, rl.GRAY)
if gui_button(rl.Rectangle(rect.x + rect.width - 300, rect.y, 300, 100), "Cancel"):
return -1
# Text box for input
rl.gui_text_box(rl.Rectangle(rect.x, rect.y + 160, rect.width, 100), self._input_text, self._max_text_size, True)
h_space, v_space = 15, 15
row_y_start = rect.y + 300 # Starting Y position for the first row
key_height = (rect.height - 300 - 3 * v_space) / 4
key_max_width = (rect.width - (len(self._layout[2]) - 1) * h_space) / len(self._layout[2])
# Iterate over the rows of keys in the current layout
for row, keys in enumerate(self._layout):
key_width = min((rect.width - (180 if row == 1 else 0) - h_space * (len(keys) - 1)) / len(keys), key_max_width)
start_x = rect.x + (90 if row == 1 else 0)
for i, key in enumerate(keys):
if i > 0:
start_x += h_space
new_width = (key_width * 3 + h_space * 2) if key == SPACE_KEY else (key_width * 2 + h_space if key == ENTER_KEY else key_width)
key_rect = rl.Rectangle(start_x, row_y_start + row * (key_height + v_space), new_width, key_height)
start_x += new_width
if gui_button(key_rect, key):
if key == ENTER_KEY:
return 1
else:
self.handle_key_press(key)
return 0
def handle_key_press(self, key):
if key in (SHIFT_DOWN_KEY, ABC_KEY):
self._layout = keyboard_layouts["lowercase"]
elif key == SHIFT_KEY:
self._layout = keyboard_layouts["uppercase"]
elif key == NUMERIC_KEY:
self._layout = keyboard_layouts["numbers"]
elif key == SYMBOL_KEY:
self._layout = keyboard_layouts["specials"]
elif key == BACKSPACE_KEY and len(self._input_text) > 0:
self._input_text = self._input_text[:-1]
elif key != BACKSPACE_KEY and len(self._input_text) < self._max_text_size:
self._input_text += key

@ -0,0 +1,376 @@
import asyncio
from dbus_next.aio import MessageBus
from dbus_next import BusType, Variant, Message
from dbus_next.errors import DBusError
from dbus_next.constants import MessageType
from enum import IntEnum
import uuid
from dataclasses import dataclass
from openpilot.common.swaglog import cloudlog
NM = "org.freedesktop.NetworkManager"
NM_PATH = '/org/freedesktop/NetworkManager'
NM_IFACE = 'org.freedesktop.NetworkManager'
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings'
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings'
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection'
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless'
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties'
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device"
# NetworkManager device states
class NMDeviceState(IntEnum):
DISCONNECTED = 30
PREPARE = 40
NEED_AUTH = 60
IP_CONFIG = 70
ACTIVATED = 100
class SecurityType(IntEnum):
OPEN = 0
WPA = 1
WPA2 = 2
UNSUPPORTED = 3
@dataclass
class NetworkInfo:
ssid: str
strength: int
is_connected: bool
security_type: SecurityType
path: str
bssid: str
# saved_path: str
class WifiManager:
def __init__(self):
self.networks: list[NetworkInfo] = []
self.bus: MessageBus | None = None
self.device_path: str | None = None
self.device_proxy = None
self.saved_connections: dict[str, str] = dict()
self.active_ap_path: str = ''
self.scan_task: asyncio.Task | None = None
self.running: bool = True
def is_saved(self, ssid: str) -> bool:
return ssid in self.saved_connections
async def connect(self):
"""Connect to the DBus system bus."""
try:
self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
if not await self._find_wifi_device():
raise ValueError("No Wi-Fi device found")
await self._setup_signals(self.device_path)
self.active_ap_path = await self.get_active_access_point()
self.saved_connections = await self._get_saved_connections()
self.scan_task = asyncio.create_task(self._periodic_scan())
except DBusError as e:
cloudlog.error(f"Failed to connect to DBus: {e}")
raise
except Exception as e:
cloudlog.error(f"Unexpected error during connect: {e}")
raise
async def shutdown(self) -> None:
self.running = False
if self.scan_task:
self.scan_task.cancel()
await self.scan_task
if self.bus:
await self.bus.disconnect()
async def request_scan(self):
try:
interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
await interface.call_request_scan({})
except DBusError as e:
cloudlog.warning(f"Scan request failed: {e}")
async def get_active_access_point(self):
try:
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE)
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint')
return ap_path.value
except DBusError as e:
cloudlog.error(f"Error fetching active access point: {e}")
return ''
async def forgot_connection(self, ssid: str) -> bool:
path = self.saved_connections.get(ssid)
if not path:
return False
try:
nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE)
await nm_iface.call_delete()
self.saved_connections.pop(ssid)
return True
except DBusError as e:
cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}")
return False
except Exception as e:
cloudlog.error(f"Unexpected error while deleting connection for SSID: {ssid}: {e}")
return False
async def activate_connection(self, ssid: str) -> None:
connection_path = self.saved_connections.get(ssid)
if connection_path:
cloudlog.info('activate connection:', connection_path)
introspection = await self.bus.introspect(NM, NM_PATH)
proxy = self.bus.get_proxy_object(NM, NM_PATH, introspection)
interface = proxy.get_interface(NM_IFACE)
await interface.call_activate_connection(connection_path, self.device_path, '/')
async def connect_to_network(self, ssid: str, password: str = None, is_hidden: bool = False):
"""Connect to a selected WiFi network."""
try:
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
connection = {
'connection': {
'type': Variant('s', '802-11-wireless'),
'uuid': Variant('s', str(uuid.uuid4())),
'id': Variant('s', ssid),
'autoconnect-retries': Variant('i', 0),
},
'802-11-wireless': {
'ssid': Variant('ay', ssid.encode('utf-8')),
'hidden': Variant('b', is_hidden),
'mode': Variant('s', 'infrastructure'),
},
'ipv4': {'method': Variant('s', 'auto')},
'ipv6': {'method': Variant('s', 'ignore')},
}
# if bssid:
# connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8'))
if password:
connection['802-11-wireless-security'] = {
'key-mgmt': Variant('s', 'wpa-psk'),
'auth-alg': Variant('s', 'open'),
'psk': Variant('s', password),
}
await settings_iface.call_add_connection(connection)
for network in self.networks:
network.is_connected = True if network.ssid == ssid else False
except DBusError as e:
cloudlog.error(f"Error connecting to network: {e}")
async def _find_wifi_device(self) -> bool:
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE)
devices = await nm_iface.get_devices()
for device_path in devices:
device = await self.bus.introspect(NM, device_path)
device_proxy = self.bus.get_proxy_object(NM, device_path, device)
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE)
device_type = await device_interface.get_device_type()
if device_type == 2: # WiFi device
self.device_path = device_path
self.device_proxy = device_proxy
return True
return False
async def _periodic_scan(self):
while self.running:
try:
await self.request_scan()
await self._get_available_networks()
await asyncio.sleep(30)
except asyncio.CancelledError:
break
except DBusError as e:
cloudlog.error(f"Scan failed: {e}")
await asyncio.sleep(5)
async def _setup_signals(self, device_path: str) -> None:
rules = [
f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'",
f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'",
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'",
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'",
]
for rule in rules:
await self._add_match_rule(rule)
# Set up signal handlers
self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(
self._on_properties_changed
)
self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed)
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
settings_iface.on_new_connection(self._on_new_connection)
settings_iface.on_connection_removed(self._on_connection_removed)
def _on_properties_changed(self, interface: str, changed: dict, invalidated: list):
# print("property changed", interface, changed, invalidated)
if 'LastScan' in changed:
asyncio.create_task(self._get_available_networks())
elif "ActiveAccessPoint" in changed:
self.active_ap_path = changed["ActiveAccessPoint"].value
asyncio.create_task(self._get_available_networks())
def _on_state_changed(self, new_state: int, old_state: int, reason: int):
print(f"State changed: {old_state} -> {new_state}, reason: {reason}")
if new_state == NMDeviceState.ACTIVATED:
asyncio.create_task(self._update_connection_status())
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH):
for network in self.networks:
network.is_connected = False
def _on_new_connection(self, path: str) -> None:
"""Callback for NewConnection signal."""
print(f"New connection added: {path}")
asyncio.create_task(self._add_saved_connection(path))
def _on_connection_removed(self, path: str) -> None:
"""Callback for ConnectionRemoved signal."""
print(f"Connection removed: {path}")
for ssid, p in self.saved_connections.items():
if path == p:
del self.saved_connections[ssid]
break
async def _add_saved_connection(self, path: str) -> None:
"""Add a new saved connection to the dictionary."""
try:
settings = await self._get_connection_settings(path)
if ssid := self._extract_ssid(settings):
self.saved_connections[ssid] = path
except DBusError as e:
cloudlog.error(f"Failed to add connection {path}: {e}")
def _extract_ssid(self, settings: dict) -> str | None:
"""Extract SSID from connection settings."""
ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value
return ''.join(chr(b) for b in ssid_variant) if ssid_variant else None
async def _update_connection_status(self):
self.active_ap_path = await self.get_active_access_point()
await self._get_available_networks()
async def _add_match_rule(self, rule):
""" "Add a match rule on the bus."""
reply = await self.bus.call(
Message(
message_type=MessageType.METHOD_CALL,
destination='org.freedesktop.DBus',
interface="org.freedesktop.DBus",
path='/org/freedesktop/DBus',
member='AddMatch',
signature='s',
body=[rule],
)
)
assert reply.message_type == MessageType.METHOD_RETURN
return reply
async def _get_available_networks(self):
"""Get a list of available networks via NetworkManager."""
networks = []
wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE)
access_points = await wifi_iface.get_access_points()
for ap_path in access_points:
try:
props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE)
properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint')
ssid_variant = properties['Ssid'].value
ssid = ''.join(chr(byte) for byte in ssid_variant)
if not ssid:
continue
bssid = properties.get('HwAddress', Variant('s', '')).value
print(bssid)
flags = properties['Flags'].value
wpa_flags = properties['WpaFlags'].value
rsn_flags = properties['RsnFlags'].value
networks.append(
NetworkInfo(
ssid=ssid,
strength=properties['Strength'].value,
security_type=self._get_security_type(flags, wpa_flags, rsn_flags),
path=ap_path,
bssid=bssid,
is_connected=self.active_ap_path == ap_path,
)
)
except DBusError as e:
cloudlog.error(f"Error fetching networks: {e}")
except Exception as e:
cloudlog.error({e})
self.networks = sorted(
networks,
key=lambda network: (
not network.is_connected,
-network.strength, # Higher signal strength first
network.ssid.lower(),
),
)
async def _get_connection_settings(self, path):
"""Fetch connection settings for a specific connection path."""
connection_proxy = await self.bus.introspect(NM, path)
connection = self.bus.get_proxy_object(NM, path, connection_proxy)
settings = connection.get_interface(NM_CONNECTION_IFACE)
return await settings.call_get_settings()
async def _process_chunk(self, paths_chunk):
"""Process a chunk of connection paths."""
tasks = [self._get_connection_settings(path) for path in paths_chunk]
results = await asyncio.gather(*tasks)
return results
async def _get_saved_connections(self):
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE)
connection_paths = await settings_iface.call_list_connections()
saved_ssids: dict[str, str] = {}
batch_size = 120
for i in range(0, len(connection_paths), batch_size):
chunk = connection_paths[i : i + batch_size]
results = await self._process_chunk(chunk)
# Loop through the results and filter Wi-Fi connections
for path, config in zip(chunk, results, strict=True):
if '802-11-wireless' in config:
saved_ssids[self._extract_ssid(config)] = path
return saved_ssids
async def _get_interface(self, bus_name: str, path: str, name: str):
introspection = await self.bus.introspect(bus_name, path)
proxy = self.bus.get_proxy_object(bus_name, path, introspection)
return proxy.get_interface(name)
def _get_security_type(self, flags, wpa_flags, rsn_flags):
"""Helper function to determine the security type of a network."""
if flags == 0:
return SecurityType.OPEN
if wpa_flags:
return SecurityType.WPA
if rsn_flags:
return SecurityType.WPA2
else:
return SecurityType.UNSUPPORTED
async def _get_interface(self, bus_name: str, path: str, name: str):
introspection = await self.bus.introspect(bus_name, path)
proxy = self.bus.get_proxy_object(bus_name, path, introspection)
return proxy.get_interface(name)

@ -0,0 +1,158 @@
import asyncio
import pyray as rl
from enum import IntEnum
from dbus_next.constants import MessageType
from openpilot.system.ui.lib.wifi_manager import WifiManager, NetworkInfo
from openpilot.system.ui.lib.application import gui_app
from openpilot.system.ui.lib.label import gui_label
from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel
from openpilot.system.ui.lib.keyboard import Keyboard
NM_DEVICE_STATE_NEED_AUTH = 60
class ActionState(IntEnum):
NONE = 0
CONNECT = 1
CONNECTING = 2
FORGOT = 3
FORGETTING = 4
NEED_AUTH = 5
class WifiManagerUI:
def __init__(self, wifi_manager):
self.wifi_manager = wifi_manager
self._selected_network = None
self.item_height = 160
self.btn_width = 200
self.current_action: ActionState = ActionState.NONE
self.scroll_panel = GuiScrollPanel()
self.keyboard = Keyboard()
asyncio.create_task(self._initialize())
async def _initialize(self) -> None:
try:
await self.wifi_manager.connect()
self.wifi_manager.bus.add_message_handler(self._handle_dbus_signal)
except Exception as e:
print(f"Initialization error: {e}")
def draw_network_list(self, rect: rl.Rectangle):
if not self.wifi_manager.networks:
gui_label(
rect, "Scanning Wi-Fi networks...", 40, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER
)
return
if self.current_action == ActionState.NEED_AUTH:
result = self.keyboard.render(rect, 'Enter password', f'for {self._selected_network.ssid}')
if result == 0:
return
else:
self.current_action = ActionState.NONE
asyncio.create_task(self.connect_to_network(self.keyboard.text))
content_rect = rl.Rectangle(
rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * self.item_height
)
offset = self.scroll_panel.handle_scroll(rect, content_rect)
rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height))
clicked = offset.y < 10 and rl.is_mouse_button_released(rl.MouseButton.MOUSE_BUTTON_LEFT)
for i, network in enumerate(self.wifi_manager.networks):
y_offset = i * self.item_height + offset.y
item_rect = rl.Rectangle(rect.x, y_offset, rect.width, self.item_height)
if rl.check_collision_recs(item_rect, rect):
self.render_network_item(item_rect, network, clicked)
if i < len(self.wifi_manager.networks) - 1:
line_y = int(item_rect.y + item_rect.height - 1)
rl.draw_line(
int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY
)
rl.end_scissor_mode()
def render_network_item(self, rect, network: NetworkInfo, clicked: bool):
label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, self.item_height)
state_rect = rl.Rectangle(
rect.x + rect.width - self.btn_width * 2 - 30, rect.y, self.btn_width, self.item_height
)
gui_label(label_rect, network.ssid, 55)
if network.is_connected and self.current_action == ActionState.NONE:
rl.gui_label(state_rect, "Connected")
elif (
self.current_action == "Connecting"
and self._selected_network
and self._selected_network.ssid == network.ssid
):
rl.gui_label(state_rect, "CONNECTING...")
# If the network is saved, show the "Forget" button
if self.wifi_manager.is_saved(network.ssid):
forget_btn_rect = rl.Rectangle(
rect.x + rect.width - self.btn_width,
rect.y + (self.item_height - 80) / 2,
self.btn_width,
80,
)
if rl.gui_button(forget_btn_rect, "Forget") and self.current_action == ActionState.NONE:
self._selected_network = network
asyncio.create_task(self.forgot_network())
if (
self.current_action == ActionState.NONE
and rl.check_collision_point_rec(rl.get_mouse_position(), label_rect)
and clicked
):
self._selected_network = network
if not self.wifi_manager.is_saved(self._selected_network.ssid):
self.current_action = ActionState.NEED_AUTH
else:
asyncio.create_task(self.connect_to_network())
async def forgot_network(self):
self.current_action = ActionState.FORGETTING
await self.wifi_manager.forgot_connection(self._selected_network.ssid)
self.current_action = ActionState.NONE
async def connect_to_network(self, password=''):
self.current_action = ActionState.CONNECTING
if self.wifi_manager.is_saved(self._selected_network.ssid) and not password:
await self.wifi_manager.activate_connection(self._selected_network.ssid)
else:
await self.wifi_manager.connect_to_network(self._selected_network.ssid, password)
self.current_action = ActionState.NONE
def _handle_dbus_signal(self, message):
if message.message_type != MessageType.SIGNAL:
return
if message.member == 'StateChanged':
if len(message.body) >= 2:
_, new_state = message.body[0], message.body[1]
if new_state == NM_DEVICE_STATE_NEED_AUTH:
self.current_action = ActionState.NEED_AUTH
async def main():
gui_app.init_window("Wifi Manager")
wifi_manager = WifiManager()
wifi_ui = WifiManagerUI(wifi_manager)
while not rl.window_should_close():
rl.begin_drawing()
rl.clear_background(rl.BLACK)
wifi_ui.draw_network_list(rl.Rectangle(50, 50, gui_app.width - 100, gui_app.height - 100))
rl.end_drawing()
await asyncio.sleep(0.001)
if __name__ == "__main__":
asyncio.run(main())
Loading…
Cancel
Save