From 08e70aecb21dc2084fe2e28fb0f14285548a060c Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Wed, 6 Sep 2023 23:50:28 +0800 Subject: [PATCH] params: safe and efficient async writing parameters (#25912) * Safe and efficient asynchronous writing parameters * call putNonBlocking in locationd * remove space * ->AsyncWriter * remove semicolon * use member function * asyc write multiple times * add test case for AsyncWriter * merge master * add missing include * public * cleanup * create once * cleanup * update that * explicit waiting * improve test case --------- Co-authored-by: Adeeb Shihadeh old-commit-hash: 0d797f4e8b2552192669d0329c14849f0a970066 --- common/SConscript | 2 +- common/params.cc | 31 ++++++++++++++++++++++++++ common/params.h | 21 +++++++++++++++++ common/params.py | 5 +---- common/params_pyx.pyx | 22 +++++++++++------- common/tests/test_params.cc | 27 ++++++++++++++++++++++ common/tests/test_params.py | 6 ++--- selfdrive/controls/controlsd.py | 8 +++---- selfdrive/locationd/calibrationd.py | 8 +++---- selfdrive/locationd/laikad.py | 8 ++++--- selfdrive/locationd/locationd.cc | 8 +++---- selfdrive/locationd/paramsd.py | 4 ++-- selfdrive/monitoring/dmonitoringd.py | 8 ++++--- selfdrive/thermald/power_monitoring.py | 4 ++-- 14 files changed, 123 insertions(+), 39 deletions(-) create mode 100644 common/tests/test_params.cc diff --git a/common/SConscript b/common/SConscript index d22aca128..d999d5433 100644 --- a/common/SConscript +++ b/common/SConscript @@ -29,7 +29,7 @@ Export('_common', '_gpucommon') if GetOption('extras'): env.Program('tests/test_common', - ['tests/test_runner.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], + ['tests/test_runner.cc', 'tests/test_params.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], LIBS=[_common, 'json11', 'zmq', 'pthread']) # Cython bindings diff --git a/common/params.cc b/common/params.cc index 63baa3031..2cd980788 100644 --- a/common/params.cc +++ b/common/params.cc @@ -7,6 +7,7 @@ #include #include +#include "common/queue.h" #include "common/swaglog.h" #include "common/util.h" #include "system/hardware/hw.h" @@ -327,3 +328,33 @@ void Params::clearAll(ParamKeyType key_type) { fsync_dir(getParamPath()); } + +void Params::putNonBlocking(const std::string &key, const std::string &val) { + static AsyncWriter async_writer; + async_writer.queue({params_path, key, val}); +} + + +// AsyncWriter + +AsyncWriter::~AsyncWriter() { + if (future.valid()) { + future.wait(); + } +} + +void AsyncWriter::queue(const std::tuple &dat) { + q.push(dat); + // start thread on demand + if (!future.valid() || future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) { + future = std::async(std::launch::async, &AsyncWriter::write, this); + } +} + +void AsyncWriter::write() { + std::tuple dat; + while (q.try_pop(dat, 0)) { + auto &[path, key, value] = dat; + Params(path).put(key, value); + } +} diff --git a/common/params.h b/common/params.h index 24b1bffeb..8889a8e1b 100644 --- a/common/params.h +++ b/common/params.h @@ -1,9 +1,13 @@ #pragma once +#include #include #include +#include #include +#include "common/queue.h" + enum ParamKeyType { PERSISTENT = 0x02, CLEAR_ON_MANAGER_START = 0x04, @@ -42,8 +46,25 @@ public: inline int putBool(const std::string &key, bool val) { return put(key.c_str(), val ? "1" : "0", 1); } + void putNonBlocking(const std::string &key, const std::string &val); + inline void putBoolNonBlocking(const std::string &key, bool val) { + putNonBlocking(key, val ? "1" : "0"); + } private: std::string params_path; std::string prefix; }; + +class AsyncWriter { +public: + AsyncWriter() {} + ~AsyncWriter(); + void queue(const std::tuple &dat); + +private: + void write(); + + std::future future; + SafeQueue> q; +}; diff --git a/common/params.py b/common/params.py index ea8ac7514..66808083d 100644 --- a/common/params.py +++ b/common/params.py @@ -1,10 +1,7 @@ -from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName, put_nonblocking, \ - put_bool_nonblocking +from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName assert Params assert ParamKeyType assert UnknownKeyName -assert put_nonblocking -assert put_bool_nonblocking if __name__ == "__main__": import sys diff --git a/common/params_pyx.pyx b/common/params_pyx.pyx index abb3199d0..5ea7874a5 100644 --- a/common/params_pyx.pyx +++ b/common/params_pyx.pyx @@ -3,7 +3,6 @@ from libcpp cimport bool from libcpp.string cimport string from libcpp.vector cimport vector -import threading cdef extern from "common/params.h": cpdef enum ParamKeyType: @@ -19,6 +18,8 @@ cdef extern from "common/params.h": bool getBool(string, bool) nogil int remove(string) nogil int put(string, string) nogil + void putNonBlocking(string, string) nogil + void putBoolNonBlocking(string, bool) nogil int putBool(string, bool) nogil bool checkKey(string) nogil string getParamPath(string) nogil @@ -79,7 +80,7 @@ cdef class Params: """ Warning: This function blocks until the param is written to disk! In very rare cases this can take over a second, and your code will hang. - Use the put_nonblocking helper function in time sensitive code, but + Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but in general try to avoid writing params as much as possible. """ cdef string k = self.check_key(key) @@ -92,6 +93,17 @@ cdef class Params: with nogil: self.p.putBool(k, val) + def put_nonblocking(self, key, dat): + cdef string k = self.check_key(key) + cdef string dat_bytes = ensure_bytes(dat) + with nogil: + self.p.putNonBlocking(k, dat_bytes) + + def put_bool_nonblocking(self, key, bool val): + cdef string k = self.check_key(key) + with nogil: + self.p.putBoolNonBlocking(k, val) + def remove(self, key): cdef string k = self.check_key(key) with nogil: @@ -103,9 +115,3 @@ cdef class Params: def all_keys(self): return self.p.allKeys() - -def put_nonblocking(key, val, d=""): - threading.Thread(target=lambda: Params(d).put(key, val)).start() - -def put_bool_nonblocking(key, bool val, d=""): - threading.Thread(target=lambda: Params(d).put_bool(key, val)).start() diff --git a/common/tests/test_params.cc b/common/tests/test_params.cc new file mode 100644 index 000000000..a7af7ceb0 --- /dev/null +++ b/common/tests/test_params.cc @@ -0,0 +1,27 @@ +#include "catch2/catch.hpp" +#define private public +#include "common/params.h" +#include "common/util.h" + +TEST_CASE("Params/asyncWriter") { + char tmp_path[] = "/tmp/asyncWriter_XXXXXX"; + const std::string param_path = mkdtemp(tmp_path); + Params params(param_path); + auto param_names = {"CarParams", "IsMetric"}; + { + AsyncWriter async_writer; + for (const auto &name : param_names) { + async_writer.queue({param_path, name, "1"}); + // param is empty + REQUIRE(params.get(name).empty()); + } + + // check if thread is running + REQUIRE(async_writer.future.valid()); + REQUIRE(async_writer.future.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout); + } + // check results + for (const auto &name : param_names) { + REQUIRE(params.get(name) == "1"); + } +} diff --git a/common/tests/test_params.py b/common/tests/test_params.py index fb6f320ea..490ee122b 100644 --- a/common/tests/test_params.py +++ b/common/tests/test_params.py @@ -4,7 +4,7 @@ import time import uuid import unittest -from openpilot.common.params import Params, ParamKeyType, UnknownKeyName, put_nonblocking, put_bool_nonblocking +from openpilot.common.params import Params, ParamKeyType, UnknownKeyName class TestParams(unittest.TestCase): def setUp(self): @@ -86,7 +86,7 @@ class TestParams(unittest.TestCase): q = Params() def _delayed_writer(): time.sleep(0.1) - put_nonblocking("CarParams", "test") + Params().put_nonblocking("CarParams", "test") threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"test" @@ -95,7 +95,7 @@ class TestParams(unittest.TestCase): q = Params() def _delayed_writer(): time.sleep(0.1) - put_bool_nonblocking("CarParams", True) + Params().put_bool_nonblocking("CarParams", True) threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"1" diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 809403073..50791e51c 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -8,7 +8,7 @@ from cereal import car, log from openpilot.common.numpy_fast import clip from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL from openpilot.common.profiler import Profiler -from openpilot.common.params import Params, put_nonblocking, put_bool_nonblocking +from openpilot.common.params import Params import cereal.messaging as messaging from cereal.visionipc import VisionIpcClient, VisionStreamType from openpilot.common.conversions import Conversions as CV @@ -129,8 +129,8 @@ class Controls: # Write CarParams for radard cp_bytes = self.CP.to_bytes() self.params.put("CarParams", cp_bytes) - put_nonblocking("CarParamsCache", cp_bytes) - put_nonblocking("CarParamsPersistent", cp_bytes) + self.params.put_nonblocking("CarParamsCache", cp_bytes) + self.params.put_nonblocking("CarParamsPersistent", cp_bytes) # cleanup old params if not self.CP.experimentalLongitudinalAvailable or is_release_branch(): @@ -449,7 +449,7 @@ class Controls: self.initialized = True self.set_initial_state() - put_bool_nonblocking("ControlsReady", True) + self.params.put_bool_nonblocking("ControlsReady", True) # Check for CAN timeout if not can_strs: diff --git a/selfdrive/locationd/calibrationd.py b/selfdrive/locationd/calibrationd.py index 6469ece40..8a0cecb81 100755 --- a/selfdrive/locationd/calibrationd.py +++ b/selfdrive/locationd/calibrationd.py @@ -15,7 +15,7 @@ from typing import List, NoReturn, Optional from cereal import log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from openpilot.common.realtime import set_realtime_priority from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot from openpilot.system.swaglog import cloudlog @@ -63,8 +63,8 @@ class Calibrator: self.not_car = False # Read saved calibration - params = Params() - calibration_params = params.get("CalibrationParams") + self.params = Params() + calibration_params = self.params.get("CalibrationParams") rpy_init = RPY_INIT wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT height = HEIGHT_INIT @@ -162,7 +162,7 @@ class Calibrator: write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5) if self.param_put and write_this_cycle: - put_nonblocking("CalibrationParams", self.get_msg().to_bytes()) + self.params.put_nonblocking("CalibrationParams", self.get_msg().to_bytes()) def handle_v_ego(self, v_ego: float) -> None: self.v_ego = v_ego diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py index 3d190fb00..ffb095439 100755 --- a/selfdrive/locationd/laikad.py +++ b/selfdrive/locationd/laikad.py @@ -11,7 +11,7 @@ from typing import List, Optional, Dict, Any import numpy as np from cereal import log, messaging -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from laika import AstroDog from laika.constants import SECS_IN_HR, SECS_IN_MIN from laika.downloader import DownloadFailed @@ -82,6 +82,8 @@ class Laikad: valid_ephem_types: Valid ephemeris types to be used by AstroDog save_ephemeris: If true saves and loads nav and orbit ephemeris to cache. """ + self.params = Params() + self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True, cache_dir=DOWNLOADS_CACHE_FOLDER) self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom) @@ -113,7 +115,7 @@ class Laikad: if not self.save_ephemeris: return - cache_bytes = Params().get(EPHEMERIS_CACHE) + cache_bytes = self.params.get(EPHEMERIS_CACHE) if not cache_bytes: return @@ -141,7 +143,7 @@ class Laikad: if len(valid_navs) > 0: ephem_cache = ephemeris_structs.EphemerisCache(glonassEphemerides=[e.data for e in valid_navs if e.prn[0]=='R'], gpsEphemerides=[e.data for e in valid_navs if e.prn[0]=='G']) - put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes()) + self.params.put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes()) cloudlog.debug("Cache saved") self.last_cached_t = self.last_report_time diff --git a/selfdrive/locationd/locationd.cc b/selfdrive/locationd/locationd.cc index 401de0cfd..508cc9f69 100644 --- a/selfdrive/locationd/locationd.cc +++ b/selfdrive/locationd/locationd.cc @@ -669,9 +669,10 @@ void Localizer::configure_gnss_source(const LocalizerGnssSource &source) { } int Localizer::locationd_thread() { + Params params; LocalizerGnssSource source; const char* gps_location_socket; - if (Params().getBool("UbloxAvailable")) { + if (params.getBool("UbloxAvailable")) { source = LocalizerGnssSource::UBLOX; gps_location_socket = "gpsLocationExternal"; } else { @@ -728,10 +729,7 @@ int Localizer::locationd_thread() { VectorXd posGeo = this->get_position_geodetic(); std::string lastGPSPosJSON = util::string_format( "{\"latitude\": %.15f, \"longitude\": %.15f, \"altitude\": %.15f}", posGeo(0), posGeo(1), posGeo(2)); - - std::thread([] (const std::string gpsjson) { - Params().put("LastGPSPosition", gpsjson); - }, lastGPSPosJSON).detach(); + params.putNonBlocking("LastGPSPosition", lastGPSPosJSON); } cnt++; } diff --git a/selfdrive/locationd/paramsd.py b/selfdrive/locationd/paramsd.py index 55ad62145..f8bdacc0c 100755 --- a/selfdrive/locationd/paramsd.py +++ b/selfdrive/locationd/paramsd.py @@ -7,7 +7,7 @@ import numpy as np import cereal.messaging as messaging from cereal import car from cereal import log -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process, DT_MDL from openpilot.common.numpy_fast import clip from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States @@ -247,7 +247,7 @@ def main(sm=None, pm=None): 'stiffnessFactor': liveParameters.stiffnessFactor, 'angleOffsetAverageDeg': liveParameters.angleOffsetAverageDeg, } - put_nonblocking("LiveParameters", json.dumps(params)) + params_reader.put_nonblocking("LiveParameters", json.dumps(params)) pm.send('liveParameters', msg) diff --git a/selfdrive/monitoring/dmonitoringd.py b/selfdrive/monitoring/dmonitoringd.py index c7cff88f3..0cd55652a 100755 --- a/selfdrive/monitoring/dmonitoringd.py +++ b/selfdrive/monitoring/dmonitoringd.py @@ -4,7 +4,7 @@ import gc import cereal.messaging as messaging from cereal import car from cereal import log -from openpilot.common.params import Params, put_bool_nonblocking +from openpilot.common.params import Params from openpilot.common.realtime import set_realtime_priority from openpilot.selfdrive.controls.lib.events import Events from openpilot.selfdrive.monitoring.driver_monitor import DriverStatus @@ -14,13 +14,15 @@ def dmonitoringd_thread(sm=None, pm=None): gc.disable() set_realtime_priority(2) + params = Params() + if pm is None: pm = messaging.PubMaster(['driverMonitoringState']) if sm is None: sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'controlsState', 'modelV2'], poll=['driverStateV2']) - driver_status = DriverStatus(rhd_saved=Params().get_bool("IsRhdDetected")) + driver_status = DriverStatus(rhd_saved=params.get_bool("IsRhdDetected")) sm['liveCalibration'].calStatus = log.LiveCalibrationData.Status.invalid sm['liveCalibration'].rpyCalib = [0, 0, 0] @@ -87,7 +89,7 @@ def dmonitoringd_thread(sm=None, pm=None): if (sm['driverStateV2'].frameId % 6000 == 0 and driver_status.wheelpos_learner.filtered_stat.n > driver_status.settings._WHEELPOS_FILTER_MIN_COUNT and driver_status.wheel_on_right == (driver_status.wheelpos_learner.filtered_stat.M > driver_status.settings._WHEELPOS_THRESHOLD)): - put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) + params.put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) def main(sm=None, pm=None): dmonitoringd_thread(sm, pm) diff --git a/selfdrive/thermald/power_monitoring.py b/selfdrive/thermald/power_monitoring.py index 0b3c73a1c..9d3720dac 100644 --- a/selfdrive/thermald/power_monitoring.py +++ b/selfdrive/thermald/power_monitoring.py @@ -2,7 +2,7 @@ import time import threading from typing import Optional -from openpilot.common.params import Params, put_nonblocking +from openpilot.common.params import Params from openpilot.system.hardware import HARDWARE from openpilot.system.swaglog import cloudlog from openpilot.selfdrive.statsd import statlog @@ -60,7 +60,7 @@ class PowerMonitoring: self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0) self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh) if now - self.last_save_time >= 10: - put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) + self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) self.last_save_time = now # First measurement, set integration time