diff --git a/gst-libs/gst/app/gstappsink.c b/gst-libs/gst/app/gstappsink.c index 70a3d79302..bd5cc8696a 100644 --- a/gst-libs/gst/app/gstappsink.c +++ b/gst-libs/gst/app/gstappsink.c @@ -965,9 +965,21 @@ gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter) static gboolean gst_app_sink_query (GstBaseSink * bsink, GstQuery * query) { + GstAppSink *appsink = GST_APP_SINK_CAST (bsink); + GstAppSinkPrivate *priv = appsink->priv; gboolean ret; switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_DRAIN: + { + g_mutex_lock (&priv->mutex); + GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed"); + while (priv->num_buffers > 0 && priv->preroll_buffer) + g_cond_wait (&priv->cond, &priv->mutex); + g_mutex_unlock (&priv->mutex); + ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query); + break; + } case GST_QUERY_SEEKING:{ GstFormat fmt; diff --git a/tests/check/elements/appsink.c b/tests/check/elements/appsink.c index c3547ee068..3bf6138e9f 100644 --- a/tests/check/elements/appsink.c +++ b/tests/check/elements/appsink.c @@ -553,6 +553,114 @@ GST_START_TEST (test_do_not_care_preroll) GST_END_TEST; +typedef struct +{ + GMutex mutex; + GCond cond; + GstAppSink *appsink; + gboolean check; +} TestQeuryDrainContext; + +static gpointer +my_app_thread (TestQeuryDrainContext * ctx) +{ + GstSample *pulled_preroll = NULL; + GstSample *pulled_sample = NULL; + + /* Wait for the query to reach appsink. */ + g_mutex_lock (&ctx->mutex); + while (!ctx->check) + g_cond_wait (&ctx->cond, &ctx->mutex); + g_mutex_unlock (&ctx->mutex); + + pulled_preroll = gst_app_sink_pull_preroll (ctx->appsink); + fail_unless (pulled_preroll); + gst_sample_unref (pulled_preroll); + + pulled_sample = gst_app_sink_pull_sample (ctx->appsink); + fail_unless (pulled_sample); + gst_sample_unref (pulled_sample); + + pulled_sample = gst_app_sink_pull_sample (ctx->appsink); + fail_unless (pulled_sample); + gst_sample_unref (pulled_sample); + + return NULL; +} + +static GstPadProbeReturn +query_handler (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info); + TestQeuryDrainContext *ctx = (TestQeuryDrainContext *) user_data; + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_DRAIN: + { + if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_PUSH) { + g_mutex_lock (&ctx->mutex); + ctx->check = TRUE; + g_cond_signal (&ctx->cond); + g_mutex_unlock (&ctx->mutex); + } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_PULL) { + /* Check that there is no pending buffers when drain query is done. */ + fail_if (gst_app_sink_try_pull_preroll (ctx->appsink, 0)); + fail_if (gst_app_sink_try_pull_sample (ctx->appsink, 0)); + } + break; + } + default: + break; + } + return GST_PAD_PROBE_OK; +} + +GST_START_TEST (test_query_drain) +{ + GstElement *sink = NULL; + GstBuffer *buffer = NULL; + GstPad *sinkpad = NULL; + GThread *thread = NULL; + GstQuery *query = NULL; + TestQeuryDrainContext ctx = { 0 }; + + sink = setup_appsink (); + + g_mutex_init (&ctx.mutex); + g_cond_init (&ctx.cond); + ctx.appsink = GST_APP_SINK (sink); + ctx.check = FALSE; + + sinkpad = gst_element_get_static_pad (sink, "sink"); + gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, + query_handler, (gpointer) & ctx, NULL); + gst_object_unref (sinkpad); + + ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC); + + buffer = gst_buffer_new_and_alloc (4); + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + + buffer = gst_buffer_new_and_alloc (4); + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + + thread = g_thread_new ("appthread", (GThreadFunc) my_app_thread, &ctx); + fail_unless (thread != NULL); + + query = gst_query_new_drain (); + fail_unless (gst_pad_peer_query (mysrcpad, query)); + gst_query_unref (query); + + g_thread_join (thread); + + g_mutex_clear (&ctx.mutex); + g_cond_clear (&ctx.cond); + + ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + cleanup_appsink (sink); +} + +GST_END_TEST; + static Suite * appsink_suite (void) { @@ -570,6 +678,7 @@ appsink_suite (void) tcase_add_test (tc_chain, test_buffer_list_signal); tcase_add_test (tc_chain, test_segment); tcase_add_test (tc_chain, test_pull_with_timeout); + tcase_add_test (tc_chain, test_query_drain); tcase_add_test (tc_chain, test_pull_preroll); tcase_add_test (tc_chain, test_do_not_care_preroll);