Car power integrator + power management refactor (#1994)
* wip, ready to test
* tweaks
* fix
* fix
* fix power monitoring
* fix param writing
* no forced charging on high voltage
* reset capacity on reboot
* don't shutdown unless started seen
* fix unused var warning
* fix linting errors
* time is always valid
* QCOM gate
* Local params
* decimate saving
* fix linting
* rename param
* Log car battery capacity
* fix put_nonblocking
* Added some unit tests
* Add test to docker test list
* fix precommit
* cleanup
* run tests in CI
* bump cereal
Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: 7555379b2b
commatwo_master
parent
bf750511a1
commit
bcaf2a36af
9 changed files with 382 additions and 100 deletions
@ -1 +1 @@ |
||||
Subproject commit d66afca4ac316456711cb80c8e8e2fe91431e1e2 |
||||
Subproject commit 0d2ce45fc681f90b33fbcd11e5d80dd294ef751b |
@ -0,0 +1,221 @@ |
||||
#!/usr/bin/env python3 |
||||
import unittest |
||||
from unittest.mock import patch |
||||
from parameterized import parameterized |
||||
|
||||
from cereal import log |
||||
import cereal.messaging as messaging |
||||
from common.params import Params |
||||
params = Params() |
||||
|
||||
# Create fake time |
||||
ssb = 0 |
||||
def mock_sec_since_boot(): |
||||
global ssb |
||||
ssb += 1 |
||||
return ssb |
||||
|
||||
with patch("common.realtime.sec_since_boot", new=mock_sec_since_boot): |
||||
with patch("common.params.put_nonblocking", new=params.put): |
||||
from selfdrive.thermald.power_monitoring import PowerMonitoring, CAR_BATTERY_CAPACITY_uWh, \ |
||||
PANDA_OUTPUT_VOLTAGE, CAR_CHARGING_RATE_W, \ |
||||
VBATT_PAUSE_CHARGING |
||||
|
||||
def actual_current_to_panda_current(actual_current): |
||||
return max(int(((3.3 - (actual_current * 8.25)) * 4096) / 3.3), 0) |
||||
|
||||
TEST_DURATION_S = 50 |
||||
ALL_PANDA_TYPES = [(hw_type,) for hw_type in [log.HealthData.HwType.whitePanda, |
||||
log.HealthData.HwType.greyPanda, |
||||
log.HealthData.HwType.blackPanda, |
||||
log.HealthData.HwType.uno]] |
||||
|
||||
def pm_patch(name, value, constant=False): |
||||
if constant: |
||||
return patch(f"selfdrive.thermald.power_monitoring.{name}", value) |
||||
return patch(f"selfdrive.thermald.power_monitoring.{name}", return_value=value) |
||||
|
||||
class TestPowerMonitoring(unittest.TestCase): |
||||
def setUp(self): |
||||
# Clear stored capacity before each test |
||||
params.delete("CarBatteryCapacity") |
||||
params.delete("DisablePowerDown") |
||||
|
||||
def mock_health(self, ignition, hw_type, car_voltage=12, current=0): |
||||
health = messaging.new_message('health') |
||||
health.health.hwType = hw_type |
||||
health.health.voltage = car_voltage * 1e3 |
||||
health.health.current = actual_current_to_panda_current(current) |
||||
health.health.ignitionLine = ignition |
||||
health.health.ignitionCan = False |
||||
return health |
||||
|
||||
# Test to see that it doesn't do anything when health is None |
||||
def test_health_present(self): |
||||
pm = PowerMonitoring() |
||||
for _ in range(10): |
||||
pm.calculate(None) |
||||
self.assertEqual(pm.get_power_used(), 0) |
||||
self.assertEqual(pm.get_car_battery_capacity(), (CAR_BATTERY_CAPACITY_uWh / 10)) |
||||
|
||||
# Test to see that it doesn't integrate offroad when ignition is True |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_offroad_ignition(self, hw_type): |
||||
pm = PowerMonitoring() |
||||
for _ in range(10): |
||||
pm.calculate(self.mock_health(True, hw_type)) |
||||
self.assertEqual(pm.get_power_used(), 0) |
||||
|
||||
# Test to see that it integrates with white/grey panda while charging |
||||
@parameterized.expand([(log.HealthData.HwType.whitePanda,), (log.HealthData.HwType.greyPanda,)]) |
||||
def test_offroad_integration_white(self, hw_type): |
||||
with pm_patch("get_battery_voltage", 4e6), pm_patch("get_battery_current", 1e5), pm_patch("get_battery_status", "Charging"): |
||||
pm = PowerMonitoring() |
||||
for _ in range(TEST_DURATION_S + 1): |
||||
pm.calculate(self.mock_health(False, hw_type, current=0.1)) |
||||
expected_power_usage = ((TEST_DURATION_S/3600) * (0.1 * PANDA_OUTPUT_VOLTAGE) * 1e6) |
||||
self.assertLess(abs(pm.get_power_used() - expected_power_usage), 10) |
||||
|
||||
# Test to see that it integrates with discharging battery |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_offroad_integration_discharging(self, hw_type): |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 1 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
for _ in range(TEST_DURATION_S + 1): |
||||
pm.calculate(self.mock_health(False, hw_type)) |
||||
expected_power_usage = ((TEST_DURATION_S/3600) * (BATT_VOLTAGE * BATT_CURRENT) * 1e6) |
||||
self.assertLess(abs(pm.get_power_used() - expected_power_usage), 10) |
||||
|
||||
# Test to check positive integration of car_battery_capacity |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_car_battery_integration_onroad(self, hw_type): |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 1 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = 0 |
||||
for _ in range(TEST_DURATION_S + 1): |
||||
pm.calculate(self.mock_health(True, hw_type)) |
||||
expected_capacity = ((TEST_DURATION_S/3600) * CAR_CHARGING_RATE_W * 1e6) |
||||
self.assertLess(abs(pm.get_car_battery_capacity() - expected_capacity), 10) |
||||
|
||||
# Test to check positive integration upper limit |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_car_battery_integration_upper_limit(self, hw_type): |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 1 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = CAR_BATTERY_CAPACITY_uWh - 1000 |
||||
for _ in range(TEST_DURATION_S + 1): |
||||
pm.calculate(self.mock_health(True, hw_type)) |
||||
estimated_capacity = CAR_BATTERY_CAPACITY_uWh + (CAR_CHARGING_RATE_W / 3600 * 1e6) |
||||
self.assertLess(abs(pm.get_car_battery_capacity() - estimated_capacity), 10) |
||||
|
||||
# Test to check negative integration of car_battery_capacity |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_car_battery_integration_offroad(self, hw_type): |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 1 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = CAR_BATTERY_CAPACITY_uWh |
||||
for _ in range(TEST_DURATION_S + 1): |
||||
pm.calculate(self.mock_health(False, hw_type)) |
||||
expected_capacity = CAR_BATTERY_CAPACITY_uWh - ((TEST_DURATION_S/3600) * (BATT_VOLTAGE * BATT_CURRENT) * 1e6) |
||||
self.assertLess(abs(pm.get_car_battery_capacity() - expected_capacity), 10) |
||||
|
||||
# Test to check negative integration lower limit |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_car_battery_integration_lower_limit(self, hw_type): |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 1 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = 1000 |
||||
for _ in range(TEST_DURATION_S + 1): |
||||
pm.calculate(self.mock_health(False, hw_type)) |
||||
estimated_capacity = 0 - ((1/3600) * (BATT_VOLTAGE * BATT_CURRENT) * 1e6) |
||||
self.assertLess(abs(pm.get_car_battery_capacity() - estimated_capacity), 10) |
||||
|
||||
# Test to check policy of stopping charging after MAX_TIME_OFFROAD_S |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_max_time_offroad(self, hw_type): |
||||
global ssb |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 0 # To stop shutting down for other reasons |
||||
MOCKED_MAX_OFFROAD_TIME = 3600 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"), pm_patch("MAX_TIME_OFFROAD_S", MOCKED_MAX_OFFROAD_TIME, constant=True): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = CAR_BATTERY_CAPACITY_uWh |
||||
start_time = ssb |
||||
health = self.mock_health(False, hw_type) |
||||
while ssb <= start_time + MOCKED_MAX_OFFROAD_TIME: |
||||
pm.calculate(health) |
||||
if (ssb - start_time) % 1000 == 0 and ssb < start_time + MOCKED_MAX_OFFROAD_TIME: |
||||
self.assertFalse(pm.should_disable_charging(health, start_time)) |
||||
self.assertTrue(pm.should_disable_charging(health, start_time)) |
||||
|
||||
# Test to check policy of stopping charging when the car voltage is too low |
||||
@parameterized.expand(ALL_PANDA_TYPES) |
||||
def test_car_voltage(self, hw_type): |
||||
global ssb |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 0 # To stop shutting down for other reasons |
||||
TEST_TIME = 100 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = CAR_BATTERY_CAPACITY_uWh |
||||
health = self.mock_health(False, hw_type, car_voltage=(VBATT_PAUSE_CHARGING - 1)) |
||||
for i in range(TEST_TIME): |
||||
pm.calculate(health) |
||||
if i % 10 == 0: |
||||
self.assertEqual(pm.should_disable_charging(health, ssb), (pm.car_voltage_mV < VBATT_PAUSE_CHARGING*1e3)) |
||||
self.assertTrue(pm.should_disable_charging(health, ssb)) |
||||
|
||||
# Test to check policy of not stopping charging when DisablePowerDown is set |
||||
def test_disable_power_down(self): |
||||
global ssb |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 0 # To stop shutting down for other reasons |
||||
TEST_TIME = 100 |
||||
params.put("DisablePowerDown", b"1") |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = CAR_BATTERY_CAPACITY_uWh |
||||
health = self.mock_health(False, log.HealthData.HwType.uno, car_voltage=(VBATT_PAUSE_CHARGING - 1)) |
||||
for i in range(TEST_TIME): |
||||
pm.calculate(health) |
||||
if i % 10 == 0: |
||||
self.assertFalse(pm.should_disable_charging(health, ssb)) |
||||
self.assertFalse(pm.should_disable_charging(health, ssb)) |
||||
|
||||
# Test to check policy of not stopping charging when ignition |
||||
def test_ignition(self): |
||||
global ssb |
||||
BATT_VOLTAGE = 4 |
||||
BATT_CURRENT = 0 # To stop shutting down for other reasons |
||||
TEST_TIME = 100 |
||||
with pm_patch("get_battery_voltage", BATT_VOLTAGE * 1e6), pm_patch("get_battery_current", BATT_CURRENT * 1e6), \ |
||||
pm_patch("get_battery_status", "Discharging"): |
||||
pm = PowerMonitoring() |
||||
pm.car_battery_capacity_uWh = CAR_BATTERY_CAPACITY_uWh |
||||
health = self.mock_health(True, log.HealthData.HwType.uno, car_voltage=(VBATT_PAUSE_CHARGING - 1)) |
||||
for i in range(TEST_TIME): |
||||
pm.calculate(health) |
||||
if i % 10 == 0: |
||||
self.assertFalse(pm.should_disable_charging(health, ssb)) |
||||
self.assertFalse(pm.should_disable_charging(health, ssb)) |
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
Loading…
Reference in new issue