From 6611d14eedc5e7752a6341f20df563326d049181 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Wed, 4 Dec 2013 11:30:22 -0300 Subject: [PATCH] dashdemux: handle multiple languages Handle multiple languages by using the not-linked return to stop the download task for that stream. It can be reactivated when a reconfigure event is received. Stopping the unused streams is relevant to save network bandwidth --- ext/dash/gstdashdemux.c | 329 +++++++++++++++++++++++++++++----------- ext/dash/gstdashdemux.h | 13 +- 2 files changed, 248 insertions(+), 94 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 931a9f1f29..4ed16315b7 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -181,8 +181,11 @@ enum #define DEFAULT_FAILED_COUNT 3 #define DOWNLOAD_RATE_HISTORY_MAX 3 -#define GST_DASH_DEMUX_DOWNLOAD_LOCK(d) g_mutex_lock (&d->download_mutex) -#define GST_DASH_DEMUX_DOWNLOAD_UNLOCK(d) g_mutex_unlock (&d->download_mutex) +#define GST_DASH_DEMUX_CLIENT_LOCK(d) g_mutex_lock (&d->client_lock) +#define GST_DASH_DEMUX_CLIENT_UNLOCK(d) g_mutex_unlock (&d->client_lock) + +#define GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK(s) g_mutex_lock (&s->download_mutex) +#define GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK(s) g_mutex_unlock (&s->download_mutex) /* Custom internal event to signal end of period */ #define GST_EVENT_DASH_EOP GST_EVENT_MAKE_TYPE(81, GST_EVENT_TYPE_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED) @@ -225,7 +228,7 @@ static GstFlowReturn gst_dash_demux_get_next_fragment (GstDashDemux * demux, GstDashDemuxStream * stream, GstActiveStream ** active_stream, GstClockTime * next_ts); static gboolean gst_dash_demux_advance_period (GstDashDemux * demux); -static void gst_dash_demux_download_wait (GstDashDemux * demux, +static void gst_dash_demux_download_wait (GstDashDemuxStream * stream, GstClockTime time_diff); static void gst_dash_demux_expose_streams (GstDashDemux * demux); @@ -261,15 +264,13 @@ gst_dash_demux_dispose (GObject * obj) demux->stream_task = NULL; } - g_cond_clear (&demux->download_cond); - g_mutex_clear (&demux->download_mutex); - if (demux->downloader != NULL) { g_object_unref (demux->downloader); demux->downloader = NULL; } g_mutex_clear (&demux->streams_lock); + g_mutex_clear (&demux->client_lock); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -345,10 +346,6 @@ gst_dash_demux_init (GstDashDemux * demux) demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE; demux->max_bitrate = DEFAULT_MAX_BITRATE; - /* Updates task */ - g_cond_init (&demux->download_cond); - g_mutex_init (&demux->download_mutex); - /* Streaming task */ g_rec_mutex_init (&demux->stream_task_lock); demux->stream_task = @@ -356,6 +353,7 @@ gst_dash_demux_init (GstDashDemux * demux) gst_task_set_lock (demux->stream_task, &demux->stream_task_lock); g_mutex_init (&demux->streams_lock); + g_mutex_init (&demux->client_lock); } static void @@ -470,6 +468,38 @@ gst_dash_demux_stream_push_data (GstDashDemuxStream * stream, gst_data_queue_push (stream->queue, item); } +static void +gst_dash_demux_stream_seek (GstDashDemuxStream * stream, + GstClockTime target_pos) +{ + gint seg_i; + guint current_sequence = 0; + GstActiveStream *active_stream; + GstMediaSegment *chunk; + GstClockTime current_pos = 0; + GstDashDemux *demux = stream->demux; + + active_stream = + gst_mpdparser_get_active_stream_by_index (demux->client, stream->index); + for (seg_i = 0; seg_i < active_stream->segments->len; seg_i++) { + chunk = g_ptr_array_index (active_stream->segments, seg_i); + current_pos = chunk->start_time; + /* current_sequence = chunk->number; */ + GST_DEBUG_OBJECT (demux, "current_pos:%" GST_TIME_FORMAT + " <= target_pos:%" GST_TIME_FORMAT " duration:%" + GST_TIME_FORMAT, GST_TIME_ARGS (current_pos), + GST_TIME_ARGS (target_pos), GST_TIME_ARGS (chunk->duration)); + if (current_pos <= target_pos && target_pos < current_pos + chunk->duration) { + GST_DEBUG_OBJECT (demux, + "selecting sequence %d for stream %" GST_PTR_FORMAT, + current_sequence, stream); + break; + } + current_sequence++; + } + gst_mpd_client_set_segment_index (active_stream, current_sequence); +} + static gboolean gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) { @@ -487,9 +517,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gint64 start, stop; GList *list; GstClockTime current_pos, target_pos; - guint current_sequence, current_period; - GstActiveStream *active_stream; - GstMediaSegment *chunk; + guint current_period; GstStreamPeriod *period; GSList *iter; gboolean update; @@ -575,32 +603,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) /* Update the current sequence on all streams */ for (iter = demux->streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data; - gint seg_i; - - active_stream = - gst_mpdparser_get_active_stream_by_index (demux->client, - stream->index); - current_pos = 0; - current_sequence = 0; - for (seg_i = 0; seg_i < active_stream->segments->len; seg_i++) { - chunk = g_ptr_array_index (active_stream->segments, seg_i); - current_pos = chunk->start_time; - /* current_sequence = chunk->number; */ - GST_DEBUG_OBJECT (demux, "current_pos:%" GST_TIME_FORMAT - " <= target_pos:%" GST_TIME_FORMAT " duration:%" - GST_TIME_FORMAT, GST_TIME_ARGS (current_pos), - GST_TIME_ARGS (target_pos), GST_TIME_ARGS (chunk->duration)); - if (current_pos <= target_pos - && target_pos < current_pos + chunk->duration) { - GST_DEBUG_OBJECT (demux, - "selecting sequence %d for stream %" GST_PTR_FORMAT, - current_sequence, stream); - break; - } - current_sequence++; - } - gst_mpd_client_set_segment_index (active_stream, current_sequence); + gst_dash_demux_stream_seek (iter->data, target_pos); } if (flags & GST_SEEK_FLAG_FLUSH) { @@ -624,6 +627,7 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; gst_data_queue_set_flushing (stream->queue, FALSE); + stream->last_ret = GST_FLOW_OK; } demux->timestamp_offset = 0; demux->need_segment = TRUE; @@ -636,6 +640,28 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) return TRUE; } + case GST_EVENT_RECONFIGURE:{ + GSList *iter; + + for (iter = demux->streams; iter; iter = g_slist_next (iter)) { + GstDashDemuxStream *stream = iter->data; + + if (stream->pad == pad) { + GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream); + if (stream->last_ret == GST_FLOW_NOT_LINKED) { + stream->last_ret = GST_FLOW_OK; + stream->restart_download = TRUE; + stream->need_header = TRUE; + GST_DEBUG_OBJECT (stream->pad, "Restarting download loop"); + } + gst_task_start (stream->download_task); + GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream); + gst_event_unref (event); + return TRUE; + } + } + } + break; default: break; } @@ -703,6 +729,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) gst_task_new ((GstTaskFunction) gst_dash_demux_stream_download_loop, stream, NULL); gst_task_set_lock (stream->download_task, &stream->download_task_lock); + g_cond_init (&stream->download_cond); + g_mutex_init (&stream->download_mutex); stream->index = i; stream->input_caps = caps; @@ -1018,13 +1046,10 @@ gst_dash_demux_stop (GstDashDemux * demux) gst_data_queue_set_flushing (stream->queue, TRUE); if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) { - GST_TASK_SIGNAL (stream->download_task); + stream->last_ret = GST_FLOW_FLUSHING; + stream->need_header = TRUE; gst_task_stop (stream->download_task); - g_mutex_lock (&demux->download_mutex); - g_cond_signal (&demux->download_cond); - g_mutex_unlock (&demux->download_mutex); - g_rec_mutex_lock (&stream->download_task_lock); - g_rec_mutex_unlock (&stream->download_task_lock); + GST_TASK_SIGNAL (stream->download_task); gst_task_join (stream->download_task); } } @@ -1154,7 +1179,6 @@ static void gst_dash_demux_stream_loop (GstDashDemux * demux) { GstFlowReturn ret; - GstActiveStream *active_stream; GSList *iter; GstClockTime best_time; GstDashDemuxStream *selected_stream; @@ -1172,6 +1196,9 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) GST_DEBUG_OBJECT (demux, "Peeking stream %d", stream->index); + if (stream->last_ret == GST_FLOW_NOT_LINKED) + continue; + if (stream->stream_eos) { GST_DEBUG_OBJECT (demux, "Stream %d is eos, skipping", stream->index); continue; @@ -1221,6 +1248,7 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) } } + ret = GST_FLOW_OK; if (selected_stream) { GstDataQueueItem *item; @@ -1232,13 +1260,9 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) if (G_LIKELY (GST_IS_BUFFER (item->object))) { GstBuffer *buffer; - GstClockTime timestamp; + GstClockTime timestamp, duration; buffer = GST_BUFFER_CAST (item->object); - active_stream = - gst_mpdparser_get_active_stream_by_index (demux->client, - selected_stream->index); - timestamp = GST_BUFFER_TIMESTAMP (buffer); if (demux->need_segment) { @@ -1255,32 +1279,36 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) } /* make timestamp start from 0 by subtracting the offset */ timestamp -= demux->timestamp_offset; + duration = GST_BUFFER_DURATION (buffer); GST_BUFFER_TIMESTAMP (buffer) = timestamp; GST_DEBUG_OBJECT (demux, "Pushing fragment ts: %" GST_TIME_FORMAT " at pad %s", GST_TIME_ARGS (timestamp), GST_PAD_NAME (selected_stream->pad)); -#if 0 GST_DEBUG_OBJECT (demux, - "Pushing fragment %p #%d (stream %i) ts:%" GST_TIME_FORMAT " dur:%" - GST_TIME_FORMAT " at pad: %s:%s", buffer, GST_BUFFER_OFFSET (buffer), - selected_stream->index, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + "Pushing fragment %p #%" G_GUINT64_FORMAT " (stream %d) ts:%" + GST_TIME_FORMAT " dur:%" GST_TIME_FORMAT " at pad: %s:%s", buffer, + GST_BUFFER_OFFSET (buffer), selected_stream->index, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)), GST_DEBUG_PAD_NAME (selected_stream->pad)); -#endif ret = gst_pad_push (selected_stream->pad, gst_buffer_ref (buffer)); + GST_DEBUG_OBJECT (demux, "Push result: %d %s", ret, + gst_flow_get_name (ret)); + demux->segment.position = timestamp; + selected_stream->position = timestamp; + if (GST_CLOCK_TIME_IS_VALID (duration)) + selected_stream->position += duration; item->destroy (item); - if ((ret != GST_FLOW_OK) && (active_stream - && active_stream->mimeType == GST_STREAM_VIDEO)) - goto error_pushing; } else { /* a GstEvent */ if (GST_EVENT_TYPE (item->object) == GST_EVENT_EOS) { selected_stream->stream_end_of_period = TRUE; selected_stream->stream_eos = TRUE; + ret = GST_FLOW_EOS; } else if (GST_EVENT_TYPE (item->object) == GST_EVENT_DASH_EOP) { selected_stream->stream_end_of_period = TRUE; } @@ -1298,6 +1326,19 @@ gst_dash_demux_stream_loop (GstDashDemux * demux) gst_dash_demux_advance_period (demux); } } + GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (selected_stream); + if (ret != selected_stream->last_ret) { + gst_task_start (selected_stream->download_task); + selected_stream->last_ret = ret; + } + switch (selected_stream->last_ret) { + case GST_FLOW_NOT_LINKED: + gst_data_queue_set_flushing (selected_stream->queue, TRUE); + break; + default: + break; + } + GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (selected_stream); end: GST_INFO_OBJECT (demux, "Leaving streaming task"); @@ -1317,6 +1358,7 @@ end_of_manifest: return; } +#if 0 error_pushing: { /* FIXME: handle error */ @@ -1326,6 +1368,7 @@ error_pushing: gst_task_stop (demux->stream_task); return; } +#endif } static void @@ -1350,6 +1393,8 @@ gst_dash_demux_stream_free (GstDashDemuxStream * stream) gst_object_unref (stream->download_task); g_rec_mutex_clear (&stream->download_task_lock); } + g_cond_clear (&stream->download_cond); + g_mutex_clear (&stream->download_mutex); g_free (stream); } @@ -1630,26 +1675,43 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) GstDashDemux *demux = stream->demux; GstFlowReturn flow_ret = GST_FLOW_OK; - GST_LOG_OBJECT (demux, "Starting download loop %p %s:%s", stream, - GST_DEBUG_PAD_NAME (stream->pad)); + GST_LOG_OBJECT (stream->pad, "Starting download loop"); - GST_DASH_DEMUX_DOWNLOAD_LOCK (demux); + GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream); + if (stream->last_ret < GST_FLOW_OK) { + if (demux->cancelled) { + GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream); + goto cancelled; + } + GST_DEBUG_OBJECT (stream->pad, "Download loop waiting due to flow return: " + "%d %s", stream->last_ret, gst_flow_get_name (stream->last_ret)); + gst_task_pause (stream->download_task); + GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream); + return; + } + GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream); + + if (demux->cancelled) { + goto cancelled; + } + + GST_DASH_DEMUX_CLIENT_LOCK (demux); if (gst_mpd_client_is_live (demux->client) && demux->client->mpd_uri != NULL) { switch (gst_dash_demux_refresh_mpd (demux)) { case GST_FLOW_EOS: - GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); goto end_of_manifest; default: break; } } - GST_DEBUG_OBJECT (demux, "download loop %i", demux->end_of_manifest); + GST_DEBUG_OBJECT (stream->pad, "End of manifest: %d", demux->end_of_manifest); /* try to switch to another set of representations if needed */ gst_dash_demux_stream_select_representation_unlocked (stream); - GST_DASH_DEMUX_DOWNLOAD_UNLOCK (demux); + GST_DASH_DEMUX_CLIENT_UNLOCK (demux); /* fetch the next fragment */ flow_ret = gst_dash_demux_get_next_fragment (demux, stream, &fragment_stream, @@ -1659,13 +1721,14 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) case GST_FLOW_OK: break; case GST_FLOW_EOS: + GST_DASH_DEMUX_STREAM_DOWNLOAD_LOCK (stream); if (demux->end_of_period) { - GST_INFO_OBJECT (demux, "Reached the end of the Period"); + GST_INFO_OBJECT (stream->pad, "Reached the end of the Period"); /* setup video, audio and subtitle streams, starting from the next Period */ if (!gst_mpd_client_set_period_index (demux->client, gst_mpd_client_get_period_index (demux->client) + 1) || !gst_dash_demux_setup_all_streams (demux)) { - GST_INFO_OBJECT (demux, "Reached the end of the manifest file"); + GST_INFO_OBJECT (stream->pad, "Reached the end of the manifest file"); demux->end_of_manifest = TRUE; gst_task_start (demux->stream_task); goto end_of_manifest; @@ -1674,6 +1737,8 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) gst_mpd_client_set_segment_index_for_all_streams (demux->client, 0); demux->end_of_period = FALSE; } + gst_task_pause (stream->download_task); + GST_DASH_DEMUX_STREAM_DOWNLOAD_UNLOCK (stream); break; case GST_FLOW_ERROR: /* Download failed 'by itself' @@ -1687,7 +1752,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) pos = gst_mpd_client_check_time_position (demux->client, fragment_stream, fragment_ts, &time_diff); - GST_DEBUG_OBJECT (demux, + GST_DEBUG_OBJECT (stream->pad, "Checked position for fragment ts %" GST_TIME_FORMAT ", res: %d, diff: %" G_GINT64_FORMAT, GST_TIME_ARGS (fragment_ts), pos, time_diff); @@ -1697,7 +1762,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) /* we're behind, try moving to the 'present' */ GDateTime *now = g_date_time_new_now_utc (); - GST_DEBUG_OBJECT (demux, + GST_DEBUG_OBJECT (stream->pad, "Falling behind live stream, moving forward"); gst_mpd_client_seek_to_time (demux->client, now); g_date_time_unref (now); @@ -1705,10 +1770,11 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) } else if (pos > 0) { /* we're ahead, wait a little */ - GST_DEBUG_OBJECT (demux, "Waiting for next segment to be created"); + GST_DEBUG_OBJECT (stream->pad, + "Waiting for next segment to be created"); gst_mpd_client_set_segment_index (fragment_stream, fragment_stream->segment_idx - 1); - gst_dash_demux_download_wait (demux, time_diff); + gst_dash_demux_download_wait (stream, time_diff); } else { gst_mpd_client_set_segment_index (fragment_stream, fragment_stream->segment_idx - 1); @@ -1719,7 +1785,7 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) } if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { - GST_WARNING_OBJECT (demux, "Could not fetch the next fragment"); + GST_WARNING_OBJECT (stream->pad, "Could not fetch the next fragment"); goto quit; } else { goto error_downloading; @@ -1733,24 +1799,23 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) goto cancelled; } - GST_INFO_OBJECT (demux, "Internal buffering : %" G_GUINT64_FORMAT " s", + GST_INFO_OBJECT (stream->pad, "Internal buffering : %" G_GUINT64_FORMAT " s", gst_dash_demux_get_buffering_time (demux) / GST_SECOND); demux->client->update_failed_count = 0; quit: - GST_DEBUG_OBJECT (demux, "Finishing download loop"); + GST_DEBUG_OBJECT (stream->pad, "Finishing download loop"); return; cancelled: { - GST_WARNING_OBJECT (demux, "Cancelled, leaving download task"); - gst_task_stop (stream->download_task); + GST_WARNING_OBJECT (stream->pad, "Cancelled, leaving download task"); return; } end_of_manifest: { - GST_INFO_OBJECT (demux, "End of manifest, leaving download task"); + GST_INFO_OBJECT (stream->pad, "End of manifest, leaving download task"); gst_task_stop (stream->download_task); return; } @@ -2006,7 +2071,7 @@ gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream) static void gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux, - GstActiveStream * stream) + GstDashDemuxStream * dash_stream, GstActiveStream * stream) { GstDateTime *seg_end_time; GstDateTime *cur_time = gst_date_time_new_now_utc (); @@ -2027,26 +2092,82 @@ gst_dash_demux_wait_for_fragment_to_be_available (GstDashDemux * demux, GST_DEBUG_OBJECT (demux, "Selected fragment has end timestamp > now (%" PRIi64 "), delaying download", diff); - gst_dash_demux_download_wait (demux, diff); + gst_dash_demux_download_wait (dash_stream, diff); } } } static gboolean gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, - GstDashDemuxStream * demux_stream, guint64 * size_buffer, + GstDashDemuxStream * stream, guint64 * size_buffer, GstClockTime * download_time) { GstActiveStream *active_stream; GstFragment *download; GTimeVal now; GTimeVal start; - guint stream_idx = demux_stream->index; + guint stream_idx = stream->index; GstBuffer *buffer; GstBuffer *header_buffer; GstMediaFragmentInfo fragment; + if (G_UNLIKELY (stream->restart_download)) { + GstClockTime cur, ts; + gint64 pos; + GstEvent *gap; + + GST_DEBUG_OBJECT (stream->pad, + "Reactivating stream after to reconfigure event"); + + cur = GST_CLOCK_TIME_IS_VALID (stream->position) ? stream->position : 0; + + if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) { + ts = (GstClockTime) pos; + GST_DEBUG_OBJECT (stream->pad, "Downstream position: %" + GST_TIME_FORMAT, GST_TIME_ARGS (ts)); + } else { + ts = demux->segment.position; + GST_DEBUG_OBJECT (stream->pad, "Downstream position query failed, " + "failling back to looking at other pads"); + } + + GST_DEBUG_OBJECT (stream->pad, "Restarting stream at " + "position %" GST_TIME_FORMAT ", current catch up %" GST_TIME_FORMAT, + GST_TIME_ARGS (ts), GST_TIME_ARGS (demux->segment.position)); + + if (GST_CLOCK_TIME_IS_VALID (ts)) { + gst_dash_demux_stream_seek (stream, ts); + + if (cur < ts) { + gap = gst_event_new_gap (cur, ts - cur); + gst_pad_push_event (stream->pad, gap); + } + } + + /* This stream might be entering into catching up mode, + * meaning that it will push buffers from this same download thread + * until it reaches 'catch_up_timestamp'. + * + * The reason for this is that in case of stream switching, the other + * stream that was previously active might be blocking the stream_loop + * in case it is ahead enough that all queues are filled. + * In this case, it is possible that a downstream input-selector is + * blocking waiting for the currently active stream to reach the + * same position of the old linked stream because of the 'sync-streams' + * behavior. + * + * We can push from this thread up to 'catch_up_timestamp' as all other + * streams should be around the same timestamp. + */ + stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; + + gst_data_queue_set_flushing (stream->queue, FALSE); + stream->restart_download = FALSE; + gst_task_start (demux->stream_task); + } + if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, &fragment)) { + gboolean catch_up = FALSE; g_get_current_time (&start); GST_INFO_OBJECT (demux, "Next fragment for stream #%i", stream_idx); @@ -2100,13 +2221,13 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, } } - if (demux_stream->need_header) { + if (stream->need_header) { /* We need to fetch a new header */ if ((header_buffer = gst_dash_demux_get_next_header (demux, stream_idx)) != NULL) { buffer = gst_buffer_append (header_buffer, buffer); } - demux_stream->need_header = FALSE; + stream->need_header = FALSE; } g_get_current_time (&now); *download_time = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start)); @@ -2120,12 +2241,36 @@ gst_dash_demux_get_next_fragment_for_stream (GstDashDemux * demux, gst_media_fragment_info_clear (&fragment); - gst_dash_demux_stream_push_data (demux_stream, buffer); - demux_stream->has_data_queued = TRUE; + /* Check if this stream is on catch up mode */ + if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) { + GST_DEBUG_OBJECT (stream->pad, + "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT, + GST_TIME_ARGS (demux->segment.position), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); + if (GST_BUFFER_TIMESTAMP (buffer) < demux->segment.position) { + catch_up = TRUE; + } else { + stream->last_ret = GST_FLOW_OK; + gst_task_start (demux->stream_task); + } + } + + if (catch_up) { + GstFlowReturn ret; + + ret = gst_pad_push (stream->pad, buffer); + if (G_LIKELY (ret == GST_FLOW_OK)) + stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; + /* TODO handle return */ + } else { + gst_dash_demux_stream_push_data (stream, buffer); + stream->has_data_queued = TRUE; + } + *size_buffer += gst_buffer_get_size (buffer); } else { GST_WARNING_OBJECT (demux, "Failed to download fragment for stream %p %d", - demux_stream, demux_stream->index); + stream, stream->index); } return TRUE; } @@ -2196,7 +2341,8 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux, if (stream && gst_mpd_client_is_live (demux->client) && demux->client->mpd_node->minimumUpdatePeriod != -1) { - gst_dash_demux_wait_for_fragment_to_be_available (demux, *active_stream); + gst_dash_demux_wait_for_fragment_to_be_available (demux, stream, + *active_stream); } /* Get the fragment corresponding to each stream index */ @@ -2229,12 +2375,13 @@ gst_dash_demux_get_next_fragment (GstDashDemux * demux, } static void -gst_dash_demux_download_wait (GstDashDemux * demux, GstClockTime time_diff) +gst_dash_demux_download_wait (GstDashDemuxStream * stream, + GstClockTime time_diff) { gint64 end_time = g_get_monotonic_time () + time_diff / GST_USECOND; - GST_DEBUG_OBJECT (demux, "Download waiting for %" GST_TIME_FORMAT, + GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT, GST_TIME_ARGS (time_diff)); - g_cond_wait_until (&demux->download_cond, &demux->download_mutex, end_time); - GST_DEBUG_OBJECT (demux, "Download finished waiting"); + g_cond_wait_until (&stream->download_cond, &stream->download_mutex, end_time); + GST_DEBUG_OBJECT (stream->pad, "Download finished waiting"); } diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 84baa8e591..604dc4a8e6 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -63,6 +63,10 @@ struct _GstDashDemuxStream GstCaps *input_caps; + GstFlowReturn last_ret; + GstClockTime position; + gboolean restart_download; + /* * Need to store the status for the download and * stream tasks separately as they are working at @@ -94,6 +98,10 @@ struct _GstDashDemuxStream gboolean has_data_queued; GstDataQueue *queue; + + /* Download task */ + GMutex download_mutex; + GCond download_cond; GstTask *download_task; GRecMutex download_task_lock; @@ -124,6 +132,8 @@ struct _GstDashDemux GstBuffer *manifest; GstUriDownloader *downloader; GstMpdClient *client; /* MPD client */ + GMutex client_lock; + gboolean end_of_period; gboolean end_of_manifest; @@ -136,9 +146,6 @@ struct _GstDashDemux GstTask *stream_task; GRecMutex stream_task_lock; - /* Download task */ - GMutex download_mutex; - GCond download_cond; gboolean cancelled; /* Manifest update */