streamsynchronizer: Consider streams having received stream-start as waiting

When using the custom WebKitMediaSrc element (used by WebKit and able to
perform an initial seek in playbin), a stall caused by streamsynchronizer
was detected during an initial seek. The flow of events revealed that the
intertwining of the initial configuration of the streams with the reset
caused by the flush events from the seek left streamsynchronizer in an
inconsistent state:

streamsynchronizer0:sink_0 (video) events, starting before the seek:
 stream-start --> Sets the stream to wait
 flush-stop --> Clears the stream wait flag
 caps
 tag
 segment
 stream-collection
 (buffers start to come and flow properly)

streamsynchronizer0:sink_1 (audio) events, happening after seek:
 (no flush events, because the stream hadn't been initialized when the seek happened)
 stream-start --> Sets the stream to wait
 caps
 segment
 (stalled because the stream is in wait mode!)

The code in streamsynchronizer expects that all the streams are in wait
state before releasing all of them at once. The flush on the video stream
broke that assumption and that's why the audio stream is never released in
that scenario.

Avoiding the clearing of the wait flag on flush-stop isn't an actual solution
to the problem, as it creates other side effects and at least makes the
gst-editing-services/seek_with_stop test to timeout. The alternate solution
implemented in this patch consists on analyzing if the other streams different
from the one newly added (after the flush) aren't waiting (which would mean
that they've all been unlocked after all of them were waiting before), and,
in that case, mark the new stream as also not waiting.

A new test_stream_start_wait test case has been added to demonstrate this
problem. The test case creates a video stream, pushes a buffer, then
simulates a seek by pushing flush-start, flush-stop, stream-start and segment
events. Note that the flush-stop clears the video stream waiting flag.
After that, a new audio stream is created and stream-start and new segment
events are sent. Note that stream-start will set the audio stream to wait.
Then a buffer is pushed on each stream. In the failing case, the test hangs.
In the working case (after this fix), the test runs properly because the
fact of having seen a stream-start also helps to clear the wait flag.

A second new test_stream_start_wait_sparse test has also been added to prove
that this mechanism can also work with sparse streams (a special case of the
current stream-start handling code). This test behaves like the previous one,
but there's no video buffer after the seek (it'll come in the future, as the
stream is sparse, but actually never comes). The buffer after the seek in the
audio stream starts at its due time. Streamsynchronizer is able to ignore
the wait for the video stream and produce audio buffers on time.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4544>
This commit is contained in:
Enrique Ocaña González 2024-11-14 21:36:19 +01:00 committed by GStreamer Marge Bot
parent 86bb1cf907
commit a1a189c07c
2 changed files with 647 additions and 46 deletions

View File

@ -492,6 +492,7 @@ gst_stream_synchronizer_handle_stream_start (GstStreamSynchronizer * self,
gboolean have_group_id;
GList *l;
gboolean all_wait = TRUE;
gboolean others_not_wait = TRUE;
gboolean new_stream = TRUE;
have_group_id = gst_event_parse_group_id (event, &group_id);
@ -547,16 +548,36 @@ gst_stream_synchronizer_handle_stream_start (GstStreamSynchronizer * self,
stream->wait = TRUE;
/* This variable checks if all the other streams except this one
are not waiting. There's an initial special case: If there's only
one stream and it's this one, set to FALSE */
others_not_wait = self->streams != NULL && !(self->streams->next == NULL
&& self->streams->data == stream);
for (l = self->streams; l; l = l->next) {
GstSyncStream *ostream = l->data;
/* This condition is reused for compuing both all_wait and
others_not_wait, but the former can also support sparse streams */
gboolean this_wait = ostream->wait && (!have_group_id
|| ostream->group_id == group_id);
all_wait = all_wait && ((ostream->flags & GST_STREAM_FLAG_SPARSE)
|| (ostream->wait && (!have_group_id
|| ostream->group_id == group_id)));
if (!all_wait)
break;
|| this_wait);
if (ostream != stream) {
others_not_wait = others_not_wait && !this_wait;
}
}
/* When all the streams of a media have been waiting, all of them are
unblocked for playback, but if a new stream appears after that (this is
detected when stream-start comes), it will be waiting and unaware of
all its sibling streams having already been unblocked, leading to a
stall. All_but_this_nowait detects that case and automatically sets the
new stream (this one) as not waiting. */
if (others_not_wait)
stream->wait = FALSE;
if (all_wait) {
gint64 position = 0;

View File

@ -40,10 +40,19 @@ typedef struct
{
GstPad *pad;
GList *to_push;
GMutex *push_mutex;
GCond *push_cond;
} MyPushInfo;
GMutex push_mutex;
GCond push_cond;
typedef struct
{
GList *expected;
GMutex push_mutex;
GCond push_cond;
gboolean compare_segment_base;
} MyPadPrivateData;
MyPadPrivateData private_data_video, private_data_audio;
static GstPad *
get_other_pad (GstPad * pad)
@ -64,36 +73,36 @@ get_other_pad (GstPad * pad)
static GstFlowReturn
my_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GList **expected = GST_PAD_ELEMENT_PRIVATE (pad);
MyPadPrivateData *private_data = GST_PAD_ELEMENT_PRIVATE (pad);
GList *next;
GstBuffer *exp;
fail_if (*expected == NULL,
fail_if (private_data->expected == NULL,
"streamsynchronizer pushed a buffer/event but we didn't expect any");
next = (*expected)->next;
next = (private_data->expected)->next;
fail_if (GST_IS_EVENT ((*expected)->data),
fail_if (GST_IS_EVENT ((private_data->expected)->data),
"Expected an event (%s) but got a buffer instead",
GST_EVENT_TYPE_NAME (GST_EVENT ((*expected)->data)));
GST_EVENT_TYPE_NAME (GST_EVENT ((private_data->expected)->data)));
exp = GST_BUFFER ((*expected)->data);
exp = GST_BUFFER ((private_data->expected)->data);
fail_unless_equals_uint64 (GST_BUFFER_TIMESTAMP (buf),
GST_BUFFER_TIMESTAMP (exp));
GST_DEBUG ("Properly received expected buffer");
GST_DEBUG ("Properly received expected buffer: %p", buf);
gst_buffer_unref (exp);
gst_buffer_unref (buf);
g_list_free1 (*expected);
*expected = next;
g_list_free1 (private_data->expected);
private_data->expected = next;
/* When done signal main thread */
if (next == NULL) {
g_mutex_lock (&push_mutex);
g_cond_signal (&push_cond);
g_mutex_unlock (&push_mutex);
g_mutex_lock (&private_data->push_mutex);
g_cond_signal (&private_data->push_cond);
g_mutex_unlock (&private_data->push_mutex);
}
return GST_FLOW_OK;
@ -102,19 +111,19 @@ my_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
static gboolean
my_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GList **expected = GST_PAD_ELEMENT_PRIVATE (pad);
MyPadPrivateData *private_data = GST_PAD_ELEMENT_PRIVATE (pad);
GList *next;
GstEvent *exp;
fail_if (*expected == NULL,
fail_if (private_data->expected == NULL,
"streamsynchronizer pushed a buffer/event but we didn't expect any");
next = (*expected)->next;
next = (private_data->expected)->next;
fail_unless (GST_IS_EVENT ((*expected)->data),
fail_unless (GST_IS_EVENT ((private_data->expected)->data),
"We were not expecting an event (But got an event of type %s)",
GST_EVENT_TYPE_NAME (event));
exp = GST_EVENT ((*expected)->data);
exp = GST_EVENT ((private_data->expected)->data);
fail_unless (GST_EVENT_TYPE (event) == GST_EVENT_TYPE (exp),
"Got event of type %s but expected of type %s",
GST_EVENT_TYPE_NAME (event), GST_EVENT_TYPE_NAME (exp));
@ -131,7 +140,9 @@ my_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_event_parse_segment (exp, &expectseg);
fail_unless_equals_int (recvseg->format, expectseg->format);
fail_unless_equals_uint64 (recvseg->base, expectseg->base);
if (private_data->compare_segment_base) {
fail_unless_equals_uint64 (recvseg->base, expectseg->base);
}
fail_unless_equals_uint64 (recvseg->offset, expectseg->offset);
fail_unless_equals_uint64 (recvseg->start, expectseg->start);
fail_unless_equals_uint64 (recvseg->stop, expectseg->stop);
@ -144,19 +155,20 @@ my_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
break;
}
GST_DEBUG ("Properly received expected event %s", GST_EVENT_TYPE_NAME (exp));
GST_DEBUG ("Properly received expected event %s: %p",
GST_EVENT_TYPE_NAME (exp), event);
gst_event_unref (exp);
gst_event_unref (event);
g_list_free1 (*expected);
*expected = next;
g_list_free1 (private_data->expected);
private_data->expected = next;
/* When done signal main thread */
if (next == NULL) {
g_mutex_lock (&push_mutex);
g_cond_signal (&push_cond);
g_mutex_unlock (&push_mutex);
g_mutex_lock (&private_data->push_mutex);
g_cond_signal (&private_data->push_cond);
g_mutex_unlock (&private_data->push_mutex);
}
return TRUE;
@ -167,12 +179,25 @@ my_push_thread (MyPushInfo * pushinfo)
{
GList *tmp;
GST_DEBUG ("starting thread");
/* Nothing to do if the to_push list is empty in the first place. Signal main thread */
if (pushinfo->to_push == NULL) {
g_mutex_lock (pushinfo->push_mutex);
g_cond_signal (pushinfo->push_cond);
g_mutex_unlock (pushinfo->push_mutex);
}
/* FIXME : Do this in a thread */
for (tmp = pushinfo->to_push; tmp; tmp = tmp->next) {
if (GST_IS_EVENT (tmp->data))
if (GST_IS_EVENT (tmp->data)) {
GST_DEBUG ("Pushing event %s: %p",
GST_EVENT_TYPE_NAME (GST_EVENT (tmp->data)), GST_EVENT (tmp->data));
gst_pad_push_event (pushinfo->pad, GST_EVENT (tmp->data));
else
} else {
GST_DEBUG ("Pushing buffer: %p", GST_BUFFER (tmp->data));
gst_pad_push (pushinfo->pad, GST_BUFFER (tmp->data));
}
}
GST_INFO ("leaving thread");
@ -184,7 +209,8 @@ GST_START_TEST (test_basic)
GstElement *synchr;
GstPad *sinkpad, *srcpad;
GstPad *mysrcpad, *mysinkpad;
GList *to_push = NULL, *expected = NULL;
GList *to_push = NULL;
GstEvent *event;
GstBuffer *buf;
GThread *thread;
@ -214,17 +240,24 @@ GST_START_TEST (test_basic)
fail_if (mysinkpad == NULL);
fail_unless (gst_pad_link (srcpad, mysinkpad) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysinkpad, TRUE));
GST_PAD_ELEMENT_PRIVATE (mysinkpad) = &expected;
GST_PAD_ELEMENT_PRIVATE (mysinkpad) = &private_data_video;
/* The segment.base expected time is important for this test */
private_data_video.compare_segment_base = TRUE;
private_data_video.expected = NULL;
/* Start with a stream START and a new segment */
event = gst_event_new_stream_start ("lala");
to_push = g_list_append (to_push, event);
expected = g_list_append (expected, gst_event_ref (event));
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
gst_segment_init (&segment, GST_FORMAT_TIME);
event = gst_event_new_segment (&segment);
to_push = g_list_append (to_push, event);
expected = g_list_append (expected, gst_event_ref (event));
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
/* Then 10 buffers */
for (i = 0; i < 10; i++) {
@ -232,13 +265,15 @@ GST_START_TEST (test_basic)
GST_BUFFER_TIMESTAMP (buf) = i * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
to_push = g_list_append (to_push, buf);
expected = g_list_append (expected, gst_buffer_ref (buf));
private_data_video.expected =
g_list_append (private_data_video.expected, gst_buffer_ref (buf));
}
/* Then a new stream start */
event = gst_event_new_stream_start ("lala again");
to_push = g_list_append (to_push, event);
expected = g_list_append (expected, gst_event_ref (event));
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
/* This newsegment will be updated */
gst_segment_init (&segment, GST_FORMAT_TIME);
@ -250,7 +285,8 @@ GST_START_TEST (test_basic)
segment.base = 10 * GST_SECOND;
event = gst_event_new_segment (&segment);
gst_event_set_seqnum (event, seqnum);
expected = g_list_append (expected, event);
private_data_video.expected =
g_list_append (private_data_video.expected, event);
/* Then 10 buffers */
for (i = 0; i < 10; i++) {
@ -258,22 +294,25 @@ GST_START_TEST (test_basic)
GST_BUFFER_TIMESTAMP (buf) = i * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
to_push = g_list_append (to_push, buf);
expected = g_list_append (expected, gst_buffer_ref (buf));
private_data_video.expected =
g_list_append (private_data_video.expected, gst_buffer_ref (buf));
}
g_mutex_init (&push_mutex);
g_cond_init (&push_cond);
g_mutex_init (&private_data_video.push_mutex);
pushinfo.push_mutex = &private_data_video.push_mutex;
g_cond_init (&private_data_video.push_cond);
pushinfo.push_cond = &private_data_video.push_cond;
pushinfo.pad = mysrcpad;
pushinfo.to_push = to_push;
g_mutex_lock (&push_mutex);
g_mutex_lock (&private_data_video.push_mutex);
thread = g_thread_new ("pushthread", (GThreadFunc) my_push_thread, &pushinfo);
fail_unless (thread != NULL);
g_cond_wait (&push_cond, &push_mutex);
g_mutex_unlock (&push_mutex);
g_cond_wait (&private_data_video.push_cond, &private_data_video.push_mutex);
g_mutex_unlock (&private_data_video.push_mutex);
fail_if (expected != NULL);
fail_if (private_data_video.expected != NULL);
/* wait for thread to exit before freeing things */
g_thread_join (thread);
@ -291,6 +330,545 @@ GST_START_TEST (test_basic)
GST_END_TEST;
GST_START_TEST (test_stream_start_wait)
{
GstElement *synchr;
GstPad *sinkpad_video, *srcpad_video, *sinkpad_audio, *srcpad_audio;
GstPad *mysrcpad_video, *mysinkpad_video, *mysrcpad_audio, *mysinkpad_audio;
GList *to_push_video = NULL, *to_push_audio = NULL;
GstEvent *event;
GstBuffer *buf;
GThread *thread_video, *thread_audio;
MyPushInfo pushinfo_video, pushinfo_audio;
GstSegment segment;
synchr = gst_element_factory_make ("streamsynchronizer", NULL);
GST_DEBUG ("Get sinkpad/srcpad for a first V0 stream");
sinkpad_video = gst_element_request_pad_simple (synchr, "sink_%u");
fail_unless (sinkpad_video != NULL);
srcpad_video = get_other_pad (sinkpad_video);
fail_unless (srcpad_video != NULL);
gst_element_set_state (synchr, GST_STATE_PLAYING);
mysrcpad_video = gst_pad_new_from_static_template (&mysrctemplate, "src");
fail_if (mysrcpad_video == NULL);
fail_unless (gst_pad_link (mysrcpad_video, sinkpad_video) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysrcpad_video, TRUE));
mysinkpad_video = gst_pad_new_from_static_template (&mysinktemplate, "sink");
gst_pad_set_chain_function (mysinkpad_video, my_sink_chain);
gst_pad_set_event_function (mysinkpad_video, my_sink_event);
fail_if (mysinkpad_video == NULL);
fail_unless (gst_pad_link (srcpad_video, mysinkpad_video) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysinkpad_video, TRUE));
GST_PAD_ELEMENT_PRIVATE (mysinkpad_video) = &private_data_video;
/* The segment.base expected time is important for this part of the test */
private_data_video.compare_segment_base = TRUE;
private_data_video.expected = NULL;
GST_DEBUG ("Start with a stream-start and a segment event");
event = gst_event_new_stream_start ("mse/V0");
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
gst_segment_init (&segment, GST_FORMAT_TIME);
event = gst_event_new_segment (&segment);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
buf = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf) = 0 * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
GST_DEBUG ("Then 1 video buffer %p", buf);
to_push_video = g_list_append (to_push_video, buf);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_buffer_ref (buf));
GST_DEBUG ("Simulate the effect of a seek to 6s with basic events with...");
GST_DEBUG ("...a flush-start event");
event = gst_event_new_flush_start ();
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
GST_DEBUG ("...a flush-stop event");
event = gst_event_new_flush_stop (TRUE);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
GST_DEBUG ("...a stream-start event");
event = gst_event_new_stream_start ("mse/V0");
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
GST_DEBUG ("...and a segment event");
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.format = GST_FORMAT_TIME;
segment.start = 6 * GST_SECOND;
segment.time = 6 * GST_SECOND;
event = gst_event_new_segment (&segment);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
/* Run all these steps until completion before continuing */
GST_DEBUG ("Run all these steps until completion before continuing");
g_mutex_init (&private_data_video.push_mutex);
pushinfo_video.push_mutex = &private_data_video.push_mutex;
g_cond_init (&private_data_video.push_cond);
pushinfo_video.push_cond = &private_data_video.push_cond;
pushinfo_video.pad = mysrcpad_video;
pushinfo_video.to_push = to_push_video;
g_mutex_lock (&private_data_video.push_mutex);
GST_DEBUG ("Creating video thread");
thread_video =
g_thread_new ("pushthread_video", (GThreadFunc) my_push_thread,
&pushinfo_video);
fail_unless (thread_video != NULL);
GST_DEBUG
("Waiting for all expected video events/buffers to be processed and join the video thread");
g_cond_wait (&private_data_video.push_cond, &private_data_video.push_mutex);
g_mutex_unlock (&private_data_video.push_mutex);
GST_DEBUG ("Wait completed");
fail_if (private_data_video.expected != NULL);
g_thread_join (thread_video);
GST_DEBUG ("Now create a second stream, A0");
sinkpad_audio = gst_element_request_pad_simple (synchr, "sink_%u");
fail_unless (sinkpad_audio != NULL);
srcpad_audio = get_other_pad (sinkpad_audio);
fail_unless (srcpad_audio != NULL);
mysrcpad_audio = gst_pad_new_from_static_template (&mysrctemplate, "src");
fail_if (mysrcpad_audio == NULL);
fail_unless (gst_pad_link (mysrcpad_audio, sinkpad_audio) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysrcpad_audio, TRUE));
mysinkpad_audio = gst_pad_new_from_static_template (&mysinktemplate, "sink");
gst_pad_set_chain_function (mysinkpad_audio, my_sink_chain);
gst_pad_set_event_function (mysinkpad_audio, my_sink_event);
fail_if (mysinkpad_audio == NULL);
fail_unless (gst_pad_link (srcpad_audio, mysinkpad_audio) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysinkpad_audio, TRUE));
GST_PAD_ELEMENT_PRIVATE (mysinkpad_audio) = &private_data_audio;
/* The segment.base expected time is not important for this part of the test,
* because it changes depending on the stream and sometimes is 1s for audio
* and 0s for video and some other times it's the opposite. It's not
* predictable. */
private_data_video.compare_segment_base = FALSE;
private_data_audio.expected = NULL;
/* Start with a stream START and a new segment like the one used for the simulated seek */
GST_DEBUG
("Start with a stream-start and a new segment like the one used for the simulated seek");
event = gst_event_new_stream_start ("mse/A0");
to_push_audio = g_list_append (to_push_audio, event);
private_data_audio.expected =
g_list_append (private_data_audio.expected, gst_event_ref (event));
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.format = GST_FORMAT_TIME;
segment.start = 6 * GST_SECOND;
segment.time = 6 * GST_SECOND;
event = gst_event_new_segment (&segment);
to_push_audio = g_list_append (to_push_audio, event);
private_data_audio.expected =
g_list_append (private_data_audio.expected, gst_event_ref (event));
g_mutex_lock (&private_data_video.push_mutex);
buf = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf) = 6 * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
GST_DEBUG ("Send one video buffer at 6 sec: %p", buf);
/* Discard old events/buffers from the list and start from scratch */
g_list_free (to_push_video);
to_push_video = NULL;
to_push_video = g_list_append (to_push_video, buf);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_buffer_ref (buf));
buf = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf) = 6 * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
GST_DEBUG ("Send one audio buffer at 6 sec: %p", buf);
to_push_audio = g_list_append (to_push_audio, buf);
private_data_audio.expected =
g_list_append (private_data_audio.expected, gst_buffer_ref (buf));
g_mutex_init (&private_data_video.push_mutex);
pushinfo_video.push_mutex = &private_data_video.push_mutex;
g_cond_init (&private_data_video.push_cond);
pushinfo_video.push_cond = &private_data_video.push_cond;
pushinfo_video.pad = mysrcpad_video;
pushinfo_video.to_push = to_push_video;
g_mutex_lock (&private_data_video.push_mutex);
GST_DEBUG ("Creating video thread again");
thread_video =
g_thread_new ("pushthread_video", (GThreadFunc) my_push_thread,
&pushinfo_video);
fail_unless (thread_video != NULL);
g_mutex_init (&private_data_audio.push_mutex);
pushinfo_audio.push_mutex = &private_data_audio.push_mutex;
g_cond_init (&private_data_audio.push_cond);
pushinfo_audio.push_cond = &private_data_audio.push_cond;
pushinfo_audio.pad = mysrcpad_audio;
pushinfo_audio.to_push = to_push_audio;
g_mutex_lock (&private_data_audio.push_mutex);
GST_DEBUG ("Creating audio thread");
thread_audio =
g_thread_new ("pushthread_audio", (GThreadFunc) my_push_thread,
&pushinfo_audio);
fail_unless (thread_audio != NULL);
GST_DEBUG ("Waiting for all expected video events/buffers to be processed");
g_cond_wait (&private_data_video.push_cond, &private_data_video.push_mutex);
g_mutex_unlock (&private_data_video.push_mutex);
GST_DEBUG ("Video wait completed");
fail_if (private_data_video.expected != NULL);
GST_DEBUG ("Waiting for all expected audio events/buffers to be processed");
g_cond_wait (&private_data_audio.push_cond, &private_data_audio.push_mutex);
g_mutex_unlock (&private_data_audio.push_mutex);
GST_DEBUG ("Audio wait completed");
fail_if (private_data_audio.expected != NULL);
g_thread_join (thread_video);
g_thread_join (thread_audio);
GST_DEBUG ("Cleanup");
g_list_free (to_push_video);
g_list_free (to_push_audio);
gst_element_release_request_pad (synchr, sinkpad_video);
gst_element_release_request_pad (synchr, sinkpad_audio);
gst_object_unref (srcpad_video);
gst_object_unref (sinkpad_video);
gst_object_unref (mysinkpad_video);
gst_object_unref (mysrcpad_video);
gst_object_unref (srcpad_audio);
gst_object_unref (sinkpad_audio);
gst_object_unref (mysinkpad_audio);
gst_object_unref (mysrcpad_audio);
gst_element_set_state (synchr, GST_STATE_NULL);
gst_object_unref (synchr);
}
GST_END_TEST;
GST_START_TEST (test_stream_start_wait_sparse)
{
GstElement *synchr;
GstPad *sinkpad_video, *srcpad_video, *sinkpad_audio, *srcpad_audio;
GstPad *mysrcpad_video, *mysinkpad_video, *mysrcpad_audio, *mysinkpad_audio;
GList *to_push_video = NULL, *to_push_audio = NULL;
GstEvent *event;
GstBuffer *buf;
GThread *thread_video, *thread_audio;
MyPushInfo pushinfo_video, pushinfo_audio;
GstSegment segment;
synchr = gst_element_factory_make ("streamsynchronizer", NULL);
GST_DEBUG ("Get sinkpad/srcpad for a first V0 stream");
sinkpad_video = gst_element_request_pad_simple (synchr, "sink_%u");
fail_unless (sinkpad_video != NULL);
srcpad_video = get_other_pad (sinkpad_video);
fail_unless (srcpad_video != NULL);
gst_element_set_state (synchr, GST_STATE_PLAYING);
mysrcpad_video = gst_pad_new_from_static_template (&mysrctemplate, "src");
fail_if (mysrcpad_video == NULL);
fail_unless (gst_pad_link (mysrcpad_video, sinkpad_video) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysrcpad_video, TRUE));
mysinkpad_video = gst_pad_new_from_static_template (&mysinktemplate, "sink");
gst_pad_set_chain_function (mysinkpad_video, my_sink_chain);
gst_pad_set_event_function (mysinkpad_video, my_sink_event);
fail_if (mysinkpad_video == NULL);
fail_unless (gst_pad_link (srcpad_video, mysinkpad_video) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysinkpad_video, TRUE));
GST_PAD_ELEMENT_PRIVATE (mysinkpad_video) = &private_data_video;
/* The segment.base expected time is important for this part of the test */
private_data_video.compare_segment_base = TRUE;
private_data_video.expected = NULL;
GST_DEBUG ("Start with a stream-start (sparse) and a segment event");
event = gst_event_new_stream_start ("mse/V0");
gst_event_set_stream_flags (event, GST_STREAM_FLAG_SPARSE);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
gst_segment_init (&segment, GST_FORMAT_TIME);
event = gst_event_new_segment (&segment);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
buf = gst_buffer_new ();
/* The stream is sparse. The segment starts at 0s but the first buffer
comes at 1s */
GST_BUFFER_TIMESTAMP (buf) = 1 * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
GST_DEBUG ("Then 1 video buffer %p", buf);
to_push_video = g_list_append (to_push_video, buf);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_buffer_ref (buf));
GST_DEBUG ("Simulate the effect of a seek to 6s with basic events with...");
GST_DEBUG ("...a flush-start event");
event = gst_event_new_flush_start ();
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
GST_DEBUG ("...a flush-stop event");
event = gst_event_new_flush_stop (TRUE);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
GST_DEBUG ("...a stream-start (sparse) event");
event = gst_event_new_stream_start ("mse/V0");
gst_event_set_stream_flags (event, GST_STREAM_FLAG_SPARSE);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
GST_DEBUG ("...and a segment event");
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.format = GST_FORMAT_TIME;
segment.start = 6 * GST_SECOND;
segment.time = 6 * GST_SECOND;
event = gst_event_new_segment (&segment);
to_push_video = g_list_append (to_push_video, event);
private_data_video.expected =
g_list_append (private_data_video.expected, gst_event_ref (event));
/* Run all these steps until completion before continuing */
GST_DEBUG ("Run all these steps until completion before continuing");
g_mutex_init (&private_data_video.push_mutex);
pushinfo_video.push_mutex = &private_data_video.push_mutex;
g_cond_init (&private_data_video.push_cond);
pushinfo_video.push_cond = &private_data_video.push_cond;
pushinfo_video.pad = mysrcpad_video;
pushinfo_video.to_push = to_push_video;
g_mutex_lock (&private_data_video.push_mutex);
GST_DEBUG ("Creating video thread");
thread_video =
g_thread_new ("pushthread_video", (GThreadFunc) my_push_thread,
&pushinfo_video);
fail_unless (thread_video != NULL);
GST_DEBUG
("Waiting for all expected video events/buffers to be processed and join the video thread");
g_cond_wait (&private_data_video.push_cond, &private_data_video.push_mutex);
g_mutex_unlock (&private_data_video.push_mutex);
GST_DEBUG ("Wait completed");
fail_if (private_data_video.expected != NULL);
g_thread_join (thread_video);
GST_DEBUG ("Now create a second stream, A0");
sinkpad_audio = gst_element_request_pad_simple (synchr, "sink_%u");
fail_unless (sinkpad_audio != NULL);
srcpad_audio = get_other_pad (sinkpad_audio);
fail_unless (srcpad_audio != NULL);
mysrcpad_audio = gst_pad_new_from_static_template (&mysrctemplate, "src");
fail_if (mysrcpad_audio == NULL);
fail_unless (gst_pad_link (mysrcpad_audio, sinkpad_audio) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysrcpad_audio, TRUE));
mysinkpad_audio = gst_pad_new_from_static_template (&mysinktemplate, "sink");
gst_pad_set_chain_function (mysinkpad_audio, my_sink_chain);
gst_pad_set_event_function (mysinkpad_audio, my_sink_event);
fail_if (mysinkpad_audio == NULL);
fail_unless (gst_pad_link (srcpad_audio, mysinkpad_audio) == GST_PAD_LINK_OK);
fail_unless (gst_pad_set_active (mysinkpad_audio, TRUE));
GST_PAD_ELEMENT_PRIVATE (mysinkpad_audio) = &private_data_audio;
/* The segment.base expected time is not important for this part of the test,
* because it changes depending on the stream and sometimes is 1s for audio
* and 0s for video and some other times it's the opposite. It's not
* predictable. */
private_data_video.compare_segment_base = FALSE;
private_data_audio.expected = NULL;
/* Start with a stream START and a new segment like the one used for the simulated seek */
GST_DEBUG
("Start with a stream-start (not sparse) and a new segment like the one used for the simulated seek");
event = gst_event_new_stream_start ("mse/A0");
to_push_audio = g_list_append (to_push_audio, event);
private_data_audio.expected =
g_list_append (private_data_audio.expected, gst_event_ref (event));
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.format = GST_FORMAT_TIME;
segment.start = 6 * GST_SECOND;
segment.time = 6 * GST_SECOND;
event = gst_event_new_segment (&segment);
to_push_audio = g_list_append (to_push_audio, event);
private_data_audio.expected =
g_list_append (private_data_audio.expected, gst_event_ref (event));
/* Discard old events/buffers from the list and start from scratch */
g_list_free (to_push_video);
to_push_video = NULL;
g_list_free (private_data_video.expected);
private_data_video.expected = NULL;
buf = gst_buffer_new ();
GST_BUFFER_TIMESTAMP (buf) = 6 * GST_SECOND;
GST_BUFFER_DURATION (buf) = GST_SECOND;
GST_DEBUG ("Send one audio buffer at 6 sec: %p", buf);
to_push_audio = g_list_append (to_push_audio, buf);
private_data_audio.expected =
g_list_append (private_data_audio.expected, gst_buffer_ref (buf));
g_mutex_init (&private_data_video.push_mutex);
pushinfo_video.push_mutex = &private_data_video.push_mutex;
g_cond_init (&private_data_video.push_cond);
pushinfo_video.push_cond = &private_data_video.push_cond;
pushinfo_video.pad = mysrcpad_video;
pushinfo_video.to_push = to_push_video;
g_mutex_lock (&private_data_video.push_mutex);
GST_DEBUG ("Creating video thread again");
thread_video =
g_thread_new ("pushthread_video", (GThreadFunc) my_push_thread,
&pushinfo_video);
fail_unless (thread_video != NULL);
g_mutex_init (&private_data_audio.push_mutex);
pushinfo_audio.push_mutex = &private_data_audio.push_mutex;
g_cond_init (&private_data_audio.push_cond);
pushinfo_audio.push_cond = &private_data_audio.push_cond;
pushinfo_audio.pad = mysrcpad_audio;
pushinfo_audio.to_push = to_push_audio;
g_mutex_lock (&private_data_audio.push_mutex);
GST_DEBUG ("Creating audio thread");
thread_audio =
g_thread_new ("pushthread_audio", (GThreadFunc) my_push_thread,
&pushinfo_audio);
fail_unless (thread_audio != NULL);
GST_DEBUG ("Waiting for all expected audio events/buffers to be processed");
g_cond_wait (&private_data_audio.push_cond, &private_data_audio.push_mutex);
g_mutex_unlock (&private_data_audio.push_mutex);
GST_DEBUG ("Audio wait completed");
fail_if (private_data_audio.expected != NULL);
GST_DEBUG ("Waiting for all expected video events/buffers to be processed");
g_cond_wait (&private_data_video.push_cond, &private_data_video.push_mutex);
g_mutex_unlock (&private_data_video.push_mutex);
GST_DEBUG ("Video wait completed");
fail_if (private_data_video.expected != NULL);
g_thread_join (thread_video);
g_thread_join (thread_audio);
GST_DEBUG ("Cleanup");
g_list_free (to_push_video);
g_list_free (to_push_audio);
gst_element_release_request_pad (synchr, sinkpad_video);
gst_element_release_request_pad (synchr, sinkpad_audio);
gst_object_unref (srcpad_video);
gst_object_unref (sinkpad_video);
gst_object_unref (mysinkpad_video);
gst_object_unref (mysrcpad_video);
gst_object_unref (srcpad_audio);
gst_object_unref (sinkpad_audio);
gst_object_unref (mysinkpad_audio);
gst_object_unref (mysrcpad_audio);
gst_element_set_state (synchr, GST_STATE_NULL);
gst_object_unref (synchr);
}
GST_END_TEST;
static Suite *
streamsynchronizer_suite (void)
{
@ -299,6 +877,8 @@ streamsynchronizer_suite (void)
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_basic);
tcase_add_test (tc_chain, test_stream_start_wait);
tcase_add_test (tc_chain, test_stream_start_wait_sparse);
return s;
}