You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							357 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							357 lines
						
					
					
						
							11 KiB
						
					
					
				| import binascii
 | |
| import itertools
 | |
| import os
 | |
| import re
 | |
| import struct
 | |
| import subprocess
 | |
| 
 | |
| from cereal import log
 | |
| from selfdrive.hardware.base import HardwareBase, ThermalConfig
 | |
| 
 | |
| NetworkType = log.DeviceState.NetworkType
 | |
| NetworkStrength = log.DeviceState.NetworkStrength
 | |
| 
 | |
| 
 | |
| def service_call(call):
 | |
|   try:
 | |
|     ret = subprocess.check_output(["service", "call", *call], encoding='utf8').strip()
 | |
|     if 'Parcel' not in ret:
 | |
|       return None
 | |
|     return parse_service_call_bytes(ret)
 | |
|   except subprocess.CalledProcessError:
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def parse_service_call_unpack(r, fmt):
 | |
|   try:
 | |
|     return struct.unpack(fmt, r)[0]
 | |
|   except Exception:
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def parse_service_call_string(r):
 | |
|   try:
 | |
|     r = r[8:]  # Cut off length field
 | |
|     r = r.decode('utf_16_be')
 | |
| 
 | |
|     # All pairs of two characters seem to be swapped. Not sure why
 | |
|     result = ""
 | |
|     for a, b, in itertools.zip_longest(r[::2], r[1::2], fillvalue='\x00'):
 | |
|         result += b + a
 | |
| 
 | |
|     result = result.replace('\x00', '')
 | |
| 
 | |
|     return result
 | |
|   except Exception:
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def parse_service_call_bytes(ret):
 | |
|   try:
 | |
|     r = b""
 | |
|     for hex_part in re.findall(r'[ (]([0-9a-f]{8})', ret):
 | |
|       r += binascii.unhexlify(hex_part)
 | |
|     return r
 | |
|   except Exception:
 | |
|     return None
 | |
| 
 | |
| 
 | |
| def getprop(key):
 | |
|   return subprocess.check_output(["getprop", key], encoding='utf8').strip()
 | |
| 
 | |
| 
 | |
| class Android(HardwareBase):
 | |
|   def get_os_version(self):
 | |
|     with open("/VERSION") as f:
 | |
|       return f.read().strip()
 | |
| 
 | |
|   def get_device_type(self):
 | |
|     return "eon"
 | |
| 
 | |
|   def get_sound_card_online(self):
 | |
|     return (os.path.isfile('/proc/asound/card0/state') and
 | |
|             open('/proc/asound/card0/state').read().strip() == 'ONLINE')
 | |
| 
 | |
|   def get_imei(self, slot):
 | |
|     slot = str(slot)
 | |
|     if slot not in ("0", "1"):
 | |
|       raise ValueError("SIM slot must be 0 or 1")
 | |
| 
 | |
|     return parse_service_call_string(service_call(["iphonesubinfo", "3", "i32", str(slot)]))
 | |
| 
 | |
|   def get_serial(self):
 | |
|     ret = getprop("ro.serialno")
 | |
|     if len(ret) == 0:
 | |
|       ret = "cccccccc"
 | |
|     return ret
 | |
| 
 | |
|   def get_subscriber_info(self):
 | |
|     ret = parse_service_call_string(service_call(["iphonesubinfo", "7"]))
 | |
|     if ret is None or len(ret) < 8:
 | |
|       return ""
 | |
|     return ret
 | |
| 
 | |
|   def reboot(self, reason=None):
 | |
|     # e.g. reason="recovery" to go into recover mode
 | |
|     if reason is None:
 | |
|       reason_args = ["null"]
 | |
|     else:
 | |
|       reason_args = ["s16", reason]
 | |
| 
 | |
|     subprocess.check_output([
 | |
|       "service", "call", "power", "16",  # IPowerManager.reboot
 | |
|       "i32", "0",  # no confirmation,
 | |
|       *reason_args,
 | |
|       "i32", "1"  # wait
 | |
|     ])
 | |
| 
 | |
|   def uninstall(self):
 | |
|     with open('/cache/recovery/command', 'w') as f:
 | |
|       f.write('--wipe_data\n')
 | |
|     # IPowerManager.reboot(confirm=false, reason="recovery", wait=true)
 | |
|     self.reboot(reason="recovery")
 | |
| 
 | |
|   def get_sim_info(self):
 | |
|     # Used for athena
 | |
|     # TODO: build using methods from this class
 | |
|     sim_state = getprop("gsm.sim.state").split(",")
 | |
|     network_type = getprop("gsm.network.type").split(',')
 | |
|     mcc_mnc = getprop("gsm.sim.operator.numeric") or None
 | |
| 
 | |
|     sim_id = parse_service_call_string(service_call(['iphonesubinfo', '11']))
 | |
|     cell_data_state = parse_service_call_unpack(service_call(['phone', '46']), ">q")
 | |
|     cell_data_connected = (cell_data_state == 2)
 | |
| 
 | |
|     return {
 | |
|       'sim_id': sim_id,
 | |
|       'mcc_mnc': mcc_mnc,
 | |
|       'network_type': network_type,
 | |
|       'sim_state': sim_state,
 | |
|       'data_connected': cell_data_connected
 | |
|     }
 | |
| 
 | |
|   def get_network_type(self):
 | |
|     wifi_check = parse_service_call_string(service_call(["connectivity", "2"]))
 | |
|     if wifi_check is None:
 | |
|       return NetworkType.none
 | |
|     elif 'WIFI' in wifi_check:
 | |
|       return NetworkType.wifi
 | |
|     else:
 | |
|       cell_check = parse_service_call_unpack(service_call(['phone', '59']), ">q")
 | |
|       # from TelephonyManager.java
 | |
|       cell_networks = {
 | |
|         0: NetworkType.none,
 | |
|         1: NetworkType.cell2G,
 | |
|         2: NetworkType.cell2G,
 | |
|         3: NetworkType.cell3G,
 | |
|         4: NetworkType.cell2G,
 | |
|         5: NetworkType.cell3G,
 | |
|         6: NetworkType.cell3G,
 | |
|         7: NetworkType.cell3G,
 | |
|         8: NetworkType.cell3G,
 | |
|         9: NetworkType.cell3G,
 | |
|         10: NetworkType.cell3G,
 | |
|         11: NetworkType.cell2G,
 | |
|         12: NetworkType.cell3G,
 | |
|         13: NetworkType.cell4G,
 | |
|         14: NetworkType.cell4G,
 | |
|         15: NetworkType.cell3G,
 | |
|         16: NetworkType.cell2G,
 | |
|         17: NetworkType.cell3G,
 | |
|         18: NetworkType.cell4G,
 | |
|         19: NetworkType.cell4G
 | |
|       }
 | |
|       return cell_networks.get(cell_check, NetworkType.none)
 | |
| 
 | |
|   def get_network_strength(self, network_type):
 | |
|     network_strength = NetworkStrength.unknown
 | |
| 
 | |
|     # from SignalStrength.java
 | |
|     def get_lte_level(rsrp, rssnr):
 | |
|       INT_MAX = 2147483647
 | |
|       if rsrp == INT_MAX:
 | |
|         lvl_rsrp = NetworkStrength.unknown
 | |
|       elif rsrp >= -95:
 | |
|         lvl_rsrp = NetworkStrength.great
 | |
|       elif rsrp >= -105:
 | |
|         lvl_rsrp = NetworkStrength.good
 | |
|       elif rsrp >= -115:
 | |
|         lvl_rsrp = NetworkStrength.moderate
 | |
|       else:
 | |
|         lvl_rsrp = NetworkStrength.poor
 | |
|       if rssnr == INT_MAX:
 | |
|         lvl_rssnr = NetworkStrength.unknown
 | |
|       elif rssnr >= 45:
 | |
|         lvl_rssnr = NetworkStrength.great
 | |
|       elif rssnr >= 10:
 | |
|         lvl_rssnr = NetworkStrength.good
 | |
|       elif rssnr >= -30:
 | |
|         lvl_rssnr = NetworkStrength.moderate
 | |
|       else:
 | |
|         lvl_rssnr = NetworkStrength.poor
 | |
|       return max(lvl_rsrp, lvl_rssnr)
 | |
| 
 | |
|     def get_tdscdma_level(tdscmadbm):
 | |
|       lvl = NetworkStrength.unknown
 | |
|       if tdscmadbm > -25:
 | |
|         lvl = NetworkStrength.unknown
 | |
|       elif tdscmadbm >= -49:
 | |
|         lvl = NetworkStrength.great
 | |
|       elif tdscmadbm >= -73:
 | |
|         lvl = NetworkStrength.good
 | |
|       elif tdscmadbm >= -97:
 | |
|         lvl = NetworkStrength.moderate
 | |
|       elif tdscmadbm >= -110:
 | |
|         lvl = NetworkStrength.poor
 | |
|       return lvl
 | |
| 
 | |
|     def get_gsm_level(asu):
 | |
|       if asu <= 2 or asu == 99:
 | |
|         lvl = NetworkStrength.unknown
 | |
|       elif asu >= 12:
 | |
|         lvl = NetworkStrength.great
 | |
|       elif asu >= 8:
 | |
|         lvl = NetworkStrength.good
 | |
|       elif asu >= 5:
 | |
|         lvl = NetworkStrength.moderate
 | |
|       else:
 | |
|         lvl = NetworkStrength.poor
 | |
|       return lvl
 | |
| 
 | |
|     def get_evdo_level(evdodbm, evdosnr):
 | |
|       lvl_evdodbm = NetworkStrength.unknown
 | |
|       lvl_evdosnr = NetworkStrength.unknown
 | |
|       if evdodbm >= -65:
 | |
|         lvl_evdodbm = NetworkStrength.great
 | |
|       elif evdodbm >= -75:
 | |
|         lvl_evdodbm = NetworkStrength.good
 | |
|       elif evdodbm >= -90:
 | |
|         lvl_evdodbm = NetworkStrength.moderate
 | |
|       elif evdodbm >= -105:
 | |
|         lvl_evdodbm = NetworkStrength.poor
 | |
|       if evdosnr >= 7:
 | |
|         lvl_evdosnr = NetworkStrength.great
 | |
|       elif evdosnr >= 5:
 | |
|         lvl_evdosnr = NetworkStrength.good
 | |
|       elif evdosnr >= 3:
 | |
|         lvl_evdosnr = NetworkStrength.moderate
 | |
|       elif evdosnr >= 1:
 | |
|         lvl_evdosnr = NetworkStrength.poor
 | |
|       return max(lvl_evdodbm, lvl_evdosnr)
 | |
| 
 | |
|     def get_cdma_level(cdmadbm, cdmaecio):
 | |
|       lvl_cdmadbm = NetworkStrength.unknown
 | |
|       lvl_cdmaecio = NetworkStrength.unknown
 | |
|       if cdmadbm >= -75:
 | |
|         lvl_cdmadbm = NetworkStrength.great
 | |
|       elif cdmadbm >= -85:
 | |
|         lvl_cdmadbm = NetworkStrength.good
 | |
|       elif cdmadbm >= -95:
 | |
|         lvl_cdmadbm = NetworkStrength.moderate
 | |
|       elif cdmadbm >= -100:
 | |
|         lvl_cdmadbm = NetworkStrength.poor
 | |
|       if cdmaecio >= -90:
 | |
|         lvl_cdmaecio = NetworkStrength.great
 | |
|       elif cdmaecio >= -110:
 | |
|         lvl_cdmaecio = NetworkStrength.good
 | |
|       elif cdmaecio >= -130:
 | |
|         lvl_cdmaecio = NetworkStrength.moderate
 | |
|       elif cdmaecio >= -150:
 | |
|         lvl_cdmaecio = NetworkStrength.poor
 | |
|       return max(lvl_cdmadbm, lvl_cdmaecio)
 | |
| 
 | |
|     if network_type == NetworkType.none:
 | |
|       return network_strength
 | |
|     if network_type == NetworkType.wifi:
 | |
|       out = subprocess.check_output('dumpsys connectivity', shell=True).decode('utf-8')
 | |
|       network_strength = NetworkStrength.unknown
 | |
|       for line in out.split('\n'):
 | |
|         signal_str = "SignalStrength: "
 | |
|         if signal_str in line:
 | |
|           lvl_idx_start = line.find(signal_str) + len(signal_str)
 | |
|           lvl_idx_end = line.find(']', lvl_idx_start)
 | |
|           lvl = int(line[lvl_idx_start : lvl_idx_end])
 | |
|           if lvl >= -50:
 | |
|             network_strength = NetworkStrength.great
 | |
|           elif lvl >= -60:
 | |
|             network_strength = NetworkStrength.good
 | |
|           elif lvl >= -70:
 | |
|             network_strength = NetworkStrength.moderate
 | |
|           else:
 | |
|             network_strength = NetworkStrength.poor
 | |
|       return network_strength
 | |
|     else:
 | |
|       # check cell strength
 | |
|       out = subprocess.check_output('dumpsys telephony.registry', shell=True).decode('utf-8')
 | |
|       for line in out.split('\n'):
 | |
|         if "mSignalStrength" in line:
 | |
|           arr = line.split(' ')
 | |
|           ns = 0
 | |
|           if ("gsm" in arr[14]):
 | |
|             rsrp = int(arr[9])
 | |
|             rssnr = int(arr[11])
 | |
|             ns = get_lte_level(rsrp, rssnr)
 | |
|             if ns == NetworkStrength.unknown:
 | |
|               tdscmadbm = int(arr[13])
 | |
|               ns = get_tdscdma_level(tdscmadbm)
 | |
|               if ns == NetworkStrength.unknown:
 | |
|                 asu = int(arr[1])
 | |
|                 ns = get_gsm_level(asu)
 | |
|           else:
 | |
|             cdmadbm = int(arr[3])
 | |
|             cdmaecio = int(arr[4])
 | |
|             evdodbm = int(arr[5])
 | |
|             evdosnr = int(arr[7])
 | |
|             lvl_cdma = get_cdma_level(cdmadbm, cdmaecio)
 | |
|             lvl_edmo = get_evdo_level(evdodbm, evdosnr)
 | |
|             if lvl_edmo == NetworkStrength.unknown:
 | |
|               ns = lvl_cdma
 | |
|             elif lvl_cdma == NetworkStrength.unknown:
 | |
|               ns = lvl_edmo
 | |
|             else:
 | |
|               ns = min(lvl_cdma, lvl_edmo)
 | |
|           network_strength = max(network_strength, ns)
 | |
| 
 | |
|       return network_strength
 | |
| 
 | |
|   def get_battery_capacity(self):
 | |
|     return self.read_param_file("/sys/class/power_supply/battery/capacity", int, 100)
 | |
| 
 | |
|   def get_battery_status(self):
 | |
|     # This does not correspond with actual charging or not.
 | |
|     # If a USB cable is plugged in, it responds with 'Charging', even when charging is disabled
 | |
|     return self.read_param_file("/sys/class/power_supply/battery/status", lambda x: x.strip(), '')
 | |
| 
 | |
|   def get_battery_current(self):
 | |
|     return self.read_param_file("/sys/class/power_supply/battery/current_now", int)
 | |
| 
 | |
|   def get_battery_voltage(self):
 | |
|     return self.read_param_file("/sys/class/power_supply/battery/voltage_now", int)
 | |
| 
 | |
|   def get_battery_charging(self):
 | |
|     # This does correspond with actually charging
 | |
|     return self.read_param_file("/sys/class/power_supply/battery/charge_type", lambda x: x.strip() != "N/A", True)
 | |
| 
 | |
|   def set_battery_charging(self, on):
 | |
|     with open('/sys/class/power_supply/battery/charging_enabled', 'w') as f:
 | |
|       f.write(f"{1 if on else 0}\n")
 | |
| 
 | |
|   def get_usb_present(self):
 | |
|     return self.read_param_file("/sys/class/power_supply/usb/present", lambda x: bool(int(x)), False)
 | |
| 
 | |
|   def get_current_power_draw(self):
 | |
|     # We don't have a good direct way to measure this on android
 | |
|     return None
 | |
| 
 | |
|   def shutdown(self):
 | |
|     os.system('LD_LIBRARY_PATH="" svc power shutdown')
 | |
| 
 | |
|   def get_thermal_config(self):
 | |
|     return ThermalConfig(cpu=((5, 7, 10, 12), 10), gpu=((16,), 10), mem=(2, 10), bat=(29, 1000), ambient=(25, 1))
 | |
| 
 | |
|   def set_screen_brightness(self, percentage):
 | |
|     with open("/sys/class/leds/lcd-backlight/brightness", "w") as f:
 | |
|       f.write(str(int(percentage * 2.55)))
 | |
| 
 | |
|   def set_power_save(self, enabled):
 | |
|     pass
 | |
| 
 |