openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

238 lines
9.7 KiB

import random
import threading
import time
from statistics import mean
from cereal import log
from common.realtime import sec_since_boot
from common.params import Params, put_nonblocking
from common.hardware import TICI
from selfdrive.swaglog import cloudlog
PANDA_OUTPUT_VOLTAGE = 5.28
CAR_VOLTAGE_LOW_PASS_K = 0.091 # LPF gain for 5s tau (dt/tau / (dt/tau + 1))
# A C2 uses about 1W while idling, and 30h seens like a good shutoff for most cars
# While driving, a battery charges completely in about 30-60 minutes
CAR_BATTERY_CAPACITY_uWh = 30e6
CAR_CHARGING_RATE_W = 45
VBATT_PAUSE_CHARGING = 11.0
MAX_TIME_OFFROAD_S = 30*3600
# Parameters
def get_battery_capacity():
return _read_param("/sys/class/power_supply/battery/capacity", int)
def get_battery_status():
# This does not correspond with actual charging or not.
# If a USB cable is plugged in, it responds with 'Charging', even when charging is disabled
return _read_param("/sys/class/power_supply/battery/status", lambda x: x.strip(), '')
def get_battery_current():
return _read_param("/sys/class/power_supply/battery/current_now", int)
def get_battery_voltage():
return _read_param("/sys/class/power_supply/battery/voltage_now", int)
def get_usb_present():
return _read_param("/sys/class/power_supply/usb/present", lambda x: bool(int(x)), False)
def get_battery_charging():
# This does correspond with actually charging
return _read_param("/sys/class/power_supply/battery/charge_type", lambda x: x.strip() != "N/A", True)
def set_battery_charging(on):
with open('/sys/class/power_supply/battery/charging_enabled', 'w') as f:
f.write(f"{1 if on else 0}\n")
# Helpers
def _read_param(path, parser, default=0):
try:
with open(path) as f:
return parser(f.read())
except Exception:
return default
def panda_current_to_actual_current(panda_current):
# From white/grey panda schematic
return (3.3 - (panda_current * 3.3 / 4096)) / 8.25
class PowerMonitoring:
def __init__(self):
self.params = Params()
self.last_measurement_time = None # Used for integration delta
self.last_save_time = 0 # Used for saving current value in a param
self.power_used_uWh = 0 # Integrated power usage in uWh since going into offroad
self.next_pulsed_measurement_time = None
self.car_voltage_mV = 12e3 # Low-passed version of health voltage
self.integration_lock = threading.Lock()
car_battery_capacity_uWh = self.params.get("CarBatteryCapacity")
if car_battery_capacity_uWh is None:
car_battery_capacity_uWh = 0
# Reset capacity if it's low
self.car_battery_capacity_uWh = max((CAR_BATTERY_CAPACITY_uWh / 10), int(car_battery_capacity_uWh))
# Calculation tick
def calculate(self, health):
try:
now = sec_since_boot()
# If health is None, we're probably not in a car, so we don't care
if health is None or health.health.hwType == log.HealthData.HwType.unknown:
with self.integration_lock:
self.last_measurement_time = None
self.next_pulsed_measurement_time = None
self.power_used_uWh = 0
return
# Low-pass battery voltage
self.car_voltage_mV = ((health.health.voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 - CAR_VOLTAGE_LOW_PASS_K)))
# Cap the car battery power and save it in a param every 10-ish seconds
self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh)
if now - self.last_save_time >= 10:
put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
self.last_save_time = now
# First measurement, set integration time
with self.integration_lock:
if self.last_measurement_time is None:
self.last_measurement_time = now
return
if (health.health.ignitionLine or health.health.ignitionCan):
# If there is ignition, we integrate the charging rate of the car
with self.integration_lock:
self.power_used_uWh = 0
integration_time_h = (now - self.last_measurement_time) / 3600
if integration_time_h < 0:
raise ValueError(f"Negative integration time: {integration_time_h}h")
self.car_battery_capacity_uWh += (CAR_CHARGING_RATE_W * 1e6 * integration_time_h)
self.last_measurement_time = now
else:
# No ignition, we integrate the offroad power used by the device
is_uno = health.health.hwType == log.HealthData.HwType.uno
# Get current power draw somehow
current_power = 0
if TICI:
with open("/sys/class/hwmon/hwmon1/power1_input") as f:
current_power = int(f.read()) / 1e6
elif get_battery_status() == 'Discharging':
# If the battery is discharging, we can use this measurement
# On C2: this is low by about 10-15%, probably mostly due to UNO draw not being factored in
current_power = ((get_battery_voltage() / 1000000) * (get_battery_current() / 1000000))
elif (health.health.hwType in [log.HealthData.HwType.whitePanda, log.HealthData.HwType.greyPanda]) and (health.health.current > 1):
# If white/grey panda, use the integrated current measurements if the measurement is not 0
# If the measurement is 0, the current is 400mA or greater, and out of the measurement range of the panda
# This seems to be accurate to about 5%
current_power = (PANDA_OUTPUT_VOLTAGE * panda_current_to_actual_current(health.health.current))
elif (self.next_pulsed_measurement_time is not None) and (self.next_pulsed_measurement_time <= now):
# TODO: Figure out why this is off by a factor of 3/4???
FUDGE_FACTOR = 1.33
# Turn off charging for about 10 sec in a thread that does not get killed on SIGINT, and perform measurement here to avoid blocking thermal
def perform_pulse_measurement(now):
try:
set_battery_charging(False)
time.sleep(5)
# Measure for a few sec to get a good average
voltages = []
currents = []
for _ in range(6):
voltages.append(get_battery_voltage())
currents.append(get_battery_current())
time.sleep(1)
current_power = ((mean(voltages) / 1000000) * (mean(currents) / 1000000))
self._perform_integration(now, current_power * FUDGE_FACTOR)
# Enable charging again
set_battery_charging(True)
except Exception:
cloudlog.exception("Pulsed power measurement failed")
# Start pulsed measurement and return
threading.Thread(target=perform_pulse_measurement, args=(now,)).start()
self.next_pulsed_measurement_time = None
return
elif self.next_pulsed_measurement_time is None and not is_uno:
# On a charging EON with black panda, or drawing more than 400mA out of a white/grey one
# Only way to get the power draw is to turn off charging for a few sec and check what the discharging rate is
# We shouldn't do this very often, so make sure it has been some long-ish random time interval
self.next_pulsed_measurement_time = now + random.randint(120, 180)
return
else:
# Do nothing
return
# Do the integration
self._perform_integration(now, current_power)
except Exception:
cloudlog.exception("Power monitoring calculation failed")
def _perform_integration(self, t, current_power):
with self.integration_lock:
try:
if self.last_measurement_time:
integration_time_h = (t - self.last_measurement_time) / 3600
power_used = (current_power * 1000000) * integration_time_h
if power_used < 0:
raise ValueError(f"Negative power used! Integration time: {integration_time_h} h Current Power: {power_used} uWh")
self.power_used_uWh += power_used
self.car_battery_capacity_uWh -= power_used
self.last_measurement_time = t
except Exception:
cloudlog.exception("Integration failed")
# Get the power usage
def get_power_used(self):
return int(self.power_used_uWh)
def get_car_battery_capacity(self):
return int(self.car_battery_capacity_uWh)
# See if we need to disable charging
def should_disable_charging(self, health, offroad_timestamp):
if health is None or offroad_timestamp is None:
return False
now = sec_since_boot()
disable_charging = False
disable_charging |= (now - offroad_timestamp) > MAX_TIME_OFFROAD_S
disable_charging |= (self.car_voltage_mV < (VBATT_PAUSE_CHARGING * 1e3))
disable_charging |= (self.car_battery_capacity_uWh <= 0)
disable_charging &= (not health.health.ignitionLine and not health.health.ignitionCan)
disable_charging &= (self.params.get("DisablePowerDown") != b"1")
return disable_charging
# See if we need to shutdown
def should_shutdown(self, health, offroad_timestamp, started_seen, LEON):
if health is None or offroad_timestamp is None:
return False
now = sec_since_boot()
panda_charging = (health.health.usbPowerMode != log.HealthData.UsbPowerMode.client)
BATT_PERC_OFF = 10 if LEON else 3
should_shutdown = False
# Wait until we have shut down charging before powering down
should_shutdown |= (not panda_charging and self.should_disable_charging(health, offroad_timestamp))
should_shutdown |= ((get_battery_capacity() < BATT_PERC_OFF) and (not get_battery_charging()) and ((now - offroad_timestamp) > 60))
should_shutdown &= started_seen
return should_shutdown