From a566461294a706545e8ccbcd48aa7fb1a6fb9872 Mon Sep 17 00:00:00 2001 From: "Jan Alexander Steffens (heftig)" Date: Mon, 27 Jan 2020 14:05:31 +0100 Subject: [PATCH] rtmp2: Chunk messages as buffers in loop thread Move output chunking from gst_rtmp_connection_queue_message into gst_rtmp_connection_start_write, which effectively moves it from the streaming thread into the loop thread. This allows us to handle the outgoing chunk-size message (which is generated by changing the future chunk-size property) properly, which could come from any other thread. --- gst/rtmp2/rtmp/rtmpchunkstream.c | 2 +- gst/rtmp2/rtmp/rtmpconnection.c | 47 ++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/gst/rtmp2/rtmp/rtmpchunkstream.c b/gst/rtmp2/rtmp/rtmpchunkstream.c index cca8848815..1cdd68b26d 100644 --- a/gst/rtmp2/rtmp/rtmpchunkstream.c +++ b/gst/rtmp2/rtmp/rtmpchunkstream.c @@ -643,7 +643,7 @@ gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream, gst_rtmp_buffer_dump (buffer, ">>> message"); chunk_stream_clear (cstream); - chunk_stream_take_buffer (cstream, buffer); + chunk_stream_take_buffer (cstream, gst_buffer_ref (buffer)); return serialize_next (cstream, chunk_size, type); } diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c index 0733244d9b..7d78377796 100644 --- a/gst/rtmp2/rtmp/rtmpconnection.c +++ b/gst/rtmp2/rtmp/rtmpconnection.c @@ -464,17 +464,39 @@ static void gst_rtmp_connection_start_write (GstRtmpConnection * self) { GOutputStream *os; - GstBuffer *chunks; + GstBuffer *message, *chunks; + GstRtmpMeta *meta; + GstRtmpChunkStream *cstream; if (self->writing) { return; } - chunks = g_async_queue_try_pop (self->output_queue); - if (!chunks) { + message = g_async_queue_try_pop (self->output_queue); + if (!message) { return; } + meta = gst_buffer_get_rtmp_meta (message); + if (!meta) { + GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message); + goto out; + } + + cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream); + if (!cstream) { + GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT, + message); + goto out; + } + + chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message, + self->out_chunk_size); + if (!chunks) { + GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message); + goto out; + } + self->writing = TRUE; if (self->output_handler) { self->output_handler (self, self->output_handler_user_data); @@ -486,6 +508,9 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self) g_object_ref (self)); gst_buffer_unref (chunks); + +out: + gst_buffer_unref (message); } static void @@ -910,24 +935,10 @@ start_write (gpointer user_data) void gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer) { - GstRtmpMeta *meta; - GstRtmpChunkStream *cstream; - GstBuffer *chunks; - g_return_if_fail (GST_IS_RTMP_CONNECTION (self)); g_return_if_fail (GST_IS_BUFFER (buffer)); - meta = gst_buffer_get_rtmp_meta (buffer); - g_return_if_fail (meta); - - cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream); - g_return_if_fail (cstream); - - chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer, - self->out_chunk_size); - g_return_if_fail (chunks); - - g_async_queue_push (self->output_queue, chunks); + g_async_queue_push (self->output_queue, buffer); g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT, start_write, g_object_ref (self), g_object_unref); }