appsink: Add in/out/dropped and silent properties

This allows tracking how many buffers the appsink has processed so far, similar
to the same properties on videorate / audiorate.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8824>
This commit is contained in:
Sebastian Dröge 2025-04-11 17:32:29 +03:00 committed by GStreamer Marge Bot
parent bdf39c3270
commit 68cf7a0df1
2 changed files with 146 additions and 0 deletions

View File

@ -927,12 +927,20 @@ condition.</doc>
<doc-deprecated xml:space="preserve">Use "leaky-type" property instead.</doc-deprecated> <doc-deprecated xml:space="preserve">Use "leaky-type" property instead.</doc-deprecated>
<type name="gboolean" c:type="gboolean"/> <type name="gboolean" c:type="gboolean"/>
</property> </property>
<property name="dropped" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Number of buffers that were dropped.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="emit-signals" writable="1" transfer-ownership="none" setter="set_emit_signals" getter="get_emit_signals" default-value="FALSE"> <property name="emit-signals" writable="1" transfer-ownership="none" setter="set_emit_signals" getter="get_emit_signals" default-value="FALSE">
<type name="gboolean" c:type="gboolean"/> <type name="gboolean" c:type="gboolean"/>
</property> </property>
<property name="eos" transfer-ownership="none" default-value="TRUE"> <property name="eos" transfer-ownership="none" default-value="TRUE">
<type name="gboolean" c:type="gboolean"/> <type name="gboolean" c:type="gboolean"/>
</property> </property>
<property name="in" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Number of input buffers that were queued.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="leaky-type" version="1.28" writable="1" transfer-ownership="none" setter="set_leaky_type" getter="get_leaky_type" default-value="GST_APP_LEAKY_TYPE_NONE"> <property name="leaky-type" version="1.28" writable="1" transfer-ownership="none" setter="set_leaky_type" getter="get_leaky_type" default-value="GST_APP_LEAKY_TYPE_NONE">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsink <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">When set to any other value than GST_APP_LEAKY_TYPE_NONE then the appsink
will drop any buffers that are pushed into it once its internal queue is will drop any buffers that are pushed into it once its internal queue is
@ -952,6 +960,14 @@ buffers.</doc>
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Maximum total duration of data in the queue (0 = unlimited)</doc> <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Maximum total duration of data in the queue (0 = unlimited)</doc>
<type name="guint64" c:type="guint64"/> <type name="guint64" c:type="guint64"/>
</property> </property>
<property name="out" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Number of output buffers that were dequeued.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="silent" version="1.28" writable="1" transfer-ownership="none" default-value="TRUE">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Don't emit notify for input, output and dropped buffers.</doc>
<type name="gboolean" c:type="gboolean"/>
</property>
<property name="wait-on-eos" version="1.8" writable="1" transfer-ownership="none" setter="set_wait_on_eos" getter="get_wait_on_eos" default-value="TRUE"> <property name="wait-on-eos" version="1.8" writable="1" transfer-ownership="none" setter="set_wait_on_eos" getter="get_wait_on_eos" default-value="TRUE">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Wait for all buffers to be processed after receiving an EOS. <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsink.c">Wait for all buffers to be processed after receiving an EOS.

View File

@ -121,6 +121,8 @@ struct _GstAppSinkPrivate
GstAppSinkWaitStatus wait_status; GstAppSinkWaitStatus wait_status;
GstQueueStatusInfo queue_status_info; GstQueueStatusInfo queue_status_info;
GstAppLeakyType leaky_type; GstAppLeakyType leaky_type;
guint64 in, out, dropped;
gboolean silent;
GCond cond; GCond cond;
GMutex mutex; GMutex mutex;
@ -175,6 +177,7 @@ enum
#define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0 #define DEFAULT_PROP_CURRENT_LEVEL_BUFFERS 0
#define DEFAULT_PROP_CURRENT_LEVEL_TIME 0 #define DEFAULT_PROP_CURRENT_LEVEL_TIME 0
#define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE #define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE
#define DEFAULT_SILENT TRUE
enum enum
{ {
@ -192,6 +195,10 @@ enum
PROP_CURRENT_LEVEL_BUFFERS, PROP_CURRENT_LEVEL_BUFFERS,
PROP_CURRENT_LEVEL_TIME, PROP_CURRENT_LEVEL_TIME,
PROP_LEAKY_TYPE, PROP_LEAKY_TYPE,
PROP_IN,
PROP_OUT,
PROP_DROPPED,
PROP_SILENT,
PROP_LAST PROP_LAST
}; };
@ -397,6 +404,52 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
G_PARAM_STATIC_STRINGS)); G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:in:
*
* Number of input buffers that were queued.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_IN,
g_param_spec_uint64 ("in", "In",
"Number of input buffers", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:out:
*
* Number of output buffers that were dequeued.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_OUT,
g_param_spec_uint64 ("out", "Out", "Number of output buffers", 0,
G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:dropped:
*
* Number of buffers that were dropped.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_DROPPED,
g_param_spec_uint64 ("dropped", "Dropped", "Number of dropped buffers", 0,
G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink:silent:
*
* Don't emit notify for input, output and dropped buffers.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_SILENT,
g_param_spec_boolean ("silent", "silent",
"Don't emit notify for input, output and dropped buffers",
DEFAULT_SILENT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_PLAYING));
/** /**
* GstAppSink::eos: * GstAppSink::eos:
* @appsink: the appsink element that emitted the signal * @appsink: the appsink element that emitted the signal
@ -699,6 +752,7 @@ gst_app_sink_init (GstAppSink * appsink)
priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST; priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST;
priv->wait_status = NOONE_WAITING; priv->wait_status = NOONE_WAITING;
priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE; priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
priv->silent = DEFAULT_SILENT;
} }
static void static void
@ -783,6 +837,9 @@ gst_app_sink_set_property (GObject * object, guint prop_id,
case PROP_LEAKY_TYPE: case PROP_LEAKY_TYPE:
gst_app_sink_set_leaky_type (appsink, g_value_get_enum (value)); gst_app_sink_set_leaky_type (appsink, g_value_get_enum (value));
break; break;
case PROP_SILENT:
appsink->priv->silent = g_value_get_boolean (value);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -845,6 +902,24 @@ gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_LEAKY_TYPE: case PROP_LEAKY_TYPE:
g_value_set_enum (value, gst_app_sink_get_leaky_type (appsink)); g_value_set_enum (value, gst_app_sink_get_leaky_type (appsink));
break; break;
case PROP_IN:
g_mutex_lock (&appsink->priv->mutex);
g_value_set_uint64 (value, appsink->priv->in);
g_mutex_unlock (&appsink->priv->mutex);
break;
case PROP_OUT:
g_mutex_lock (&appsink->priv->mutex);
g_value_set_uint64 (value, appsink->priv->out);
g_mutex_unlock (&appsink->priv->mutex);
break;
case PROP_DROPPED:
g_mutex_lock (&appsink->priv->mutex);
g_value_set_uint64 (value, appsink->priv->dropped);
g_mutex_unlock (&appsink->priv->mutex);
break;
case PROP_SILENT:
g_value_set_boolean (value, appsink->priv->silent);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -897,6 +972,7 @@ gst_app_sink_flush_unlocked (GstAppSink * appsink)
gst_caps_replace (&priv->last_caps, NULL); gst_caps_replace (&priv->last_caps, NULL);
g_cond_signal (&priv->cond); g_cond_signal (&priv->cond);
priv->in = priv->out = priv->dropped = 0;
} }
static gboolean static gboolean
@ -917,6 +993,7 @@ gst_app_sink_start (GstBaseSink * psink)
gst_sample_set_buffer_list (priv->sample, NULL); gst_sample_set_buffer_list (priv->sample, NULL);
gst_sample_set_caps (priv->sample, NULL); gst_sample_set_caps (priv->sample, NULL);
gst_sample_set_segment (priv->sample, NULL); gst_sample_set_segment (priv->sample, NULL);
priv->in = priv->out = priv->dropped = 0;
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
return TRUE; return TRUE;
@ -944,8 +1021,15 @@ gst_app_sink_stop (GstBaseSink * psink)
gst_sample_set_buffer_list (priv->sample, NULL); gst_sample_set_buffer_list (priv->sample, NULL);
gst_sample_set_caps (priv->sample, NULL); gst_sample_set_caps (priv->sample, NULL);
gst_sample_set_segment (priv->sample, NULL); gst_sample_set_segment (priv->sample, NULL);
priv->in = priv->out = priv->dropped = 0;
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsink), "in");
g_object_notify (G_OBJECT (appsink), "out");
g_object_notify (G_OBJECT (appsink), "dropped");
}
return TRUE; return TRUE;
} }
@ -1046,6 +1130,12 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
GST_DEBUG_OBJECT (appsink, "received FLUSH_STOP"); GST_DEBUG_OBJECT (appsink, "received FLUSH_STOP");
gst_app_sink_flush_unlocked (appsink); gst_app_sink_flush_unlocked (appsink);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsink), "in");
g_object_notify (G_OBJECT (appsink), "out");
g_object_notify (G_OBJECT (appsink), "dropped");
}
break; break;
default: default:
break; break;
@ -1235,7 +1325,18 @@ restart:
/* we need to drop the oldest buffer/list and try again */ /* we need to drop the oldest buffer/list and try again */
if ((old = dequeue_buffer (appsink))) { if ((old = dequeue_buffer (appsink))) {
GST_DEBUG_OBJECT (appsink, "dropping old buffer/list %p", old); GST_DEBUG_OBJECT (appsink, "dropping old buffer/list %p", old);
if (GST_IS_BUFFER_LIST (old))
priv->dropped += gst_buffer_list_length (GST_BUFFER_LIST_CAST (old));
else
priv->dropped += 1;
gst_mini_object_unref (old); gst_mini_object_unref (old);
if (!priv->silent) {
g_mutex_unlock (&priv->mutex);
g_object_notify (G_OBJECT (appsink), "dropped");
g_mutex_lock (&priv->mutex);
}
} }
} else if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) { } else if (priv->leaky_type == GST_APP_LEAKY_TYPE_UPSTREAM) {
goto dropped; goto dropped;
@ -1268,6 +1369,11 @@ restart:
} }
} }
/* we need to ref the buffer/list when pushing it in the queue */ /* we need to ref the buffer/list when pushing it in the queue */
if (is_list)
priv->in += gst_buffer_list_length (GST_BUFFER_LIST_CAST (data));
else
priv->in += 1;
gst_vec_deque_push_tail (priv->queue, gst_mini_object_ref (data)); gst_vec_deque_push_tail (priv->queue, gst_mini_object_ref (data));
gst_queue_status_info_push (&priv->queue_status_info, data, gst_queue_status_info_push (&priv->queue_status_info, data,
&priv->last_segment, GST_OBJECT_CAST (appsink)); &priv->last_segment, GST_OBJECT_CAST (appsink));
@ -1280,6 +1386,10 @@ restart:
callbacks = callbacks_ref (priv->callbacks); callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsink), "in");
}
if (callbacks && callbacks->callbacks.new_sample) { if (callbacks && callbacks->callbacks.new_sample) {
ret = callbacks->callbacks.new_sample (appsink, callbacks->user_data); ret = callbacks->callbacks.new_sample (appsink, callbacks->user_data);
} else { } else {
@ -1305,7 +1415,18 @@ stopping:
dropped: dropped:
{ {
GST_DEBUG_OBJECT (appsink, "dropped new buffer/list %p, we are full", data); GST_DEBUG_OBJECT (appsink, "dropped new buffer/list %p, we are full", data);
if (is_list)
priv->dropped += gst_buffer_list_length (GST_BUFFER_LIST_CAST (data));
else
priv->dropped += 1;
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsink), "dropped");
}
return ret; return ret;
} }
} }
@ -2324,9 +2445,12 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout)
obj = dequeue_object (appsink); obj = dequeue_object (appsink);
gboolean notify_out = FALSE;
/* convert buffer and buffer list to sample */ /* convert buffer and buffer list to sample */
if (GST_IS_BUFFER (obj)) { if (GST_IS_BUFFER (obj)) {
GST_DEBUG_OBJECT (appsink, "we have a buffer %p", obj); GST_DEBUG_OBJECT (appsink, "we have a buffer %p", obj);
priv->out += 1;
notify_out = !priv->silent;
priv->sample = gst_sample_make_writable (priv->sample); priv->sample = gst_sample_make_writable (priv->sample);
gst_sample_set_buffer_list (priv->sample, NULL); gst_sample_set_buffer_list (priv->sample, NULL);
gst_sample_set_buffer (priv->sample, GST_BUFFER_CAST (obj)); gst_sample_set_buffer (priv->sample, GST_BUFFER_CAST (obj));
@ -2334,6 +2458,8 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout)
gst_mini_object_unref (obj); gst_mini_object_unref (obj);
} else if (GST_IS_BUFFER_LIST (obj)) { } else if (GST_IS_BUFFER_LIST (obj)) {
GST_DEBUG_OBJECT (appsink, "we have a list %p", obj); GST_DEBUG_OBJECT (appsink, "we have a list %p", obj);
priv->out += gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj));
notify_out = !priv->silent;
priv->sample = gst_sample_make_writable (priv->sample); priv->sample = gst_sample_make_writable (priv->sample);
gst_sample_set_buffer (priv->sample, NULL); gst_sample_set_buffer (priv->sample, NULL);
gst_sample_set_buffer_list (priv->sample, GST_BUFFER_LIST_CAST (obj)); gst_sample_set_buffer_list (priv->sample, GST_BUFFER_LIST_CAST (obj));
@ -2348,6 +2474,10 @@ gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout)
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (notify_out) {
g_object_notify (G_OBJECT (appsink), "out");
}
return ret; return ret;
/* special conditions */ /* special conditions */