appsrc: properly handle events received before sending the segment

The first serialized events that can be send on a src pad are a CAPS and then a
SEGMENT event.

When handling events from user in appsrc, we used to send a segment
automatically if the SEGMENT has not been sent yet.
This breaks if the CAPS event was not send either as we were now sending
a SEGMENT before the CAPS.

Fix this by delaying such events until the CAPS has been configured.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297>
This commit is contained in:
Guillaume Desmottes 2023-03-31 15:36:53 +02:00 committed by GStreamer Marge Bot
parent 7b41db3ab6
commit 84dea99132
2 changed files with 287 additions and 22 deletions

View File

@ -154,6 +154,11 @@ struct _GstAppSrcPrivate
/* queue up a segment event based on last_segment before /* queue up a segment event based on last_segment before
* the next buffer of buffer list */ * the next buffer of buffer list */
gboolean pending_custom_segment; gboolean pending_custom_segment;
/* events that have been delayed until either the caps is configured, ensuring
that no events are sent before CAPS, or buffers are being pushed. */
GstQueueArray *delayed_events;
/* if a buffer has been pushed yet */
gboolean pushed_buffer;
/* the next buffer that will be queued needs a discont flag /* the next buffer that will be queued needs a discont flag
* because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */ * because the previous one was dropped - GST_APP_LEAKY_TYPE_UPSTREAM */
@ -743,7 +748,9 @@ gst_app_src_init (GstAppSrc * appsrc)
g_mutex_init (&priv->mutex); g_mutex_init (&priv->mutex);
g_cond_init (&priv->cond); g_cond_init (&priv->cond);
priv->queue = gst_queue_array_new (16); priv->queue = gst_queue_array_new (16);
priv->delayed_events = gst_queue_array_new (16);
priv->wait_status = NOONE_WAITING; priv->wait_status = NOONE_WAITING;
priv->pushed_buffer = FALSE;
priv->size = DEFAULT_PROP_SIZE; priv->size = DEFAULT_PROP_SIZE;
priv->duration = DEFAULT_PROP_DURATION; priv->duration = DEFAULT_PROP_DURATION;
@ -785,6 +792,9 @@ gst_app_src_flush_queued (GstAppSrc * src, gboolean retain_last_caps)
gst_queue_array_push_tail (priv->queue, requeue_caps); gst_queue_array_push_tail (priv->queue, requeue_caps);
} }
gst_queue_array_clear (priv->delayed_events);
priv->pushed_buffer = FALSE;
priv->queued_bytes = 0; priv->queued_bytes = 0;
priv->queued_buffers = 0; priv->queued_buffers = 0;
priv->queued_time = 0; priv->queued_time = 0;
@ -832,6 +842,7 @@ gst_app_src_finalize (GObject * obj)
g_mutex_clear (&priv->mutex); g_mutex_clear (&priv->mutex);
g_cond_clear (&priv->cond); g_cond_clear (&priv->cond);
gst_queue_array_free (priv->queue); gst_queue_array_free (priv->queue);
gst_queue_array_free (priv->delayed_events);
g_free (priv->uri); g_free (priv->uri);
@ -1027,6 +1038,7 @@ gst_app_src_send_event (GstElement * element, GstEvent * event)
if (GST_EVENT_IS_SERIALIZED (event)) { if (GST_EVENT_IS_SERIALIZED (event)) {
GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event); GST_DEBUG_OBJECT (appsrc, "queue event: %" GST_PTR_FORMAT, event);
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
gst_queue_array_push_tail (priv->queue, event); gst_queue_array_push_tail (priv->queue, event);
if ((priv->wait_status & STREAM_WAITING)) if ((priv->wait_status & STREAM_WAITING))
@ -1559,6 +1571,54 @@ gst_app_src_update_queued_push (GstAppSrc * appsrc, GstMiniObject * item)
GST_TIME_ARGS (priv->queued_time)); GST_TIME_ARGS (priv->queued_time));
} }
/* check if @obj should be send after the CAPS and SEGMENT events */
static gboolean
needs_segment (GstMiniObject * obj)
{
if (GST_IS_CAPS (obj)) {
return FALSE;
} else if (GST_IS_EVENT (obj)) {
if (GST_EVENT_TYPE (obj) == GST_EVENT_SEGMENT)
return FALSE;
}
return TRUE;
}
static void
ensure_segment (GstAppSrc * appsrc)
{
GstAppSrcPrivate *priv = appsrc->priv;
GstEvent *seg_event;
seg_event = gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
GST_EVENT_SEGMENT, 0);
if (!seg_event) {
GST_DEBUG_OBJECT (appsrc, "sending default segment");
gst_base_src_push_segment (GST_BASE_SRC_CAST (appsrc), &priv->last_segment);
} else {
gst_event_unref (seg_event);
}
}
static void
push_delayed_events (GstAppSrc * appsrc)
{
GstAppSrcPrivate *priv = appsrc->priv;
while (!gst_queue_array_is_empty (priv->delayed_events)) {
GstEvent *event;
event = gst_queue_array_pop_head (priv->delayed_events);
GST_DEBUG_OBJECT (appsrc, "sending event: %" GST_PTR_FORMAT, event);
g_mutex_unlock (&priv->mutex);
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), GST_EVENT (event));
g_mutex_lock (&priv->mutex);
}
}
static GstFlowReturn static GstFlowReturn
gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size, gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
GstBuffer ** buf) GstBuffer ** buf)
@ -1625,6 +1685,11 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
if (!gst_queue_array_is_empty (priv->queue)) { if (!gst_queue_array_is_empty (priv->queue)) {
GstMiniObject *obj = gst_queue_array_pop_head (priv->queue); GstMiniObject *obj = gst_queue_array_pop_head (priv->queue);
if (priv->current_caps && needs_segment (obj)) {
/* need to have sent a segment before sending `obj` */
ensure_segment (appsrc);
}
if (GST_IS_CAPS (obj)) { if (GST_IS_CAPS (obj)) {
GstCaps *next_caps = GST_CAPS (obj); GstCaps *next_caps = GST_CAPS (obj);
gboolean caps_changed = TRUE; gboolean caps_changed = TRUE;
@ -1645,6 +1710,17 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
if (caps_changed) if (caps_changed)
gst_app_src_do_negotiate (bsrc); gst_app_src_do_negotiate (bsrc);
/* sending delayed events which were waiting on the caps */
if (!gst_queue_array_is_empty (priv->delayed_events)) {
/* need to send a segment before the events */
ensure_segment (appsrc);
GST_DEBUG_OBJECT (appsrc,
"sending delayed events after caps: %" GST_PTR_FORMAT, obj);
push_delayed_events (appsrc);
}
/* Continue checks caps and queue */ /* Continue checks caps and queue */
continue; continue;
} }
@ -1662,6 +1738,13 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
priv->need_discont_downstream = FALSE; priv->need_discont_downstream = FALSE;
} }
if (!gst_queue_array_is_empty (priv->delayed_events)) {
/* don't keep delaying events if a buffer has been pushed without CAPS */
GST_DEBUG_OBJECT (appsrc, "push delayed events before buffer");
push_delayed_events (appsrc);
}
priv->pushed_buffer = TRUE;
*buf = buffer; *buf = buffer;
} else if (GST_IS_BUFFER_LIST (obj)) { } else if (GST_IS_BUFFER_LIST (obj)) {
GstBufferList *buffer_list; GstBufferList *buffer_list;
@ -1681,7 +1764,14 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
priv->need_discont_downstream = FALSE; priv->need_discont_downstream = FALSE;
} }
if (!gst_queue_array_is_empty (priv->delayed_events)) {
/* don't keep delaying events if a buffer has been pushed without CAPS */
GST_DEBUG_OBJECT (appsrc, "push delayed events before buffer");
push_delayed_events (appsrc);
}
gst_base_src_submit_buffer_list (bsrc, buffer_list); gst_base_src_submit_buffer_list (bsrc, buffer_list);
priv->pushed_buffer = TRUE;
*buf = NULL; *buf = NULL;
} else if (GST_IS_EVENT (obj)) { } else if (GST_IS_EVENT (obj)) {
GstEvent *event = GST_EVENT (obj); GstEvent *event = GST_EVENT (obj);
@ -1697,7 +1787,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
if (!gst_segment_is_equal (&priv->current_segment, segment)) { if (!gst_segment_is_equal (&priv->current_segment, segment)) {
GST_DEBUG_OBJECT (appsrc, GST_DEBUG_OBJECT (appsrc,
"Update new segment %" GST_PTR_FORMAT, event); "Update new segment %" GST_PTR_FORMAT, event);
if (!gst_base_src_new_segment (bsrc, segment)) { if (!gst_base_src_push_segment (bsrc, segment)) {
GST_ERROR_OBJECT (appsrc, GST_ERROR_OBJECT (appsrc,
"Couldn't set new segment %" GST_PTR_FORMAT, event); "Couldn't set new segment %" GST_PTR_FORMAT, event);
gst_event_unref (event); gst_event_unref (event);
@ -1708,32 +1798,18 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
gst_event_unref (event); gst_event_unref (event);
} else { } else {
GstEvent *seg_event;
GstSegment last_segment = priv->last_segment;
/* event is serialized with the buffers flow */ /* event is serialized with the buffers flow */
/* We are about to push an event, release out lock */ if (!priv->current_caps && !priv->pushed_buffer) {
g_mutex_unlock (&priv->mutex);
seg_event =
gst_pad_get_sticky_event (GST_BASE_SRC_PAD (appsrc),
GST_EVENT_SEGMENT, 0);
if (!seg_event) {
seg_event = gst_event_new_segment (&last_segment);
GST_DEBUG_OBJECT (appsrc, GST_DEBUG_OBJECT (appsrc,
"received serialized event before first buffer, push default segment %" "did not send caps yet, delay event for now");
GST_PTR_FORMAT, seg_event); gst_queue_array_push_tail (priv->delayed_events, event);
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), seg_event);
} else { } else {
gst_event_unref (seg_event); /* We are about to push an event, release out lock */
g_mutex_unlock (&priv->mutex);
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
g_mutex_lock (&priv->mutex);
} }
gst_pad_push_event (GST_BASE_SRC_PAD (appsrc), event);
g_mutex_lock (&priv->mutex);
} }
continue; continue;
} else { } else {

View File

@ -1446,6 +1446,192 @@ GST_START_TEST (test_appsrc_send_custom_event)
GST_END_TEST; GST_END_TEST;
enum ExpectedObj
{
EXPECTED_STREAM_START,
EXPECTED_CAPS,
EXPECTED_SEGMENT,
EXPECTED_CUSTOM,
EXPECTED_BUFFER,
};
static enum ExpectedObj expected_obj;
static gboolean
send_event_before_buffer_event_func (GstPad * pad, GstObject * parent,
GstEvent * event)
{
GST_LOG ("event %" GST_PTR_FORMAT, event);
if (expected_obj == EXPECTED_STREAM_START) {
g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_STREAM_START);
expected_obj = EXPECTED_CAPS;
} else if (expected_obj == EXPECTED_CAPS) {
g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_CAPS);
expected_obj = EXPECTED_SEGMENT;
} else if (expected_obj == EXPECTED_SEGMENT) {
g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_SEGMENT);
expected_obj = EXPECTED_CUSTOM;
} else if (expected_obj == EXPECTED_CUSTOM) {
g_assert_cmpuint (GST_EVENT_TYPE (event), ==, GST_EVENT_CUSTOM_DOWNSTREAM);
expected_obj = EXPECTED_BUFFER;
} else {
g_assert_not_reached ();
}
gst_event_unref (event);
return TRUE;
}
static GstFlowReturn
send_event_before_buffer_chain_func (GstPad * pad, GstObject * parent,
GstBuffer * buf)
{
GST_LOG ("buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
g_assert_cmpuint (expected_obj, ==, EXPECTED_BUFFER);
g_mutex_lock (&check_mutex);
done = TRUE;
g_cond_signal (&check_cond);
g_mutex_unlock (&check_mutex);
gst_buffer_unref (buf);
return GST_FLOW_OK;
}
/* send a custom event to appsrc before the first buffer */
GST_START_TEST (test_appsrc_send_event_before_buffer)
{
GstElement *src;
GstBuffer *buf;
GstCaps *caps;
src = setup_appsrc ();
g_object_set (src, "format", GST_FORMAT_TIME, NULL);
ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
gst_pad_set_event_function (mysinkpad, send_event_before_buffer_event_func);
gst_pad_set_chain_function (mysinkpad, send_event_before_buffer_chain_func);
expected_obj = EXPECTED_STREAM_START;
/* send a custom event and then the first buffer */
gst_element_send_event (src,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
gst_structure_new ("custom", NULL, NULL)));
caps = gst_caps_from_string ("video/x-raw");
gst_app_src_set_caps (GST_APP_SRC_CAST (src), caps);
gst_caps_unref (caps);
buf = gst_buffer_new_and_alloc (2);
GST_BUFFER_OFFSET (buf) = 0;
fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
buf) == GST_FLOW_OK);
g_mutex_lock (&check_mutex);
while (!done)
g_cond_wait (&check_cond, &check_mutex);
g_mutex_unlock (&check_mutex);
ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsrc (src);
}
GST_END_TEST;
/* send a custom event to appsrc before the first sample */
GST_START_TEST (test_appsrc_send_event_before_sample)
{
GstElement *src;
GstSample *sample;
GstBuffer *buf;
GstCaps *caps;
GstSegment segment;
src = setup_appsrc ();
g_object_set (src, "format", GST_FORMAT_TIME, NULL);
ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
gst_pad_set_event_function (mysinkpad, send_event_before_buffer_event_func);
gst_pad_set_chain_function (mysinkpad, send_event_before_buffer_chain_func);
expected_obj = EXPECTED_STREAM_START;
/* send a custom event and then the first sample */
gst_element_send_event (src,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
gst_structure_new ("custom", NULL, NULL)));
buf = gst_buffer_new_and_alloc (2);
caps = gst_caps_from_string ("video/x-raw");
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.start = 5;
sample = gst_sample_new (buf, caps, &segment, NULL);
fail_unless (gst_app_src_push_sample (GST_APP_SRC_CAST (src),
sample) == GST_FLOW_OK);
gst_caps_unref (caps);
gst_buffer_unref (buf);
gst_sample_unref (sample);
g_mutex_lock (&check_mutex);
while (!done)
g_cond_wait (&check_cond, &check_mutex);
g_mutex_unlock (&check_mutex);
ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsrc (src);
}
GST_END_TEST;
/* send a custom event to appsrc between the caps and the first buffer */
GST_START_TEST (test_appsrc_send_event_between_caps_buffer)
{
GstElement *src;
GstBuffer *buf;
GstCaps *caps;
src = setup_appsrc ();
g_object_set (src, "format", GST_FORMAT_TIME, NULL);
ASSERT_SET_STATE (src, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
gst_pad_set_event_function (mysinkpad, send_event_before_buffer_event_func);
gst_pad_set_chain_function (mysinkpad, send_event_before_buffer_chain_func);
expected_obj = EXPECTED_STREAM_START;
caps = gst_caps_from_string ("video/x-raw");
gst_app_src_set_caps (GST_APP_SRC_CAST (src), caps);
gst_caps_unref (caps);
/* send a custom event and then the first buffer */
gst_element_send_event (src,
gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
gst_structure_new ("custom", NULL, NULL)));
buf = gst_buffer_new_and_alloc (2);
GST_BUFFER_OFFSET (buf) = 0;
fail_unless (gst_app_src_push_buffer (GST_APP_SRC_CAST (src),
buf) == GST_FLOW_OK);
g_mutex_lock (&check_mutex);
while (!done)
g_cond_wait (&check_cond, &check_mutex);
g_mutex_unlock (&check_mutex);
ASSERT_SET_STATE (src, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsrc (src);
}
GST_END_TEST;
static Suite * static Suite *
appsrc_suite (void) appsrc_suite (void)
{ {
@ -1461,6 +1647,9 @@ appsrc_suite (void)
tcase_add_test (tc_chain, test_appsrc_custom_segment_twice); tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
tcase_add_test (tc_chain, test_appsrc_limits); tcase_add_test (tc_chain, test_appsrc_limits);
tcase_add_test (tc_chain, test_appsrc_send_custom_event); tcase_add_test (tc_chain, test_appsrc_send_custom_event);
tcase_add_test (tc_chain, test_appsrc_send_event_before_buffer);
tcase_add_test (tc_chain, test_appsrc_send_event_before_sample);
tcase_add_test (tc_chain, test_appsrc_send_event_between_caps_buffer);
if (RUNNING_ON_VALGRIND) if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5); tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);