diff --git a/girs/GstBase-1.0.gir b/girs/GstBase-1.0.gir index bcdf9a5391..5e8f315e02 100644 --- a/girs/GstBase-1.0.gir +++ b/girs/GstBase-1.0.gir @@ -2503,6 +2503,18 @@ usage. + + The number of currently queued buffers inside this pad + + + + The number of currently queued bytes inside this pad + + + + The amount of currently queued time inside this pad + + Enables the emission of signals such as #GstAggregatorPad::buffer-consumed diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.c b/subprojects/gstreamer/libs/gst/base/gstaggregator.c index b2d6fcbe6f..9ccf066512 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.c +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.c @@ -257,6 +257,7 @@ struct _GstAggregatorPadPrivate GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; guint num_buffers; + guint64 num_bytes; GstBuffer *peeked_buffer; /* TRUE if the serialized query is in the proccess of handling at some @@ -527,6 +528,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self, /* We must not have any buffers at all in this pad then as otherwise we * would've had an event/query at the top of the queue */ g_assert (pad->priv->num_buffers == 0); + g_assert (pad->priv->num_bytes == 0); /* Only consider this pad as worth waiting for if it's not already EOS. * There's no point in waiting for buffers on EOS pads */ @@ -538,8 +540,10 @@ gst_aggregator_check_pads_ready (GstAggregator * self, n_ready++; } } else { - GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers", - GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers); + GST_TRACE_OBJECT (pad, + "Have %" GST_TIME_FORMAT " queued in %u buffers (%" G_GUINT64_FORMAT + " bytes)", GST_TIME_ARGS (pad->priv->time_level), + pad->priv->num_buffers, pad->priv->num_bytes); if (is_live_unlocked (self)) { /* In live mode, having a single pad with buffers is enough to * generate a start time from it. In non-live mode all pads need @@ -1146,6 +1150,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, item = next; } aggpad->priv->num_buffers = 0; + aggpad->priv->num_bytes = 0; aggpad->priv->stream_start_pending = FALSE; gst_buffer_replace (&aggpad->priv->clipped_buffer, NULL); @@ -3226,8 +3231,10 @@ gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) { guint64 max_time_level; - GST_TRACE_OBJECT (aggpad, "Have %" GST_TIME_FORMAT " queued in %u buffers", - GST_TIME_ARGS (aggpad->priv->time_level), aggpad->priv->num_buffers); + GST_TRACE_OBJECT (aggpad, + "Have %" GST_TIME_FORMAT " queued in %u buffers (%" G_GUINT64_FORMAT + " bytes)", GST_TIME_ARGS (aggpad->priv->time_level), + aggpad->priv->num_buffers, aggpad->priv->num_bytes); /* Empty queue always has space */ if (aggpad->priv->num_buffers == 0 && aggpad->priv->clipped_buffer == NULL) @@ -3333,6 +3340,7 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, } apply_buffer (aggpad, buffer, head); aggpad->priv->num_buffers++; + aggpad->priv->num_bytes += gst_buffer_get_size (buffer); buffer = NULL; SRC_BROADCAST (self); break; @@ -3492,11 +3500,17 @@ gst_aggregator_pad_activate_mode_func (GstPad * pad, G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD); #define DEFAULT_PAD_EMIT_SIGNALS FALSE +#define DEFAULT_PAD_CURRENT_LEVEL_TIME 0 +#define DEFAULT_PAD_CURRENT_LEVEL_BUFFERS 0 +#define DEFAULT_PAD_CURRENT_LEVEL_BYTES 0 enum { PAD_PROP_0, PAD_PROP_EMIT_SIGNALS, + PAD_PROP_CURRENT_LEVEL_TIME, + PAD_PROP_CURRENT_LEVEL_BUFFERS, + PAD_PROP_CURRENT_LEVEL_BYTES, }; enum @@ -3575,6 +3589,27 @@ gst_aggregator_pad_get_property (GObject * object, guint prop_id, case PAD_PROP_EMIT_SIGNALS: g_value_set_boolean (value, pad->priv->emit_signals); break; + case PAD_PROP_CURRENT_LEVEL_TIME: + PAD_LOCK (pad); + GST_OBJECT_LOCK (pad); + g_value_set_uint64 (value, pad->priv->time_level); + GST_OBJECT_UNLOCK (pad); + PAD_UNLOCK (pad); + break; + case PAD_PROP_CURRENT_LEVEL_BUFFERS: + PAD_LOCK (pad); + GST_OBJECT_LOCK (pad); + g_value_set_uint64 (value, pad->priv->num_buffers); + GST_OBJECT_UNLOCK (pad); + PAD_UNLOCK (pad); + break; + case PAD_PROP_CURRENT_LEVEL_BYTES: + PAD_LOCK (pad); + GST_OBJECT_LOCK (pad); + g_value_set_uint64 (value, pad->priv->num_bytes); + GST_OBJECT_UNLOCK (pad); + PAD_UNLOCK (pad); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3619,6 +3654,46 @@ gst_aggregator_pad_class_init (GstAggregatorPadClass * klass) g_param_spec_boolean ("emit-signals", "Emit signals", "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAggregatorPad:current-level-time: + * + * The amount of currently queued time inside this pad + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, PAD_PROP_CURRENT_LEVEL_TIME, + g_param_spec_uint64 ("current-level-time", "Current Level Time", + "The amount of currently queued time", 0, G_MAXUINT64, + DEFAULT_PAD_CURRENT_LEVEL_TIME, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAggregatorPad:current-level-buffers: + * + * The number of currently queued buffers inside this pad + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, + PAD_PROP_CURRENT_LEVEL_BUFFERS, + g_param_spec_uint64 ("current-level-buffers", "Current Level Buffers", + "The number of currently queued buffers", 0, G_MAXUINT64, + DEFAULT_PAD_CURRENT_LEVEL_BUFFERS, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAggregatorPad:current-level-bytes: + * + * The number of currently queued bytes inside this pad + * + * Since: 1.28 + */ + g_object_class_install_property (gobject_class, PAD_PROP_CURRENT_LEVEL_BYTES, + g_param_spec_uint64 ("current-level-bytes", "Current Level Bytes", + "The number of currently queued bytes", 0, G_MAXUINT64, + DEFAULT_PAD_CURRENT_LEVEL_BYTES, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); } static void @@ -3642,8 +3717,10 @@ static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer, gboolean dequeued) { - if (dequeued) + if (dequeued) { pad->priv->num_buffers--; + pad->priv->num_bytes -= gst_buffer_get_size (buffer); + } if (buffer && pad->priv->emit_signals) { g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED], @@ -3680,14 +3757,24 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) } if (aggclass->clip) { + GstBuffer *buffer_ref = gst_buffer_ref (buffer); + GST_TRACE_OBJECT (pad, "Clipping: %" GST_PTR_FORMAT, buffer); buffer = aggclass->clip (self, pad, buffer); if (buffer == NULL) { - gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE); + gst_aggregator_pad_buffer_consumed (pad, buffer_ref, TRUE); GST_TRACE_OBJECT (pad, "Clipping consumed the buffer"); + } else { + g_assert (gst_buffer_get_size (buffer) <= + gst_buffer_get_size (buffer_ref)); + + pad->priv->num_bytes -= + gst_buffer_get_size (buffer_ref) - gst_buffer_get_size (buffer); } + + gst_buffer_unref (buffer_ref); } pad->priv->clipped_buffer = buffer;