From 4efe8d811853cc85ad491fc814a8f0d1860692f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kacper=20R=C4=85czy?= Date: Thu, 18 Jul 2024 20:19:39 -0700 Subject: [PATCH] Fix msgq context overrides (#33014) * Fix msgq context overrides * Remove unneccesary changes * add reset_context helper * Remove redundant import old-commit-hash: c96dbd5a0b681c76b94191d1b928e29eb2b72119 --- cereal/messaging/__init__.py | 7 ++++++- selfdrive/debug/internal/measure_torque_time_to_max.py | 2 +- system/manager/process.py | 2 +- tools/camerastream/compressed_vipc.py | 7 ++++--- tools/lib/live_logreader.py | 2 +- tools/replay/rp_visualization.py | 2 +- tools/tuning/measure_steering_accuracy.py | 2 +- 7 files changed, 15 insertions(+), 9 deletions(-) diff --git a/cereal/messaging/__init__.py b/cereal/messaging/__init__.py index 8dfa42056d..9646047de3 100644 --- a/cereal/messaging/__init__.py +++ b/cereal/messaging/__init__.py @@ -2,7 +2,8 @@ from msgq.ipc_pyx import Context, Poller, SubSocket, PubSocket, SocketEventHandle, toggle_fake_events, \ set_fake_prefix, get_fake_prefix, delete_fake_prefix, wait_for_one_event from msgq.ipc_pyx import MultiplePublishersError, IpcError -from msgq import fake_event_handle, pub_sock, sub_sock, drain_sock_raw, context +from msgq import fake_event_handle, pub_sock, sub_sock, drain_sock_raw +import msgq import os import capnp @@ -17,6 +18,10 @@ from cereal.services import SERVICE_LIST NO_TRAVERSAL_LIMIT = 2**64-1 +def reset_context(): + msgq.context = Context() + + def log_from_bytes(dat: bytes, struct: capnp.lib.capnp._StructModule = log.Event) -> capnp.lib.capnp._DynamicStructReader: with struct.from_bytes(dat, traversal_limit_in_words=NO_TRAVERSAL_LIMIT) as msg: return msg diff --git a/selfdrive/debug/internal/measure_torque_time_to_max.py b/selfdrive/debug/internal/measure_torque_time_to_max.py index ef3152b50c..7052dccf7d 100755 --- a/selfdrive/debug/internal/measure_torque_time_to_max.py +++ b/selfdrive/debug/internal/measure_torque_time_to_max.py @@ -18,7 +18,7 @@ if __name__ == "__main__": if args.addr != "127.0.0.1": os.environ["ZMQ"] = "1" - messaging.context = messaging.Context() + messaging.reset_context() poller = messaging.Poller() messaging.sub_sock('can', poller, addr=args.addr) diff --git a/system/manager/process.py b/system/manager/process.py index 9214e417c1..c78d263dcc 100644 --- a/system/manager/process.py +++ b/system/manager/process.py @@ -30,7 +30,7 @@ def launcher(proc: str, name: str) -> None: setthreadname(proc) # create new context since we forked - messaging.context = messaging.Context() + messaging.reset_context() # add daemon name tag to logs cloudlog.bind(daemon=name) diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py index 6c27e861ff..f531a289f9 100755 --- a/tools/camerastream/compressed_vipc.py +++ b/tools/camerastream/compressed_vipc.py @@ -8,6 +8,7 @@ import multiprocessing import time import signal + import cereal.messaging as messaging from msgq.visionipc import VisionIpcServer, VisionStreamType @@ -42,7 +43,7 @@ def decoder(addr, vipc_server, vst, nvidia, W, H, debug=False): codec = av.CodecContext.create("hevc", "r") os.environ["ZMQ"] = "1" - messaging.context = messaging.Context() + messaging.reset_context() sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False) cnt = 0 last_idx = -1 @@ -109,12 +110,12 @@ class CompressedVipc: def __init__(self, addr, vision_streams, nvidia=False, debug=False): print("getting frame sizes") os.environ["ZMQ"] = "1" - messaging.context = messaging.Context() + messaging.reset_context() sm = messaging.SubMaster([ENCODE_SOCKETS[s] for s in vision_streams], addr=addr) while min(sm.recv_frame.values()) == 0: sm.update(100) os.environ.pop("ZMQ") - messaging.context = messaging.Context() + messaging.reset_context() self.vipc_server = VisionIpcServer("camerad") for vst in vision_streams: diff --git a/tools/lib/live_logreader.py b/tools/lib/live_logreader.py index 6a7ecee6fd..edc4ac1611 100644 --- a/tools/lib/live_logreader.py +++ b/tools/lib/live_logreader.py @@ -10,7 +10,7 @@ ALL_SERVICES = list(SERVICE_LIST.keys()) def raw_live_logreader(services: list[str] = ALL_SERVICES, addr: str = '127.0.0.1') -> RawLogIterable: if addr != "127.0.0.1": os.environ["ZMQ"] = "1" - messaging.context = messaging.Context() + messaging.reset_context() poller = messaging.Poller() diff --git a/tools/replay/rp_visualization.py b/tools/replay/rp_visualization.py index 853a83c150..01058967b2 100755 --- a/tools/replay/rp_visualization.py +++ b/tools/replay/rp_visualization.py @@ -53,7 +53,7 @@ if __name__ == "__main__": args = get_arg_parser().parse_args(sys.argv[1:]) if args.ip_address != "127.0.0.1": os.environ["ZMQ"] = "1" - messaging.context = messaging.Context() + messaging.reset_context() rr.init("RadarPoints", spawn= True) rr.log("tracks", rr.AnnotationContext(rerunColorPalette), static=True) visualize(args.ip_address) diff --git a/tools/tuning/measure_steering_accuracy.py b/tools/tuning/measure_steering_accuracy.py index 6abf1338dc..0b8916343e 100755 --- a/tools/tuning/measure_steering_accuracy.py +++ b/tools/tuning/measure_steering_accuracy.py @@ -147,7 +147,7 @@ if __name__ == "__main__": else: if args.addr != "127.0.0.1": os.environ["ZMQ"] = "1" - messaging.context = messaging.Context() + messaging.reset_context() carControl = messaging.sub_sock('carControl', addr=args.addr, conflate=True) sm = messaging.SubMaster(['carState', 'carControl', 'carOutput', 'controlsState', 'modelV2'], addr=args.addr)