From 535f10b61d0c45c8e7f73de298432b5e97385e05 Mon Sep 17 00:00:00 2001 From: Vincent Penquerc'h Date: Wed, 24 Feb 2016 15:52:41 +0000 Subject: [PATCH] adaptivedemux: retry once on 4xx/5xx in certain conditions This helps catch those 404 server errors in live streams when seeking to the very beginning, as the server will handle a request with some delay, which can cause it to drop the fragment before sending it. https://bugzilla.gnome.org/show_bug.cgi?id=753751 --- gst-libs/gst/adaptivedemux/gstadaptivedemux.c | 202 ++++++++++++++---- gst-libs/gst/adaptivedemux/gstadaptivedemux.h | 1 + 2 files changed, 159 insertions(+), 44 deletions(-) diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index fa72d05744..efd10c0353 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -794,6 +794,7 @@ gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg) GError *err = NULL; gchar *debug = NULL; gchar *new_error = NULL; + const GstStructure *details = NULL; GST_MANIFEST_LOCK (demux); @@ -814,6 +815,12 @@ gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg) err->message = new_error; } + gst_message_parse_error_details (msg, &details); + if (details) { + gst_structure_get_uint (details, "http-status-code", + &stream->last_status_code); + } + /* error, but ask to retry */ gst_adaptive_demux_stream_fragment_download_finish (stream, GST_FLOW_CUSTOM_ERROR, err); @@ -2275,6 +2282,17 @@ gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream * g_mutex_unlock (&stream->fragment_download_lock); } +static GstFlowReturn +gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream) +{ + GstFlowReturn ret; + GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux); + + ret = klass->finish_fragment (stream->demux, stream); + gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL); + return ret; +} + static gboolean _src_event (GstPad * pad, GstObject * parent, GstEvent * event) { @@ -2283,20 +2301,14 @@ _src_event (GstPad * pad, GstObject * parent, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS:{ - GstAdaptiveDemuxClass *klass; - GstFlowReturn ret; - GST_MANIFEST_LOCK (demux); - klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux); - g_mutex_lock (&stream->fragment_download_lock); stream->download_finished = TRUE; g_cond_signal (&stream->fragment_download_cond); g_mutex_unlock (&stream->fragment_download_lock); - ret = klass->finish_fragment (demux, stream); - gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL); + gst_adaptive_demux_eos_handling (stream); GST_MANIFEST_UNLOCK (demux); break; @@ -2562,11 +2574,12 @@ gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream, stream->uri_handler = uri_handler; stream->queue = queue; + + stream->last_status_code = 200; /* default to OK */ } return TRUE; } - static GstPadProbeReturn gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) @@ -2582,6 +2595,18 @@ gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info, return GST_PAD_PROBE_OK; } +#ifndef GST_DISABLE_GST_DEBUG +static const char * +uritype (GstAdaptiveDemuxStream * s) +{ + if (s->downloading_header) + return "header"; + if (s->downloading_index) + return "index"; + return "fragment"; +} +#endif + /* must be called with manifest_lock taken. * Can temporarily release manifest_lock */ @@ -2591,8 +2616,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, gint64 end, guint * http_status) { GstFlowReturn ret = GST_FLOW_OK; - GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT - " - %" G_GINT64_FORMAT, uri, start, end); + GST_DEBUG_OBJECT (stream->pad, + "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT, + uritype (stream), uri, start, end); if (http_status) *http_status = 200; /* default to ok if no further information */ @@ -2650,7 +2676,7 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, /* wait for the fragment to be completely downloaded */ GST_DEBUG_OBJECT (stream->pad, - "Waiting for fragment download to finish: %s", uri); + "Waiting for %s download to finish: %s", uritype (stream), uri); g_mutex_lock (&stream->fragment_download_lock); stream->src_at_ready = FALSE; @@ -2677,8 +2703,9 @@ gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux, ret = stream->last_ret; - GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s %d %s", - uri, stream->last_ret, gst_flow_get_name (stream->last_ret)); + GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s", + uritype (stream), uri, stream->last_ret, + gst_flow_get_name (stream->last_ret)); if (stream->last_ret != GST_FLOW_OK && http_status) { *http_status = stream->last_status_code; } @@ -2776,7 +2803,10 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream) { GstAdaptiveDemux *demux = stream->demux; gchar *url = NULL; - GstFlowReturn ret = GST_FLOW_OK; + GstFlowReturn ret; + gboolean retried_once = FALSE, live; + guint http_status; + guint last_status_code; stream->starting_fragment = TRUE; stream->last_ret = GST_FLOW_OK; @@ -2794,44 +2824,128 @@ gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream) stream->need_header = FALSE; } +again: + ret = GST_FLOW_OK; url = stream->fragment.uri; GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream); - if (url) { - guint http_status = 200; - ret = - gst_adaptive_demux_stream_download_uri (demux, stream, url, - stream->fragment.range_start, stream->fragment.range_end, &http_status); - GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s", - stream->last_ret, http_status, gst_flow_get_name (stream->last_ret)); - if (ret != GST_FLOW_OK) { - g_mutex_lock (&stream->fragment_download_lock); - if (G_UNLIKELY (stream->cancelled)) { - g_mutex_unlock (&stream->fragment_download_lock); - return ret; - } - g_mutex_unlock (&stream->fragment_download_lock); + if (!url) + return ret; - /* TODO check if we are truly stopping */ - if (ret == GST_FLOW_CUSTOM_ERROR && gst_adaptive_demux_is_live (demux)) { - if (++stream->download_error_count <= MAX_DOWNLOAD_ERROR_COUNT) { - /* looks like there is no way of knowing when a live stream has ended - * Have to assume we are falling behind and cause a manifest reload */ - GST_DEBUG_OBJECT (stream->pad, - "Converting error of live stream to EOS"); - return GST_FLOW_EOS; + stream->last_ret = GST_FLOW_OK; + http_status = 200; + ret = + gst_adaptive_demux_stream_download_uri (demux, stream, url, + stream->fragment.range_start, stream->fragment.range_end, &http_status); + GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s", + stream->last_ret, http_status, gst_flow_get_name (stream->last_ret)); + if (ret == GST_FLOW_OK) + goto beach; + + g_mutex_lock (&stream->fragment_download_lock); + if (G_UNLIKELY (stream->cancelled)) { + g_mutex_unlock (&stream->fragment_download_lock); + return ret; + } + g_mutex_unlock (&stream->fragment_download_lock); + + /* TODO check if we are truly stopping */ + if (ret != GST_FLOW_CUSTOM_ERROR) + goto beach; + + last_status_code = stream->last_status_code; + GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d", + last_status_code, stream->download_error_count); + + live = gst_adaptive_demux_is_live (demux); + if (!retried_once && ((last_status_code / 100 == 4 && live) || last_status_code / 100 == 5)) { /* 4xx/5xx */ + /* if current position is before available start, switch to next */ + if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) + goto flushing; + + if (live) { + gint64 range_start, range_stop; + + if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start, + &range_stop)) + goto flushing; + + if (demux->segment.position < range_start) { + GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment"); + stream->last_ret = GST_FLOW_OK; + ret = gst_adaptive_demux_eos_handling (stream); + GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s", + gst_flow_get_name (ret)); + ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream); + GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s", + gst_flow_get_name (ret)); + if (ret == GST_FLOW_OK) { + retried_once = TRUE; + goto again; + } + } else if (demux->segment.position > range_stop) { + /* wait a bit to be in range, we don't have any locks at that point */ + gint64 wait_time = + gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream); + if (wait_time > 0) { + gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND; + + GST_DEBUG_OBJECT (stream->pad, + "Download waiting for %" GST_TIME_FORMAT, + GST_TIME_ARGS (wait_time)); + + GST_MANIFEST_UNLOCK (demux); + g_mutex_lock (&stream->fragment_download_lock); + if (G_UNLIKELY (stream->cancelled)) { + g_mutex_unlock (&stream->fragment_download_lock); + GST_MANIFEST_LOCK (demux); + stream->last_ret = GST_FLOW_FLUSHING; + goto flushing; + } + do { + g_cond_wait_until (&stream->fragment_download_cond, + &stream->fragment_download_lock, end_time); + if (G_UNLIKELY (stream->cancelled)) { + g_mutex_unlock (&stream->fragment_download_lock); + GST_MANIFEST_LOCK (demux); + stream->last_ret = GST_FLOW_FLUSHING; + goto flushing; + } + } while (!stream->download_finished); + g_mutex_unlock (&stream->fragment_download_lock); + + GST_MANIFEST_LOCK (demux); } - } else if (ret == GST_FLOW_CUSTOM_ERROR - && !gst_adaptive_demux_stream_has_next_fragment (demux, stream)) { - /* If this is the last fragment, consider failures EOS and not actual - * errors. Due to rounding errors in the durations, the last fragment - * might not actually exist */ - GST_DEBUG_OBJECT (stream->pad, - "Converting error for last fragment to EOS"); - return GST_FLOW_EOS; } } + + flushing: + if (++stream->download_error_count <= MAX_DOWNLOAD_ERROR_COUNT) { + /* looks like there is no way of knowing when a live stream has ended + * Have to assume we are falling behind and cause a manifest reload */ + GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS"); + return GST_FLOW_EOS; + } } + else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) { + /* If this is the last fragment, consider failures EOS and not actual + * errors. Due to rounding errors in the durations, the last fragment + * might not actually exist */ + GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS"); + return GST_FLOW_EOS; + } else { + /* retry once (same segment) for 5xx (server errors) */ + if (!retried_once) { + retried_once = TRUE; + /* wait a short time in case the server needs a bit to recover, we don't + * care if we get woken up before end time. We can use sleep here since + * we're already blocking and just want to wait some time. */ + g_usleep (100000); /* a tenth of a second */ + goto again; + } + } + +beach: return ret; no_url_error: diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index cd8ed107db..623cd32f28 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -145,6 +145,7 @@ struct _GstAdaptiveDemuxStream /* download tooling */ GstElement *src; + guint last_status_code; GstPad *src_srcpad; GstElement *uri_handler; GstElement *queue;