|
|
|
@ -12,6 +12,7 @@ from dbus_next.aio import MessageBus |
|
|
|
|
from dbus_next import BusType, Variant, Message |
|
|
|
|
from dbus_next.errors import DBusError |
|
|
|
|
from dbus_next.constants import MessageType |
|
|
|
|
from openpilot.common.params import Params |
|
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
|
|
|
|
|
|
|
T = TypeVar("T") |
|
|
|
@ -29,6 +30,9 @@ NM_DEVICE_IFACE = "org.freedesktop.NetworkManager.Device" |
|
|
|
|
|
|
|
|
|
NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 |
|
|
|
|
|
|
|
|
|
TETHERING_IP_ADDRESS = "192.168.43.1" |
|
|
|
|
DEFAULT_TETHERING_PASSWORD = "12345678" |
|
|
|
|
|
|
|
|
|
# NetworkManager device states |
|
|
|
|
class NMDeviceState(IntEnum): |
|
|
|
|
DISCONNECTED = 30 |
|
|
|
@ -72,6 +76,7 @@ class WifiManager: |
|
|
|
|
self.saved_connections: dict[str, str] = {} |
|
|
|
|
self.active_ap_path: str = "" |
|
|
|
|
self.scan_task: asyncio.Task | None = None |
|
|
|
|
self._tethering_ssid = "weedle-" + Params().get("DongleId", encoding="utf-8") |
|
|
|
|
self.running: bool = True |
|
|
|
|
|
|
|
|
|
async def connect(self) -> None: |
|
|
|
@ -83,6 +88,7 @@ class WifiManager: |
|
|
|
|
await self._setup_signals(self.device_path) |
|
|
|
|
|
|
|
|
|
self.active_ap_path = await self.get_active_access_point() |
|
|
|
|
await self.add_tethering_connection(self._tethering_ssid, DEFAULT_TETHERING_PASSWORD) |
|
|
|
|
self.saved_connections = await self._get_saved_connections() |
|
|
|
|
self.scan_task = asyncio.create_task(self._periodic_scan()) |
|
|
|
|
except DBusError as e: |
|
|
|
@ -199,6 +205,160 @@ class WifiManager: |
|
|
|
|
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
async def add_tethering_connection(self, ssid: str, password: str = "12345678") -> bool: |
|
|
|
|
"""Create a WiFi tethering connection.""" |
|
|
|
|
if len(password) < 8: |
|
|
|
|
print("Tethering password must be at least 8 characters") |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
# First, check if a hotspot connection already exists |
|
|
|
|
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) |
|
|
|
|
connection_paths = await settings_iface.call_list_connections() |
|
|
|
|
|
|
|
|
|
# Look for an existing hotspot connection |
|
|
|
|
for path in connection_paths: |
|
|
|
|
try: |
|
|
|
|
settings = await self._get_connection_settings(path) |
|
|
|
|
conn_type = settings.get('connection', {}).get('type', Variant('s', '')).value |
|
|
|
|
wifi_mode = settings.get('802-11-wireless', {}).get('mode', Variant('s', '')).value |
|
|
|
|
|
|
|
|
|
if conn_type == '802-11-wireless' and wifi_mode == 'ap': |
|
|
|
|
# Extract the SSID to check |
|
|
|
|
connection_ssid = self._extract_ssid(settings) |
|
|
|
|
if connection_ssid == ssid: |
|
|
|
|
return True |
|
|
|
|
except DBusError: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
connection = { |
|
|
|
|
'connection': { |
|
|
|
|
'id': Variant('s', 'Hotspot'), |
|
|
|
|
'uuid': Variant('s', str(uuid.uuid4())), |
|
|
|
|
'type': Variant('s', '802-11-wireless'), |
|
|
|
|
'interface-name': Variant('s', 'wlan0'), |
|
|
|
|
'autoconnect': Variant('b', False), |
|
|
|
|
}, |
|
|
|
|
'802-11-wireless': { |
|
|
|
|
'band': Variant('s', 'bg'), |
|
|
|
|
'mode': Variant('s', 'ap'), |
|
|
|
|
'ssid': Variant('ay', ssid.encode('utf-8')), |
|
|
|
|
}, |
|
|
|
|
'802-11-wireless-security': { |
|
|
|
|
'group': Variant('as', ['ccmp']), |
|
|
|
|
'key-mgmt': Variant('s', 'wpa-psk'), |
|
|
|
|
'pairwise': Variant('as', ['ccmp']), |
|
|
|
|
'proto': Variant('as', ['rsn']), |
|
|
|
|
'psk': Variant('s', password), |
|
|
|
|
}, |
|
|
|
|
'ipv4': { |
|
|
|
|
'method': Variant('s', 'shared'), |
|
|
|
|
'address-data': Variant('aa{sv}', [{'address': Variant('s', TETHERING_IP_ADDRESS), 'prefix': Variant('u', 24)}]), |
|
|
|
|
'gateway': Variant('s', TETHERING_IP_ADDRESS), |
|
|
|
|
'never-default': Variant('b', True), |
|
|
|
|
}, |
|
|
|
|
'ipv6': { |
|
|
|
|
'method': Variant('s', 'ignore'), |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
settings_iface = await self._get_interface(NM, NM_SETTINGS_PATH, NM_SETTINGS_IFACE) |
|
|
|
|
new_connection = await settings_iface.call_add_connection(connection) |
|
|
|
|
print(f"Added tethering connection with path: {new_connection}") |
|
|
|
|
return True |
|
|
|
|
except DBusError as e: |
|
|
|
|
print(f"Failed to add tethering connection: {e}") |
|
|
|
|
return False |
|
|
|
|
except Exception as e: |
|
|
|
|
print(f"Unexpected error adding tethering connection: {e}") |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
async def get_tethering_password(self) -> str: |
|
|
|
|
"""Get the current tethering password.""" |
|
|
|
|
try: |
|
|
|
|
hotspot_path = self.saved_connections.get(self._tethering_ssid) |
|
|
|
|
if hotspot_path: |
|
|
|
|
conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) |
|
|
|
|
secrets = await conn_iface.call_get_secrets('802-11-wireless-security') |
|
|
|
|
if secrets and '802-11-wireless-security' in secrets: |
|
|
|
|
psk = secrets.get('802-11-wireless-security', {}).get('psk', Variant('s', '')).value |
|
|
|
|
return str(psk) if psk is not None else "" |
|
|
|
|
return "" |
|
|
|
|
except DBusError as e: |
|
|
|
|
print(f"Failed to get tethering password: {e}") |
|
|
|
|
return "" |
|
|
|
|
except Exception as e: |
|
|
|
|
print(f"Unexpected error getting tethering password: {e}") |
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
|
async def set_tethering_password(self, password: str) -> bool: |
|
|
|
|
"""Set the tethering password.""" |
|
|
|
|
if len(password) < 8: |
|
|
|
|
cloudlog.error("Tethering password must be at least 8 characters") |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
hotspot_path = self.saved_connections.get(self._tethering_ssid) |
|
|
|
|
if not hotspot_path: |
|
|
|
|
print("No hotspot connection found") |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
# Update the connection settings with new password |
|
|
|
|
settings = await self._get_connection_settings(hotspot_path) |
|
|
|
|
if '802-11-wireless-security' not in settings: |
|
|
|
|
settings['802-11-wireless-security'] = {} |
|
|
|
|
settings['802-11-wireless-security']['psk'] = Variant('s', password) |
|
|
|
|
|
|
|
|
|
# Apply changes |
|
|
|
|
conn_iface = await self._get_interface(NM, hotspot_path, NM_CONNECTION_IFACE) |
|
|
|
|
await conn_iface.call_update(settings) |
|
|
|
|
|
|
|
|
|
# Check if connection is active and restart if needed |
|
|
|
|
is_active = False |
|
|
|
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
|
|
|
|
active_connections = await nm_iface.get_active_connections() |
|
|
|
|
|
|
|
|
|
for conn_path in active_connections: |
|
|
|
|
props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) |
|
|
|
|
conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') |
|
|
|
|
if conn_id_path.value == hotspot_path: |
|
|
|
|
is_active = True |
|
|
|
|
await nm_iface.call_deactivate_connection(conn_path) |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
if is_active: |
|
|
|
|
await nm_iface.call_activate_connection(hotspot_path, self.device_path, "/") |
|
|
|
|
|
|
|
|
|
print("Tethering password updated successfully") |
|
|
|
|
return True |
|
|
|
|
except DBusError as e: |
|
|
|
|
print(f"Failed to set tethering password: {e}") |
|
|
|
|
return False |
|
|
|
|
except Exception as e: |
|
|
|
|
print(f"Unexpected error setting tethering password: {e}") |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
async def is_tethering_active(self) -> bool: |
|
|
|
|
"""Check if tethering is active for the specified SSID.""" |
|
|
|
|
try: |
|
|
|
|
hotspot_path = self.saved_connections.get(self._tethering_ssid) |
|
|
|
|
if not hotspot_path: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
nm_iface = await self._get_interface(NM, NM_PATH, NM_IFACE) |
|
|
|
|
active_connections = await nm_iface.get_active_connections() |
|
|
|
|
|
|
|
|
|
for conn_path in active_connections: |
|
|
|
|
props_iface = await self._get_interface(NM, conn_path, NM_PROPERTIES_IFACE) |
|
|
|
|
conn_id_path = await props_iface.call_get('org.freedesktop.NetworkManager.Connection.Active', 'Connection') |
|
|
|
|
|
|
|
|
|
if conn_id_path.value == hotspot_path: |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
return False |
|
|
|
|
except Exception: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
async def _periodic_scan(self): |
|
|
|
|
while self.running: |
|
|
|
|
try: |
|
|
|
|