diff --git a/ext/rtmp/gstrtmpsink.c b/ext/rtmp/gstrtmpsink.c index 462ac4d068..781f3bfe3f 100644 --- a/ext/rtmp/gstrtmpsink.c +++ b/ext/rtmp/gstrtmpsink.c @@ -76,6 +76,7 @@ static void gst_rtmp_sink_finalize (GObject * object); static gboolean gst_rtmp_sink_stop (GstBaseSink * sink); static gboolean gst_rtmp_sink_start (GstBaseSink * sink); static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event); +static gboolean gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps); static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * sink, GstBuffer * buf); #define gst_rtmp_sink_parent_class parent_class @@ -114,7 +115,8 @@ gst_rtmp_sink_class_init (GstRTMPSinkClass * klass) gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_sink_start); gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_sink_stop); gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp_sink_render); - gstbasesink_class->event = gst_rtmp_sink_event; + gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp_sink_setcaps); + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp_sink_event); GST_DEBUG_CATEGORY_INIT (gst_rtmp_sink_debug, "rtmpsink", 0, "RTMP server element"); @@ -200,8 +202,10 @@ gst_rtmp_sink_stop (GstBaseSink * basesink) { GstRTMPSink *sink = GST_RTMP_SINK (basesink); - gst_buffer_replace (&sink->cache, NULL); - + if (sink->header) { + gst_buffer_unref (sink->header); + sink->header = NULL; + } if (sink->rtmp) { RTMP_Close (sink->rtmp); RTMP_Free (sink->rtmp); @@ -219,7 +223,7 @@ static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) { GstRTMPSink *sink = GST_RTMP_SINK (bsink); - GstBuffer *reffed_buf = NULL; + gboolean need_unref = FALSE; GstMapInfo map = GST_MAP_INFO_INIT; if (sink->rtmp == NULL) { @@ -228,6 +232,11 @@ gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) return GST_FLOW_ERROR; } + /* Ignore buffers that are in the stream headers (caps) */ + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) { + return GST_FLOW_OK; + } + if (sink->first) { /* open the connection */ if (!RTMP_IsConnected (sink->rtmp)) { @@ -245,21 +254,14 @@ gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) GST_DEBUG_OBJECT (sink, "Opened connection to %s", sink->rtmp_uri); } - /* FIXME: Parse the first buffer and see if it contains a header plus a packet instead - * of just assuming it's only the header */ - GST_LOG_OBJECT (sink, "Caching first buffer of size %" G_GSIZE_FORMAT - " for concatenation", gst_buffer_get_size (buf)); - gst_buffer_replace (&sink->cache, buf); - sink->first = FALSE; - return GST_FLOW_OK; - } + /* Prepend the header from the caps to the first non header buffer */ + if (sink->header) { + buf = gst_buffer_append (gst_buffer_ref (sink->header), + gst_buffer_ref (buf)); + need_unref = TRUE; + } - if (sink->cache) { - GST_LOG_OBJECT (sink, "Joining 2nd buffer of size %" G_GSIZE_FORMAT - " to cached buf", gst_buffer_get_size (buf)); - gst_buffer_ref (buf); - reffed_buf = buf = gst_buffer_append (sink->cache, buf); - sink->cache = NULL; + sink->first = FALSE; } if (sink->have_write_error) @@ -274,8 +276,8 @@ gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf) goto write_failed; gst_buffer_unmap (buf, &map); - if (reffed_buf) - gst_buffer_unref (reffed_buf); + if (need_unref) + gst_buffer_unref (buf); return GST_FLOW_OK; @@ -284,8 +286,8 @@ write_failed: { GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data")); gst_buffer_unmap (buf, &map); - if (reffed_buf) - gst_buffer_unref (reffed_buf); + if (need_unref) + gst_buffer_unref (buf); sink->have_write_error = TRUE; return GST_FLOW_ERROR; } @@ -391,6 +393,49 @@ gst_rtmp_sink_set_property (GObject * object, guint prop_id, } } +static gboolean +gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps) +{ + GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink); + GstStructure *s; + const GValue *sh; + GArray *buffers; + gint i; + + GST_DEBUG_OBJECT (sink, "caps set to %" GST_PTR_FORMAT, caps); + + /* Clear our current header buffer */ + if (rtmpsink->header) { + gst_buffer_unref (rtmpsink->header); + rtmpsink->header = NULL; + } + + rtmpsink->header = gst_buffer_new (); + + s = gst_caps_get_structure (caps, 0); + + sh = gst_structure_get_value (s, "streamheader"); + buffers = g_value_peek_pointer (sh); + + /* Concatenate all buffers in streamheader into one */ + for (i = 0; i < buffers->len; ++i) { + GValue *val; + GstBuffer *buf; + + val = &g_array_index (buffers, GValue, i); + buf = g_value_peek_pointer (val); + + gst_buffer_ref (buf); + + rtmpsink->header = gst_buffer_append (rtmpsink->header, buf); + } + + GST_DEBUG_OBJECT (rtmpsink, "have %" G_GSIZE_FORMAT " bytes of header data", + gst_buffer_get_size (rtmpsink->header)); + + return TRUE; +} + static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event) { diff --git a/ext/rtmp/gstrtmpsink.h b/ext/rtmp/gstrtmpsink.h index 523f5ae0a4..b9072f5000 100644 --- a/ext/rtmp/gstrtmpsink.h +++ b/ext/rtmp/gstrtmpsink.h @@ -53,7 +53,7 @@ struct _GstRTMPSink { RTMP *rtmp; gchar *rtmp_uri; /* copy of url for librtmp */ - GstBuffer *cache; /* Cached buffer */ + GstBuffer *header; gboolean first; gboolean have_write_error; };