appsrc: Add in/out/dropped and silent properties

This allows tracking how many buffers the appsrc has processed so far, similar
to the same properties on videorate / audiorate.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8824>
This commit is contained in:
Sebastian Dröge 2025-04-11 17:33:07 +03:00 committed by GStreamer Marge Bot
parent 68cf7a0df1
commit 295c409115
2 changed files with 147 additions and 1 deletions

View File

@ -2206,6 +2206,10 @@ on outgoing buffers.</doc>
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">The amount of currently queued time inside appsrc.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="dropped" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">Number of buffers that were dropped.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="duration" version="1.10" writable="1" transfer-ownership="none" setter="set_duration" getter="get_duration" default-value="18446744073709551615">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">The total duration in nanoseconds of the data stream. If the total duration is known, it
is recommended to configure it with this property.</doc>
@ -2233,6 +2237,10 @@ GstAppSrc::format should be time. However, possibly #GstAppSrc can support
other formats.</doc>
<type name="gboolean" c:type="gboolean"/>
</property>
<property name="in" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">Number of input buffers that were queued.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="is-live" writable="1" transfer-ownership="none" default-value="FALSE">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">Instruct the source to behave like a live source. This includes that it
will only push out buffers in the PLAYING state.</doc>
@ -2276,6 +2284,14 @@ latency calculations of #GstBaseSrc.</doc>
queue drops below this percentage of max-bytes.</doc>
<type name="guint" c:type="guint"/>
</property>
<property name="out" version="1.28" transfer-ownership="none" default-value="0">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">Number of output buffers that were dequeued.</doc>
<type name="guint64" c:type="guint64"/>
</property>
<property name="silent" version="1.28" writable="1" transfer-ownership="none" default-value="TRUE">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">Don't emit notify for input, output and dropped buffers.</doc>
<type name="gboolean" c:type="gboolean"/>
</property>
<property name="size" writable="1" transfer-ownership="none" setter="set_size" getter="get_size" default-value="-1">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/app/gstappsrc.c">The total size in bytes of the data stream. If the total size is known, it
is recommended to configure it with this property.</doc>

View File

@ -195,6 +195,9 @@ struct _GstAppSrcPrivate
GstAppLeakyType leaky_type;
Callbacks *callbacks;
guint64 in, out, dropped;
gboolean silent;
};
GST_DEBUG_CATEGORY_STATIC (app_src_debug);
@ -234,6 +237,7 @@ enum
#define DEFAULT_PROP_DURATION GST_CLOCK_TIME_NONE
#define DEFAULT_PROP_HANDLE_SEGMENT_CHANGE FALSE
#define DEFAULT_PROP_LEAKY_TYPE GST_APP_LEAKY_TYPE_NONE
#define DEFAULT_SILENT TRUE
enum
{
@ -257,6 +261,10 @@ enum
PROP_DURATION,
PROP_HANDLE_SEGMENT_CHANGE,
PROP_LEAKY_TYPE,
PROP_IN,
PROP_OUT,
PROP_DROPPED,
PROP_SILENT,
PROP_LAST
};
@ -575,6 +583,50 @@ gst_app_src_class_init (GstAppSrcClass * klass)
G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc:in:
*
* Number of input buffers that were queued.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_IN,
g_param_spec_uint64 ("in", "In",
"Number of input buffers", 0, G_MAXUINT64, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc:out:
*
* Number of output buffers that were dequeued.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_OUT,
g_param_spec_uint64 ("out", "Out", "Number of output buffers", 0,
G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc:dropped:
*
* Number of buffers that were dropped.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_DROPPED,
g_param_spec_uint64 ("dropped", "Dropped", "Number of dropped buffers", 0,
G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc:silent:
*
* Don't emit notify for input, output and dropped buffers.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_SILENT,
g_param_spec_boolean ("silent", "silent",
"Don't emit notify for input, output and dropped buffers",
DEFAULT_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSrc::need-data:
* @appsrc: the appsrc element that emitted the signal
@ -763,6 +815,7 @@ gst_app_src_init (GstAppSrc * appsrc)
priv->min_percent = DEFAULT_PROP_MIN_PERCENT;
priv->handle_segment_change = DEFAULT_PROP_HANDLE_SEGMENT_CHANGE;
priv->leaky_type = DEFAULT_PROP_LEAKY_TYPE;
priv->silent = DEFAULT_SILENT;
gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
}
@ -795,6 +848,7 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
gst_queue_status_info_reset (&priv->queue_status_info);
priv->need_discont_upstream = FALSE;
priv->need_discont_downstream = FALSE;
priv->in = priv->out = priv->dropped = 0;
}
static void
@ -927,6 +981,9 @@ gst_app_src_set_property (GObject * object, guint prop_id,
case PROP_LEAKY_TYPE:
gst_app_src_set_leaky_type (appsrc, g_value_get_enum (value));
break;
case PROP_SILENT:
priv->silent = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1009,6 +1066,24 @@ gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_LEAKY_TYPE:
g_value_set_enum (value, gst_app_src_get_leaky_type (appsrc));
break;
case PROP_IN:
g_mutex_lock (&appsrc->priv->mutex);
g_value_set_uint64 (value, appsrc->priv->in);
g_mutex_unlock (&appsrc->priv->mutex);
break;
case PROP_OUT:
g_mutex_lock (&appsrc->priv->mutex);
g_value_set_uint64 (value, appsrc->priv->out);
g_mutex_unlock (&appsrc->priv->mutex);
break;
case PROP_DROPPED:
g_mutex_lock (&appsrc->priv->mutex);
g_value_set_uint64 (value, appsrc->priv->dropped);
g_mutex_unlock (&appsrc->priv->mutex);
break;
case PROP_SILENT:
g_value_set_boolean (value, appsrc->priv->silent);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -1027,6 +1102,12 @@ gst_app_src_send_event (GstElement * element, GstEvent * event)
gst_app_src_flush_queued (appsrc, TRUE);
priv->is_eos = FALSE;
g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsrc), "in");
g_object_notify (G_OBJECT (appsrc), "out");
g_object_notify (G_OBJECT (appsrc), "dropped");
}
break;
default:
if (GST_EVENT_IS_SERIALIZED (event)) {
@ -1091,6 +1172,7 @@ gst_app_src_start (GstBaseSrc * bsrc)
* in random-access mode. */
priv->offset = -1;
priv->flushing = FALSE;
priv->in = priv->out = priv->dropped = 0;
g_mutex_unlock (&priv->mutex);
gst_base_src_set_format (bsrc, priv->format);
@ -1115,8 +1197,15 @@ gst_app_src_stop (GstBaseSrc * bsrc)
priv->posted_latency_msg = FALSE;
gst_app_src_flush_queued (appsrc, TRUE);
g_cond_broadcast (&priv->cond);
priv->in = priv->out = priv->dropped = 0;
g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsrc), "in");
g_object_notify (G_OBJECT (appsrc), "out");
g_object_notify (G_OBJECT (appsrc), "dropped");
}
return TRUE;
}
@ -1262,8 +1351,14 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
gst_segment_copy_into (segment, &priv->last_segment);
gst_segment_copy_into (segment, &priv->current_segment);
priv->pending_custom_segment = FALSE;
g_mutex_unlock (&priv->mutex);
priv->is_eos = FALSE;
g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsrc), "in");
g_object_notify (G_OBJECT (appsrc), "out");
g_object_notify (G_OBJECT (appsrc), "dropped");
}
} else {
GST_WARNING_OBJECT (appsrc, "seek failed");
}
@ -1582,6 +1677,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
}
priv->pushed_buffer = TRUE;
priv->out += 1;
*buf = buffer;
} else if (GST_IS_BUFFER_LIST (obj)) {
GstBufferList *buffer_list;
@ -1609,6 +1705,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
push_delayed_events (appsrc);
}
priv->out += gst_buffer_list_length (buffer_list);
gst_base_src_submit_buffer_list (bsrc, buffer_list);
priv->pushed_buffer = TRUE;
*buf = NULL;
@ -1711,6 +1808,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
priv->wait_status &= ~STREAM_WAITING;
}
g_mutex_unlock (&priv->mutex);
if (ret == GST_FLOW_OK && !priv->silent) {
g_object_notify (G_OBJECT (appsrc), "out");
}
return ret;
/* ERRORS */
@ -2492,8 +2594,19 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
GST_DEBUG_OBJECT (appsrc, "Dropping old item %" GST_PTR_FORMAT, item);
gst_app_src_update_queued_pop (appsrc, item, FALSE);
if (GST_IS_BUFFER_LIST (item))
priv->dropped += gst_buffer_list_length (buflist);
else
priv->dropped += 1;
gst_mini_object_unref (item);
if (!priv->silent) {
g_mutex_unlock (&priv->mutex);
g_object_notify (G_OBJECT (appsrc), "dropped");
g_mutex_lock (&priv->mutex);
}
priv->need_discont_downstream = TRUE;
continue;
}
@ -2549,6 +2662,7 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
if (!steal_ref)
gst_buffer_list_ref (buflist);
gst_vec_deque_push_tail (priv->queue, buflist);
priv->in += gst_buffer_list_length (buflist);
} else {
/* Mark the buffer as DISCONT if we previously dropped a buffer instead of
* queueing it */
@ -2567,6 +2681,7 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
if (!steal_ref)
gst_buffer_ref (buffer);
gst_vec_deque_push_tail (priv->queue, buffer);
priv->in += 1;
}
gst_app_src_update_queued_push (appsrc,
@ -2577,6 +2692,10 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
g_mutex_unlock (&priv->mutex);
if (!appsrc->priv->silent) {
g_object_notify (G_OBJECT (appsrc), "in");
}
return GST_FLOW_OK;
/* ERRORS */
@ -2607,6 +2726,12 @@ eos:
dropped:
{
GST_DEBUG_OBJECT (appsrc, "dropped new buffer %p, we are full", buffer);
if (buflist)
priv->dropped += gst_buffer_list_length (buflist);
else
priv->dropped += 1;
if (steal_ref) {
if (buflist)
gst_buffer_list_unref (buflist);
@ -2614,6 +2739,11 @@ dropped:
gst_buffer_unref (buffer);
}
g_mutex_unlock (&priv->mutex);
if (!priv->silent) {
g_object_notify (G_OBJECT (appsrc), "dropped");
}
return GST_FLOW_OK;
}
}