|
|
|
@ -7,9 +7,6 @@ from collections.abc import Callable |
|
|
|
|
from dataclasses import dataclass |
|
|
|
|
from enum import IntEnum |
|
|
|
|
|
|
|
|
|
from jeepney import DBusAddress, new_method_call |
|
|
|
|
from jeepney.io.blocking import open_dbus_connection |
|
|
|
|
|
|
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
|
|
from openpilot.system.ui.lib.networkmanager import * |
|
|
|
|
|
|
|
|
@ -114,17 +111,6 @@ class WifiManager: |
|
|
|
|
self._nm = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_IFACE) |
|
|
|
|
self._props = dbus.Interface(self._bus.get_object(NM, NM_PATH), NM_PROPERTIES_IFACE) |
|
|
|
|
|
|
|
|
|
# jeepney version |
|
|
|
|
self._bus2 = open_dbus_connection(bus="SYSTEM") |
|
|
|
|
self._nm2 = DBusAddress(NM_PATH, |
|
|
|
|
bus_name=NM, |
|
|
|
|
interface=NM_IFACE) |
|
|
|
|
self._props2 = DBusAddress(NM_PATH, |
|
|
|
|
bus_name=NM, |
|
|
|
|
interface=NM_PROPERTIES_IFACE) |
|
|
|
|
|
|
|
|
|
print('DBUS2', self._bus2, self._nm2, self._props2) |
|
|
|
|
|
|
|
|
|
# Callbacks |
|
|
|
|
self._networks_updated: Callable[[list[Network]], None] | None = None |
|
|
|
|
self._connection_failed: Callable[[str, str], None] | None = None |
|
|
|
@ -144,7 +130,6 @@ class WifiManager: |
|
|
|
|
self._running = False |
|
|
|
|
self._thread.join() |
|
|
|
|
self._bus.close() |
|
|
|
|
self._bus2.close() |
|
|
|
|
|
|
|
|
|
def _run(self): |
|
|
|
|
while True: |
|
|
|
@ -177,36 +162,6 @@ class WifiManager: |
|
|
|
|
# print(f"Got wifi device in {time.monotonic() - t}s: {wifi_device}") |
|
|
|
|
return wifi_device |
|
|
|
|
|
|
|
|
|
def _get_wifi_device2(self) -> str: |
|
|
|
|
# jeepney version |
|
|
|
|
t = time.monotonic() |
|
|
|
|
msg = new_method_call(self._nm2, "GetDevices") |
|
|
|
|
device_paths = self._bus2.send_and_get_reply(msg).body[0] |
|
|
|
|
# print(f'DEVICE PATHS: {device_paths}') |
|
|
|
|
|
|
|
|
|
wifi_device = None |
|
|
|
|
for device_path in device_paths: |
|
|
|
|
# Create a DBusAddress for the device |
|
|
|
|
dev_addr = DBusAddress(device_path, bus_name=NM, interface=NM_PROPERTIES_IFACE) |
|
|
|
|
print('dev_addr', dev_addr) |
|
|
|
|
|
|
|
|
|
# Build the Get(DeviceType) call |
|
|
|
|
msg = new_method_call(dev_addr, "Get", signature="ss", body=(NM_DEVICE_IFACE, "DeviceType")) |
|
|
|
|
|
|
|
|
|
# Send the message and get the reply |
|
|
|
|
reply = self._bus2.send_and_get_reply(msg) |
|
|
|
|
|
|
|
|
|
# The property value is wrapped in a variant, so extract it |
|
|
|
|
dev_type = reply.body[0][1] |
|
|
|
|
print(f'Device {device_path} type: {dev_type}') |
|
|
|
|
# print(f'Device {device_path} type: {dev_type} is wifi? {dev_type == NM_DEVICE_TYPE_WIFI}') |
|
|
|
|
|
|
|
|
|
if dev_type == NM_DEVICE_TYPE_WIFI: |
|
|
|
|
wifi_device = device_path |
|
|
|
|
break |
|
|
|
|
print(f"Got wifi device2 in {time.monotonic() - t}s: {wifi_device}") |
|
|
|
|
return wifi_device |
|
|
|
|
|
|
|
|
|
def connect_to_network(self, ssid: str, password: str | None): |
|
|
|
|
def worker(): |
|
|
|
|
t = time.monotonic() |
|
|
|
@ -273,7 +228,7 @@ class WifiManager: |
|
|
|
|
# TODO: only run this function on scan complete! |
|
|
|
|
print('UPDATING NETWORKS!!!!') |
|
|
|
|
|
|
|
|
|
device_path = self._get_wifi_device2() |
|
|
|
|
device_path = self._get_wifi_device() |
|
|
|
|
if device_path is None: |
|
|
|
|
cloudlog.warning("No WiFi device found") |
|
|
|
|
return |
|
|
|
|