|
|
|
@ -15,6 +15,7 @@ from jeepney.low_level import MessageType |
|
|
|
|
from jeepney.wrappers import Properties |
|
|
|
|
|
|
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
|
|
from openpilot.common.params import Params |
|
|
|
|
from openpilot.system.ui.lib.networkmanager import (NM, NM_WIRELESS_IFACE, NM_802_11_AP_SEC_PAIR_WEP40, |
|
|
|
|
NM_802_11_AP_SEC_PAIR_WEP104, NM_802_11_AP_SEC_GROUP_WEP40, |
|
|
|
|
NM_802_11_AP_SEC_GROUP_WEP104, NM_802_11_AP_SEC_KEY_MGMT_PSK, |
|
|
|
@ -136,6 +137,11 @@ class WifiManager: |
|
|
|
|
self._last_network_update: float = 0.0 |
|
|
|
|
self._callback_queue: list[Callable] = [] |
|
|
|
|
|
|
|
|
|
self._tethering_ssid = "weedle" |
|
|
|
|
dongle_id = Params().get("DongleId") |
|
|
|
|
if dongle_id: |
|
|
|
|
self._tethering_ssid += "-" + dongle_id[:4] |
|
|
|
|
|
|
|
|
|
# Callbacks |
|
|
|
|
self._need_auth: list[Callable[[str], None]] = [] |
|
|
|
|
self._activated: list[Callable[[], None]] = [] |
|
|
|
@ -153,11 +159,11 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
atexit.register(self.stop) |
|
|
|
|
|
|
|
|
|
def set_callbacks(self, need_auth: Callable[[str], None] | None, |
|
|
|
|
activated: Callable[[], None] | None, |
|
|
|
|
forgotten: Callable[[], None] | None, |
|
|
|
|
networks_updated: Callable[[list[Network]], None] | None, |
|
|
|
|
disconnected: Callable[[], None] | None): |
|
|
|
|
def set_callbacks(self, need_auth: Callable[[str], None] | None = None, |
|
|
|
|
activated: Callable[[], None] | None = None, |
|
|
|
|
forgotten: Callable[[], None] | None = None, |
|
|
|
|
networks_updated: Callable[[list[Network]], None] | None = None, |
|
|
|
|
disconnected: Callable[[], None] | None = None): |
|
|
|
|
if need_auth is not None: |
|
|
|
|
self._need_auth.append(need_auth) |
|
|
|
|
if activated is not None: |
|
|
|
@ -373,6 +379,34 @@ class WifiManager: |
|
|
|
|
else: |
|
|
|
|
threading.Thread(target=worker, daemon=True).start() |
|
|
|
|
|
|
|
|
|
def _deactivate_connection(self, ssid: str): |
|
|
|
|
for conn_path in self._get_active_connections(): |
|
|
|
|
conn_addr = DBusAddress(conn_path, bus_name=NM, interface=NM_ACTIVE_CONNECTION_IFACE) |
|
|
|
|
specific_obj_path = self._router_main.send_and_get_reply(Properties(conn_addr).get('SpecificObject')).body[0][1] |
|
|
|
|
|
|
|
|
|
if specific_obj_path != "/": |
|
|
|
|
ap_addr = DBusAddress(specific_obj_path, bus_name=NM, interface=NM_ACCESS_POINT_IFACE) |
|
|
|
|
ap_ssid = bytes(self._router_main.send_and_get_reply(Properties(ap_addr).get('Ssid')).body[0][1]).decode("utf-8", "replace") |
|
|
|
|
|
|
|
|
|
if ap_ssid == ssid: |
|
|
|
|
self._router_main.send_and_get_reply(new_method_call(self._nm, 'DeactivateConnection', 'o', (conn_path,))) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
def is_tethering_active(self) -> bool: |
|
|
|
|
for network in self._networks: |
|
|
|
|
if network.is_connected: |
|
|
|
|
return bool(network.ssid == self._tethering_ssid) |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def set_tethering_active(self, active: bool): |
|
|
|
|
def worker(): |
|
|
|
|
if active: |
|
|
|
|
self.activate_connection(self._tethering_ssid, block=True) |
|
|
|
|
else: |
|
|
|
|
self._deactivate_connection(self._tethering_ssid) |
|
|
|
|
|
|
|
|
|
threading.Thread(target=worker, daemon=True).start() |
|
|
|
|
|
|
|
|
|
def _request_scan(self): |
|
|
|
|
if self._wifi_device is None: |
|
|
|
|
cloudlog.warning("No WiFi device found") |
|
|
|
|