raylib: wifi manager initialize function (#36203)

* init func

* rm print

* rm

* use get_conn settings in another place
pull/36204/head
Shane Smiskol 3 weeks ago committed by GitHub
parent 1ca9fe35c2
commit 2c377e534f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 95
      system/ui/lib/wifi_manager.py

@ -158,15 +158,26 @@ class WifiManager:
self._disconnected: list[Callable[[], None]] = []
self._lock = threading.Lock()
self._scan_thread = threading.Thread(target=self._network_scanner, daemon=True)
self._scan_thread.start()
self._state_thread = threading.Thread(target=self._monitor_state, daemon=True)
self._state_thread.start()
self._initialize()
atexit.register(self.stop)
def _initialize(self):
def worker():
self._wait_for_wifi_device()
self._scan_thread.start()
self._state_thread.start()
if self._tethering_ssid not in self._get_connections():
self._add_tethering_connection()
self._tethering_password = self._get_tethering_password()
cloudlog.debug("WifiManager initialized")
threading.Thread(target=worker, daemon=True).start()
def set_callbacks(self, need_auth: Callable[[str], None] | None = None,
activated: Callable[[], None] | None = None,
forgotten: Callable[[], None] | None = None,
@ -213,18 +224,11 @@ class WifiManager:
self._last_network_update = 0.0
def _monitor_state(self):
# TODO: make an initialize function to only wait in one place
device_path = self._wait_for_wifi_device()
if device_path is None:
return
self._tethering_password = self._get_tethering_password()
rule = MatchRule(
type="signal",
interface=NM_DEVICE_IFACE,
member="StateChanged",
path=device_path,
path=self._wifi_device,
)
# Filter for StateChanged signal
@ -261,8 +265,6 @@ class WifiManager:
self._enqueue_callbacks(self._forgotten)
def _network_scanner(self):
self._wait_for_wifi_device()
while not self._exit:
if self._active:
if time.monotonic() - self._last_network_update > SCAN_PERIOD_SECONDS:
@ -273,15 +275,12 @@ class WifiManager:
self._last_network_update = time.monotonic()
time.sleep(1 / 2.)
def _wait_for_wifi_device(self) -> str | None:
with self._lock:
device_path: str | None = None
while not self._exit:
device_path = self._get_wifi_device()
if device_path is not None:
break
time.sleep(1)
return device_path
def _wait_for_wifi_device(self):
while not self._exit:
device_path = self._get_wifi_device()
if device_path is not None:
break
time.sleep(1)
def _get_wifi_device(self) -> str | None:
if self._wifi_device is not None:
@ -304,15 +303,12 @@ class WifiManager:
conns: dict[str, str] = {}
for conn_path in known_connections:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, "GetSettings"))
settings = self._get_connection_settings(conn_path)
# ignore connections removed during iteration (need auth, etc.)
if reply.header.message_type == MessageType.error:
cloudlog.warning(f"Failed to get connection properties for {conn_path}")
if len(settings) == 0:
cloudlog.warning(f'Failed to get connection settings for {conn_path}')
continue
settings = reply.body[0]
if "802-11-wireless" in settings:
ssid = settings['802-11-wireless']['ssid'][1].decode("utf-8", "replace")
if ssid != "":
@ -322,7 +318,6 @@ class WifiManager:
def _get_active_connections(self):
return self._router_main.send_and_get_reply(Properties(self._nm).get('ActiveConnections')).body[0][1]
# TODO: use this
def _get_connection_settings(self, conn_path: str) -> dict:
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_CONNECTION_IFACE)
reply = self._router_main.send_and_get_reply(new_method_call(conn_addr, 'GetSettings'))
@ -331,6 +326,43 @@ class WifiManager:
return {}
return dict(reply.body[0])
def _add_tethering_connection(self):
connection = {
'connection': {
'type': ('s', '802-11-wireless'),
'uuid': ('s', str(uuid.uuid4())),
'id': ('s', 'Hotspot'),
'autoconnect-retries': ('i', 0),
'interface-name': ('s', 'wlan0'),
'autoconnect': ('b', False),
},
'802-11-wireless': {
'band': ('s', 'bg'),
'mode': ('s', 'ap'),
'ssid': ('ay', self._tethering_ssid.encode("utf-8")),
},
'802-11-wireless-security': {
'group': ('as', ['ccmp']),
'key-mgmt': ('s', 'wpa-psk'),
'pairwise': ('as', ['ccmp']),
'proto': ('as', ['rsn']),
'psk': ('s', DEFAULT_TETHERING_PASSWORD),
},
'ipv4': {
'method': ('s', 'shared'),
'address-data': ('aa{sv}', [[
('address', ('s', TETHERING_IP_ADDRESS)),
('prefix', ('u', 24)),
]]),
'gateway': ('s', TETHERING_IP_ADDRESS),
'never-default': ('b', True),
},
'ipv6': {'method': ('s', 'ignore')},
}
settings_addr = DBusAddress(NM_SETTINGS_PATH, bus_name=NM, interface=NM_SETTINGS_IFACE)
self._router_main.send_and_get_reply(new_method_call(settings_addr, 'AddConnection', 'a{sa{sv}}', (connection,)))
def connect_to_network(self, ssid: str, password: str, hidden: bool = False):
def worker():
# Clear all connections that may already exist to the network we are connecting to
@ -503,7 +535,6 @@ class WifiManager:
self._current_network_metered = MeteredType.YES
elif metered_prop == MeteredType.NO:
self._current_network_metered = MeteredType.NO
print('current_network_metered', self._current_network_metered)
return
def set_current_network_metered(self, metered: MeteredType):

Loading…
Cancel
Save