#!/usr/bin/env python3
import pytest
import asyncio
import sys
from aiortc . mediastreams import AudioStreamTrack , VideoStreamTrack
from parameterized import parameterized
from teleoprtc . builder import WebRTCOfferBuilder , WebRTCAnswerBuilder
from teleoprtc . stream import StreamingOffer
from teleoprtc . info import parse_info_from_offer
if sys . version_info > = ( 3 , 11 ) :
timeout = asyncio . timeout
else :
class Timeout :
def __init__ ( self , delay : float ) :
self . _delay = delay
self . _task = None
self . _timeout_handle = None
def _timeout ( self ) :
if self . _task :
self . _task . cancel ( )
async def __aenter__ ( self ) :
self . _task = asyncio . current_task ( )
loop = asyncio . events . get_running_loop ( )
self . _timeout_handle = loop . call_later ( self . _delay , self . _timeout )
return self
async def __aexit__ ( self , exc_type , exc , tb ) :
if self . _timeout_handle :
self . _timeout_handle . cancel ( )
if exc_type is asyncio . CancelledError and self . _task and self . _task . cancelled ( ) :
raise asyncio . TimeoutError from exc
return False
def timeout ( delay ) :
return Timeout ( delay )
class SimpleAnswerProvider :
def __init__ ( self ) :
self . stream = None
async def __call__ ( self , offer : StreamingOffer ) :
assert self . stream is None , " This may only be called once "
info = parse_info_from_offer ( offer . sdp )
builder = WebRTCAnswerBuilder ( offer . sdp )
for cam in offer . video :
builder . add_video_stream ( cam , VideoStreamTrack ( ) )
if info . expected_audio_track :
builder . add_audio_stream ( AudioStreamTrack ( ) )
if info . incoming_audio_track :
builder . offer_to_receive_audio_stream ( )
self . stream = builder . stream ( )
answer = await self . stream . start ( )
return answer
@pytest . mark . asyncio
class TestStreamIntegration :
@parameterized . expand ( [
# name, recv_cameras, recv_audio, messaging
( " multi_camera " , [ " driver " , " wideRoad " , " road " ] , False , False ) ,
( " camera_and_audio " , [ " driver " ] , True , False ) ,
( " camera_and__messaging " , [ " driver " ] , False , True ) ,
( " camera_and_audio_and_messaging " , [ " driver " , " wideRoad " , " road " ] , True , True ) ,
] )
async def test_multi_camera ( self , name , cameras , recv_audio , add_messaging ) :
simple_answerer = SimpleAnswerProvider ( )
offer_builder = WebRTCOfferBuilder ( simple_answerer )
for cam in cameras :
offer_builder . offer_to_receive_video_stream ( cam )
if recv_audio :
offer_builder . offer_to_receive_audio_stream ( )
if add_messaging :
offer_builder . add_messaging ( )
stream = offer_builder . stream ( )
_ = await stream . start ( )
assert stream . is_started
try :
async with timeout ( 2 ) :
await stream . wait_for_connection ( )
except TimeoutError :
pytest . fail ( " Timed out waiting for connection " )
assert stream . is_connected_and_ready
assert stream . has_messaging_channel ( ) == add_messaging
if stream . has_messaging_channel ( ) :
channel = stream . get_messaging_channel ( )
assert channel is not None
assert channel . readyState == " open "
assert stream . has_incoming_audio_track ( ) == recv_audio
if stream . has_incoming_audio_track ( ) :
track = stream . get_incoming_audio_track ( False )
assert track is not None
assert track . readyState == " live "
assert track . kind == " audio "
# test audio recv
try :
async with timeout ( 1 ) :
await track . recv ( )
except TimeoutError :
pytest . fail ( " Timed out waiting for audio frame " )
for cam in cameras :
assert stream . has_incoming_video_track ( cam )
if stream . has_incoming_video_track ( cam ) :
track = stream . get_incoming_video_track ( cam , False )
assert track is not None
assert track . readyState == " live "
assert track . kind == " video "
# test video recv
try :
async with timeout ( 1 ) :
await stream . get_incoming_video_track ( cam , False ) . recv ( )
except TimeoutError :
pytest . fail ( " Timed out waiting for video frame " )
await stream . stop ( )
await simple_answerer . stream . stop ( )
assert not stream . is_started
assert not stream . is_connected_and_ready