common/params: support nonblocking write (#29808)

* Safe and efficient asynchronous writing parameters

* call putNonBlocking in locationd

* remove space

* ->AsyncWriter

* remove semicolon

* use member function

* asyc write multiple times

* add test case for AsyncWriter

* merge master

* add missing include

* public

* cleanup

* create once

* cleanup

* update that

* explicit waiting

* improve test case

* pass prefix to asywriter

* move to params

* assert(queue.empty())

* add comment

* add todo

* test_power_monitoring: remove patch

* rm laikad.py

* fix import

---------

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
pull/30721/head
Dean Lee 1 year ago committed by GitHub
parent fcc671297e
commit 3c4c4d1f7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      common/SConscript
  2. 30
      common/params.cc
  3. 20
      common/params.h
  4. 5
      common/params.py
  5. 22
      common/params_pyx.pyx
  6. 27
      common/tests/test_params.cc
  7. 6
      common/tests/test_params.py
  8. 8
      selfdrive/controls/controlsd.py
  9. 8
      selfdrive/locationd/calibrationd.py
  10. 8
      selfdrive/locationd/locationd.cc
  11. 4
      selfdrive/locationd/paramsd.py
  12. 7
      selfdrive/monitoring/dmonitoringd.py
  13. 4
      selfdrive/thermald/power_monitoring.py
  14. 2
      selfdrive/thermald/tests/test_power_monitoring.py

@ -24,7 +24,7 @@ Export('_common', '_gpucommon')
if GetOption('extras'): if GetOption('extras'):
env.Program('tests/test_common', env.Program('tests/test_common',
['tests/test_runner.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], ['tests/test_runner.cc', 'tests/test_params.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'],
LIBS=[_common, 'json11', 'zmq', 'pthread']) LIBS=[_common, 'json11', 'zmq', 'pthread'])
# Cython bindings # Cython bindings

@ -4,9 +4,11 @@
#include <sys/file.h> #include <sys/file.h>
#include <algorithm> #include <algorithm>
#include <cassert>
#include <csignal> #include <csignal>
#include <unordered_map> #include <unordered_map>
#include "common/queue.h"
#include "common/swaglog.h" #include "common/swaglog.h"
#include "common/util.h" #include "common/util.h"
#include "system/hardware/hw.h" #include "system/hardware/hw.h"
@ -214,8 +216,15 @@ std::unordered_map<std::string, uint32_t> keys = {
Params::Params(const std::string &path) { Params::Params(const std::string &path) {
prefix = "/" + util::getenv("OPENPILOT_PREFIX", "d"); params_prefix = "/" + util::getenv("OPENPILOT_PREFIX", "d");
params_path = ensure_params_path(prefix, path); params_path = ensure_params_path(params_prefix, path);
}
Params::~Params() {
if (future.valid()) {
future.wait();
}
assert(queue.empty());
} }
std::vector<std::string> Params::allKeys() const { std::vector<std::string> Params::allKeys() const {
@ -328,3 +337,20 @@ void Params::clearAll(ParamKeyType key_type) {
fsync_dir(getParamPath()); fsync_dir(getParamPath());
} }
void Params::putNonBlocking(const std::string &key, const std::string &val) {
queue.push(std::make_pair(key, val));
// start thread on demand
if (!future.valid() || future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) {
future = std::async(std::launch::async, &Params::asyncWriteThread, this);
}
}
void Params::asyncWriteThread() {
// TODO: write the latest one if a key has multiple values in the queue.
std::pair<std::string, std::string> p;
while (queue.try_pop(p, 0)) {
// Params::put is Thread-Safe
put(p.first, p.second);
}
}

@ -1,9 +1,14 @@
#pragma once #pragma once
#include <future>
#include <map> #include <map>
#include <string> #include <string>
#include <tuple>
#include <utility>
#include <vector> #include <vector>
#include "common/queue.h"
enum ParamKeyType { enum ParamKeyType {
PERSISTENT = 0x02, PERSISTENT = 0x02,
CLEAR_ON_MANAGER_START = 0x04, CLEAR_ON_MANAGER_START = 0x04,
@ -17,6 +22,7 @@ enum ParamKeyType {
class Params { class Params {
public: public:
explicit Params(const std::string &path = {}); explicit Params(const std::string &path = {});
~Params();
// Not copyable. // Not copyable.
Params(const Params&) = delete; Params(const Params&) = delete;
Params& operator=(const Params&) = delete; Params& operator=(const Params&) = delete;
@ -25,7 +31,7 @@ public:
bool checkKey(const std::string &key); bool checkKey(const std::string &key);
ParamKeyType getKeyType(const std::string &key); ParamKeyType getKeyType(const std::string &key);
inline std::string getParamPath(const std::string &key = {}) { inline std::string getParamPath(const std::string &key = {}) {
return params_path + prefix + (key.empty() ? "" : "/" + key); return params_path + params_prefix + (key.empty() ? "" : "/" + key);
} }
// Delete a value // Delete a value
@ -47,8 +53,18 @@ public:
inline int putBool(const std::string &key, bool val) { inline int putBool(const std::string &key, bool val) {
return put(key.c_str(), val ? "1" : "0", 1); return put(key.c_str(), val ? "1" : "0", 1);
} }
void putNonBlocking(const std::string &key, const std::string &val);
inline void putBoolNonBlocking(const std::string &key, bool val) {
putNonBlocking(key, val ? "1" : "0");
}
private: private:
void asyncWriteThread();
std::string params_path; std::string params_path;
std::string prefix; std::string params_prefix;
// for nonblocking write
std::future<void> future;
SafeQueue<std::pair<std::string, std::string>> queue;
}; };

@ -1,10 +1,7 @@
from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName, put_nonblocking, \ from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName
put_bool_nonblocking
assert Params assert Params
assert ParamKeyType assert ParamKeyType
assert UnknownKeyName assert UnknownKeyName
assert put_nonblocking
assert put_bool_nonblocking
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys

@ -3,7 +3,6 @@
from libcpp cimport bool from libcpp cimport bool
from libcpp.string cimport string from libcpp.string cimport string
from libcpp.vector cimport vector from libcpp.vector cimport vector
import threading
cdef extern from "common/params.h": cdef extern from "common/params.h":
cpdef enum ParamKeyType: cpdef enum ParamKeyType:
@ -20,6 +19,8 @@ cdef extern from "common/params.h":
bool getBool(string, bool) nogil bool getBool(string, bool) nogil
int remove(string) nogil int remove(string) nogil
int put(string, string) nogil int put(string, string) nogil
void putNonBlocking(string, string) nogil
void putBoolNonBlocking(string, bool) nogil
int putBool(string, bool) nogil int putBool(string, bool) nogil
bool checkKey(string) nogil bool checkKey(string) nogil
string getParamPath(string) nogil string getParamPath(string) nogil
@ -80,7 +81,7 @@ cdef class Params:
""" """
Warning: This function blocks until the param is written to disk! Warning: This function blocks until the param is written to disk!
In very rare cases this can take over a second, and your code will hang. In very rare cases this can take over a second, and your code will hang.
Use the put_nonblocking helper function in time sensitive code, but Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but
in general try to avoid writing params as much as possible. in general try to avoid writing params as much as possible.
""" """
cdef string k = self.check_key(key) cdef string k = self.check_key(key)
@ -93,6 +94,17 @@ cdef class Params:
with nogil: with nogil:
self.p.putBool(k, val) self.p.putBool(k, val)
def put_nonblocking(self, key, dat):
cdef string k = self.check_key(key)
cdef string dat_bytes = ensure_bytes(dat)
with nogil:
self.p.putNonBlocking(k, dat_bytes)
def put_bool_nonblocking(self, key, bool val):
cdef string k = self.check_key(key)
with nogil:
self.p.putBoolNonBlocking(k, val)
def remove(self, key): def remove(self, key):
cdef string k = self.check_key(key) cdef string k = self.check_key(key)
with nogil: with nogil:
@ -104,9 +116,3 @@ cdef class Params:
def all_keys(self): def all_keys(self):
return self.p.allKeys() return self.p.allKeys()
def put_nonblocking(key, val, d=""):
threading.Thread(target=lambda: Params(d).put(key, val)).start()
def put_bool_nonblocking(key, bool val, d=""):
threading.Thread(target=lambda: Params(d).put_bool(key, val)).start()

@ -0,0 +1,27 @@
#include "catch2/catch.hpp"
#define private public
#include "common/params.h"
#include "common/util.h"
TEST_CASE("params_nonblocking_put") {
char tmp_path[] = "/tmp/asyncWriter_XXXXXX";
const std::string param_path = mkdtemp(tmp_path);
auto param_names = {"CarParams", "IsMetric"};
{
Params params(param_path);
for (const auto &name : param_names) {
params.putNonBlocking(name, "1");
// param is empty
REQUIRE(params.get(name).empty());
}
// check if thread is running
REQUIRE(params.future.valid());
REQUIRE(params.future.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout);
}
// check results
Params p(param_path);
for (const auto &name : param_names) {
REQUIRE(p.get(name) == "1");
}
}

@ -4,7 +4,7 @@ import time
import uuid import uuid
import unittest import unittest
from openpilot.common.params import Params, ParamKeyType, UnknownKeyName, put_nonblocking, put_bool_nonblocking from openpilot.common.params import Params, ParamKeyType, UnknownKeyName
class TestParams(unittest.TestCase): class TestParams(unittest.TestCase):
def setUp(self): def setUp(self):
@ -86,7 +86,7 @@ class TestParams(unittest.TestCase):
q = Params() q = Params()
def _delayed_writer(): def _delayed_writer():
time.sleep(0.1) time.sleep(0.1)
put_nonblocking("CarParams", "test") Params().put_nonblocking("CarParams", "test")
threading.Thread(target=_delayed_writer).start() threading.Thread(target=_delayed_writer).start()
assert q.get("CarParams") is None assert q.get("CarParams") is None
assert q.get("CarParams", True) == b"test" assert q.get("CarParams", True) == b"test"
@ -95,7 +95,7 @@ class TestParams(unittest.TestCase):
q = Params() q = Params()
def _delayed_writer(): def _delayed_writer():
time.sleep(0.1) time.sleep(0.1)
put_bool_nonblocking("CarParams", True) Params().put_bool_nonblocking("CarParams", True)
threading.Thread(target=_delayed_writer).start() threading.Thread(target=_delayed_writer).start()
assert q.get("CarParams") is None assert q.get("CarParams") is None
assert q.get("CarParams", True) == b"1" assert q.get("CarParams", True) == b"1"

@ -9,7 +9,7 @@ from cereal import car, log
from openpilot.common.numpy_fast import clip from openpilot.common.numpy_fast import clip
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL
from openpilot.common.profiler import Profiler from openpilot.common.profiler import Profiler
from openpilot.common.params import Params, put_nonblocking, put_bool_nonblocking from openpilot.common.params import Params
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal.visionipc import VisionIpcClient, VisionStreamType from cereal.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.conversions import Conversions as CV from openpilot.common.conversions import Conversions as CV
@ -128,8 +128,8 @@ class Controls:
# Write CarParams for radard # Write CarParams for radard
cp_bytes = self.CP.to_bytes() cp_bytes = self.CP.to_bytes()
self.params.put("CarParams", cp_bytes) self.params.put("CarParams", cp_bytes)
put_nonblocking("CarParamsCache", cp_bytes) self.params.put_nonblocking("CarParamsCache", cp_bytes)
put_nonblocking("CarParamsPersistent", cp_bytes) self.params.put_nonblocking("CarParamsPersistent", cp_bytes)
# cleanup old params # cleanup old params
if not self.CP.experimentalLongitudinalAvailable: if not self.CP.experimentalLongitudinalAvailable:
@ -449,7 +449,7 @@ class Controls:
self.initialized = True self.initialized = True
self.set_initial_state() self.set_initial_state()
put_bool_nonblocking("ControlsReady", True) self.params.put_bool_nonblocking("ControlsReady", True)
# Check for CAN timeout # Check for CAN timeout
if not can_strs: if not can_strs:

@ -15,7 +15,7 @@ from typing import List, NoReturn, Optional
from cereal import log from cereal import log
import cereal.messaging as messaging import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV from openpilot.common.conversions import Conversions as CV
from openpilot.common.params import Params, put_nonblocking from openpilot.common.params import Params
from openpilot.common.realtime import set_realtime_priority from openpilot.common.realtime import set_realtime_priority
from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
@ -64,8 +64,8 @@ class Calibrator:
self.not_car = False self.not_car = False
# Read saved calibration # Read saved calibration
params = Params() self.params = Params()
calibration_params = params.get("CalibrationParams") calibration_params = self.params.get("CalibrationParams")
rpy_init = RPY_INIT rpy_init = RPY_INIT
wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT
height = HEIGHT_INIT height = HEIGHT_INIT
@ -164,7 +164,7 @@ class Calibrator:
write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5) write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5)
if self.param_put and write_this_cycle: if self.param_put and write_this_cycle:
put_nonblocking("CalibrationParams", self.get_msg(True).to_bytes()) self.params.put_nonblocking("CalibrationParams", self.get_msg(True).to_bytes())
def handle_v_ego(self, v_ego: float) -> None: def handle_v_ego(self, v_ego: float) -> None:
self.v_ego = v_ego self.v_ego = v_ego

@ -678,9 +678,10 @@ void Localizer::configure_gnss_source(const LocalizerGnssSource &source) {
} }
int Localizer::locationd_thread() { int Localizer::locationd_thread() {
Params params;
LocalizerGnssSource source; LocalizerGnssSource source;
const char* gps_location_socket; const char* gps_location_socket;
if (Params().getBool("UbloxAvailable")) { if (params.getBool("UbloxAvailable")) {
source = LocalizerGnssSource::UBLOX; source = LocalizerGnssSource::UBLOX;
gps_location_socket = "gpsLocationExternal"; gps_location_socket = "gpsLocationExternal";
} else { } else {
@ -737,10 +738,7 @@ int Localizer::locationd_thread() {
VectorXd posGeo = this->get_position_geodetic(); VectorXd posGeo = this->get_position_geodetic();
std::string lastGPSPosJSON = util::string_format( std::string lastGPSPosJSON = util::string_format(
"{\"latitude\": %.15f, \"longitude\": %.15f, \"altitude\": %.15f}", posGeo(0), posGeo(1), posGeo(2)); "{\"latitude\": %.15f, \"longitude\": %.15f, \"altitude\": %.15f}", posGeo(0), posGeo(1), posGeo(2));
params.putNonBlocking("LastGPSPosition", lastGPSPosJSON);
std::thread([] (const std::string gpsjson) {
Params().put("LastGPSPosition", gpsjson);
}, lastGPSPosJSON).detach();
} }
cnt++; cnt++;
} }

@ -7,7 +7,7 @@ import numpy as np
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import car from cereal import car
from cereal import log from cereal import log
from openpilot.common.params import Params, put_nonblocking from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, DT_MDL from openpilot.common.realtime import config_realtime_process, DT_MDL
from openpilot.common.numpy_fast import clip from openpilot.common.numpy_fast import clip
from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States
@ -251,7 +251,7 @@ def main():
'stiffnessFactor': liveParameters.stiffnessFactor, 'stiffnessFactor': liveParameters.stiffnessFactor,
'angleOffsetAverageDeg': liveParameters.angleOffsetAverageDeg, 'angleOffsetAverageDeg': liveParameters.angleOffsetAverageDeg,
} }
put_nonblocking("LiveParameters", json.dumps(params)) params_reader.put_nonblocking("LiveParameters", json.dumps(params))
pm.send('liveParameters', msg) pm.send('liveParameters', msg)

@ -3,7 +3,7 @@ import gc
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import car from cereal import car
from openpilot.common.params import Params, put_bool_nonblocking from openpilot.common.params import Params
from openpilot.common.realtime import set_realtime_priority from openpilot.common.realtime import set_realtime_priority
from openpilot.selfdrive.controls.lib.events import Events from openpilot.selfdrive.controls.lib.events import Events
from openpilot.selfdrive.monitoring.driver_monitor import DriverStatus from openpilot.selfdrive.monitoring.driver_monitor import DriverStatus
@ -13,10 +13,11 @@ def dmonitoringd_thread():
gc.disable() gc.disable()
set_realtime_priority(2) set_realtime_priority(2)
params = Params()
pm = messaging.PubMaster(['driverMonitoringState']) pm = messaging.PubMaster(['driverMonitoringState'])
sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'controlsState', 'modelV2'], poll=['driverStateV2']) sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'controlsState', 'modelV2'], poll=['driverStateV2'])
driver_status = DriverStatus(rhd_saved=Params().get_bool("IsRhdDetected")) driver_status = DriverStatus(rhd_saved=params.get_bool("IsRhdDetected"))
v_cruise_last = 0 v_cruise_last = 0
driver_engaged = False driver_engaged = False
@ -79,7 +80,7 @@ def dmonitoringd_thread():
if (sm['driverStateV2'].frameId % 6000 == 0 and if (sm['driverStateV2'].frameId % 6000 == 0 and
driver_status.wheelpos_learner.filtered_stat.n > driver_status.settings._WHEELPOS_FILTER_MIN_COUNT and driver_status.wheelpos_learner.filtered_stat.n > driver_status.settings._WHEELPOS_FILTER_MIN_COUNT and
driver_status.wheel_on_right == (driver_status.wheelpos_learner.filtered_stat.M > driver_status.settings._WHEELPOS_THRESHOLD)): driver_status.wheel_on_right == (driver_status.wheelpos_learner.filtered_stat.M > driver_status.settings._WHEELPOS_THRESHOLD)):
put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) params.put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right)
def main(): def main():
dmonitoringd_thread() dmonitoringd_thread()

@ -2,7 +2,7 @@ import time
import threading import threading
from typing import Optional from typing import Optional
from openpilot.common.params import Params, put_nonblocking from openpilot.common.params import Params
from openpilot.system.hardware import HARDWARE from openpilot.system.hardware import HARDWARE
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.statsd import statlog from openpilot.selfdrive.statsd import statlog
@ -60,7 +60,7 @@ class PowerMonitoring:
self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0) self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh) self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh)
if now - self.last_save_time >= 10: if now - self.last_save_time >= 10:
put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh)))
self.last_save_time = now self.last_save_time = now
# First measurement, set integration time # First measurement, set integration time

@ -3,7 +3,6 @@ import unittest
from unittest.mock import patch from unittest.mock import patch
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.selfdrive.test.helpers import noop
from openpilot.selfdrive.thermald.power_monitoring import PowerMonitoring, CAR_BATTERY_CAPACITY_uWh, \ from openpilot.selfdrive.thermald.power_monitoring import PowerMonitoring, CAR_BATTERY_CAPACITY_uWh, \
CAR_CHARGING_RATE_W, VBATT_PAUSE_CHARGING, DELAY_SHUTDOWN_TIME_S CAR_CHARGING_RATE_W, VBATT_PAUSE_CHARGING, DELAY_SHUTDOWN_TIME_S
@ -26,7 +25,6 @@ def pm_patch(name, value, constant=False):
@patch("time.monotonic", new=mock_time_monotonic) @patch("time.monotonic", new=mock_time_monotonic)
@patch("openpilot.selfdrive.thermald.power_monitoring.put_nonblocking", new=noop) # TODO: Remove this once nonblocking params are safer
class TestPowerMonitoring(unittest.TestCase): class TestPowerMonitoring(unittest.TestCase):
def setUp(self): def setUp(self):
self.params = Params() self.params = Params()

Loading…
Cancel
Save