diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c index 86d7c20347..42c9b8dd03 100644 --- a/ext/smoothstreaming/gstmssdemux.c +++ b/ext/smoothstreaming/gstmssdemux.c @@ -258,6 +258,7 @@ gst_mss_demux_reset (GstMssDemux * mssdemux) mssdemux->n_videos = mssdemux->n_audios = 0; g_free (mssdemux->base_url); + g_free (mssdemux->manifest_uri); mssdemux->base_url = NULL; } @@ -562,14 +563,24 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query) GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration)); break; } - case GST_QUERY_LATENCY: - gst_query_set_latency (query, FALSE, 0, -1); + case GST_QUERY_LATENCY:{ + gboolean live = FALSE; + + live = mssdemux->manifest + && gst_mss_manifest_is_live (mssdemux->manifest); + + gst_query_set_latency (query, live, 0, -1); ret = TRUE; break; + } case GST_QUERY_SEEKING:{ GstFormat fmt; gint64 stop = -1; + if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) { + return FALSE; /* no live seeking */ + } + gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL); GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d", fmt); @@ -728,6 +739,7 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux) gst_query_parse_uri (query, &uri); GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri); + mssdemux->manifest_uri = g_strdup (uri); baseurl_end = g_strrstr (uri, "/Manifest"); if (baseurl_end) { /* set the new end of the string */ @@ -754,6 +766,9 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux) return FALSE; } + GST_INFO_OBJECT (mssdemux, "Live stream: %d", + gst_mss_manifest_is_live (mssdemux->manifest)); + gst_mss_demux_create_streams (mssdemux); for (iter = mssdemux->streams; iter;) { GSList *current = iter; @@ -779,6 +794,27 @@ gst_mss_demux_process_manifest (GstMssDemux * mssdemux) return TRUE; } +static void +gst_mss_demux_reload_manifest (GstMssDemux * mssdemux) +{ + GstUriDownloader *downloader; + GstFragment *manifest_data; + GstBuffer *manifest_buffer; + + downloader = gst_uri_downloader_new (); + + manifest_data = + gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri); + manifest_buffer = gst_fragment_get_buffer (manifest_data); + g_object_unref (manifest_data); + + gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer); + gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer); + gst_buffer_unref (manifest_buffer); + + g_object_unref (downloader); +} + static void gst_mss_demux_reconfigure (GstMssDemux * mssdemux) { @@ -911,6 +947,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, case GST_FLOW_OK: break; /* all is good, let's go */ case GST_FLOW_UNEXPECTED: /* EOS */ + gst_mss_demux_reload_manifest (mssdemux); + return GST_FLOW_OK; return GST_FLOW_UNEXPECTED; case GST_FLOW_ERROR: goto error; @@ -924,6 +962,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, url = g_strdup_printf ("%s/%s", mssdemux->base_url, path); + GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream); + fragment = gst_uri_downloader_fetch_uri (stream->downloader, url); g_free (path); g_free (url); @@ -931,6 +971,11 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, if (!fragment) { GST_INFO_OBJECT (mssdemux, "No fragment downloaded"); /* TODO check if we are truly stoping */ + if (gst_mss_manifest_is_live (mssdemux->manifest)) { + /* looks like there is no way of knowing when a live stream has ended + * Have to assume we are falling behind and cause a manifest reload */ + return GST_FLOW_OK; + } return GST_FLOW_ERROR; } @@ -949,9 +994,11 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, if (_buffer) { GST_DEBUG_OBJECT (mssdemux, - "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT, + "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT + " Duration: %" GST_TIME_FORMAT, stream, GST_PAD_NAME (stream->pad), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer))); + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer))); gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer)); } @@ -995,9 +1042,9 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream) break; } - g_assert (buffer != NULL); - - gst_mss_stream_advance_fragment (stream->manifest_stream); + if (buffer) { + gst_mss_stream_advance_fragment (stream->manifest_stream); + } GST_LOG_OBJECT (mssdemux, "download loop end %p", stream); return; @@ -1130,10 +1177,25 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) } if (G_LIKELY (GST_IS_BUFFER (object))) { + if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) { + GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%" + GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)), + GST_TIME_ARGS (stream->next_timestamp)); + GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT); + } + GST_DEBUG_OBJECT (mssdemux, - "Pushing buffer %p %" GST_TIME_FORMAT " on pad %s", object, + "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT + " discont:%d on pad %s", object, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)), + GST_TIME_ARGS (GST_BUFFER_DURATION (object)), + GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT), GST_PAD_NAME (stream->pad)); + + stream->next_timestamp = + GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object); + ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); } else if (GST_IS_EVENT (object)) { if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h index ceb471bdaf..26cbb2b809 100644 --- a/ext/smoothstreaming/gstmssdemux.h +++ b/ext/smoothstreaming/gstmssdemux.h @@ -63,6 +63,8 @@ struct _GstMssDemuxStream { GstEvent *pending_newsegment; + GstClockTime next_timestamp; + /* Downloading task */ GstTask *download_task; GStaticRecMutex download_lock; @@ -80,6 +82,7 @@ struct _GstMssDemux { GstMssManifest *manifest; gchar *base_url; + gchar *manifest_uri; GSList *streams; guint n_videos; diff --git a/ext/smoothstreaming/gstmssmanifest.c b/ext/smoothstreaming/gstmssmanifest.c index 0b822133fd..b641675505 100644 --- a/ext/smoothstreaming/gstmssmanifest.c +++ b/ext/smoothstreaming/gstmssmanifest.c @@ -22,6 +22,8 @@ #include #include +#include +#include #include #include @@ -43,6 +45,8 @@ #define MSS_PROP_TIMESCALE "TimeScale" #define MSS_PROP_URL "Url" +#define TO_LOWER(str) { char* p = str; for ( ; *p; ++p) *p = tolower(*p); } + /* TODO check if atoi is successful? */ typedef struct _GstMssStreamFragment @@ -85,6 +89,8 @@ struct _GstMssManifest xmlDocPtr xml; xmlNodePtr xmlrootnode; + gboolean is_live; + GSList *streams; }; @@ -187,7 +193,6 @@ _gst_mss_stream_init (GstMssStream * stream, xmlNodePtr node) /* we reverse it later */ stream->fragments = g_list_prepend (stream->fragments, fragment); - } else if (node_has_type (iter, MSS_NODE_STREAM_QUALITY)) { GstMssStreamQuality *quality = gst_mss_stream_quality_new (iter); stream->qualities = g_list_prepend (stream->qualities, quality); @@ -215,6 +220,7 @@ gst_mss_manifest_new (const GstBuffer * data) GstMssManifest *manifest; xmlNodePtr root; xmlNodePtr nodeiter; + gchar *live_str; manifest = g_malloc0 (sizeof (GstMssManifest)); @@ -222,6 +228,12 @@ gst_mss_manifest_new (const GstBuffer * data) GST_BUFFER_SIZE (data), "manifest", NULL, 0); root = manifest->xmlrootnode = xmlDocGetRootElement (manifest->xml); + live_str = (gchar *) xmlGetProp (root, (xmlChar *) "IsLive"); + if (live_str) { + TO_LOWER (live_str); + manifest->is_live = strcmp (live_str, "true") == 0; + } + for (nodeiter = root->children; nodeiter; nodeiter = nodeiter->next) { if (nodeiter->type == XML_ELEMENT_NODE && (strcmp ((const char *) nodeiter->name, "StreamIndex") == 0)) { @@ -777,6 +789,138 @@ gst_mss_manifest_get_current_bitrate (GstMssManifest * manifest) return bitrate; } +gboolean +gst_mss_manifest_is_live (GstMssManifest * manifest) +{ + return manifest->is_live; +} + +static void +gst_mss_stream_reload_fragments (GstMssStream * stream, xmlNodePtr streamIndex) +{ + xmlNodePtr iter; + GList *new_fragments = NULL; + GstMssStreamFragment *previous_fragment = NULL; + GstMssStreamFragment *current_fragment = + stream->current_fragment ? stream->current_fragment->data : NULL; + guint64 current_time = gst_mss_stream_get_fragment_gst_timestamp (stream); + guint fragment_number = 0; + guint64 fragment_time_accum = 0; + + if (!current_fragment && stream->fragments) { + current_fragment = g_list_last (stream->fragments)->data; + } else if (g_list_previous (stream->current_fragment)) { + /* rewind one as this is the next to be pushed */ + current_fragment = g_list_previous (stream->current_fragment)->data; + } else { + current_fragment = NULL; + } + + if (current_fragment) { + current_time = current_fragment->time; + fragment_number = current_fragment->number; + fragment_time_accum = current_fragment->time; + } + + for (iter = streamIndex->children; iter; iter = iter->next) { + if (node_has_type (iter, MSS_NODE_STREAM_FRAGMENT)) { + gchar *duration_str; + gchar *time_str; + gchar *seqnum_str; + GstMssStreamFragment *fragment = g_new (GstMssStreamFragment, 1); + + duration_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_DURATION); + time_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_TIME); + seqnum_str = (gchar *) xmlGetProp (iter, (xmlChar *) MSS_PROP_NUMBER); + + /* use the node's seq number or use the previous + 1 */ + if (seqnum_str) { + fragment->number = g_ascii_strtoull (seqnum_str, NULL, 10); + g_free (seqnum_str); + } else { + fragment->number = fragment_number; + } + fragment_number = fragment->number + 1; + + if (time_str) { + fragment->time = g_ascii_strtoull (time_str, NULL, 10); + g_free (time_str); + fragment_time_accum = fragment->time; + } else { + fragment->time = fragment_time_accum; + } + + /* if we have a previous fragment, means we need to set its duration */ + if (previous_fragment) + previous_fragment->duration = fragment->time - previous_fragment->time; + + if (duration_str) { + fragment->duration = g_ascii_strtoull (duration_str, NULL, 10); + + previous_fragment = NULL; + fragment_time_accum += fragment->duration; + g_free (duration_str); + } else { + /* store to set the duration at the next iteration */ + previous_fragment = fragment; + } + + if (fragment->time > current_time) { + new_fragments = g_list_append (new_fragments, fragment); + } else { + previous_fragment = NULL; + g_free (fragment); + } + + } else { + /* TODO gst log this */ + } + } + + /* store the new fragments list */ + if (new_fragments) { + g_list_free_full (stream->fragments, g_free); + stream->fragments = new_fragments; + stream->current_fragment = new_fragments; + } +} + +static void +gst_mss_manifest_reload_fragments_from_xml (GstMssManifest * manifest, + xmlNodePtr root) +{ + xmlNodePtr nodeiter; + GSList *streams = manifest->streams; + + /* we assume the server is providing the streams in the same order in + * every manifest */ + for (nodeiter = root->children; nodeiter && streams; + nodeiter = nodeiter->next) { + if (nodeiter->type == XML_ELEMENT_NODE + && (strcmp ((const char *) nodeiter->name, "StreamIndex") == 0)) { + gst_mss_stream_reload_fragments (streams->data, nodeiter); + streams = g_slist_next (streams); + } + } +} + +void +gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data) +{ + xmlDocPtr xml; + xmlNodePtr root; + + g_return_if_fail (manifest->is_live); + + xml = xmlReadMemory ((const gchar *) GST_BUFFER_DATA (data), + GST_BUFFER_SIZE (data), "manifest", NULL, 0); + root = xmlDocGetRootElement (xml); + + gst_mss_manifest_reload_fragments_from_xml (manifest, root); + + xmlFreeDoc (xml); +} + static gboolean gst_mss_stream_select_bitrate (GstMssStream * stream, guint64 bitrate) { diff --git a/ext/smoothstreaming/gstmssmanifest.h b/ext/smoothstreaming/gstmssmanifest.h index 796130a746..982f32d501 100644 --- a/ext/smoothstreaming/gstmssmanifest.h +++ b/ext/smoothstreaming/gstmssmanifest.h @@ -47,6 +47,8 @@ GstClockTime gst_mss_manifest_get_gst_duration (GstMssManifest * manifest); gboolean gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time); gboolean gst_mss_manifest_change_bitrate (GstMssManifest *manifest, guint64 bitrate); guint64 gst_mss_manifest_get_current_bitrate (GstMssManifest * manifest); +gboolean gst_mss_manifest_is_live (GstMssManifest * manifest); +void gst_mss_manifest_reload_fragments (GstMssManifest * manifest, GstBuffer * data); GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream); GstCaps * gst_mss_stream_get_caps (GstMssStream * stream);