diff --git a/gst/gdp/gstgdppay.c b/gst/gdp/gstgdppay.c index d8bc818dce..708e13790e 100644 --- a/gst/gdp/gstgdppay.c +++ b/gst/gdp/gstgdppay.c @@ -188,23 +188,11 @@ gst_gdp_pay_reset (GstGDPPay * this) gst_caps_unref (this->caps); this->caps = NULL; } - if (this->caps_buf) { - gst_buffer_unref (this->caps_buf); - this->caps_buf = NULL; - } - if (this->tag_buf) { - gst_buffer_unref (this->tag_buf); - this->tag_buf = NULL; - } - if (this->new_segment_buf) { - gst_buffer_unref (this->new_segment_buf); - this->new_segment_buf = NULL; - } - if (this->streamstartid_buf) { - gst_buffer_unref (this->streamstartid_buf); - this->streamstartid_buf = NULL; - } + this->have_caps = FALSE; + this->have_segment = FALSE; + this->have_streamstartid = FALSE; this->sent_streamheader = FALSE; + this->reset_streamheader = FALSE; this->offset = 0; } @@ -308,6 +296,48 @@ no_event: } } +static void +gdp_streamheader_array_append_buffer (GValue * array, GstBuffer * buf) +{ + GValue value = { 0, }; + + g_value_init (&value, GST_TYPE_BUFFER); + gst_value_set_buffer (&value, buf); + gst_value_array_append_value (array, &value); + g_value_unset (&value); +} + +typedef struct +{ + GstGDPPay *gdppay; + GValue *array; +} GstGDPPayAndArray; + +static gboolean +gdp_streamheader_array_store_events (GstPad * pad, GstEvent ** event, + gpointer udata) +{ + GstGDPPayAndArray *gdp_and_array = udata; + GstGDPPay *this = gdp_and_array->gdppay; + GValue *array = gdp_and_array->array; + GstBuffer *buf; + + /* Need to handle caps differently to keep compatibility with 1.0 */ + if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) { + GstCaps *caps; + + gst_event_parse_caps (*event, &caps); + buf = gst_gdp_buffer_from_caps (this, caps); + } else { + buf = gst_gdp_buffer_from_event (this, *event); + } + + GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_HEADER); + gst_gdp_stamp_buffer (this, buf); + gdp_streamheader_array_append_buffer (array, buf); + + return TRUE; +} /* set our caps with streamheader, based on the latest newsegment and caps, * and (possibly) GDP-serialized buffers of the streamheaders on the src pad */ @@ -315,14 +345,15 @@ static GstFlowReturn gst_gdp_pay_reset_streamheader (GstGDPPay * this) { GstCaps *caps; - /* We use copies of these to avoid circular refcounts */ - GstBuffer *new_segment_buf, *caps_buf, *tag_buf, *streamstartid_buf; GstStructure *structure; GstFlowReturn r = GST_FLOW_OK; gboolean version_one_zero = TRUE; + GstGDPPayAndArray gdp_and_array; GValue array = { 0 }; - GValue value = { 0 }; + + gdp_and_array.gdppay = this; + gdp_and_array.array = &array; GST_DEBUG_OBJECT (this, "start"); /* In version 0.2, we didn't need or send new segment or tags */ @@ -330,13 +361,13 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) version_one_zero = FALSE; if (version_one_zero) { - if (!this->new_segment_buf || !this->caps_buf || !this->streamstartid_buf) { + if (!this->have_segment || !this->have_caps || !this->have_streamstartid) { GST_DEBUG_OBJECT (this, "1.0, missing new_segment or caps or stream " "start id, returning"); return GST_FLOW_OK; } } else { - if (!this->caps_buf) { + if (!this->have_caps) { GST_DEBUG_OBJECT (this, "0.2, missing caps, returning"); return GST_FLOW_OK; } @@ -346,51 +377,24 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) * Stamp the buffers with offset and offset_end as well. * We do this here so the offsets match the order the buffers go out in */ g_value_init (&array, GST_TYPE_ARRAY); - if (version_one_zero) { - gst_gdp_stamp_buffer (this, this->streamstartid_buf); - GST_DEBUG_OBJECT (this, "appending copy of stream start id buffer %p", - this->streamstartid_buf); - streamstartid_buf = gst_buffer_copy (this->streamstartid_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, streamstartid_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (streamstartid_buf); - } + gst_pad_sticky_events_foreach (this->sinkpad, + gdp_streamheader_array_store_events, &gdp_and_array); + } else { + GstEvent *capsevent = + gst_pad_get_sticky_event (this->sinkpad, GST_EVENT_CAPS, 0); + if (capsevent) { + GstCaps *caps; + GstBuffer *capsbuffer = NULL; - gst_gdp_stamp_buffer (this, this->caps_buf); - GST_DEBUG_OBJECT (this, "appending copy of caps buffer %p", this->caps_buf); - caps_buf = gst_buffer_copy (this->caps_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, caps_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (caps_buf); + gst_event_parse_caps (capsevent, &caps); + capsbuffer = gst_gdp_buffer_from_caps (this, caps); - if (version_one_zero) { - gst_gdp_stamp_buffer (this, this->new_segment_buf); - GST_DEBUG_OBJECT (this, "1.0, appending copy of new segment buffer %p", - this->new_segment_buf); - new_segment_buf = gst_buffer_copy (this->new_segment_buf); - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, new_segment_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (new_segment_buf); + gst_gdp_stamp_buffer (this, capsbuffer); + gdp_streamheader_array_append_buffer (&array, capsbuffer); - if (this->tag_buf) { - gst_gdp_stamp_buffer (this, this->tag_buf); - GST_DEBUG_OBJECT (this, "1.0, appending current tags buffer %p", - this->tag_buf); - tag_buf = this->tag_buf; - this->tag_buf = NULL; - - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, tag_buf); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); - gst_buffer_unref (tag_buf); + gst_buffer_unref (capsbuffer); + gst_event_unref (capsevent); } } @@ -399,11 +403,8 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) structure = gst_caps_get_structure (this->caps, 0); if (gst_structure_has_field (structure, "streamheader")) { const GValue *sh; - GArray *buffers; - GstBuffer *buffer; - int i; sh = gst_structure_get_value (structure, "streamheader"); @@ -440,10 +441,7 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) GST_BUFFER_OFFSET_END (outbuffer) = GST_BUFFER_OFFSET_NONE; GST_BUFFER_TIMESTAMP (outbuffer) = GST_CLOCK_TIME_NONE; - g_value_init (&value, GST_TYPE_BUFFER); - gst_value_set_buffer (&value, outbuffer); - gst_value_array_append_value (&array, &value); - g_value_unset (&value); + gdp_streamheader_array_append_buffer (&array, outbuffer); gst_buffer_unref (outbuffer); } @@ -478,40 +476,6 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) } } - /* push out these streamheader buffers, then flush our internal queue */ - GST_DEBUG_OBJECT (this, "Pushing GDP stream-start-id buffer %p", - this->streamstartid_buf); - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->streamstartid_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP stream-start-id buffer returned %d", - r); - goto done; - } - GST_DEBUG_OBJECT (this, "Pushing GDP caps buffer %p", this->caps_buf); - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->caps_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP caps buffer returned %d", r); - goto done; - } - GST_DEBUG_OBJECT (this, "Pushing GDP new_segment buffer %p with offset %" - G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, this->new_segment_buf, - GST_BUFFER_OFFSET (this->new_segment_buf), - GST_BUFFER_OFFSET_END (this->new_segment_buf)); - /* we stored these bufs with refcount 1, so make sure we keep a ref */ - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->new_segment_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP newsegment buffer returned %d", r); - goto done; - } - if (this->tag_buf) { - GST_DEBUG_OBJECT (this, "Pushing GDP tag buffer %p", this->tag_buf); - /* we stored these bufs with refcount 1, so make sure we keep a ref */ - r = gst_pad_push (this->srcpad, gst_buffer_ref (this->tag_buf)); - if (r != GST_FLOW_OK) { - GST_WARNING_OBJECT (this, "pushing GDP tag buffer returned %d", r); - goto done; - } - } this->sent_streamheader = TRUE; GST_DEBUG_OBJECT (this, "need to push %d queued buffers", g_list_length (this->queue)); @@ -524,7 +488,6 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) /* delete buffer from queue now */ this->queue = g_list_delete_link (this->queue, this->queue); - /* set caps and push */ r = gst_pad_push (this->srcpad, buffer); if (r != GST_FLOW_OK) { GST_WARNING_OBJECT (this, "pushing queued GDP buffer returned %d", r); @@ -532,6 +495,8 @@ gst_gdp_pay_reset_streamheader (GstGDPPay * this) } } + this->reset_streamheader = FALSE; + done: gst_caps_unref (caps); GST_DEBUG_OBJECT (this, "stop"); @@ -551,7 +516,7 @@ no_buffer: static GstFlowReturn gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) { - if (this->sent_streamheader) { + if (this->sent_streamheader && !this->reset_streamheader) { GST_LOG_OBJECT (this, "Pushing GDP buffer %p, caps %" GST_PTR_FORMAT, buffer, this->caps); return gst_pad_push (this->srcpad, buffer); @@ -559,12 +524,10 @@ gst_gdp_queue_buffer (GstGDPPay * this, GstBuffer * buffer) /* store it on an internal queue. buffer remains reffed. */ this->queue = g_list_append (this->queue, buffer); - GST_DEBUG_OBJECT (this, "streamheader not sent yet, " + GST_DEBUG_OBJECT (this, "streamheader not sent yet or needs update, " "queued buffer %p, now %d buffers queued", buffer, g_list_length (this->queue)); - gst_gdp_pay_reset_streamheader (this); - return GST_FLOW_OK; } @@ -582,7 +545,7 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* we should have received a new_segment before, otherwise it's a bug. * fake one in that case */ - if (!this->new_segment_buf) { + if (!this->have_segment) { GstEvent *event; GstSegment segment; @@ -603,7 +566,7 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); GST_DEBUG_OBJECT (this, "Storing buffer %p as new_segment_buf", outbuffer); - this->new_segment_buf = outbuffer; + this->have_segment = TRUE; } } /* make sure we've received caps before */ @@ -629,6 +592,9 @@ gst_gdp_pay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) GST_BUFFER_TIMESTAMP (outbuffer) = GST_BUFFER_TIMESTAMP (buffer); GST_BUFFER_DURATION (outbuffer) = GST_BUFFER_DURATION (buffer); + if (this->reset_streamheader) + gst_gdp_pay_reset_streamheader (this); + ret = gst_gdp_queue_buffer (this, outbuffer); done: @@ -689,28 +655,16 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) * and not send it on */ switch (GST_EVENT_TYPE (event)) { case GST_EVENT_STREAM_START: - GST_DEBUG_OBJECT (this, "Storing stream start id in buffer %p", - outbuffer); - - if (this->streamstartid_buf) - gst_buffer_unref (this->streamstartid_buf); - this->streamstartid_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); + GST_DEBUG_OBJECT (this, "Received stream start id"); + this->have_streamstartid = TRUE; break; case GST_EVENT_SEGMENT: - GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as new_segment_buf", - outbuffer); - - if (this->new_segment_buf) - gst_buffer_unref (this->new_segment_buf); - this->new_segment_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); + GST_DEBUG_OBJECT (this, "Received segment %" GST_PTR_FORMAT, event); + this->have_segment = TRUE; break; case GST_EVENT_CAPS:{ + GST_DEBUG_OBJECT (this, "Received caps %" GST_PTR_FORMAT, event); + this->have_caps = TRUE; gst_event_parse_caps (event, &caps); gst_buffer_replace (&outbuffer, NULL); if (this->caps == NULL || !gst_caps_is_equal (this->caps, caps)) { @@ -721,34 +675,18 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) goto no_buffer_from_caps; GST_BUFFER_DURATION (outbuffer) = 0; - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - if (this->caps_buf) - gst_buffer_unref (this->caps_buf); - this->caps_buf = outbuffer; - gst_gdp_pay_reset_streamheader (this); } break; } - case GST_EVENT_TAG: - GST_DEBUG_OBJECT (this, "Storing in caps buffer %p as tag_buf", - outbuffer); - - if (this->tag_buf) - gst_buffer_unref (this->tag_buf); - this->tag_buf = outbuffer; - - GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); - gst_gdp_pay_reset_streamheader (this); - break; default: - GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, - event); - flowret = gst_gdp_queue_buffer (this, outbuffer); - if (flowret != GST_FLOW_OK) - goto push_error; break; } + if (GST_EVENT_IS_STICKY (event)) { + GST_BUFFER_FLAG_SET (outbuffer, GST_BUFFER_FLAG_HEADER); + this->reset_streamheader = TRUE; + } + /* if we have EOS, we should send on EOS ourselves */ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS || GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START) { @@ -757,6 +695,14 @@ gst_gdp_pay_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) ret = gst_pad_push_event (this->srcpad, gst_event_ref (event)); } + if (GST_EVENT_TYPE (event) != GST_EVENT_EOS) { + GST_DEBUG_OBJECT (this, "queuing GDP buffer %p of event %p", outbuffer, + event); + flowret = gst_gdp_queue_buffer (this, outbuffer); + if (flowret != GST_FLOW_OK) + goto push_error; + } + done: gst_event_unref (event); diff --git a/gst/gdp/gstgdppay.h b/gst/gdp/gstgdppay.h index d4433ef775..1dc71aef14 100644 --- a/gst/gdp/gstgdppay.h +++ b/gst/gdp/gstgdppay.h @@ -52,10 +52,11 @@ struct _GstGDPPay GstCaps *caps; /* incoming caps */ - GstBuffer *streamstartid_buf; - GstBuffer *caps_buf; - GstBuffer *new_segment_buf; - GstBuffer *tag_buf; + gboolean have_streamstartid; + gboolean have_caps; + gboolean have_segment; + + gboolean reset_streamheader; gboolean sent_streamheader; /* TRUE after the first streamheaders are sent */ GList *queue; /* list of queued buffers before streamheaders are sent */ diff --git a/tests/check/elements/gdppay.c b/tests/check/elements/gdppay.c index 704c54f2c0..ccf97d697c 100644 --- a/tests/check/elements/gdppay.c +++ b/tests/check/elements/gdppay.c @@ -110,7 +110,7 @@ check_caps_buffer (gint refcount, GstCaps * caps) fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); buffers = g_list_remove (buffers, outbuffer); - ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 2); + ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", refcount); length = GST_DP_HEADER_LENGTH + (strlen (caps_string) + 1); fail_unless_equals_int (gst_buffer_get_size (outbuffer), length); gst_buffer_unref (outbuffer); @@ -159,14 +159,13 @@ GST_START_TEST (test_audio) fail_unless_equals_int (g_list_length (buffers), 4); /* first buffer is the stream-start event */ - check_stream_start_buffer (2); + check_stream_start_buffer (1); - /* second buffer is the serialized caps; - * the element also holds a ref to it */ - check_caps_buffer (2, caps); + /* second buffer is the serialized caps */ + check_caps_buffer (1, caps); /* third buffer is the serialized new_segment event */ - check_segment_buffer (2); + check_segment_buffer (1); /* the fourth buffer is the GDP buffer for our pushed buffer */ fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); @@ -319,15 +318,15 @@ GST_START_TEST (test_streamheader) gst_caps_unref (sinkcaps); /* first buffer is the stream-start event */ - check_stream_start_buffer (2); + check_stream_start_buffer (1); /* second buffer is the serialized caps; * the element also holds a ref to it */ - check_caps_buffer (2, caps); + check_caps_buffer (1, caps); /* third buffer is the serialized new_segment event; * the element also holds a ref to it */ - check_segment_buffer (2); + check_segment_buffer (1); /* the fourth buffer is the GDP buffer for our pushed buffer */ fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); @@ -500,16 +499,16 @@ GST_START_TEST (test_crc) fail_unless_equals_int (g_list_length (buffers), 4); /* first buffer is the stream-start event */ - check_stream_start_buffer (2); + check_stream_start_buffer (1); /* second buffer is the serialized caps; * the element also holds a ref to it */ - check_caps_buffer (2, caps); + check_caps_buffer (1, caps); /* third buffer is the serialized new_segment event */ fail_if ((outbuffer = (GstBuffer *) buffers->data) == NULL); buffers = g_list_remove (buffers, outbuffer); - ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 2); + ASSERT_BUFFER_REFCOUNT (outbuffer, "outbuffer", 1); /* verify the header checksum */ /* CRC's start at 58 in the header */