From 68cf7a0df19a8db3ec16b9879effcd4414876df6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 11 Apr 2025 17:32:29 +0300 Subject: [PATCH] appsink: Add in/out/dropped and silent properties This allows tracking how many buffers the appsink has processed so far, similar to the same properties on videorate / audiorate. Part-of: --- girs/GstApp-1.0.gir | 16 +++ .../gst-libs/gst/app/gstappsink.c | 130 ++++++++++++++++++ 2 files changed, 146 insertions(+) diff --git a/girs/GstApp-1.0.gir b/girs/GstApp-1.0.gir index 1ae7a56954..cc45fa1810 100644 --- a/girs/GstApp-1.0.gir +++ b/girs/GstApp-1.0.gir @@ -927,12 +927,20 @@ condition. Use "leaky-type" property instead. + + Number of buffers that were dropped. + + + + Number of input buffers that were queued. + + When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsink will drop any buffers that are pushed into it once its internal queue is @@ -952,6 +960,14 @@ buffers. Maximum total duration of data in the queue (0 = unlimited) + + Number of output buffers that were dequeued. + + + + Don't emit notify for input, output and dropped buffers. + + Wait for all buffers to be processed after receiving an EOS. diff --git a/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c b/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c index 2c48dc855b..321aaeaa73 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c @@ -121,6 +121,8 @@ struct _GstAppSinkPrivate GstAppSinkWaitStatus wait_status; GstQueueStatusInfo queue_status_info; GstAppLeakyType leaky_type; + guint64 in, out, dropped; + gboolean silent; GCond cond; GMutex mutex; @@ -175,6 +177,7 @@ enum #define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0 #define DEFAULT_PROP_CURRENT_LEVEL_TIME 0 #define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE +#define DEFAULT_SILENT TRUE enum { @@ -192,6 +195,10 @@ enum PROP_CURRENT_LEVEL_BUFFERS, PROP_CURRENT_LEVEL_TIME, PROP_LEAKY_TYPE, + PROP_IN, + PROP_OUT, + PROP_DROPPED, + PROP_SILENT, PROP_LAST }; @@ -397,6 +404,52 @@ gst_app_sink_class_init (GstAppSinkClass * klass) G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS)); + /** + * GstAppSink:in: + * + * Number of input buffers that were queued. + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, PROP_IN, + g_param_spec_uint64 ("in", "In", + "Number of input buffers", 0, G_MAXUINT64, 0, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * GstAppSink:out: + * + * Number of output buffers that were dequeued. + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, PROP_OUT, + g_param_spec_uint64 ("out", "Out", "Number of output buffers", 0, + G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * GstAppSink:dropped: + * + * Number of buffers that were dropped. + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, PROP_DROPPED, + g_param_spec_uint64 ("dropped", "Dropped", "Number of dropped buffers", 0, + G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAppSink:silent: + * + * Don't emit notify for input, output and dropped buffers. + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, PROP_SILENT, + g_param_spec_boolean ("silent", "silent", + "Don't emit notify for input, output and dropped buffers", + DEFAULT_SILENT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_PLAYING)); + /** * GstAppSink::eos: * @appsink: the appsink element that emitted the signal @@ -699,6 +752,7 @@ gst_app_sink_init (GstAppSink * appsink) priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST; priv->wait_status = NOONE_WAITING; priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE; + priv->silent = DEFAULT_SILENT; } static void @@ -783,6 +837,9 @@ gst_app_sink_set_property (GObject * object, guint prop_id, case PROP_LEAKY_TYPE: gst_app_sink_set_leaky_type (appsink, g_value_get_enum (value)); break; + case PROP_SILENT: + appsink->priv->silent = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -845,6 +902,24 @@ gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value, case PROP_LEAKY_TYPE: g_value_set_enum (value, gst_app_sink_get_leaky_type (appsink)); break; + case PROP_IN: + g_mutex_lock (&appsink->priv->mutex); + g_value_set_uint64 (value, appsink->priv->in); + g_mutex_unlock (&appsink->priv->mutex); + break; + case PROP_OUT: + g_mutex_lock (&appsink->priv->mutex); + g_value_set_uint64 (value, appsink->priv->out); + g_mutex_unlock (&appsink->priv->mutex); + break; + case PROP_DROPPED: + g_mutex_lock (&appsink->priv->mutex); + g_value_set_uint64 (value, appsink->priv->dropped); + g_mutex_unlock (&appsink->priv->mutex); + break; + case PROP_SILENT: + g_value_set_boolean (value, appsink->priv->silent); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -897,6 +972,7 @@ gst_app_sink_flush_unlocked (GstAppSink * appsink) gst_caps_replace (&priv->last_caps, NULL); g_cond_signal (&priv->cond); + priv->in = priv->out = priv->dropped = 0; } static gboolean @@ -917,6 +993,7 @@ gst_app_sink_start (GstBaseSink * psink) gst_sample_set_buffer_list (priv->sample, NULL); gst_sample_set_caps (priv->sample, NULL); gst_sample_set_segment (priv->sample, NULL); + priv->in = priv->out = priv->dropped = 0; g_mutex_unlock (&priv->mutex); return TRUE; @@ -944,8 +1021,15 @@ gst_app_sink_stop (GstBaseSink * psink) gst_sample_set_buffer_list (priv->sample, NULL); gst_sample_set_caps (priv->sample, NULL); gst_sample_set_segment (priv->sample, NULL); + priv->in = priv->out = priv->dropped = 0; g_mutex_unlock (&priv->mutex); + if (!priv->silent) { + g_object_notify (G_OBJECT (appsink), "in"); + g_object_notify (G_OBJECT (appsink), "out"); + g_object_notify (G_OBJECT (appsink), "dropped"); + } + return TRUE; } @@ -1046,6 +1130,12 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event) GST_DEBUG_OBJECT (appsink, "received FLUSH_STOP"); gst_app_sink_flush_unlocked (appsink); g_mutex_unlock (&priv->mutex); + + if (!priv->silent) { + g_object_notify (G_OBJECT (appsink), "in"); + g_object_notify (G_OBJECT (appsink), "out"); + g_object_notify (G_OBJECT (appsink), "dropped"); + } break; default: break; @@ -1235,7 +1325,18 @@ restart: /* we need to drop the oldest buffer/list and try again */ if ((old = dequeue_buffer (appsink))) { GST_DEBUG_OBJECT (appsink, "dropping old buffer/list %p", old); + if (GST_IS_BUFFER_LIST (old)) + priv->dropped += gst_buffer_list_length (GST_BUFFER_LIST_CAST (old)); + else + priv->dropped += 1; + gst_mini_object_unref (old); + + if (!priv->silent) { + g_mutex_unlock (&priv->mutex); + g_object_notify (G_OBJECT (appsink), "dropped"); + g_mutex_lock (&priv->mutex); + } } } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) { goto dropped; @@ -1268,6 +1369,11 @@ restart: } } /* we need to ref the buffer/list when pushing it in the queue */ + if (is_list) + priv->in += gst_buffer_list_length (GST_BUFFER_LIST_CAST (data)); + else + priv->in += 1; + gst_vec_deque_push_tail (priv->queue, gst_mini_object_ref (data)); gst_queue_status_info_push (&priv->queue_status_info, data, &priv->last_segment, GST_OBJECT_CAST (appsink)); @@ -1280,6 +1386,10 @@ restart: callbacks = callbacks_ref (priv->callbacks); g_mutex_unlock (&priv->mutex); + if (!priv->silent) { + g_object_notify (G_OBJECT (appsink), "in"); + } + if (callbacks && callbacks->callbacks.new_sample) { ret = callbacks->callbacks.new_sample (appsink, callbacks->user_data); } else { @@ -1305,7 +1415,18 @@ stopping: dropped: { GST_DEBUG_OBJECT (appsink, "dropped new buffer/list %p, we are full", data); + + if (is_list) + priv->dropped += gst_buffer_list_length (GST_BUFFER_LIST_CAST (data)); + else + priv->dropped += 1; + g_mutex_unlock (&priv->mutex); + + if (!priv->silent) { + g_object_notify (G_OBJECT (appsink), "dropped"); + } + return ret; } } @@ -2324,9 +2445,12 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout) obj = dequeue_object (appsink); + gboolean notify_out = FALSE; /* convert buffer and buffer list to sample */ if (GST_IS_BUFFER (obj)) { GST_DEBUG_OBJECT (appsink, "we have a buffer %p", obj); + priv->out += 1; + notify_out = !priv->silent; priv->sample = gst_sample_make_writable (priv->sample); gst_sample_set_buffer_list (priv->sample, NULL); gst_sample_set_buffer (priv->sample, GST_BUFFER_CAST (obj)); @@ -2334,6 +2458,8 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout) gst_mini_object_unref (obj); } else if (GST_IS_BUFFER_LIST (obj)) { GST_DEBUG_OBJECT (appsink, "we have a list %p", obj); + priv->out += gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)); + notify_out = !priv->silent; priv->sample = gst_sample_make_writable (priv->sample); gst_sample_set_buffer (priv->sample, NULL); gst_sample_set_buffer_list (priv->sample, GST_BUFFER_LIST_CAST (obj)); @@ -2348,6 +2474,10 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout) g_mutex_unlock (&priv->mutex); + if (notify_out) { + g_object_notify (G_OBJECT (appsink), "out"); + } + return ret; /* special conditions */