From a19497ab67cf9517fb12f2bf30e034078d36a958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 2 May 2018 18:11:58 +0300 Subject: [PATCH] appsrc/sink: Fix optimization for only signalling waiters if someone is actually waiting It is possible that both application and the stream are waiting currently, if for example the following happens: 1) app is waiting because no buffer in appsink 2) appsink providing a buffer and waking up app 3) appsink getting another buffer and waiting because it's full now 4) app thread getting back control Previously step 4 would overwrite that the appsink is currently waiting, so it would never be signalled again. https://bugzilla.gnome.org/show_bug.cgi?id=795551 --- gst-libs/gst/app/gstappsink.c | 36 +++++++++++++++++------------------ gst-libs/gst/app/gstappsrc.c | 18 +++++++++--------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/gst-libs/gst/app/gstappsink.c b/gst-libs/gst/app/gstappsink.c index ab662f5243..4d8be0d696 100644 --- a/gst-libs/gst/app/gstappsink.c +++ b/gst-libs/gst/app/gstappsink.c @@ -75,9 +75,9 @@ typedef enum { - NOONE_WAITING, - STREAM_WAITING, /* streaming thread is waiting for application thread */ - APP_WAITING, /* application thread is waiting for streaming thread */ + NOONE_WAITING = 0, + STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */ + APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSinkWaitStatus; struct _GstAppSinkPrivate @@ -731,9 +731,9 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event) * consumed, which is a bit confusing for the application */ while (priv->num_buffers > 0 && !priv->flushing && priv->wait_on_eos) { - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; } if (priv->flushing) emit = FALSE; @@ -781,7 +781,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer) GST_DEBUG_OBJECT (appsink, "setting preroll buffer %p", buffer); gst_buffer_replace (&priv->preroll_buffer, buffer); - if (priv->wait_status == APP_WAITING) + if ((priv->wait_status & APP_WAITING)) g_cond_signal (&priv->cond); emit = priv->emit_signals; @@ -902,9 +902,9 @@ restart: } /* wait for a buffer to be removed or flush */ - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; if (priv->flushing) goto flushing; @@ -914,7 +914,7 @@ restart: gst_queue_array_push_tail (priv->queue, gst_mini_object_ref (data)); priv->num_buffers++; - if (priv->wait_status == APP_WAITING) + if ((priv->wait_status & APP_WAITING)) g_cond_signal (&priv->cond); emit = priv->emit_signals; @@ -1012,9 +1012,9 @@ gst_app_sink_query (GstBaseSink * bsink, GstQuery * query) g_mutex_lock (&priv->mutex); GST_DEBUG_OBJECT (appsink, "waiting buffers to be consumed"); while (priv->num_buffers > 0 || priv->preroll_buffer) { - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; } g_mutex_unlock (&priv->mutex); ret = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query); @@ -1528,14 +1528,14 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout) /* nothing to return, wait */ GST_DEBUG_OBJECT (appsink, "waiting for the preroll buffer"); - priv->wait_status = APP_WAITING; + priv->wait_status |= APP_WAITING; if (timeout_valid) { if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time)) goto expired; } else { g_cond_wait (&priv->cond, &priv->mutex); } - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; } sample = gst_sample_new (priv->preroll_buffer, priv->preroll_caps, @@ -1550,7 +1550,7 @@ gst_app_sink_try_pull_preroll (GstAppSink * appsink, GstClockTime timeout) expired: { GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL"); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; g_mutex_unlock (&priv->mutex); return NULL; } @@ -1626,14 +1626,14 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) /* nothing to return, wait */ GST_DEBUG_OBJECT (appsink, "waiting for a buffer"); - priv->wait_status = APP_WAITING; + priv->wait_status |= APP_WAITING; if (timeout_valid) { if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time)) goto expired; } else { g_cond_wait (&priv->cond, &priv->mutex); } - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; } obj = dequeue_buffer (appsink); @@ -1652,7 +1652,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) } gst_mini_object_unref (obj); - if (priv->wait_status == STREAM_WAITING) + if ((priv->wait_status & STREAM_WAITING)) g_cond_signal (&priv->cond); g_mutex_unlock (&priv->mutex); @@ -1663,7 +1663,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout) expired: { GST_DEBUG_OBJECT (appsink, "timeout expired, return NULL"); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; g_mutex_unlock (&priv->mutex); return NULL; } diff --git a/gst-libs/gst/app/gstappsrc.c b/gst-libs/gst/app/gstappsrc.c index 1328893e8b..541eb60d50 100644 --- a/gst-libs/gst/app/gstappsrc.c +++ b/gst-libs/gst/app/gstappsrc.c @@ -103,9 +103,9 @@ typedef enum { - NOONE_WAITING, - STREAM_WAITING, /* streaming thread is waiting for application thread */ - APP_WAITING, /* application thread is waiting for streaming thread */ + NOONE_WAITING = 0, + STREAM_WAITING = 1 << 0, /* streaming thread is waiting for application thread */ + APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ } GstAppSrcWaitStatus; struct _GstAppSrcPrivate @@ -1242,7 +1242,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, priv->offset += buf_size; /* signal that we removed an item */ - if (priv->wait_status == APP_WAITING) + if ((priv->wait_status & APP_WAITING)) g_cond_broadcast (&priv->cond); /* see if we go lower than the empty-percent */ @@ -1277,9 +1277,9 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, goto eos; /* nothing to return, wait a while for new data or flushing. */ - priv->wait_status = STREAM_WAITING; + priv->wait_status |= STREAM_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~STREAM_WAITING; } g_mutex_unlock (&priv->mutex); return ret; @@ -1840,9 +1840,9 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, GST_DEBUG_OBJECT (appsrc, "waiting for free space"); /* we are filled, wait until a buffer gets popped or when we * flush. */ - priv->wait_status = APP_WAITING; + priv->wait_status |= APP_WAITING; g_cond_wait (&priv->cond, &priv->mutex); - priv->wait_status = NOONE_WAITING; + priv->wait_status &= ~APP_WAITING; } else { /* no need to wait for free space, we just pump more data into the * queue hoping that the caller reacts to the enough-data signal and @@ -1867,7 +1867,7 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer, priv->queued_bytes += gst_buffer_get_size (buffer); } - if (priv->wait_status == STREAM_WAITING) + if ((priv->wait_status & STREAM_WAITING)) g_cond_broadcast (&priv->cond); g_mutex_unlock (&priv->mutex);