diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c index 368863969d..fc7fa7188f 100644 --- a/ext/smoothstreaming/gstmssdemux.c +++ b/ext/smoothstreaming/gstmssdemux.c @@ -234,6 +234,7 @@ gst_mss_demux_stream_new (GstMssDemux * mssdemux, stream->downloader = gst_uri_downloader_new (); stream->dataqueue = gst_data_queue_new (_data_queue_check_full, NULL, NULL, stream); + g_mutex_init (&stream->mutex); /* Downloading task */ g_rec_mutex_init (&stream->download_lock); @@ -292,6 +293,7 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream) } if (stream->caps) gst_caps_unref (stream->caps); + g_mutex_clear (&stream->mutex); g_free (stream); } @@ -619,6 +621,9 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) GstMssDemuxStream *stream = iter->data; stream->eos = FALSE; + if (flags & GST_SEEK_FLAG_FLUSH) { + stream->last_ret = GST_FLOW_OK; + } gst_data_queue_flush (stream->dataqueue); gst_event_replace (&stream->pending_newsegment, newsegment); } @@ -637,6 +642,26 @@ gst_mss_demux_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gst_event_unref (event); return TRUE; } + case GST_EVENT_RECONFIGURE:{ + GSList *iter; + + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + if (stream->pad == pad) { + GST_MSS_DEMUX_STREAM_LOCK (stream); + if (GST_TASK_STATE (stream->download_task) == GST_TASK_PAUSED + && stream->last_ret == GST_FLOW_NOT_LINKED) { + stream->restart_download = TRUE; + gst_task_start (stream->download_task); + } + GST_MSS_DEMUX_STREAM_UNLOCK (stream); + gst_event_unref (event); + return TRUE; + } + } + } + break; default: break; } @@ -1051,16 +1076,23 @@ gst_mss_demux_stream_store_object (GstMssDemuxStream * stream, static GstFlowReturn gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, - gboolean * buffer_downloaded) + GstBuffer ** _buffer) { GstMssDemux *mssdemux = stream->parent; gchar *path; gchar *url; GstFragment *fragment; - GstBuffer *_buffer; + GstBuffer *buffer; GstFlowReturn ret = GST_FLOW_OK; guint64 before_download, after_download; + /* special case for not-linked streams */ + if (stream->last_ret == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mssdemux, "Skipping download for not-linked stream %p", + stream); + return GST_FLOW_NOT_LINKED; + } + before_download = g_get_real_time (); GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream); @@ -1103,22 +1135,19 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, return GST_FLOW_ERROR; } - _buffer = gst_fragment_get_buffer (fragment); - _buffer = gst_buffer_make_writable (_buffer); - GST_BUFFER_TIMESTAMP (_buffer) = + buffer = gst_fragment_get_buffer (fragment); + *_buffer = buffer = gst_buffer_make_writable (buffer); + GST_BUFFER_TIMESTAMP (buffer) = gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream); - GST_BUFFER_DURATION (_buffer) = + GST_BUFFER_DURATION (buffer) = gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream); g_object_unref (fragment); - if (buffer_downloaded) - *buffer_downloaded = _buffer != NULL; - after_download = g_get_real_time (); if (_buffer) { #ifndef GST_DISABLE_GST_DEBUG - guint64 bitrate = (8 * gst_buffer_get_size (_buffer) * 1000000LLU) / + guint64 bitrate = (8 * gst_buffer_get_size (buffer) * 1000000LLU) / (after_download - before_download); #endif @@ -1126,16 +1155,8 @@ gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream, "Measured download bitrate: %s %" G_GUINT64_FORMAT " bps", GST_PAD_NAME (stream->pad), bitrate); gst_download_rate_add_rate (&stream->download_rate, - gst_buffer_get_size (_buffer), + gst_buffer_get_size (buffer), 1000 * (after_download - before_download)); - - GST_DEBUG_OBJECT (mssdemux, - "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT - " Duration: %" GST_TIME_FORMAT, - stream, GST_PAD_NAME (stream->pad), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)), - GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer))); - gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer)); } return ret; @@ -1160,22 +1181,130 @@ static void gst_mss_demux_download_loop (GstMssDemuxStream * stream) { GstMssDemux *mssdemux = stream->parent; - gboolean buffer_downloaded = FALSE; GstFlowReturn ret; + GstBuffer *buffer = NULL; + gboolean buffer_downloaded = FALSE; + GstEvent *gap = NULL; GST_LOG_OBJECT (mssdemux, "download loop start %p", stream); GST_OBJECT_LOCK (mssdemux); + if (G_UNLIKELY (stream->restart_download)) { + GstClockTime cur, ts; + gint64 pos; + + GST_MSS_DEMUX_STREAM_LOCK (stream); + + GST_DEBUG_OBJECT (mssdemux, + "Activating stream %p due to reconfigure " "event", stream); + + cur = GST_CLOCK_TIME_IS_VALID (stream->next_timestamp) ? + stream->next_timestamp : 0; + + if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) { + ts = (GstClockTime) pos; + GST_DEBUG_OBJECT (mssdemux, "Downstream position: %" + GST_TIME_FORMAT, GST_TIME_ARGS (ts)); + } else { + GST_DEBUG_OBJECT (mssdemux, "Downstream position query failed, " + "failling back to segment position"); + ts = mssdemux->segment.position; + } + + /* we might have already pushed this data */ + ts = MAX (ts, stream->next_timestamp); + + GST_DEBUG_OBJECT (mssdemux, "Restarting stream %p %s:%s at " + "position %" GST_TIME_FORMAT ", catching up until segment position %" + GST_TIME_FORMAT, stream, GST_DEBUG_PAD_NAME (stream->pad), + GST_TIME_ARGS (ts), GST_TIME_ARGS (mssdemux->segment.position)); + + if (GST_CLOCK_TIME_IS_VALID (ts)) { + gst_mss_stream_seek (stream->manifest_stream, ts); + + if (cur < ts) { + gap = gst_event_new_gap (cur, ts - cur); + } + } + + /* This stream might be entering into catching up mode, + * meaning that it will push buffers from this same download thread + * until it reaches the segment position. + * + * The reason for this is that in case of stream switching, the other + * stream that was previously active might be blocking the stream_loop + * in case it is ahead enough that all queues are filled. + * In this case, it is possible that a downstream input-selector is + * blocking waiting for the currently active stream to reach the + * same position of the old linked stream because of the 'sync-streams' + * behavior. + * + * We can push from this thread up to segment position as all other + * streams should be around the same timestamp. + */ + stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; + stream->eos = FALSE; + + gst_data_queue_set_flushing (stream->dataqueue, FALSE); + stream->restart_download = FALSE; + gst_task_start (mssdemux->stream_task); + GST_MSS_DEMUX_STREAM_UNLOCK (stream); + } + GST_DEBUG_OBJECT (mssdemux, "Starting streams reconfiguration due to bitrate changes"); gst_mss_demux_reconfigure_stream (stream); GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration"); GST_OBJECT_UNLOCK (mssdemux); - ret = gst_mss_demux_stream_download_fragment (stream, &buffer_downloaded); + if (gap != NULL) + gst_pad_push_event (stream->pad, gap); - if (stream->cancelled) + ret = gst_mss_demux_stream_download_fragment (stream, &buffer); + buffer_downloaded = buffer != NULL; + + if (stream->cancelled) { + if (buffer) + gst_buffer_unref (buffer); goto cancelled; + } + + if (buffer) { + gboolean catch_up = FALSE; + + /* Check if this stream is on catch up mode */ + if (stream->last_ret == GST_FLOW_CUSTOM_SUCCESS) { + GST_DEBUG_OBJECT (mssdemux, + "Catch up ts: %" GST_TIME_FORMAT ", buffer:%" GST_TIME_FORMAT, + GST_TIME_ARGS (mssdemux->segment.position), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); + if (GST_BUFFER_TIMESTAMP (buffer) < mssdemux->segment.position) { + catch_up = TRUE; + } else { + GST_OBJECT_LOCK (mssdemux); + stream->last_ret = GST_FLOW_OK; + gst_task_start (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); + } + } + + GST_DEBUG_OBJECT (mssdemux, + "%s buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT + " Duration: %" GST_TIME_FORMAT, + catch_up ? "Catch up push for" : "Storing", stream, + GST_PAD_NAME (stream->pad), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); + + if (catch_up) { + ret = stream->last_ret = gst_pad_push (stream->pad, buffer); + if (G_LIKELY (ret == GST_FLOW_OK)) + stream->last_ret = GST_FLOW_CUSTOM_SUCCESS; + /* TODO handle return */ + } else { + gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (buffer)); + } + } switch (ret) { case GST_FLOW_OK: @@ -1184,6 +1313,8 @@ gst_mss_demux_download_loop (GstMssDemuxStream * stream) goto eos; case GST_FLOW_ERROR: goto error; + case GST_FLOW_NOT_LINKED: + goto notlinked; default: break; } @@ -1203,7 +1334,10 @@ eos: GST_DEBUG_PAD_NAME (stream->pad)); gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (gst_event_new_eos ())); + GST_OBJECT_LOCK (mssdemux); gst_task_pause (stream->download_task); + gst_task_start (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } error: @@ -1222,6 +1356,15 @@ cancelled: gst_task_pause (stream->download_task); return; } +notlinked: + { + GST_MSS_DEMUX_STREAM_LOCK (stream); + if (stream->last_ret == GST_FLOW_NOT_LINKED) { + gst_task_pause (stream->download_task); + gst_data_queue_set_flushing (stream->dataqueue, TRUE); + } + GST_MSS_DEMUX_STREAM_UNLOCK (stream); + } } static GstFlowReturn @@ -1242,12 +1385,18 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux, GstDataQueueItem *item; other = iter->data; - if (other->eos) { + if (other->eos || other->last_ret == GST_FLOW_NOT_LINKED) { + GST_DEBUG_OBJECT (mssdemux, "Skipping stream %p eos:%d last-ret:%d", + other, other->eos, other->last_ret); continue; } if (!gst_data_queue_peek (other->dataqueue, &item)) { /* flushing */ + if (other->last_ret == GST_FLOW_NOT_LINKED) { + /* might have been unlinked and won't receive data for now */ + continue; + } return GST_FLOW_FLUSHING; } @@ -1269,6 +1418,27 @@ gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux, return ret; } +static GstFlowReturn +gst_mss_demux_combine_flows (GstMssDemux * mssdemux) +{ + gboolean all_notlinked = TRUE; + GSList *iter; + + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + if (stream->last_ret != GST_FLOW_NOT_LINKED) + all_notlinked = FALSE; + + if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED + || stream->last_ret == GST_FLOW_FLUSHING) + return stream->last_ret; + } + if (all_notlinked) + return GST_FLOW_NOT_LINKED; + return GST_FLOW_OK; +} + static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux) { @@ -1289,6 +1459,8 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret, gst_flow_get_name (ret)); + /* Lock as this may change the tasks state */ + GST_OBJECT_LOCK (mssdemux); switch (ret) { case GST_FLOW_OK: break; @@ -1302,6 +1474,7 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) default: g_assert_not_reached (); } + GST_OBJECT_UNLOCK (mssdemux); GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s", stream, GST_PAD_NAME (stream->pad)); @@ -1332,17 +1505,20 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) GST_DEBUG_OBJECT (mssdemux, "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT - " discont:%d on pad %s", object, + " discont:%d on pad %s:%s", object, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)), GST_TIME_ARGS (GST_BUFFER_DURATION (object)), GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT), - GST_PAD_NAME (stream->pad)); + GST_DEBUG_PAD_NAME (stream->pad)); stream->next_timestamp = GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object); stream->have_data = TRUE; - ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); + mssdemux->segment.position = GST_BUFFER_TIMESTAMP (object); + stream->last_ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object)); + GST_DEBUG_OBJECT (mssdemux, "Pushed on pad %s:%s result: %d (%s)", + GST_DEBUG_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret)); } else if (GST_IS_EVENT (object)) { if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) { stream->eos = TRUE; @@ -1350,21 +1526,27 @@ gst_mss_demux_stream_loop (GstMssDemux * mssdemux) GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object, GST_PAD_NAME (stream->pad)); gst_pad_push_event (stream->pad, GST_EVENT_CAST (object)); + stream->last_ret = GST_FLOW_EOS; } else { g_return_if_reached (); } + /* Lock as this may change the tasks state */ + GST_OBJECT_LOCK (mssdemux); + ret = gst_mss_demux_combine_flows (mssdemux); switch (ret) { case GST_FLOW_EOS: goto eos; /* EOS ? */ case GST_FLOW_ERROR: goto error; case GST_FLOW_NOT_LINKED: - break; /* TODO what to do here? pause the task or just keep pushing? */ + /* stream won't download any more data until it gets a reconfigure */ + break; case GST_FLOW_OK: default: break; } + GST_OBJECT_UNLOCK (mssdemux); GST_LOG_OBJECT (mssdemux, "Stream loop end"); return; @@ -1373,18 +1555,21 @@ eos: { GST_DEBUG_OBJECT (mssdemux, "EOS on all pads"); gst_task_pause (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } error: { GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment"); gst_task_pause (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } stop: { GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task"); gst_task_pause (mssdemux->stream_task); + GST_OBJECT_UNLOCK (mssdemux); return; } } diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h index 6bc3ba4653..d651b8050f 100644 --- a/ext/smoothstreaming/gstmssdemux.h +++ b/ext/smoothstreaming/gstmssdemux.h @@ -52,9 +52,14 @@ typedef struct _GstMssDemuxStream GstMssDemuxStream; typedef struct _GstMssDemux GstMssDemux; typedef struct _GstMssDemuxClass GstMssDemuxClass; +#define GST_MSS_DEMUX_STREAM_LOCK(s) g_mutex_lock (&(s)->mutex) +#define GST_MSS_DEMUX_STREAM_UNLOCK(s) g_mutex_unlock (&(s)->mutex) + struct _GstMssDemuxStream { GstPad *pad; + GMutex mutex; + GstCaps *caps; GstMssDemux *parent; @@ -72,9 +77,11 @@ struct _GstMssDemuxStream { GstTask *download_task; GRecMutex download_lock; + GstFlowReturn last_ret; gboolean eos; gboolean have_data; gboolean cancelled; + gboolean restart_download; GstDownloadRate download_rate;