|
|
|
@ -62,67 +62,6 @@ class WifiManagerCallbacks: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WifiManager: |
|
|
|
|
def __init__(self, callbacks): |
|
|
|
|
self.callbacks: WifiManagerCallbacks = callbacks |
|
|
|
|
self.networks: list[NetworkInfo] = [] |
|
|
|
|
self.bus: MessageBus = None |
|
|
|
|
self.device_path: str = "" |
|
|
|
|
self.device_proxy = None |
|
|
|
|
self.saved_connections: dict[str, str] = {} |
|
|
|
|
self.active_ap_path: str = "" |
|
|
|
|
self.scan_task: asyncio.Task | None = None |
|
|
|
|
# Set tethering ssid as "weedle" + first 4 characters of a dongle id |
|
|
|
|
self._tethering_ssid = "weedle" |
|
|
|
|
if Params is not None: |
|
|
|
|
dongle_id = Params().get("DongleId") |
|
|
|
|
if dongle_id: |
|
|
|
|
self._tethering_ssid += "-" + dongle_id[:4] |
|
|
|
|
self.running: bool = True |
|
|
|
|
self._current_connection_ssid: str | None = None |
|
|
|
|
|
|
|
|
|
async def shutdown(self) -> None: |
|
|
|
|
self.running = False |
|
|
|
|
if self.scan_task: |
|
|
|
|
self.scan_task.cancel() |
|
|
|
|
try: |
|
|
|
|
await self.scan_task |
|
|
|
|
except asyncio.CancelledError: |
|
|
|
|
pass |
|
|
|
|
if self.bus: |
|
|
|
|
self.bus.disconnect() |
|
|
|
|
|
|
|
|
|
async def _request_scan(self) -> None: |
|
|
|
|
try: |
|
|
|
|
interface = self.device_proxy.get_interface(NM_WIRELESS_IFACE) |
|
|
|
|
await interface.call_request_scan({}) |
|
|
|
|
except DBusError as e: |
|
|
|
|
cloudlog.warning(f"Scan request failed: {str(e)}") |
|
|
|
|
|
|
|
|
|
async def get_active_access_point(self): |
|
|
|
|
try: |
|
|
|
|
props_iface = self.device_proxy.get_interface(NM_PROPERTIES_IFACE) |
|
|
|
|
ap_path = await props_iface.call_get(NM_WIRELESS_IFACE, 'ActiveAccessPoint') |
|
|
|
|
return ap_path.value |
|
|
|
|
except DBusError as e: |
|
|
|
|
cloudlog.error(f"Error fetching active access point: {str(e)}") |
|
|
|
|
return '' |
|
|
|
|
|
|
|
|
|
async def _find_wifi_device(self) -> bool: |
|
|
|
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
|
|
|
|
devices = await nm_iface.get_devices() |
|
|
|
|
|
|
|
|
|
for device_path in devices: |
|
|
|
|
device = await self.bus.introspect(NM, device_path) |
|
|
|
|
device_proxy = self.bus.get_proxy_object(NM, device_path, device) |
|
|
|
|
device_interface = device_proxy.get_interface(NM_DEVICE_IFACE) |
|
|
|
|
device_type = await device_interface.get_device_type() # type: ignore[attr-defined] |
|
|
|
|
if device_type == 2: # Wi-Fi device |
|
|
|
|
self.device_path = device_path |
|
|
|
|
self.device_proxy = device_proxy |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
async def add_tethering_connection(self, ssid: str, password: str = "12345678") -> bool: |
|
|
|
|
"""Create a WiFi tethering connection.""" |
|
|
|
|
if len(password) < 8: |
|
|
|
|