splitmuxsrc: Add num-lookahead property

Add a `num-lookahead` property that will 'prepare' a number of
fragments in advance of the playhead if they have been deactivated
or closed by a limited number of `num-open-fragments`. It can help
to avoid any play stalls reading the indexes or headers of the next
file from high-latency media or on resource limited machines.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7053>
This commit is contained in:
Jan Schmidt 2024-05-15 18:45:15 +10:00
parent 93c04e7473
commit 64fd2b265f
5 changed files with 172 additions and 10 deletions

View File

@ -1028,6 +1028,10 @@ gst_splitmux_part_reader_finish_measuring_streams (GstSplitMuxPartReader *
info.duration, reader->cb_data); info.duration, reader->cb_data);
} }
do_async_done (reader); do_async_done (reader);
if (reader->loaded_cb) {
reader->loaded_cb (reader, reader->cb_data);
}
} else { } else {
SPLITMUX_PART_UNLOCK (reader); SPLITMUX_PART_UNLOCK (reader);
} }
@ -1409,11 +1413,13 @@ gst_splitmux_part_reader_set_flushing_locked (GstSplitMuxPartReader * reader,
void void
gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader, gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader * reader,
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb, gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb,
GstSplitMuxPartReaderMeasuredCb measured_cb) GstSplitMuxPartReaderMeasuredCb measured_cb,
GstSplitMuxPartReaderLoadedCb loaded_cb)
{ {
reader->cb_data = cb_data; reader->cb_data = cb_data;
reader->get_pad_cb = get_pad_cb; reader->get_pad_cb = get_pad_cb;
reader->measured_cb = measured_cb; reader->measured_cb = measured_cb;
reader->loaded_cb = loaded_cb;
} }
GstClockTime GstClockTime

View File

@ -53,6 +53,7 @@ typedef enum
typedef GstPad *(*GstSplitMuxPartReaderPadCb)(GstSplitMuxPartReader *reader, GstPad *src_pad, gpointer cb_data); typedef GstPad *(*GstSplitMuxPartReaderPadCb)(GstSplitMuxPartReader *reader, GstPad *src_pad, gpointer cb_data);
typedef void (*GstSplitMuxPartReaderMeasuredCb)(GstSplitMuxPartReader *reader, const gchar *filename, GstClockTime offset, GstClockTime duration, gpointer cb_data); typedef void (*GstSplitMuxPartReaderMeasuredCb)(GstSplitMuxPartReader *reader, const gchar *filename, GstClockTime offset, GstClockTime duration, gpointer cb_data);
typedef void (*GstSplitMuxPartReaderLoadedCb)(GstSplitMuxPartReader *reader, gpointer cb_data);
struct _GstSplitMuxPartReaderInfo struct _GstSplitMuxPartReaderInfo
{ {
@ -97,6 +98,7 @@ struct _GstSplitMuxPartReader
GstSplitMuxPartReaderPadCb get_pad_cb; GstSplitMuxPartReaderPadCb get_pad_cb;
GstSplitMuxPartReaderMeasuredCb measured_cb; GstSplitMuxPartReaderMeasuredCb measured_cb;
GstSplitMuxPartReaderLoadedCb loaded_cb;
gpointer cb_data; gpointer cb_data;
}; };
@ -114,7 +116,9 @@ gboolean gst_splitmux_part_reader_is_loaded (GstSplitMuxPartReader *part);
gboolean gst_splitmux_part_reader_is_playing (GstSplitMuxPartReader *part); gboolean gst_splitmux_part_reader_is_playing (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader *reader, void gst_splitmux_part_reader_set_callbacks (GstSplitMuxPartReader *reader,
gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb, GstSplitMuxPartReaderMeasuredCb measured_cb); gpointer cb_data, GstSplitMuxPartReaderPadCb get_pad_cb, GstSplitMuxPartReaderMeasuredCb measured_cb,
GstSplitMuxPartReaderLoadedCb loaded_cb);
gboolean gst_splitmux_part_reader_prepare (GstSplitMuxPartReader *part); gboolean gst_splitmux_part_reader_prepare (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader *part); void gst_splitmux_part_reader_unprepare (GstSplitMuxPartReader *part);
void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader *reader, void gst_splitmux_part_reader_set_location (GstSplitMuxPartReader *reader,

View File

@ -61,10 +61,12 @@ enum
{ {
PROP_0, PROP_0,
PROP_LOCATION, PROP_LOCATION,
PROP_NUM_OPEN_FRAGMENTS PROP_NUM_OPEN_FRAGMENTS,
PROP_NUM_LOOKAHEAD
}; };
#define DEFAULT_OPEN_FRAGMENTS 100 #define DEFAULT_OPEN_FRAGMENTS 100
#define DEFAULT_LOOKAHEAD 1
enum enum
{ {
@ -122,6 +124,10 @@ static void
gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part, gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part,
const gchar * filename, GstClockTime offset, GstClockTime duration, const gchar * filename, GstClockTime offset, GstClockTime duration,
GstSplitMuxSrc * splitmux); GstSplitMuxSrc * splitmux);
static void
gst_splitmux_part_loaded_cb (GstSplitMuxPartReader * part,
GstSplitMuxSrc * splitmux);
static GstPad *gst_splitmux_find_output_pad (GstSplitMuxPartReader * part, static GstPad *gst_splitmux_find_output_pad (GstSplitMuxPartReader * part,
GstPad * pad, GstSplitMuxSrc * splitmux); GstPad * pad, GstSplitMuxSrc * splitmux);
static gboolean gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, static gboolean gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux,
@ -135,6 +141,8 @@ static gboolean gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux,
static gboolean gst_splitmuxsrc_add_fragment (GstSplitMuxSrc * splitmux, static gboolean gst_splitmuxsrc_add_fragment (GstSplitMuxSrc * splitmux,
const gchar * filename, GstClockTime offset, GstClockTime duration); const gchar * filename, GstClockTime offset, GstClockTime duration);
static void schedule_lookahead_check (GstSplitMuxSrc * src);
#define _do_init \ #define _do_init \
G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init); \ G_IMPLEMENT_INTERFACE(GST_TYPE_URI_HANDLER, splitmux_src_uri_handler_init); \
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsrc", 0, "Split File Demuxing Source"); GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsrc", 0, "Split File Demuxing Source");
@ -252,6 +260,13 @@ gst_splitmux_src_class_init (GstSplitMuxSrcClass * klass)
"May still use slightly more if set to less than the number of streams in the files", "May still use slightly more if set to less than the number of streams in the files",
0, G_MAXUINT, DEFAULT_OPEN_FRAGMENTS, 0, G_MAXUINT, DEFAULT_OPEN_FRAGMENTS,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_LOOKAHEAD,
g_param_spec_uint ("num-lookahead", "Fragment Lookahead",
"When switching fragments, ensure the next N fragments are prepared. "
"Useful on slow devices if opening/preparing a new fragment can cause playback stalls",
0, G_MAXUINT, DEFAULT_LOOKAHEAD,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/** /**
* GstSplitMuxSrc::format-location: * GstSplitMuxSrc::format-location:
@ -301,6 +316,7 @@ gst_splitmux_src_init (GstSplitMuxSrc * splitmux)
splitmux->total_duration = GST_CLOCK_TIME_NONE; splitmux->total_duration = GST_CLOCK_TIME_NONE;
gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME); gst_segment_init (&splitmux->play_segment, GST_FORMAT_TIME);
splitmux->target_max_readers = DEFAULT_OPEN_FRAGMENTS; splitmux->target_max_readers = DEFAULT_OPEN_FRAGMENTS;
splitmux->num_lookahead = DEFAULT_LOOKAHEAD;
} }
static void static void
@ -354,6 +370,11 @@ gst_splitmux_src_set_property (GObject * object, guint prop_id,
splitmux->target_max_readers = g_value_get_uint (value); splitmux->target_max_readers = g_value_get_uint (value);
GST_OBJECT_UNLOCK (splitmux); GST_OBJECT_UNLOCK (splitmux);
break; break;
case PROP_NUM_LOOKAHEAD:
GST_OBJECT_LOCK (splitmux);
splitmux->num_lookahead = g_value_get_uint (value);
GST_OBJECT_UNLOCK (splitmux);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -377,6 +398,11 @@ gst_splitmux_src_get_property (GObject * object, guint prop_id,
g_value_set_uint (value, splitmux->target_max_readers); g_value_set_uint (value, splitmux->target_max_readers);
GST_OBJECT_UNLOCK (splitmux); GST_OBJECT_UNLOCK (splitmux);
break; break;
case PROP_NUM_LOOKAHEAD:
GST_OBJECT_LOCK (splitmux);
g_value_set_uint (value, splitmux->num_lookahead);
GST_OBJECT_UNLOCK (splitmux);
break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -473,6 +499,7 @@ gst_splitmux_src_activate_first_part (GstSplitMuxSrc * splitmux)
GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL), GST_ELEMENT_ERROR (splitmux, RESOURCE, OPEN_READ, (NULL),
("Failed to activate first part for playback")); ("Failed to activate first part for playback"));
} }
schedule_lookahead_check (splitmux);
} }
SPLITMUX_SRC_UNLOCK (splitmux); SPLITMUX_SRC_UNLOCK (splitmux);
} }
@ -551,6 +578,19 @@ gst_splitmux_part_measured_cb (GstSplitMuxPartReader * part,
SPLITMUX_SRC_UNLOCK (splitmux); SPLITMUX_SRC_UNLOCK (splitmux);
} }
static void
gst_splitmux_part_loaded_cb (GstSplitMuxPartReader * part,
GstSplitMuxSrc * splitmux)
{
SPLITMUX_SRC_LOCK (splitmux);
if (splitmux->did_initial_measuring) {
/* If we've already moved to playing, do another lookahead check for each fragment
* we load, to trigger loading another if needed */
schedule_lookahead_check (splitmux);
}
SPLITMUX_SRC_UNLOCK (splitmux);
}
static GstBusSyncReply static GstBusSyncReply
gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg, gst_splitmux_part_bus_handler (GstBus * bus, GstMessage * msg,
gpointer user_data) gpointer user_data)
@ -631,7 +671,8 @@ gst_splitmux_part_reader_create (GstSplitMuxSrc * splitmux,
gst_splitmux_part_reader_set_callbacks (r, splitmux, gst_splitmux_part_reader_set_callbacks (r, splitmux,
(GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad, (GstSplitMuxPartReaderPadCb) gst_splitmux_find_output_pad,
(GstSplitMuxPartReaderMeasuredCb) gst_splitmux_part_measured_cb); (GstSplitMuxPartReaderMeasuredCb) gst_splitmux_part_measured_cb,
(GstSplitMuxPartReaderLoadedCb) gst_splitmux_part_loaded_cb);
gst_splitmux_part_reader_set_location (r, filename); gst_splitmux_part_reader_set_location (r, filename);
bus = gst_element_get_bus (GST_ELEMENT_CAST (r)); bus = gst_element_get_bus (GST_ELEMENT_CAST (r));
@ -936,7 +977,7 @@ reduce_active_readers (GstSplitMuxSrc * splitmux)
static void static void
add_to_active_readers (GstSplitMuxSrc * splitmux, add_to_active_readers (GstSplitMuxSrc * splitmux,
GstSplitMuxPartReader * reader) GstSplitMuxPartReader * reader, gboolean add_as_oldest)
{ {
if (splitmux->target_max_readers != 0) { if (splitmux->target_max_readers != 0) {
/* Check if it's already in the active reader pool, and move this reader /* Check if it's already in the active reader pool, and move this reader
@ -952,9 +993,13 @@ add_to_active_readers (GstSplitMuxSrc * splitmux,
/* When adding a new reader to the list, reduce active readers first */ /* When adding a new reader to the list, reduce active readers first */
reduce_active_readers (splitmux); reduce_active_readers (splitmux);
} }
if (add_as_oldest) {
g_queue_push_head (splitmux->active_parts, reader);
} else {
g_queue_push_tail (splitmux->active_parts, reader); g_queue_push_tail (splitmux->active_parts, reader);
} }
} }
}
static gboolean static gboolean
gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part, gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
@ -964,7 +1009,7 @@ gst_splitmux_src_activate_part (GstSplitMuxSrc * splitmux, guint part,
GstSplitMuxPartReader *reader = gst_object_ref (splitmux->parts[part]); GstSplitMuxPartReader *reader = gst_object_ref (splitmux->parts[part]);
splitmux->cur_part = part; splitmux->cur_part = part;
add_to_active_readers (splitmux, reader); add_to_active_readers (splitmux, reader, FALSE);
SPLITMUX_SRC_UNLOCK (splitmux); SPLITMUX_SRC_UNLOCK (splitmux);
/* Drop lock around calling activate, as it might call back /* Drop lock around calling activate, as it might call back
@ -1038,7 +1083,7 @@ gst_splitmux_src_measure_next_part (GstSplitMuxSrc * splitmux)
GST_DEBUG_OBJECT (splitmux, "Measuring file part %s (%u)", GST_DEBUG_OBJECT (splitmux, "Measuring file part %s (%u)",
reader->path, idx); reader->path, idx);
add_to_active_readers (splitmux, reader); add_to_active_readers (splitmux, reader, TRUE);
SPLITMUX_SRC_UNLOCK (splitmux); SPLITMUX_SRC_UNLOCK (splitmux);
if (!gst_splitmux_part_reader_prepare (reader)) { if (!gst_splitmux_part_reader_prepare (reader)) {
@ -1163,6 +1208,7 @@ gst_splitmux_src_start (GstSplitMuxSrc * splitmux)
gst_element_call_async (GST_ELEMENT_CAST (splitmux), gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part, (GstElementCallAsyncFunc) gst_splitmux_src_activate_first_part,
NULL, NULL); NULL, NULL);
splitmux->did_initial_measuring = TRUE;
} }
SPLITMUX_SRC_UNLOCK (splitmux); SPLITMUX_SRC_UNLOCK (splitmux);
@ -1460,7 +1506,7 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad)
GST_DEBUG_OBJECT (splitpad, GST_DEBUG_OBJECT (splitpad,
"First pad to change part. Activating part %d with seg %" "First pad to change part. Activating part %d with seg %"
GST_SEGMENT_FORMAT, next_part, &tmp); GST_SEGMENT_FORMAT, next_part, &tmp);
add_to_active_readers (splitmux, splitpad->reader); add_to_active_readers (splitmux, splitpad->reader, FALSE);
if (!gst_splitmux_part_reader_activate (splitpad->reader, &tmp, if (!gst_splitmux_part_reader_activate (splitpad->reader, &tmp,
GST_SEEK_FLAG_NONE)) { GST_SEEK_FLAG_NONE)) {
@ -1468,6 +1514,7 @@ gst_splitmux_end_of_part (GstSplitMuxSrc * splitmux, SplitMuxSrcPad * splitpad)
} }
} }
splitmux->cur_part = next_part; splitmux->cur_part = next_part;
schedule_lookahead_check (splitmux);
} }
if (splitpad->part_pad) if (splitpad->part_pad)
@ -1810,3 +1857,105 @@ gst_splitmuxsrc_add_fragment (GstSplitMuxSrc * splitmux,
SPLITMUX_SRC_UNLOCK (splitmux); SPLITMUX_SRC_UNLOCK (splitmux);
return TRUE; return TRUE;
} }
static void
do_lookahead_check (GstSplitMuxSrc * splitmux)
{
SPLITMUX_SRC_LOCK (splitmux);
splitmux->lookahead_check_pending = FALSE;
if (!splitmux->running) {
goto done;
}
GST_OBJECT_LOCK (splitmux);
guint lookahead = splitmux->num_lookahead;
GST_OBJECT_UNLOCK (splitmux);
if (splitmux->target_max_readers != 0 &&
splitmux->target_max_readers <= lookahead) {
/* Don't let lookahead activate more readers than the target */
lookahead = splitmux->target_max_readers - 1;
}
if (lookahead == 0) {
goto done;
}
if (splitmux->play_segment.rate > 0.0) {
/* Walk forward */
guint i;
gsize limit = splitmux->cur_part + lookahead;
if (limit >= splitmux->num_parts) {
/* Don't check past the end */
limit = splitmux->num_parts - 1;
}
for (i = splitmux->cur_part + 1; i <= limit; i++) {
GstSplitMuxPartReader *reader = splitmux->parts[i];
if (!gst_splitmux_part_reader_is_loaded (reader)) {
GST_DEBUG_OBJECT (splitmux,
"Loading part %u reader %" GST_PTR_FORMAT " for lookahead (cur %u)",
i, reader, splitmux->cur_part);
gst_object_ref (reader);
add_to_active_readers (splitmux, reader, FALSE);
SPLITMUX_SRC_UNLOCK (splitmux);
/* Drop lock before calling activate, as it might call back
* into the splitmuxsrc when exposing pads */
gst_splitmux_part_reader_prepare (reader);
gst_object_unref (reader);
/* Only prepare one part at a time */
return;
}
/* Already active, but promote it in the LRU list */
add_to_active_readers (splitmux, reader, FALSE);
}
} else {
/* playing backward */
guint i;
gsize limit = 0;
if (splitmux->cur_part > lookahead) {
limit = splitmux->cur_part - lookahead;
}
for (i = splitmux->cur_part; i > limit; i--) {
GstSplitMuxPartReader *reader = splitmux->parts[i - 1];
if (!gst_splitmux_part_reader_is_loaded (reader)) {
GST_DEBUG_OBJECT (splitmux,
"Loading part %u reader %" GST_PTR_FORMAT " for lookahead (cur %u)",
i - 1, reader, splitmux->cur_part);
gst_object_ref (reader);
add_to_active_readers (splitmux, reader, FALSE);
SPLITMUX_SRC_UNLOCK (splitmux);
/* Drop lock before calling activate, as it might call back
* into the splitmuxsrc when exposing pads */
gst_splitmux_part_reader_prepare (reader);
gst_object_unref (reader);
/* Only prepare one part at a time */
return;
}
/* Already active, but promote it in the LRU list */
add_to_active_readers (splitmux, reader, FALSE);
}
}
done:
SPLITMUX_SRC_UNLOCK (splitmux);
}
/* Called with SPLITMUX lock held */
static void
schedule_lookahead_check (GstSplitMuxSrc * splitmux)
{
if (splitmux->lookahead_check_pending || splitmux->num_lookahead == 0
|| splitmux->target_max_readers == 0) {
/* No need to do lookahead checks */
return;
}
splitmux->lookahead_check_pending = TRUE;
gst_element_call_async (GST_ELEMENT_CAST (splitmux),
(GstElementCallAsyncFunc) do_lookahead_check, NULL, NULL);
}

View File

@ -71,6 +71,9 @@ struct _GstSplitMuxSrc
guint target_max_readers; /* Maximum number of readers we try to keep open */ guint target_max_readers; /* Maximum number of readers we try to keep open */
GQueue *active_parts; GQueue *active_parts;
guint num_lookahead;
gboolean lookahead_check_pending;
}; };
struct _GstSplitMuxSrcClass struct _GstSplitMuxSrcClass

View File

@ -87,7 +87,7 @@ main (int argc, char **argv)
/* Connect to prepare signal */ /* Connect to prepare signal */
g_signal_connect (data.reader, "prepared", (GCallback) part_prepared, &data); g_signal_connect (data.reader, "prepared", (GCallback) part_prepared, &data);
gst_splitmux_part_reader_set_callbacks (data.reader, &data, gst_splitmux_part_reader_set_callbacks (data.reader, &data,
(GstSplitMuxPartReaderPadCb) handle_get_pad, NULL); (GstSplitMuxPartReaderPadCb) handle_get_pad, NULL, NULL);
g_idle_add ((GSourceFunc) start_reader, &data); g_idle_add ((GSourceFunc) start_reader, &data);