AP.from_dbus: actually handle incorrect paths w/ jeep + more efficient single call

pull/36039/head
Shane Smiskol 4 days ago
parent d00095b9c4
commit c3491efeb9
  1. 31
      system/ui/lib/wifi_manager.py

@ -1,4 +1,5 @@
import sys
from typing import Any
import atexit
import threading
import time
@ -12,6 +13,7 @@ from jeepney import DBusAddress, new_method_call
from jeepney.wrappers import DBusErrorResponse, Properties
from jeepney.bus_messages import message_bus, MatchRule
from jeepney.io.blocking import DBusConnection, open_dbus_connection
from jeepney.low_level import MessageType
from jeepney.io.threading import open_dbus_router, DBusRouter # , open_dbus_connection
from openpilot.common.swaglog import cloudlog
@ -91,15 +93,13 @@ class AccessPoint:
ap_path: str
@classmethod
def from_dbus(cls, conn: DBusConnection, ap_path: str, active_ap_path: str) -> "AccessPoint":
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ssid = bytes(conn.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace")
bssid = str(conn.send_and_get_reply(Properties(ap_addr).get('HwAddress')).body[0][1])
strength = int(conn.send_and_get_reply(Properties(ap_addr).get('Strength')).body[0][1])
flags = int(conn.send_and_get_reply(Properties(ap_addr).get('Flags')).body[0][1])
wpa_flags = int(conn.send_and_get_reply(Properties(ap_addr).get('WpaFlags')).body[0][1])
rsn_flags = int(conn.send_and_get_reply(Properties(ap_addr).get('RsnFlags')).body[0][1])
def from_dbus(cls, ap_props: dict[str, tuple[str, Any]], ap_path: str, active_ap_path: str) -> "AccessPoint":
ssid = bytes(ap_props['Ssid'][1]).decode("utf-8", "replace")
bssid = str(ap_props['HwAddress'][1])
strength = int(ap_props['Strength'][1])
flags = int(ap_props['Flags'][1])
wpa_flags = int(ap_props['WpaFlags'][1])
rsn_flags = int(ap_props['RsnFlags'][1])
return cls(
ssid=ssid,
@ -460,8 +460,15 @@ class WifiManager:
aps: dict[str, list[AccessPoint]] = {}
for ap_path in ap_paths:
ap_addr = DBusAddress(ap_path, NM, interface=NM_ACCESS_POINT_IFACE)
ap_props = self._conn_main.send_and_get_reply(Properties(ap_addr).get_all())
if ap_props.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get AP properties for {ap_path}")
continue
try:
ap = AccessPoint.from_dbus(self._conn_main, ap_path, active_ap_path)
ap = AccessPoint.from_dbus(ap_props.body[0], ap_path, active_ap_path)
if ap.ssid == "":
continue
@ -469,9 +476,9 @@ class WifiManager:
aps[ap.ssid] = []
aps[ap.ssid].append(ap)
except DBusErrorResponse:
except Exception:
# some APs have been seen dropping off during iteration
cloudlog.exception(f"Failed to get AP properties for {ap_path}")
cloudlog.exception(f"Failed to parse AP properties for {ap_path}")
known_connections = self._get_connections()
networks = [Network.from_dbus(ssid, ap_list, active_ap_path, self._connection_by_ssid(ssid, known_connections) is not None)

Loading…
Cancel
Save