diff --git a/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c b/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c index 2012a3947f..42d63a8f37 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c @@ -154,6 +154,11 @@ struct _GstAppSrcPrivate /* queue up a segment event based on last_segment before * the next buffer of buffer list */ gboolean pending_custom_segment; + /* events that have been delayed until either the caps is configured, ensuring + that no events are sent before CAPS, or buffers are being pushed. */ + GstQueueArray *delayed_events; + /* if a buffer has been pushed yet */ + gboolean pushed_buffer; /* the next buffer that will be queued needs a discont flag * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */ @@ -743,7 +748,9 @@ gst_app_src_init (GstAppSrc * appsrc) g_mutex_init (&priv->mutex); g_cond_init (&priv->cond); priv->queue = gst_queue_array_new (16); + priv->delayed_events = gst_queue_array_new (16); priv->wait_status = NOONE_WAITING; + priv->pushed_buffer = FALSE; priv->size = DEFAULT_PROP_SIZE; priv->duration = DEFAULT_PROP_DURATION; @@ -785,6 +792,9 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps) gst_queue_array_push_tail (priv->queue, requeue_caps); } + gst_queue_array_clear (priv->delayed_events); + priv->pushed_buffer = FALSE; + priv->queued_bytes = 0; priv->queued_buffers = 0; priv->queued_time = 0; @@ -832,6 +842,7 @@ gst_app_src_finalize (GObject * obj) g_mutex_clear (&priv->mutex); g_cond_clear (&priv->cond); gst_queue_array_free (priv->queue); + gst_queue_array_free (priv->delayed_events); g_free (priv->uri); @@ -1027,6 +1038,7 @@ gst_app_src_send_event (GstElement * element, GstEvent * event) if (GST_EVENT_IS_SERIALIZED (event)) { GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event); g_mutex_lock (&priv->mutex); + gst_queue_array_push_tail (priv->queue, event); if ((priv->wait_status & STREAM_WAITING)) @@ -1559,6 +1571,54 @@ gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item) GST_TIME_ARGS (priv->queued_time)); } +/* check if @obj should be send after the CAPS and SEGMENT events */ +static gboolean +needs_segment (GstMiniObject * obj) +{ + if (GST_IS_CAPS (obj)) { + return FALSE; + } else if (GST_IS_EVENT (obj)) { + if (GST_EVENT_TYPE (obj) == GST_EVENT_SEGMENT) + return FALSE; + } + + return TRUE; +} + +static void +ensure_segment (GstAppSrc * appsrc) +{ + GstAppSrcPrivate *priv = appsrc->priv; + GstEvent *seg_event; + + seg_event = gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc), + GST_EVENT_SEGMENT, 0); + + if (!seg_event) { + GST_DEBUG_OBJECT (appsrc, "sending default segment"); + gst_base_src_push_segment (GST_BASE_SRC_CAST (appsrc), &priv->last_segment); + } else { + gst_event_unref (seg_event); + } +} + +static void +push_delayed_events (GstAppSrc * appsrc) +{ + GstAppSrcPrivate *priv = appsrc->priv; + + while (!gst_queue_array_is_empty (priv->delayed_events)) { + GstEvent *event; + + event = gst_queue_array_pop_head (priv->delayed_events); + GST_DEBUG_OBJECT (appsrc, "sending event: %" GST_PTR_FORMAT, event); + + g_mutex_unlock (&priv->mutex); + gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), GST_EVENT (event)); + g_mutex_lock (&priv->mutex); + } +} + static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, GstBuffer ** buf) @@ -1625,6 +1685,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, if (!gst_queue_array_is_empty (priv->queue)) { GstMiniObject *obj = gst_queue_array_pop_head (priv->queue); + if (priv->current_caps && needs_segment (obj)) { + /* need to have sent a segment before sending `obj` */ + ensure_segment (appsrc); + } + if (GST_IS_CAPS (obj)) { GstCaps *next_caps = GST_CAPS (obj); gboolean caps_changed = TRUE; @@ -1645,6 +1710,17 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, if (caps_changed) gst_app_src_do_negotiate (bsrc); + /* sending delayed events which were waiting on the caps */ + if (!gst_queue_array_is_empty (priv->delayed_events)) { + /* need to send a segment before the events */ + ensure_segment (appsrc); + + GST_DEBUG_OBJECT (appsrc, + "sending delayed events after caps: %" GST_PTR_FORMAT, obj); + + push_delayed_events (appsrc); + } + /* Continue checks caps and queue */ continue; } @@ -1662,6 +1738,13 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, priv->need_discont_downstream = FALSE; } + if (!gst_queue_array_is_empty (priv->delayed_events)) { + /* don't keep delaying events if a buffer has been pushed without CAPS */ + GST_DEBUG_OBJECT (appsrc, "push delayed events before buffer"); + push_delayed_events (appsrc); + } + + priv->pushed_buffer = TRUE; *buf = buffer; } else if (GST_IS_BUFFER_LIST (obj)) { GstBufferList *buffer_list; @@ -1681,7 +1764,14 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, priv->need_discont_downstream = FALSE; } + if (!gst_queue_array_is_empty (priv->delayed_events)) { + /* don't keep delaying events if a buffer has been pushed without CAPS */ + GST_DEBUG_OBJECT (appsrc, "push delayed events before buffer"); + push_delayed_events (appsrc); + } + gst_base_src_submit_buffer_list (bsrc, buffer_list); + priv->pushed_buffer = TRUE; *buf = NULL; } else if (GST_IS_EVENT (obj)) { GstEvent *event = GST_EVENT (obj); @@ -1697,7 +1787,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, if (!gst_segment_is_equal (&priv->current_segment, segment)) { GST_DEBUG_OBJECT (appsrc, "Update new segment %" GST_PTR_FORMAT, event); - if (!gst_base_src_new_segment (bsrc, segment)) { + if (!gst_base_src_push_segment (bsrc, segment)) { GST_ERROR_OBJECT (appsrc, "Couldn't set new segment %" GST_PTR_FORMAT, event); gst_event_unref (event); @@ -1708,32 +1798,18 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, gst_event_unref (event); } else { - GstEvent *seg_event; - GstSegment last_segment = priv->last_segment; - /* event is serialized with the buffers flow */ - /* We are about to push an event, release out lock */ - g_mutex_unlock (&priv->mutex); - - seg_event = - gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc), - GST_EVENT_SEGMENT, 0); - if (!seg_event) { - seg_event = gst_event_new_segment (&last_segment); - + if (!priv->current_caps && !priv->pushed_buffer) { GST_DEBUG_OBJECT (appsrc, - "received serialized event before first buffer, push default segment %" - GST_PTR_FORMAT, seg_event); - - gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event); + "did not send caps yet, delay event for now"); + gst_queue_array_push_tail (priv->delayed_events, event); } else { - gst_event_unref (seg_event); + /* We are about to push an event, release out lock */ + g_mutex_unlock (&priv->mutex); + gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event); + g_mutex_lock (&priv->mutex); } - - gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event); - - g_mutex_lock (&priv->mutex); } continue; } else { diff --git a/subprojects/gst-plugins-base/tests/check/elements/appsrc.c b/subprojects/gst-plugins-base/tests/check/elements/appsrc.c index 5dfbae8fb7..f4ed80aeb8 100644 --- a/subprojects/gst-plugins-base/tests/check/elements/appsrc.c +++ b/subprojects/gst-plugins-base/tests/check/elements/appsrc.c @@ -1446,6 +1446,192 @@ GST_START_TEST (test_appsrc_send_custom_event) GST_END_TEST; +enum ExpectedObj +{ + EXPECTED_STREAM_START, + EXPECTED_CAPS, + EXPECTED_SEGMENT, + EXPECTED_CUSTOM, + EXPECTED_BUFFER, +}; + +static enum ExpectedObj expected_obj; + +static gboolean +send_event_before_buffer_event_func (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + GST_LOG ("event %" GST_PTR_FORMAT, event); + + if (expected_obj == EXPECTED_STREAM_START) { + g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_STREAM_START); + expected_obj = EXPECTED_CAPS; + } else if (expected_obj == EXPECTED_CAPS) { + g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_CAPS); + expected_obj = EXPECTED_SEGMENT; + } else if (expected_obj == EXPECTED_SEGMENT) { + g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_SEGMENT); + expected_obj = EXPECTED_CUSTOM; + } else if (expected_obj == EXPECTED_CUSTOM) { + g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_CUSTOM_DOWNSTREAM); + expected_obj = EXPECTED_BUFFER; + } else { + g_assert_not_reached (); + } + + gst_event_unref (event); + return TRUE; +} + +static GstFlowReturn +send_event_before_buffer_chain_func (GstPad * pad, GstObject * parent, + GstBuffer * buf) +{ + GST_LOG ("buffer # %3u", (guint) GST_BUFFER_OFFSET (buf)); + + g_assert_cmpuint (expected_obj, ==, EXPECTED_BUFFER); + + g_mutex_lock (&check_mutex); + done = TRUE; + g_cond_signal (&check_cond); + g_mutex_unlock (&check_mutex); + + gst_buffer_unref (buf); + return GST_FLOW_OK; +} + +/* send a custom event to appsrc before the first buffer */ +GST_START_TEST (test_appsrc_send_event_before_buffer) +{ + GstElement *src; + GstBuffer *buf; + GstCaps *caps; + + src = setup_appsrc (); + g_object_set (src, "format", GST_FORMAT_TIME, NULL); + + ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + + gst_pad_set_event_function (mysinkpad, send_event_before_buffer_event_func); + gst_pad_set_chain_function (mysinkpad, send_event_before_buffer_chain_func); + + expected_obj = EXPECTED_STREAM_START; + + /* send a custom event and then the first buffer */ + gst_element_send_event (src, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("custom", NULL, NULL))); + + caps = gst_caps_from_string ("video/x-raw"); + gst_app_src_set_caps (GST_APP_SRC_CAST (src), caps); + gst_caps_unref (caps); + + buf = gst_buffer_new_and_alloc (2); + GST_BUFFER_OFFSET (buf) = 0; + fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src), + buf) == GST_FLOW_OK); + + g_mutex_lock (&check_mutex); + while (!done) + g_cond_wait (&check_cond, &check_mutex); + g_mutex_unlock (&check_mutex); + + ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_appsrc (src); +} + +GST_END_TEST; + +/* send a custom event to appsrc before the first sample */ +GST_START_TEST (test_appsrc_send_event_before_sample) +{ + GstElement *src; + GstSample *sample; + GstBuffer *buf; + GstCaps *caps; + GstSegment segment; + + src = setup_appsrc (); + g_object_set (src, "format", GST_FORMAT_TIME, NULL); + + ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + + gst_pad_set_event_function (mysinkpad, send_event_before_buffer_event_func); + gst_pad_set_chain_function (mysinkpad, send_event_before_buffer_chain_func); + + expected_obj = EXPECTED_STREAM_START; + + /* send a custom event and then the first sample */ + gst_element_send_event (src, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("custom", NULL, NULL))); + + buf = gst_buffer_new_and_alloc (2); + caps = gst_caps_from_string ("video/x-raw"); + gst_segment_init (&segment, GST_FORMAT_TIME); + segment.start = 5; + + sample = gst_sample_new (buf, caps, &segment, NULL); + fail_unless (gst_app_src_push_sample (GST_APP_SRC_CAST (src), + sample) == GST_FLOW_OK); + + gst_caps_unref (caps); + gst_buffer_unref (buf); + gst_sample_unref (sample); + + g_mutex_lock (&check_mutex); + while (!done) + g_cond_wait (&check_cond, &check_mutex); + g_mutex_unlock (&check_mutex); + + ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_appsrc (src); +} + +GST_END_TEST; + +/* send a custom event to appsrc between the caps and the first buffer */ +GST_START_TEST (test_appsrc_send_event_between_caps_buffer) +{ + GstElement *src; + GstBuffer *buf; + GstCaps *caps; + + src = setup_appsrc (); + g_object_set (src, "format", GST_FORMAT_TIME, NULL); + + ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + + gst_pad_set_event_function (mysinkpad, send_event_before_buffer_event_func); + gst_pad_set_chain_function (mysinkpad, send_event_before_buffer_chain_func); + + expected_obj = EXPECTED_STREAM_START; + + caps = gst_caps_from_string ("video/x-raw"); + gst_app_src_set_caps (GST_APP_SRC_CAST (src), caps); + gst_caps_unref (caps); + + /* send a custom event and then the first buffer */ + gst_element_send_event (src, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("custom", NULL, NULL))); + + buf = gst_buffer_new_and_alloc (2); + GST_BUFFER_OFFSET (buf) = 0; + fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src), + buf) == GST_FLOW_OK); + + g_mutex_lock (&check_mutex); + while (!done) + g_cond_wait (&check_cond, &check_mutex); + g_mutex_unlock (&check_mutex); + + ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_appsrc (src); +} + +GST_END_TEST; + static Suite * appsrc_suite (void) { @@ -1461,6 +1647,9 @@ appsrc_suite (void) tcase_add_test (tc_chain, test_appsrc_custom_segment_twice); tcase_add_test (tc_chain, test_appsrc_limits); tcase_add_test (tc_chain, test_appsrc_send_custom_event); + tcase_add_test (tc_chain, test_appsrc_send_event_before_buffer); + tcase_add_test (tc_chain, test_appsrc_send_event_before_sample); + tcase_add_test (tc_chain, test_appsrc_send_event_between_caps_buffer); if (RUNNING_ON_VALGRIND) tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);