From 55032ae5fee1d6253632f4ce7f2912b3d36e3a66 Mon Sep 17 00:00:00 2001 From: George Kiagiadakis Date: Fri, 29 Aug 2014 12:38:12 +0200 Subject: [PATCH] dashdemux: synchronize with the download loop thread to signal it to continue If EOS or ERROR happens before the download loop thread has reached its g_cond_wait() call, then the g_cond_signal doesn't have any effect and the download loop thread stucks later. https://bugzilla.gnome.org/show_bug.cgi?id=735663 --- ext/dash/gstdashdemux.c | 124 ++++++++++++++++++++++++++++------------ ext/dash/gstdashdemux.h | 1 + 2 files changed, 88 insertions(+), 37 deletions(-) diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 8d209facd8..300e5faad3 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -220,7 +220,7 @@ static GstFlowReturn gst_dash_demux_stream_get_next_fragment (GstDashDemuxStream static gboolean gst_dash_demux_advance_period (GstDashDemux * demux); static void gst_dash_demux_download_wait (GstDashDemuxStream * stream, GstClockTime time_diff); -static void gst_dash_demux_stream_download_uri (GstDashDemux * demux, +static GstFlowReturn gst_dash_demux_stream_download_uri (GstDashDemux * demux, GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end); static void gst_dash_demux_expose_streams (GstDashDemux * demux); @@ -236,8 +236,7 @@ static GstPad *gst_dash_demux_create_pad (GstDashDemux * demux, #define gst_dash_demux_parent_class parent_class G_DEFINE_TYPE_WITH_CODE (GstDashDemux, gst_dash_demux, GST_TYPE_BIN, GST_DEBUG_CATEGORY_INIT (gst_dash_demux_debug, "dashdemux", 0, - "dashdemux element"); - ); + "dashdemux element");); static void gst_dash_demux_dispose (GObject * obj) @@ -428,8 +427,13 @@ gst_dash_demux_handle_message (GstBin * bin, GstMessage * msg) err->domain, err->code, err->message, debug); /* error, but ask to retry */ - stream->last_ret = GST_FLOW_CUSTOM_ERROR; + g_mutex_lock (&stream->fragment_download_lock); + if (!stream->download_finished) { + stream->last_ret = GST_FLOW_CUSTOM_ERROR; + stream->download_finished = TRUE; + } g_cond_signal (&stream->fragment_download_cond); + g_mutex_unlock (&stream->fragment_download_lock); g_error_free (err); g_free (debug); @@ -601,12 +605,16 @@ gst_dash_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) GstDashDemuxStream *stream = iter->data; if (stream->pad == pad) { + g_mutex_lock (&stream->fragment_download_lock); if (stream->last_ret == GST_FLOW_NOT_LINKED) { stream->last_ret = GST_FLOW_OK; stream->restart_download = TRUE; stream->need_header = TRUE; + g_mutex_unlock (&stream->fragment_download_lock); GST_DEBUG_OBJECT (stream->pad, "Restarting download loop"); gst_task_start (stream->download_task); + } else { + g_mutex_unlock (&stream->fragment_download_lock); } GST_DASH_DEMUX_CLIENT_UNLOCK (demux); gst_event_unref (event); @@ -662,11 +670,15 @@ gst_dash_demux_all_streams_eop (GstDashDemux * demux) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; + g_mutex_lock (&stream->fragment_download_lock); GST_LOG_OBJECT (stream->pad, "EOP: %d (linked: %d)", stream->stream_eos, stream->last_ret != GST_FLOW_NOT_LINKED); - if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED) + if (!stream->stream_eos && stream->last_ret != GST_FLOW_NOT_LINKED) { + g_mutex_unlock (&stream->fragment_download_lock); return FALSE; + } + g_mutex_unlock (&stream->fragment_download_lock); } return TRUE; @@ -724,6 +736,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) g_cond_init (&stream->fragment_download_cond); g_mutex_init (&stream->fragment_download_lock); + stream->download_finished = FALSE; + GST_LOG_OBJECT (demux, "Creating stream %d %" GST_PTR_FORMAT, i, caps); streams = g_slist_prepend (streams, stream); stream->pad = gst_dash_demux_create_pad (demux, stream); @@ -1076,12 +1090,15 @@ gst_dash_demux_stop (GstDashDemux * demux) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; - stream->last_ret = GST_FLOW_FLUSHING; stream->need_header = TRUE; gst_task_stop (stream->download_task); GST_TASK_SIGNAL (stream->download_task); gst_element_set_state (stream->src, GST_STATE_READY); + g_mutex_lock (&stream->fragment_download_lock); + stream->download_finished = TRUE; + stream->last_ret = GST_FLOW_FLUSHING; g_cond_signal (&stream->fragment_download_cond); + g_mutex_unlock (&stream->fragment_download_lock); } } @@ -1131,7 +1148,7 @@ gst_dash_demux_remove_streams (GstDashDemux * demux, GSList * streams) GstEvent *eos = gst_event_new_eos (); for (iter = streams; iter; iter = g_slist_next (iter)) { - GstDashDemuxStream *stream = iter->data;; + GstDashDemuxStream *stream = iter->data; GST_LOG_OBJECT (stream->pad, "Removing stream %d %" GST_PTR_FORMAT, stream->index, stream->input_caps); @@ -1450,12 +1467,16 @@ gst_dash_demux_combine_flows (GstDashDemux * demux) for (iter = demux->streams; iter; iter = g_slist_next (iter)) { GstDashDemuxStream *stream = iter->data; + g_mutex_lock (&stream->fragment_download_lock); if (stream->last_ret != GST_FLOW_NOT_LINKED) all_notlinked = FALSE; if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED - || stream->last_ret == GST_FLOW_FLUSHING) + || stream->last_ret == GST_FLOW_FLUSHING) { + g_mutex_unlock (&stream->fragment_download_lock); return stream->last_ret; + } + g_mutex_unlock (&stream->fragment_download_lock); } if (all_notlinked) return GST_FLOW_NOT_LINKED; @@ -1524,7 +1545,6 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) goto cancelled; } - stream->last_ret = flow_ret; switch (flow_ret) { case GST_FLOW_OK: break; @@ -1627,13 +1647,13 @@ gst_dash_demux_stream_download_loop (GstDashDemuxStream * stream) break; } - if (G_LIKELY (stream->last_ret != GST_FLOW_ERROR)) + if (G_LIKELY (flow_ret != GST_FLOW_ERROR)) demux->client->update_failed_count = 0; quit: GST_DASH_DEMUX_CLIENT_UNLOCK (demux); - if (G_UNLIKELY (stream->last_ret == GST_FLOW_EOS)) { + if (G_UNLIKELY (flow_ret == GST_FLOW_EOS)) { gst_pad_push_event (stream->pad, gst_event_new_eos ()); } @@ -1753,12 +1773,13 @@ gst_dash_demux_stream_select_representation_unlocked (GstDashDemuxStream * return NULL; } -static void +static GstFlowReturn gst_dash_demux_download_header_fragment (GstDashDemux * demux, GstDashDemuxStream * stream, gchar * path, gint64 range_start, gint64 range_end) { gchar *next_header_uri; + GstFlowReturn ret; if (strncmp (path, "http://", 7) != 0) { next_header_uri = @@ -1769,37 +1790,40 @@ gst_dash_demux_download_header_fragment (GstDashDemux * demux, next_header_uri = path; } - gst_dash_demux_stream_download_uri (demux, stream, next_header_uri, + ret = gst_dash_demux_stream_download_uri (demux, stream, next_header_uri, range_start, range_end); g_free (next_header_uri); + return ret; } -static void +static GstFlowReturn gst_dash_demux_get_next_header (GstDashDemux * demux, GstDashDemuxStream * stream) { gchar *initializationURL; gint64 range_start, range_end; + GstFlowReturn ret; if (!gst_mpd_client_get_next_header (demux->client, &initializationURL, stream->index, &range_start, &range_end)) - return; + return GST_FLOW_OK; GST_INFO_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, initializationURL, range_start, range_end); - gst_dash_demux_download_header_fragment (demux, stream, + ret = gst_dash_demux_download_header_fragment (demux, stream, initializationURL, range_start, range_end); /* check if we have an index */ - if (!demux->cancelled && stream->last_ret == GST_FLOW_OK /* TODO check for other valid types */ + if (!demux->cancelled && ret == GST_FLOW_OK /* TODO check for other valid types */ && gst_mpd_client_get_next_header_index (demux->client, &initializationURL, stream->index, &range_start, &range_end)) { GST_INFO_OBJECT (demux, "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, initializationURL, range_start, range_end); - gst_dash_demux_download_header_fragment (demux, stream, + ret = gst_dash_demux_download_header_fragment (demux, stream, initializationURL, range_start, range_end); } + return ret; } static GstCaps * @@ -2001,11 +2025,13 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* TODO properly stop tasks */ /* gst_hls_demux_pause_tasks (demux); */ + g_mutex_lock (&stream->fragment_download_lock); + stream->last_ret = ret; + stream->download_finished = TRUE; g_cond_signal (&stream->fragment_download_cond); + g_mutex_unlock (&stream->fragment_download_lock); } - stream->last_ret = ret; - return ret; } @@ -2017,7 +2043,10 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: + g_mutex_lock (&stream->fragment_download_lock); + stream->download_finished = TRUE; g_cond_signal (&stream->fragment_download_cond); + g_mutex_unlock (&stream->fragment_download_lock); break; default: break; @@ -2076,7 +2105,6 @@ gst_dash_demux_stream_update_source (GstDashDemuxStream * stream, if (!gst_uri_is_valid (uri)) { GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri); - stream->last_ret = GST_FLOW_ERROR; return FALSE; } @@ -2177,16 +2205,21 @@ gst_dash_demux_stream_update_source (GstDashDemuxStream * stream, return TRUE; } -/* must be called with the stream's fragment_download_lock */ -static void +static GstFlowReturn gst_dash_demux_stream_download_uri (GstDashDemux * demux, GstDashDemuxStream * stream, const gchar * uri, gint64 start, gint64 end) { + GstFlowReturn ret = GST_FLOW_OK; GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT, uri, start, end); + stream->download_finished = FALSE; + if (!gst_dash_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) { - return; + g_mutex_lock (&stream->fragment_download_lock); + ret = stream->last_ret = GST_FLOW_ERROR; + g_mutex_unlock (&stream->fragment_download_lock); + return ret; } if (gst_element_set_state (stream->src, @@ -2205,18 +2238,30 @@ gst_dash_demux_stream_download_uri (GstDashDemux * demux, } } + g_mutex_lock (&stream->fragment_download_lock); if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) { + g_mutex_unlock (&stream->fragment_download_lock); stream->download_start_time = g_get_monotonic_time (); gst_element_sync_state_with_parent (stream->src); + g_mutex_lock (&stream->fragment_download_lock); /* wait for the fragment to be completely downloaded */ GST_DEBUG_OBJECT (stream->pad, "Waiting for fragment download to finish: %s", uri); - g_cond_wait (&stream->fragment_download_cond, - &stream->fragment_download_lock); + + while (!demux->cancelled && !stream->download_finished) { + g_cond_wait (&stream->fragment_download_cond, + &stream->fragment_download_lock); + } + ret = stream->last_ret; + + GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s", uri); } + g_mutex_unlock (&stream->fragment_download_lock); } else { - stream->last_ret = GST_FLOW_CUSTOM_ERROR; + g_mutex_lock (&stream->fragment_download_lock); + ret = stream->last_ret = GST_FLOW_CUSTOM_ERROR; + g_mutex_unlock (&stream->fragment_download_lock); } /* flush the proxypads so that the EOS state is reset */ @@ -2224,6 +2269,8 @@ gst_dash_demux_stream_download_uri (GstDashDemux * demux, gst_pad_push_event (stream->src_srcpad, gst_event_new_flush_stop (TRUE)); gst_element_set_state (stream->src, GST_STATE_READY); gst_dash_demux_stream_clear_eos_state (stream); + + return ret; } static void @@ -2233,6 +2280,7 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, GstActiveStream *active_stream; guint stream_idx = stream->index; GstMediaFragmentInfo *fragment = &stream->current_fragment; + GstFlowReturn ret = GST_FLOW_OK; if (G_UNLIKELY (stream->restart_download)) { GstClockTime cur, ts; @@ -2294,7 +2342,6 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, stream->restart_download = FALSE; } - g_mutex_lock (&stream->fragment_download_lock); if (gst_mpd_client_get_next_fragment (demux->client, stream_idx, fragment, stream->demux->segment.rate > 0.0)) { GST_INFO_OBJECT (stream->pad, @@ -2305,16 +2352,18 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, fragment->range_start, fragment->range_end); /* Reset last flow return */ + g_mutex_lock (&stream->fragment_download_lock); stream->last_ret = GST_FLOW_OK; stream->starting_fragment = TRUE; + g_mutex_unlock (&stream->fragment_download_lock); if (stream->need_header) { /* We need to fetch a new header */ - gst_dash_demux_get_next_header (demux, stream); + ret = gst_dash_demux_get_next_header (demux, stream); stream->need_header = FALSE; } - if (stream->last_ret != GST_FLOW_OK) { + if (ret != GST_FLOW_OK) { GST_WARNING_OBJECT (stream->pad, "Failed to download headers"); goto exit; } @@ -2334,11 +2383,11 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, "Fragment index download: %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT, uri, fragment->index_range_start, fragment->index_range_end); - gst_dash_demux_stream_download_uri (demux, stream, uri, + ret = gst_dash_demux_stream_download_uri (demux, stream, uri, fragment->index_range_start, fragment->index_range_end); } - if (stream->last_ret != GST_FLOW_OK) { + if (ret != GST_FLOW_OK) { GST_WARNING_OBJECT (stream->pad, "Failed to download fragment headers"); goto exit; } @@ -2347,10 +2396,10 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, goto exit; /* now get the real fragment */ - gst_dash_demux_stream_download_uri (demux, stream, fragment->uri, + ret = gst_dash_demux_stream_download_uri (demux, stream, fragment->uri, fragment->range_start, fragment->range_end); - if (stream->last_ret < GST_FLOW_EOS) { + if (ret < GST_FLOW_EOS) { gst_media_fragment_info_clear (fragment); goto exit; } @@ -2358,11 +2407,14 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, active_stream = stream->active_stream; if (active_stream == NULL) { gst_media_fragment_info_clear (fragment); + g_mutex_lock (&stream->fragment_download_lock); + ret = GST_FLOW_ERROR; stream->last_ret = GST_FLOW_ERROR; + g_mutex_unlock (&stream->fragment_download_lock); goto exit; } - if (stream->last_ret == GST_FLOW_OK) { + if (ret == GST_FLOW_OK) { stream->position += fragment->duration; } @@ -2374,8 +2426,6 @@ gst_dash_demux_stream_download_fragment (GstDashDemux * demux, exit: - g_mutex_unlock (&stream->fragment_download_lock); - if (stream->last_ret == GST_FLOW_OK) gst_element_set_state (stream->src, GST_STATE_READY); else diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index a6caabefdf..e08910d979 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -86,6 +86,7 @@ struct _GstDashDemuxStream GstPad *src_srcpad; GMutex fragment_download_lock; GCond fragment_download_cond; + gboolean download_finished; GstMediaFragmentInfo current_fragment; gboolean starting_fragment; gint64 download_start_time;