streamsynchronizer: avoid pad destruction races.

Due to the use of {set/get}-element_private methods being used to store
the GstSyncStream in the src and sink pads, and the racey nature of pad
destruction, there are numerous ways we can be bitten by race conditions
in the stream synchronizer. Fix that by tying the pads toghether with
references.
This commit is contained in:
Charlie Turner 2019-09-20 09:48:30 +01:00 committed by Sebastian Dröge
parent 6f7c9e43bc
commit 96c6f581ae

View File

@ -81,35 +81,147 @@ typedef struct
guint32 stream_start_seqnum; guint32 stream_start_seqnum;
guint32 segment_seqnum; guint32 segment_seqnum;
guint group_id; guint group_id;
gint refcount;
} GstSyncStream; } GstSyncStream;
/* Must be called with lock! */ static GstSyncStream *
static inline GstPad * gst_syncstream_ref (GstSyncStream * stream)
gst_stream_get_other_pad (GstSyncStream * stream, GstPad * pad)
{ {
if (stream->sinkpad == pad) g_return_val_if_fail (stream != NULL, NULL);
return gst_object_ref (stream->srcpad); g_atomic_int_add (&stream->refcount, 1);
if (stream->srcpad == pad) return stream;
return gst_object_ref (stream->sinkpad); }
return NULL; static void
gst_syncstream_unref (GstSyncStream * stream)
{
g_return_if_fail (stream != NULL);
g_return_if_fail (stream->refcount > 0);
if (g_atomic_int_dec_and_test (&stream->refcount))
g_slice_free (GstSyncStream, stream);
}
G_BEGIN_DECLS
#define GST_TYPE_STREAMSYNC_PAD (gst_streamsync_pad_get_type ())
#define GST_IS_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD))
#define GST_IS_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD))
#define GST_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad))
#define GST_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass))
typedef struct _GstStreamSyncPad GstStreamSyncPad;
typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass;
struct _GstStreamSyncPad
{
GstPad parent;
GstSyncStream *stream;
/* Since we need to access data associated with a pad in this
* element, it's important to manage the respective lifetimes of the
* stored pad data and the pads themselves. Pad deactivation happens
* without mutual exclusion to the use of pad data in this element.
*
* The approach here is to have the sinkpad (the request pad) hold a
* strong reference onto the srcpad (so that it stays alive until
* the last pad is destroyed). Similarly the srcpad has a weak
* reference to the sinkpad (request pad) to ensure it knows when
* the pads are destroyed, since the pad data may be requested from
* either the srcpad or the sinkpad. This avoids a nasty set of
* potential race conditions.
*
* The code is arranged so that in the srcpad, the pad pointer is
* always NULL (not used) and in the sinkpad, the otherpad is always
* NULL. */
GstPad *pad;
GWeakRef otherpad;
};
struct _GstStreamSyncPadClass
{
GstPadClass parent_class;
};
static GType gst_streamsync_pad_get_type (void);
static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad);
G_END_DECLS
#define GST_STREAMSYNC_PAD_CAST(obj) ((GstStreamSyncPad *)obj)
G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD);
static void gst_streamsync_pad_dispose (GObject * object);
static void
gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass)
{
GObjectClass *gobject_class;
gobject_class = G_OBJECT_CLASS (klass);
gobject_class->dispose = gst_streamsync_pad_dispose;
}
static void
gst_streamsync_pad_init (GstStreamSyncPad * ppad)
{
}
static void
gst_streamsync_pad_dispose (GObject * object)
{
GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object);
if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK)
gst_clear_object (&spad->pad);
else
g_weak_ref_clear (&spad->otherpad);
g_clear_pointer (&spad->stream, gst_syncstream_unref);
G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object);
}
static GstPad *
gst_streamsync_pad_new_from_template (GstPadTemplate * templ,
const gchar * name)
{
g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL);
return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD,
"name", name, "direction", templ->direction, "template", templ,
NULL));
}
static GstPad *
gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ,
const gchar * name)
{
GstPad *pad;
GstPadTemplate *template;
template = gst_static_pad_template_get (templ);
pad = gst_streamsync_pad_new_from_template (template, name);
gst_object_unref (template);
return pad;
}
static GstSyncStream *
gst_streamsync_pad_get_stream (GstPad * pad)
{
GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
return gst_syncstream_ref (spad->stream);
} }
static GstPad * static GstPad *
gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad) gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad)
{ {
GstSyncStream *stream; GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad);
GstPad *opad = NULL; GstPad *opad = NULL;
GST_STREAM_SYNCHRONIZER_LOCK (self); if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK)
stream = gst_pad_get_element_private (pad); opad = gst_object_ref (spad->pad);
if (!stream) else
goto out; opad = g_weak_ref_get (&spad->otherpad);
opad = gst_stream_get_other_pad (stream, pad);
out:
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
if (!opad) if (!opad)
GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing"); GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing");
@ -163,9 +275,9 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent,
gst_event_unref (event); gst_event_unref (event);
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream) running_time_diff = stream->segment.base;
running_time_diff = stream->segment.base; gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
if (running_time_diff == -1) { if (running_time_diff == -1) {
@ -214,12 +326,9 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
gboolean ret = FALSE; gboolean ret = FALSE;
GstSyncStream *stream; GstSyncStream *stream;
stream = gst_streamsync_pad_get_stream (pad);
while (!self->eos && !self->flushing) { while (!self->eos && !self->flushing) {
stream = gst_pad_get_element_private (pad);
if (!stream) {
GST_WARNING_OBJECT (pad, "unknown stream");
return ret;
}
if (stream->flushing) { if (stream->flushing) {
GST_DEBUG_OBJECT (pad, "Flushing"); GST_DEBUG_OBJECT (pad, "Flushing");
break; break;
@ -250,6 +359,7 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
ret = gst_pad_push_event (pad, event); ret = gst_pad_push_event (pad, event);
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
if (!ret) { if (!ret) {
gst_syncstream_unref (stream);
return ret; return ret;
} }
stream->send_gap_event = FALSE; stream->send_gap_event = FALSE;
@ -262,6 +372,7 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad)
g_cond_wait (&stream->stream_finish_cond, &self->lock); g_cond_wait (&stream->stream_finish_cond, &self->lock);
} }
gst_syncstream_unref (stream);
return TRUE; return TRUE;
} }
@ -293,13 +404,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
self->have_group_id &= have_group_id; self->have_group_id &= have_group_id;
have_group_id = self->have_group_id; have_group_id = self->have_group_id;
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (!stream) {
GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
gst_event_parse_stream_flags (event, &stream->flags); gst_event_parse_stream_flags (event, &stream->flags);
@ -418,6 +523,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
} }
} }
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break; break;
} }
@ -437,8 +543,8 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
goto done; goto done;
} }
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream && segment.format == GST_FORMAT_TIME) { if (segment.format == GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (pad, GST_DEBUG_OBJECT (pad,
"New stream, updating base from %" GST_TIME_FORMAT " to %" "New stream, updating base from %" GST_TIME_FORMAT " to %"
GST_TIME_FORMAT, GST_TIME_ARGS (segment.base), GST_TIME_FORMAT, GST_TIME_ARGS (segment.base),
@ -467,6 +573,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
gst_format_get_name (segment.format)); gst_format_get_name (segment.format));
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
} }
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break; break;
} }
@ -474,13 +581,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
GstSyncStream *stream; GstSyncStream *stream;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
self->eos = FALSE; self->eos = FALSE;
if (stream) { GST_DEBUG_OBJECT (pad, "Flushing streams");
GST_DEBUG_OBJECT (pad, "Flushing streams"); stream->flushing = TRUE;
stream->flushing = TRUE; g_cond_broadcast (&stream->stream_finish_cond);
g_cond_broadcast (&stream->stream_finish_cond); gst_syncstream_unref (stream);
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break; break;
} }
@ -490,18 +596,16 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
GstClockTime new_group_start_time = 0; GstClockTime new_group_start_time = 0;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream) { GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d",
GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d", stream->stream_number);
stream->stream_number); gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
stream->is_eos = FALSE; stream->is_eos = FALSE;
stream->eos_sent = FALSE; stream->eos_sent = FALSE;
stream->flushing = FALSE; stream->flushing = FALSE;
stream->wait = FALSE; stream->wait = FALSE;
g_cond_broadcast (&stream->stream_finish_cond); g_cond_broadcast (&stream->stream_finish_cond);
}
for (l = self->streams; l; l = l->next) { for (l = self->streams; l; l = l->next) {
GstSyncStream *ostream = l->data; GstSyncStream *ostream = l->data;
@ -529,6 +633,8 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time), GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time),
GST_TIME_ARGS (new_group_start_time)); GST_TIME_ARGS (new_group_start_time));
self->group_start_time = new_group_start_time; self->group_start_time = new_group_start_time;
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break; break;
} }
@ -540,13 +646,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
GstSyncStream *stream; GstSyncStream *stream;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream) { stream->is_eos = FALSE;
stream->is_eos = FALSE; stream->eos_sent = FALSE;
stream->eos_sent = FALSE; stream->wait = FALSE;
stream->wait = FALSE; g_cond_broadcast (&stream->stream_finish_cond);
g_cond_broadcast (&stream->stream_finish_cond); gst_syncstream_unref (stream);
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
} }
break; break;
@ -562,12 +667,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
guint32 seqnum; guint32 seqnum;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (!stream) {
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
GST_WARNING_OBJECT (pad, "EOS for unknown stream");
break;
}
GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number); GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number);
stream->is_eos = TRUE; stream->is_eos = TRUE;
@ -612,11 +712,9 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
epad = pads; epad = pads;
while (epad) { while (epad) {
pad = epad->data; pad = epad->data;
ostream = gst_pad_get_element_private (pad); ostream = gst_streamsync_pad_get_stream (pad);
if (ostream) { g_cond_broadcast (&ostream->stream_finish_cond);
g_cond_broadcast (&ostream->stream_finish_cond); gst_syncstream_unref (ostream);
}
gst_object_unref (pad); gst_object_unref (pad);
epad = g_slist_next (epad); epad = g_slist_next (epad);
} }
@ -645,14 +743,14 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
ret = gst_pad_push_event (srcpad, topush); ret = gst_pad_push_event (srcpad, topush);
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream) { stream->eos_sent = TRUE;
stream->eos_sent = TRUE; gst_syncstream_unref (stream);
}
} }
gst_object_unref (srcpad); gst_object_unref (srcpad);
gst_event_unref (event); gst_event_unref (event);
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
goto done; goto done;
} }
@ -694,21 +792,21 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
timestamp_end = timestamp + duration; timestamp_end = timestamp + duration;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream) { stream->seen_data = TRUE;
stream->seen_data = TRUE; if (stream->segment.format == GST_FORMAT_TIME
if (stream->segment.format == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (timestamp)) {
&& GST_CLOCK_TIME_IS_VALID (timestamp)) { GST_LOG_OBJECT (pad,
GST_LOG_OBJECT (pad, "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT,
"Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp));
GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp)); if (stream->segment.rate > 0.0)
if (stream->segment.rate > 0.0) stream->segment.position = timestamp;
stream->segment.position = timestamp; else
else stream->segment.position = timestamp_end;
stream->segment.position = timestamp_end;
}
} }
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
opad = gst_stream_get_other_pad_from_pad (self, pad); opad = gst_stream_get_other_pad_from_pad (self, pad);
@ -722,8 +820,8 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
GList *l; GList *l;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream && stream->segment.format == GST_FORMAT_TIME) { if (stream->segment.format == GST_FORMAT_TIME) {
GstClockTime position; GstClockTime position;
if (stream->segment.rate > 0.0) if (stream->segment.rate > 0.0)
@ -778,39 +876,40 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent,
g_cond_broadcast (&ostream->stream_finish_cond); g_cond_broadcast (&ostream->stream_finish_cond);
} }
} }
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
} }
return ret; return ret;
} }
/* GstElement vfuncs */ /* Must be called with lock! */
static GstPad * static GstPad *
gst_stream_synchronizer_request_new_pad (GstElement * element, gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync)
GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
{ {
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element); GstSyncStream *stream = NULL;
GstSyncStream *stream; GstStreamSyncPad *sinkpad, *srcpad;
gchar *tmp; gchar *tmp;
GST_STREAM_SYNCHRONIZER_LOCK (self);
GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
self->current_stream_number);
stream = g_slice_new0 (GstSyncStream); stream = g_slice_new0 (GstSyncStream);
stream->transform = self; stream->transform = sync;
stream->stream_number = self->current_stream_number; stream->stream_number = sync->current_stream_number;
g_cond_init (&stream->stream_finish_cond); g_cond_init (&stream->stream_finish_cond);
stream->stream_start_seqnum = G_MAXUINT32; stream->stream_start_seqnum = G_MAXUINT32;
stream->segment_seqnum = G_MAXUINT32; stream->segment_seqnum = G_MAXUINT32;
stream->group_id = G_MAXUINT; stream->group_id = G_MAXUINT;
stream->seen_data = FALSE; stream->seen_data = FALSE;
stream->send_gap_event = FALSE; stream->send_gap_event = FALSE;
stream->refcount = 1;
tmp = g_strdup_printf ("sink_%u", self->current_stream_number); tmp = g_strdup_printf ("sink_%u", sync->current_stream_number);
stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); stream->sinkpad =
gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp);
g_free (tmp); g_free (tmp);
gst_pad_set_element_private (stream->sinkpad, stream);
GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream =
gst_syncstream_ref (stream);
gst_pad_set_iterate_internal_links_function (stream->sinkpad, gst_pad_set_iterate_internal_links_function (stream->sinkpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links)); GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
gst_pad_set_event_function (stream->sinkpad, gst_pad_set_event_function (stream->sinkpad,
@ -821,10 +920,24 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad); GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad);
GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad); GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad);
tmp = g_strdup_printf ("src_%u", self->current_stream_number); tmp = g_strdup_printf ("src_%u", sync->current_stream_number);
stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp); stream->srcpad =
gst_streamsync_pad_new_from_static_template (&srctemplate, tmp);
g_free (tmp); g_free (tmp);
gst_pad_set_element_private (stream->srcpad, stream);
GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream =
gst_syncstream_ref (stream);
sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad);
srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad);
/* Hold a strong reference from the sink (request pad) to the src to
* ensure a predicatable destruction order */
sinkpad->pad = gst_object_ref (srcpad);
/* And a weak reference from the src to the sink, to know when pad
* release is occuring, and to ensure we do not try and take
* references to inactive / destructing streams. */
g_weak_ref_init (&srcpad->otherpad, stream->sinkpad);
gst_pad_set_iterate_internal_links_function (stream->srcpad, gst_pad_set_iterate_internal_links_function (stream->srcpad,
GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links)); GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links));
gst_pad_set_event_function (stream->srcpad, gst_pad_set_event_function (stream->srcpad,
@ -835,21 +948,43 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED);
self->streams = g_list_prepend (self->streams, stream); GST_STREAM_SYNCHRONIZER_UNLOCK (sync);
self->current_stream_number++;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
/* Add pads and activate unless we're going to NULL */ /* Add pads and activate unless we're going to NULL */
g_rec_mutex_lock (GST_STATE_GET_LOCK (self)); g_rec_mutex_lock (GST_STATE_GET_LOCK (sync));
if (GST_STATE_TARGET (self) != GST_STATE_NULL) { if (GST_STATE_TARGET (sync) != GST_STATE_NULL) {
gst_pad_set_active (stream->srcpad, TRUE); gst_pad_set_active (stream->srcpad, TRUE);
gst_pad_set_active (stream->sinkpad, TRUE); gst_pad_set_active (stream->sinkpad, TRUE);
} }
gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad); gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad);
gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad); gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad);
g_rec_mutex_unlock (GST_STATE_GET_LOCK (self)); g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync));
return stream->sinkpad; GST_STREAM_SYNCHRONIZER_LOCK (sync);
sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream));
sync->current_stream_number++;
return GST_PAD_CAST (sinkpad);
}
/* GstElement vfuncs */
static GstPad *
gst_stream_synchronizer_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * name, const GstCaps * caps)
{
GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element);
GstPad *request_pad;
GST_STREAM_SYNCHRONIZER_LOCK (self);
GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d",
self->current_stream_number);
request_pad = gst_stream_synchronizer_new_pad (self);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
return request_pad;
} }
/* Must be called with lock! */ /* Must be called with lock! */
@ -878,15 +1013,15 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
* (due to reverse lock order) when deactivating pads */ * (due to reverse lock order) when deactivating pads */
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
gst_pad_set_element_private (stream->srcpad, NULL);
gst_pad_set_element_private (stream->sinkpad, NULL);
gst_pad_set_active (stream->srcpad, FALSE); gst_pad_set_active (stream->srcpad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad); gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad);
gst_pad_set_active (stream->sinkpad, FALSE); gst_pad_set_active (stream->sinkpad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad); gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad);
g_cond_clear (&stream->stream_finish_cond); g_cond_clear (&stream->stream_finish_cond);
g_slice_free (GstSyncStream, stream);
/* Release the ref maintaining validity in the streams list */
gst_syncstream_unref (stream);
/* NOTE: In theory we have to check here if all streams /* NOTE: In theory we have to check here if all streams
* are EOS but the one that was removed wasn't and then * are EOS but the one that was removed wasn't and then
@ -908,12 +1043,12 @@ gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad)
GstSyncStream *stream; GstSyncStream *stream;
GST_STREAM_SYNCHRONIZER_LOCK (self); GST_STREAM_SYNCHRONIZER_LOCK (self);
stream = gst_pad_get_element_private (pad); stream = gst_streamsync_pad_get_stream (pad);
if (stream) { g_assert (stream->sinkpad == pad);
g_assert (stream->sinkpad == pad);
gst_stream_synchronizer_release_stream (self, stream); gst_stream_synchronizer_release_stream (self, stream);
}
gst_syncstream_unref (stream);
GST_STREAM_SYNCHRONIZER_UNLOCK (self); GST_STREAM_SYNCHRONIZER_UNLOCK (self);
} }