aggregator: expose current-level-* properties on sink pads

As aggregator internally queues data (up to latency), those properties
are helpful to monitor queue levels in the complete pipeline.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8731>
This commit is contained in:
Mathieu Duponchelle 2025-03-31 15:39:14 +02:00 committed by GStreamer Marge Bot
parent 4fc8773371
commit d1b4104cbc
2 changed files with 105 additions and 6 deletions

View File

@ -2503,6 +2503,18 @@ usage.</doc>
</instance-parameter> </instance-parameter>
</parameters> </parameters>
</method> </method>
<property name="current-level-buffers" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gstreamer/libs/gst/base/gstaggregator.c">The number of currently queued buffers inside this pad</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="current-level-bytes" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gstreamer/libs/gst/base/gstaggregator.c">The number of currently queued bytes inside this pad</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="current-level-time" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gstreamer/libs/gst/base/gstaggregator.c">The amount of currently queued time inside this pad</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="emit-signals" version="1.16" writable="1" transfer-ownership="none" default-value="FALSE"> <property name="emit-signals" version="1.16" writable="1" transfer-ownership="none" default-value="FALSE">
<doc xml:space="preserve" filename="../subprojects/gstreamer/libs/gst/base/gstaggregator.c">Enables the emission of signals such as #GstAggregatorPad::buffer-consumed</doc> <doc xml:space="preserve" filename="../subprojects/gstreamer/libs/gst/base/gstaggregator.c">Enables the emission of signals such as #GstAggregatorPad::buffer-consumed</doc>
<type name="gboolean" c:type="gboolean"/> <type name="gboolean" c:type="gboolean"/>

View File

@ -257,6 +257,7 @@ struct _GstAggregatorPadPrivate
GQueue data; /* buffers, events and queries */ GQueue data; /* buffers, events and queries */
GstBuffer *clipped_buffer; GstBuffer *clipped_buffer;
guint num_buffers; guint num_buffers;
guint64 num_bytes;
GstBuffer *peeked_buffer; GstBuffer *peeked_buffer;
/* TRUE if the serialized query is in the proccess of handling at some /* TRUE if the serialized query is in the proccess of handling at some
@ -527,6 +528,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
/* We must not have any buffers at all in this pad then as otherwise we /* We must not have any buffers at all in this pad then as otherwise we
* would've had an event/query at the top of the queue */ * would've had an event/query at the top of the queue */
g_assert (pad->priv->num_buffers == 0); g_assert (pad->priv->num_buffers == 0);
g_assert (pad->priv->num_bytes == 0);
/* Only consider this pad as worth waiting for if it's not already EOS. /* Only consider this pad as worth waiting for if it's not already EOS.
* There's no point in waiting for buffers on EOS pads */ * There's no point in waiting for buffers on EOS pads */
@ -538,8 +540,10 @@ gst_aggregator_check_pads_ready (GstAggregator * self,
n_ready++; n_ready++;
} }
} else { } else {
GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers", GST_TRACE_OBJECT (pad,
GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers); "Have %" GST_TIME_FORMAT " queued in %u buffers (%" G_GUINT64_FORMAT
" bytes)", GST_TIME_ARGS (pad->priv->time_level),
pad->priv->num_buffers, pad->priv->num_bytes);
if (is_live_unlocked (self)) { if (is_live_unlocked (self)) {
/* In live mode, having a single pad with buffers is enough to /* In live mode, having a single pad with buffers is enough to
* generate a start time from it. In non-live mode all pads need * generate a start time from it. In non-live mode all pads need
@ -1146,6 +1150,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
item = next; item = next;
} }
aggpad->priv->num_buffers = 0; aggpad->priv->num_buffers = 0;
aggpad->priv->num_bytes = 0;
aggpad->priv->stream_start_pending = FALSE; aggpad->priv->stream_start_pending = FALSE;
gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL);
@ -3226,8 +3231,10 @@ gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad)
{ {
guint64 max_time_level; guint64 max_time_level;
GST_TRACE_OBJECT (aggpad, "Have %" GST_TIME_FORMAT " queued in %u buffers", GST_TRACE_OBJECT (aggpad,
GST_TIME_ARGS (aggpad->priv->time_level), aggpad->priv->num_buffers); "Have %" GST_TIME_FORMAT " queued in %u buffers (%" G_GUINT64_FORMAT
" bytes)", GST_TIME_ARGS (aggpad->priv->time_level),
aggpad->priv->num_buffers, aggpad->priv->num_bytes);
/* Empty queue always has space */ /* Empty queue always has space */
if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL)
@ -3333,6 +3340,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self,
} }
apply_buffer (aggpad, buffer, head); apply_buffer (aggpad, buffer, head);
aggpad->priv->num_buffers++; aggpad->priv->num_buffers++;
aggpad->priv->num_bytes += gst_buffer_get_size (buffer);
buffer = NULL; buffer = NULL;
SRC_BROADCAST (self); SRC_BROADCAST (self);
break; break;
@ -3492,11 +3500,17 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad,
G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
#define DEFAULT_PAD_EMIT_SIGNALS FALSE #define DEFAULT_PAD_EMIT_SIGNALS FALSE
#define DEFAULT_PAD_CURRENT_LEVEL_TIME 0
#define DEFAULT_PAD_CURRENT_LEVEL_BUFFERS 0
#define DEFAULT_PAD_CURRENT_LEVEL_BYTES 0
enum enum
{ {
PAD_PROP_0, PAD_PROP_0,
PAD_PROP_EMIT_SIGNALS, PAD_PROP_EMIT_SIGNALS,
PAD_PROP_CURRENT_LEVEL_TIME,
PAD_PROP_CURRENT_LEVEL_BUFFERS,
PAD_PROP_CURRENT_LEVEL_BYTES,
}; };
enum enum
@ -3575,6 +3589,27 @@ gst_aggregator_pad_get_property (GObject * object, guint prop_id,
case PAD_PROP_EMIT_SIGNALS: case PAD_PROP_EMIT_SIGNALS:
g_value_set_boolean (value, pad->priv->emit_signals); g_value_set_boolean (value, pad->priv->emit_signals);
break; break;
case PAD_PROP_CURRENT_LEVEL_TIME:
PAD_LOCK (pad);
GST_OBJECT_LOCK (pad);
g_value_set_uint64 (value, pad->priv->time_level);
GST_OBJECT_UNLOCK (pad);
PAD_UNLOCK (pad);
break;
case PAD_PROP_CURRENT_LEVEL_BUFFERS:
PAD_LOCK (pad);
GST_OBJECT_LOCK (pad);
g_value_set_uint64 (value, pad->priv->num_buffers);
GST_OBJECT_UNLOCK (pad);
PAD_UNLOCK (pad);
break;
case PAD_PROP_CURRENT_LEVEL_BYTES:
PAD_LOCK (pad);
GST_OBJECT_LOCK (pad);
g_value_set_uint64 (value, pad->priv->num_bytes);
GST_OBJECT_UNLOCK (pad);
PAD_UNLOCK (pad);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -3619,6 +3654,46 @@ gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
g_param_spec_boolean ("emit-signals", "Emit signals", g_param_spec_boolean ("emit-signals", "Emit signals",
"Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS, "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAggregatorPad:current-level-time:
*
* The amount of currently queued time inside this pad
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PAD_PROP_CURRENT_LEVEL_TIME,
g_param_spec_uint64 ("current-level-time", "Current Level Time",
"The amount of currently queued time", 0, G_MAXUINT64,
DEFAULT_PAD_CURRENT_LEVEL_TIME,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAggregatorPad:current-level-buffers:
*
* The number of currently queued buffers inside this pad
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class,
PAD_PROP_CURRENT_LEVEL_BUFFERS,
g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers",
"The number of currently queued buffers", 0, G_MAXUINT64,
DEFAULT_PAD_CURRENT_LEVEL_BUFFERS,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAggregatorPad:current-level-bytes:
*
* The number of currently queued bytes inside this pad
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PAD_PROP_CURRENT_LEVEL_BYTES,
g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes",
"The number of currently queued bytes", 0, G_MAXUINT64,
DEFAULT_PAD_CURRENT_LEVEL_BYTES,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
} }
static void static void
@ -3642,8 +3717,10 @@ static void
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer, gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
gboolean dequeued) gboolean dequeued)
{ {
if (dequeued) if (dequeued) {
pad->priv->num_buffers--; pad->priv->num_buffers--;
pad->priv->num_bytes -= gst_buffer_get_size (buffer);
}
if (buffer && pad->priv->emit_signals) { if (buffer && pad->priv->emit_signals) {
g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
@ -3680,14 +3757,24 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
} }
if (aggclass->clip) { if (aggclass->clip) {
GstBuffer *buffer_ref = gst_buffer_ref (buffer);
GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer);
buffer = aggclass->clip (self, pad, buffer); buffer = aggclass->clip (self, pad, buffer);
if (buffer == NULL) { if (buffer == NULL) {
gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE); gst_aggregator_pad_buffer_consumed (pad, buffer_ref, TRUE);
GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
} else {
g_assert (gst_buffer_get_size (buffer) <=
gst_buffer_get_size (buffer_ref));
pad->priv->num_bytes -=
gst_buffer_get_size (buffer_ref) - gst_buffer_get_size (buffer);
} }
gst_buffer_unref (buffer_ref);
} }
pad->priv->clipped_buffer = buffer; pad->priv->clipped_buffer = buffer;