gstmsesrc: Added locking, improved seek implementation

The position is no longer duplicated across each pad and pad's segment. The
position is now only updated if it changes in the direction of playback so that
quickly repeated forward seeks do not cause the stream to seek from 0.

Reverse playback is expressly disallowed and an unnecessary extra flush of track
when seeking was removed.

A background task was added to periodically check on the current position and
the media source's buffering levels to keep the ready state up-to-date. The
source buffer no longer needs to trigger this update, it will happen whenever
the element state is READY or higher.

Finally, added proper error reporting when failing to push a buffer and improved
debug logging.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8512>
This commit is contained in:
Jordan Yelloz 2025-02-18 13:08:38 -07:00 committed by GStreamer Marge Bot
parent 96ec1ba613
commit 42f927f980
2 changed files with 192 additions and 95 deletions

View File

@ -43,9 +43,6 @@ GST_MSE_PRIVATE
void gst_mse_src_emit_streams (GstMseSrc * self, GstMediaSourceTrack ** tracks,
gsize n_tracks);
GST_MSE_PRIVATE
void gst_mse_src_update_ready_state (GstMseSrc * self);
GST_MSE_PRIVATE
void gst_mse_src_attach (GstMseSrc * self, GstMediaSource * media_source);

View File

@ -63,7 +63,7 @@
#include "gstsourcebuffer.h"
#include "gstsourcebuffer-private.h"
#define DEFAULT_POSITION GST_CLOCK_TIME_NONE
#define DEFAULT_POSITION 0
#define DEFAULT_DURATION GST_CLOCK_TIME_NONE
#define DEFAULT_READY_STATE GST_MSE_SRC_READY_STATE_HAVE_NOTHING
#define DECODE_ERROR "decode error"
@ -96,6 +96,13 @@ static GstStaticPadTemplate gst_mse_src_template =
GST_STATIC_PAD_TEMPLATE ("src_%s", GST_PAD_SRC, GST_PAD_SOMETIMES,
GST_STATIC_CAPS_ANY);
typedef struct
{
GWeakRef parent;
GstTask *task;
GRecMutex mutex;
} ReadyStateUpdateTask;
/**
* GstMseSrcPad:
*
@ -110,8 +117,6 @@ struct _GstMseSrcPad
GstCaps *most_recent_caps;
GstSegment segment;
GstClockTime position;
gboolean sent_stream_collection;
gboolean sent_stream_start;
gboolean sent_initial_caps;
@ -120,9 +125,11 @@ struct _GstMseSrcPad
GCond linked_or_flushing_cond;
GMutex linked_or_flushing_lock;
gboolean flushing;
gboolean eos;
};
#define MEDIA_SOURCE_LOCK(a) g_mutex_lock (&(a)->media_source_lock)
#define MEDIA_SOURCE_UNLOCK(a) g_mutex_unlock (&(a)->media_source_lock)
#define STREAMS_LOCK(a) (g_mutex_lock (&a->streams_lock))
#define STREAMS_UNLOCK(a) (g_mutex_unlock (&a->streams_lock))
@ -148,6 +155,19 @@ static gboolean pad_query (GstMseSrcPad * pad, GstObject * parent,
GstQuery * query);
static void pad_task (GstMseSrcPad * pad);
static ReadyStateUpdateTask *ready_state_update_task_new (GstMseSrc * parent);
static void ready_state_update_task_free (ReadyStateUpdateTask * task);
static void ready_state_update_task_func (ReadyStateUpdateTask * task);
static void ready_state_update_task_start (ReadyStateUpdateTask * task);
static void ready_state_update_task_stop (ReadyStateUpdateTask * task);
static void ready_state_update_task_join (ReadyStateUpdateTask * task);
static const gchar *
mse_src_ready_state_name (GstMseSrcReadyState state)
{
return gst_mse_enum_value_nick (gst_mse_src_ready_state_get_type (), state);
}
static GstPad *
gst_mse_src_pad_new (GstMediaSourceTrack * track, GstStream * stream,
guint id, GstClockTime start, gdouble rate)
@ -172,9 +192,7 @@ gst_mse_src_pad_init (GstMseSrcPad * self)
self->sent_stream_start = FALSE;
self->sent_initial_caps = FALSE;
self->does_need_segment = TRUE;
self->position = DEFAULT_POSITION;
self->flushing = FALSE;
self->eos = FALSE;
g_mutex_init (&self->linked_or_flushing_lock);
g_cond_init (&self->linked_or_flushing_cond);
@ -223,6 +241,7 @@ struct _GstMseSrc
GstElement base;
GstMediaSource *media_source;
GMutex media_source_lock;
guint group_id;
GstStreamCollection *collection;
@ -234,12 +253,11 @@ struct _GstMseSrc
gdouble rate;
GstMseSrcReadyState ready_state;
ReadyStateUpdateTask *ready_state_update_task;
GstFlowCombiner *flow_combiner;
GMutex flow_combiner_lock;
GCond eos_cond;
GMutex eos_lock;
gchar *uri;
};
@ -249,7 +267,7 @@ static GstStateChangeReturn gst_mse_src_change_state (GstElement * element,
GstStateChange transition);
static gboolean gst_mse_src_send_event (GstElement * element, GstEvent * event);
static void update_ready_state_for_init_segment (GstMseSrc * self);
static void update_ready_state_for_sample (GstMseSrc * self);
static void update_ready_state (GstMseSrc * self);
G_DEFINE_TYPE_WITH_CODE (GstMseSrc, gst_mse_src, GST_TYPE_ELEMENT,
G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_mse_src_uri_handler_init));
@ -267,13 +285,15 @@ static void
gst_mse_src_dispose (GObject * object)
{
GstMseSrc *self = GST_MSE_SRC (object);
g_clear_pointer (&self->ready_state_update_task,
ready_state_update_task_free);
gst_clear_object (&self->media_source);
g_mutex_clear (&self->media_source_lock);
gst_clear_object (&self->collection);
g_clear_pointer (&self->streams, g_hash_table_unref);
g_mutex_clear (&self->streams_lock);
g_clear_pointer (&self->flow_combiner, gst_flow_combiner_free);
g_mutex_clear (&self->flow_combiner_lock);
g_cond_clear (&self->eos_cond);
g_mutex_clear (&self->eos_lock);
G_OBJECT_CLASS (gst_mse_src_parent_class)->dispose (object);
}
@ -464,6 +484,70 @@ collection_init (const GstMseSrc * self)
return gst_stream_collection_new (G_OBJECT_TYPE_NAME (self));
}
static ReadyStateUpdateTask *
ready_state_update_task_new (GstMseSrc * parent)
{
ReadyStateUpdateTask *task = g_new0 (ReadyStateUpdateTask, 1);
g_rec_mutex_init (&task->mutex);
g_weak_ref_init (&task->parent, parent);
task->task =
gst_task_new ((GstTaskFunction) ready_state_update_task_func, task, NULL);
gst_task_set_lock (task->task, &task->mutex);
return task;
}
static void
ready_state_update_task_free (ReadyStateUpdateTask * task)
{
g_weak_ref_set (&task->parent, NULL);
gst_task_join (task->task);
gst_clear_object (&task->task);
g_weak_ref_clear (&task->parent);
g_rec_mutex_clear (&task->mutex);
g_free (task);
}
static void
ready_state_update_task_start (ReadyStateUpdateTask * task)
{
GstMseSrc *parent = g_weak_ref_get (&task->parent);
if (parent) {
gchar *name = g_strdup_printf ("%s:ready-state", GST_OBJECT_NAME (parent));
g_object_set (task->task, "name", name, NULL);
g_clear_pointer (&name, g_free);
}
gst_clear_object (&parent);
gst_task_start (task->task);
}
static void
ready_state_update_task_stop (ReadyStateUpdateTask * task)
{
gst_task_stop (task->task);
}
static void
ready_state_update_task_join (ReadyStateUpdateTask * task)
{
gst_task_join (task->task);
}
static void
ready_state_update_task_func (ReadyStateUpdateTask * task)
{
GstMseSrc *self = (GstMseSrc *) g_weak_ref_get (&task->parent);
if (self == NULL) {
GST_ERROR_OBJECT (task->task, "parent object is gone, stopping");
gst_task_stop (task->task);
return;
}
update_ready_state (self);
gst_object_unref (self);
g_usleep (G_TIME_SPAN_SECOND);
}
static void
gst_mse_src_init (GstMseSrc * self)
{
@ -473,11 +557,12 @@ gst_mse_src_init (GstMseSrc * self)
self->uri = NULL;
self->start_time = 0;
self->rate = 1;
self->media_source = NULL;
g_mutex_init (&self->media_source_lock);
g_mutex_init (&self->streams_lock);
self->flow_combiner = gst_flow_combiner_new ();
g_mutex_init (&self->flow_combiner_lock);
g_cond_init (&self->eos_cond);
g_mutex_init (&self->eos_lock);
self->ready_state_update_task = ready_state_update_task_new (self);
}
/**
@ -713,11 +798,13 @@ flush_stream (GstMseSrc * self, Stream * stream, gboolean is_seek)
GstSegment *segment = &(pad->segment);
segment->base = 0;
segment->start = self->start_time;
segment->position = self->start_time;
segment->time = self->start_time;
segment->rate = self->rate;
} else {
gst_media_source_track_flush (stream->track);
}
gst_media_source_track_flush (stream->track);
g_atomic_int_set (&pad->does_need_segment, TRUE);
gst_pad_push_event (GST_PAD (pad), gst_event_new_flush_stop (is_seek));
@ -787,11 +874,16 @@ gst_mse_src_change_state (GstElement * element, GstStateChange transition)
GstMseSrc *self = GST_MSE_SRC (element);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
ready_state_update_task_stop (self->ready_state_update_task);
tear_down_all_streams (self);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
ready_state_update_task_join (self->ready_state_update_task);
gst_mse_src_detach (self);
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
ready_state_update_task_start (self->ready_state_update_task);
break;
default:
break;
}
@ -802,9 +894,12 @@ gst_mse_src_change_state (GstElement * element, GstStateChange transition)
static void
gst_mse_src_seek (GstMseSrc * self, GstClockTime start_time, gdouble rate)
{
GST_OBJECT_LOCK (self);
self->start_time = start_time;
self->rate = rate;
GST_OBJECT_UNLOCK (self);
MEDIA_SOURCE_LOCK (self);
flush_all_streams (self, TRUE);
if (self->media_source) {
GST_DEBUG_OBJECT (self, "seeking on media source %" GST_PTR_FORMAT,
@ -813,6 +908,7 @@ gst_mse_src_seek (GstMseSrc * self, GstClockTime start_time, gdouble rate)
} else {
GST_DEBUG_OBJECT (self, "detached, not seeking on media source");
}
MEDIA_SOURCE_UNLOCK (self);
resume_all_streams (self);
}
@ -835,7 +931,7 @@ gst_mse_src_send_event (GstElement * element, GstEvent * event)
gst_event_unref (event);
if (format != GST_FORMAT_TIME || seek_type != GST_SEEK_TYPE_SET) {
if (format != GST_FORMAT_TIME || seek_type != GST_SEEK_TYPE_SET || rate < 0) {
GST_ERROR_OBJECT (self,
"Rejecting unsupported seek event: %" GST_PTR_FORMAT, event);
return FALSE;
@ -852,55 +948,32 @@ is_flushing (GstMseSrcPad * pad)
return g_atomic_int_get (&pad->flushing) || GST_PAD_IS_FLUSHING (pad);
}
static void
static gboolean
await_pad_linked_or_flushing (GstMseSrcPad * pad)
{
GST_TRACE_OBJECT (pad, "waiting for link");
LINKED_OR_FLUSHING_LOCK (pad);
gboolean flushing = FALSE;
while (!gst_pad_is_linked (GST_PAD_CAST (pad)) && !is_flushing (pad)) {
LINKED_OR_FLUSHING_WAIT (pad);
}
flushing = is_flushing (pad);
LINKED_OR_FLUSHING_UNLOCK (pad);
GST_TRACE_OBJECT (pad, "linked");
}
static gboolean
all_pads_eos_fold (const GValue * item, gboolean * all_eos, gpointer user_data)
{
GstMseSrcPad *pad = g_value_get_object (item);
if (pad->eos) {
return TRUE;
} else {
*all_eos = FALSE;
return FALSE;
}
}
static gboolean
all_pads_eos (GstMseSrc * self)
{
GstIterator *iter = gst_element_iterate_src_pads (GST_ELEMENT_CAST (self));
gboolean all_eos = TRUE;
while (gst_iterator_fold (iter,
(GstIteratorFoldFunction) all_pads_eos_fold, (GValue *) & all_eos,
NULL) == GST_ITERATOR_RESYNC) {
gst_iterator_resync (iter);
}
gst_iterator_free (iter);
return all_eos;
return flushing;
}
static void
pad_task (GstMseSrcPad * pad)
{
await_pad_linked_or_flushing (pad);
if (is_flushing (pad)) {
GstMseSrc *self = NULL;
gboolean flushing = await_pad_linked_or_flushing (pad);
if (flushing) {
GST_TRACE_OBJECT (pad, "pad is flushing");
goto pause;
}
GstMseSrc *self = GST_MSE_SRC (gst_pad_get_parent_element (GST_PAD (pad)));
self = GST_MSE_SRC (gst_pad_get_parent_element (GST_PAD (pad)));
GstMediaSourceTrack *track = pad->track;
@ -976,7 +1049,11 @@ pad_task (GstMseSrcPad * pad)
GstBuffer *buffer = gst_buffer_copy (gst_sample_get_buffer (sample));
if (GST_BUFFER_DTS_IS_VALID (buffer)) {
pad->position = GST_BUFFER_DTS (buffer);
GstClockTime duration =
GST_BUFFER_DURATION_IS_VALID (buffer) ? GST_BUFFER_DURATION (buffer) :
1;
GstClockTime buffer_end = GST_BUFFER_DTS (buffer) + duration;
pad->segment.position = MAX (pad->segment.position, buffer_end);
}
GstFlowReturn push_result = gst_pad_push (GST_PAD (pad), buffer);
@ -987,26 +1064,24 @@ pad_task (GstMseSrcPad * pad)
GST_PAD_CAST (pad), push_result);
FLOW_COMBINER_UNLOCK (self);
if (combined_result != GST_FLOW_OK) {
GST_DEBUG_OBJECT (pad, "push result: %s, combined result: %s",
gst_flow_get_name (push_result), gst_flow_get_name (combined_result));
goto pause;
switch (combined_result) {
case GST_FLOW_OK:
break;
case GST_FLOW_FLUSHING:
goto pause;
default:
GST_ELEMENT_ERROR (self, CORE, PAD, ("failed to push data downstream"),
("pad result: %s, combined result %s",
gst_flow_get_name (push_result),
gst_flow_get_name (combined_result)));
goto pause;
}
} else if (GST_IS_EVENT (object)) {
if (GST_EVENT_TYPE (object) == GST_EVENT_EOS) {
g_mutex_lock (&self->eos_lock);
pad->eos = TRUE;
g_cond_broadcast (&self->eos_cond);
g_mutex_unlock (&self->eos_lock);
g_mutex_lock (&self->eos_lock);
while (!all_pads_eos (self)) {
GST_DEBUG_OBJECT (pad, "waiting for eos on all tracks");
g_cond_wait (&self->eos_cond, &self->eos_lock);
}
g_mutex_unlock (&self->eos_lock);
GST_DEBUG_OBJECT (pad, "have eos on all tracks");
GstEvent *event = GST_EVENT (g_steal_pointer (&object));
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_LOG_OBJECT (self, "EOS");
}
if (!gst_pad_push_event (GST_PAD (pad), GST_EVENT (object))) {
if (!gst_pad_push_event (GST_PAD (pad), event)) {
GST_ERROR_OBJECT (self, "failed to push enqueued event");
goto pause;
}
@ -1016,12 +1091,14 @@ pad_task (GstMseSrcPad * pad)
g_assert_not_reached ();
}
gst_clear_object (&self);
return;
pause:
if (!g_atomic_int_get (&pad->flushing)) {
gst_pad_pause_task (GST_PAD (pad));
}
gst_clear_object (&self);
}
static gboolean
@ -1073,7 +1150,7 @@ pad_query (GstMseSrcPad * pad, GstObject * parent, GstQuery * query)
GstMseSrc *self = GST_MSE_SRC (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_POSITION:{
GstClockTime position = pad->position;
GstClockTime position = pad->segment.position;
GstFormat fmt;
gst_query_parse_position (query, &fmt, NULL);
if (fmt == GST_FORMAT_TIME && GST_CLOCK_TIME_IS_VALID (position)) {
@ -1181,13 +1258,6 @@ gst_mse_src_emit_streams (GstMseSrc * self, GstMediaSourceTrack ** tracks,
self->collection));
}
void
gst_mse_src_update_ready_state (GstMseSrc * self)
{
g_return_if_fail (GST_IS_MSE_SRC (self));
update_ready_state_for_sample (self);
}
static GstURIType
gst_mse_src_uri_get_type (GType type)
{
@ -1205,7 +1275,10 @@ static gchar *
gst_mse_src_uri_get_uri (GstURIHandler * handler)
{
GstMseSrc *self = GST_MSE_SRC (handler);
return g_strdup (self->uri);
GST_OBJECT_LOCK (self);
gchar *uri = g_strdup (self->uri);
GST_OBJECT_UNLOCK (self);
return uri;
}
static gboolean
@ -1213,8 +1286,10 @@ gst_mse_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
GError ** error)
{
GstMseSrc *self = GST_MSE_SRC (handler);
GST_OBJECT_LOCK (self);
g_free (self->uri);
self->uri = g_strdup (uri);
GST_OBJECT_UNLOCK (self);
return TRUE;
}
@ -1233,14 +1308,20 @@ gst_mse_src_attach (GstMseSrc * self, GstMediaSource * media_source)
{
g_return_if_fail (GST_IS_MSE_SRC (self));
g_return_if_fail (GST_IS_MEDIA_SOURCE (media_source));
MEDIA_SOURCE_LOCK (self);
g_set_object (&self->media_source, media_source);
MEDIA_SOURCE_UNLOCK (self);
}
void
gst_mse_src_detach (GstMseSrc * self)
{
g_return_if_fail (GST_IS_MSE_SRC (self));
MEDIA_SOURCE_LOCK (self);
gst_clear_object (&self->media_source);
MEDIA_SOURCE_UNLOCK (self);
}
static void
@ -1249,7 +1330,9 @@ set_ready_state (GstMseSrc * self, GstMseSrcReadyState ready_state)
if (ready_state == self->ready_state) {
return;
}
GST_DEBUG_OBJECT (self, "ready state %d=>%d", self->ready_state, ready_state);
GST_DEBUG_OBJECT (self, "ready state %s=>%s",
mse_src_ready_state_name (self->ready_state),
mse_src_ready_state_name (ready_state));
self->ready_state = ready_state;
g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_READY_STATE]);
}
@ -1257,9 +1340,13 @@ set_ready_state (GstMseSrc * self, GstMseSrcReadyState ready_state)
static void
update_ready_state_for_init_segment (GstMseSrc * self)
{
g_return_if_fail (GST_IS_MEDIA_SOURCE (self->media_source));
MEDIA_SOURCE_LOCK (self);
if (self->media_source == NULL) {
goto done;
}
if (self->ready_state != GST_MSE_SRC_READY_STATE_HAVE_NOTHING) {
return;
goto done;
}
GstSourceBufferList *buffers = gst_media_source_get_source_buffers
(self->media_source);
@ -1275,13 +1362,17 @@ update_ready_state_for_init_segment (GstMseSrc * self)
if (!all_received_init_segment) {
return;
}
set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_METADATA);
set_ready_state (self, MAX (self->ready_state,
GST_MSE_SRC_READY_STATE_HAVE_METADATA));
done:
MEDIA_SOURCE_UNLOCK (self);
}
static gboolean
has_current_data (GstMseSrc * self)
has_current_data (GstMseSrc * self, GstClockTime position,
GstClockTime duration)
{
GstClockTime position = gst_mse_src_get_position (self);
if (!GST_CLOCK_TIME_IS_VALID (position)) {
return FALSE;
}
@ -1306,10 +1397,8 @@ has_current_data (GstMseSrc * self)
}
static gboolean
has_future_data (GstMseSrc * self)
has_future_data (GstMseSrc * self, GstClockTime position, GstClockTime duration)
{
GstClockTime position = gst_mse_src_get_position (self);
GstClockTime duration = self->duration;
if (!GST_CLOCK_TIME_IS_VALID (position)
|| !GST_CLOCK_TIME_IS_VALID (duration)) {
return FALSE;
@ -1338,10 +1427,8 @@ has_future_data (GstMseSrc * self)
}
static gboolean
has_enough_data (GstMseSrc * self)
has_enough_data (GstMseSrc * self, GstClockTime position, GstClockTime duration)
{
GstClockTime position = gst_mse_src_get_position (self);
GstClockTime duration = self->duration;
if (!GST_CLOCK_TIME_IS_VALID (position)
|| !GST_CLOCK_TIME_IS_VALID (duration)) {
return FALSE;
@ -1370,18 +1457,31 @@ has_enough_data (GstMseSrc * self)
}
static void
update_ready_state_for_sample (GstMseSrc * self)
update_ready_state (GstMseSrc * self)
{
g_return_if_fail (GST_IS_MEDIA_SOURCE (self->media_source));
g_return_if_fail (self->ready_state >= GST_MSE_SRC_READY_STATE_HAVE_METADATA);
MEDIA_SOURCE_LOCK (self);
if (has_enough_data (self)) {
if (self->media_source == NULL) {
goto done;
}
if (self->ready_state < GST_MSE_SRC_READY_STATE_HAVE_METADATA) {
goto done;
}
GstClockTime position = gst_mse_src_get_position (self);
GstClockTime duration = self->duration;
if (has_enough_data (self, position, duration)) {
set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_ENOUGH_DATA);
} else if (has_future_data (self)) {
} else if (has_future_data (self, position, duration)) {
set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_FUTURE_DATA);
} else if (has_current_data (self)) {
} else if (has_current_data (self, position, duration)) {
set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_CURRENT_DATA);
} else {
set_ready_state (self, GST_MSE_SRC_READY_STATE_HAVE_METADATA);
}
done:
MEDIA_SOURCE_UNLOCK (self);
}