Offroad power monitoring (#1067)
* Untested implementation of offroad power monitoring
* Fixed some syntax errors
* Cast to int
* Fixed pylint
* Wrapped in class
* Put pulsed calc in own thread
* Longer timeout before starting pulse measurement
* Fudge factor + flake8
* Made integration thread-safe and catch charge disable exceptions
* Catch all calculation errors
* Fixed networkstrength removal
old-commit-hash: 992be20d63
commatwo_master
parent
0bb66220cd
commit
b0855ab17c
4 changed files with 160 additions and 16 deletions
@ -0,0 +1,145 @@ |
|||||||
|
import time |
||||||
|
import datetime |
||||||
|
import threading |
||||||
|
import random |
||||||
|
from statistics import mean |
||||||
|
from cereal import log |
||||||
|
|
||||||
|
PANDA_OUTPUT_VOLTAGE = 5.28 |
||||||
|
|
||||||
|
# Parameters |
||||||
|
def get_battery_capacity(): |
||||||
|
return _read_param("/sys/class/power_supply/battery/capacity", int) |
||||||
|
|
||||||
|
def get_battery_status(): |
||||||
|
# This does not correspond with actual charging or not. |
||||||
|
# If a USB cable is plugged in, it responds with 'Charging', even when charging is disabled |
||||||
|
return _read_param("/sys/class/power_supply/battery/status", lambda x: x.strip(), '') |
||||||
|
|
||||||
|
def get_battery_current(): |
||||||
|
return _read_param("/sys/class/power_supply/battery/current_now", int) |
||||||
|
|
||||||
|
def get_battery_voltage(): |
||||||
|
return _read_param("/sys/class/power_supply/battery/voltage_now", int) |
||||||
|
|
||||||
|
def get_usb_present(): |
||||||
|
return _read_param("/sys/class/power_supply/usb/present", lambda x: bool(int(x)), False) |
||||||
|
|
||||||
|
def get_battery_charging(): |
||||||
|
# This does correspond with actually charging |
||||||
|
return _read_param("/sys/class/power_supply/battery/charge_type", lambda x: x.strip() != "N/A", False) |
||||||
|
|
||||||
|
def set_battery_charging(on): |
||||||
|
with open('/sys/class/power_supply/battery/charging_enabled', 'w') as f: |
||||||
|
f.write(f"{1 if on else 0}\n") |
||||||
|
|
||||||
|
# Helpers |
||||||
|
def _read_param(path, parser, default=0): |
||||||
|
try: |
||||||
|
with open(path) as f: |
||||||
|
return parser(f.read()) |
||||||
|
except Exception: |
||||||
|
return default |
||||||
|
|
||||||
|
def panda_current_to_actual_current(panda_current): |
||||||
|
# From white/grey panda schematic |
||||||
|
return (3.3 - (panda_current * 3.3 / 4096)) / 8.25 |
||||||
|
|
||||||
|
class PowerMonitoring: |
||||||
|
def __init__(self): |
||||||
|
self.last_measurement_time = None # Used for integration delta |
||||||
|
self.power_used_uWh = 0 # Integrated power usage in uWh since going into offroad |
||||||
|
self.next_pulsed_measurement_time = None |
||||||
|
self.integration_lock = threading.Lock() |
||||||
|
|
||||||
|
# Calculation tick |
||||||
|
def calculate(self, health): |
||||||
|
try: |
||||||
|
now = time.time() |
||||||
|
|
||||||
|
# Check that time is valid |
||||||
|
if datetime.datetime.fromtimestamp(now).year < 2019: |
||||||
|
return |
||||||
|
|
||||||
|
# Only integrate when there is no ignition |
||||||
|
# If health is None, we're probably not in a car, so we don't care |
||||||
|
if health == None or (health.health.ignitionLine or health.health.ignitionCan): |
||||||
|
self.last_measurement_time = None |
||||||
|
self.power_used_uWh = 0 |
||||||
|
return |
||||||
|
|
||||||
|
# First measurement, set integration time |
||||||
|
if self.last_measurement_time == None: |
||||||
|
self.last_measurement_time = now |
||||||
|
return |
||||||
|
|
||||||
|
# Get current power draw somehow |
||||||
|
current_power = 0 |
||||||
|
if get_battery_status() == 'Discharging': |
||||||
|
# If the battery is discharging, we can use this measurement |
||||||
|
# On C2: this is low by about 10-15%, probably mostly due to UNO draw not being factored in |
||||||
|
current_power = ((get_battery_voltage() / 1000000) * (get_battery_current() / 1000000)) |
||||||
|
elif (health.health.hwType in [log.HealthData.HwType.whitePanda, log.HealthData.HwType.greyPanda]) and (health.health.current > 1): |
||||||
|
# If white/grey panda, use the integrated current measurements if the measurement is not 0 |
||||||
|
# If the measurement is 0, the current is 400mA or greater, and out of the measurement range of the panda |
||||||
|
# This seems to be accurate to about 5% |
||||||
|
current_power = (PANDA_OUTPUT_VOLTAGE * panda_current_to_actual_current(health.health.current)) |
||||||
|
elif (self.next_pulsed_measurement_time != None) and (self.next_pulsed_measurement_time <= now): |
||||||
|
# TODO: Figure out why this is off by a factor of 3/4??? |
||||||
|
FUDGE_FACTOR = 1.33 |
||||||
|
|
||||||
|
# Turn off charging for about 10 sec in a thread that does not get killed on SIGINT, and perform measurement here to avoid blocking thermal |
||||||
|
def perform_pulse_measurement(now): |
||||||
|
try: |
||||||
|
set_battery_charging(False) |
||||||
|
time.sleep(5) |
||||||
|
|
||||||
|
# Measure for a few sec to get a good average |
||||||
|
voltages = [] |
||||||
|
currents = [] |
||||||
|
for i in range(6): |
||||||
|
voltages.append(get_battery_voltage()) |
||||||
|
currents.append(get_battery_current()) |
||||||
|
time.sleep(1) |
||||||
|
current_power = ((mean(voltages) / 1000000) * (mean(currents) / 1000000)) |
||||||
|
self._perform_integration(now, current_power * FUDGE_FACTOR) |
||||||
|
|
||||||
|
# Enable charging again |
||||||
|
set_battery_charging(True) |
||||||
|
except Exception as e: |
||||||
|
print("Pulsed power measurement failed:", str(e)) |
||||||
|
|
||||||
|
# Start pulsed measurement and return |
||||||
|
threading.Thread(target=perform_pulse_measurement, args=(now,)).start() |
||||||
|
self.next_pulsed_measurement_time = None |
||||||
|
return |
||||||
|
|
||||||
|
elif self.next_pulsed_measurement_time == None: |
||||||
|
# On a charging EON with black panda, or drawing more than 400mA out of a white/grey one |
||||||
|
# Only way to get the power draw is to turn off charging for a few sec and check what the discharging rate is |
||||||
|
# We shouldn't do this very often, so make sure it has been some long-ish random time interval |
||||||
|
self.next_pulsed_measurement_time = now + random.randint(120, 180) |
||||||
|
return |
||||||
|
else: |
||||||
|
# Do nothing |
||||||
|
return |
||||||
|
|
||||||
|
# Do the integration |
||||||
|
self._perform_integration(now, current_power) |
||||||
|
except Exception as e: |
||||||
|
print("Power monitoring calculation failed:", str(e)) |
||||||
|
|
||||||
|
def _perform_integration(self, t, current_power): |
||||||
|
self.integration_lock.acquire() |
||||||
|
integration_time_h = (t - self.last_measurement_time) / 3600 |
||||||
|
self.power_used_uWh += (current_power * 1000000) * integration_time_h |
||||||
|
self.last_measurement_time = t |
||||||
|
self.integration_lock.release() |
||||||
|
|
||||||
|
# Get the power usage |
||||||
|
def get_power_used(self): |
||||||
|
return int(self.power_used_uWh) |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in new issue