commit
173237eab3
62 changed files with 2013 additions and 871 deletions
@ -0,0 +1,3 @@ |
|||||||
|
version https://git-lfs.github.com/spec/v1 |
||||||
|
oid sha256:8a5245f9458982b608fee67fc689d899ca638a405ff62bf9db5e4978b177ef3e |
||||||
|
size 121394 |
@ -1 +1 @@ |
|||||||
Subproject commit 6af5ad8dd5de3b890d2812cc19b063caae858b31 |
Subproject commit c856a2c0bd2b3c75f86a73b051c0c4cc7159559e |
@ -1,3 +1,3 @@ |
|||||||
version https://git-lfs.github.com/spec/v1 |
version https://git-lfs.github.com/spec/v1 |
||||||
oid sha256:f222d2c528f1763828de01bb55e8979b1e4056e1dbb41350f521d2d2bb09d177 |
oid sha256:dad289ae367cefcb862ef1d707fb4919d008f0eeaa1ebaf18df58d8de5a7d96e |
||||||
size 46265585 |
size 46265585 |
||||||
|
@ -0,0 +1,53 @@ |
|||||||
|
import pyray as rl |
||||||
|
|
||||||
|
ON_COLOR = rl.GREEN |
||||||
|
OFF_COLOR = rl.Color(0x39, 0x39, 0x39, 255) |
||||||
|
KNOB_COLOR = rl.WHITE |
||||||
|
BG_HEIGHT = 60 |
||||||
|
KNOB_HEIGHT = 80 |
||||||
|
WIDTH = 160 |
||||||
|
|
||||||
|
|
||||||
|
class Toggle: |
||||||
|
def __init__(self, x, y, initial_state=False): |
||||||
|
self._state = initial_state |
||||||
|
self._rect = rl.Rectangle(x, y, WIDTH, KNOB_HEIGHT) |
||||||
|
|
||||||
|
def handle_input(self): |
||||||
|
if rl.is_mouse_button_pressed(rl.MOUSE_LEFT_BUTTON): |
||||||
|
mouse_pos = rl.get_mouse_position() |
||||||
|
if rl.check_collision_point_rec(mouse_pos, self._rect): |
||||||
|
self._state = not self._state |
||||||
|
|
||||||
|
def get_state(self): |
||||||
|
return self._state |
||||||
|
|
||||||
|
def render(self): |
||||||
|
self._draw_background() |
||||||
|
self._draw_knob() |
||||||
|
|
||||||
|
def _draw_background(self): |
||||||
|
bg_rect = rl.Rectangle( |
||||||
|
self._rect.x + 5, |
||||||
|
self._rect.y + (KNOB_HEIGHT - BG_HEIGHT) / 2, |
||||||
|
self._rect.width - 10, |
||||||
|
BG_HEIGHT, |
||||||
|
) |
||||||
|
rl.draw_rectangle_rounded(bg_rect, 1.0, 10, ON_COLOR if self._state else OFF_COLOR) |
||||||
|
|
||||||
|
def _draw_knob(self): |
||||||
|
knob_radius = KNOB_HEIGHT / 2 |
||||||
|
knob_x = self._rect.x + knob_radius if not self._state else self._rect.x + self._rect.width - knob_radius |
||||||
|
knob_y = self._rect.y + knob_radius |
||||||
|
rl.draw_circle(int(knob_x), int(knob_y), knob_radius, KNOB_COLOR) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
from openpilot.system.ui.lib.application import gui_app |
||||||
|
|
||||||
|
gui_app.init_window("Text toggle example") |
||||||
|
toggle = Toggle(100, 100) |
||||||
|
for _ in gui_app.render(): |
||||||
|
toggle.handle_input() |
||||||
|
toggle.render() |
||||||
|
|
@ -0,0 +1,694 @@ |
|||||||
|
import asyncio |
||||||
|
import concurrent.futures |
||||||
|
import threading |
||||||
|
import time |
||||||
|
import uuid |
||||||
|
from collections.abc import Callable |
||||||
|
from dataclasses import dataclass |
||||||
|
from enum import IntEnum |
||||||
|
from typing import TypeVar |
||||||
|
|
||||||
|
from dbus_next.aio import MessageBus |
||||||
|
from dbus_next import BusType, Variant, Message |
||||||
|
from dbus_next.errors import DBusError |
||||||
|
from dbus_next.constants import MessageType |
||||||
|
from openpilot.common.params import Params |
||||||
|
from openpilot.common.swaglog import cloudlog |
||||||
|
|
||||||
|
T = TypeVar("T") |
||||||
|
|
||||||
|
# NetworkManager constants |
||||||
|
NM = "org.freedesktop.NetworkManager" |
||||||
|
NM_PATH = '/org/freedesktop/NetworkManager' |
||||||
|
NM_IFACE = 'org.freedesktop.NetworkManager' |
||||||
|
NM_SETTINGS_PATH = '/org/freedesktop/NetworkManager/Settings' |
||||||
|
NM_SETTINGS_IFACE = 'org.freedesktop.NetworkManager.Settings' |
||||||
|
NM_CONNECTION_IFACE = 'org.freedesktop.NetworkManager.Settings.Connection' |
||||||
|
NM_WIRELESS_IFACE = 'org.freedesktop.NetworkManager.Device.Wireless' |
||||||
|
NM_PROPERTIES_IFACE = 'org.freedesktop.DBus.Properties' |
||||||
|
NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" |
||||||
|
|
||||||
|
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 |
||||||
|
|
||||||
|
TETHERING_IP_ADDRESS = "192.168.43.1" |
||||||
|
DEFAULT_TETHERING_PASSWORD = "12345678" |
||||||
|
|
||||||
|
# NetworkManager device states |
||||||
|
class NMDeviceState(IntEnum): |
||||||
|
DISCONNECTED = 30 |
||||||
|
PREPARE = 40 |
||||||
|
NEED_AUTH = 60 |
||||||
|
IP_CONFIG = 70 |
||||||
|
ACTIVATED = 100 |
||||||
|
|
||||||
|
class SecurityType(IntEnum): |
||||||
|
OPEN = 0 |
||||||
|
WPA = 1 |
||||||
|
WPA2 = 2 |
||||||
|
WPA3 = 3 |
||||||
|
UNSUPPORTED = 4 |
||||||
|
|
||||||
|
@dataclass |
||||||
|
class NetworkInfo: |
||||||
|
ssid: str |
||||||
|
strength: int |
||||||
|
is_connected: bool |
||||||
|
security_type: SecurityType |
||||||
|
path: str |
||||||
|
bssid: str |
||||||
|
# saved_path: str |
||||||
|
|
||||||
|
|
||||||
|
@dataclass |
||||||
|
class WifiManagerCallbacks: |
||||||
|
need_auth: Callable[[str], None] | None = None |
||||||
|
activated: Callable[[], None] | None = None |
||||||
|
forgotten: Callable[[], None] | None = None |
||||||
|
|
||||||
|
|
||||||
|
class WifiManager: |
||||||
|
def __init__(self, callbacks): |
||||||
|
self.callbacks: WifiManagerCallbacks = callbacks |
||||||
|
self.networks: list[NetworkInfo] = [] |
||||||
|
self.bus: MessageBus = None |
||||||
|
self.device_path: str = "" |
||||||
|
self.device_proxy = None |
||||||
|
self.saved_connections: dict[str, str] = {} |
||||||
|
self.active_ap_path: str = "" |
||||||
|
self.scan_task: asyncio.Task | None = None |
||||||
|
self._tethering_ssid = "weedle-" + Params().get("DongleId", encoding="utf-8") |
||||||
|
self.running: bool = True |
||||||
|
self._current_connection_ssid: str | None = None |
||||||
|
|
||||||
|
async def connect(self) -> None: |
||||||
|
"""Connect to the DBus system bus.""" |
||||||
|
try: |
||||||
|
self.bus = await MessageBus(bus_type=BusType.SYSTEM).connect() |
||||||
|
if not await self._find_wifi_device(): |
||||||
|
raise ValueError("No Wi-Fi device found") |
||||||
|
await self._setup_signals(self.device_path) |
||||||
|
|
||||||
|
self.active_ap_path = await self.get_active_access_point() |
||||||
|
await self.add_tethering_connection(self._tethering_ssid, DEFAULT_TETHERING_PASSWORD) |
||||||
|
self.saved_connections = await self._get_saved_connections() |
||||||
|
self.scan_task = asyncio.create_task(self._periodic_scan()) |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Failed to connect to DBus: {e}") |
||||||
|
raise |
||||||
|
except Exception as e: |
||||||
|
cloudlog.error(f"Unexpected error during connect: {e}") |
||||||
|
raise |
||||||
|
|
||||||
|
async def shutdown(self) -> None: |
||||||
|
self.running = False |
||||||
|
if self.scan_task: |
||||||
|
self.scan_task.cancel() |
||||||
|
try: |
||||||
|
await self.scan_task |
||||||
|
except asyncio.CancelledError: |
||||||
|
pass |
||||||
|
if self.bus: |
||||||
|
await self.bus.disconnect() |
||||||
|
|
||||||
|
async def request_scan(self) -> None: |
||||||
|
try: |
||||||
|
interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) |
||||||
|
await interface.call_request_scan({}) |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.warning(f"Scan request failed: {str(e)}") |
||||||
|
|
||||||
|
async def get_active_access_point(self): |
||||||
|
try: |
||||||
|
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) |
||||||
|
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') |
||||||
|
return ap_path.value |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Error fetching active access point: {str(e)}") |
||||||
|
return '' |
||||||
|
|
||||||
|
async def forget_connection(self, ssid: str) -> bool: |
||||||
|
path = self.saved_connections.get(ssid) |
||||||
|
if not path: |
||||||
|
return False |
||||||
|
|
||||||
|
try: |
||||||
|
nm_iface = await self._get_interface(NM, path, NM_CONNECTION_IFACE) |
||||||
|
await nm_iface.call_delete() |
||||||
|
if self._current_connection_ssid == ssid: |
||||||
|
self._current_connection_ssid = None |
||||||
|
|
||||||
|
if ssid in self.saved_connections: |
||||||
|
del self.saved_connections[ssid] |
||||||
|
|
||||||
|
return True |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Failed to delete connection for SSID: {ssid}. Error: {e}") |
||||||
|
return False |
||||||
|
|
||||||
|
async def activate_connection(self, ssid: str) -> bool: |
||||||
|
connection_path = self.saved_connections.get(ssid) |
||||||
|
if not connection_path: |
||||||
|
return False |
||||||
|
try: |
||||||
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
||||||
|
await nm_iface.call_activate_connection(connection_path, self.device_path, "/") |
||||||
|
return True |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Failed to activate connection {ssid}: {str(e)}") |
||||||
|
return False |
||||||
|
|
||||||
|
async def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False) -> None: |
||||||
|
"""Connect to a selected Wi-Fi network.""" |
||||||
|
try: |
||||||
|
self._current_connection_ssid = ssid |
||||||
|
|
||||||
|
if ssid in self.saved_connections: |
||||||
|
# Forget old connection if new password provided |
||||||
|
if password: |
||||||
|
await self.forget_connection(ssid) |
||||||
|
await asyncio.sleep(0.2) # NetworkManager delay |
||||||
|
else: |
||||||
|
# Just activate existing connection |
||||||
|
await self.activate_connection(ssid) |
||||||
|
return |
||||||
|
|
||||||
|
connection = { |
||||||
|
'connection': { |
||||||
|
'type': Variant('s', '802-11-wireless'), |
||||||
|
'uuid': Variant('s', str(uuid.uuid4())), |
||||||
|
'id': Variant('s', ssid), |
||||||
|
'autoconnect-retries': Variant('i', 0), |
||||||
|
}, |
||||||
|
'802-11-wireless': { |
||||||
|
'ssid': Variant('ay', ssid.encode('utf-8')), |
||||||
|
'hidden': Variant('b', is_hidden), |
||||||
|
'mode': Variant('s', 'infrastructure'), |
||||||
|
}, |
||||||
|
'ipv4': {'method': Variant('s', 'auto')}, |
||||||
|
'ipv6': {'method': Variant('s', 'ignore')}, |
||||||
|
} |
||||||
|
|
||||||
|
if bssid: |
||||||
|
connection['802-11-wireless']['bssid'] = Variant('ay', bssid.encode('utf-8')) |
||||||
|
|
||||||
|
if password: |
||||||
|
connection['802-11-wireless-security'] = { |
||||||
|
'key-mgmt': Variant('s', 'wpa-psk'), |
||||||
|
'auth-alg': Variant('s', 'open'), |
||||||
|
'psk': Variant('s', password), |
||||||
|
} |
||||||
|
|
||||||
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
||||||
|
await nm_iface.call_add_and_activate_connection(connection, self.device_path, "/") |
||||||
|
await self._update_connection_status() |
||||||
|
except DBusError as e: |
||||||
|
self._current_connection_ssid = None |
||||||
|
cloudlog.error(f"Error connecting to network: {e}") |
||||||
|
|
||||||
|
def is_saved(self, ssid: str) -> bool: |
||||||
|
return ssid in self.saved_connections |
||||||
|
|
||||||
|
async def _find_wifi_device(self) -> bool: |
||||||
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
||||||
|
devices = await nm_iface.get_devices() |
||||||
|
|
||||||
|
for device_path in devices: |
||||||
|
device = await self.bus.introspect(NM, device_path) |
||||||
|
device_proxy = self.bus.get_proxy_object(NM, device_path, device) |
||||||
|
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) |
||||||
|
device_type = await device_interface.get_device_type() # type: ignore[attr-defined] |
||||||
|
if device_type == 2: # Wi-Fi device |
||||||
|
self.device_path = device_path |
||||||
|
self.device_proxy = device_proxy |
||||||
|
return True |
||||||
|
|
||||||
|
return False |
||||||
|
|
||||||
|
async def add_tethering_connection(self, ssid: str, password: str = "12345678") -> bool: |
||||||
|
"""Create a WiFi tethering connection.""" |
||||||
|
if len(password) < 8: |
||||||
|
print("Tethering password must be at least 8 characters") |
||||||
|
return False |
||||||
|
|
||||||
|
try: |
||||||
|
# First, check if a hotspot connection already exists |
||||||
|
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) |
||||||
|
connection_paths = await settings_iface.call_list_connections() |
||||||
|
|
||||||
|
# Look for an existing hotspot connection |
||||||
|
for path in connection_paths: |
||||||
|
try: |
||||||
|
settings = await self._get_connection_settings(path) |
||||||
|
conn_type = settings.get('connection', {}).get('type', Variant('s', '')).value |
||||||
|
wifi_mode = settings.get('802-11-wireless', {}).get('mode', Variant('s', '')).value |
||||||
|
|
||||||
|
if conn_type == '802-11-wireless' and wifi_mode == 'ap': |
||||||
|
# Extract the SSID to check |
||||||
|
connection_ssid = self._extract_ssid(settings) |
||||||
|
if connection_ssid == ssid: |
||||||
|
return True |
||||||
|
except DBusError: |
||||||
|
continue |
||||||
|
|
||||||
|
connection = { |
||||||
|
'connection': { |
||||||
|
'id': Variant('s', 'Hotspot'), |
||||||
|
'uuid': Variant('s', str(uuid.uuid4())), |
||||||
|
'type': Variant('s', '802-11-wireless'), |
||||||
|
'interface-name': Variant('s', 'wlan0'), |
||||||
|
'autoconnect': Variant('b', False), |
||||||
|
}, |
||||||
|
'802-11-wireless': { |
||||||
|
'band': Variant('s', 'bg'), |
||||||
|
'mode': Variant('s', 'ap'), |
||||||
|
'ssid': Variant('ay', ssid.encode('utf-8')), |
||||||
|
}, |
||||||
|
'802-11-wireless-security': { |
||||||
|
'group': Variant('as', ['ccmp']), |
||||||
|
'key-mgmt': Variant('s', 'wpa-psk'), |
||||||
|
'pairwise': Variant('as', ['ccmp']), |
||||||
|
'proto': Variant('as', ['rsn']), |
||||||
|
'psk': Variant('s', password), |
||||||
|
}, |
||||||
|
'ipv4': { |
||||||
|
'method': Variant('s', 'shared'), |
||||||
|
'address-data': Variant('aa{sv}', [{'address': Variant('s', TETHERING_IP_ADDRESS), 'prefix': Variant('u', 24)}]), |
||||||
|
'gateway': Variant('s', TETHERING_IP_ADDRESS), |
||||||
|
'never-default': Variant('b', True), |
||||||
|
}, |
||||||
|
'ipv6': { |
||||||
|
'method': Variant('s', 'ignore'), |
||||||
|
}, |
||||||
|
} |
||||||
|
|
||||||
|
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) |
||||||
|
new_connection = await settings_iface.call_add_connection(connection) |
||||||
|
print(f"Added tethering connection with path: {new_connection}") |
||||||
|
return True |
||||||
|
except DBusError as e: |
||||||
|
print(f"Failed to add tethering connection: {e}") |
||||||
|
return False |
||||||
|
except Exception as e: |
||||||
|
print(f"Unexpected error adding tethering connection: {e}") |
||||||
|
return False |
||||||
|
|
||||||
|
async def get_tethering_password(self) -> str: |
||||||
|
"""Get the current tethering password.""" |
||||||
|
try: |
||||||
|
hotspot_path = self.saved_connections.get(self._tethering_ssid) |
||||||
|
if hotspot_path: |
||||||
|
conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) |
||||||
|
secrets = await conn_iface.call_get_secrets('802-11-wireless-security') |
||||||
|
if secrets and '802-11-wireless-security' in secrets: |
||||||
|
psk = secrets.get('802-11-wireless-security', {}).get('psk', Variant('s', '')).value |
||||||
|
return str(psk) if psk is not None else "" |
||||||
|
return "" |
||||||
|
except DBusError as e: |
||||||
|
print(f"Failed to get tethering password: {e}") |
||||||
|
return "" |
||||||
|
except Exception as e: |
||||||
|
print(f"Unexpected error getting tethering password: {e}") |
||||||
|
return "" |
||||||
|
|
||||||
|
async def set_tethering_password(self, password: str) -> bool: |
||||||
|
"""Set the tethering password.""" |
||||||
|
if len(password) < 8: |
||||||
|
cloudlog.error("Tethering password must be at least 8 characters") |
||||||
|
return False |
||||||
|
|
||||||
|
try: |
||||||
|
hotspot_path = self.saved_connections.get(self._tethering_ssid) |
||||||
|
if not hotspot_path: |
||||||
|
print("No hotspot connection found") |
||||||
|
return False |
||||||
|
|
||||||
|
# Update the connection settings with new password |
||||||
|
settings = await self._get_connection_settings(hotspot_path) |
||||||
|
if '802-11-wireless-security' not in settings: |
||||||
|
settings['802-11-wireless-security'] = {} |
||||||
|
settings['802-11-wireless-security']['psk'] = Variant('s', password) |
||||||
|
|
||||||
|
# Apply changes |
||||||
|
conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) |
||||||
|
await conn_iface.call_update(settings) |
||||||
|
|
||||||
|
# Check if connection is active and restart if needed |
||||||
|
is_active = False |
||||||
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
||||||
|
active_connections = await nm_iface.get_active_connections() |
||||||
|
|
||||||
|
for conn_path in active_connections: |
||||||
|
props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) |
||||||
|
conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') |
||||||
|
if conn_id_path.value == hotspot_path: |
||||||
|
is_active = True |
||||||
|
await nm_iface.call_deactivate_connection(conn_path) |
||||||
|
break |
||||||
|
|
||||||
|
if is_active: |
||||||
|
await nm_iface.call_activate_connection(hotspot_path, self.device_path, "/") |
||||||
|
|
||||||
|
print("Tethering password updated successfully") |
||||||
|
return True |
||||||
|
except DBusError as e: |
||||||
|
print(f"Failed to set tethering password: {e}") |
||||||
|
return False |
||||||
|
except Exception as e: |
||||||
|
print(f"Unexpected error setting tethering password: {e}") |
||||||
|
return False |
||||||
|
|
||||||
|
async def is_tethering_active(self) -> bool: |
||||||
|
"""Check if tethering is active for the specified SSID.""" |
||||||
|
try: |
||||||
|
hotspot_path = self.saved_connections.get(self._tethering_ssid) |
||||||
|
if not hotspot_path: |
||||||
|
return False |
||||||
|
|
||||||
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
||||||
|
active_connections = await nm_iface.get_active_connections() |
||||||
|
|
||||||
|
for conn_path in active_connections: |
||||||
|
props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) |
||||||
|
conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') |
||||||
|
|
||||||
|
if conn_id_path.value == hotspot_path: |
||||||
|
return True |
||||||
|
|
||||||
|
return False |
||||||
|
except Exception: |
||||||
|
return False |
||||||
|
|
||||||
|
async def _periodic_scan(self): |
||||||
|
while self.running: |
||||||
|
try: |
||||||
|
await self.request_scan() |
||||||
|
await self._get_available_networks() |
||||||
|
await asyncio.sleep(30) |
||||||
|
except asyncio.CancelledError: |
||||||
|
break |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Scan failed: {e}") |
||||||
|
await asyncio.sleep(5) |
||||||
|
|
||||||
|
async def _setup_signals(self, device_path: str) -> None: |
||||||
|
rules = [ |
||||||
|
f"type='signal',interface='{NM_PROPERTIES_IFACE}',member='PropertiesChanged',path='{device_path}'", |
||||||
|
f"type='signal',interface='{NM_DEVICE_IFACE}',member='StateChanged',path='{device_path}'", |
||||||
|
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='NewConnection',path='{NM_SETTINGS_PATH}'", |
||||||
|
f"type='signal',interface='{NM_SETTINGS_IFACE}',member='ConnectionRemoved',path='{NM_SETTINGS_PATH}'", |
||||||
|
] |
||||||
|
for rule in rules: |
||||||
|
await self._add_match_rule(rule) |
||||||
|
|
||||||
|
# Set up signal handlers |
||||||
|
self.device_proxy.get_interface(NM_PROPERTIES_IFACE).on_properties_changed(self._on_properties_changed) |
||||||
|
self.device_proxy.get_interface(NM_DEVICE_IFACE).on_state_changed(self._on_state_changed) |
||||||
|
|
||||||
|
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) |
||||||
|
settings_iface.on_new_connection(self._on_new_connection) |
||||||
|
settings_iface.on_connection_removed(self._on_connection_removed) |
||||||
|
|
||||||
|
def _on_properties_changed(self, interface: str, changed: dict, invalidated: list): |
||||||
|
# print("property changed", interface, changed, invalidated) |
||||||
|
if 'LastScan' in changed: |
||||||
|
asyncio.create_task(self._get_available_networks()) |
||||||
|
elif interface == NM_WIRELESS_IFACE and "ActiveAccessPoint" in changed: |
||||||
|
self.active_ap_path = changed["ActiveAccessPoint"].value |
||||||
|
asyncio.create_task(self._get_available_networks()) |
||||||
|
|
||||||
|
def _on_state_changed(self, new_state: int, old_state: int, reason: int): |
||||||
|
print(f"State changed: {old_state} -> {new_state}, reason: {reason}") |
||||||
|
if new_state == NMDeviceState.ACTIVATED: |
||||||
|
if self.callbacks.activated: |
||||||
|
self.callbacks.activated() |
||||||
|
asyncio.create_task(self._update_connection_status()) |
||||||
|
self._current_connection_ssid = None |
||||||
|
elif new_state in (NMDeviceState.DISCONNECTED, NMDeviceState.NEED_AUTH): |
||||||
|
for network in self.networks: |
||||||
|
network.is_connected = False |
||||||
|
if new_state == NMDeviceState.NEED_AUTH and reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and self.callbacks.need_auth: |
||||||
|
if self._current_connection_ssid: |
||||||
|
self.callbacks.need_auth(self._current_connection_ssid) |
||||||
|
else: |
||||||
|
# Try to find the network from active_ap_path |
||||||
|
for network in self.networks: |
||||||
|
if network.path == self.active_ap_path: |
||||||
|
self.callbacks.need_auth(network.ssid) |
||||||
|
break |
||||||
|
else: |
||||||
|
# Couldn't identify the network that needs auth |
||||||
|
cloudlog.error("Network needs authentication but couldn't identify which one") |
||||||
|
|
||||||
|
def _on_new_connection(self, path: str) -> None: |
||||||
|
"""Callback for NewConnection signal.""" |
||||||
|
print(f"New connection added: {path}") |
||||||
|
asyncio.create_task(self._add_saved_connection(path)) |
||||||
|
|
||||||
|
def _on_connection_removed(self, path: str) -> None: |
||||||
|
"""Callback for ConnectionRemoved signal.""" |
||||||
|
print(f"Connection removed: {path}") |
||||||
|
for ssid, p in list(self.saved_connections.items()): |
||||||
|
if path == p: |
||||||
|
del self.saved_connections[ssid] |
||||||
|
if self.callbacks.forgotten: |
||||||
|
self.callbacks.forgotten() |
||||||
|
break |
||||||
|
|
||||||
|
async def _add_saved_connection(self, path: str) -> None: |
||||||
|
"""Add a new saved connection to the dictionary.""" |
||||||
|
try: |
||||||
|
settings = await self._get_connection_settings(path) |
||||||
|
if ssid := self._extract_ssid(settings): |
||||||
|
self.saved_connections[ssid] = path |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Failed to add connection {path}: {e}") |
||||||
|
|
||||||
|
def _extract_ssid(self, settings: dict) -> str | None: |
||||||
|
"""Extract SSID from connection settings.""" |
||||||
|
ssid_variant = settings.get('802-11-wireless', {}).get('ssid', Variant('ay', b'')).value |
||||||
|
return ''.join(chr(b) for b in ssid_variant) if ssid_variant else None |
||||||
|
|
||||||
|
async def _update_connection_status(self): |
||||||
|
self.active_ap_path = await self.get_active_access_point() |
||||||
|
await self._get_available_networks() |
||||||
|
|
||||||
|
async def _add_match_rule(self, rule): |
||||||
|
"""Add a match rule on the bus.""" |
||||||
|
reply = await self.bus.call( |
||||||
|
Message( |
||||||
|
message_type=MessageType.METHOD_CALL, |
||||||
|
destination='org.freedesktop.DBus', |
||||||
|
interface="org.freedesktop.DBus", |
||||||
|
path='/org/freedesktop/DBus', |
||||||
|
member='AddMatch', |
||||||
|
signature='s', |
||||||
|
body=[rule], |
||||||
|
) |
||||||
|
) |
||||||
|
|
||||||
|
assert reply.message_type == MessageType.METHOD_RETURN |
||||||
|
return reply |
||||||
|
|
||||||
|
async def _get_available_networks(self): |
||||||
|
"""Get a list of available networks via NetworkManager.""" |
||||||
|
wifi_iface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) |
||||||
|
access_points = await wifi_iface.get_access_points() |
||||||
|
network_dict = {} |
||||||
|
for ap_path in access_points: |
||||||
|
try: |
||||||
|
props_iface = await self._get_interface(NM, ap_path, NM_PROPERTIES_IFACE) |
||||||
|
properties = await props_iface.call_get_all('org.freedesktop.NetworkManager.AccessPoint') |
||||||
|
ssid_variant = properties['Ssid'].value |
||||||
|
ssid = ''.join(chr(byte) for byte in ssid_variant) |
||||||
|
if not ssid: |
||||||
|
continue |
||||||
|
|
||||||
|
bssid = properties.get('HwAddress', Variant('s', '')).value |
||||||
|
strength = properties['Strength'].value |
||||||
|
flags = properties['Flags'].value |
||||||
|
wpa_flags = properties['WpaFlags'].value |
||||||
|
rsn_flags = properties['RsnFlags'].value |
||||||
|
existing_network = network_dict.get(ssid) |
||||||
|
if not existing_network or ((not existing_network.bssid and bssid) or (existing_network.strength < strength)): |
||||||
|
network_dict[ssid] = NetworkInfo( |
||||||
|
ssid=ssid, |
||||||
|
strength=strength, |
||||||
|
security_type=self._get_security_type(flags, wpa_flags, rsn_flags), |
||||||
|
path=ap_path, |
||||||
|
bssid=bssid, |
||||||
|
is_connected=self.active_ap_path == ap_path, |
||||||
|
) |
||||||
|
|
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Error fetching networks: {e}") |
||||||
|
except Exception as e: |
||||||
|
cloudlog.error({e}) |
||||||
|
|
||||||
|
self.networks = sorted( |
||||||
|
network_dict.values(), |
||||||
|
key=lambda network: ( |
||||||
|
not network.is_connected, |
||||||
|
-network.strength, # Higher signal strength first |
||||||
|
network.ssid.lower(), |
||||||
|
), |
||||||
|
) |
||||||
|
|
||||||
|
async def _get_connection_settings(self, path): |
||||||
|
"""Fetch connection settings for a specific connection path.""" |
||||||
|
try: |
||||||
|
connection_proxy = await self.bus.introspect(NM, path) |
||||||
|
connection = self.bus.get_proxy_object(NM, path, connection_proxy) |
||||||
|
settings = connection.get_interface(NM_CONNECTION_IFACE) |
||||||
|
return await settings.call_get_settings() |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Failed to get settings for {path}: {str(e)}") |
||||||
|
return {} |
||||||
|
|
||||||
|
async def _process_chunk(self, paths_chunk): |
||||||
|
"""Process a chunk of connection paths.""" |
||||||
|
tasks = [self._get_connection_settings(path) for path in paths_chunk] |
||||||
|
return await asyncio.gather(*tasks, return_exceptions=True) |
||||||
|
|
||||||
|
async def _get_saved_connections(self) -> dict[str, str]: |
||||||
|
try: |
||||||
|
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) |
||||||
|
connection_paths = await settings_iface.call_list_connections() |
||||||
|
saved_ssids: dict[str, str] = {} |
||||||
|
batch_size = 20 |
||||||
|
for i in range(0, len(connection_paths), batch_size): |
||||||
|
chunk = connection_paths[i : i + batch_size] |
||||||
|
results = await self._process_chunk(chunk) |
||||||
|
for path, config in zip(chunk, results, strict=True): |
||||||
|
if isinstance(config, dict) and '802-11-wireless' in config: |
||||||
|
if ssid := self._extract_ssid(config): |
||||||
|
saved_ssids[ssid] = path |
||||||
|
return saved_ssids |
||||||
|
except DBusError as e: |
||||||
|
cloudlog.error(f"Error fetching saved connections: {str(e)}") |
||||||
|
return {} |
||||||
|
|
||||||
|
async def _get_interface(self, bus_name: str, path: str, name: str): |
||||||
|
introspection = await self.bus.introspect(bus_name, path) |
||||||
|
proxy = self.bus.get_proxy_object(bus_name, path, introspection) |
||||||
|
return proxy.get_interface(name) |
||||||
|
|
||||||
|
def _get_security_type(self, flags: int, wpa_flags: int, rsn_flags: int) -> SecurityType: |
||||||
|
"""Determine the security type based on flags.""" |
||||||
|
if flags == 0 and not (wpa_flags or rsn_flags): |
||||||
|
return SecurityType.OPEN |
||||||
|
if rsn_flags & 0x200: # SAE (WPA3 Personal) |
||||||
|
return SecurityType.WPA3 |
||||||
|
if rsn_flags: # RSN indicates WPA2 or higher |
||||||
|
return SecurityType.WPA2 |
||||||
|
if wpa_flags: # WPA flags indicate WPA |
||||||
|
return SecurityType.WPA |
||||||
|
return SecurityType.UNSUPPORTED |
||||||
|
|
||||||
|
|
||||||
|
class WifiManagerWrapper: |
||||||
|
def __init__(self): |
||||||
|
self._manager: WifiManager | None = None |
||||||
|
self._callbacks: WifiManagerCallbacks = WifiManagerCallbacks() |
||||||
|
|
||||||
|
self._thread = threading.Thread(target=self._run, daemon=True) |
||||||
|
self._loop: asyncio.EventLoop | None = None |
||||||
|
self._running = False |
||||||
|
|
||||||
|
def set_callbacks(self, callbacks: WifiManagerCallbacks): |
||||||
|
self._callbacks = callbacks |
||||||
|
|
||||||
|
def start(self) -> None: |
||||||
|
if not self._running: |
||||||
|
self._thread.start() |
||||||
|
while self._thread is not None and not self._running: |
||||||
|
time.sleep(0.1) |
||||||
|
|
||||||
|
def _run(self): |
||||||
|
self._loop = asyncio.new_event_loop() |
||||||
|
asyncio.set_event_loop(self._loop) |
||||||
|
|
||||||
|
try: |
||||||
|
self._manager = WifiManager(self._callbacks) |
||||||
|
self._running = True |
||||||
|
self._loop.run_forever() |
||||||
|
except Exception as e: |
||||||
|
cloudlog.error(f"Error in WifiManagerWrapper thread: {e}") |
||||||
|
finally: |
||||||
|
if self._loop.is_running(): |
||||||
|
self._loop.stop() |
||||||
|
self._running = False |
||||||
|
|
||||||
|
def shutdown(self) -> None: |
||||||
|
if self._running: |
||||||
|
if self._manager is not None: |
||||||
|
self._run_coroutine(self._manager.shutdown()) |
||||||
|
if self._loop and self._loop.is_running(): |
||||||
|
self._loop.call_soon_threadsafe(self._loop.stop) |
||||||
|
if self._thread and self._thread.is_alive(): |
||||||
|
self._thread.join(timeout=2.0) |
||||||
|
self._running = False |
||||||
|
|
||||||
|
@property |
||||||
|
def networks(self) -> list[NetworkInfo]: |
||||||
|
"""Get the current list of networks.""" |
||||||
|
return self._run_coroutine_sync(lambda manager: manager.networks.copy(), default=[]) |
||||||
|
|
||||||
|
def is_saved(self, ssid: str) -> bool: |
||||||
|
"""Check if a network is saved.""" |
||||||
|
return self._run_coroutine_sync(lambda manager: manager.is_saved(ssid), default=False) |
||||||
|
|
||||||
|
def connect(self): |
||||||
|
"""Connect to DBus and start Wi-Fi scanning.""" |
||||||
|
if not self._manager: |
||||||
|
return |
||||||
|
self._run_coroutine(self._manager.connect()) |
||||||
|
|
||||||
|
def request_scan(self): |
||||||
|
"""Request a scan for Wi-Fi networks.""" |
||||||
|
if not self._manager: |
||||||
|
return |
||||||
|
self._run_coroutine(self._manager.request_scan()) |
||||||
|
|
||||||
|
def forget_connection(self, ssid: str): |
||||||
|
"""Forget a saved Wi-Fi connection.""" |
||||||
|
if not self._manager: |
||||||
|
return |
||||||
|
self._run_coroutine(self._manager.forget_connection(ssid)) |
||||||
|
|
||||||
|
def activate_connection(self, ssid: str): |
||||||
|
"""Activate an existing Wi-Fi connection.""" |
||||||
|
if not self._manager: |
||||||
|
return |
||||||
|
self._run_coroutine(self._manager.activate_connection(ssid)) |
||||||
|
|
||||||
|
def connect_to_network(self, ssid: str, password: str = None, bssid: str = None, is_hidden: bool = False): |
||||||
|
"""Connect to a Wi-Fi network.""" |
||||||
|
if not self._manager: |
||||||
|
return |
||||||
|
self._run_coroutine(self._manager.connect_to_network(ssid, password, bssid, is_hidden)) |
||||||
|
|
||||||
|
def _run_coroutine(self, coro): |
||||||
|
"""Run a coroutine in the async thread.""" |
||||||
|
if not self._running or not self._loop: |
||||||
|
cloudlog.error("WifiManager thread is not running") |
||||||
|
return |
||||||
|
asyncio.run_coroutine_threadsafe(coro, self._loop) |
||||||
|
|
||||||
|
def _run_coroutine_sync(self, func: Callable[[WifiManager], T], default: T) -> T: |
||||||
|
"""Run a function synchronously in the async thread.""" |
||||||
|
if not self._running or not self._loop or not self._manager: |
||||||
|
return default |
||||||
|
future = concurrent.futures.Future[T]() |
||||||
|
|
||||||
|
def wrapper(manager: WifiManager) -> None: |
||||||
|
try: |
||||||
|
future.set_result(func(manager)) |
||||||
|
except Exception as e: |
||||||
|
future.set_exception(e) |
||||||
|
|
||||||
|
try: |
||||||
|
self._loop.call_soon_threadsafe(wrapper, self._manager) |
||||||
|
return future.result(timeout=1.0) |
||||||
|
except Exception as e: |
||||||
|
cloudlog.error(f"WifiManagerWrapper property access failed: {e}") |
||||||
|
return default |
@ -0,0 +1,58 @@ |
|||||||
|
import threading |
||||||
|
import time |
||||||
|
import os |
||||||
|
from typing import Generic, Protocol, TypeVar |
||||||
|
from openpilot.common.swaglog import cloudlog |
||||||
|
from openpilot.system.ui.lib.application import gui_app |
||||||
|
|
||||||
|
|
||||||
|
class RendererProtocol(Protocol): |
||||||
|
def render(self): ... |
||||||
|
|
||||||
|
|
||||||
|
R = TypeVar("R", bound=RendererProtocol) |
||||||
|
|
||||||
|
|
||||||
|
class BaseWindow(Generic[R]): |
||||||
|
def __init__(self, title: str): |
||||||
|
self._title = title |
||||||
|
self._renderer: R | None = None |
||||||
|
self._stop_event = threading.Event() |
||||||
|
self._thread = threading.Thread(target=self._run) |
||||||
|
self._thread.start() |
||||||
|
|
||||||
|
# wait for the renderer to be initialized |
||||||
|
while self._renderer is None and self._thread.is_alive(): |
||||||
|
time.sleep(0.01) |
||||||
|
|
||||||
|
def _create_renderer(self) -> R: |
||||||
|
raise NotImplementedError() |
||||||
|
|
||||||
|
def _run(self): |
||||||
|
if os.getenv("CI") is not None: |
||||||
|
return |
||||||
|
gui_app.init_window(self._title) |
||||||
|
self._renderer = self._create_renderer() |
||||||
|
try: |
||||||
|
for _ in gui_app.render(): |
||||||
|
if self._stop_event.is_set(): |
||||||
|
break |
||||||
|
self._renderer.render() |
||||||
|
finally: |
||||||
|
gui_app.close() |
||||||
|
|
||||||
|
def __enter__(self): |
||||||
|
return self |
||||||
|
|
||||||
|
def close(self): |
||||||
|
if self._thread.is_alive(): |
||||||
|
self._stop_event.set() |
||||||
|
self._thread.join(timeout=2.0) |
||||||
|
if self._thread.is_alive(): |
||||||
|
cloudlog.warning(f"Failed to join {self._title} thread") |
||||||
|
|
||||||
|
def __del__(self): |
||||||
|
self.close() |
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb): |
||||||
|
self.close() |
@ -0,0 +1,177 @@ |
|||||||
|
from dataclasses import dataclass |
||||||
|
from typing import Literal |
||||||
|
|
||||||
|
import pyray as rl |
||||||
|
from openpilot.system.ui.lib.wifi_manager import NetworkInfo, WifiManagerCallbacks, WifiManagerWrapper |
||||||
|
from openpilot.system.ui.lib.application import gui_app |
||||||
|
from openpilot.system.ui.lib.button import gui_button |
||||||
|
from openpilot.system.ui.lib.label import gui_label |
||||||
|
from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel |
||||||
|
from openpilot.system.ui.widgets.keyboard import Keyboard |
||||||
|
from openpilot.system.ui.widgets.confirm_dialog import confirm_dialog |
||||||
|
|
||||||
|
NM_DEVICE_STATE_NEED_AUTH = 60 |
||||||
|
ITEM_HEIGHT = 160 |
||||||
|
|
||||||
|
|
||||||
|
@dataclass |
||||||
|
class StateIdle: |
||||||
|
action: Literal["idle"] = "idle" |
||||||
|
|
||||||
|
@dataclass |
||||||
|
class StateConnecting: |
||||||
|
network: NetworkInfo |
||||||
|
action: Literal["connecting"] = "connecting" |
||||||
|
|
||||||
|
@dataclass |
||||||
|
class StateNeedsAuth: |
||||||
|
network: NetworkInfo |
||||||
|
action: Literal["needs_auth"] = "needs_auth" |
||||||
|
|
||||||
|
@dataclass |
||||||
|
class StateShowForgetConfirm: |
||||||
|
network: NetworkInfo |
||||||
|
action: Literal["show_forget_confirm"] = "show_forget_confirm" |
||||||
|
|
||||||
|
@dataclass |
||||||
|
class StateForgetting: |
||||||
|
network: NetworkInfo |
||||||
|
action: Literal["forgetting"] = "forgetting" |
||||||
|
|
||||||
|
UIState = StateIdle | StateConnecting | StateNeedsAuth | StateShowForgetConfirm | StateForgetting |
||||||
|
|
||||||
|
|
||||||
|
class WifiManagerUI: |
||||||
|
def __init__(self, wifi_manager: WifiManagerWrapper): |
||||||
|
self.state: UIState = StateIdle() |
||||||
|
self.btn_width = 200 |
||||||
|
self.scroll_panel = GuiScrollPanel() |
||||||
|
self.keyboard = Keyboard() |
||||||
|
|
||||||
|
self.wifi_manager = wifi_manager |
||||||
|
self.wifi_manager.set_callbacks(WifiManagerCallbacks(self._on_need_auth, self._on_activated, self._on_forgotten)) |
||||||
|
self.wifi_manager.start() |
||||||
|
self.wifi_manager.connect() |
||||||
|
|
||||||
|
def render(self, rect: rl.Rectangle): |
||||||
|
if not self.wifi_manager.networks: |
||||||
|
gui_label(rect, "Scanning Wi-Fi networks...", 72, alignment=rl.GuiTextAlignment.TEXT_ALIGN_CENTER) |
||||||
|
return |
||||||
|
|
||||||
|
match self.state: |
||||||
|
case StateNeedsAuth(network): |
||||||
|
result = self.keyboard.render(rect, "Enter password", f"for {network.ssid}") |
||||||
|
if result == 1: |
||||||
|
self.connect_to_network(network, self.keyboard.text) |
||||||
|
elif result == 0: |
||||||
|
self.state = StateIdle() |
||||||
|
|
||||||
|
case StateShowForgetConfirm(network): |
||||||
|
result = confirm_dialog(rect, f'Forget Wi-Fi Network "{network.ssid}"?', "Forget") |
||||||
|
if result == 1: |
||||||
|
self.forget_network(network) |
||||||
|
elif result == 0: |
||||||
|
self.state = StateIdle() |
||||||
|
|
||||||
|
case _: |
||||||
|
self._draw_network_list(rect) |
||||||
|
|
||||||
|
def _draw_network_list(self, rect: rl.Rectangle): |
||||||
|
content_rect = rl.Rectangle(rect.x, rect.y, rect.width, len(self.wifi_manager.networks) * ITEM_HEIGHT) |
||||||
|
offset = self.scroll_panel.handle_scroll(rect, content_rect) |
||||||
|
clicked = self.scroll_panel.is_click_valid() |
||||||
|
|
||||||
|
rl.begin_scissor_mode(int(rect.x), int(rect.y), int(rect.width), int(rect.height)) |
||||||
|
for i, network in enumerate(self.wifi_manager.networks): |
||||||
|
y_offset = rect.y + i * ITEM_HEIGHT + offset.y |
||||||
|
item_rect = rl.Rectangle(rect.x, y_offset, rect.width, ITEM_HEIGHT) |
||||||
|
if not rl.check_collision_recs(item_rect, rect): |
||||||
|
continue |
||||||
|
|
||||||
|
self._draw_network_item(item_rect, network, clicked) |
||||||
|
if i < len(self.wifi_manager.networks) - 1: |
||||||
|
line_y = int(item_rect.y + item_rect.height - 1) |
||||||
|
rl.draw_line(int(item_rect.x), int(line_y), int(item_rect.x + item_rect.width), line_y, rl.LIGHTGRAY) |
||||||
|
|
||||||
|
rl.end_scissor_mode() |
||||||
|
|
||||||
|
def _draw_network_item(self, rect, network: NetworkInfo, clicked: bool): |
||||||
|
label_rect = rl.Rectangle(rect.x, rect.y, rect.width - self.btn_width * 2, ITEM_HEIGHT) |
||||||
|
state_rect = rl.Rectangle(rect.x + rect.width - self.btn_width * 2 - 150, rect.y, 300, ITEM_HEIGHT) |
||||||
|
|
||||||
|
gui_label(label_rect, network.ssid, 55) |
||||||
|
|
||||||
|
status_text = "" |
||||||
|
if network.is_connected: |
||||||
|
status_text = "Connected" |
||||||
|
match self.state: |
||||||
|
case StateConnecting(network=connecting): |
||||||
|
if connecting.ssid == network.ssid: |
||||||
|
status_text = "CONNECTING..." |
||||||
|
case StateForgetting(network=forgetting): |
||||||
|
if forgetting.ssid == network.ssid: |
||||||
|
status_text = "FORGETTING..." |
||||||
|
if status_text: |
||||||
|
rl.gui_label(state_rect, status_text) |
||||||
|
|
||||||
|
# If the network is saved, show the "Forget" button |
||||||
|
if self.wifi_manager.is_saved(network.ssid): |
||||||
|
forget_btn_rect = rl.Rectangle( |
||||||
|
rect.x + rect.width - self.btn_width, |
||||||
|
rect.y + (ITEM_HEIGHT - 80) / 2, |
||||||
|
self.btn_width, |
||||||
|
80, |
||||||
|
) |
||||||
|
if isinstance(self.state, StateIdle) and gui_button(forget_btn_rect, "Forget") and clicked: |
||||||
|
self.state = StateShowForgetConfirm(network) |
||||||
|
|
||||||
|
if isinstance(self.state, StateIdle) and rl.check_collision_point_rec(rl.get_mouse_position(), label_rect) and clicked: |
||||||
|
if not self.wifi_manager.is_saved(network.ssid): |
||||||
|
self.state = StateNeedsAuth(network) |
||||||
|
else: |
||||||
|
self.connect_to_network(network) |
||||||
|
|
||||||
|
def connect_to_network(self, network: NetworkInfo, password=''): |
||||||
|
self.state = StateConnecting(network) |
||||||
|
if self.wifi_manager.is_saved(network.ssid) and not password: |
||||||
|
self.wifi_manager.activate_connection(network.ssid) |
||||||
|
else: |
||||||
|
self.wifi_manager.connect_to_network(network.ssid, password) |
||||||
|
|
||||||
|
def forget_network(self, network: NetworkInfo): |
||||||
|
self.state = StateForgetting(network) |
||||||
|
self.wifi_manager.forget_connection(network.ssid) |
||||||
|
|
||||||
|
def _on_need_auth(self, ssid): |
||||||
|
match self.state: |
||||||
|
case StateConnecting(ssid): |
||||||
|
self.state = StateNeedsAuth(ssid) |
||||||
|
case _: |
||||||
|
# Find network by SSID |
||||||
|
network = next((n for n in self.wifi_manager.networks if n.ssid == ssid), None) |
||||||
|
if network: |
||||||
|
self.state = StateNeedsAuth(network) |
||||||
|
|
||||||
|
def _on_activated(self): |
||||||
|
if isinstance(self.state, StateConnecting): |
||||||
|
self.state = StateIdle() |
||||||
|
|
||||||
|
def _on_forgotten(self): |
||||||
|
if isinstance(self.state, StateForgetting): |
||||||
|
self.state = StateIdle() |
||||||
|
|
||||||
|
|
||||||
|
def main(): |
||||||
|
gui_app.init_window("Wi-Fi Manager") |
||||||
|
wifi_manager = WifiManagerWrapper() |
||||||
|
wifi_ui = WifiManagerUI(wifi_manager) |
||||||
|
|
||||||
|
for _ in gui_app.render(): |
||||||
|
wifi_ui.render(rl.Rectangle(50, 50, gui_app.width - 100, gui_app.height - 100)) |
||||||
|
|
||||||
|
wifi_manager.shutdown() |
||||||
|
gui_app.close() |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
main() |
@ -1,8 +0,0 @@ |
|||||||
#!/usr/bin/env expect |
|
||||||
spawn adb shell |
|
||||||
expect "#" |
|
||||||
send "cd data/openpilot\r" |
|
||||||
send "export TERM=xterm-256color\r" |
|
||||||
send "su comma\r" |
|
||||||
send "clear\r" |
|
||||||
interact |
|
@ -0,0 +1,17 @@ |
|||||||
|
#!/usr/bin/env python3 |
||||||
|
import sys |
||||||
|
from openpilot.tools.lib.logreader import LogReader |
||||||
|
|
||||||
|
|
||||||
|
def main(): |
||||||
|
if len(sys.argv) != 2: |
||||||
|
print("Usage: python auto_source.py <log_path>") |
||||||
|
sys.exit(1) |
||||||
|
|
||||||
|
log_path = sys.argv[1] |
||||||
|
lr = LogReader(log_path, sort_by_time=True) |
||||||
|
print("\n".join(lr.logreader_identifiers)) |
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": |
||||||
|
main() |
@ -0,0 +1,7 @@ |
|||||||
|
#!/usr/bin/env bash |
||||||
|
set -e |
||||||
|
|
||||||
|
# this is a little nicer than "adb shell" since |
||||||
|
# "adb shell" doesn't do full terminal emulation |
||||||
|
adb forward tcp:2222 tcp:22 |
||||||
|
ssh comma@localhost -p 2222 |
Loading…
Reference in new issue