_update_networks matches!

pull/36039/head
Shane Smiskol 4 days ago
parent 6da9b80cb7
commit bccafe7726
  1. 104
      system/ui/lib/wifi_manager.py

@ -12,7 +12,7 @@ from collections import deque
from jeepney import DBusAddress, new_method_call
from jeepney.wrappers import DBusErrorResponse, Properties
from jeepney.bus_messages import message_bus, MatchRule
from jeepney.io.blocking import open_dbus_connection
from jeepney.io.blocking import DBusConnection, open_dbus_connection
from jeepney.io.threading import open_dbus_router, DBusRouter # , open_dbus_connection
from openpilot.common.swaglog import cloudlog
@ -63,6 +63,22 @@ class Network:
security_type: SecurityType
is_saved: bool
@classmethod
def from_dbus_jeepney(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network":
# we only want to show the strongest AP for each Network/SSID
strongest_ap = max(aps, key=lambda ap: ap.strength)
is_connected = any(ap.ap_path == active_ap_path for ap in aps) # TODO: just any is_connected aps!
security_type = get_security_type(strongest_ap.flags, strongest_ap.wpa_flags, strongest_ap.rsn_flags)
return cls(
ssid=ssid,
strength=strongest_ap.strength,
is_connected=is_connected and is_saved,
security_type=security_type,
is_saved=is_saved,
)
@classmethod
def from_dbus(cls, ssid: str, aps: list["AccessPoint"], active_ap_path: dbus.ObjectPath, is_saved: bool) -> "Network":
# we only want to show the strongest AP for each Network/SSID
@ -91,6 +107,28 @@ class AccessPoint:
rsn_flags: int
ap_path: dbus.ObjectPath
@classmethod
def from_dbus_jeepney(cls, conn: DBusConnection, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint":
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ssid = bytes(conn.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace")
bssid = str(conn.send_and_get_reply(Properties(ap_addr).get('HwAddress')).body[0][1])
strength = int(conn.send_and_get_reply(Properties(ap_addr).get('Strength')).body[0][1])
flags = int(conn.send_and_get_reply(Properties(ap_addr).get('Flags')).body[0][1])
wpa_flags = int(conn.send_and_get_reply(Properties(ap_addr).get('WpaFlags')).body[0][1])
rsn_flags = int(conn.send_and_get_reply(Properties(ap_addr).get('RsnFlags')).body[0][1])
return cls(
ssid=ssid,
bssid=bssid,
strength=strength,
is_connected=ap_path == active_ap_path,
flags=flags,
wpa_flags=wpa_flags,
rsn_flags=rsn_flags,
ap_path=ap_path,
)
@classmethod
def from_dbus(cls, ap_props: dbus.Interface, ap_path: dbus.ObjectPath, active_ap_path: dbus.ObjectPath) -> "AccessPoint":
ssid = bytes(ap_props.Get(NM_ACCESS_POINT_IFACE, "Ssid")).decode("utf-8", "replace")
@ -155,6 +193,7 @@ class WifiManager:
self._lock = threading.Lock()
self._tmp_init()
sys.exit()
self._scan_thread = threading.Thread(target=self._network_scanner, daemon=True)
self._scan_thread.start()
@ -164,9 +203,14 @@ class WifiManager:
atexit.register(self.stop)
sys.exit()
def _tmp_init(self):
self._wait_for_wifi_device()
self._update_networks_old()
import copy
nets = copy.deepcopy(self._networks)
self._update_networks()
print('networks match:', nets == self._networks)
return
t = time.monotonic()
self.forget_connection('unifi', block=True)
@ -439,7 +483,7 @@ class WifiManager:
except DBusErrorResponse:
cloudlog.exception("Failed to request scan")
def _update_networks(self):
def _update_networks_old(self):
print('UPDATING NETWORKS!!!!')
if self._wifi_device is None:
@ -477,11 +521,61 @@ class WifiManager:
if self._networks_updated is not None:
self._networks_updated(self._networks)
def _update_networks(self):
print('UPDATING NETWORKS!!!!')
if self._wifi_device is None:
cloudlog.warning("No WiFi device found")
return
# wifi_iface = dbus.Interface(self._main_bus.get_object(NM, self._wifi_device), NM_WIRELESS_IFACE)
# dev_props = dbus.Interface(self._main_bus.get_object(NM, self._wifi_device), NM_PROPERTIES_IFACE)
# active_ap_path = dev_props.Get(NM_WIRELESS_IFACE, "ActiveAccessPoint")
# jeepney:
wifi_addr = DBusAddress(self._wifi_device, NM, interface=NM_WIRELESS_IFACE)
# returns '/' if no active AP
active_ap_path = self._conn_main.send_and_get_reply(Properties(wifi_addr).get('ActiveAccessPoint')).body[0][1]
print('got active_ap_path', active_ap_path)
ap_paths = self._conn_main.send_and_get_reply(new_method_call(wifi_addr, 'GetAllAccessPoints')).body[0]
print('ap paths', ap_paths)
aps: dict[str, list[AccessPoint]] = {}
for ap_path in ap_paths:
# try:
ap = AccessPoint.from_dbus_jeepney(self._conn_main, ap_path, active_ap_path)
if ap.ssid == "":
continue
if ap.ssid not in aps:
aps[ap.ssid] = []
aps[ap.ssid].append(ap)
# TODO: add back when seen
# except dbus.exceptions.DBusException:
# # some APs have been seen dropping off during iteration
# cloudlog.exception(f"Failed to get AP properties for {ap_path}")
known_connections = self._get_connections()
networks = [Network.from_dbus_jeepney(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None)
for ssid, ap_list in aps.items()]
networks.sort(key=lambda n: (-n.is_connected, -n.strength, n.ssid.lower()))
self._networks = networks
if self._networks_updated is not None:
self._networks_updated(self._networks)
def __del__(self):
self.stop()
def stop(self):
self._exit = True
self._scan_thread.join()
self._state_thread.join()
# self._scan_thread.join()
# self._state_thread.join()
self._main_bus.close()
self._conn_main.close()
self._conn_monitor.close()

Loading…
Cancel
Save