|
|
|
@ -128,9 +128,14 @@ class StreamSession: |
|
|
|
|
self.stream = builder.stream() |
|
|
|
|
self.identifier = str(uuid.uuid4()) |
|
|
|
|
|
|
|
|
|
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) |
|
|
|
|
self.incoming_bridge = CerealIncomingMessageProxy(messaging.PubMaster(incoming_services)) |
|
|
|
|
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) |
|
|
|
|
self.incoming_bridge: CerealIncomingMessageProxy | None = None |
|
|
|
|
self.outgoing_bridge: CerealOutgoingMessageProxy | None = None |
|
|
|
|
self.outgoing_bridge_runner: CerealProxyRunner | None = None |
|
|
|
|
if len(incoming_services) > 0: |
|
|
|
|
self.incoming_bridge = CerealIncomingMessageProxy(messaging.PubMaster(incoming_services)) |
|
|
|
|
if len(outgoing_services) > 0: |
|
|
|
|
self.outgoing_bridge = CerealOutgoingMessageProxy(messaging.SubMaster(outgoing_services)) |
|
|
|
|
self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) |
|
|
|
|
|
|
|
|
|
self.audio_output: AudioOutputSpeaker | MediaBlackhole | None = None |
|
|
|
|
self.run_task: asyncio.Task | None = None |
|
|
|
@ -152,6 +157,7 @@ class StreamSession: |
|
|
|
|
return await self.stream.start() |
|
|
|
|
|
|
|
|
|
async def message_handler(self, message: bytes): |
|
|
|
|
assert self.incoming_bridge is not None |
|
|
|
|
try: |
|
|
|
|
self.incoming_bridge.send(message) |
|
|
|
|
except Exception as ex: |
|
|
|
@ -161,10 +167,12 @@ class StreamSession: |
|
|
|
|
try: |
|
|
|
|
await self.stream.wait_for_connection() |
|
|
|
|
if self.stream.has_messaging_channel(): |
|
|
|
|
self.stream.set_message_handler(self.message_handler) |
|
|
|
|
channel = self.stream.get_messaging_channel() |
|
|
|
|
self.outgoing_bridge_runner.proxy.add_channel(channel) |
|
|
|
|
self.outgoing_bridge_runner.start() |
|
|
|
|
if self.incoming_bridge is not None: |
|
|
|
|
self.stream.set_message_handler(self.message_handler) |
|
|
|
|
if self.outgoing_bridge_runner is not None: |
|
|
|
|
channel = self.stream.get_messaging_channel() |
|
|
|
|
self.outgoing_bridge_runner.proxy.add_channel(channel) |
|
|
|
|
self.outgoing_bridge_runner.start() |
|
|
|
|
if self.stream.has_incoming_audio_track(): |
|
|
|
|
track = self.stream.get_incoming_audio_track(buffered=False) |
|
|
|
|
self.audio_output = self.audio_output_cls() |
|
|
|
@ -181,7 +189,8 @@ class StreamSession: |
|
|
|
|
|
|
|
|
|
async def post_run_cleanup(self): |
|
|
|
|
await self.stream.stop() |
|
|
|
|
self.outgoing_bridge_runner.stop() |
|
|
|
|
if self.outgoing_bridge is not None: |
|
|
|
|
self.outgoing_bridge_runner.stop() |
|
|
|
|
if self.audio_output: |
|
|
|
|
self.audio_output.stop() |
|
|
|
|
|
|
|
|
|