From 61bec65f32c46e26cebedd394479d64bbba2e766 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Sat, 22 Feb 2025 06:47:21 +0800 Subject: [PATCH] common: add MovingAverage class for real-time windowed average calculation (#34569) * add MovingAverage class for real-time windowed average calculation * move to util.py --- cereal/messaging/__init__.py | 27 +++++++++------------------ common/realtime.py | 11 ++++++----- common/util.py | 24 ++++++++++++++++++++++++ 3 files changed, 39 insertions(+), 23 deletions(-) create mode 100644 common/util.py diff --git a/cereal/messaging/__init__.py b/cereal/messaging/__init__.py index 2466f6e9c0..8ad956b61b 100644 --- a/cereal/messaging/__init__.py +++ b/cereal/messaging/__init__.py @@ -9,11 +9,11 @@ import os import capnp import time -from typing import Optional, List, Union, Dict, Deque -from collections import deque +from typing import Optional, List, Union, Dict from cereal import log from cereal.services import SERVICE_LIST +from openpilot.common.util import MovingAverage NO_TRAVERSAL_LIMIT = 2**64-1 @@ -108,10 +108,8 @@ class FrequencyTracker: self.min_freq = min_freq * 0.8 self.max_freq = max_freq * 1.2 - self.recv_dts: Deque[float] = deque(maxlen=int(10 * freq)) - self.recv_dts_sum = 0.0 - self.recent_recv_dts: Deque[float] = deque(maxlen=int(freq)) - self.recent_recv_dts_sum = 0.0 + self.avg_dt = MovingAverage(int(10 * freq)) + self.recent_avg_dt = MovingAverage(int(freq)) self.prev_time = 0.0 def record_recv_time(self, cur_time: float) -> None: @@ -119,28 +117,21 @@ class FrequencyTracker: if self.prev_time > 1e-5: dt = cur_time - self.prev_time - if len(self.recv_dts) == self.recv_dts.maxlen: - self.recv_dts_sum -= self.recv_dts[0] - self.recv_dts.append(dt) - self.recv_dts_sum += dt - - if len(self.recent_recv_dts) == self.recent_recv_dts.maxlen: - self.recent_recv_dts_sum -= self.recent_recv_dts[0] - self.recent_recv_dts.append(dt) - self.recent_recv_dts_sum += dt + self.avg_dt.add_value(dt) + self.recent_avg_dt.add_value(dt) self.prev_time = cur_time @property def valid(self) -> bool: - if not self.recv_dts: + if self.avg_dt.count == 0: return False - avg_freq = len(self.recv_dts) / self.recv_dts_sum + avg_freq = 1.0 / self.avg_dt.get_average() if self.min_freq <= avg_freq <= self.max_freq: return True - avg_freq_recent = len(self.recent_recv_dts) / self.recent_recv_dts_sum + avg_freq_recent = 1.0 / self.recent_avg_dt.get_average() return self.min_freq <= avg_freq_recent <= self.max_freq diff --git a/common/realtime.py b/common/realtime.py index 0178692415..82176a00a6 100644 --- a/common/realtime.py +++ b/common/realtime.py @@ -2,10 +2,10 @@ import gc import os import time -from collections import deque from setproctitle import getproctitle +from openpilot.common.util import MovingAverage from openpilot.system.hardware import PC @@ -48,10 +48,12 @@ class Ratekeeper: self._frame = 0 self._remaining = 0.0 self._process_name = getproctitle() - self._dts = deque([self._interval], maxlen=100) self._last_monitor_time = -1. self._next_frame_time = -1. + self.avg_dt = MovingAverage(100) + self.avg_dt.add_value(self._interval) + @property def frame(self) -> int: return self._frame @@ -62,9 +64,8 @@ class Ratekeeper: @property def lagging(self) -> bool: - avg_dt = sum(self._dts) / len(self._dts) expected_dt = self._interval * (1 / 0.9) - return avg_dt > expected_dt + return self.avg_dt.get_average() > expected_dt # Maintain loop rate by calling this at the end of each loop def keep_time(self) -> bool: @@ -81,7 +82,7 @@ class Ratekeeper: prev = self._last_monitor_time self._last_monitor_time = time.monotonic() - self._dts.append(self._last_monitor_time - prev) + self.avg_dt.add_value(self._last_monitor_time - prev) lagged = False remaining = self._next_frame_time - time.monotonic() diff --git a/common/util.py b/common/util.py new file mode 100644 index 0000000000..33885a9024 --- /dev/null +++ b/common/util.py @@ -0,0 +1,24 @@ +class MovingAverage: + def __init__(self, window_size: int): + self.window_size: int = window_size + self.buffer: list[float] = [0.0] * window_size + self.index: int = 0 + self.count: int = 0 + self.sum: float = 0.0 + + def add_value(self, new_value: float): + # Update the sum: subtract the value being replaced and add the new value + self.sum -= self.buffer[self.index] + self.buffer[self.index] = new_value + self.sum += new_value + + # Update the index in a circular manner + self.index = (self.index + 1) % self.window_size + + # Track the number of added values (for partial windows) + self.count = min(self.count + 1, self.window_size) + + def get_average(self) -> float: + if self.count == 0: + return float('nan') + return self.sum / self.count