improve webrtc stack for use in camera focusing (#36268)

* made LiveStreamVideoStreamTrack use system time to calculate pts

* fixes as requested

* Align panda submodule with master (panda@615009c)

* made loggerd accept a run time env variable to pick stream bitrate

* added /notify endpoint to send json to all session's data channel

* fixed static analysis error

* adapted webrtc stream test to new pts calculation method

* fixed static erro

* fixed wrong indent

* fixed import order

* delete accidental newline

* remove excess spaces

Co-authored-by: Maxime Desroches <desroches.maxime@gmail.com>

* remove excess spaces

Co-authored-by: Maxime Desroches <desroches.maxime@gmail.com>

* changed exeption handling based on review

* fixed typo on exception handling

---------

Co-authored-by: Maxime Desroches <desroches.maxime@gmail.com>
pull/33601/merge
kostas.pats 1 day ago committed by GitHub
parent 226465e882
commit dcc5afa8fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      system/loggerd/loggerd.h
  2. 8
      system/webrtc/device/video.py
  3. 7
      system/webrtc/tests/test_stream_session.py
  4. 15
      system/webrtc/webrtcd.py

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <cstdlib>
#include <vector> #include <vector>
#include "cereal/messaging/messaging.h" #include "cereal/messaging/messaging.h"
@ -46,7 +47,8 @@ struct EncoderSettings {
} }
static EncoderSettings StreamEncoderSettings() { static EncoderSettings StreamEncoderSettings() {
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = 1'000'000, .gop_size = 15}; int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000;
return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15};
} }
}; };

@ -1,4 +1,5 @@
import asyncio import asyncio
import time
import av import av
from teleoprtc.tracks import TiciVideoStreamTrack from teleoprtc.tracks import TiciVideoStreamTrack
@ -20,6 +21,7 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True)
self._pts = 0 self._pts = 0
self._t0_ns = time.monotonic_ns()
async def recv(self): async def recv(self):
while True: while True:
@ -32,10 +34,10 @@ class LiveStreamVideoStreamTrack(TiciVideoStreamTrack):
packet = av.Packet(evta.header + evta.data) packet = av.Packet(evta.header + evta.data)
packet.time_base = self._time_base packet.time_base = self._time_base
packet.pts = self._pts
self.log_debug("track sending frame %s", self._pts) self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000
self._pts += self._dt * self._clock_rate packet.pts = self._pts
self.log_debug("track sending frame %d", self._pts)
return packet return packet

@ -1,5 +1,6 @@
import asyncio import asyncio
import json import json
import time
# for aiortc and its dependencies # for aiortc and its dependencies
import warnings import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) warnings.filterwarnings("ignore", category=DeprecationWarning)
@ -14,7 +15,6 @@ from cereal import messaging, log
from openpilot.system.webrtc.webrtcd import CerealOutgoingMessageProxy, CerealIncomingMessageProxy from openpilot.system.webrtc.webrtcd import CerealOutgoingMessageProxy, CerealIncomingMessageProxy
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
from openpilot.system.webrtc.device.audio import AudioInputStreamTrack from openpilot.system.webrtc.device.audio import AudioInputStreamTrack
from openpilot.common.realtime import DT_DMON
class TestStreamSession: class TestStreamSession:
@ -81,7 +81,10 @@ class TestStreamSession:
for i in range(5): for i in range(5):
packet = self.loop.run_until_complete(track.recv()) packet = self.loop.run_until_complete(track.recv())
assert packet.time_base == VIDEO_TIME_BASE assert packet.time_base == VIDEO_TIME_BASE
assert packet.pts == int(i * DT_DMON * VIDEO_CLOCK_RATE) if i == 0:
start_ns = time.monotonic_ns()
start_pts = packet.pts
assert abs(i + packet.pts - (start_pts + (((time.monotonic_ns() - start_ns) * VIDEO_CLOCK_RATE) // 1_000_000_000))) < 450 #5ms
assert packet.size == 0 assert packet.size == 0
def test_input_audio_track(self, mocker): def test_input_audio_track(self, mocker):

@ -239,6 +239,20 @@ async def get_schema(request: 'web.Request'):
schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services} schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services}
return web.json_response(schema_dict) return web.json_response(schema_dict)
async def post_notify(request: 'web.Request'):
try:
payload = await request.json()
except Exception as e:
raise web.HTTPBadRequest(text="Invalid JSON") from e
for session in list(request.app.get('streams', {}).values()):
try:
ch = session.stream.get_messaging_channel()
ch.send(json.dumps(payload))
except Exception:
continue
return web.Response(status=200, text="OK")
async def on_shutdown(app: 'web.Application'): async def on_shutdown(app: 'web.Application'):
for session in app['streams'].values(): for session in app['streams'].values():
@ -258,6 +272,7 @@ def webrtcd_thread(host: str, port: int, debug: bool):
app['debug'] = debug app['debug'] = debug
app.on_shutdown.append(on_shutdown) app.on_shutdown.append(on_shutdown)
app.router.add_post("/stream", get_stream) app.router.add_post("/stream", get_stream)
app.router.add_post("/notify", post_notify)
app.router.add_get("/schema", get_schema) app.router.add_get("/schema", get_schema)
web.run_app(app, host=host, port=port) web.run_app(app, host=host, port=port)

Loading…
Cancel
Save