|
|
@ -102,7 +102,21 @@ class CerealProxyRunner: |
|
|
|
await asyncio.sleep(0.01) |
|
|
|
await asyncio.sleep(0.01) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DynamicPubMaster(messaging.PubMaster): |
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs): |
|
|
|
|
|
|
|
super().__init__(*args, **kwargs) |
|
|
|
|
|
|
|
self.lock = asyncio.Lock() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def add_services_if_needed(self, services): |
|
|
|
|
|
|
|
async with self.lock: |
|
|
|
|
|
|
|
for service in services: |
|
|
|
|
|
|
|
if service not in self.sock: |
|
|
|
|
|
|
|
self.sock[service] = messaging.pub_sock(service) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamSession: |
|
|
|
class StreamSession: |
|
|
|
|
|
|
|
shared_pub_master = DynamicPubMaster([]) |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False): |
|
|
|
def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], outgoing_services: list[str], debug_mode: bool = False): |
|
|
|
from aiortc.mediastreams import VideoStreamTrack, AudioStreamTrack |
|
|
|
from aiortc.mediastreams import VideoStreamTrack, AudioStreamTrack |
|
|
|
from aiortc.contrib.media import MediaBlackhole |
|
|
|
from aiortc.contrib.media import MediaBlackhole |
|
|
@ -129,10 +143,11 @@ class StreamSession: |
|
|
|
self.identifier = str(uuid.uuid4()) |
|
|
|
self.identifier = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
|
|
|
self.incoming_bridge: CerealIncomingMessageProxy | None = None |
|
|
|
self.incoming_bridge: CerealIncomingMessageProxy | None = None |
|
|
|
|
|
|
|
self.incoming_bridge_services = incoming_services |
|
|
|
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None |
|
|
|
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None |
|
|
|
self.outgoing_bridge_runner: CerealProxyRunner | None = None |
|
|
|
self.outgoing_bridge_runner: CerealProxyRunner | None = None |
|
|
|
if len(incoming_services) > 0: |
|
|
|
if len(incoming_services) > 0: |
|
|
|
self.incoming_bridge = CerealIncomingMessageProxy(messaging.PubMaster(incoming_services)) |
|
|
|
self.incoming_bridge = CerealIncomingMessageProxy(self.shared_pub_master) |
|
|
|
if len(outgoing_services) > 0: |
|
|
|
if len(outgoing_services) > 0: |
|
|
|
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) |
|
|
|
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) |
|
|
|
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) |
|
|
|
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) |
|
|
@ -168,6 +183,7 @@ class StreamSession: |
|
|
|
await self.stream.wait_for_connection() |
|
|
|
await self.stream.wait_for_connection() |
|
|
|
if self.stream.has_messaging_channel(): |
|
|
|
if self.stream.has_messaging_channel(): |
|
|
|
if self.incoming_bridge is not None: |
|
|
|
if self.incoming_bridge is not None: |
|
|
|
|
|
|
|
await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) |
|
|
|
self.stream.set_message_handler(self.message_handler) |
|
|
|
self.stream.set_message_handler(self.message_handler) |
|
|
|
if self.outgoing_bridge_runner is not None: |
|
|
|
if self.outgoing_bridge_runner is not None: |
|
|
|
channel = self.stream.get_messaging_channel() |
|
|
|
channel = self.stream.get_messaging_channel() |
|
|
|