process_replay: ability to permanently unlock some sockets (#29070)

Add unlocked_pubs for keeping some sockets permanently unlocked
pull/29057/head^2
Kacper Rączy 2 years ago committed by GitHub
parent 2ff33663a7
commit 81e8cf414e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      selfdrive/test/process_replay/process_replay.py

@ -39,6 +39,7 @@ class ReplayContext:
self.pubs = cfg.pubs
self.main_pub = cfg.main_pub
self.main_pub_drained = cfg.main_pub_drained
self.unlocked_pubs = cfg.unlocked_pubs
assert(len(self.pubs) != 0 or self.main_pub is not None)
def __enter__(self):
@ -55,7 +56,8 @@ class ReplayContext:
if self.main_pub is None:
self.events = OrderedDict()
for pub in self.pubs:
pubs_with_events = [pub for pub in self.pubs if pub not in self.unlocked_pubs]
for pub in pubs_with_events:
self.events[pub] = messaging.fake_event_handle(pub, enable=True)
else:
self.events = {self.main_pub: messaging.fake_event_handle(self.main_pub, enable=True)}
@ -117,6 +119,7 @@ class ProcessConfig:
main_pub_drained: bool = True
vision_pubs: List[str] = field(default_factory=list)
ignore_alive_pubs: List[str] = field(default_factory=list)
unlocked_pubs: List[str] = field(default_factory=list)
class ProcessContainer:

Loading…
Cancel
Save