|  |  |  | import random
 | 
					
						
							|  |  |  | import threading
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | from statistics import mean
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from cereal import log
 | 
					
						
							|  |  |  | from common.params import Params, put_nonblocking
 | 
					
						
							|  |  |  | from common.realtime import sec_since_boot
 | 
					
						
							|  |  |  | from selfdrive.hardware import HARDWARE
 | 
					
						
							|  |  |  | from selfdrive.swaglog import cloudlog
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | CAR_VOLTAGE_LOW_PASS_K = 0.091 # LPF gain for 5s tau (dt/tau / (dt/tau + 1))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # A C2 uses about 1W while idling, and 30h seens like a good shutoff for most cars
 | 
					
						
							|  |  |  | # While driving, a battery charges completely in about 30-60 minutes
 | 
					
						
							|  |  |  | CAR_BATTERY_CAPACITY_uWh = 30e6
 | 
					
						
							|  |  |  | CAR_CHARGING_RATE_W = 45
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | VBATT_PAUSE_CHARGING = 11.0
 | 
					
						
							|  |  |  | MAX_TIME_OFFROAD_S = 30*3600
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class PowerMonitoring:
 | 
					
						
							|  |  |  |   def __init__(self):
 | 
					
						
							|  |  |  |     self.params = Params()
 | 
					
						
							|  |  |  |     self.last_measurement_time = None           # Used for integration delta
 | 
					
						
							|  |  |  |     self.last_save_time = 0                     # Used for saving current value in a param
 | 
					
						
							|  |  |  |     self.power_used_uWh = 0                     # Integrated power usage in uWh since going into offroad
 | 
					
						
							|  |  |  |     self.next_pulsed_measurement_time = None
 | 
					
						
							|  |  |  |     self.car_voltage_mV = 12e3                  # Low-passed version of health voltage
 | 
					
						
							|  |  |  |     self.integration_lock = threading.Lock()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     car_battery_capacity_uWh = self.params.get("CarBatteryCapacity")
 | 
					
						
							|  |  |  |     if car_battery_capacity_uWh is None:
 | 
					
						
							|  |  |  |       car_battery_capacity_uWh = 0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Reset capacity if it's low
 | 
					
						
							|  |  |  |     self.car_battery_capacity_uWh = max((CAR_BATTERY_CAPACITY_uWh / 10), int(car_battery_capacity_uWh))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # Calculation tick
 | 
					
						
							|  |  |  |   def calculate(self, health):
 | 
					
						
							|  |  |  |     try:
 | 
					
						
							|  |  |  |       now = sec_since_boot()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # If health is None, we're probably not in a car, so we don't care
 | 
					
						
							|  |  |  |       if health is None or health.health.hwType == log.HealthData.HwType.unknown:
 | 
					
						
							|  |  |  |         with self.integration_lock:
 | 
					
						
							|  |  |  |           self.last_measurement_time = None
 | 
					
						
							|  |  |  |           self.next_pulsed_measurement_time = None
 | 
					
						
							|  |  |  |           self.power_used_uWh = 0
 | 
					
						
							|  |  |  |         return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # Low-pass battery voltage
 | 
					
						
							|  |  |  |       self.car_voltage_mV = ((health.health.voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 -  CAR_VOLTAGE_LOW_PASS_K)))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # Cap the car battery power and save it in a param every 10-ish seconds
 | 
					
						
							|  |  |  |       self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
 | 
					
						
							|  |  |  |       self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh)
 | 
					
						
							|  |  |  |       if now - self.last_save_time >= 10:
 | 
					
						
							|  |  |  |         put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
 | 
					
						
							|  |  |  |         self.last_save_time = now
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # First measurement, set integration time
 | 
					
						
							|  |  |  |       with self.integration_lock:
 | 
					
						
							|  |  |  |         if self.last_measurement_time is None:
 | 
					
						
							|  |  |  |           self.last_measurement_time = now
 | 
					
						
							|  |  |  |           return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if (health.health.ignitionLine or health.health.ignitionCan):
 | 
					
						
							|  |  |  |         # If there is ignition, we integrate the charging rate of the car
 | 
					
						
							|  |  |  |         with self.integration_lock:
 | 
					
						
							|  |  |  |           self.power_used_uWh = 0
 | 
					
						
							|  |  |  |           integration_time_h = (now - self.last_measurement_time) / 3600
 | 
					
						
							|  |  |  |           if integration_time_h < 0:
 | 
					
						
							|  |  |  |             raise ValueError(f"Negative integration time: {integration_time_h}h")
 | 
					
						
							|  |  |  |           self.car_battery_capacity_uWh += (CAR_CHARGING_RATE_W * 1e6 * integration_time_h)
 | 
					
						
							|  |  |  |           self.last_measurement_time = now
 | 
					
						
							|  |  |  |       else:
 | 
					
						
							|  |  |  |         # No ignition, we integrate the offroad power used by the device
 | 
					
						
							|  |  |  |         is_uno = health.health.hwType == log.HealthData.HwType.uno
 | 
					
						
							|  |  |  |         # Get current power draw somehow
 | 
					
						
							|  |  |  |         current_power = HARDWARE.get_current_power_draw()
 | 
					
						
							|  |  |  |         if current_power is not None:
 | 
					
						
							|  |  |  |           pass
 | 
					
						
							|  |  |  |         elif HARDWARE.get_battery_status() == 'Discharging':
 | 
					
						
							|  |  |  |           # If the battery is discharging, we can use this measurement
 | 
					
						
							|  |  |  |           # On C2: this is low by about 10-15%, probably mostly due to UNO draw not being factored in
 | 
					
						
							|  |  |  |           current_power = ((HARDWARE.get_battery_voltage() / 1000000) * (HARDWARE.get_battery_current() / 1000000))
 | 
					
						
							|  |  |  |         elif (self.next_pulsed_measurement_time is not None) and (self.next_pulsed_measurement_time <= now):
 | 
					
						
							|  |  |  |           # TODO: Figure out why this is off by a factor of 3/4???
 | 
					
						
							|  |  |  |           FUDGE_FACTOR = 1.33
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |           # Turn off charging for about 10 sec in a thread that does not get killed on SIGINT, and perform measurement here to avoid blocking thermal
 | 
					
						
							|  |  |  |           def perform_pulse_measurement(now):
 | 
					
						
							|  |  |  |             try:
 | 
					
						
							|  |  |  |               HARDWARE.set_battery_charging(False)
 | 
					
						
							|  |  |  |               time.sleep(5)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |               # Measure for a few sec to get a good average
 | 
					
						
							|  |  |  |               voltages = []
 | 
					
						
							|  |  |  |               currents = []
 | 
					
						
							|  |  |  |               for _ in range(6):
 | 
					
						
							|  |  |  |                 voltages.append(HARDWARE.get_battery_voltage())
 | 
					
						
							|  |  |  |                 currents.append(HARDWARE.get_battery_current())
 | 
					
						
							|  |  |  |                 time.sleep(1)
 | 
					
						
							|  |  |  |               current_power = ((mean(voltages) / 1000000) * (mean(currents) / 1000000))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |               self._perform_integration(now, current_power * FUDGE_FACTOR)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |               # Enable charging again
 | 
					
						
							|  |  |  |               HARDWARE.set_battery_charging(True)
 | 
					
						
							|  |  |  |             except Exception:
 | 
					
						
							|  |  |  |               cloudlog.exception("Pulsed power measurement failed")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |           # Start pulsed measurement and return
 | 
					
						
							|  |  |  |           threading.Thread(target=perform_pulse_measurement, args=(now,)).start()
 | 
					
						
							|  |  |  |           self.next_pulsed_measurement_time = None
 | 
					
						
							|  |  |  |           return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         elif self.next_pulsed_measurement_time is None and not is_uno:
 | 
					
						
							|  |  |  |           # On a charging EON with black panda, or drawing more than 400mA out of a white/grey one
 | 
					
						
							|  |  |  |           # Only way to get the power draw is to turn off charging for a few sec and check what the discharging rate is
 | 
					
						
							|  |  |  |           # We shouldn't do this very often, so make sure it has been some long-ish random time interval
 | 
					
						
							|  |  |  |           self.next_pulsed_measurement_time = now + random.randint(120, 180)
 | 
					
						
							|  |  |  |           return
 | 
					
						
							|  |  |  |         else:
 | 
					
						
							|  |  |  |           # Do nothing
 | 
					
						
							|  |  |  |           return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Do the integration
 | 
					
						
							|  |  |  |         self._perform_integration(now, current_power)
 | 
					
						
							|  |  |  |     except Exception:
 | 
					
						
							|  |  |  |       cloudlog.exception("Power monitoring calculation failed")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def _perform_integration(self, t, current_power):
 | 
					
						
							|  |  |  |     with self.integration_lock:
 | 
					
						
							|  |  |  |       try:
 | 
					
						
							|  |  |  |         if self.last_measurement_time:
 | 
					
						
							|  |  |  |           integration_time_h = (t - self.last_measurement_time) / 3600
 | 
					
						
							|  |  |  |           power_used = (current_power * 1000000) * integration_time_h
 | 
					
						
							|  |  |  |           if power_used < 0:
 | 
					
						
							|  |  |  |             raise ValueError(f"Negative power used! Integration time: {integration_time_h} h Current Power: {power_used} uWh")
 | 
					
						
							|  |  |  |           self.power_used_uWh += power_used
 | 
					
						
							|  |  |  |           self.car_battery_capacity_uWh -= power_used
 | 
					
						
							|  |  |  |           self.last_measurement_time = t
 | 
					
						
							|  |  |  |       except Exception:
 | 
					
						
							|  |  |  |         cloudlog.exception("Integration failed")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # Get the power usage
 | 
					
						
							|  |  |  |   def get_power_used(self):
 | 
					
						
							|  |  |  |     return int(self.power_used_uWh)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_car_battery_capacity(self):
 | 
					
						
							|  |  |  |     return int(self.car_battery_capacity_uWh)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # See if we need to disable charging
 | 
					
						
							|  |  |  |   def should_disable_charging(self, health, offroad_timestamp):
 | 
					
						
							|  |  |  |     if health is None or offroad_timestamp is None:
 | 
					
						
							|  |  |  |       return False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     now = sec_since_boot()
 | 
					
						
							|  |  |  |     disable_charging = False
 | 
					
						
							|  |  |  |     disable_charging |= (now - offroad_timestamp) > MAX_TIME_OFFROAD_S
 | 
					
						
							|  |  |  |     disable_charging |= (self.car_voltage_mV < (VBATT_PAUSE_CHARGING * 1e3))
 | 
					
						
							|  |  |  |     disable_charging |= (self.car_battery_capacity_uWh <= 0)
 | 
					
						
							|  |  |  |     disable_charging &= (not health.health.ignitionLine and not health.health.ignitionCan)
 | 
					
						
							|  |  |  |     disable_charging &= (self.params.get("DisablePowerDown") != b"1")
 | 
					
						
							|  |  |  |     return disable_charging
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # See if we need to shutdown
 | 
					
						
							|  |  |  |   def should_shutdown(self, health, offroad_timestamp, started_seen, LEON):
 | 
					
						
							|  |  |  |     if health is None or offroad_timestamp is None:
 | 
					
						
							|  |  |  |       return False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     now = sec_since_boot()
 | 
					
						
							|  |  |  |     panda_charging = (health.health.usbPowerMode != log.HealthData.UsbPowerMode.client)
 | 
					
						
							|  |  |  |     BATT_PERC_OFF = 10 if LEON else 3
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     should_shutdown = False
 | 
					
						
							|  |  |  |     # Wait until we have shut down charging before powering down
 | 
					
						
							|  |  |  |     should_shutdown |= (not panda_charging and self.should_disable_charging(health, offroad_timestamp))
 | 
					
						
							|  |  |  |     should_shutdown |= ((HARDWARE.get_battery_capacity() < BATT_PERC_OFF) and (not HARDWARE.get_battery_charging()) and ((now - offroad_timestamp) > 60))
 | 
					
						
							|  |  |  |     should_shutdown &= started_seen
 | 
					
						
							|  |  |  |     return should_shutdown
 |