diff --git a/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc-private.h b/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc-private.h index b40b0a6553..b0d5099fbd 100644 --- a/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc-private.h +++ b/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc-private.h @@ -43,9 +43,6 @@ GST_MSE_PRIVATE void gst_mse_src_emit_streams (GstMseSrc * self, GstMediaSourceTrack ** tracks, gsize n_tracks); -GST_MSE_PRIVATE -void gst_mse_src_update_ready_state (GstMseSrc * self); - GST_MSE_PRIVATE void gst_mse_src_attach (GstMseSrc * self, GstMediaSource * media_source); diff --git a/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc.c b/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc.c index f9a8b115ef..6939514a25 100644 --- a/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc.c +++ b/subprojects/gst-plugins-bad/gst-libs/gst/mse/gstmsesrc.c @@ -63,7 +63,7 @@ #include "gstsourcebuffer.h" #include "gstsourcebuffer-private.h" -#define DEFAULT_POSITION GST_CLOCK_TIME_NONE +#define DEFAULT_POSITION 0 #define DEFAULT_DURATION GST_CLOCK_TIME_NONE #define DEFAULT_READY_STATE GST_MSE_SRC_READY_STATE_HAVE_NOTHING #define DECODE_ERROR "decode error" @@ -96,6 +96,13 @@ static GstStaticPadTemplate gst_mse_src_template = GST_STATIC_PAD_TEMPLATE ("src_%s", GST_PAD_SRC, GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY); +typedef struct +{ + GWeakRef parent; + GstTask *task; + GRecMutex mutex; +} ReadyStateUpdateTask; + /** * GstMseSrcPad: * @@ -110,8 +117,6 @@ struct _GstMseSrcPad GstCaps *most_recent_caps; GstSegment segment; - GstClockTime position; - gboolean sent_stream_collection; gboolean sent_stream_start; gboolean sent_initial_caps; @@ -120,9 +125,11 @@ struct _GstMseSrcPad GCond linked_or_flushing_cond; GMutex linked_or_flushing_lock; gboolean flushing; - gboolean eos; }; +#define MEDIA_SOURCE_LOCK(a) g_mutex_lock (&(a)->media_source_lock) +#define MEDIA_SOURCE_UNLOCK(a) g_mutex_unlock (&(a)->media_source_lock) + #define STREAMS_LOCK(a) (g_mutex_lock (&a->streams_lock)) #define STREAMS_UNLOCK(a) (g_mutex_unlock (&a->streams_lock)) @@ -148,6 +155,19 @@ static gboolean pad_query (GstMseSrcPad * pad, GstObject * parent, GstQuery * query); static void pad_task (GstMseSrcPad * pad); +static ReadyStateUpdateTask *ready_state_update_task_new (GstMseSrc * parent); +static void ready_state_update_task_free (ReadyStateUpdateTask * task); +static void ready_state_update_task_func (ReadyStateUpdateTask * task); +static void ready_state_update_task_start (ReadyStateUpdateTask * task); +static void ready_state_update_task_stop (ReadyStateUpdateTask * task); +static void ready_state_update_task_join (ReadyStateUpdateTask * task); + +static const gchar * +mse_src_ready_state_name (GstMseSrcReadyState state) +{ + return gst_mse_enum_value_nick (gst_mse_src_ready_state_get_type (), state); +} + static GstPad * gst_mse_src_pad_new (GstMediaSourceTrack * track, GstStream * stream, guint id, GstClockTime start, gdouble rate) @@ -172,9 +192,7 @@ gst_mse_src_pad_init (GstMseSrcPad * self) self->sent_stream_start = FALSE; self->sent_initial_caps = FALSE; self->does_need_segment = TRUE; - self->position = DEFAULT_POSITION; self->flushing = FALSE; - self->eos = FALSE; g_mutex_init (&self->linked_or_flushing_lock); g_cond_init (&self->linked_or_flushing_cond); @@ -223,6 +241,7 @@ struct _GstMseSrc GstElement base; GstMediaSource *media_source; + GMutex media_source_lock; guint group_id; GstStreamCollection *collection; @@ -234,12 +253,11 @@ struct _GstMseSrc gdouble rate; GstMseSrcReadyState ready_state; + ReadyStateUpdateTask *ready_state_update_task; + GstFlowCombiner *flow_combiner; GMutex flow_combiner_lock; - GCond eos_cond; - GMutex eos_lock; - gchar *uri; }; @@ -249,7 +267,7 @@ static GstStateChangeReturn gst_mse_src_change_state (GstElement * element, GstStateChange transition); static gboolean gst_mse_src_send_event (GstElement * element, GstEvent * event); static void update_ready_state_for_init_segment (GstMseSrc * self); -static void update_ready_state_for_sample (GstMseSrc * self); +static void update_ready_state (GstMseSrc * self); G_DEFINE_TYPE_WITH_CODE (GstMseSrc, gst_mse_src, GST_TYPE_ELEMENT, G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_mse_src_uri_handler_init)); @@ -267,13 +285,15 @@ static void gst_mse_src_dispose (GObject * object) { GstMseSrc *self = GST_MSE_SRC (object); + g_clear_pointer (&self->ready_state_update_task, + ready_state_update_task_free); + gst_clear_object (&self->media_source); + g_mutex_clear (&self->media_source_lock); gst_clear_object (&self->collection); g_clear_pointer (&self->streams, g_hash_table_unref); g_mutex_clear (&self->streams_lock); g_clear_pointer (&self->flow_combiner, gst_flow_combiner_free); g_mutex_clear (&self->flow_combiner_lock); - g_cond_clear (&self->eos_cond); - g_mutex_clear (&self->eos_lock); G_OBJECT_CLASS (gst_mse_src_parent_class)->dispose (object); } @@ -464,6 +484,70 @@ collection_init (const GstMseSrc * self) return gst_stream_collection_new (G_OBJECT_TYPE_NAME (self)); } +static ReadyStateUpdateTask * +ready_state_update_task_new (GstMseSrc * parent) +{ + ReadyStateUpdateTask *task = g_new0 (ReadyStateUpdateTask, 1); + g_rec_mutex_init (&task->mutex); + g_weak_ref_init (&task->parent, parent); + task->task = + gst_task_new ((GstTaskFunction) ready_state_update_task_func, task, NULL); + gst_task_set_lock (task->task, &task->mutex); + return task; +} + +static void +ready_state_update_task_free (ReadyStateUpdateTask * task) +{ + g_weak_ref_set (&task->parent, NULL); + gst_task_join (task->task); + gst_clear_object (&task->task); + g_weak_ref_clear (&task->parent); + g_rec_mutex_clear (&task->mutex); + g_free (task); +} + +static void +ready_state_update_task_start (ReadyStateUpdateTask * task) +{ + GstMseSrc *parent = g_weak_ref_get (&task->parent); + if (parent) { + gchar *name = g_strdup_printf ("%s:ready-state", GST_OBJECT_NAME (parent)); + g_object_set (task->task, "name", name, NULL); + g_clear_pointer (&name, g_free); + } + gst_clear_object (&parent); + gst_task_start (task->task); +} + +static void +ready_state_update_task_stop (ReadyStateUpdateTask * task) +{ + gst_task_stop (task->task); +} + +static void +ready_state_update_task_join (ReadyStateUpdateTask * task) +{ + gst_task_join (task->task); +} + +static void +ready_state_update_task_func (ReadyStateUpdateTask * task) +{ + GstMseSrc *self = (GstMseSrc *) g_weak_ref_get (&task->parent); + + if (self == NULL) { + GST_ERROR_OBJECT (task->task, "parent object is gone, stopping"); + gst_task_stop (task->task); + return; + } + + update_ready_state (self); + gst_object_unref (self); + g_usleep (G_TIME_SPAN_SECOND); +} + static void gst_mse_src_init (GstMseSrc * self) { @@ -473,11 +557,12 @@ gst_mse_src_init (GstMseSrc * self) self->uri = NULL; self->start_time = 0; self->rate = 1; + self->media_source = NULL; + g_mutex_init (&self->media_source_lock); g_mutex_init (&self->streams_lock); self->flow_combiner = gst_flow_combiner_new (); g_mutex_init (&self->flow_combiner_lock); - g_cond_init (&self->eos_cond); - g_mutex_init (&self->eos_lock); + self->ready_state_update_task = ready_state_update_task_new (self); } /** @@ -713,11 +798,13 @@ flush_stream (GstMseSrc * self, Stream * stream, gboolean is_seek) GstSegment *segment = &(pad->segment); segment->base = 0; segment->start = self->start_time; + segment->position = self->start_time; segment->time = self->start_time; segment->rate = self->rate; + } else { + gst_media_source_track_flush (stream->track); } - gst_media_source_track_flush (stream->track); g_atomic_int_set (&pad->does_need_segment, TRUE); gst_pad_push_event (GST_PAD (pad), gst_event_new_flush_stop (is_seek)); @@ -787,11 +874,16 @@ gst_mse_src_change_state (GstElement * element, GstStateChange transition) GstMseSrc *self = GST_MSE_SRC (element); switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: + ready_state_update_task_stop (self->ready_state_update_task); tear_down_all_streams (self); break; case GST_STATE_CHANGE_READY_TO_NULL: + ready_state_update_task_join (self->ready_state_update_task); gst_mse_src_detach (self); break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + ready_state_update_task_start (self->ready_state_update_task); + break; default: break; } @@ -802,9 +894,12 @@ gst_mse_src_change_state (GstElement * element, GstStateChange transition) static void gst_mse_src_seek (GstMseSrc * self, GstClockTime start_time, gdouble rate) { + GST_OBJECT_LOCK (self); self->start_time = start_time; self->rate = rate; + GST_OBJECT_UNLOCK (self); + MEDIA_SOURCE_LOCK (self); flush_all_streams (self, TRUE); if (self->media_source) { GST_DEBUG_OBJECT (self, "seeking on media source %" GST_PTR_FORMAT, @@ -813,6 +908,7 @@ gst_mse_src_seek (GstMseSrc * self, GstClockTime start_time, gdouble rate) } else { GST_DEBUG_OBJECT (self, "detached, not seeking on media source"); } + MEDIA_SOURCE_UNLOCK (self); resume_all_streams (self); } @@ -835,7 +931,7 @@ gst_mse_src_send_event (GstElement * element, GstEvent * event) gst_event_unref (event); - if (format != GST_FORMAT_TIME || seek_type != GST_SEEK_TYPE_SET) { + if (format != GST_FORMAT_TIME || seek_type != GST_SEEK_TYPE_SET || rate < 0) { GST_ERROR_OBJECT (self, "Rejecting unsupported seek event: %" GST_PTR_FORMAT, event); return FALSE; @@ -852,55 +948,32 @@ is_flushing (GstMseSrcPad * pad) return g_atomic_int_get (&pad->flushing) || GST_PAD_IS_FLUSHING (pad); } -static void +static gboolean await_pad_linked_or_flushing (GstMseSrcPad * pad) { GST_TRACE_OBJECT (pad, "waiting for link"); LINKED_OR_FLUSHING_LOCK (pad); + gboolean flushing = FALSE; while (!gst_pad_is_linked (GST_PAD_CAST (pad)) && !is_flushing (pad)) { LINKED_OR_FLUSHING_WAIT (pad); } + flushing = is_flushing (pad); LINKED_OR_FLUSHING_UNLOCK (pad); GST_TRACE_OBJECT (pad, "linked"); -} - -static gboolean -all_pads_eos_fold (const GValue * item, gboolean * all_eos, gpointer user_data) -{ - GstMseSrcPad *pad = g_value_get_object (item); - if (pad->eos) { - return TRUE; - } else { - *all_eos = FALSE; - return FALSE; - } -} - -static gboolean -all_pads_eos (GstMseSrc * self) -{ - GstIterator *iter = gst_element_iterate_src_pads (GST_ELEMENT_CAST (self)); - gboolean all_eos = TRUE; - while (gst_iterator_fold (iter, - (GstIteratorFoldFunction) all_pads_eos_fold, (GValue *) & all_eos, - NULL) == GST_ITERATOR_RESYNC) { - gst_iterator_resync (iter); - } - gst_iterator_free (iter); - return all_eos; + return flushing; } static void pad_task (GstMseSrcPad * pad) { - await_pad_linked_or_flushing (pad); - - if (is_flushing (pad)) { + GstMseSrc *self = NULL; + gboolean flushing = await_pad_linked_or_flushing (pad); + if (flushing) { GST_TRACE_OBJECT (pad, "pad is flushing"); goto pause; } - GstMseSrc *self = GST_MSE_SRC (gst_pad_get_parent_element (GST_PAD (pad))); + self = GST_MSE_SRC (gst_pad_get_parent_element (GST_PAD (pad))); GstMediaSourceTrack *track = pad->track; @@ -976,7 +1049,11 @@ pad_task (GstMseSrcPad * pad) GstBuffer *buffer = gst_buffer_copy (gst_sample_get_buffer (sample)); if (GST_BUFFER_DTS_IS_VALID (buffer)) { - pad->position = GST_BUFFER_DTS (buffer); + GstClockTime duration = + GST_BUFFER_DURATION_IS_VALID (buffer) ? GST_BUFFER_DURATION (buffer) : + 1; + GstClockTime buffer_end = GST_BUFFER_DTS (buffer) + duration; + pad->segment.position = MAX (pad->segment.position, buffer_end); } GstFlowReturn push_result = gst_pad_push (GST_PAD (pad), buffer); @@ -987,26 +1064,24 @@ pad_task (GstMseSrcPad * pad) GST_PAD_CAST (pad), push_result); FLOW_COMBINER_UNLOCK (self); - if (combined_result != GST_FLOW_OK) { - GST_DEBUG_OBJECT (pad, "push result: %s, combined result: %s", - gst_flow_get_name (push_result), gst_flow_get_name (combined_result)); - goto pause; + switch (combined_result) { + case GST_FLOW_OK: + break; + case GST_FLOW_FLUSHING: + goto pause; + default: + GST_ELEMENT_ERROR (self, CORE, PAD, ("failed to push data downstream"), + ("pad result: %s, combined result %s", + gst_flow_get_name (push_result), + gst_flow_get_name (combined_result))); + goto pause; } } else if (GST_IS_EVENT (object)) { - if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) { - g_mutex_lock (&self->eos_lock); - pad->eos = TRUE; - g_cond_broadcast (&self->eos_cond); - g_mutex_unlock (&self->eos_lock); - g_mutex_lock (&self->eos_lock); - while (!all_pads_eos (self)) { - GST_DEBUG_OBJECT (pad, "waiting for eos on all tracks"); - g_cond_wait (&self->eos_cond, &self->eos_lock); - } - g_mutex_unlock (&self->eos_lock); - GST_DEBUG_OBJECT (pad, "have eos on all tracks"); + GstEvent *event = GST_EVENT (g_steal_pointer (&object)); + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + GST_LOG_OBJECT (self, "EOS"); } - if (!gst_pad_push_event (GST_PAD (pad), GST_EVENT (object))) { + if (!gst_pad_push_event (GST_PAD (pad), event)) { GST_ERROR_OBJECT (self, "failed to push enqueued event"); goto pause; } @@ -1016,12 +1091,14 @@ pad_task (GstMseSrcPad * pad) g_assert_not_reached (); } + gst_clear_object (&self); return; pause: if (!g_atomic_int_get (&pad->flushing)) { gst_pad_pause_task (GST_PAD (pad)); } + gst_clear_object (&self); } static gboolean @@ -1073,7 +1150,7 @@ pad_query (GstMseSrcPad * pad, GstObject * parent, GstQuery * query) GstMseSrc *self = GST_MSE_SRC (parent); switch (GST_QUERY_TYPE (query)) { case GST_QUERY_POSITION:{ - GstClockTime position = pad->position; + GstClockTime position = pad->segment.position; GstFormat fmt; gst_query_parse_position (query, &fmt, NULL); if (fmt == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (position)) { @@ -1181,13 +1258,6 @@ gst_mse_src_emit_streams (GstMseSrc * self, GstMediaSourceTrack ** tracks, self->collection)); } -void -gst_mse_src_update_ready_state (GstMseSrc * self) -{ - g_return_if_fail (GST_IS_MSE_SRC (self)); - update_ready_state_for_sample (self); -} - static GstURIType gst_mse_src_uri_get_type (GType type) { @@ -1205,7 +1275,10 @@ static gchar * gst_mse_src_uri_get_uri (GstURIHandler * handler) { GstMseSrc *self = GST_MSE_SRC (handler); - return g_strdup (self->uri); + GST_OBJECT_LOCK (self); + gchar *uri = g_strdup (self->uri); + GST_OBJECT_UNLOCK (self); + return uri; } static gboolean @@ -1213,8 +1286,10 @@ gst_mse_src_uri_set_uri (GstURIHandler * handler, const gchar * uri, GError ** error) { GstMseSrc *self = GST_MSE_SRC (handler); + GST_OBJECT_LOCK (self); g_free (self->uri); self->uri = g_strdup (uri); + GST_OBJECT_UNLOCK (self); return TRUE; } @@ -1233,14 +1308,20 @@ gst_mse_src_attach (GstMseSrc * self, GstMediaSource * media_source) { g_return_if_fail (GST_IS_MSE_SRC (self)); g_return_if_fail (GST_IS_MEDIA_SOURCE (media_source)); + + MEDIA_SOURCE_LOCK (self); g_set_object (&self->media_source, media_source); + MEDIA_SOURCE_UNLOCK (self); } void gst_mse_src_detach (GstMseSrc * self) { g_return_if_fail (GST_IS_MSE_SRC (self)); + + MEDIA_SOURCE_LOCK (self); gst_clear_object (&self->media_source); + MEDIA_SOURCE_UNLOCK (self); } static void @@ -1249,7 +1330,9 @@ set_ready_state (GstMseSrc * self, GstMseSrcReadyState ready_state) if (ready_state == self->ready_state) { return; } - GST_DEBUG_OBJECT (self, "ready state %d=>%d", self->ready_state, ready_state); + GST_DEBUG_OBJECT (self, "ready state %s=>%s", + mse_src_ready_state_name (self->ready_state), + mse_src_ready_state_name (ready_state)); self->ready_state = ready_state; g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_READY_STATE]); } @@ -1257,9 +1340,13 @@ set_ready_state (GstMseSrc * self, GstMseSrcReadyState ready_state) static void update_ready_state_for_init_segment (GstMseSrc * self) { - g_return_if_fail (GST_IS_MEDIA_SOURCE (self->media_source)); + MEDIA_SOURCE_LOCK (self); + + if (self->media_source == NULL) { + goto done; + } if (self->ready_state != GST_MSE_SRC_READY_STATE_HAVE_NOTHING) { - return; + goto done; } GstSourceBufferList *buffers = gst_media_source_get_source_buffers (self->media_source); @@ -1275,13 +1362,17 @@ update_ready_state_for_init_segment (GstMseSrc * self) if (!all_received_init_segment) { return; } - set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_METADATA); + set_ready_state (self, MAX (self->ready_state, + GST_MSE_SRC_READY_STATE_HAVE_METADATA)); + +done: + MEDIA_SOURCE_UNLOCK (self); } static gboolean -has_current_data (GstMseSrc * self) +has_current_data (GstMseSrc * self, GstClockTime position, + GstClockTime duration) { - GstClockTime position = gst_mse_src_get_position (self); if (!GST_CLOCK_TIME_IS_VALID (position)) { return FALSE; } @@ -1306,10 +1397,8 @@ has_current_data (GstMseSrc * self) } static gboolean -has_future_data (GstMseSrc * self) +has_future_data (GstMseSrc * self, GstClockTime position, GstClockTime duration) { - GstClockTime position = gst_mse_src_get_position (self); - GstClockTime duration = self->duration; if (!GST_CLOCK_TIME_IS_VALID (position) || !GST_CLOCK_TIME_IS_VALID (duration)) { return FALSE; @@ -1338,10 +1427,8 @@ has_future_data (GstMseSrc * self) } static gboolean -has_enough_data (GstMseSrc * self) +has_enough_data (GstMseSrc * self, GstClockTime position, GstClockTime duration) { - GstClockTime position = gst_mse_src_get_position (self); - GstClockTime duration = self->duration; if (!GST_CLOCK_TIME_IS_VALID (position) || !GST_CLOCK_TIME_IS_VALID (duration)) { return FALSE; @@ -1370,18 +1457,31 @@ has_enough_data (GstMseSrc * self) } static void -update_ready_state_for_sample (GstMseSrc * self) +update_ready_state (GstMseSrc * self) { - g_return_if_fail (GST_IS_MEDIA_SOURCE (self->media_source)); - g_return_if_fail (self->ready_state >= GST_MSE_SRC_READY_STATE_HAVE_METADATA); + MEDIA_SOURCE_LOCK (self); - if (has_enough_data (self)) { + if (self->media_source == NULL) { + goto done; + } + + if (self->ready_state < GST_MSE_SRC_READY_STATE_HAVE_METADATA) { + goto done; + } + + GstClockTime position = gst_mse_src_get_position (self); + GstClockTime duration = self->duration; + + if (has_enough_data (self, position, duration)) { set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_ENOUGH_DATA); - } else if (has_future_data (self)) { + } else if (has_future_data (self, position, duration)) { set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_FUTURE_DATA); - } else if (has_current_data (self)) { + } else if (has_current_data (self, position, duration)) { set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_CURRENT_DATA); } else { set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_METADATA); } + +done: + MEDIA_SOURCE_UNLOCK (self); }