diff --git a/cereal/messaging/__init__.py b/cereal/messaging/__init__.py index 9646047de3..6e97318824 100644 --- a/cereal/messaging/__init__.py +++ b/cereal/messaging/__init__.py @@ -92,6 +92,45 @@ def recv_one_retry(sock: SubSocket) -> capnp.lib.capnp._DynamicStructReader: return log_from_bytes(dat) +class FrequencyTracker: + def __init__(self, service_freq: float, update_freq: float, is_poll: bool): + freq = max(min(service_freq, update_freq), 1.) + if is_poll: + min_freq = max_freq = freq + else: + max_freq = min(freq, update_freq) + if service_freq >= 2 * update_freq: + min_freq = update_freq + elif update_freq >= 2* service_freq: + min_freq = freq + else: + min_freq = min(freq, freq / 2.) + + self.min_freq = min_freq * 0.8 + self.max_freq = max_freq * 1.2 + self.recv_dts: Deque[float] = deque(maxlen=int(10 * freq)) + self.prev_time = 0.0 + + def record_recv_time(self, cur_time: float) -> None: + # TODO: Handle case where cur_time is less than prev_time + if self.prev_time > 1e-5: + self.recv_dts.append(cur_time - self.prev_time) + self.prev_time = cur_time + + @property + def valid(self) -> bool: + if not self.recv_dts: + return False + + avg_freq = len(self.recv_dts) / sum(self.recv_dts) + if self.min_freq <= avg_freq <= self.max_freq: + return True + + recent_dts = list(self.recv_dts)[-int(self.recv_dts.maxlen / 10):] + avg_freq_recent = len(recent_dts) / sum(recent_dts) + return self.min_freq <= avg_freq_recent <= self.max_freq + + class SubMaster: def __init__(self, services: List[str], poll: Optional[str] = None, ignore_alive: Optional[List[str]] = None, ignore_avg_freq: Optional[List[str]] = None, @@ -103,15 +142,12 @@ class SubMaster: self.recv_frame = {s: 0 for s in services} self.alive = {s: False for s in services} self.freq_ok = {s: False for s in services} - self.recv_dts: Dict[str, Deque[float]] = {} self.sock = {} self.data = {} self.valid = {} self.logMonoTime = {} - self.max_freq = {} - self.min_freq = {} - + self.freq_tracker: Dict[str, FrequencyTracker] = {} self.poller = Poller() polled_services = set([poll, ] if poll is not None else services) self.non_polled_services = set(services) - polled_services @@ -138,22 +174,7 @@ class SubMaster: self.data[s] = getattr(data.as_reader(), s) self.logMonoTime[s] = 0 self.valid[s] = True # FIXME: this should default to False - - freq = max(min([SERVICE_LIST[s].frequency, self.update_freq]), 1.) - if s == poll: - max_freq = freq - min_freq = freq - else: - max_freq = min(freq, self.update_freq) - if SERVICE_LIST[s].frequency >= 2*self.update_freq: - min_freq = self.update_freq - elif self.update_freq >= 2*SERVICE_LIST[s].frequency: - min_freq = freq - else: - min_freq = min(freq, freq / 2.) - self.max_freq[s] = max_freq*1.2 - self.min_freq[s] = min_freq*0.8 - self.recv_dts[s] = deque(maxlen=int(10*freq)) + self.freq_tracker[s] = FrequencyTracker(SERVICE_LIST[s].frequency, self.update_freq, s == poll) def __getitem__(self, s: str) -> capnp.lib.capnp._DynamicStructReader: return self.data[s] @@ -182,8 +203,7 @@ class SubMaster: self.seen[s] = True self.updated[s] = True - if self.recv_time[s] > 1e-5: - self.recv_dts[s].append(cur_time - self.recv_time[s]) + self.freq_tracker[s].record_recv_time(cur_time) self.recv_time[s] = cur_time self.recv_frame[s] = self.frame self.data[s] = getattr(msg, s) @@ -194,27 +214,10 @@ class SubMaster: if SERVICE_LIST[s].frequency > 1e-5 and not self.simulation: # alive if delay is within 10x the expected frequency self.alive[s] = (cur_time - self.recv_time[s]) < (10. / SERVICE_LIST[s].frequency) - - # check average frequency; slow to fall, quick to recover - dts = self.recv_dts[s] - assert dts.maxlen is not None - recent_dts = list(dts)[-int(dts.maxlen / 10):] - try: - avg_freq = 1 / (sum(dts) / len(dts)) - avg_freq_recent = 1 / (sum(recent_dts) / len(recent_dts)) - except ZeroDivisionError: - avg_freq = 0 - avg_freq_recent = 0 - - avg_freq_ok = self.min_freq[s] <= avg_freq <= self.max_freq[s] - recent_freq_ok = self.min_freq[s] <= avg_freq_recent <= self.max_freq[s] - self.freq_ok[s] = avg_freq_ok or recent_freq_ok + self.freq_ok[s] = self.freq_tracker[s].valid else: self.freq_ok[s] = True - if self.simulation: - self.alive[s] = self.seen[s] # alive is defined as seen when simulation flag set - else: - self.alive[s] = True + self.alive[s] = self.seen[s] if self.simulation else True def all_alive(self, service_list: Optional[List[str]] = None) -> bool: if service_list is None: diff --git a/cereal/messaging/tests/test_pub_sub_master.py b/cereal/messaging/tests/test_pub_sub_master.py index ba5b397aad..99965319eb 100644 --- a/cereal/messaging/tests/test_pub_sub_master.py +++ b/cereal/messaging/tests/test_pub_sub_master.py @@ -89,8 +89,8 @@ class TestSubMaster: for service, (max_freq, min_freq) in checks.items(): if max_freq is not None: assert sm._check_avg_freq(service) - assert sm.max_freq[service] == max_freq*1.2 - assert sm.min_freq[service] == min_freq*0.8 + assert sm.freq_tracker[service].max_freq == max_freq*1.2 + assert sm.freq_tracker[service].min_freq == min_freq*0.8 else: assert not sm._check_avg_freq(service) diff --git a/selfdrive/test/profiling/lib.py b/selfdrive/test/profiling/lib.py index 93ba215386..62bb305ca8 100644 --- a/selfdrive/test/profiling/lib.py +++ b/selfdrive/test/profiling/lib.py @@ -1,4 +1,4 @@ -from collections import defaultdict, deque +from collections import defaultdict from cereal.services import SERVICE_LIST import cereal.messaging as messaging import capnp @@ -45,7 +45,7 @@ class SubMaster(messaging.SubMaster): self.rcv_frame = {s: 0 for s in services} self.valid = {s: True for s in services} self.freq_ok = {s: True for s in services} - self.recv_dts = {s: deque([0.0] * messaging.AVG_FREQ_HISTORY, maxlen=messaging.AVG_FREQ_HISTORY) for s in services} + self.freq_tracker = {s: messaging.FrequencyTracker(SERVICE_LIST[s].frequency, SERVICE_LIST[s].frequency, False) for s in services} self.logMonoTime = {} self.sock = {} self.freq = {}