diff --git a/gst/playback/gsturisourcebin.c b/gst/playback/gsturisourcebin.c index 54ec197441..4f7cc5c7ea 100644 --- a/gst/playback/gsturisourcebin.c +++ b/gst/playback/gsturisourcebin.c @@ -73,6 +73,23 @@ typedef struct _OutputSlotInfo OutputSlotInfo; #define GST_URI_SOURCE_BIN_LOCK(dec) (g_mutex_lock(&((GstURISourceBin*)(dec))->lock)) #define GST_URI_SOURCE_BIN_UNLOCK(dec) (g_mutex_unlock(&((GstURISourceBin*)(dec))->lock)) +#define BUFFERING_LOCK(ubin) G_STMT_START { \ + GST_LOG_OBJECT (ubin, \ + "buffering locking from thread %p", \ + g_thread_self ()); \ + g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \ + GST_LOG_OBJECT (ubin, \ + "buffering lock from thread %p", \ + g_thread_self ()); \ +} G_STMT_END + +#define BUFFERING_UNLOCK(ubin) G_STMT_START { \ + GST_LOG_OBJECT (ubin, \ + "buffering unlocking from thread %p", \ + g_thread_self ()); \ + g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \ +} G_STMT_END + /* Track a source pad from a child that * is linked or needs linking to an output * slot */ @@ -144,6 +161,8 @@ struct _GstURISourceBin GList *buffering_status; /* element currently buffering messages */ gint last_buffering_pct; /* Avoid sending buffering over and over */ + GMutex buffering_lock; + GMutex buffering_post_lock; }; struct _GstURISourceBinClass @@ -725,6 +744,9 @@ gst_uri_source_bin_init (GstURISourceBin * urisrc) g_mutex_init (&urisrc->lock); + g_mutex_init (&urisrc->buffering_lock); + g_mutex_init (&urisrc->buffering_post_lock); + urisrc->uri = g_strdup (DEFAULT_PROP_URI); urisrc->connection_speed = DEFAULT_CONNECTION_SPEED; @@ -1002,6 +1024,9 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot) /* Look for a suitable pending pad */ cur_caps = gst_pad_get_current_caps (slot->sinkpad); + GST_DEBUG_OBJECT (urisrc, + "Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps); + for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) { GstPad *pending = (GstPad *) (cur->data); ChildSrcPadInfo *cur_info = NULL; @@ -1029,8 +1054,8 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot) guint block_id = gst_pad_add_probe (slot->sinkpad, GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM, NULL, NULL, NULL); - GST_DEBUG_OBJECT (urisrc, "Linking pending pad to existing output slot %p", - slot); + GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT + " to existing output slot %p", out_info->demux_src_pad, slot); if (in_info) { gst_pad_unlink (in_info->demux_src_pad, slot->sinkpad); @@ -1042,6 +1067,11 @@ link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot) slot->sinkpad) == GST_PAD_LINK_OK) { out_info->output_slot = slot; slot->linked_info = out_info; + + BUFFERING_LOCK (urisrc); + /* A re-linked slot is no longer EOS */ + slot->is_eos = FALSE; + BUFFERING_UNLOCK (urisrc); res = TRUE; urisrc->pending_pads = g_list_remove (urisrc->pending_pads, out_info->demux_src_pad); @@ -1093,8 +1123,14 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) goto done; } + BUFFERING_LOCK (urisrc); /* Mark that we fed an EOS to this slot */ child_info->output_slot->is_eos = TRUE; + BUFFERING_UNLOCK (urisrc); + + /* EOS means this element is no longer buffering */ + remove_buffering_msgs (urisrc, + GST_OBJECT_CAST (child_info->output_slot->queue)); /* Actually feed a custom EOS event to avoid marking pads as EOSed */ s = gst_structure_new_empty ("urisourcebin-custom-eos"); @@ -1112,7 +1148,9 @@ demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) break; case GST_EVENT_STREAM_START: case GST_EVENT_FLUSH_STOP: + BUFFERING_LOCK (urisrc); child_info->output_slot->is_eos = FALSE; + BUFFERING_UNLOCK (urisrc); break; default: break; @@ -1370,17 +1408,23 @@ pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc) GstEvent *event; OutputSlotInfo *slot; - if (!info->output_slot->is_eos && urisrc->pending_pads && - link_pending_pad_to_output (urisrc, info->output_slot)) { + slot = info->output_slot; + + if (!slot->is_eos && urisrc->pending_pads && + link_pending_pad_to_output (urisrc, slot)) { /* Found a new source pad to give this slot data - no need to send EOS */ GST_URI_SOURCE_BIN_UNLOCK (urisrc); return; } - /* Unlink this pad from its output slot and send a fake EOS event to drain the - * queue */ - slot = info->output_slot; + BUFFERING_LOCK (urisrc); + /* Unlink this pad from its output slot and send a fake EOS event + * to drain the queue */ slot->is_eos = TRUE; + BUFFERING_UNLOCK (urisrc); + + remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue)); + slot->linked_info = NULL; info->output_slot = NULL; @@ -2378,7 +2422,7 @@ handle_redirect_message (GstURISourceBin * dec, GstMessage * msg) return new_msg; } -static GstMessage * +static void handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) { gint perc, msg_perc; @@ -2386,6 +2430,7 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) GstMessage *smaller = NULL; GList *found = NULL; GList *iter; + OutputSlotInfo *slot; /* buffering messages must be aggregated as there might be multiple * multiqueue in the pipeline and their independent buffering messages @@ -2405,7 +2450,22 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT " with %d%%", GST_MESSAGE_SRC (msg), msg_perc); - GST_OBJECT_LOCK (urisrc); + slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)), + "urisourcebin.slotinfo"); + + BUFFERING_LOCK (urisrc); + if (slot && slot->is_eos) { + /* Ignore buffering messages from queues we marked as EOS, + * we already removed those from the list of buffering + * objects */ + BUFFERING_UNLOCK (urisrc); + gst_message_replace (&msg, NULL); + return; + } + + + g_mutex_lock (&urisrc->buffering_post_lock); + /* * Single loop for 2 things: * 1) Look for a message with the same source @@ -2414,11 +2474,10 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) */ for (iter = urisrc->buffering_status; iter;) { GstMessage *bufstats = iter->data; - OutputSlotInfo *slot = - g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)), - "urisourcebin.slotinfo"); gboolean is_eos = FALSE; + slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)), + "urisourcebin.slotinfo"); if (slot) is_eos = slot->is_eos; @@ -2477,16 +2536,17 @@ handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg) gst_message_replace (&msg, smaller); } } - GST_OBJECT_UNLOCK (urisrc); + BUFFERING_UNLOCK (urisrc); if (msg) { GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT " with %d%%", GST_MESSAGE_SRC (msg), smaller_perc); + GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg); } else { GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%", smaller_perc); } - return msg; + g_mutex_unlock (&urisrc->buffering_post_lock); } /* Remove any buffering message from the given source */ @@ -2494,8 +2554,14 @@ static void remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src) { GList *iter; + gboolean removed = FALSE, post; + + BUFFERING_LOCK (urisrc); + g_mutex_lock (&urisrc->buffering_post_lock); + + GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT + " buffering messages", src); - GST_OBJECT_LOCK (urisrc); for (iter = urisrc->buffering_status; iter;) { GstMessage *bufstats = iter->data; if (GST_MESSAGE_SRC (bufstats) == src) { @@ -2506,7 +2572,19 @@ remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src) } iter = g_list_next (iter); } - GST_OBJECT_UNLOCK (urisrc); + + post = (removed && urisrc->buffering_status == NULL); + BUFFERING_UNLOCK (urisrc); + + if (post) { + GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%"); + + /* removed the last buffering element, post 100% */ + gst_element_post_message (GST_ELEMENT_CAST (urisrc), + gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100)); + } + + g_mutex_unlock (&urisrc->buffering_post_lock); } static void @@ -2526,7 +2604,8 @@ handle_message (GstBin * bin, GstMessage * msg) break; } case GST_MESSAGE_BUFFERING: - msg = handle_buffering_message (urisrc, msg); + handle_buffering_message (urisrc, msg); + msg = NULL; break; default: break;