|
|
|
@ -5,6 +5,7 @@ import re |
|
|
|
|
import serial |
|
|
|
|
import struct |
|
|
|
|
import subprocess |
|
|
|
|
from typing import List, Union |
|
|
|
|
|
|
|
|
|
from cereal import log |
|
|
|
|
from selfdrive.hardware.base import HardwareBase, ThermalConfig |
|
|
|
@ -14,7 +15,7 @@ NetworkStrength = log.DeviceState.NetworkStrength |
|
|
|
|
|
|
|
|
|
MODEM_PATH = "/dev/smd11" |
|
|
|
|
|
|
|
|
|
def service_call(call): |
|
|
|
|
def service_call(call: List[str]) -> Union[bytes, None]: |
|
|
|
|
try: |
|
|
|
|
ret = subprocess.check_output(["service", "call", *call], encoding='utf8').strip() |
|
|
|
|
if 'Parcel' not in ret: |
|
|
|
@ -24,31 +25,29 @@ def service_call(call): |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_service_call_unpack(r, fmt): |
|
|
|
|
def parse_service_call_unpack(r, fmt) -> Union[bytes, None]: |
|
|
|
|
try: |
|
|
|
|
return struct.unpack(fmt, r)[0] |
|
|
|
|
except Exception: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_service_call_string(r): |
|
|
|
|
def parse_service_call_string(r: bytes) -> Union[str, None]: |
|
|
|
|
try: |
|
|
|
|
r = r[8:] # Cut off length field |
|
|
|
|
r = r.decode('utf_16_be') |
|
|
|
|
r_str = r.decode('utf_16_be') |
|
|
|
|
|
|
|
|
|
# All pairs of two characters seem to be swapped. Not sure why |
|
|
|
|
result = "" |
|
|
|
|
for a, b, in itertools.zip_longest(r[::2], r[1::2], fillvalue='\x00'): |
|
|
|
|
result += b + a |
|
|
|
|
for a, b, in itertools.zip_longest(r_str[::2], r_str[1::2], fillvalue='\x00'): |
|
|
|
|
result += b + a |
|
|
|
|
|
|
|
|
|
result = result.replace('\x00', '') |
|
|
|
|
|
|
|
|
|
return result |
|
|
|
|
return result.replace('\x00', '') |
|
|
|
|
except Exception: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_service_call_bytes(ret): |
|
|
|
|
def parse_service_call_bytes(ret: str) -> Union[bytes, None]: |
|
|
|
|
try: |
|
|
|
|
r = b"" |
|
|
|
|
for hex_part in re.findall(r'[ (]([0-9a-f]{8})', ret): |
|
|
|
@ -58,8 +57,11 @@ def parse_service_call_bytes(ret): |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def getprop(key): |
|
|
|
|
return subprocess.check_output(["getprop", key], encoding='utf8').strip() |
|
|
|
|
def getprop(key: str) -> Union[str, None]: |
|
|
|
|
try: |
|
|
|
|
return subprocess.check_output(["getprop", key], encoding='utf8').strip() |
|
|
|
|
except subprocess.CalledProcessError: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Android(HardwareBase): |
|
|
|
@ -134,9 +136,9 @@ class Android(HardwareBase): |
|
|
|
|
|
|
|
|
|
def get_network_info(self): |
|
|
|
|
msg = log.DeviceState.NetworkInfo.new_message() |
|
|
|
|
msg.state = getprop("gsm.sim.state") |
|
|
|
|
msg.technology = getprop("gsm.network.type") |
|
|
|
|
msg.operator = getprop("gsm.sim.operator.numeric") |
|
|
|
|
msg.state = getprop("gsm.sim.state") or "" |
|
|
|
|
msg.technology = getprop("gsm.network.type") or "" |
|
|
|
|
msg.operator = getprop("gsm.sim.operator.numeric") or "" |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
modem = serial.Serial(MODEM_PATH, 115200, timeout=0.1) |
|
|
|
|