cereal/SubMaster: encapsulate frequency management in FrequencyTracker (#33252)

* encapsulate frequency management

* apply reviews

* early return, avoiding unnecessary calculations

* simplify avg freq calc
old-commit-hash: b14fca78e0
pull/33386/head
Dean Lee 8 months ago committed by GitHub
parent 3869d2cdf9
commit 2119f7774e
  1. 85
      cereal/messaging/__init__.py
  2. 4
      cereal/messaging/tests/test_pub_sub_master.py
  3. 4
      selfdrive/test/profiling/lib.py

@ -92,6 +92,45 @@ def recv_one_retry(sock: SubSocket) -> capnp.lib.capnp._DynamicStructReader:
return log_from_bytes(dat) return log_from_bytes(dat)
class FrequencyTracker:
def __init__(self, service_freq: float, update_freq: float, is_poll: bool):
freq = max(min(service_freq, update_freq), 1.)
if is_poll:
min_freq = max_freq = freq
else:
max_freq = min(freq, update_freq)
if service_freq >= 2 * update_freq:
min_freq = update_freq
elif update_freq >= 2* service_freq:
min_freq = freq
else:
min_freq = min(freq, freq / 2.)
self.min_freq = min_freq * 0.8
self.max_freq = max_freq * 1.2
self.recv_dts: Deque[float] = deque(maxlen=int(10 * freq))
self.prev_time = 0.0
def record_recv_time(self, cur_time: float) -> None:
# TODO: Handle case where cur_time is less than prev_time
if self.prev_time > 1e-5:
self.recv_dts.append(cur_time - self.prev_time)
self.prev_time = cur_time
@property
def valid(self) -> bool:
if not self.recv_dts:
return False
avg_freq = len(self.recv_dts) / sum(self.recv_dts)
if self.min_freq <= avg_freq <= self.max_freq:
return True
recent_dts = list(self.recv_dts)[-int(self.recv_dts.maxlen / 10):]
avg_freq_recent = len(recent_dts) / sum(recent_dts)
return self.min_freq <= avg_freq_recent <= self.max_freq
class SubMaster: class SubMaster:
def __init__(self, services: List[str], poll: Optional[str] = None, def __init__(self, services: List[str], poll: Optional[str] = None,
ignore_alive: Optional[List[str]] = None, ignore_avg_freq: Optional[List[str]] = None, ignore_alive: Optional[List[str]] = None, ignore_avg_freq: Optional[List[str]] = None,
@ -103,15 +142,12 @@ class SubMaster:
self.recv_frame = {s: 0 for s in services} self.recv_frame = {s: 0 for s in services}
self.alive = {s: False for s in services} self.alive = {s: False for s in services}
self.freq_ok = {s: False for s in services} self.freq_ok = {s: False for s in services}
self.recv_dts: Dict[str, Deque[float]] = {}
self.sock = {} self.sock = {}
self.data = {} self.data = {}
self.valid = {} self.valid = {}
self.logMonoTime = {} self.logMonoTime = {}
self.max_freq = {} self.freq_tracker: Dict[str, FrequencyTracker] = {}
self.min_freq = {}
self.poller = Poller() self.poller = Poller()
polled_services = set([poll, ] if poll is not None else services) polled_services = set([poll, ] if poll is not None else services)
self.non_polled_services = set(services) - polled_services self.non_polled_services = set(services) - polled_services
@ -138,22 +174,7 @@ class SubMaster:
self.data[s] = getattr(data.as_reader(), s) self.data[s] = getattr(data.as_reader(), s)
self.logMonoTime[s] = 0 self.logMonoTime[s] = 0
self.valid[s] = True # FIXME: this should default to False self.valid[s] = True # FIXME: this should default to False
self.freq_tracker[s] = FrequencyTracker(SERVICE_LIST[s].frequency, self.update_freq, s == poll)
freq = max(min([SERVICE_LIST[s].frequency, self.update_freq]), 1.)
if s == poll:
max_freq = freq
min_freq = freq
else:
max_freq = min(freq, self.update_freq)
if SERVICE_LIST[s].frequency >= 2*self.update_freq:
min_freq = self.update_freq
elif self.update_freq >= 2*SERVICE_LIST[s].frequency:
min_freq = freq
else:
min_freq = min(freq, freq / 2.)
self.max_freq[s] = max_freq*1.2
self.min_freq[s] = min_freq*0.8
self.recv_dts[s] = deque(maxlen=int(10*freq))
def __getitem__(self, s: str) -> capnp.lib.capnp._DynamicStructReader: def __getitem__(self, s: str) -> capnp.lib.capnp._DynamicStructReader:
return self.data[s] return self.data[s]
@ -182,8 +203,7 @@ class SubMaster:
self.seen[s] = True self.seen[s] = True
self.updated[s] = True self.updated[s] = True
if self.recv_time[s] > 1e-5: self.freq_tracker[s].record_recv_time(cur_time)
self.recv_dts[s].append(cur_time - self.recv_time[s])
self.recv_time[s] = cur_time self.recv_time[s] = cur_time
self.recv_frame[s] = self.frame self.recv_frame[s] = self.frame
self.data[s] = getattr(msg, s) self.data[s] = getattr(msg, s)
@ -194,27 +214,10 @@ class SubMaster:
if SERVICE_LIST[s].frequency > 1e-5 and not self.simulation: if SERVICE_LIST[s].frequency > 1e-5 and not self.simulation:
# alive if delay is within 10x the expected frequency # alive if delay is within 10x the expected frequency
self.alive[s] = (cur_time - self.recv_time[s]) < (10. / SERVICE_LIST[s].frequency) self.alive[s] = (cur_time - self.recv_time[s]) < (10. / SERVICE_LIST[s].frequency)
self.freq_ok[s] = self.freq_tracker[s].valid
# check average frequency; slow to fall, quick to recover
dts = self.recv_dts[s]
assert dts.maxlen is not None
recent_dts = list(dts)[-int(dts.maxlen / 10):]
try:
avg_freq = 1 / (sum(dts) / len(dts))
avg_freq_recent = 1 / (sum(recent_dts) / len(recent_dts))
except ZeroDivisionError:
avg_freq = 0
avg_freq_recent = 0
avg_freq_ok = self.min_freq[s] <= avg_freq <= self.max_freq[s]
recent_freq_ok = self.min_freq[s] <= avg_freq_recent <= self.max_freq[s]
self.freq_ok[s] = avg_freq_ok or recent_freq_ok
else: else:
self.freq_ok[s] = True self.freq_ok[s] = True
if self.simulation: self.alive[s] = self.seen[s] if self.simulation else True
self.alive[s] = self.seen[s] # alive is defined as seen when simulation flag set
else:
self.alive[s] = True
def all_alive(self, service_list: Optional[List[str]] = None) -> bool: def all_alive(self, service_list: Optional[List[str]] = None) -> bool:
if service_list is None: if service_list is None:

@ -89,8 +89,8 @@ class TestSubMaster:
for service, (max_freq, min_freq) in checks.items(): for service, (max_freq, min_freq) in checks.items():
if max_freq is not None: if max_freq is not None:
assert sm._check_avg_freq(service) assert sm._check_avg_freq(service)
assert sm.max_freq[service] == max_freq*1.2 assert sm.freq_tracker[service].max_freq == max_freq*1.2
assert sm.min_freq[service] == min_freq*0.8 assert sm.freq_tracker[service].min_freq == min_freq*0.8
else: else:
assert not sm._check_avg_freq(service) assert not sm._check_avg_freq(service)

@ -1,4 +1,4 @@
from collections import defaultdict, deque from collections import defaultdict
from cereal.services import SERVICE_LIST from cereal.services import SERVICE_LIST
import cereal.messaging as messaging import cereal.messaging as messaging
import capnp import capnp
@ -45,7 +45,7 @@ class SubMaster(messaging.SubMaster):
self.rcv_frame = {s: 0 for s in services} self.rcv_frame = {s: 0 for s in services}
self.valid = {s: True for s in services} self.valid = {s: True for s in services}
self.freq_ok = {s: True for s in services} self.freq_ok = {s: True for s in services}
self.recv_dts = {s: deque([0.0] * messaging.AVG_FREQ_HISTORY, maxlen=messaging.AVG_FREQ_HISTORY) for s in services} self.freq_tracker = {s: messaging.FrequencyTracker(SERVICE_LIST[s].frequency, SERVICE_LIST[s].frequency, False) for s in services}
self.logMonoTime = {} self.logMonoTime = {}
self.sock = {} self.sock = {}
self.freq = {} self.freq = {}

Loading…
Cancel
Save