From f9b0d59e84211fb822ca61634c57fe128e4dfeb0 Mon Sep 17 00:00:00 2001 From: Thibault Saunier Date: Thu, 15 Mar 2012 14:42:44 -0400 Subject: [PATCH] hlsdemux: Replace the fetcher code with a GstURIDownloader object --- gst/hls/gsthlsdemux.c | 350 ++++++++++-------------------------------- gst/hls/gsthlsdemux.h | 17 +- 2 files changed, 86 insertions(+), 281 deletions(-) diff --git a/gst/hls/gsthlsdemux.c b/gst/hls/gsthlsdemux.c index 1aff78893b..f0054bfafd 100644 --- a/gst/hls/gsthlsdemux.c +++ b/gst/hls/gsthlsdemux.c @@ -60,11 +60,6 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-hls")); -static GstStaticPadTemplate fetchertemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug); #define GST_CAT_DEFAULT gst_hls_demux_debug @@ -95,30 +90,23 @@ static GstStateChangeReturn gst_hls_demux_change_state (GstElement * element, GstStateChange transition); /* GstHLSDemux */ -static GstBusSyncReply gst_hls_demux_fetcher_bus_handler (GstBus * bus, - GstMessage * message, gpointer data); static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_hls_demux_sink_event (GstPad * pad, GstEvent * event); static gboolean gst_hls_demux_src_event (GstPad * pad, GstEvent * event); static gboolean gst_hls_demux_src_query (GstPad * pad, GstQuery * query); -static GstFlowReturn gst_hls_demux_fetcher_chain (GstPad * pad, - GstBuffer * buf); -static gboolean gst_hls_demux_fetcher_sink_event (GstPad * pad, - GstEvent * event); static void gst_hls_demux_stream_loop (GstHLSDemux * demux); static void gst_hls_demux_updates_loop (GstHLSDemux * demux); static void gst_hls_demux_stop (GstHLSDemux * demux); -static void gst_hls_demux_stop_fetcher_locked (GstHLSDemux * demux, - gboolean cancelled); static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux); static gboolean gst_hls_demux_schedule (GstHLSDemux * demux); static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux); -static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux); +static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux, + gboolean caching); static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux); static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose); static gboolean gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri); -static gchar *gst_hls_src_buf_to_utf8_playlist (gchar * string, guint size); +static gchar *gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf); static void _do_init (GType type) @@ -152,9 +140,6 @@ gst_hls_demux_dispose (GObject * obj) { GstHLSDemux *demux = GST_HLS_DEMUX (obj); - g_cond_free (demux->fetcher_cond); - g_mutex_free (demux->fetcher_lock); - if (demux->stream_task) { if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { GST_DEBUG_OBJECT (demux, "Leaving streaming task"); @@ -178,13 +163,14 @@ gst_hls_demux_dispose (GObject * obj) demux->updates_task = NULL; } - gst_object_unref (demux->fetcher_bus); - gst_object_unref (demux->fetcherpad); + if (demux->downloader != NULL) { + g_object_unref (demux->downloader); + demux->downloader = NULL; + } gst_hls_demux_reset (demux, TRUE); g_queue_free (demux->queue); - gst_object_unref (demux->download); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -231,15 +217,8 @@ gst_hls_demux_init (GstHLSDemux * demux, GstHLSDemuxClass * klass) GST_DEBUG_FUNCPTR (gst_hls_demux_sink_event)); gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad); - /* fetcher pad */ - demux->fetcherpad = - gst_pad_new_from_static_template (&fetchertemplate, "sink"); - gst_pad_set_chain_function (demux->fetcherpad, - GST_DEBUG_FUNCPTR (gst_hls_demux_fetcher_chain)); - gst_pad_set_event_function (demux->fetcherpad, - GST_DEBUG_FUNCPTR (gst_hls_demux_fetcher_sink_event)); - gst_pad_set_element_private (demux->fetcherpad, demux); - gst_pad_activate_push (demux->fetcherpad, TRUE); + /* Downloader */ + demux->downloader = gst_uri_downloader_new (); demux->do_typefind = TRUE; @@ -247,12 +226,6 @@ gst_hls_demux_init (GstHLSDemux * demux, GstHLSDemuxClass * klass) demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE; demux->bitrate_switch_tol = DEFAULT_BITRATE_SWITCH_TOLERANCE; - demux->download = gst_adapter_new (); - demux->fetcher_bus = gst_bus_new (); - gst_bus_set_sync_handler (demux->fetcher_bus, - gst_hls_demux_fetcher_bus_handler, demux); - demux->fetcher_cond = g_cond_new (); - demux->fetcher_lock = g_mutex_new (); demux->queue = g_queue_new (); /* Updates task */ @@ -413,9 +386,7 @@ gst_hls_demux_src_event (GstPad * pad, GstEvent * event) demux->cancelled = TRUE; gst_task_pause (demux->stream_task); - g_mutex_lock (demux->fetcher_lock); - gst_hls_demux_stop_fetcher_locked (demux, TRUE); - g_mutex_unlock (demux->fetcher_lock); + gst_uri_downloader_cancel (demux->downloader); gst_task_stop (demux->updates_task); gst_task_pause (demux->stream_task); @@ -424,11 +395,10 @@ gst_hls_demux_src_event (GstPad * pad, GstEvent * event) demux->need_cache = TRUE; while (!g_queue_is_empty (demux->queue)) { - GstBuffer *buf = g_queue_pop_head (demux->queue); - gst_buffer_unref (buf); + GstBufferList *buf_list = g_queue_pop_head (demux->queue); + gst_buffer_list_unref (buf_list); } g_queue_clear (demux->queue); - gst_adapter_clear (demux->download); GST_M3U8_CLIENT_LOCK (demux->client); GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence); @@ -487,9 +457,7 @@ gst_hls_demux_sink_event (GstPad * pad, GstEvent * event) } gst_query_unref (query); - playlist = gst_hls_src_buf_to_utf8_playlist ((gchar *) - GST_BUFFER_DATA (demux->playlist), GST_BUFFER_SIZE (demux->playlist)); - gst_buffer_unref (demux->playlist); + playlist = gst_hls_src_buf_to_utf8_playlist (demux->playlist); demux->playlist = NULL; if (playlist == NULL) { GST_WARNING_OBJECT (demux, "Error validating first playlist."); @@ -592,27 +560,6 @@ gst_hls_demux_src_query (GstPad * pad, GstQuery * query) return ret; } -static gboolean -gst_hls_demux_fetcher_sink_event (GstPad * pad, GstEvent * event) -{ - GstHLSDemux *demux = GST_HLS_DEMUX (gst_pad_get_element_private (pad)); - - switch (event->type) { - case GST_EVENT_EOS:{ - GST_DEBUG_OBJECT (demux, "Got EOS on the fetcher pad"); - /* signal we have fetched the URI */ - if (!demux->cancelled) { - g_cond_broadcast (demux->fetcher_cond); - } - } - default: - break; - } - - gst_event_unref (event); - return FALSE; -} - static GstFlowReturn gst_hls_demux_chain (GstPad * pad, GstBuffer * buf) { @@ -628,70 +575,10 @@ gst_hls_demux_chain (GstPad * pad, GstBuffer * buf) return GST_FLOW_OK; } -static GstFlowReturn -gst_hls_demux_fetcher_chain (GstPad * pad, GstBuffer * buf) -{ - GstHLSDemux *demux = GST_HLS_DEMUX (gst_pad_get_element_private (pad)); - - /* The source element can be an http source element. In case we get a 404, - * the html response will be sent downstream and the adapter - * will not be null, which might make us think that the request proceed - * successfully. But it will also post an error message in the bus that - * is handled synchronously and that will set demux->fetcher_error to TRUE, - * which is used to discard this buffer with the html response. */ - if (demux->fetcher_error) { - goto done; - } - - gst_adapter_push (demux->download, buf); - -done: - { - return GST_FLOW_OK; - } -} - -static void -gst_hls_demux_stop_fetcher_locked (GstHLSDemux * demux, gboolean cancelled) -{ - GstPad *pad; - - /* When the fetcher is stopped while it's downloading, we will get an EOS that - * unblocks the fetcher thread and tries to stop it again from that thread. - * Here we check if the fetcher as already been stopped before continuing */ - if (demux->fetcher == NULL || demux->stopping_fetcher) - return; - - GST_DEBUG_OBJECT (demux, "Stopping fetcher."); - demux->stopping_fetcher = TRUE; - /* set the element state to NULL */ - gst_element_set_state (demux->fetcher, GST_STATE_NULL); - gst_element_get_state (demux->fetcher, NULL, NULL, GST_CLOCK_TIME_NONE); - /* unlink it from the internal pad */ - pad = gst_pad_get_peer (demux->fetcherpad); - if (pad) { - gst_pad_unlink (pad, demux->fetcherpad); - gst_object_unref (pad); - } - /* and finally unref it */ - gst_object_unref (demux->fetcher); - demux->fetcher = NULL; - - /* if we stopped it to cancell a download, free the cached buffer */ - if (cancelled && gst_adapter_available (demux->download)) { - gst_adapter_clear (demux->download); - } - /* signal the fetcher thread that the download has finished/cancelled */ - if (cancelled) - g_cond_broadcast (demux->fetcher_cond); -} - static void gst_hls_demux_stop (GstHLSDemux * demux) { - g_mutex_lock (demux->fetcher_lock); - gst_hls_demux_stop_fetcher_locked (demux, TRUE); - g_mutex_unlock (demux->fetcher_lock); + gst_uri_downloader_cancel (demux->downloader); if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { demux->stop_stream_task = TRUE; @@ -708,7 +595,8 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) { GstPad *oldpad = demux->srcpad; - GST_DEBUG ("Switching pads (oldpad:%p)", oldpad); + GST_DEBUG ("Switching pads (oldpad:%p) with caps: %" GST_PTR_FORMAT, oldpad, + newcaps); /* FIXME: This is a workaround for a bug in playsink. * If we're switching from an audio-only or video-only fragment @@ -746,6 +634,7 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) static void gst_hls_demux_stream_loop (GstHLSDemux * demux) { + GstBufferList *buffer_list; GstBuffer *buf; GstFlowReturn ret; @@ -772,8 +661,9 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux) goto pause_task; } - buf = g_queue_pop_head (demux->queue); - + buffer_list = g_queue_pop_head (demux->queue); + /* Work with the first buffer of the list */ + buf = gst_buffer_list_get (buffer_list, 0, 0); /* Figure out if we need to create/switch pads */ if (G_UNLIKELY (!demux->srcpad || GST_BUFFER_CAPS (buf) != GST_PAD_CAPS (demux->srcpad) @@ -796,9 +686,9 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux) if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) demux->position += GST_BUFFER_DURATION (buf); - ret = gst_pad_push (demux->srcpad, buf); + ret = gst_pad_push_list (demux->srcpad, buffer_list); if (ret != GST_FLOW_OK) - goto error; + goto error_pushing; return; @@ -821,10 +711,11 @@ cache_error: return; } -error: +error_pushing: { /* FIXME: handle error */ - GST_DEBUG_OBJECT (demux, "error, stopping task"); + GST_DEBUG_OBJECT (demux, "Error pushing buffer: %s... stopping task", + gst_flow_get_name (ret)); gst_hls_demux_stop (demux); return; } @@ -836,50 +727,6 @@ pause_task: } } -static GstBusSyncReply -gst_hls_demux_fetcher_bus_handler (GstBus * bus, - GstMessage * message, gpointer data) -{ - GstHLSDemux *demux = GST_HLS_DEMUX (data); - - if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) { - demux->fetcher_error = TRUE; - if (!demux->cancelled) { - g_mutex_lock (demux->fetcher_lock); - g_cond_broadcast (demux->fetcher_cond); - g_mutex_unlock (demux->fetcher_lock); - } - } - - gst_message_unref (message); - return GST_BUS_DROP; -} - -static gboolean -gst_hls_demux_make_fetcher_locked (GstHLSDemux * demux, const gchar * uri) -{ - GstPad *pad; - - if (!gst_uri_is_valid (uri)) - return FALSE; - - GST_DEBUG_OBJECT (demux, "Creating fetcher for the URI:%s", uri); - demux->fetcher = gst_element_make_from_uri (GST_URI_SRC, uri, NULL); - if (!demux->fetcher) - return FALSE; - - demux->fetcher_error = FALSE; - demux->stopping_fetcher = FALSE; - gst_element_set_bus (GST_ELEMENT (demux->fetcher), demux->fetcher_bus); - - pad = gst_element_get_static_pad (demux->fetcher, "src"); - if (pad) { - gst_pad_link (pad, demux->fetcherpad); - gst_object_unref (pad); - } - return TRUE; -} - static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) { @@ -899,8 +746,6 @@ gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) demux->playlist = NULL; } - gst_adapter_clear (demux->download); - if (demux->client) { gst_m3u8_client_free (demux->client); demux->client = NULL; @@ -911,8 +756,8 @@ gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) } while (!g_queue_is_empty (demux->queue)) { - GstBuffer *buf = g_queue_pop_head (demux->queue); - gst_buffer_unref (buf); + GstBufferList *buffer_list = g_queue_pop_head (demux->queue); + gst_buffer_list_unref (buffer_list); } g_queue_clear (demux->queue); @@ -980,7 +825,7 @@ gst_hls_demux_updates_loop (GstHLSDemux * demux) /* fetch the next fragment */ if (g_queue_is_empty (demux->queue)) { - if (!gst_hls_demux_get_next_fragment (demux)) { + if (!gst_hls_demux_get_next_fragment (demux, FALSE)) { if (!demux->end_of_playlist && !demux->cancelled) { demux->client->update_failed_count++; if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { @@ -1003,6 +848,7 @@ gst_hls_demux_updates_loop (GstHLSDemux * demux) quit: { + GST_DEBUG_OBJECT (demux, "Stopped updates task"); gst_hls_demux_stop (demux); g_mutex_unlock (demux->updates_timed_lock); } @@ -1060,7 +906,7 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) g_time_val_add (&demux->next_update, gst_m3u8_client_get_target_duration (demux->client) / GST_SECOND * G_USEC_PER_SEC); - if (!gst_hls_demux_get_next_fragment (demux)) { + if (!gst_hls_demux_get_next_fragment (demux, TRUE)) { if (demux->end_of_playlist) break; if (!demux->cancelled) @@ -1079,98 +925,58 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) demux->need_cache = FALSE; return TRUE; -} -static gboolean -gst_hls_demux_fetch_location (GstHLSDemux * demux, const gchar * uri) -{ - GstStateChangeReturn ret; - gboolean bret = FALSE; - - g_mutex_lock (demux->fetcher_lock); - - while (demux->fetcher) - g_cond_wait (demux->fetcher_cond, demux->fetcher_lock); - - if (demux->cancelled) - goto quit; - - if (!gst_hls_demux_make_fetcher_locked (demux, uri)) { - goto uri_error; - } - - ret = gst_element_set_state (demux->fetcher, GST_STATE_PLAYING); - if (ret == GST_STATE_CHANGE_FAILURE) - goto state_change_error; - - /* wait until we have fetched the uri */ - GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI"); - g_cond_wait (demux->fetcher_cond, demux->fetcher_lock); - - gst_hls_demux_stop_fetcher_locked (demux, FALSE); - - if (!demux->fetcher_error && gst_adapter_available (demux->download)) { - GST_INFO_OBJECT (demux, "URI fetched successfully"); - bret = TRUE; - } - goto quit; - -uri_error: - { - GST_ELEMENT_ERROR (demux, RESOURCE, OPEN_READ, - ("Could not create an element to fetch the given URI."), ("URI: \"%s\"", - uri)); - bret = FALSE; - goto quit; - } - -state_change_error: - { - GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE, - ("Error changing state of the fetcher element."), (NULL)); - bret = FALSE; - goto quit; - } - -quit: - { - /* Unlock any other fetcher that might be waiting */ - g_cond_broadcast (demux->fetcher_cond); - g_mutex_unlock (demux->fetcher_lock); - return bret; - } } static gchar * -gst_hls_src_buf_to_utf8_playlist (gchar * data, guint size) +gst_hls_src_buf_to_utf8_playlist (GstBuffer * buf) { + gint size; + gchar *data; gchar *playlist; + data = (gchar *) GST_BUFFER_DATA (buf); + size = GST_BUFFER_SIZE (buf); + if (!g_utf8_validate (data, size, NULL)) return NULL; /* alloc size + 1 to end with a null character */ playlist = g_malloc0 (size + 1); memcpy (playlist, data, size + 1); + + gst_buffer_unref (buf); return playlist; } static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux) { - const guint8 *data; + GstFragment *download; + GstBufferListIterator *it; + GstBuffer *buf; gchar *playlist; - guint avail; + const gchar *uri = gst_m3u8_client_get_current_uri (demux->client); - GST_INFO_OBJECT (demux, "Updating the playlist %s", uri); - if (!gst_hls_demux_fetch_location (demux, uri)) + download = gst_uri_downloader_fetch_uri (demux->downloader, uri); + + if (download == NULL) return FALSE; - avail = gst_adapter_available (demux->download); - data = gst_adapter_peek (demux->download, avail); - playlist = gst_hls_src_buf_to_utf8_playlist ((gchar *) data, avail); - gst_adapter_clear (demux->download); + /* Merge all the buffers in the list to build a unique buffer with the + * playlist */ + it = gst_buffer_list_iterate (gst_fragment_get_buffer_list (download)); + + /* skip the first group, which contains the headers, which are not set in the + * demuxer*/ + gst_buffer_list_iterator_next_group (it); + buf = gst_buffer_list_iterator_merge_group (it); + + playlist = gst_hls_src_buf_to_utf8_playlist (buf); + gst_buffer_list_iterator_free (it); + g_object_unref (download); + if (playlist == NULL) { GST_WARNING_OBJECT (demux, "Couldn't not validate playlist encoding"); return FALSE; @@ -1298,13 +1104,14 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux) } static gboolean -gst_hls_demux_get_next_fragment (GstHLSDemux * demux) +gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching) { - GstBuffer *buf; - guint avail; + GstFragment *download; const gchar *next_fragment_uri; GstClockTime duration; GstClockTime timestamp; + GstBufferList *buffer_list; + GstBuffer *buf; gboolean discont; if (!gst_m3u8_client_get_next_fragment (demux->client, &discont, @@ -1317,18 +1124,14 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux) GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri); - if (!gst_hls_demux_fetch_location (demux, next_fragment_uri)) { - /* FIXME: The gst_m3u8_get_next_fragment increments the sequence number - but another thread might call get_next_fragment and this decrement - will not redownload the failed fragment, but might duplicate the - download of a succeeded fragment - */ - g_atomic_int_add (&demux->client->sequence, -1); - return FALSE; - } + download = gst_uri_downloader_fetch_uri (demux->downloader, + next_fragment_uri); - avail = gst_adapter_available (demux->download); - buf = gst_adapter_take_buffer (demux->download, avail); + if (download == NULL) + goto error; + + buffer_list = gst_fragment_get_buffer_list (download); + buf = gst_buffer_list_get (buffer_list, 0, 0); GST_BUFFER_DURATION (buf) = duration; GST_BUFFER_TIMESTAMP (buf) = timestamp; @@ -1352,8 +1155,17 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux) GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT); } - g_queue_push_tail (demux->queue, buf); - gst_task_start (demux->stream_task); - gst_adapter_clear (demux->download); + g_queue_push_tail (demux->queue, buffer_list); + g_object_unref (download); + if (!caching) { + GST_TASK_SIGNAL (demux->updates_task); + } + return TRUE; + +error: + { + gst_hls_demux_stop (demux); + return FALSE; + } } diff --git a/gst/hls/gsthlsdemux.h b/gst/hls/gsthlsdemux.h index f304a06e73..16c86775ea 100644 --- a/gst/hls/gsthlsdemux.h +++ b/gst/hls/gsthlsdemux.h @@ -27,6 +27,8 @@ #include #include #include "m3u8.h" +#include "gstfragmented.h" +#include "gsturidownloader.h" G_BEGIN_DECLS #define GST_TYPE_HLS_DEMUX \ @@ -53,8 +55,10 @@ struct _GstHLSDemux GstPad *srcpad; GstPad *sinkpad; + GstBuffer *playlist; GstCaps *input_caps; + GstUriDownloader *downloader; GstM3U8Client *client; /* M3U8 client */ GQueue *queue; /* Queue storing the fetched fragments */ gboolean need_cache; /* Wheter we need to cache some fragments before starting to push data */ @@ -70,24 +74,13 @@ struct _GstHLSDemux GStaticRecMutex stream_lock; gboolean stop_stream_task; - /* Fragments fetcher */ - GstElement *fetcher; - GstBus *fetcher_bus; - GstPad *fetcherpad; - GMutex *fetcher_lock; - GCond *fetcher_cond; - GTimeVal *timeout; - gboolean fetcher_error; - gboolean stopping_fetcher; - gboolean cancelled; - GstAdapter *download; - /* Updates task */ GstTask *updates_task; GStaticRecMutex updates_lock; GMutex *updates_timed_lock; GTimeVal next_update; /* Time of the next update */ gint64 accumulated_delay; /* Delay accumulated fetching fragments, used to decide a playlist switch */ + gboolean cancelled; /* Position in the stream */ GstClockTime position;