@ -15,6 +15,7 @@ from jeepney.low_level import MessageType
from jeepney . wrappers import Properties
from openpilot . common . swaglog import cloudlog
from openpilot . common . params import Params
from openpilot . system . ui . lib . networkmanager import ( NM , NM_WIRELESS_IFACE , NM_802_11_AP_SEC_PAIR_WEP40 ,
NM_802_11_AP_SEC_PAIR_WEP104 , NM_802_11_AP_SEC_GROUP_WEP40 ,
NM_802_11_AP_SEC_GROUP_WEP104 , NM_802_11_AP_SEC_KEY_MGMT_PSK ,
@ -23,8 +24,8 @@ from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_80
NM_PATH , NM_IFACE , NM_ACCESS_POINT_IFACE , NM_SETTINGS_PATH ,
NM_SETTINGS_IFACE , NM_CONNECTION_IFACE , NM_DEVICE_IFACE ,
NM_DEVICE_TYPE_WIFI , NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT ,
NM_DEVICE_STATE_REASON_NEW_ACTIVATION ,
NMDeviceState )
NM_DEVICE_STATE_REASON_NEW_ACTIVATION , NM_ACTIVE_CONNECTION_IFACE ,
NM_IP4_CONFIG_IFACE , NM DeviceState )
TETHERING_IP_ADDRESS = " 192.168.43.1 "
DEFAULT_TETHERING_PASSWORD = " swagswagcomma "
@ -40,6 +41,12 @@ class SecurityType(IntEnum):
UNSUPPORTED = 4
class MeteredType ( IntEnum ) :
UNKNOWN = 0
YES = 1
NO = 2
def get_security_type ( flags : int , wpa_flags : int , rsn_flags : int ) - > SecurityType :
wpa_props = wpa_flags | rsn_flags
@ -114,7 +121,7 @@ class AccessPoint:
class WifiManager :
def __init__ ( self ) :
self . _networks = [ ] # a network can be comprised of multiple APs
self . _networks : list [ Network ] = [ ] # a network can be comprised of multiple APs
self . _active = True # used to not run when not in settings
self . _exit = False
@ -132,15 +139,23 @@ class WifiManager:
# State
self . _connecting_to_ssid : str = " "
self . _ipv4_address : str = " "
self . _current_network_metered : MeteredType = MeteredType . UNKNOWN
self . _tethering_password : str = " "
self . _last_network_update : float = 0.0
self . _callback_queue : list [ Callable ] = [ ]
self . _tethering_ssid = " weedle "
dongle_id = Params ( ) . get ( " DongleId " )
if dongle_id :
self . _tethering_ssid + = " - " + dongle_id [ : 4 ]
# Callbacks
self . _need_auth : Callable [ [ str ] , None ] | None = None
self . _activated : Callable [ [ ] , None ] | None = None
self . _forgotten : Callable [ [ ] , None ] | None = None
self . _networks_updated : Callable [ [ list [ Network ] ] , None ] | None = None
self . _disconnected : Callable [ [ ] , None ] | None = None
self . _need_auth : list [ Callable [ [ str ] , None ] ] = [ ]
self . _activated : list [ Callable [ [ ] , None ] ] = [ ]
self . _forgotten : list [ Callable [ [ ] , None ] ] = [ ]
self . _networks_updated : list [ Callable [ [ list [ Network ] ] , None ] ] = [ ]
self . _disconnected : list [ Callable [ [ ] , None ] ] = [ ]
self . _lock = threading . Lock ( )
@ -152,19 +167,37 @@ class WifiManager:
atexit . register ( self . stop )
def set_callbacks ( self , need_auth : Callable [ [ str ] , None ] ,
activated : Callable [ [ ] , None ] | None ,
forgotten : Callable [ [ ] , None ] ,
networks_updated : Callable [ [ list [ Network ] ] , None ] ,
disconnected : Callable [ [ ] , None ] ) :
self . _need_auth = need_auth
self . _activated = activated
self . _forgotten = forgotten
self . _networks_updated = networks_updated
self . _disconnected = disconnected
def _enqueue_callback ( self , cb : Callable , * args ) :
self . _callback_queue . append ( lambda : cb ( * args ) )
def set_callbacks ( self , need_auth : Callable [ [ str ] , None ] | None = None ,
activated : Callable [ [ ] , None ] | None = None ,
forgotten : Callable [ [ ] , None ] | None = None ,
networks_updated : Callable [ [ list [ Network ] ] , None ] | None = None ,
disconnected : Callable [ [ ] , None ] | None = None ) :
if need_auth is not None :
self . _need_auth . append ( need_auth )
if activated is not None :
self . _activated . append ( activated )
if forgotten is not None :
self . _forgotten . append ( forgotten )
if networks_updated is not None :
self . _networks_updated . append ( networks_updated )
if disconnected is not None :
self . _disconnected . append ( disconnected )
@property
def ipv4_address ( self ) - > str :
return self . _ipv4_address
@property
def current_network_metered ( self ) - > MeteredType :
return self . _current_network_metered
@property
def tethering_password ( self ) - > str :
return self . _tethering_password
def _enqueue_callbacks ( self , cbs : list [ Callable ] , * args ) :
for cb in cbs :
self . _callback_queue . append ( lambda _cb = cb : _cb ( * args ) )
def process_callbacks ( self ) :
# Call from UI thread to run any pending callbacks
@ -180,10 +213,13 @@ class WifiManager:
self . _last_network_update = 0.0
def _monitor_state ( self ) :
# TODO: make an initialize function to only wait in one place
device_path = self . _wait_for_wifi_device ( )
if device_path is None :
return
self . _tethering_password = self . _get_tethering_password ( )
rule = MatchRule (
type = " signal " ,
interface = NM_DEVICE_IFACE ,
@ -211,20 +247,18 @@ class WifiManager:
# BAD PASSWORD
if new_state == NMDeviceState . NEED_AUTH and change_reason == NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT and len ( self . _connecting_to_ssid ) :
self . forget_connection ( self . _connecting_to_ssid , block = True )
if self . _need_auth is not None :
self . _enqueue_callback ( self . _need_auth , self . _connecting_to_ssid )
self . _enqueue_callbacks ( self . _need_auth , self . _connecting_to_ssid )
self . _connecting_to_ssid = " "
elif new_state == NMDeviceState . ACTIVATED :
if self . _activated is not None :
if len ( self . _activated ) :
self . _update_networks ( )
self . _enqueue_callback ( self . _activated )
self . _enqueue_callbacks ( self . _activated )
self . _connecting_to_ssid = " "
elif new_state == NMDeviceState . DISCONNECTED and change_reason != NM_DEVICE_STATE_REASON_NEW_ACTIVATION :
self . _connecting_to_ssid = " "
if self . _disconnected is not None :
self . _enqueue_callback ( self . _disconnected )
self . _enqueue_callbacks ( self . _forgotten )
def _network_scanner ( self ) :
self . _wait_for_wifi_device ( )
@ -285,14 +319,24 @@ class WifiManager:
conns [ ssid ] = conn_path
return conns
def connect_to_network ( self , ssid : str , password : str ) :
def _get_active_connections ( self ) :
return self . _router_main . send_and_get_reply ( Properties ( self . _nm ) . get ( ' ActiveConnections ' ) ) . body [ 0 ] [ 1 ]
# TODO: use this
def _get_connection_settings ( self , conn_path : str ) - > dict :
conn_addr = DBusAddress ( conn_path , bus_name = NM , interface = NM_CONNECTION_IFACE )
reply = self . _router_main . send_and_get_reply ( new_method_call ( conn_addr , ' GetSettings ' ) )
if reply . header . message_type == MessageType . error :
cloudlog . warning ( f ' Failed to get connection settings: { reply } ' )
return { }
return dict ( reply . body [ 0 ] )
def connect_to_network ( self , ssid : str , password : str , hidden : bool = False ) :
def worker ( ) :
# Clear all connections that may already exist to the network we are connecting to
self . _connecting_to_ssid = ssid
self . forget_connection ( ssid , block = True )
is_hidden = False
connection = {
' connection ' : {
' type ' : ( ' s ' , ' 802-11-wireless ' ) ,
@ -302,7 +346,7 @@ class WifiManager:
} ,
' 802-11-wireless ' : {
' ssid ' : ( ' ay ' , ssid . encode ( " utf-8 " ) ) ,
' hidden ' : ( ' b ' , is_ hidden) ,
' hidden ' : ( ' b ' , hidden ) ,
' mode ' : ( ' s ' , ' infrastructure ' ) ,
} ,
' ipv4 ' : {
@ -332,9 +376,9 @@ class WifiManager:
conn_addr = DBusAddress ( conn_path , bus_name = NM , interface = NM_CONNECTION_IFACE )
self . _router_main . send_and_get_reply ( new_method_call ( conn_addr , ' Delete ' ) )
if self . _forgotten is not None :
if len ( self . _forgotten ) :
self . _update_networks ( )
self . _enqueue_callback ( self . _forgotten )
self . _enqueue_callbacks ( self . _forgotten )
if block :
worker ( )
@ -358,6 +402,137 @@ class WifiManager:
else :
threading . Thread ( target = worker , daemon = True ) . start ( )
def _deactivate_connection ( self , ssid : str ) :
for conn_path in self . _get_active_connections ( ) :
conn_addr = DBusAddress ( conn_path , bus_name = NM , interface = NM_ACTIVE_CONNECTION_IFACE )
specific_obj_path = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' SpecificObject ' ) ) . body [ 0 ] [ 1 ]
if specific_obj_path != " / " :
ap_addr = DBusAddress ( specific_obj_path , bus_name = NM , interface = NM_ACCESS_POINT_IFACE )
ap_ssid = bytes ( self . _router_main . send_and_get_reply ( Properties ( ap_addr ) . get ( ' Ssid ' ) ) . body [ 0 ] [ 1 ] ) . decode ( " utf-8 " , " replace " )
if ap_ssid == ssid :
self . _router_main . send_and_get_reply ( new_method_call ( self . _nm , ' DeactivateConnection ' , ' o ' , ( conn_path , ) ) )
return
def is_tethering_active ( self ) - > bool :
for network in self . _networks :
if network . is_connected :
return bool ( network . ssid == self . _tethering_ssid )
return False
def set_tethering_password ( self , password : str ) :
def worker ( ) :
conn_path = self . _get_connections ( ) . get ( self . _tethering_ssid , None )
if conn_path is None :
cloudlog . warning ( ' No tethering connection found ' )
return
settings = self . _get_connection_settings ( conn_path )
if len ( settings ) == 0 :
cloudlog . warning ( f ' Failed to get tethering settings for { conn_path } ' )
return
settings [ ' 802-11-wireless-security ' ] [ ' psk ' ] = ( ' s ' , password )
conn_addr = DBusAddress ( conn_path , bus_name = NM , interface = NM_CONNECTION_IFACE )
reply = self . _router_main . send_and_get_reply ( new_method_call ( conn_addr , ' Update ' , ' a { sa {sv} } ' , ( settings , ) ) )
if reply . header . message_type == MessageType . error :
cloudlog . warning ( f ' Failed to update tethering settings: { reply } ' )
return
self . _tethering_password = password
if self . is_tethering_active ( ) :
self . activate_connection ( self . _tethering_ssid , block = True )
threading . Thread ( target = worker , daemon = True ) . start ( )
def _get_tethering_password ( self ) - > str :
conn_path = self . _get_connections ( ) . get ( self . _tethering_ssid , None )
if conn_path is None :
cloudlog . warning ( ' No tethering connection found ' )
return ' '
reply = self . _router_main . send_and_get_reply ( new_method_call (
DBusAddress ( conn_path , bus_name = NM , interface = NM_CONNECTION_IFACE ) ,
' GetSecrets ' , ' s ' , ( ' 802-11-wireless-security ' , )
) )
if reply . header . message_type == MessageType . error :
cloudlog . warning ( f ' Failed to get tethering password: { reply } ' )
return ' '
secrets = reply . body [ 0 ]
if ' 802-11-wireless-security ' not in secrets :
return ' '
return str ( secrets [ ' 802-11-wireless-security ' ] . get ( ' psk ' , ( ' s ' , ' ' ) ) [ 1 ] )
def set_tethering_active ( self , active : bool ) :
def worker ( ) :
if active :
self . activate_connection ( self . _tethering_ssid , block = True )
else :
self . _deactivate_connection ( self . _tethering_ssid )
threading . Thread ( target = worker , daemon = True ) . start ( )
def _update_current_network_metered ( self ) - > None :
if self . _wifi_device is None :
cloudlog . warning ( " No WiFi device found " )
return
self . _current_network_metered = MeteredType . UNKNOWN
for active_conn in self . _get_active_connections ( ) :
conn_addr = DBusAddress ( active_conn , bus_name = NM , interface = NM_ACTIVE_CONNECTION_IFACE )
conn_type = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' Type ' ) ) . body [ 0 ] [ 1 ]
if conn_type == ' 802-11-wireless ' :
conn_path = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' Connection ' ) ) . body [ 0 ] [ 1 ]
if conn_path == " / " :
continue
settings = self . _get_connection_settings ( conn_path )
if len ( settings ) == 0 :
cloudlog . warning ( f ' Failed to get connection settings for { conn_path } ' )
continue
metered_prop = settings [ ' connection ' ] . get ( ' metered ' , ( ' i ' , 0 ) ) [ 1 ]
if metered_prop == MeteredType . YES :
self . _current_network_metered = MeteredType . YES
elif metered_prop == MeteredType . NO :
self . _current_network_metered = MeteredType . NO
print ( ' current_network_metered ' , self . _current_network_metered )
return
def set_current_network_metered ( self , metered : MeteredType ) :
def worker ( ) :
for active_conn in self . _get_active_connections ( ) :
conn_addr = DBusAddress ( active_conn , bus_name = NM , interface = NM_ACTIVE_CONNECTION_IFACE )
conn_type = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' Type ' ) ) . body [ 0 ] [ 1 ]
if conn_type == ' 802-11-wireless ' and not self . is_tethering_active ( ) :
conn_path = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' Connection ' ) ) . body [ 0 ] [ 1 ]
if conn_path == " / " :
continue
settings = self . _get_connection_settings ( conn_path )
if len ( settings ) == 0 :
cloudlog . warning ( f ' Failed to get connection settings for { conn_path } ' )
return
settings [ ' connection ' ] [ ' metered ' ] = ( ' i ' , int ( metered ) )
conn_addr = DBusAddress ( conn_path , bus_name = NM , interface = NM_CONNECTION_IFACE )
reply = self . _router_main . send_and_get_reply ( new_method_call ( conn_addr , ' Update ' , ' a { sa {sv} } ' , ( settings , ) ) )
if reply . header . message_type == MessageType . error :
cloudlog . warning ( f ' Failed to update tethering settings: { reply } ' )
return
threading . Thread ( target = worker , daemon = True ) . start ( )
def _request_scan ( self ) :
if self . _wifi_device is None :
cloudlog . warning ( " No WiFi device found " )
@ -409,8 +584,32 @@ class WifiManager:
networks . sort ( key = lambda n : ( - n . is_connected , - n . strength , n . ssid . lower ( ) ) )
self . _networks = networks
if self . _networks_updated is not None :
self . _enqueue_callback ( self . _networks_updated , self . _networks )
self . _update_ipv4_address ( )
self . _update_current_network_metered ( )
self . _enqueue_callbacks ( self . _networks_updated , self . _networks )
def _update_ipv4_address ( self ) :
if self . _wifi_device is None :
cloudlog . warning ( " No WiFi device found " )
return
self . _ipv4_address = " "
for conn_path in self . _get_active_connections ( ) :
conn_addr = DBusAddress ( conn_path , bus_name = NM , interface = NM_ACTIVE_CONNECTION_IFACE )
conn_type = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' Type ' ) ) . body [ 0 ] [ 1 ]
if conn_type == ' 802-11-wireless ' :
ip4config_path = self . _router_main . send_and_get_reply ( Properties ( conn_addr ) . get ( ' Ip4Config ' ) ) . body [ 0 ] [ 1 ]
if ip4config_path != " / " :
ip4config_addr = DBusAddress ( ip4config_path , bus_name = NM , interface = NM_IP4_CONFIG_IFACE )
address_data = self . _router_main . send_and_get_reply ( Properties ( ip4config_addr ) . get ( ' AddressData ' ) ) . body [ 0 ] [ 1 ]
for entry in address_data :
if ' address ' in entry :
self . _ipv4_address = entry [ ' address ' ] [ 1 ]
return
def __del__ ( self ) :
self . stop ( )