From 032c0878b8f15ffecd372bc12d8be66de2608358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kacper=20R=C4=85czy?= Date: Tue, 5 Mar 2024 21:14:50 +0100 Subject: [PATCH] webrtcd: ability to have multiple streams publishing same message (#31700) Use single PubMaster with dynamic services --- system/webrtc/webrtcd.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index 95c8ae337..51c86aacc 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -102,7 +102,21 @@ class CerealProxyRunner: await asyncio.sleep(0.01) +class DynamicPubMaster(messaging.PubMaster): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.lock = asyncio.Lock() + + async def add_services_if_needed(self, services): + async with self.lock: + for service in services: + if service not in self.sock: + self.sock[service] = messaging.pub_sock(service) + + class StreamSession: + shared_pub_master = DynamicPubMaster([]) + def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False): from aiortc.mediastreams import VideoStreamTrack, AudioStreamTrack from aiortc.contrib.media import MediaBlackhole @@ -129,10 +143,11 @@ class StreamSession: self.identifier = str(uuid.uuid4()) self.incoming_bridge: CerealIncomingMessageProxy | None = None + self.incoming_bridge_services = incoming_services self.outgoing_bridge: CerealOutgoingMessageProxy | None = None self.outgoing_bridge_runner: CerealProxyRunner | None = None if len(incoming_services) > 0: - self.incoming_bridge = CerealIncomingMessageProxy(messaging.PubMaster(incoming_services)) + self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master) if len(outgoing_services) > 0: self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) @@ -168,6 +183,7 @@ class StreamSession: await self.stream.wait_for_connection() if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: + await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel()