diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index ae61a5fea4..ce648cd14f 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -198,6 +198,11 @@ struct _GstAdaptiveDemuxPrivate GCond manifest_cond; GMutex manifest_update_lock; + /* Lock and condition for prerolling streams before exposing */ + GMutex preroll_lock; + GCond preroll_cond; + gint preroll_pending; + GMutex api_lock; /* Protects demux and stream segment information @@ -242,8 +247,9 @@ static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux); static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream); static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux); -static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, +static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux, gboolean first_and_live); +static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux); static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux); static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags, @@ -270,7 +276,8 @@ static GstFlowReturn gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream, GstEvent * event); -static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux); +static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux, + gboolean start_preroll_streams); static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux); static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux); @@ -483,6 +490,9 @@ gst_adaptive_demux_init (GstAdaptiveDemux * demux, g_mutex_init (&demux->priv->api_lock); g_mutex_init (&demux->priv->segment_lock); + g_cond_init (&demux->priv->preroll_cond); + g_mutex_init (&demux->priv->preroll_lock); + pad_template = gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink"); g_return_if_fail (pad_template != NULL); @@ -525,6 +535,9 @@ gst_adaptive_demux_finalize (GObject * object) demux->realtime_clock = NULL; } + g_cond_clear (&demux->priv->preroll_cond); + g_mutex_clear (&demux->priv->preroll_lock); + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -684,9 +697,9 @@ gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent, } if (demux->next_streams) { - gst_adaptive_demux_expose_streams (demux, + gst_adaptive_demux_prepare_streams (demux, gst_adaptive_demux_is_live (demux)); - gst_adaptive_demux_start_tasks (demux); + gst_adaptive_demux_start_tasks (demux, TRUE); if (gst_adaptive_demux_is_live (demux)) { g_mutex_lock (&demux->priv->updates_timed_lock); demux->priv->stop_updates_task = FALSE; @@ -775,6 +788,16 @@ gst_adaptive_demux_reset (GstAdaptiveDemux * demux) gst_event_unref (eos); g_list_free (demux->streams); demux->streams = NULL; + if (demux->prepared_streams) { + g_list_free_full (demux->prepared_streams, + (GDestroyNotify) gst_adaptive_demux_stream_free); + demux->prepared_streams = NULL; + } + if (demux->next_streams) { + g_list_free_full (demux->next_streams, + (GDestroyNotify) gst_adaptive_demux_stream_free); + demux->next_streams = NULL; + } if (old_streams) { g_list_free_full (old_streams, @@ -872,10 +895,9 @@ gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux, /* must be called with manifest_lock taken */ static gboolean -gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux, +gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream) { - gboolean ret; GstPad *pad = stream->pad; gchar *name = gst_pad_get_name (pad); GstEvent *event; @@ -907,16 +929,33 @@ gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux, g_free (stream_id); g_free (name); - GST_DEBUG_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT, - GST_DEBUG_PAD_NAME (pad), stream->pending_caps); + GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + stream->discont = TRUE; + + return TRUE; +} + +static gboolean +gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) +{ + gboolean ret; + GstPad *pad = stream->pad; + GstCaps *caps; if (stream->pending_caps) { gst_pad_set_caps (pad, stream->pending_caps); - gst_caps_unref (stream->pending_caps); + caps = stream->pending_caps; stream->pending_caps = NULL; + } else { + caps = gst_pad_get_current_caps (pad); } - stream->discont = TRUE; + GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT, + GST_DEBUG_PAD_NAME (pad), caps); + if (caps) + gst_caps_unref (caps); gst_object_ref (pad); @@ -959,17 +998,21 @@ gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux) /* must be called with manifest_lock taken */ static gboolean -gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, +gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux, gboolean first_and_live) { GList *iter; - GList *old_streams; GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE; g_return_val_if_fail (demux->next_streams != NULL, FALSE); + if (demux->prepared_streams != NULL) { + /* Old streams that were never exposed, due to a seek or so */ + GST_FIXME_OBJECT (demux, + "Preparing new streams without cleaning up old ones!"); + return FALSE; + } - old_streams = demux->streams; - demux->streams = demux->next_streams; + demux->prepared_streams = demux->next_streams; demux->next_streams = NULL; if (!demux->running) { @@ -977,10 +1020,12 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, return TRUE; } - for (iter = demux->streams; iter; iter = g_list_next (iter)) { + for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) { GstAdaptiveDemuxStream *stream = iter->data; - if (!gst_adaptive_demux_expose_stream (demux, + stream->do_block = TRUE; + + if (!gst_adaptive_demux_prepare_stream (demux, GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) { /* TODO act on error */ } @@ -1011,7 +1056,7 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, GST_SEEK_TYPE_NONE, -1, NULL); } - for (iter = demux->streams; iter; iter = g_list_next (iter)) { + for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) { GstAdaptiveDemuxStream *stream = iter->data; GstClockTime offset; @@ -1091,8 +1136,36 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, stream->pending_segment = gst_event_new_segment (&stream->segment); gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum); + + GST_DEBUG ("Prepared segment %" GST_SEGMENT_FORMAT " for stream %p", + &stream->segment, stream); } + return TRUE; +} + +static gboolean +gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux) +{ + GList *iter; + GList *old_streams; + + g_return_val_if_fail (demux->prepared_streams != NULL, FALSE); + + old_streams = demux->streams; + demux->streams = demux->prepared_streams; + demux->prepared_streams = NULL; + + for (iter = demux->streams; iter; iter = g_list_next (iter)) { + GstAdaptiveDemuxStream *stream = iter->data; + + if (!gst_adaptive_demux_expose_stream (demux, + GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) { + /* TODO act on error */ + } + } + demux->priv->preroll_pending = 0; + GST_MANIFEST_UNLOCK (demux); gst_element_no_more_pads (GST_ELEMENT_CAST (demux)); GST_MANIFEST_LOCK (demux); @@ -1142,6 +1215,14 @@ gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux, g_list_concat (demux->priv->old_streams, old_streams); } + /* Unblock after removing oldstreams */ + for (iter = demux->streams; iter; iter = g_list_next (iter)) { + GstAdaptiveDemuxStream *stream = iter->data; + stream->do_block = FALSE; + } + + GST_DEBUG_OBJECT (demux, "All streams are exposed"); + return TRUE; } @@ -1166,6 +1247,11 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad) g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS); gst_pad_set_element_private (pad, stream); + g_mutex_lock (&demux->priv->preroll_lock); + stream->do_block = TRUE; + demux->priv->preroll_pending++; + g_mutex_unlock (&demux->priv->preroll_lock); + gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query)); gst_pad_set_event_function (pad, @@ -1509,7 +1595,10 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad, } if (demux->next_streams) { - gst_adaptive_demux_expose_streams (demux, FALSE); + /* If the seek generated new streams, get them + * to preroll */ + gst_adaptive_demux_prepare_streams (demux, FALSE); + gst_adaptive_demux_start_tasks (demux, TRUE); } else { GList *iter; GstClockTime period_start = @@ -1540,10 +1629,11 @@ gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad, } GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux); + + /* Restart the demux */ + gst_adaptive_demux_start_tasks (demux, FALSE); } - /* Restart the demux */ - gst_adaptive_demux_start_tasks (demux); if (gst_adaptive_demux_is_live (demux)) { g_mutex_lock (&demux->priv->updates_timed_lock); demux->priv->stop_updates_task = FALSE; @@ -1730,7 +1820,8 @@ gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent, /* must be called with manifest_lock taken */ static void -gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux) +gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux, + gboolean start_preroll_streams) { GList *iter; @@ -1740,12 +1831,17 @@ gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux) } GST_INFO_OBJECT (demux, "Starting streams' tasks"); - for (iter = demux->streams; iter; iter = g_list_next (iter)) { + + iter = start_preroll_streams ? demux->prepared_streams : demux->streams; + + for (; iter; iter = g_list_next (iter)) { GstAdaptiveDemuxStream *stream = iter->data; - g_mutex_lock (&stream->fragment_download_lock); - stream->cancelled = FALSE; - g_mutex_unlock (&stream->fragment_download_lock); + if (!start_preroll_streams) { + g_mutex_lock (&stream->fragment_download_lock); + stream->cancelled = FALSE; + g_mutex_unlock (&stream->fragment_download_lock); + } stream->last_ret = GST_FLOW_OK; gst_task_start (stream->download_task); @@ -1770,8 +1866,6 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux) g_cond_signal (&demux->priv->updates_timed_cond); g_mutex_unlock (&demux->priv->updates_timed_lock); - gst_uri_downloader_cancel (demux->downloader); - GST_LOG_OBJECT (demux, "Stopping tasks"); for (iter = demux->streams; iter; iter = g_list_next (iter)) { @@ -1784,6 +1878,12 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux) g_mutex_unlock (&stream->fragment_download_lock); } + g_mutex_lock (&demux->priv->preroll_lock); + g_cond_broadcast (&demux->priv->preroll_cond); + g_mutex_unlock (&demux->priv->preroll_lock); + + gst_uri_downloader_cancel (demux->downloader); + g_mutex_lock (&demux->priv->manifest_update_lock); g_cond_broadcast (&demux->priv->manifest_cond); g_mutex_unlock (&demux->priv->manifest_update_lock); @@ -1973,6 +2073,21 @@ gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux) return GST_FLOW_OK; } +/* Called with preroll_lock */ +static void +gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux, + GstAdaptiveDemuxStream * stream) +{ + demux->priv->preroll_pending--; + if (demux->priv->preroll_pending == 0) { + /* That was the last one, time to release all streams + * and expose them */ + GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing"); + gst_adaptive_demux_expose_streams (demux); + g_cond_broadcast (&demux->priv->preroll_cond); + } +} + /* must be called with manifest_lock taken. * Temporarily releases manifest_lock */ @@ -2043,6 +2158,35 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream, gst_caps_unref (stream->pending_caps); stream->pending_caps = NULL; } + + if (stream->do_block) { + + g_mutex_lock (&demux->priv->preroll_lock); + + /* If we are preroll state, set caps in here */ + if (pending_caps) { + gst_pad_push_event (stream->pad, pending_caps); + pending_caps = NULL; + } + + gst_adaptive_demux_handle_preroll (demux, stream); + GST_MANIFEST_UNLOCK (demux); + + while (stream->do_block && !stream->cancelled) { + GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream); + g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock); + } + if (stream->cancelled) { + GST_LOG_OBJECT (demux, "stream %p cancelled", stream); + gst_buffer_unref (buffer); + g_mutex_unlock (&demux->priv->preroll_lock); + return GST_FLOW_FLUSHING; + } + + g_mutex_unlock (&demux->priv->preroll_lock); + GST_MANIFEST_LOCK (demux); + } + if (G_UNLIKELY (stream->pending_segment)) { GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux); pending_segment = stream->pending_segment; @@ -2098,6 +2242,8 @@ gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream, pending_events = g_list_delete_link (pending_events, pending_events); } + /* Wait for preroll if blocking */ + ret = gst_pad_push (stream->pad, buffer); GST_MANIFEST_LOCK (demux); @@ -3786,8 +3932,8 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, if (can_expose) { GST_DEBUG_OBJECT (demux, "Subclass wants new pads " "to do bitrate switching"); - gst_adaptive_demux_expose_streams (demux, FALSE); - gst_adaptive_demux_start_tasks (demux); + gst_adaptive_demux_prepare_streams (demux, FALSE); + gst_adaptive_demux_start_tasks (demux, TRUE); } else { GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads"); } @@ -3946,8 +4092,8 @@ gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux) GST_DEBUG_OBJECT (demux, "Advancing to next period"); klass->advance_period (demux); - gst_adaptive_demux_expose_streams (demux, FALSE); - gst_adaptive_demux_start_tasks (demux); + gst_adaptive_demux_prepare_streams (demux, FALSE); + gst_adaptive_demux_start_tasks (demux, TRUE); } /** diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index 5b39728e0a..830171467f 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -185,6 +185,8 @@ struct _GstAdaptiveDemuxStream /* TODO check if used */ gboolean eos; + + gboolean do_block; /* TRUE if stream should block on preroll */ }; /** @@ -207,6 +209,7 @@ struct _GstAdaptiveDemux GstUriDownloader *downloader; GList *streams; + GList *prepared_streams; GList *next_streams; GstSegment segment;