diff --git a/common/SConscript b/common/SConscript index d999d54336..d22aca128b 100644 --- a/common/SConscript +++ b/common/SConscript @@ -29,7 +29,7 @@ Export('_common', '_gpucommon') if GetOption('extras'): env.Program('tests/test_common', - ['tests/test_runner.cc', 'tests/test_params.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], + ['tests/test_runner.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc', 'tests/test_ratekeeper.cc'], LIBS=[_common, 'json11', 'zmq', 'pthread']) # Cython bindings diff --git a/common/params.cc b/common/params.cc index 2cd9807887..63baa30315 100644 --- a/common/params.cc +++ b/common/params.cc @@ -7,7 +7,6 @@ #include #include -#include "common/queue.h" #include "common/swaglog.h" #include "common/util.h" #include "system/hardware/hw.h" @@ -328,33 +327,3 @@ void Params::clearAll(ParamKeyType key_type) { fsync_dir(getParamPath()); } - -void Params::putNonBlocking(const std::string &key, const std::string &val) { - static AsyncWriter async_writer; - async_writer.queue({params_path, key, val}); -} - - -// AsyncWriter - -AsyncWriter::~AsyncWriter() { - if (future.valid()) { - future.wait(); - } -} - -void AsyncWriter::queue(const std::tuple &dat) { - q.push(dat); - // start thread on demand - if (!future.valid() || future.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready) { - future = std::async(std::launch::async, &AsyncWriter::write, this); - } -} - -void AsyncWriter::write() { - std::tuple dat; - while (q.try_pop(dat, 0)) { - auto &[path, key, value] = dat; - Params(path).put(key, value); - } -} diff --git a/common/params.h b/common/params.h index 8889a8e1b4..24b1bffeb1 100644 --- a/common/params.h +++ b/common/params.h @@ -1,13 +1,9 @@ #pragma once -#include #include #include -#include #include -#include "common/queue.h" - enum ParamKeyType { PERSISTENT = 0x02, CLEAR_ON_MANAGER_START = 0x04, @@ -46,25 +42,8 @@ public: inline int putBool(const std::string &key, bool val) { return put(key.c_str(), val ? "1" : "0", 1); } - void putNonBlocking(const std::string &key, const std::string &val); - inline void putBoolNonBlocking(const std::string &key, bool val) { - putNonBlocking(key, val ? "1" : "0"); - } private: std::string params_path; std::string prefix; }; - -class AsyncWriter { -public: - AsyncWriter() {} - ~AsyncWriter(); - void queue(const std::tuple &dat); - -private: - void write(); - - std::future future; - SafeQueue> q; -}; diff --git a/common/params.py b/common/params.py index 66808083dc..ea8ac7514a 100644 --- a/common/params.py +++ b/common/params.py @@ -1,7 +1,10 @@ -from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName +from openpilot.common.params_pyx import Params, ParamKeyType, UnknownKeyName, put_nonblocking, \ + put_bool_nonblocking assert Params assert ParamKeyType assert UnknownKeyName +assert put_nonblocking +assert put_bool_nonblocking if __name__ == "__main__": import sys diff --git a/common/params_pyx.pyx b/common/params_pyx.pyx index 5ea7874a5f..abb3199d05 100644 --- a/common/params_pyx.pyx +++ b/common/params_pyx.pyx @@ -3,6 +3,7 @@ from libcpp cimport bool from libcpp.string cimport string from libcpp.vector cimport vector +import threading cdef extern from "common/params.h": cpdef enum ParamKeyType: @@ -18,8 +19,6 @@ cdef extern from "common/params.h": bool getBool(string, bool) nogil int remove(string) nogil int put(string, string) nogil - void putNonBlocking(string, string) nogil - void putBoolNonBlocking(string, bool) nogil int putBool(string, bool) nogil bool checkKey(string) nogil string getParamPath(string) nogil @@ -80,7 +79,7 @@ cdef class Params: """ Warning: This function blocks until the param is written to disk! In very rare cases this can take over a second, and your code will hang. - Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but + Use the put_nonblocking helper function in time sensitive code, but in general try to avoid writing params as much as possible. """ cdef string k = self.check_key(key) @@ -93,17 +92,6 @@ cdef class Params: with nogil: self.p.putBool(k, val) - def put_nonblocking(self, key, dat): - cdef string k = self.check_key(key) - cdef string dat_bytes = ensure_bytes(dat) - with nogil: - self.p.putNonBlocking(k, dat_bytes) - - def put_bool_nonblocking(self, key, bool val): - cdef string k = self.check_key(key) - with nogil: - self.p.putBoolNonBlocking(k, val) - def remove(self, key): cdef string k = self.check_key(key) with nogil: @@ -115,3 +103,9 @@ cdef class Params: def all_keys(self): return self.p.allKeys() + +def put_nonblocking(key, val, d=""): + threading.Thread(target=lambda: Params(d).put(key, val)).start() + +def put_bool_nonblocking(key, bool val, d=""): + threading.Thread(target=lambda: Params(d).put_bool(key, val)).start() diff --git a/common/tests/test_params.cc b/common/tests/test_params.cc deleted file mode 100644 index a7af7ceb06..0000000000 --- a/common/tests/test_params.cc +++ /dev/null @@ -1,27 +0,0 @@ -#include "catch2/catch.hpp" -#define private public -#include "common/params.h" -#include "common/util.h" - -TEST_CASE("Params/asyncWriter") { - char tmp_path[] = "/tmp/asyncWriter_XXXXXX"; - const std::string param_path = mkdtemp(tmp_path); - Params params(param_path); - auto param_names = {"CarParams", "IsMetric"}; - { - AsyncWriter async_writer; - for (const auto &name : param_names) { - async_writer.queue({param_path, name, "1"}); - // param is empty - REQUIRE(params.get(name).empty()); - } - - // check if thread is running - REQUIRE(async_writer.future.valid()); - REQUIRE(async_writer.future.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout); - } - // check results - for (const auto &name : param_names) { - REQUIRE(params.get(name) == "1"); - } -} diff --git a/common/tests/test_params.py b/common/tests/test_params.py index 490ee122be..fb6f320ea4 100644 --- a/common/tests/test_params.py +++ b/common/tests/test_params.py @@ -4,7 +4,7 @@ import time import uuid import unittest -from openpilot.common.params import Params, ParamKeyType, UnknownKeyName +from openpilot.common.params import Params, ParamKeyType, UnknownKeyName, put_nonblocking, put_bool_nonblocking class TestParams(unittest.TestCase): def setUp(self): @@ -86,7 +86,7 @@ class TestParams(unittest.TestCase): q = Params() def _delayed_writer(): time.sleep(0.1) - Params().put_nonblocking("CarParams", "test") + put_nonblocking("CarParams", "test") threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"test" @@ -95,7 +95,7 @@ class TestParams(unittest.TestCase): q = Params() def _delayed_writer(): time.sleep(0.1) - Params().put_bool_nonblocking("CarParams", True) + put_bool_nonblocking("CarParams", True) threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"1" diff --git a/selfdrive/controls/controlsd.py b/selfdrive/controls/controlsd.py index 50791e51ca..8094030731 100755 --- a/selfdrive/controls/controlsd.py +++ b/selfdrive/controls/controlsd.py @@ -8,7 +8,7 @@ from cereal import car, log from openpilot.common.numpy_fast import clip from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper, DT_CTRL from openpilot.common.profiler import Profiler -from openpilot.common.params import Params +from openpilot.common.params import Params, put_nonblocking, put_bool_nonblocking import cereal.messaging as messaging from cereal.visionipc import VisionIpcClient, VisionStreamType from openpilot.common.conversions import Conversions as CV @@ -129,8 +129,8 @@ class Controls: # Write CarParams for radard cp_bytes = self.CP.to_bytes() self.params.put("CarParams", cp_bytes) - self.params.put_nonblocking("CarParamsCache", cp_bytes) - self.params.put_nonblocking("CarParamsPersistent", cp_bytes) + put_nonblocking("CarParamsCache", cp_bytes) + put_nonblocking("CarParamsPersistent", cp_bytes) # cleanup old params if not self.CP.experimentalLongitudinalAvailable or is_release_branch(): @@ -449,7 +449,7 @@ class Controls: self.initialized = True self.set_initial_state() - self.params.put_bool_nonblocking("ControlsReady", True) + put_bool_nonblocking("ControlsReady", True) # Check for CAN timeout if not can_strs: diff --git a/selfdrive/locationd/calibrationd.py b/selfdrive/locationd/calibrationd.py index 8a0cecb81a..6469ece402 100755 --- a/selfdrive/locationd/calibrationd.py +++ b/selfdrive/locationd/calibrationd.py @@ -15,7 +15,7 @@ from typing import List, NoReturn, Optional from cereal import log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV -from openpilot.common.params import Params +from openpilot.common.params import Params, put_nonblocking from openpilot.common.realtime import set_realtime_priority from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot from openpilot.system.swaglog import cloudlog @@ -63,8 +63,8 @@ class Calibrator: self.not_car = False # Read saved calibration - self.params = Params() - calibration_params = self.params.get("CalibrationParams") + params = Params() + calibration_params = params.get("CalibrationParams") rpy_init = RPY_INIT wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT height = HEIGHT_INIT @@ -162,7 +162,7 @@ class Calibrator: write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5) if self.param_put and write_this_cycle: - self.params.put_nonblocking("CalibrationParams", self.get_msg().to_bytes()) + put_nonblocking("CalibrationParams", self.get_msg().to_bytes()) def handle_v_ego(self, v_ego: float) -> None: self.v_ego = v_ego diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py index ffb095439a..3d190fb00a 100755 --- a/selfdrive/locationd/laikad.py +++ b/selfdrive/locationd/laikad.py @@ -11,7 +11,7 @@ from typing import List, Optional, Dict, Any import numpy as np from cereal import log, messaging -from openpilot.common.params import Params +from openpilot.common.params import Params, put_nonblocking from laika import AstroDog from laika.constants import SECS_IN_HR, SECS_IN_MIN from laika.downloader import DownloadFailed @@ -82,8 +82,6 @@ class Laikad: valid_ephem_types: Valid ephemeris types to be used by AstroDog save_ephemeris: If true saves and loads nav and orbit ephemeris to cache. """ - self.params = Params() - self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True, cache_dir=DOWNLOADS_CACHE_FOLDER) self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom) @@ -115,7 +113,7 @@ class Laikad: if not self.save_ephemeris: return - cache_bytes = self.params.get(EPHEMERIS_CACHE) + cache_bytes = Params().get(EPHEMERIS_CACHE) if not cache_bytes: return @@ -143,7 +141,7 @@ class Laikad: if len(valid_navs) > 0: ephem_cache = ephemeris_structs.EphemerisCache(glonassEphemerides=[e.data for e in valid_navs if e.prn[0]=='R'], gpsEphemerides=[e.data for e in valid_navs if e.prn[0]=='G']) - self.params.put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes()) + put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes()) cloudlog.debug("Cache saved") self.last_cached_t = self.last_report_time diff --git a/selfdrive/locationd/locationd.cc b/selfdrive/locationd/locationd.cc index 508cc9f692..401de0cfdd 100644 --- a/selfdrive/locationd/locationd.cc +++ b/selfdrive/locationd/locationd.cc @@ -669,10 +669,9 @@ void Localizer::configure_gnss_source(const LocalizerGnssSource &source) { } int Localizer::locationd_thread() { - Params params; LocalizerGnssSource source; const char* gps_location_socket; - if (params.getBool("UbloxAvailable")) { + if (Params().getBool("UbloxAvailable")) { source = LocalizerGnssSource::UBLOX; gps_location_socket = "gpsLocationExternal"; } else { @@ -729,7 +728,10 @@ int Localizer::locationd_thread() { VectorXd posGeo = this->get_position_geodetic(); std::string lastGPSPosJSON = util::string_format( "{\"latitude\": %.15f, \"longitude\": %.15f, \"altitude\": %.15f}", posGeo(0), posGeo(1), posGeo(2)); - params.putNonBlocking("LastGPSPosition", lastGPSPosJSON); + + std::thread([] (const std::string gpsjson) { + Params().put("LastGPSPosition", gpsjson); + }, lastGPSPosJSON).detach(); } cnt++; } diff --git a/selfdrive/locationd/paramsd.py b/selfdrive/locationd/paramsd.py index f8bdacc0ce..55ad62145e 100755 --- a/selfdrive/locationd/paramsd.py +++ b/selfdrive/locationd/paramsd.py @@ -7,7 +7,7 @@ import numpy as np import cereal.messaging as messaging from cereal import car from cereal import log -from openpilot.common.params import Params +from openpilot.common.params import Params, put_nonblocking from openpilot.common.realtime import config_realtime_process, DT_MDL from openpilot.common.numpy_fast import clip from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States @@ -247,7 +247,7 @@ def main(sm=None, pm=None): 'stiffnessFactor': liveParameters.stiffnessFactor, 'angleOffsetAverageDeg': liveParameters.angleOffsetAverageDeg, } - params_reader.put_nonblocking("LiveParameters", json.dumps(params)) + put_nonblocking("LiveParameters", json.dumps(params)) pm.send('liveParameters', msg) diff --git a/selfdrive/monitoring/dmonitoringd.py b/selfdrive/monitoring/dmonitoringd.py index 0cd55652af..c7cff88f3e 100755 --- a/selfdrive/monitoring/dmonitoringd.py +++ b/selfdrive/monitoring/dmonitoringd.py @@ -4,7 +4,7 @@ import gc import cereal.messaging as messaging from cereal import car from cereal import log -from openpilot.common.params import Params +from openpilot.common.params import Params, put_bool_nonblocking from openpilot.common.realtime import set_realtime_priority from openpilot.selfdrive.controls.lib.events import Events from openpilot.selfdrive.monitoring.driver_monitor import DriverStatus @@ -14,15 +14,13 @@ def dmonitoringd_thread(sm=None, pm=None): gc.disable() set_realtime_priority(2) - params = Params() - if pm is None: pm = messaging.PubMaster(['driverMonitoringState']) if sm is None: sm = messaging.SubMaster(['driverStateV2', 'liveCalibration', 'carState', 'controlsState', 'modelV2'], poll=['driverStateV2']) - driver_status = DriverStatus(rhd_saved=params.get_bool("IsRhdDetected")) + driver_status = DriverStatus(rhd_saved=Params().get_bool("IsRhdDetected")) sm['liveCalibration'].calStatus = log.LiveCalibrationData.Status.invalid sm['liveCalibration'].rpyCalib = [0, 0, 0] @@ -89,7 +87,7 @@ def dmonitoringd_thread(sm=None, pm=None): if (sm['driverStateV2'].frameId % 6000 == 0 and driver_status.wheelpos_learner.filtered_stat.n > driver_status.settings._WHEELPOS_FILTER_MIN_COUNT and driver_status.wheel_on_right == (driver_status.wheelpos_learner.filtered_stat.M > driver_status.settings._WHEELPOS_THRESHOLD)): - params.put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) + put_bool_nonblocking("IsRhdDetected", driver_status.wheel_on_right) def main(sm=None, pm=None): dmonitoringd_thread(sm, pm) diff --git a/selfdrive/thermald/power_monitoring.py b/selfdrive/thermald/power_monitoring.py index 9d3720dac1..0b3c73a1c0 100644 --- a/selfdrive/thermald/power_monitoring.py +++ b/selfdrive/thermald/power_monitoring.py @@ -2,7 +2,7 @@ import time import threading from typing import Optional -from openpilot.common.params import Params +from openpilot.common.params import Params, put_nonblocking from openpilot.system.hardware import HARDWARE from openpilot.system.swaglog import cloudlog from openpilot.selfdrive.statsd import statlog @@ -60,7 +60,7 @@ class PowerMonitoring: self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0) self.car_battery_capacity_uWh = min(self.car_battery_capacity_uWh, CAR_BATTERY_CAPACITY_uWh) if now - self.last_save_time >= 10: - self.params.put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) + put_nonblocking("CarBatteryCapacity", str(int(self.car_battery_capacity_uWh))) self.last_save_time = now # First measurement, set integration time