diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index 0c3ddd0908..4371da3a35 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -4,6 +4,7 @@ * Copyright (C) 2011, Hewlett-Packard Development Company, L.P. * Author: Youness Alaoui , Collabora Ltd. * Author: Sebastian Dröge , Collabora Ltd. + * Copyright (C) 2014 Sebastian Dröge * * Gsthlsdemux.c: * @@ -73,7 +74,7 @@ enum PROP_LAST }; -#define DEFAULT_FRAGMENTS_CACHE 3 +#define DEFAULT_FRAGMENTS_CACHE 1 #define DEFAULT_FAILED_COUNT 3 #define DEFAULT_BITRATE_LIMIT 0.8 #define DEFAULT_CONNECTION_SPEED 0 @@ -101,12 +102,11 @@ static gboolean gst_hls_demux_src_query (GstPad * pad, GstObject * parent, static void gst_hls_demux_stream_loop (GstHLSDemux * demux); static void gst_hls_demux_updates_loop (GstHLSDemux * demux); static void gst_hls_demux_stop (GstHLSDemux * demux); -static void gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching); -static gboolean gst_hls_demux_cache_fragments (GstHLSDemux * demux); -static gboolean gst_hls_demux_schedule (GstHLSDemux * demux); -static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux); -static gboolean gst_hls_demux_get_next_fragment (GstHLSDemux * demux, - gboolean caching, GError ** err); +static void gst_hls_demux_pause_tasks (GstHLSDemux * demux); +static gboolean gst_hls_demux_switch_playlist (GstHLSDemux * demux, + GstFragment * fragment); +static GstFragment *gst_hls_demux_get_next_fragment (GstHLSDemux * demux, + gboolean * end_of_playlist, GError ** err); static gboolean gst_hls_demux_update_playlist (GstHLSDemux * demux, gboolean update, GError ** err); static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose); @@ -123,33 +123,13 @@ gst_hls_demux_dispose (GObject * obj) GstHLSDemux *demux = GST_HLS_DEMUX (obj); if (demux->stream_task) { - if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { - GST_DEBUG_OBJECT (demux, "Leaving streaming task"); - gst_task_stop (demux->stream_task); - g_rec_mutex_lock (&demux->stream_lock); - g_rec_mutex_unlock (&demux->stream_lock); - gst_task_join (demux->stream_task); - } gst_object_unref (demux->stream_task); g_rec_mutex_clear (&demux->stream_lock); demux->stream_task = NULL; } if (demux->updates_task) { - if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { - GST_DEBUG_OBJECT (demux, "Leaving updates task"); - demux->cancelled = TRUE; - gst_uri_downloader_cancel (demux->downloader); - gst_task_stop (demux->updates_task); - g_mutex_lock (&demux->updates_timed_lock); - GST_TASK_SIGNAL (demux->updates_task); - g_rec_mutex_lock (&demux->updates_lock); - g_rec_mutex_unlock (&demux->updates_lock); - g_mutex_unlock (&demux->updates_timed_lock); - gst_task_join (demux->updates_task); - } gst_object_unref (demux->updates_task); - g_mutex_clear (&demux->updates_timed_lock); g_rec_mutex_clear (&demux->updates_lock); demux->updates_task = NULL; } @@ -161,7 +141,9 @@ gst_hls_demux_dispose (GObject * obj) gst_hls_demux_reset (demux, TRUE); - g_queue_free (demux->queue); + g_mutex_clear (&demux->download_lock); + g_cond_clear (&demux->download_cond); + g_cond_clear (&demux->updates_timed_cond); G_OBJECT_CLASS (parent_class)->dispose (obj); } @@ -238,14 +220,15 @@ gst_hls_demux_init (GstHLSDemux * demux) demux->bitrate_limit = DEFAULT_BITRATE_LIMIT; demux->connection_speed = DEFAULT_CONNECTION_SPEED; - demux->queue = g_queue_new (); + g_mutex_init (&demux->download_lock); + g_cond_init (&demux->download_cond); + g_cond_init (&demux->updates_timed_cond); /* Updates task */ g_rec_mutex_init (&demux->updates_lock); demux->updates_task = gst_task_new ((GstTaskFunction) gst_hls_demux_updates_loop, demux, NULL); gst_task_set_lock (demux->updates_task, &demux->updates_lock); - g_mutex_init (&demux->updates_timed_lock); /* Streaming task */ g_rec_mutex_init (&demux->stream_lock); @@ -320,10 +303,9 @@ gst_hls_demux_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: - demux->cancelled = TRUE; gst_hls_demux_stop (demux); - gst_task_join (demux->stream_task); gst_task_join (demux->updates_task); + gst_task_join (demux->stream_task); gst_hls_demux_reset (demux, FALSE); break; default: @@ -396,26 +378,13 @@ gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_pad_push_event (demux->srcpad, gst_event_new_flush_start ()); } - demux->cancelled = TRUE; - gst_task_pause (demux->stream_task); - gst_uri_downloader_cancel (demux->downloader); - gst_task_stop (demux->updates_task); - g_mutex_lock (&demux->updates_timed_lock); - GST_TASK_SIGNAL (demux->updates_task); - g_mutex_unlock (&demux->updates_timed_lock); - g_rec_mutex_lock (&demux->updates_lock); - g_rec_mutex_unlock (&demux->updates_lock); - gst_task_pause (demux->stream_task); + gst_hls_demux_pause_tasks (demux); /* wait for streaming to finish */ - g_rec_mutex_lock (&demux->stream_lock); + g_rec_mutex_lock (&demux->updates_lock); + g_rec_mutex_unlock (&demux->updates_lock); - demux->need_cache = TRUE; - while (!g_queue_is_empty (demux->queue)) { - GstFragment *fragment = g_queue_pop_head (demux->queue); - g_object_unref (fragment); - } - g_queue_clear (demux->queue); + g_rec_mutex_lock (&demux->stream_lock); GST_M3U8_CLIENT_LOCK (demux->client); GST_DEBUG_OBJECT (demux, "seeking to sequence %d", current_sequence); @@ -430,9 +399,11 @@ gst_hls_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_pad_push_event (demux->srcpad, gst_event_new_flush_stop (TRUE)); } - demux->cancelled = FALSE; + demux->stop_updates_task = FALSE; gst_uri_downloader_reset (demux->downloader); - gst_task_start (demux->stream_task); + demux->stop_stream_task = FALSE; + + gst_task_start (demux->updates_task); g_rec_mutex_unlock (&demux->stream_lock); return TRUE; @@ -497,7 +468,7 @@ gst_hls_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) return FALSE; } - gst_task_start (demux->stream_task); + gst_task_start (demux->updates_task); gst_event_unref (event); return TRUE; } @@ -594,21 +565,22 @@ gst_hls_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) } static void -gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching) +gst_hls_demux_pause_tasks (GstHLSDemux * demux) { if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { - demux->cancelled = TRUE; + g_mutex_lock (&demux->updates_timed_lock); + demux->stop_updates_task = TRUE; + g_cond_signal (&demux->updates_timed_cond); + g_mutex_unlock (&demux->updates_timed_lock); gst_uri_downloader_cancel (demux->downloader); gst_task_pause (demux->updates_task); - if (!caching) - g_mutex_lock (&demux->updates_timed_lock); - GST_TASK_SIGNAL (demux->updates_task); - if (!caching) - g_mutex_unlock (&demux->updates_timed_lock); } if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { + g_mutex_lock (&demux->download_lock); demux->stop_stream_task = TRUE; + g_cond_signal (&demux->download_cond); + g_mutex_unlock (&demux->download_lock); gst_task_pause (demux->stream_task); } } @@ -616,21 +588,22 @@ gst_hls_demux_pause_tasks (GstHLSDemux * demux, gboolean caching) static void gst_hls_demux_stop (GstHLSDemux * demux) { - gst_uri_downloader_cancel (demux->downloader); - if (GST_TASK_STATE (demux->updates_task) != GST_TASK_STOPPED) { - demux->cancelled = TRUE; + g_mutex_lock (&demux->updates_timed_lock); + demux->stop_updates_task = TRUE; + g_cond_signal (&demux->updates_timed_cond); + g_mutex_unlock (&demux->updates_timed_lock); gst_uri_downloader_cancel (demux->downloader); gst_task_stop (demux->updates_task); - g_mutex_lock (&demux->updates_timed_lock); - GST_TASK_SIGNAL (demux->updates_task); - g_mutex_unlock (&demux->updates_timed_lock); g_rec_mutex_lock (&demux->updates_lock); g_rec_mutex_unlock (&demux->updates_lock); } if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) { + g_mutex_lock (&demux->download_lock); demux->stop_stream_task = TRUE; + g_cond_signal (&demux->download_cond); + g_mutex_unlock (&demux->download_lock); gst_task_stop (demux->stream_task); g_rec_mutex_lock (&demux->stream_lock); g_rec_mutex_unlock (&demux->stream_lock); @@ -697,42 +670,110 @@ switch_pads (GstHLSDemux * demux, GstCaps * newcaps) static void gst_hls_demux_stream_loop (GstHLSDemux * demux) { - GstFragment *fragment = NULL; + GstFragment *fragment; GstBuffer *buf; GstFlowReturn ret; GstCaps *bufcaps, *srccaps = NULL; + gboolean end_of_playlist; + GError *err = NULL; - /* Loop for the source pad task. The task is started when we have - * received the main playlist from the source element. It tries first to - * cache the first fragments and then it waits until it has more data in the - * queue. This task is woken up when we push a new fragment to the queue or - * when we reached the end of the playlist */ + /* This task will download fragments as fast as possible, sends + * SEGMENT and CAPS events and switches pads if necessary. + * If downloading a fragment fails we try again up to 3 times + * after waiting a bit. If we're at the end of the playlist + * we wait for the playlist to update before getting the next + * fragment. + */ GST_DEBUG_OBJECT (demux, "Enter task"); - if (G_UNLIKELY (demux->need_cache)) { - if (!gst_hls_demux_cache_fragments (demux)) - goto cache_error; + if (demux->stop_stream_task) + goto pause_task; - /* Pop off the first fragment immediately so the - * update task can get the next one already */ - fragment = g_queue_pop_head (demux->queue); + demux->next_download = g_get_monotonic_time (); + if ((fragment = + gst_hls_demux_get_next_fragment (demux, &end_of_playlist, + &err)) == NULL) { + if (demux->stop_stream_task) { + g_clear_error (&err); + goto pause_task; + } - /* we can start now the updates thread (only if on playing) */ - gst_task_start (demux->updates_task); - GST_INFO_OBJECT (demux, "First fragments cached successfully"); + if (end_of_playlist) { + if (!gst_m3u8_client_is_live (demux->client)) { + GST_DEBUG_OBJECT (demux, "End of playlist"); + demux->end_of_playlist = TRUE; + goto end_of_playlist; + } else { + g_mutex_lock (&demux->download_lock); + + /* Wait until we're cancelled or there's something for + * us to download in the playlist or the playlist + * became non-live */ + while (TRUE) { + if (demux->stop_stream_task) { + g_mutex_unlock (&demux->download_lock); + goto pause_task; + } + + /* Got a new fragment or not live anymore? */ + if (gst_m3u8_client_get_next_fragment (demux->client, NULL, NULL, + NULL, NULL, NULL, NULL) + || !gst_m3u8_client_is_live (demux->client)) + break; + + GST_DEBUG_OBJECT (demux, + "No fragment left but live playlist, wait a bit"); + g_cond_wait (&demux->download_cond, &demux->download_lock); + } + g_mutex_unlock (&demux->download_lock); + GST_DEBUG_OBJECT (demux, "Retrying now"); + return; + } + } else { + demux->download_failed_count++; + if (demux->download_failed_count < DEFAULT_FAILED_COUNT) { + GST_WARNING_OBJECT (demux, "Could not fetch the next fragment"); + g_clear_error (&err); + /* Wait half the fragment duration before retrying */ + demux->next_download += + gst_util_uint64_scale (gst_m3u8_client_get_current_fragment_duration + (demux->client), G_USEC_PER_SEC, 2 * GST_SECOND); + g_mutex_lock (&demux->download_lock); + if (demux->stop_stream_task) { + g_mutex_unlock (&demux->download_lock); + goto pause_task; + } + g_cond_wait_until (&demux->download_cond, &demux->download_lock, + demux->next_download); + g_mutex_unlock (&demux->download_lock); + GST_DEBUG_OBJECT (demux, "Retrying now"); + return; + } else { + gst_element_post_message (GST_ELEMENT_CAST (demux), + gst_message_new_error (GST_OBJECT_CAST (demux), err, + "Could not fetch the next fragment")); + g_clear_error (&err); + goto pause_task; + } + } + } else { + demux->download_failed_count = 0; + gst_m3u8_client_advance_fragment (demux->client); + + if (demux->stop_updates_task) { + g_object_unref (fragment); + goto pause_task; + } + + /* try to switch to another bitrate if needed */ + gst_hls_demux_switch_playlist (demux, fragment); } - if (!fragment && g_queue_is_empty (demux->queue)) { - if (demux->end_of_playlist) - goto end_of_playlist; - + if (demux->stop_updates_task) { + g_object_unref (fragment); goto pause_task; } - /* If we didn't get our fragment above already */ - if (!fragment) - fragment = g_queue_pop_head (demux->queue); - /* Figure out if we need to create/switch pads */ if (G_LIKELY (demux->srcpad)) srccaps = gst_pad_get_current_caps (demux->srcpad); @@ -771,7 +812,8 @@ gst_hls_demux_stream_loop (GstHLSDemux * demux) demux->position_shift = 0; } - GST_DEBUG_OBJECT (demux, "Pushing buffer %p", buf); + GST_DEBUG_OBJECT (demux, "Pushing buffer %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf))); ret = gst_pad_push (demux->srcpad, buf); if (ret != GST_FLOW_OK) @@ -785,20 +827,7 @@ end_of_playlist: { GST_DEBUG_OBJECT (demux, "Reached end of playlist, sending EOS"); gst_pad_push_event (demux->srcpad, gst_event_new_eos ()); - gst_hls_demux_pause_tasks (demux, FALSE); - return; - } - -cache_error: - { - /* Pausing a stopped task will start it */ - if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) - gst_task_pause (demux->stream_task); - if (!demux->cancelled) { - GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND, - ("Could not cache the first fragments"), (NULL)); - gst_hls_demux_pause_tasks (demux, FALSE); - } + gst_hls_demux_pause_tasks (demux); return; } @@ -806,7 +835,7 @@ type_not_found: { GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, ("Could not determine type of stream"), (NULL)); - gst_hls_demux_pause_tasks (demux, FALSE); + gst_hls_demux_pause_tasks (demux); return; } @@ -820,7 +849,7 @@ error_pushing: GST_DEBUG_OBJECT (demux, "stream stopped, reason %s", gst_flow_get_name (ret)); } - gst_hls_demux_pause_tasks (demux, FALSE); + gst_hls_demux_pause_tasks (demux); return; } @@ -828,8 +857,7 @@ pause_task: { GST_DEBUG_OBJECT (demux, "Pause task"); /* Pausing a stopped task will start it */ - if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) - gst_task_pause (demux->stream_task); + gst_hls_demux_pause_tasks (demux); return; } } @@ -837,11 +865,12 @@ pause_task: static void gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) { - demux->need_cache = TRUE; demux->end_of_playlist = FALSE; - demux->cancelled = FALSE; + demux->stop_updates_task = FALSE; demux->do_typefind = TRUE; + demux->download_failed_count = 0; + g_free (demux->key_url); demux->key_url = NULL; @@ -868,12 +897,6 @@ gst_hls_demux_reset (GstHLSDemux * demux, gboolean dispose) demux->client = gst_m3u8_client_new (""); } - while (!g_queue_is_empty (demux->queue)) { - GstFragment *fragment = g_queue_pop_head (demux->queue); - g_object_unref (fragment); - } - g_queue_clear (demux->queue); - demux->position_shift = 0; demux->need_segment = TRUE; @@ -900,123 +923,14 @@ gst_hls_demux_set_location (GstHLSDemux * demux, const gchar * uri) void gst_hls_demux_updates_loop (GstHLSDemux * demux) { - /* Loop for the updates. It's started when the first fragments are cached and - * schedules the next update of the playlist (for lives sources) and the next - * update of fragments. When a new fragment is downloaded, it compares the - * download time with the next scheduled update to check if we can or should - * switch to a different bitrate */ + /* Loop for updating of the playlist. This periodically checks if + * the playlist is updated and does so, then signals the streaming + * thread in case it can continue downloading now. + * For non-live playlists this thread is not doing much else than + * setting up the initial playlist and then stopping. */ /* block until the next scheduled update or the signal to quit this thread */ - g_mutex_lock (&demux->updates_timed_lock); GST_DEBUG_OBJECT (demux, "Started updates task"); - while (TRUE) { - if (demux->cancelled) - goto quit; - - /* fetch the next fragment */ - if (g_queue_get_length (demux->queue) < demux->fragments_cache) { - GError *err = NULL; - - GST_DEBUG_OBJECT (demux, "queue not full, get next fragment"); - if (!gst_hls_demux_get_next_fragment (demux, FALSE, &err)) { - if (demux->cancelled) { - g_clear_error (&err); - goto quit; - } else if (!demux->end_of_playlist) { - demux->client->update_failed_count++; - if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { - GST_WARNING_OBJECT (demux, "Could not fetch the next fragment"); - g_clear_error (&err); - continue; - } else { - gst_element_post_message (GST_ELEMENT_CAST (demux), - gst_message_new_error (GST_OBJECT_CAST (demux), err, - "Could not fetch the next fragment")); - g_clear_error (&err); - goto error; - } - } - } else { - demux->client->update_failed_count = 0; - - if (demux->cancelled) - goto quit; - - /* try to switch to another bitrate if needed */ - gst_hls_demux_switch_playlist (demux); - } - } - - /* schedule the next update */ - gst_hls_demux_schedule (demux); - - /* block until the next scheduled update or the signal to quit this thread */ - GST_DEBUG_OBJECT (demux, "Waiting"); - if (g_cond_wait_until (GST_TASK_GET_COND (demux->updates_task), - &demux->updates_timed_lock, demux->next_update)) { - GST_DEBUG_OBJECT (demux, "Unlocked"); - goto quit; - } - GST_DEBUG_OBJECT (demux, "Continue"); - - if (demux->cancelled) - goto quit; - - /* update the playlist for live sources */ - if (gst_m3u8_client_is_live (demux->client)) { - GError *err = NULL; - - if (!gst_hls_demux_update_playlist (demux, TRUE, &err)) { - if (demux->cancelled) - goto quit; - demux->client->update_failed_count++; - if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { - GST_WARNING_OBJECT (demux, "Could not update the playlist"); - continue; - } else { - gst_element_post_message (GST_ELEMENT_CAST (demux), - gst_message_new_error (GST_OBJECT_CAST (demux), err, - "Could not update the playlist")); - g_error_free (err); - goto error; - } - } - } - - /* if it's a live source and the playlist couldn't be updated, there aren't - * more fragments in the playlist, so we just wait for the next schedulled - * update */ - if (gst_m3u8_client_is_live (demux->client) && - demux->client->update_failed_count > 0) { - GST_WARNING_OBJECT (demux, - "The playlist hasn't been updated, failed count is %d", - demux->client->update_failed_count); - continue; - } - - if (demux->cancelled) - goto quit; - } - -quit: - { - GST_DEBUG_OBJECT (demux, "Stopped updates task"); - g_mutex_unlock (&demux->updates_timed_lock); - return; - } - -error: - { - GST_DEBUG_OBJECT (demux, "Stopped updates task because of error"); - gst_hls_demux_pause_tasks (demux, TRUE); - g_mutex_unlock (&demux->updates_timed_lock); - } -} - -static gboolean -gst_hls_demux_cache_fragments (GstHLSDemux * demux) -{ - gint i; /* If this playlist is a variant playlist, select the first one * and update it */ @@ -1025,7 +939,6 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) GError *err = NULL; if (demux->connection_speed == 0) { - GST_M3U8_CLIENT_LOCK (demux->client); child = demux->client->main->current_variant->data; GST_M3U8_CLIENT_UNLOCK (demux->client); @@ -1042,7 +955,7 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) gst_message_new_error (GST_OBJECT_CAST (demux), err, "Could not fetch the child playlist")); g_error_free (err); - return FALSE; + goto error; } } @@ -1056,39 +969,76 @@ gst_hls_demux_cache_fragments (GstHLSDemux * demux) gst_message_new_duration_changed (GST_OBJECT (demux))); } - /* Cache the first fragments */ - for (i = 0; i < demux->fragments_cache; i++) { + /* Now start stream task */ + gst_task_start (demux->stream_task); + + demux->next_update = + g_get_monotonic_time () + + gst_util_uint64_scale (gst_m3u8_client_get_target_duration + (demux->client), G_USEC_PER_SEC, GST_SECOND); + + /* Updating playlist only needed for live playlists */ + while (gst_m3u8_client_is_live (demux->client)) { GError *err = NULL; - gst_element_post_message (GST_ELEMENT (demux), - gst_message_new_buffering (GST_OBJECT (demux), - 100 * i / demux->fragments_cache)); - demux->next_update = g_get_monotonic_time (); - if (!gst_hls_demux_get_next_fragment (demux, TRUE, &err)) { - if (demux->end_of_playlist) - break; - if (!demux->cancelled) { + /* Wait here until we should do the next update or we're cancelled */ + GST_DEBUG_OBJECT (demux, "Wait for next playlist update"); + g_mutex_lock (&demux->updates_timed_lock); + if (demux->stop_updates_task) { + g_mutex_unlock (&demux->updates_timed_lock); + goto quit; + } + g_cond_wait_until (&demux->updates_timed_cond, &demux->updates_timed_lock, + demux->next_update); + if (demux->stop_updates_task) { + g_mutex_unlock (&demux->updates_timed_lock); + goto quit; + } + g_mutex_unlock (&demux->updates_timed_lock); + + GST_DEBUG_OBJECT (demux, "Updating playlist"); + if (!gst_hls_demux_update_playlist (demux, TRUE, &err)) { + if (demux->stop_updates_task) + goto quit; + demux->client->update_failed_count++; + if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) { + GST_WARNING_OBJECT (demux, "Could not update the playlist"); + demux->next_update = + g_get_monotonic_time () + + gst_util_uint64_scale (gst_m3u8_client_get_target_duration + (demux->client), G_USEC_PER_SEC, 2 * GST_SECOND); + } else { gst_element_post_message (GST_ELEMENT_CAST (demux), gst_message_new_error (GST_OBJECT_CAST (demux), err, - "Error caching the first fragments")); + "Could not update the playlist")); + g_error_free (err); + goto error; } - g_clear_error (&err); - return FALSE; + } else { + GST_DEBUG_OBJECT (demux, "Updated playlist successfully"); + demux->next_update = + g_get_monotonic_time () + + gst_util_uint64_scale (gst_m3u8_client_get_target_duration + (demux->client), G_USEC_PER_SEC, GST_SECOND); + /* Wake up download task */ + g_mutex_lock (&demux->download_lock); + g_cond_signal (&demux->download_cond); + g_mutex_unlock (&demux->download_lock); } - /* make sure we stop caching fragments if something cancelled it */ - if (demux->cancelled) - return FALSE; - gst_hls_demux_switch_playlist (demux); } - gst_element_post_message (GST_ELEMENT (demux), - gst_message_new_buffering (GST_OBJECT (demux), 100)); - /* Start downloading 1s early to keep the risk of - * underflows lower */ - demux->next_update = g_get_monotonic_time () - G_USEC_PER_SEC; +quit: + { + GST_DEBUG_OBJECT (demux, "Stopped updates task"); + gst_task_pause (demux->updates_task); + return; + } - demux->need_cache = FALSE; - return TRUE; +error: + { + GST_DEBUG_OBJECT (demux, "Stopped updates task because of error"); + gst_hls_demux_pause_tasks (demux); + } } static gchar * @@ -1245,45 +1195,14 @@ retry_failover_protection: } static gboolean -gst_hls_demux_schedule (GstHLSDemux * demux) -{ - gfloat update_factor; - gint count; - - /* As defined in §6.3.4. Reloading the Playlist file: - * "If the client reloads a Playlist file and finds that it has not - * changed then it MUST wait for a period of time before retrying. The - * minimum delay is a multiple of the target duration. This multiple is - * 0.5 for the first attempt, 1.5 for the second, and 3.0 thereafter." - */ - count = demux->client->update_failed_count; - if (count == 0) - update_factor = 1.0; - else - update_factor = 0.5; - - /* schedule the next update using the target duration field of the - * playlist */ - demux->next_update += - gst_util_uint64_scale (gst_m3u8_client_get_current_fragment_duration - (demux->client), G_USEC_PER_SEC * update_factor, GST_SECOND); - GST_DEBUG_OBJECT (demux, "Next update scheduled at %" G_GINT64_FORMAT, - demux->next_update); - - return TRUE; -} - -static gboolean -gst_hls_demux_switch_playlist (GstHLSDemux * demux) +gst_hls_demux_switch_playlist (GstHLSDemux * demux, GstFragment * fragment) { GstClockTime diff; gsize size; gint bitrate; - GstFragment *fragment; GstBuffer *buffer; GST_M3U8_CLIENT_LOCK (demux->client); - fragment = g_queue_peek_tail (demux->queue); if (!demux->client->main->lists || !fragment) { GST_M3U8_CLIENT_UNLOCK (demux->client); return TRUE; @@ -1292,13 +1211,13 @@ gst_hls_demux_switch_playlist (GstHLSDemux * demux) /* compare the time when the fragment was downloaded with the time when it was * scheduled */ - diff = g_get_monotonic_time () - demux->next_update; + diff = g_get_monotonic_time () - demux->next_download; buffer = gst_fragment_get_buffer (fragment); size = gst_buffer_get_size (buffer); bitrate = (size * 8) / ((double) diff / G_USEC_PER_SEC); GST_DEBUG ("Downloaded %d bytes in %" GST_TIME_FORMAT ". Bitrate is : %d", - (guint) size, GST_TIME_ARGS (diff), bitrate); + (guint) size, GST_TIME_ARGS (diff * GST_USECOND), bitrate); gst_buffer_unref (buffer); return gst_hls_demux_change_playlist (demux, bitrate * demux->bitrate_limit); @@ -1441,9 +1360,9 @@ decrypt_error: return ret; } -static gboolean -gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching, - GError ** err) +static GstFragment * +gst_hls_demux_get_next_fragment (GstHLSDemux * demux, + gboolean * end_of_playlist, GError ** err) { GstFragment *download; const gchar *next_fragment_uri; @@ -1454,12 +1373,12 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching, const gchar *key = NULL; const guint8 *iv = NULL; + *end_of_playlist = FALSE; if (!gst_m3u8_client_get_next_fragment (demux->client, &discont, &next_fragment_uri, &duration, ×tamp, &key, &iv)) { GST_INFO_OBJECT (demux, "This playlist doesn't contain more fragments"); - demux->end_of_playlist = TRUE; - gst_task_start (demux->stream_task); - return FALSE; + *end_of_playlist = TRUE; + return NULL; } GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri); @@ -1516,16 +1435,10 @@ gst_hls_demux_get_next_fragment (GstHLSDemux * demux, gboolean caching, /* The buffer ref is still kept inside the fragment download */ gst_buffer_unref (buf); - GST_DEBUG_OBJECT (demux, "Pushing fragment in queue"); - g_queue_push_tail (demux->queue, download); - if (!caching) { - GST_TASK_SIGNAL (demux->updates_task); - gst_task_start (demux->stream_task); - } - return TRUE; + return download; error: { - return FALSE; + return NULL; } } diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index 29d579da84..3cc28b34dc 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -66,9 +66,6 @@ struct _GstHLSDemux GstCaps *input_caps; GstUriDownloader *downloader; GstM3U8Client *client; /* M3U8 client */ - GQueue *queue; /* Queue storing the fetched fragments */ - gboolean need_cache; /* Wheter we need to cache some fragments before starting to push data */ - gboolean end_of_playlist; gboolean do_typefind; /* Whether we need to typefind the next buffer */ /* Properties */ @@ -80,13 +77,19 @@ struct _GstHLSDemux GstTask *stream_task; GRecMutex stream_lock; gboolean stop_stream_task; + GMutex download_lock; /* Used for protecting queue and the two conds */ + GCond download_cond; /* Signalled when something is added to the queue */ + gboolean end_of_playlist; + gint download_failed_count; + gint64 next_download; /* Updates task */ GstTask *updates_task; GRecMutex updates_lock; + gint64 next_update; /* Time of the next update */ + gboolean stop_updates_task; GMutex updates_timed_lock; - gint64 next_update; /* Time of the next update */ - gboolean cancelled; + GCond updates_timed_cond; /* Signalled when the playlist should be updated */ /* Position in the stream */ GstClockTime position_shift; diff --git a/ext/hls/m3u8.c b/ext/hls/m3u8.c index 771ab1e4d8..fed20db4ec 100644 --- a/ext/hls/m3u8.c +++ b/ext/hls/m3u8.c @@ -598,7 +598,6 @@ gst_m3u8_client_get_next_fragment (GstM3U8Client * client, g_return_val_if_fail (client != NULL, FALSE); g_return_val_if_fail (client->current != NULL, FALSE); - g_return_val_if_fail (discontinuity != NULL, FALSE); GST_M3U8_CLIENT_LOCK (client); GST_DEBUG ("Looking for fragment %d", client->sequence); @@ -609,22 +608,53 @@ gst_m3u8_client_get_next_fragment (GstM3U8Client * client, return FALSE; } - gst_m3u8_client_get_current_position (client, timestamp); - file = GST_M3U8_MEDIA_FILE (l->data); + GST_DEBUG ("Got fragment with sequence %u (client sequence %u)", + file->sequence, client->sequence); - *discontinuity = client->sequence != file->sequence; - client->sequence = file->sequence + 1; + if (timestamp) + gst_m3u8_client_get_current_position (client, timestamp); - *uri = file->uri; - *duration = file->duration; - *key = file->key; - *iv = file->iv; + if (discontinuity) + *discontinuity = client->sequence != file->sequence; + if (uri) + *uri = file->uri; + if (duration) + *duration = file->duration; + if (key) + *key = file->key; + if (iv) + *iv = file->iv; GST_M3U8_CLIENT_UNLOCK (client); return TRUE; } +void +gst_m3u8_client_advance_fragment (GstM3U8Client * client) +{ + GList *l; + GstM3U8MediaFile *file; + + g_return_if_fail (client != NULL); + g_return_if_fail (client->current != NULL); + + GST_M3U8_CLIENT_LOCK (client); + GST_DEBUG ("Looking for fragment %d", client->sequence); + l = g_list_find_custom (client->current->files, client, + (GCompareFunc) _find_next); + if (l == NULL) { + GST_ERROR ("Could not find current fragment"); + GST_M3U8_CLIENT_UNLOCK (client); + return; + } + + file = GST_M3U8_MEDIA_FILE (l->data); + GST_DEBUG ("Advancing from sequence %u", file->sequence); + client->sequence = file->sequence + 1; + GST_M3U8_CLIENT_UNLOCK (client); +} + static void _sum_duration (GstM3U8MediaFile * self, GstClockTime * duration) { diff --git a/ext/hls/m3u8.h b/ext/hls/m3u8.h index 14bf4294c4..4d80f04f70 100644 --- a/ext/hls/m3u8.h +++ b/ext/hls/m3u8.h @@ -87,6 +87,7 @@ void gst_m3u8_client_set_current (GstM3U8Client * client, GstM3U8 * m3u8); gboolean gst_m3u8_client_get_next_fragment (GstM3U8Client * client, gboolean * discontinuity, const gchar ** uri, GstClockTime * duration, GstClockTime * timestamp, const gchar ** key, const guint8 ** iv); +void gst_m3u8_client_advance_fragment (GstM3U8Client * client); void gst_m3u8_client_get_current_position (GstM3U8Client * client, GstClockTime * timestamp); GstClockTime gst_m3u8_client_get_duration (GstM3U8Client * client);