You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							165 lines
						
					
					
						
							6.1 KiB
						
					
					
				
			
		
		
	
	
							165 lines
						
					
					
						
							6.1 KiB
						
					
					
				| import datetime
 | |
| import random
 | |
| import threading
 | |
| import time
 | |
| from statistics import mean
 | |
| 
 | |
| from cereal import log
 | |
| from selfdrive.swaglog import cloudlog
 | |
| 
 | |
| PANDA_OUTPUT_VOLTAGE = 5.28
 | |
| 
 | |
| 
 | |
| # Parameters
 | |
| def get_battery_capacity():
 | |
|   return _read_param("/sys/class/power_supply/battery/capacity", int)
 | |
| 
 | |
| 
 | |
| def get_battery_status():
 | |
|   # This does not correspond with actual charging or not.
 | |
|   # If a USB cable is plugged in, it responds with 'Charging', even when charging is disabled
 | |
|   return _read_param("/sys/class/power_supply/battery/status", lambda x: x.strip(), '')
 | |
| 
 | |
| 
 | |
| def get_battery_current():
 | |
|   return _read_param("/sys/class/power_supply/battery/current_now", int)
 | |
| 
 | |
| 
 | |
| def get_battery_voltage():
 | |
|   return _read_param("/sys/class/power_supply/battery/voltage_now", int)
 | |
| 
 | |
| 
 | |
| def get_usb_present():
 | |
|   return _read_param("/sys/class/power_supply/usb/present", lambda x: bool(int(x)), False)
 | |
| 
 | |
| 
 | |
| def get_battery_charging():
 | |
|   # This does correspond with actually charging
 | |
|   return _read_param("/sys/class/power_supply/battery/charge_type", lambda x: x.strip() != "N/A", False)
 | |
| 
 | |
| 
 | |
| def set_battery_charging(on):
 | |
|   with open('/sys/class/power_supply/battery/charging_enabled', 'w') as f:
 | |
|     f.write(f"{1 if on else 0}\n")
 | |
| 
 | |
| 
 | |
| # Helpers
 | |
| def _read_param(path, parser, default=0):
 | |
|   try:
 | |
|     with open(path) as f:
 | |
|       return parser(f.read())
 | |
|   except Exception:
 | |
|     return default
 | |
| 
 | |
| 
 | |
| def panda_current_to_actual_current(panda_current):
 | |
|   # From white/grey panda schematic
 | |
|   return (3.3 - (panda_current * 3.3 / 4096)) / 8.25
 | |
| 
 | |
| 
 | |
| class PowerMonitoring:
 | |
|   def __init__(self):
 | |
|     self.last_measurement_time = None           # Used for integration delta
 | |
|     self.power_used_uWh = 0                     # Integrated power usage in uWh since going into offroad
 | |
|     self.next_pulsed_measurement_time = None
 | |
|     self.integration_lock = threading.Lock()
 | |
| 
 | |
|   # Calculation tick
 | |
|   def calculate(self, health):
 | |
|     try:
 | |
|       now = time.time()
 | |
| 
 | |
|       # Check that time is valid
 | |
|       if datetime.datetime.fromtimestamp(now).year < 2019:
 | |
|         return
 | |
| 
 | |
|       # Only integrate when there is no ignition
 | |
|       # If health is None, we're probably not in a car, so we don't care
 | |
|       if health is None or (health.health.ignitionLine or health.health.ignitionCan) or \
 | |
|          health.health.hwType == log.HealthData.HwType.unknown:
 | |
|         with self.integration_lock:
 | |
|           self.last_measurement_time = None
 | |
|           self.next_pulsed_measurement_time = None
 | |
|           self.power_used_uWh = 0
 | |
|         return
 | |
| 
 | |
|       # First measurement, set integration time
 | |
|       with self.integration_lock:
 | |
|         if self.last_measurement_time is None:
 | |
|           self.last_measurement_time = now
 | |
|           return
 | |
| 
 | |
|       is_uno = health.health.hwType == log.HealthData.HwType.uno
 | |
|       # Get current power draw somehow
 | |
|       current_power = 0
 | |
|       if get_battery_status() == 'Discharging':
 | |
|         # If the battery is discharging, we can use this measurement
 | |
|         # On C2: this is low by about 10-15%, probably mostly due to UNO draw not being factored in
 | |
|         current_power = ((get_battery_voltage() / 1000000) * (get_battery_current() / 1000000))
 | |
|       elif (health.health.hwType in [log.HealthData.HwType.whitePanda, log.HealthData.HwType.greyPanda]) and (health.health.current > 1):
 | |
|         # If white/grey panda, use the integrated current measurements if the measurement is not 0
 | |
|         # If the measurement is 0, the current is 400mA or greater, and out of the measurement range of the panda
 | |
|         # This seems to be accurate to about 5%
 | |
|         current_power = (PANDA_OUTPUT_VOLTAGE * panda_current_to_actual_current(health.health.current))
 | |
|       elif (self.next_pulsed_measurement_time is not None) and (self.next_pulsed_measurement_time <= now):
 | |
|         # TODO: Figure out why this is off by a factor of 3/4???
 | |
|         FUDGE_FACTOR = 1.33
 | |
| 
 | |
|         # Turn off charging for about 10 sec in a thread that does not get killed on SIGINT, and perform measurement here to avoid blocking thermal
 | |
|         def perform_pulse_measurement(now):
 | |
|           try:
 | |
|             set_battery_charging(False)
 | |
|             time.sleep(5)
 | |
| 
 | |
|             # Measure for a few sec to get a good average
 | |
|             voltages = []
 | |
|             currents = []
 | |
|             for i in range(6):
 | |
|               voltages.append(get_battery_voltage())
 | |
|               currents.append(get_battery_current())
 | |
|               time.sleep(1)
 | |
|             current_power = ((mean(voltages) / 1000000) * (mean(currents) / 1000000))
 | |
| 
 | |
|             self._perform_integration(now, current_power * FUDGE_FACTOR)
 | |
| 
 | |
|             # Enable charging again
 | |
|             set_battery_charging(True)
 | |
|           except Exception:
 | |
|             cloudlog.exception("Pulsed power measurement failed")
 | |
| 
 | |
|         # Start pulsed measurement and return
 | |
|         threading.Thread(target=perform_pulse_measurement, args=(now,)).start()
 | |
|         self.next_pulsed_measurement_time = None
 | |
|         return
 | |
| 
 | |
|       elif self.next_pulsed_measurement_time is None and not is_uno:
 | |
|         # On a charging EON with black panda, or drawing more than 400mA out of a white/grey one
 | |
|         # Only way to get the power draw is to turn off charging for a few sec and check what the discharging rate is
 | |
|         # We shouldn't do this very often, so make sure it has been some long-ish random time interval
 | |
|         self.next_pulsed_measurement_time = now + random.randint(120, 180)
 | |
|         return
 | |
|       else:
 | |
|         # Do nothing
 | |
|         return
 | |
| 
 | |
|       # Do the integration
 | |
|       self._perform_integration(now, current_power)
 | |
|     except Exception:
 | |
|       cloudlog.exception("Power monitoring calculation failed")
 | |
| 
 | |
|   def _perform_integration(self, t, current_power):
 | |
|     with self.integration_lock:
 | |
|       try:
 | |
|         if self.last_measurement_time:
 | |
|           integration_time_h = (t - self.last_measurement_time) / 3600
 | |
|           power_used = (current_power * 1000000) * integration_time_h
 | |
|           if power_used < 0:
 | |
|             raise ValueError(f"Negative power used! Integration time: {integration_time_h} h Current Power: {power_used} uWh")
 | |
|           self.power_used_uWh += power_used
 | |
|           self.last_measurement_time = t
 | |
|       except Exception:
 | |
|         cloudlog.exception("Integration failed")
 | |
| 
 | |
|   # Get the power usage
 | |
|   def get_power_used(self):
 | |
|     return int(self.power_used_uWh)
 | |
| 
 |