From 83c0e1e25db9ab975db504c22c99ca7739d8d889 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Fri, 1 Feb 2013 02:10:15 -0300 Subject: [PATCH] dashdemux: download the next fragment with smaller timestamp Instead of downloading 1 fragment per stream per download loop, select the stream with the earlier timestamp and get a fragment only for that one. The old algorithm would lead to problems when the fragment durations were too different for streams. --- ext/dash/gstdashdemux.c | 145 +++++++++++++++++++++++++--------------- ext/dash/gstdashdemux.h | 13 +++- ext/dash/gstmpdparser.c | 29 ++++++++ ext/dash/gstmpdparser.h | 1 + 4 files changed, 131 insertions(+), 57 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index f252ca4e46..ea3ecbed18 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -233,7 +233,7 @@ static void gst_dash_demux_resume_download_task (GstDashDemux * demux); static gboolean gst_dash_demux_setup_all_streams (GstDashDemux * demux); static gboolean gst_dash_demux_select_representations (GstDashDemux * demux, guint64 current_bitrate); -static gboolean gst_dash_demux_get_next_fragment_set (GstDashDemux * demux); +static gboolean gst_dash_demux_get_next_fragment (GstDashDemux * demux); static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose); static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux); @@ -1269,6 +1269,21 @@ gst_dash_demux_stream_get_buffering_time (GstDashDemuxStream * stream) return (GstClockTime) level.time; } +static gboolean +gst_dash_demux_all_streams_have_data (GstDashDemux * demux) +{ + GSList *iter; + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data; + + if (!stream->has_data_queued) + return FALSE; + } + + return TRUE; +} + /* gst_dash_demux_download_loop: * * Loop for the "download' task that fetches fragments based on the @@ -1384,11 +1399,13 @@ gst_dash_demux_download_loop (GstDashDemux * demux) GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest); /* try to switch to another set of representations if needed */ - gst_dash_demux_select_representations (demux, - demux->bandwidth_usage * demux->dnl_rate); + if (gst_dash_demux_all_streams_have_data (demux)) { + gst_dash_demux_select_representations (demux, + demux->bandwidth_usage * demux->dnl_rate); + } /* fetch the next fragment */ - while (!gst_dash_demux_get_next_fragment_set (demux)) { + while (!gst_dash_demux_get_next_fragment (demux)) { if (demux->end_of_period) { GST_INFO_OBJECT (demux, "Reached the end of the Period"); /* setup video, audio and subtitle streams, starting from the next Period */ @@ -1482,6 +1499,7 @@ gst_dash_demux_prepare_pad_switch (GstDashDemux * demux) g_assert (caps != NULL); gst_dash_demux_stream_push_event (stream, gst_event_new_dash_event_pad_switch (caps)); + stream->has_data_queued = FALSE; } } @@ -1672,9 +1690,9 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream) } } -/* gst_dash_demux_get_next_fragment_set: +/* gst_dash_demux_get_next_fragment: * - * Get the next set of fragments for the current representations. + * Get the next fragments for the stream with the earlier timestamp. * * This function uses the generic URI downloader API. * @@ -1682,11 +1700,10 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream) * */ static gboolean -gst_dash_demux_get_next_fragment_set (GstDashDemux * demux) +gst_dash_demux_get_next_fragment (GstDashDemux * demux) { GstActiveStream *active_stream; GstFragment *download, *header; - GList *fragment_set; gchar *next_fragment_uri; GstClockTime duration; GstClockTime timestamp; @@ -1697,25 +1714,28 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux) guint64 size_buffer = 0; GSList *iter; gboolean end_of_period = TRUE; + GstDashDemuxStream *selected_stream = NULL; + GstClockTime best_time = GST_CLOCK_TIME_NONE; - g_get_current_time (&start); - fragment_set = NULL; - /* Get the fragment corresponding to each stream index */ for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - guint stream_idx = stream->index; - GstBuffer *buffer; + GstClockTime ts; if (stream->download_end_of_period) continue; - if (!gst_mpd_client_get_next_fragment (demux->client, - stream_idx, &discont, &next_fragment_uri, &duration, ×tamp)) { + if (gst_mpd_client_get_next_fragment_timestamp (demux->client, + stream->index, &ts)) { + if (ts < best_time || !GST_CLOCK_TIME_IS_VALID (best_time)) { + selected_stream = stream; + best_time = ts; + } + } else { GstEvent *event = NULL; GST_INFO_OBJECT (demux, "This Period doesn't contain more fragments for stream %u", - stream_idx); + stream->index); if (gst_mpd_client_has_next_period (demux->client)) { event = gst_event_new_dash_eop (); } else { @@ -1723,56 +1743,71 @@ gst_dash_demux_get_next_fragment_set (GstDashDemux * demux) } stream->download_end_of_period = TRUE; gst_dash_demux_stream_push_event (stream, event); - continue; } + } - GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx); - GST_INFO_OBJECT (demux, - "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%" - GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp), - GST_TIME_ARGS (duration)); + g_get_current_time (&start); + /* Get the fragment corresponding to each stream index */ + if (selected_stream) { + guint stream_idx = selected_stream->index; + GstBuffer *buffer; - /* got a fragment to fetch, no end of period */ - end_of_period = FALSE; + if (gst_mpd_client_get_next_fragment (demux->client, + stream_idx, &discont, &next_fragment_uri, &duration, ×tamp)) { - download = gst_uri_downloader_fetch_uri (demux->downloader, - next_fragment_uri); - g_free (next_fragment_uri); + GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx); + GST_INFO_OBJECT (demux, + "Fetching next fragment %s ts:%" GST_TIME_FORMAT " dur:%" + GST_TIME_FORMAT, next_fragment_uri, GST_TIME_ARGS (timestamp), + GST_TIME_ARGS (duration)); - if (download == NULL) - return FALSE; + /* got a fragment to fetch, no end of period */ + end_of_period = FALSE; - buffer = gst_fragment_get_buffer (download); + download = gst_uri_downloader_fetch_uri (demux->downloader, + next_fragment_uri); + g_free (next_fragment_uri); - active_stream = - gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx); - if (stream == NULL) /* TODO unref fragments */ - return FALSE; + if (download == NULL) + return FALSE; - if (stream->need_header) { - /* We need to fetch a new header */ - if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) { - GST_INFO_OBJECT (demux, "Unable to fetch header"); - } else { - GstBuffer *header_buffer; - /* Replace fragment with a new one including the header */ + buffer = gst_fragment_get_buffer (download); - header_buffer = gst_fragment_get_buffer (header); - buffer = gst_buffer_join (header_buffer, buffer); + active_stream = + gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx); + if (active_stream == NULL) /* TODO unref fragments */ + return FALSE; + + if (selected_stream->need_header) { + /* We need to fetch a new header */ + if ((header = + gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) { + GST_INFO_OBJECT (demux, "Unable to fetch header"); + } else { + GstBuffer *header_buffer; + /* Replace fragment with a new one including the header */ + + header_buffer = gst_fragment_get_buffer (header); + buffer = gst_buffer_join (header_buffer, buffer); + } + selected_stream->need_header = FALSE; } - stream->need_header = FALSE; + + buffer = gst_buffer_make_metadata_writable (buffer); + + GST_BUFFER_TIMESTAMP (buffer) = timestamp; + GST_BUFFER_DURATION (buffer) = duration; + GST_BUFFER_OFFSET (buffer) = + gst_mpd_client_get_segment_index (active_stream) - 1; + + gst_buffer_set_caps (buffer, selected_stream->input_caps); + gst_dash_demux_stream_push_data (selected_stream, buffer); + selected_stream->has_data_queued = TRUE; + size_buffer += GST_BUFFER_SIZE (buffer); + } else { + GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d", + selected_stream, selected_stream->index); } - - buffer = gst_buffer_make_metadata_writable (buffer); - - GST_BUFFER_TIMESTAMP (buffer) = timestamp; - GST_BUFFER_DURATION (buffer) = duration; - GST_BUFFER_OFFSET (buffer) = - gst_mpd_client_get_segment_index (active_stream) - 1; - - gst_buffer_set_caps (buffer, stream->input_caps); - gst_dash_demux_stream_push_data (stream, buffer); - size_buffer += GST_BUFFER_SIZE (buffer); } demux->end_of_period = end_of_period; diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index ab2acc1290..1157d57bfa 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -78,11 +78,20 @@ struct _GstDashDemuxStream gboolean stream_end_of_period; gboolean stream_eos; - gboolean need_header; - gboolean need_segment; + /* tracks if a stream has enqueued data + * after a pad switch. + * This is required to prevent pads being + * added to the demuxer and having no data + * pushed to it before another pad switch + * as this might make downstream elements + * unhappy and error out if they get + * an EOS without receiving any input + */ + gboolean has_data_queued; + GstDataQueue *queue; }; diff --git a/ext/dash/gstmpdparser.c b/ext/dash/gstmpdparser.c index 3523a24209..55b3b92951 100644 --- a/ext/dash/gstmpdparser.c +++ b/ext/dash/gstmpdparser.c @@ -3129,6 +3129,35 @@ gst_mpd_client_setup_streaming (GstMpdClient * client, return TRUE; } +gboolean +gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client, + guint stream_idx, GstClockTime * ts) +{ + GstActiveStream *stream; + gint segment_idx; + GstMediaSegment *currentChunk; + + GST_DEBUG ("Stream index: %i", stream_idx); + stream = g_list_nth_data (client->active_streams, stream_idx); + g_return_val_if_fail (stream != NULL, 0); + + GST_MPD_CLIENT_LOCK (client); + segment_idx = gst_mpd_client_get_segment_index (stream); + GST_DEBUG ("Looking for fragment sequence chunk %d", segment_idx); + + currentChunk = + gst_mpdparser_get_chunk_by_index (client, stream_idx, segment_idx); + if (currentChunk == NULL) { + GST_MPD_CLIENT_UNLOCK (client); + return FALSE; + } + + *ts = currentChunk->start_time; + GST_MPD_CLIENT_UNLOCK (client); + + return TRUE; +} + gboolean gst_mpd_client_get_next_fragment (GstMpdClient * client, guint indexStream, gboolean * discontinuity, gchar ** uri, diff --git a/ext/dash/gstmpdparser.h b/ext/dash/gstmpdparser.h index 0624043382..16ad7e0d93 100644 --- a/ext/dash/gstmpdparser.h +++ b/ext/dash/gstmpdparser.h @@ -470,6 +470,7 @@ gboolean gst_mpd_client_setup_representation (GstMpdClient *client, GstActiveStr GstClockTime gst_mpd_client_get_current_position (GstMpdClient *client); GstClockTime gst_mpd_client_get_next_fragment_duration (GstMpdClient * client); GstClockTime gst_mpd_client_get_media_presentation_duration (GstMpdClient *client); +gboolean gst_mpd_client_get_next_fragment_timestamp (GstMpdClient * client, guint stream_idx, GstClockTime * ts); gboolean gst_mpd_client_get_next_fragment (GstMpdClient *client, guint indexStream, gboolean *discontinuity, gchar **uri, GstClockTime *duration, GstClockTime *timestamp); gboolean gst_mpd_client_get_next_header (GstMpdClient *client, const gchar **uri, guint stream_idx); gboolean gst_mpd_client_is_live (GstMpdClient * client);