diff --git a/gst/playback/gststreamsynchronizer.c b/gst/playback/gststreamsynchronizer.c index 8380fa418a..41836f79b7 100644 --- a/gst/playback/gststreamsynchronizer.c +++ b/gst/playback/gststreamsynchronizer.c @@ -81,35 +81,147 @@ typedef struct guint32 stream_start_seqnum; guint32 segment_seqnum; guint group_id; + + gint refcount; } GstSyncStream; -/* Must be called with lock! */ -static inline GstPad * -gst_stream_get_other_pad (GstSyncStream * stream, GstPad * pad) +static GstSyncStream * +gst_syncstream_ref (GstSyncStream * stream) { - if (stream->sinkpad == pad) - return gst_object_ref (stream->srcpad); - if (stream->srcpad == pad) - return gst_object_ref (stream->sinkpad); + g_return_val_if_fail (stream != NULL, NULL); + g_atomic_int_add (&stream->refcount, 1); + return stream; +} - return NULL; +static void +gst_syncstream_unref (GstSyncStream * stream) +{ + g_return_if_fail (stream != NULL); + g_return_if_fail (stream->refcount > 0); + + if (g_atomic_int_dec_and_test (&stream->refcount)) + g_slice_free (GstSyncStream, stream); +} + +G_BEGIN_DECLS +#define GST_TYPE_STREAMSYNC_PAD (gst_streamsync_pad_get_type ()) +#define GST_IS_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_STREAMSYNC_PAD)) +#define GST_IS_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_STREAMSYNC_PAD)) +#define GST_STREAMSYNC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPad)) +#define GST_STREAMSYNC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_STREAMSYNC_PAD, GstStreamSyncPadClass)) +typedef struct _GstStreamSyncPad GstStreamSyncPad; +typedef struct _GstStreamSyncPadClass GstStreamSyncPadClass; + +struct _GstStreamSyncPad +{ + GstPad parent; + + GstSyncStream *stream; + + /* Since we need to access data associated with a pad in this + * element, it's important to manage the respective lifetimes of the + * stored pad data and the pads themselves. Pad deactivation happens + * without mutual exclusion to the use of pad data in this element. + * + * The approach here is to have the sinkpad (the request pad) hold a + * strong reference onto the srcpad (so that it stays alive until + * the last pad is destroyed). Similarly the srcpad has a weak + * reference to the sinkpad (request pad) to ensure it knows when + * the pads are destroyed, since the pad data may be requested from + * either the srcpad or the sinkpad. This avoids a nasty set of + * potential race conditions. + * + * The code is arranged so that in the srcpad, the pad pointer is + * always NULL (not used) and in the sinkpad, the otherpad is always + * NULL. */ + GstPad *pad; + GWeakRef otherpad; +}; + +struct _GstStreamSyncPadClass +{ + GstPadClass parent_class; +}; + +static GType gst_streamsync_pad_get_type (void); +static GstSyncStream *gst_streamsync_pad_get_stream (GstPad * pad); + +G_END_DECLS +#define GST_STREAMSYNC_PAD_CAST(obj) ((GstStreamSyncPad *)obj) + G_DEFINE_TYPE (GstStreamSyncPad, gst_streamsync_pad, GST_TYPE_PAD); + +static void gst_streamsync_pad_dispose (GObject * object); + +static void +gst_streamsync_pad_class_init (GstStreamSyncPadClass * klass) +{ + GObjectClass *gobject_class; + gobject_class = G_OBJECT_CLASS (klass); + gobject_class->dispose = gst_streamsync_pad_dispose; +} + +static void +gst_streamsync_pad_init (GstStreamSyncPad * ppad) +{ +} + +static void +gst_streamsync_pad_dispose (GObject * object) +{ + GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (object); + + if (GST_PAD_DIRECTION (spad) == GST_PAD_SINK) + gst_clear_object (&spad->pad); + else + g_weak_ref_clear (&spad->otherpad); + + g_clear_pointer (&spad->stream, gst_syncstream_unref); + + G_OBJECT_CLASS (gst_streamsync_pad_parent_class)->dispose (object); +} + +static GstPad * +gst_streamsync_pad_new_from_template (GstPadTemplate * templ, + const gchar * name) +{ + g_return_val_if_fail (GST_IS_PAD_TEMPLATE (templ), NULL); + + return GST_PAD_CAST (g_object_new (GST_TYPE_STREAMSYNC_PAD, + "name", name, "direction", templ->direction, "template", templ, + NULL)); +} + +static GstPad * +gst_streamsync_pad_new_from_static_template (GstStaticPadTemplate * templ, + const gchar * name) +{ + GstPad *pad; + GstPadTemplate *template; + + template = gst_static_pad_template_get (templ); + pad = gst_streamsync_pad_new_from_template (template, name); + gst_object_unref (template); + + return pad; +} + +static GstSyncStream * +gst_streamsync_pad_get_stream (GstPad * pad) +{ + GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad); + return gst_syncstream_ref (spad->stream); } static GstPad * gst_stream_get_other_pad_from_pad (GstStreamSynchronizer * self, GstPad * pad) { - GstSyncStream *stream; + GstStreamSyncPad *spad = GST_STREAMSYNC_PAD_CAST (pad); GstPad *opad = NULL; - GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (!stream) - goto out; - - opad = gst_stream_get_other_pad (stream, pad); - -out: - GST_STREAM_SYNCHRONIZER_UNLOCK (self); + if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK) + opad = gst_object_ref (spad->pad); + else + opad = g_weak_ref_get (&spad->otherpad); if (!opad) GST_WARNING_OBJECT (pad, "Trying to get other pad after releasing"); @@ -163,9 +275,9 @@ gst_stream_synchronizer_src_event (GstPad * pad, GstObject * parent, gst_event_unref (event); GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (stream) - running_time_diff = stream->segment.base; + stream = gst_streamsync_pad_get_stream (pad); + running_time_diff = stream->segment.base; + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); if (running_time_diff == -1) { @@ -214,12 +326,9 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad) gboolean ret = FALSE; GstSyncStream *stream; + stream = gst_streamsync_pad_get_stream (pad); + while (!self->eos && !self->flushing) { - stream = gst_pad_get_element_private (pad); - if (!stream) { - GST_WARNING_OBJECT (pad, "unknown stream"); - return ret; - } if (stream->flushing) { GST_DEBUG_OBJECT (pad, "Flushing"); break; @@ -250,6 +359,7 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad) ret = gst_pad_push_event (pad, event); GST_STREAM_SYNCHRONIZER_LOCK (self); if (!ret) { + gst_syncstream_unref (stream); return ret; } stream->send_gap_event = FALSE; @@ -262,6 +372,7 @@ gst_stream_synchronizer_wait (GstStreamSynchronizer * self, GstPad * pad) g_cond_wait (&stream->stream_finish_cond, &self->lock); } + gst_syncstream_unref (stream); return TRUE; } @@ -293,13 +404,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, self->have_group_id &= have_group_id; have_group_id = self->have_group_id; - stream = gst_pad_get_element_private (pad); - - if (!stream) { - GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source"); - GST_STREAM_SYNCHRONIZER_UNLOCK (self); - break; - } + stream = gst_streamsync_pad_get_stream (pad); gst_event_parse_stream_flags (event, &stream->flags); @@ -418,6 +523,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, } } + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; } @@ -437,8 +543,8 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, goto done; } - stream = gst_pad_get_element_private (pad); - if (stream && segment.format == GST_FORMAT_TIME) { + stream = gst_streamsync_pad_get_stream (pad); + if (segment.format == GST_FORMAT_TIME) { GST_DEBUG_OBJECT (pad, "New stream, updating base from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (segment.base), @@ -467,6 +573,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, gst_format_get_name (segment.format)); gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); } + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; } @@ -474,13 +581,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, GstSyncStream *stream; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); + stream = gst_streamsync_pad_get_stream (pad); self->eos = FALSE; - if (stream) { - GST_DEBUG_OBJECT (pad, "Flushing streams"); - stream->flushing = TRUE; - g_cond_broadcast (&stream->stream_finish_cond); - } + GST_DEBUG_OBJECT (pad, "Flushing streams"); + stream->flushing = TRUE; + g_cond_broadcast (&stream->stream_finish_cond); + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; } @@ -490,18 +596,16 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, GstClockTime new_group_start_time = 0; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (stream) { - GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d", - stream->stream_number); - gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); + stream = gst_streamsync_pad_get_stream (pad); + GST_DEBUG_OBJECT (pad, "Resetting segment for stream %d", + stream->stream_number); + gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); - stream->is_eos = FALSE; - stream->eos_sent = FALSE; - stream->flushing = FALSE; - stream->wait = FALSE; - g_cond_broadcast (&stream->stream_finish_cond); - } + stream->is_eos = FALSE; + stream->eos_sent = FALSE; + stream->flushing = FALSE; + stream->wait = FALSE; + g_cond_broadcast (&stream->stream_finish_cond); for (l = self->streams; l; l = l->next) { GstSyncStream *ostream = l->data; @@ -529,6 +633,8 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, GST_TIME_FORMAT, GST_TIME_ARGS (self->group_start_time), GST_TIME_ARGS (new_group_start_time)); self->group_start_time = new_group_start_time; + + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); break; } @@ -540,13 +646,12 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, GstSyncStream *stream; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (stream) { - stream->is_eos = FALSE; - stream->eos_sent = FALSE; - stream->wait = FALSE; - g_cond_broadcast (&stream->stream_finish_cond); - } + stream = gst_streamsync_pad_get_stream (pad); + stream->is_eos = FALSE; + stream->eos_sent = FALSE; + stream->wait = FALSE; + g_cond_broadcast (&stream->stream_finish_cond); + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); } break; @@ -562,12 +667,7 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, guint32 seqnum; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (!stream) { - GST_STREAM_SYNCHRONIZER_UNLOCK (self); - GST_WARNING_OBJECT (pad, "EOS for unknown stream"); - break; - } + stream = gst_streamsync_pad_get_stream (pad); GST_DEBUG_OBJECT (pad, "Have EOS for stream %d", stream->stream_number); stream->is_eos = TRUE; @@ -612,11 +712,9 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, epad = pads; while (epad) { pad = epad->data; - ostream = gst_pad_get_element_private (pad); - if (ostream) { - g_cond_broadcast (&ostream->stream_finish_cond); - } - + ostream = gst_streamsync_pad_get_stream (pad); + g_cond_broadcast (&ostream->stream_finish_cond); + gst_syncstream_unref (ostream); gst_object_unref (pad); epad = g_slist_next (epad); } @@ -645,14 +743,14 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent, GST_STREAM_SYNCHRONIZER_UNLOCK (self); ret = gst_pad_push_event (srcpad, topush); GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (stream) { - stream->eos_sent = TRUE; - } + stream = gst_streamsync_pad_get_stream (pad); + stream->eos_sent = TRUE; + gst_syncstream_unref (stream); } gst_object_unref (srcpad); gst_event_unref (event); + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); goto done; } @@ -694,21 +792,21 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, timestamp_end = timestamp + duration; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); + stream = gst_streamsync_pad_get_stream (pad); - if (stream) { - stream->seen_data = TRUE; - if (stream->segment.format == GST_FORMAT_TIME - && GST_CLOCK_TIME_IS_VALID (timestamp)) { - GST_LOG_OBJECT (pad, - "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, - GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp)); - if (stream->segment.rate > 0.0) - stream->segment.position = timestamp; - else - stream->segment.position = timestamp_end; - } + stream->seen_data = TRUE; + if (stream->segment.format == GST_FORMAT_TIME + && GST_CLOCK_TIME_IS_VALID (timestamp)) { + GST_LOG_OBJECT (pad, + "Updating position from %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, + GST_TIME_ARGS (stream->segment.position), GST_TIME_ARGS (timestamp)); + if (stream->segment.rate > 0.0) + stream->segment.position = timestamp; + else + stream->segment.position = timestamp_end; } + + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); opad = gst_stream_get_other_pad_from_pad (self, pad); @@ -722,8 +820,8 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, GList *l; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (stream && stream->segment.format == GST_FORMAT_TIME) { + stream = gst_streamsync_pad_get_stream (pad); + if (stream->segment.format == GST_FORMAT_TIME) { GstClockTime position; if (stream->segment.rate > 0.0) @@ -778,39 +876,40 @@ gst_stream_synchronizer_sink_chain (GstPad * pad, GstObject * parent, g_cond_broadcast (&ostream->stream_finish_cond); } } + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); } return ret; } -/* GstElement vfuncs */ +/* Must be called with lock! */ static GstPad * -gst_stream_synchronizer_request_new_pad (GstElement * element, - GstPadTemplate * temp, const gchar * name, const GstCaps * caps) +gst_stream_synchronizer_new_pad (GstStreamSynchronizer * sync) { - GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element); - GstSyncStream *stream; + GstSyncStream *stream = NULL; + GstStreamSyncPad *sinkpad, *srcpad; gchar *tmp; - GST_STREAM_SYNCHRONIZER_LOCK (self); - GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d", - self->current_stream_number); - stream = g_slice_new0 (GstSyncStream); - stream->transform = self; - stream->stream_number = self->current_stream_number; + stream->transform = sync; + stream->stream_number = sync->current_stream_number; g_cond_init (&stream->stream_finish_cond); stream->stream_start_seqnum = G_MAXUINT32; stream->segment_seqnum = G_MAXUINT32; stream->group_id = G_MAXUINT; stream->seen_data = FALSE; stream->send_gap_event = FALSE; + stream->refcount = 1; - tmp = g_strdup_printf ("sink_%u", self->current_stream_number); - stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp); + tmp = g_strdup_printf ("sink_%u", sync->current_stream_number); + stream->sinkpad = + gst_streamsync_pad_new_from_static_template (&sinktemplate, tmp); g_free (tmp); - gst_pad_set_element_private (stream->sinkpad, stream); + + GST_STREAMSYNC_PAD_CAST (stream->sinkpad)->stream = + gst_syncstream_ref (stream); + gst_pad_set_iterate_internal_links_function (stream->sinkpad, GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links)); gst_pad_set_event_function (stream->sinkpad, @@ -821,10 +920,24 @@ gst_stream_synchronizer_request_new_pad (GstElement * element, GST_PAD_SET_PROXY_ALLOCATION (stream->sinkpad); GST_PAD_SET_PROXY_SCHEDULING (stream->sinkpad); - tmp = g_strdup_printf ("src_%u", self->current_stream_number); - stream->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp); + tmp = g_strdup_printf ("src_%u", sync->current_stream_number); + stream->srcpad = + gst_streamsync_pad_new_from_static_template (&srctemplate, tmp); g_free (tmp); - gst_pad_set_element_private (stream->srcpad, stream); + + GST_STREAMSYNC_PAD_CAST (stream->srcpad)->stream = + gst_syncstream_ref (stream); + + sinkpad = GST_STREAMSYNC_PAD_CAST (stream->sinkpad); + srcpad = GST_STREAMSYNC_PAD_CAST (stream->srcpad); + /* Hold a strong reference from the sink (request pad) to the src to + * ensure a predicatable destruction order */ + sinkpad->pad = gst_object_ref (srcpad); + /* And a weak reference from the src to the sink, to know when pad + * release is occuring, and to ensure we do not try and take + * references to inactive / destructing streams. */ + g_weak_ref_init (&srcpad->otherpad, stream->sinkpad); + gst_pad_set_iterate_internal_links_function (stream->srcpad, GST_DEBUG_FUNCPTR (gst_stream_synchronizer_iterate_internal_links)); gst_pad_set_event_function (stream->srcpad, @@ -835,21 +948,43 @@ gst_stream_synchronizer_request_new_pad (GstElement * element, gst_segment_init (&stream->segment, GST_FORMAT_UNDEFINED); - self->streams = g_list_prepend (self->streams, stream); - self->current_stream_number++; - GST_STREAM_SYNCHRONIZER_UNLOCK (self); + GST_STREAM_SYNCHRONIZER_UNLOCK (sync); /* Add pads and activate unless we're going to NULL */ - g_rec_mutex_lock (GST_STATE_GET_LOCK (self)); - if (GST_STATE_TARGET (self) != GST_STATE_NULL) { + g_rec_mutex_lock (GST_STATE_GET_LOCK (sync)); + if (GST_STATE_TARGET (sync) != GST_STATE_NULL) { gst_pad_set_active (stream->srcpad, TRUE); gst_pad_set_active (stream->sinkpad, TRUE); } - gst_element_add_pad (GST_ELEMENT_CAST (self), stream->srcpad); - gst_element_add_pad (GST_ELEMENT_CAST (self), stream->sinkpad); - g_rec_mutex_unlock (GST_STATE_GET_LOCK (self)); + gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->srcpad); + gst_element_add_pad (GST_ELEMENT_CAST (sync), stream->sinkpad); + g_rec_mutex_unlock (GST_STATE_GET_LOCK (sync)); - return stream->sinkpad; + GST_STREAM_SYNCHRONIZER_LOCK (sync); + + sync->streams = g_list_prepend (sync->streams, g_steal_pointer (&stream)); + sync->current_stream_number++; + + return GST_PAD_CAST (sinkpad); +} + +/* GstElement vfuncs */ +static GstPad * +gst_stream_synchronizer_request_new_pad (GstElement * element, + GstPadTemplate * temp, const gchar * name, const GstCaps * caps) +{ + GstStreamSynchronizer *self = GST_STREAM_SYNCHRONIZER (element); + GstPad *request_pad; + + GST_STREAM_SYNCHRONIZER_LOCK (self); + GST_DEBUG_OBJECT (self, "Requesting new pad for stream %d", + self->current_stream_number); + + request_pad = gst_stream_synchronizer_new_pad (self); + + GST_STREAM_SYNCHRONIZER_UNLOCK (self); + + return request_pad; } /* Must be called with lock! */ @@ -878,15 +1013,15 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self, * (due to reverse lock order) when deactivating pads */ GST_STREAM_SYNCHRONIZER_UNLOCK (self); - gst_pad_set_element_private (stream->srcpad, NULL); - gst_pad_set_element_private (stream->sinkpad, NULL); gst_pad_set_active (stream->srcpad, FALSE); gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->srcpad); gst_pad_set_active (stream->sinkpad, FALSE); gst_element_remove_pad (GST_ELEMENT_CAST (self), stream->sinkpad); g_cond_clear (&stream->stream_finish_cond); - g_slice_free (GstSyncStream, stream); + + /* Release the ref maintaining validity in the streams list */ + gst_syncstream_unref (stream); /* NOTE: In theory we have to check here if all streams * are EOS but the one that was removed wasn't and then @@ -908,12 +1043,12 @@ gst_stream_synchronizer_release_pad (GstElement * element, GstPad * pad) GstSyncStream *stream; GST_STREAM_SYNCHRONIZER_LOCK (self); - stream = gst_pad_get_element_private (pad); - if (stream) { - g_assert (stream->sinkpad == pad); + stream = gst_streamsync_pad_get_stream (pad); + g_assert (stream->sinkpad == pad); - gst_stream_synchronizer_release_stream (self, stream); - } + gst_stream_synchronizer_release_stream (self, stream); + + gst_syncstream_unref (stream); GST_STREAM_SYNCHRONIZER_UNLOCK (self); }