@ -370,46 +370,47 @@ class WifiManager:
cloudlog . warning ( f " Failed to request scan: { reply } " )
def _update_networks ( self ) :
if self . _wifi_device is None :
cloudlog . warning ( " No WiFi device found " )
return
# returns '/' if no active AP
wifi_addr = DBusAddress ( self . _wifi_device , NM , interface = NM_WIRELESS_IFACE )
active_ap_path = self . _router_main . send_and_get_reply ( Properties ( wifi_addr ) . get ( ' ActiveAccessPoint ' ) ) . body [ 0 ] [ 1 ]
ap_paths = self . _router_main . send_and_get_reply ( new_method_call ( wifi_addr , ' GetAllAccessPoints ' ) ) . body [ 0 ]
with self . _lock :
if self . _wifi_device is None :
cloudlog . warning ( " No WiFi device found " )
return
aps : dict [ str , list [ AccessPoint ] ] = { }
# returns '/' if no active AP
wifi_addr = DBusAddress ( self . _wifi_device , NM , interface = NM_WIRELESS_IFACE )
active_ap_path = self . _router_main . send_and_get_reply ( Properties ( wifi_addr ) . get ( ' ActiveAccessPoint ' ) ) . body [ 0 ] [ 1 ]
ap_paths = self . _router_main . send_and_get_reply ( new_method_call ( wifi_addr , ' GetAllAccessPoints ' ) ) . body [ 0 ]
for ap_path in ap_paths :
ap_addr = DBusAddress ( ap_path , NM , interface = NM_ACCESS_POINT_IFACE )
ap_props = self . _router_main . send_and_get_reply ( Properties ( ap_addr ) . get_all ( ) )
aps : dict [ str , list [ AccessPoint ] ] = { }
# some APs have been seen dropping off during iteration
if ap_props . header . message_type == MessageType . error :
cloudlog . warning ( f " Failed to get AP properties for { ap_path } " )
continue
for ap_path in ap_paths :
ap_addr = DBusAddress ( ap_path , NM , interface = NM_ACCESS_POINT_IFACE )
ap_props = self . _router_main . send_and_get_reply ( Properties ( ap_addr ) . get_all ( ) )
try :
ap = AccessPoint . from_dbus ( ap_props . body [ 0 ] , ap_path , active_ap_path )
if ap . ssid == " " :
# some APs have been seen dropping off during iteration
if ap_props . header . message_type == MessageType . error :
cloudlog . warning ( f " Failed to get AP properties for { ap_path } " )
continue
if ap . ssid not in aps :
aps [ ap . ssid ] = [ ]
try :
ap = AccessPoint . from_dbus ( ap_props . body [ 0 ] , ap_path , active_ap_path )
if ap . ssid == " " :
continue
if ap . ssid not in aps :
aps [ ap . ssid ] = [ ]
aps [ ap . ssid ] . append ( ap )
except Exception :
# catch all for parsing errors
cloudlog . exception ( f " Failed to parse AP properties for { ap_path } " )
aps [ ap . ssid ] . append ( ap )
except Exception :
# catch all for parsing errors
cloudlog . exception ( f " Failed to parse AP properties for { ap_path } " )
known_connections = self . _get_connections ( )
networks = [ Network . from_dbus ( ssid , ap_list , ssid in known_connections ) for ssid , ap_list in aps . items ( ) ]
networks . sort ( key = lambda n : ( - n . is_connected , - n . strength , n . ssid . lower ( ) ) )
self . _networks = networks
known_connections = self . _get_connections ( )
networks = [ Network . from_dbus ( ssid , ap_list , ssid in known_connections ) for ssid , ap_list in aps . items ( ) ]
networks . sort ( key = lambda n : ( - n . is_connected , - n . strength , n . ssid . lower ( ) ) )
self . _networks = networks
if self . _networks_updated is not None :
self . _enqueue_callback ( self . _networks_updated , self . _networks )
if self . _networks_updated is not None :
self . _enqueue_callback ( self . _networks_updated , self . _networks )
def __del__ ( self ) :
self . stop ( )