From 1d7d7b89b2c251b2fb4e49a2617bd399689ad24e Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Thu, 14 Dec 2023 12:36:01 +0800 Subject: [PATCH] common/params: support nonblocking write (#29808) * Safe and efficient asynchronous writing parameters * call putNonBlocking in locationd * remove space * ->AsyncWriter * remove semicolon * use member function * asyc write multiple times * add test case for AsyncWriter * merge master * add missing include * public * cleanup * create once * cleanup * update that * explicit waiting * improve test case * pass prefix to asywriter * move to params * assert(queue.empty()) * add comment * add todo * test_power_monitoring: remove patch * rm laikad.py * fix import --------- Co-authored-by: Adeeb Shihadeh old-commit-hash: 3c4c4d1f7f22884f84f0f52ad45ab0412fab1eb5 --- common/SConscript | 2 +- common/params.cc | 30 +++++++++++++++++-- common/params.h | 20 +++++++++++-- common/params.py | 5 +--- common/params_pyx.pyx | 22 +++++++++----- common/tests/test_params.cc | 27 +++++++++++++++++ common/tests/test_params.py | 6 ++-- selfdrive/controls/controlsd.py | 8 ++--- selfdrive/locationd/calibrationd.py | 8 ++--- selfdrive/locationd/locationd.cc | 8 ++--- selfdrive/locationd/paramsd.py | 4 +-- selfdrive/monitoring/dmonitoringd.py | 7 +++-- selfdrive/thermald/power_monitoring.py | 4 +-- .../thermald/tests/test_power_monitoring.py | 2 -- 14 files changed, 111 insertions(+), 42 deletions(-) create mode 100644 common/tests/test_params.cc diff --git a/common/SConscript b/common/SConscript index 97322b248d..b8c23b7019 100644 --- a/common/SConscript +++ b/common/SConscript @@ -24,7 +24,7 @@ Export('_common', '_gpucommon') if GetOption('extras'): env.Program('tests/test_common', - ['tests/test_runner.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], + ['tests/test_runner.cc', 'tests/test_params.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], LIBS=[_common, 'json11', 'zmq', 'pthread']) # Cython bindings diff --git a/common/params.cc b/common/params.cc index d1d3513469..620cd42641 100644 --- a/common/params.cc +++ b/common/params.cc @@ -4,9 +4,11 @@ #include #include +#include #include #include +#include "common/queue.h" #include "common/swaglog.h" #include "common/util.h" #include "system/hardware/hw.h" @@ -214,8 +216,15 @@ std::unordered_map keys = { Params::Params(const std::string &path) { - prefix = "/" + util::getenv("OPENPILOT_PREFIX", "d"); - params_path = ensure_params_path(prefix, path); + params_prefix = "/" + util::getenv("OPENPILOT_PREFIX", "d"); + params_path = ensure_params_path(params_prefix, path); +} + +Params::~Params() { + if (future.valid()) { + future.wait(); + } + assert(queue.empty()); } std::vector Params::allKeys() const { @@ -328,3 +337,20 @@ void Params::clearAll(ParamKeyType key_type) { fsync_dir(getParamPath()); } + +void Params::putNonBlocking(const std::string &key, const std::string &val) { + queue.push(std::make_pair(key, val)); + // start thread on demand + if (!future.valid() || future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) { + future = std::async(std::launch::async, &Params::asyncWriteThread, this); + } +} + +void Params::asyncWriteThread() { + // TODO: write the latest one if a key has multiple values in the queue. + std::pair p; + while (queue.try_pop(p, 0)) { + // Params::put is Thread-Safe + put(p.first, p.second); + } +} diff --git a/common/params.h b/common/params.h index aa586a1581..d726a6185f 100644 --- a/common/params.h +++ b/common/params.h @@ -1,9 +1,14 @@ #pragma once +#include #include #include +#include +#include #include +#include "common/queue.h" + enum ParamKeyType { PERSISTENT = 0x02, CLEAR_ON_MANAGER_START = 0x04, @@ -17,6 +22,7 @@ enum ParamKeyType { class Params { public: explicit Params(const std::string &path = {}); + ~Params(); // Not copyable. Params(const Params&) = delete; Params& operator=(const Params&) = delete; @@ -25,7 +31,7 @@ public: bool checkKey(const std::string &key); ParamKeyType getKeyType(const std::string &key); inline std::string getParamPath(const std::string &key = {}) { - return params_path + prefix + (key.empty() ? "" : "/" + key); + return params_path + params_prefix + (key.empty() ? "" : "/" + key); } // Delete a value @@ -47,8 +53,18 @@ public: inline int putBool(const std::string &key, bool val) { return put(key.c_str(), val ? "1" : "0", 1); } + void putNonBlocking(const std::string &key, const std::string &val); + inline void putBoolNonBlocking(const std::string &key, bool val) { + putNonBlocking(key, val ? "1" : "0"); + } private: + void asyncWriteThread(); + std::string params_path; - std::string prefix; + std::string params_prefix; + + // for nonblocking write + std::future future; + SafeQueue> queue; }; diff --git a/common/params.py b/common/params.py index ea8ac7514a..66808083dc 100644 --- a/common/params.py +++ b/common/params.py @@ -1,10 +1,7 @@ -from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName, put_nonblocking, \ - put_bool_nonblocking +from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName assert Params assert ParamKeyType assert UnknownKeyName -assert put_nonblocking -assert put_bool_nonblocking if __name__ == "__main__": import sys diff --git a/common/params_pyx.pyx b/common/params_pyx.pyx index db8f496d30..47d2075df2 100644 --- a/common/params_pyx.pyx +++ b/common/params_pyx.pyx @@ -3,7 +3,6 @@ from libcpp cimport bool from libcpp.string cimport string from libcpp.vector cimport vector -import threading cdef extern from "common/params.h": cpdef enum ParamKeyType: @@ -20,6 +19,8 @@ cdef extern from "common/params.h": bool getBool(string, bool) nogil int remove(string) nogil int put(string, string) nogil + void putNonBlocking(string, string) nogil + void putBoolNonBlocking(string, bool) nogil int putBool(string, bool) nogil bool checkKey(string) nogil string getParamPath(string) nogil @@ -80,7 +81,7 @@ cdef class Params: """ Warning: This function blocks until the param is written to disk! In very rare cases this can take over a second, and your code will hang. - Use the put_nonblocking helper function in time sensitive code, but + Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but in general try to avoid writing params as much as possible. """ cdef string k = self.check_key(key) @@ -93,6 +94,17 @@ cdef class Params: with nogil: self.p.putBool(k, val) + def put_nonblocking(self, key, dat): + cdef string k = self.check_key(key) + cdef string dat_bytes = ensure_bytes(dat) + with nogil: + self.p.putNonBlocking(k, dat_bytes) + + def put_bool_nonblocking(self, key, bool val): + cdef string k = self.check_key(key) + with nogil: + self.p.putBoolNonBlocking(k, val) + def remove(self, key): cdef string k = self.check_key(key) with nogil: @@ -104,9 +116,3 @@ cdef class Params: def all_keys(self): return self.p.allKeys() - -def put_nonblocking(key, val, d=""): - threading.Thread(target=lambda: Params(d).put(key, val)).start() - -def put_bool_nonblocking(key, bool val, d=""): - threading.Thread(target=lambda: Params(d).put_bool(key, val)).start() diff --git a/common/tests/test_params.cc b/common/tests/test_params.cc new file mode 100644 index 0000000000..f8d6c79f55 --- /dev/null +++ b/common/tests/test_params.cc @@ -0,0 +1,27 @@ +#include "catch2/catch.hpp" +#define private public +#include "common/params.h" +#include "common/util.h" + +TEST_CASE("params_nonblocking_put") { + char tmp_path[] = "/tmp/asyncWriter_XXXXXX"; + const std::string param_path = mkdtemp(tmp_path); + auto param_names = {"CarParams", "IsMetric"}; + { + Params params(param_path); + for (const auto &name : param_names) { + params.putNonBlocking(name, "1"); + // param is empty + REQUIRE(params.get(name).empty()); + } + + // check if thread is running + REQUIRE(params.future.valid()); + REQUIRE(params.future.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout); + } + // check results + Params p(param_path); + for (const auto &name : param_names) { + REQUIRE(p.get(name) == "1"); + } +} diff --git a/common/tests/test_params.py b/common/tests/test_params.py index fb6f320ea4..490ee122be 100644 --- a/common/tests/test_params.py +++ b/common/tests/test_params.py @@ -4,7 +4,7 @@ import time import uuid import unittest -from openpilot.common.params import Params, ParamKeyType, UnknownKeyName, put_nonblocking, put_bool_nonblocking +from openpilot.common.params import Params, ParamKeyType, UnknownKeyName class TestParams(unittest.TestCase): def setUp(self): @@ -86,7 +86,7 @@ class TestParams(unittest.TestCase): q = Params() def _delayed_writer(): time.sleep(0.1) - put_nonblocking("CarParams", "test") + Params().put_nonblocking("CarParams", "test") threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"test" @@ -95,7 +95,7 @@ class TestParams(unittest.TestCase): q = Params() def _delayed_writer(): time.sleep(0.1) - put_bool_nonblocking("CarParams", True) + Params().put_bool_nonblocking("CarParams", True) threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"1" diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 397d5e7d09..46a0b7f462 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -9,7 +9,7 @@ from cereal import car, log from openpilot.common.numpy_fast import clip from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL from openpilot.common.profiler import Profiler -from openpilot.common.params import Params, put_nonblocking, put_bool_nonblocking +from openpilot.common.params import Params import cereal.messaging as messaging from cereal.visionipc import VisionIpcClient, VisionStreamType from openpilot.common.conversions import Conversions as CV @@ -128,8 +128,8 @@ class Controls: # Write CarParams for radard cp_bytes = self.CP.to_bytes() self.params.put("CarParams", cp_bytes) - put_nonblocking("CarParamsCache", cp_bytes) - put_nonblocking("CarParamsPersistent", cp_bytes) + self.params.put_nonblocking("CarParamsCache", cp_bytes) + self.params.put_nonblocking("CarParamsPersistent", cp_bytes) # cleanup old params if not self.CP.experimentalLongitudinalAvailable: @@ -449,7 +449,7 @@ class Controls: self.initialized = True self.set_initial_state() - put_bool_nonblocking("ControlsReady", True) + self.params.put_bool_nonblocking("ControlsReady", True) # Check for CAN timeout if not can_strs: diff --git a/selfdrive/locationd/calibrationd.py b/selfdrive/locationd/calibrationd.py index 961efabf8d..06be9f031a 100755 --- a/selfdrive/locationd/calibrationd.py +++ b/selfdrive/locationd/calibrationd.py @@ -15,7 +15,7 @@ from typing import List, NoReturn, Optional from cereal import log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from openpilot.common.realtime import set_realtime_priority from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot from openpilot.common.swaglog import cloudlog @@ -64,8 +64,8 @@ class Calibrator: self.not_car = False # Read saved calibration - params = Params() - calibration_params = params.get("CalibrationParams") + self.params = Params() + calibration_params = self.params.get("CalibrationParams") rpy_init = RPY_INIT wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT height = HEIGHT_INIT @@ -164,7 +164,7 @@ class Calibrator: write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5) if self.param_put and write_this_cycle: - put_nonblocking("CalibrationParams", self.get_msg(True).to_bytes()) + self.params.put_nonblocking("CalibrationParams", self.get_msg(True).to_bytes()) def handle_v_ego(self, v_ego: float) -> None: self.v_ego = v_ego diff --git a/selfdrive/locationd/locationd.cc b/selfdrive/locationd/locationd.cc index 80882fc951..e32ed78a3e 100644 --- a/selfdrive/locationd/locationd.cc +++ b/selfdrive/locationd/locationd.cc @@ -678,9 +678,10 @@ void Localizer::configure_gnss_source(const LocalizerGnssSource &source) { } int Localizer::locationd_thread() { + Params params; LocalizerGnssSource source; const char* gps_location_socket; - if (Params().getBool("UbloxAvailable")) { + if (params.getBool("UbloxAvailable")) { source = LocalizerGnssSource::UBLOX; gps_location_socket = "gpsLocationExternal"; } else { @@ -737,10 +738,7 @@ int Localizer::locationd_thread() { VectorXd posGeo = this->get_position_geodetic(); std::string lastGPSPosJSON = util::string_format( "{\"latitude\": %.15f, \"longitude\": %.15f, \"altitude\": %.15f}", posGeo(0), posGeo(1), posGeo(2)); - - std::thread([] (const std::string gpsjson) { - Params().put("LastGPSPosition", gpsjson); - }, lastGPSPosJSON).detach(); + params.putNonBlocking("LastGPSPosition", lastGPSPosJSON); } cnt++; } diff --git a/selfdrive/locationd/paramsd.py b/selfdrive/locationd/paramsd.py index 4a598e8d4b..183a8666e8 100755 --- a/selfdrive/locationd/paramsd.py +++ b/selfdrive/locationd/paramsd.py @@ -7,7 +7,7 @@ import numpy as np import cereal.messaging as messaging from cereal import car from cereal import log -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process, DT_MDL from openpilot.common.numpy_fast import clip from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States @@ -251,7 +251,7 @@ def main(): 'stiffnessFactor': liveParameters.stiffnessFactor, 'angleOffsetAverageDeg': liveParameters.angleOffsetAverageDeg, } - put_nonblocking("LiveParameters", json.dumps(params)) + params_reader.put_nonblocking("LiveParameters", json.dumps(params)) pm.send('liveParameters', msg) diff --git a/selfdrive/monitoring/dmonitoringd.py b/selfdrive/monitoring/dmonitoringd.py index 7455459967..7a24c0107e 100755 --- a/selfdrive/monitoring/dmonitoringd.py +++ b/selfdrive/monitoring/dmonitoringd.py @@ -3,7 +3,7 @@ import gc import cereal.messaging as messaging from cereal import car -from openpilot.common.params import Params, put_bool_nonblocking +from openpilot.common.params import Params from openpilot.common.realtime import set_realtime_priority from openpilot.selfdrive.controls.lib.events import Events from openpilot.selfdrive.monitoring.driver_monitor import DriverStatus @@ -13,10 +13,11 @@ def dmonitoringd_thread(): gc.disable() set_realtime_priority(2) + params = Params() pm = messaging.PubMaster(['driverMonitoringState']) sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'controlsState', 'modelV2'], poll=['driverStateV2']) - driver_status = DriverStatus(rhd_saved=Params().get_bool("IsRhdDetected")) + driver_status = DriverStatus(rhd_saved=params.get_bool("IsRhdDetected")) v_cruise_last = 0 driver_engaged = False @@ -79,7 +80,7 @@ def dmonitoringd_thread(): if (sm['driverStateV2'].frameId % 6000 == 0 and driver_status.wheelpos_learner.filtered_stat.n > driver_status.settings._WHEELPOS_FILTER_MIN_COUNT and driver_status.wheel_on_right == (driver_status.wheelpos_learner.filtered_stat.M > driver_status.settings._WHEELPOS_THRESHOLD)): - put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) + params.put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) def main(): dmonitoringd_thread() diff --git a/selfdrive/thermald/power_monitoring.py b/selfdrive/thermald/power_monitoring.py index 0520385473..8802a82af4 100644 --- a/selfdrive/thermald/power_monitoring.py +++ b/selfdrive/thermald/power_monitoring.py @@ -2,7 +2,7 @@ import time import threading from typing import Optional -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from openpilot.system.hardware import HARDWARE from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.statsd import statlog @@ -60,7 +60,7 @@ class PowerMonitoring: self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0) self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh) if now - self.last_save_time >= 10: - put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) + self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) self.last_save_time = now # First measurement, set integration time diff --git a/selfdrive/thermald/tests/test_power_monitoring.py b/selfdrive/thermald/tests/test_power_monitoring.py index f8d43737cd..c3a890f068 100755 --- a/selfdrive/thermald/tests/test_power_monitoring.py +++ b/selfdrive/thermald/tests/test_power_monitoring.py @@ -3,7 +3,6 @@ import unittest from unittest.mock import patch from openpilot.common.params import Params -from openpilot.selfdrive.test.helpers import noop from openpilot.selfdrive.thermald.power_monitoring import PowerMonitoring, CAR_BATTERY_CAPACITY_uWh, \ CAR_CHARGING_RATE_W, VBATT_PAUSE_CHARGING, DELAY_SHUTDOWN_TIME_S @@ -26,7 +25,6 @@ def pm_patch(name, value, constant=False): @patch("time.monotonic", new=mock_time_monotonic) -@patch("openpilot.selfdrive.thermald.power_monitoring.put_nonblocking", new=noop) # TODO: Remove this once nonblocking params are safer class TestPowerMonitoring(unittest.TestCase): def setUp(self): self.params = Params()