diff --git a/gst/rtmp2/rtmp/rtmpchunkstream.c b/gst/rtmp2/rtmp/rtmpchunkstream.c index cca8848815..1cdd68b26d 100644 --- a/gst/rtmp2/rtmp/rtmpchunkstream.c +++ b/gst/rtmp2/rtmp/rtmpchunkstream.c @@ -643,7 +643,7 @@ gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream, gst_rtmp_buffer_dump (buffer, ">>> message"); chunk_stream_clear (cstream); - chunk_stream_take_buffer (cstream, buffer); + chunk_stream_take_buffer (cstream, gst_buffer_ref (buffer)); return serialize_next (cstream, chunk_size, type); } diff --git a/gst/rtmp2/rtmp/rtmpconnection.c b/gst/rtmp2/rtmp/rtmpconnection.c index 0733244d9b..7d78377796 100644 --- a/gst/rtmp2/rtmp/rtmpconnection.c +++ b/gst/rtmp2/rtmp/rtmpconnection.c @@ -464,17 +464,39 @@ static void gst_rtmp_connection_start_write (GstRtmpConnection * self) { GOutputStream *os; - GstBuffer *chunks; + GstBuffer *message, *chunks; + GstRtmpMeta *meta; + GstRtmpChunkStream *cstream; if (self->writing) { return; } - chunks = g_async_queue_try_pop (self->output_queue); - if (!chunks) { + message = g_async_queue_try_pop (self->output_queue); + if (!message) { return; } + meta = gst_buffer_get_rtmp_meta (message); + if (!meta) { + GST_ERROR_OBJECT (self, "No RTMP meta on %" GST_PTR_FORMAT, message); + goto out; + } + + cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream); + if (!cstream) { + GST_ERROR_OBJECT (self, "Failed to get chunk stream for %" GST_PTR_FORMAT, + message); + goto out; + } + + chunks = gst_rtmp_chunk_stream_serialize_all (cstream, message, + self->out_chunk_size); + if (!chunks) { + GST_ERROR_OBJECT (self, "Failed to serialize %" GST_PTR_FORMAT, message); + goto out; + } + self->writing = TRUE; if (self->output_handler) { self->output_handler (self, self->output_handler_user_data); @@ -486,6 +508,9 @@ gst_rtmp_connection_start_write (GstRtmpConnection * self) g_object_ref (self)); gst_buffer_unref (chunks); + +out: + gst_buffer_unref (message); } static void @@ -910,24 +935,10 @@ start_write (gpointer user_data) void gst_rtmp_connection_queue_message (GstRtmpConnection * self, GstBuffer * buffer) { - GstRtmpMeta *meta; - GstRtmpChunkStream *cstream; - GstBuffer *chunks; - g_return_if_fail (GST_IS_RTMP_CONNECTION (self)); g_return_if_fail (GST_IS_BUFFER (buffer)); - meta = gst_buffer_get_rtmp_meta (buffer); - g_return_if_fail (meta); - - cstream = gst_rtmp_chunk_streams_get (self->output_streams, meta->cstream); - g_return_if_fail (cstream); - - chunks = gst_rtmp_chunk_stream_serialize_all (cstream, buffer, - self->out_chunk_size); - g_return_if_fail (chunks); - - g_async_queue_push (self->output_queue, chunks); + g_async_queue_push (self->output_queue, buffer); g_main_context_invoke_full (self->main_context, G_PRIORITY_DEFAULT, start_write, g_object_ref (self), g_object_unref); }