splitmuxsink: Add fragment offset and duration to message

Publish the playback offset for and duration into the
splitmuxsink-fragment-closed bus message as each fragment
finishes.

These can be passed to splitmuxsrc via the 'add-fragment'
signal to avoid splitmuxsrc measuring all files on startup

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7053>
This commit is contained in:
Jan Schmidt 2024-04-30 00:33:35 +10:00
parent b0df6ee408
commit ed03e8f8ab
6 changed files with 446 additions and 53 deletions

View File

@ -183,7 +183,7 @@ GST_STATIC_PAD_TEMPLATE ("caption_%u",
static GQuark PAD_CONTEXT; static GQuark PAD_CONTEXT;
static GQuark EOS_FROM_US; static GQuark EOS_FROM_US;
static GQuark RUNNING_TIME; static GQuark SINK_FRAGMENT_INFO;
/* EOS_FROM_US is only valid in async-finalize mode. We need to know whether /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
* to forward an incoming EOS message, but we cannot rely on the state of the * to forward an incoming EOS message, but we cannot rely on the state of the
* splitmux anymore, so we set this qdata on the sink instead. * splitmux anymore, so we set this qdata on the sink instead.
@ -203,9 +203,10 @@ static GQuark RUNNING_TIME;
static void static void
_do_init (void) _do_init (void)
{ {
PAD_CONTEXT = g_quark_from_static_string ("pad-context"); PAD_CONTEXT = g_quark_from_static_string ("splitmuxsink-pad-context");
EOS_FROM_US = g_quark_from_static_string ("eos-from-us"); EOS_FROM_US = g_quark_from_static_string ("splitmuxsink-eos-from-us");
RUNNING_TIME = g_quark_from_static_string ("running-time"); SINK_FRAGMENT_INFO =
g_quark_from_static_string ("splitmuxsink-fragment-info");
GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0, GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
"Split File Muxing Sink"); "Split File Muxing Sink");
} }
@ -1079,7 +1080,8 @@ mq_stream_ctx_reset (MqStreamCtx * ctx)
{ {
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; ctx->out_fragment_start_runts = ctx->in_running_time = ctx->out_running_time =
GST_CLOCK_STIME_NONE;
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL); g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs); g_queue_clear (&ctx->queued_bufs);
} }
@ -1125,33 +1127,46 @@ send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
GstElement * sink) GstElement * sink)
{ {
gchar *location = NULL; gchar *location = NULL;
GstMessage *msg;
const gchar *msg_name = opened ? const gchar *msg_name = opened ?
"splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
GstClockTime running_time = splitmux->reference_ctx->out_running_time; OutputFragmentInfo *out_fragment_info = &splitmux->out_fragment_info;
if (!opened) { if (!opened) {
GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME); OutputFragmentInfo *sink_fragment_info =
if (rtime) g_object_get_qdata (G_OBJECT (sink), SINK_FRAGMENT_INFO);
running_time = *rtime; if (sink_fragment_info != NULL) {
out_fragment_info = sink_fragment_info;
}
} }
if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink), if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
"location") != NULL) "location") != NULL) {
g_object_get (sink, "location", &location, NULL); g_object_get (sink, "location", &location, NULL);
}
GST_DEBUG_OBJECT (splitmux, GST_DEBUG_OBJECT (splitmux,
"Sending %s message. Running time %" GST_TIME_FORMAT " location %s", "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location)); msg_name, GST_TIME_ARGS (out_fragment_info->last_running_time),
GST_STR_NULL (location));
/* If it's in the middle of a teardown, the reference_ctc might have become /* If it's in the middle of a teardown, the reference_ctc might have become
* NULL */ * NULL */
if (splitmux->reference_ctx) { if (splitmux->reference_ctx) {
msg = gst_message_new_element (GST_OBJECT (splitmux), GstStructure *s = gst_structure_new (msg_name,
gst_structure_new (msg_name, "location", G_TYPE_STRING, location,
"location", G_TYPE_STRING, location, "running-time", GST_TYPE_CLOCK_TIME,
"running-time", GST_TYPE_CLOCK_TIME, running_time, out_fragment_info->last_running_time, "sink", GST_TYPE_ELEMENT,
"sink", GST_TYPE_ELEMENT, sink, NULL)); sink, NULL);
if (!opened) {
GstClockTime offset = out_fragment_info->fragment_offset;
GstClockTime duration = out_fragment_info->fragment_duration;
gst_structure_set (s,
"fragment-offset", GST_TYPE_CLOCK_TIME, offset,
"fragment-duration", GST_TYPE_CLOCK_TIME, duration, NULL);
}
GstMessage *msg = gst_message_new_element (GST_OBJECT (splitmux), s);
gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
} }
@ -1250,6 +1265,43 @@ all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
return ret; return ret;
} }
/* Called with splitmux lock held before ending a fragment,
* to update the fragment info used for sending fragment opened/closed messages */
static void
update_output_fragment_info (GstSplitMuxSink * splitmux)
{
// Update the fragment output info before finalization
GstClockTime offset =
splitmux->out_fragment_start_runts - splitmux->out_start_runts;
GstClockTime duration = GST_CLOCK_TIME_NONE;
/* Look for the largest duration across all streams */
for (GList * item = splitmux->contexts; item; item = item->next) {
MqStreamCtx *ctx = item->data;
if (ctx->out_running_time_end > splitmux->out_fragment_start_runts) {
GstClockTime ctx_duration =
ctx->out_running_time_end - splitmux->out_fragment_start_runts;
if (!GST_CLOCK_TIME_IS_VALID (duration) || ctx_duration > duration) {
duration = ctx_duration;
}
}
}
GST_LOG_OBJECT (splitmux,
"Updating fragment info with reference TS %" GST_STIME_FORMAT
" with fragment-offset %" GST_TIMEP_FORMAT
" and fragment-duration %" GST_TIMEP_FORMAT,
GST_STIME_ARGS (splitmux->reference_ctx->out_running_time),
&offset, &duration);
splitmux->out_fragment_info.last_running_time =
splitmux->reference_ctx->out_running_time;
splitmux->out_fragment_info.fragment_offset = offset;
splitmux->out_fragment_info.fragment_duration = duration;
}
/* Called with splitmux lock held to check if this output /* Called with splitmux lock held to check if this output
* context needs to sleep to wait for the release of the * context needs to sleep to wait for the release of the
* next GOP, or to send EOS to close out the current file * next GOP, or to send EOS to close out the current file
@ -1286,6 +1338,8 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
GST_STIME_ARGS (my_max_out_running_time)); GST_STIME_ARGS (my_max_out_running_time));
if (can_output) { if (can_output) {
/* Always outputting everything up to the next max_out_running_time
* before advancing the state machine */
if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE && if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
ctx->out_running_time < my_max_out_running_time) { ctx->out_running_time < my_max_out_running_time) {
return GST_FLOW_OK; return GST_FLOW_OK;
@ -1303,14 +1357,18 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
case SPLITMUX_OUTPUT_STATE_ENDING_STREAM: case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
/* We've reached the max out running_time to get here, so end this file now */ /* We've reached the max out running_time to get here, so end this file now */
if (ctx->out_eos == FALSE) { if (ctx->out_eos == FALSE) {
update_output_fragment_info (splitmux);
if (splitmux->async_finalize) { if (splitmux->async_finalize) {
/* For async finalization, we must store the fragment timing /* For async finalization, we must store the fragment timing
* info on the element via qdata, because EOS will be processed * info on the element via qdata, because EOS will be processed
* asynchronously */ * asynchronously */
GstClockTime *sink_running_time = g_new (GstClockTime, 1);
*sink_running_time = splitmux->reference_ctx->out_running_time; OutputFragmentInfo *sink_fragment_info =
g_new (OutputFragmentInfo, 1);
*sink_fragment_info = splitmux->out_fragment_info;
g_object_set_qdata_full (G_OBJECT (splitmux->sink), g_object_set_qdata_full (G_OBJECT (splitmux->sink),
RUNNING_TIME, sink_running_time, g_free); SINK_FRAGMENT_INFO, sink_fragment_info, g_free);
/* We must set EOS asynchronously at this point. We cannot defer /* We must set EOS asynchronously at this point. We cannot defer
* it, because we need all contexts to wake up, for the * it, because we need all contexts to wake up, for the
@ -1333,9 +1391,15 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
send_eos (splitmux, ctx); send_eos (splitmux, ctx);
continue; continue;
} }
} else if (splitmux->output_state ==
SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
GST_LOG_OBJECT (splitmux,
"At end-of-stream state, and context %p is already EOS. Returning.",
ctx);
return GST_FLOW_OK;
} else { } else {
GST_INFO_OBJECT (splitmux, GST_INFO_OBJECT (splitmux,
"At end-of-file state, but context %p is already EOS", ctx); "At end-of-file state, and context %p is already EOS.", ctx);
} }
break; break;
case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE: case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
@ -1721,18 +1785,23 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
locked = TRUE; locked = TRUE;
if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED) if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach; goto beach;
GST_INFO_OBJECT (splitmux,
"Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
ctx->out_eos = TRUE; ctx->out_eos = TRUE;
if (ctx == splitmux->reference_ctx) { if (ctx == splitmux->reference_ctx) {
GST_INFO_OBJECT (splitmux,
"EOS on reference context - ending the recording");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM; splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
update_output_fragment_info (splitmux);
// Waiting before outputting will ensure the muxer end-of-stream // Waiting before outputting will ensure the muxer end-of-stream
// qdata is set without racing against this EOS event reaching the muxer // qdata is set without racing against this EOS event reaching the muxer
wait = TRUE; wait = TRUE;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
} }
GST_INFO_OBJECT (splitmux,
"Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
break; break;
case GST_EVENT_GAP:{ case GST_EVENT_GAP:{
GstClockTime gap_ts; GstClockTime gap_ts;
@ -1822,6 +1891,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
"New caps were not accepted. Switching output file"); "New caps were not accepted. Switching output file");
if (ctx->out_eos == FALSE) { if (ctx->out_eos == FALSE) {
splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE; splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
update_output_fragment_info (splitmux);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
} }
} }
@ -1871,6 +1941,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
splitmux->queued_keyframes--; splitmux->queued_keyframes--;
ctx->out_running_time = buf_info->run_ts; ctx->out_running_time = buf_info->run_ts;
ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info); ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (splitmux, GST_LOG_OBJECT (splitmux,
@ -1884,9 +1955,56 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
splitmux->muxed_out_bytes += buf_info->buf_size; splitmux->muxed_out_bytes += buf_info->buf_size;
if (GST_CLOCK_STIME_IS_VALID (buf_info->run_ts)) {
if (!GST_CLOCK_STIME_IS_VALID (ctx->out_fragment_start_runts)) {
ctx->out_fragment_start_runts = buf_info->run_ts;
/* For the first fragment check if this is the earliest of all start running times */
if (splitmux->fragment_id == 1) {
if (!GST_CLOCK_STIME_IS_VALID (splitmux->out_start_runts)
|| (ctx->out_fragment_start_runts < splitmux->out_start_runts)) {
splitmux->out_start_runts = ctx->out_fragment_start_runts;
GST_LOG_OBJECT (splitmux,
"Overall recording start TS now %" GST_STIMEP_FORMAT,
&splitmux->out_start_runts);
}
}
if (!GST_CLOCK_STIME_IS_VALID (splitmux->out_fragment_start_runts)
|| (ctx->out_fragment_start_runts <
splitmux->out_fragment_start_runts)) {
splitmux->out_fragment_start_runts = ctx->out_fragment_start_runts;
GST_LOG_OBJECT (splitmux,
"Overall fragment start TS now %" GST_STIMEP_FORMAT,
&splitmux->out_fragment_start_runts);
}
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " buffer run TS %" GST_STIME_FORMAT
" is first for this fragment", pad,
GST_STIME_ARGS (ctx->out_fragment_start_runts));
}
/* Extend the context end running time if it grew */
GstClockTime end_run_ts = buf_info->run_ts;
if (GST_CLOCK_TIME_IS_VALID (buf_info->duration)) {
end_run_ts += buf_info->duration;
}
if (!GST_CLOCK_TIME_IS_VALID (ctx->out_running_time_end) ||
end_run_ts > ctx->out_running_time_end) {
ctx->out_running_time_end = end_run_ts;
GstClockTimeDiff duration = end_run_ts - ctx->out_fragment_start_runts;
GST_LOG_OBJECT (splitmux,
"Pad %" GST_PTR_FORMAT " fragment duration now %" GST_STIMEP_FORMAT,
pad, &duration);
}
}
#ifndef GST_DISABLE_GST_DEBUG #ifndef GST_DISABLE_GST_DEBUG
{ {
GstBuffer *buf = gst_pad_probe_info_get_buffer (info); GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf, " run ts %" GST_STIME_FORMAT, buf,
GST_STIME_ARGS (ctx->out_running_time)); GST_STIME_ARGS (ctx->out_running_time));
@ -1947,6 +2065,7 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
/* Clear EOS flag if not actually EOS */ /* Clear EOS flag if not actually EOS */
ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad); ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
ctx->out_eos_async_done = ctx->out_eos; ctx->out_eos_async_done = ctx->out_eos;
ctx->out_fragment_start_runts = GST_CLOCK_STIME_NONE;
gst_object_unref (peer); gst_object_unref (peer);
} }
@ -2162,6 +2281,7 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
GST_SPLITMUX_LOCK (splitmux); GST_SPLITMUX_LOCK (splitmux);
set_next_filename (splitmux, ctx); set_next_filename (splitmux, ctx);
splitmux->muxed_out_bytes = 0; splitmux->muxed_out_bytes = 0;
splitmux->out_fragment_start_runts = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux); GST_SPLITMUX_UNLOCK (splitmux);
if (gst_element_set_state (sink, if (gst_element_set_state (sink,
@ -3595,6 +3715,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
" feeds queue pad %" GST_PTR_FORMAT, ret, q_sink); " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
ctx->ctx_id = g_list_length (splitmux->contexts);
splitmux->contexts = g_list_append (splitmux->contexts, ctx); splitmux->contexts = g_list_append (splitmux->contexts, ctx);
g_free (gname); g_free (gname);
@ -4008,6 +4129,9 @@ gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL); g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
g_queue_clear (&splitmux->out_cmd_q); g_queue_clear (&splitmux->out_cmd_q);
splitmux->out_fragment_start_runts = splitmux->out_start_runts =
GST_CLOCK_STIME_NONE;
} }
static GstStateChangeReturn static GstStateChangeReturn

View File

@ -68,6 +68,14 @@ typedef struct _SplitMuxOutputCommand
} release_gop; } release_gop;
} SplitMuxOutputCommand; } SplitMuxOutputCommand;
typedef struct
{
GstClockTime last_running_time;
GstClockTime fragment_offset;
GstClockTime fragment_duration;
} OutputFragmentInfo;
typedef struct _MqStreamBuf typedef struct _MqStreamBuf
{ {
gboolean keyframe; gboolean keyframe;
@ -99,6 +107,7 @@ typedef struct {
typedef struct _MqStreamCtx typedef struct _MqStreamCtx
{ {
GstSplitMuxSink *splitmux; GstSplitMuxSink *splitmux;
guint ctx_id;
guint q_overrun_id; guint q_overrun_id;
guint sink_pad_block_id; guint sink_pad_block_id;
@ -116,9 +125,12 @@ typedef struct _MqStreamCtx
GstSegment in_segment; GstSegment in_segment;
GstSegment out_segment; GstSegment out_segment;
GstClockTimeDiff out_fragment_start_runts;
GstClockTimeDiff in_running_time; GstClockTimeDiff in_running_time;
GstClockTimeDiff out_running_time; GstClockTimeDiff out_running_time;
GstClockTimeDiff out_running_time_end; /* max run ts + durations */
GstElement *q; GstElement *q;
GQueue queued_bufs; GQueue queued_bufs;
@ -128,6 +140,7 @@ typedef struct _MqStreamCtx
GstBuffer *cur_out_buffer; GstBuffer *cur_out_buffer;
GstEvent *pending_gap; GstEvent *pending_gap;
} MqStreamCtx; } MqStreamCtx;
struct _GstSplitMuxSink struct _GstSplitMuxSink
@ -183,11 +196,11 @@ struct _GstSplitMuxSink
* stream in this fragment */ * stream in this fragment */
guint64 fragment_reference_bytes; guint64 fragment_reference_bytes;
/* Minimum start time (PTS or DTS) of the current fragment */ /* Minimum start time (PTS or DTS) of the current fragment (reference stream, input side) */
GstClockTimeDiff fragment_start_time; GstClockTimeDiff fragment_start_time;
/* Start time (PTS) of the current fragment */ /* Start time (PTS) of the current fragment (reference stream, input side) */
GstClockTimeDiff fragment_start_time_pts; GstClockTimeDiff fragment_start_time_pts;
/* Minimum start timecode of the current fragment */ /* Minimum start timecode of the current fragment (reference stream, input side) */
GstVideoTimeCode *fragment_start_tc; GstVideoTimeCode *fragment_start_tc;
/* Oldest GOP at head, newest GOP at tail */ /* Oldest GOP at head, newest GOP at tail */
@ -200,6 +213,12 @@ struct _GstSplitMuxSink
SplitMuxOutputState output_state; SplitMuxOutputState output_state;
GstClockTimeDiff max_out_running_time; GstClockTimeDiff max_out_running_time;
OutputFragmentInfo out_fragment_info;
/* Track the earliest running time (across all inputs) for the first fragment */
GstClockTimeDiff out_start_runts;
/* Track the earliest running time (across all inputs) for the *current* fragment */
GstClockTimeDiff out_fragment_start_runts;
guint64 muxed_out_bytes; guint64 muxed_out_bytes;

View File

@ -103,19 +103,63 @@ dump_error (GstMessage * msg)
} }
static GstMessage * static GstMessage *
run_pipeline (GstElement * pipeline) run_pipeline (GstElement * pipeline, guint num_fragments_expected,
const GstClockTime * fragment_offsets,
const GstClockTime * fragment_durations)
{ {
GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline)); GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline));
GstMessage *msg; GstMessage *msg;
guint fragment_number = 0;
gst_element_set_state (pipeline, GST_STATE_PLAYING); gst_element_set_state (pipeline, GST_STATE_PLAYING);
msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); do {
msg =
gst_bus_poll (bus,
GST_MESSAGE_EOS | GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT, -1);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS
|| GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) {
break;
}
if (num_fragments_expected != 0) {
// Handle element message
const GstStructure *s = gst_message_get_structure (msg);
if (gst_structure_has_name (s, "splitmuxsrc-fragment-info") ||
gst_structure_has_name (s, "splitmuxsink-fragment-closed")) {
GstClockTime fragment_offset, fragment_duration;
fail_unless (gst_structure_get_clock_time (s, "fragment-offset",
&fragment_offset));
fail_unless (gst_structure_get_clock_time (s, "fragment-duration",
&fragment_duration));
if (fragment_offsets != NULL) {
fail_unless (fragment_offsets[fragment_number] == fragment_offset,
"Expected offset %" GST_TIME_FORMAT
" for fragment %u. Got offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (fragment_offsets[fragment_number]),
fragment_number, GST_TIME_ARGS (fragment_offset));
}
if (fragment_durations != NULL) {
fail_unless (fragment_durations[fragment_number] == fragment_duration,
"Expected duration %" GST_TIME_FORMAT
" for fragment %u. Got duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (fragment_durations[fragment_number]),
fragment_number, GST_TIME_ARGS (fragment_duration));
}
fragment_number++;
}
}
gst_message_unref (msg);
} while (TRUE);
gst_element_set_state (pipeline, GST_STATE_NULL); gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (bus); gst_object_unref (bus);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
else if (num_fragments_expected != 0) {
// Success. Check we got the expected number of fragment messages
fail_unless (fragment_number == num_fragments_expected);
}
return msg; return msg;
} }
@ -222,7 +266,9 @@ receive_sample (GstAppSink * appsink, gpointer user_data)
static void static void
test_playback (const gchar * in_pattern, GstClockTime exp_first_time, test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
GstClockTime exp_last_time, gboolean test_reverse) GstClockTime exp_last_time, gboolean test_reverse,
guint num_fragments_expected, const GstClockTime * fragment_offsets,
const GstClockTime * fragment_durations)
{ {
GstMessage *msg; GstMessage *msg;
GstElement *pipeline; GstElement *pipeline;
@ -256,7 +302,9 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
/* test forwards */ /* test forwards */
seek_pipeline (pipeline, 1.0, 0, -1); seek_pipeline (pipeline, 1.0, 0, -1);
fail_unless (first_ts == GST_CLOCK_TIME_NONE); fail_unless (first_ts == GST_CLOCK_TIME_NONE);
msg = run_pipeline (pipeline); msg =
run_pipeline (pipeline, num_fragments_expected, fragment_offsets,
fragment_durations);
fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
gst_message_unref (msg); gst_message_unref (msg);
@ -272,7 +320,9 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
if (test_reverse) { if (test_reverse) {
/* Test backwards */ /* Test backwards */
seek_pipeline (pipeline, -1.0, 0, -1); seek_pipeline (pipeline, -1.0, 0, -1);
msg = run_pipeline (pipeline); msg =
run_pipeline (pipeline, num_fragments_expected, fragment_offsets,
fragment_durations);
fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
gst_message_unref (msg); gst_message_unref (msg);
/* Check we saw the entire range of values */ /* Check we saw the entire range of values */
@ -394,7 +444,9 @@ GST_START_TEST (test_splitmuxsink)
&location_state, NULL); &location_state, NULL);
gst_object_unref (bus); gst_object_unref (bus);
msg = run_pipeline (pipeline); GstClockTime offsets[] = { 0, GST_SECOND, 2 * GST_SECOND };
GstClockTime durations[] = { GST_SECOND, GST_SECOND, GST_SECOND };
msg = run_pipeline (pipeline, 3, offsets, durations);
/* Clean up the location state */ /* Clean up the location state */
g_free (location_state.current_location); g_free (location_state.current_location);
@ -427,7 +479,7 @@ GST_START_TEST (test_splitmuxsink)
fail_unless (count == 3, "Expected 3 output files, got %d", count); fail_unless (count == 3, "Expected 3 output files, got %d", count);
in_pattern = g_build_filename (tmpdir, "out*.ogg", NULL); in_pattern = g_build_filename (tmpdir, "out*.ogg", NULL);
test_playback (in_pattern, 0, 3 * GST_SECOND, TRUE); test_playback (in_pattern, 0, 3 * GST_SECOND, TRUE, 3, offsets, durations);
g_free (in_pattern); g_free (in_pattern);
} }
@ -458,7 +510,7 @@ GST_START_TEST (test_splitmuxsink_clean_failure)
g_object_set (sink, "sink", fakesink, NULL); g_object_set (sink, "sink", fakesink, NULL);
gst_object_unref (sink); gst_object_unref (sink);
msg = run_pipeline (pipeline); msg = run_pipeline (pipeline, 0, NULL, NULL);
fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR); fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR);
gst_message_unref (msg); gst_message_unref (msg);
@ -501,7 +553,9 @@ GST_START_TEST (test_splitmuxsink_multivid)
g_free (dest_pattern); g_free (dest_pattern);
g_object_unref (sink); g_object_unref (sink);
msg = run_pipeline (pipeline); GstClockTime offsets[] = { 0, GST_SECOND, 2 * GST_SECOND };
GstClockTime durations[] = { GST_SECOND, GST_SECOND, GST_SECOND };
msg = run_pipeline (pipeline, 3, offsets, durations);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -519,7 +573,7 @@ GST_START_TEST (test_splitmuxsink_multivid)
* written, and causes test failures like buffers being output * written, and causes test failures like buffers being output
* multiple times by qtdemux as it loops through GOPs. Disable that * multiple times by qtdemux as it loops through GOPs. Disable that
* for now */ * for now */
test_playback (in_pattern, 0, 3 * GST_SECOND, FALSE); test_playback (in_pattern, 0, 3 * GST_SECOND, FALSE, 3, offsets, durations);
g_free (in_pattern); g_free (in_pattern);
} }
@ -553,7 +607,9 @@ GST_START_TEST (test_splitmuxsink_async)
g_free (dest_pattern); g_free (dest_pattern);
g_object_unref (sink); g_object_unref (sink);
msg = run_pipeline (pipeline); GstClockTime offsets[] = { 0, GST_SECOND, 2 * GST_SECOND };
GstClockTime durations[] = { GST_SECOND, GST_SECOND, GST_SECOND };
msg = run_pipeline (pipeline, 3, offsets, durations);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -583,7 +639,7 @@ GST_START_TEST (test_splitmuxsink_async)
fail_unless (count == 3, "Expected 3 output files, got %d", count); fail_unless (count == 3, "Expected 3 output files, got %d", count);
in_pattern = g_build_filename (tmpdir, "matroska*.mkv", NULL); in_pattern = g_build_filename (tmpdir, "matroska*.mkv", NULL);
test_playback (in_pattern, 0, 3 * GST_SECOND, TRUE); test_playback (in_pattern, 0, 3 * GST_SECOND, TRUE, 3, offsets, durations);
g_free (in_pattern); g_free (in_pattern);
} }
@ -690,7 +746,7 @@ run_eos_pipeline (guint num_video_buf, guint num_audio_buf,
fail_if (pipeline == NULL); fail_if (pipeline == NULL);
msg = run_pipeline (pipeline); msg = run_pipeline (pipeline, 0, NULL, NULL);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -778,7 +834,7 @@ splitmuxsink_split_by_keyframe (gboolean send_keyframe_request,
gst_object_unref (srcpad); gst_object_unref (srcpad);
gst_object_unref (enc); gst_object_unref (enc);
msg = run_pipeline (pipeline); msg = run_pipeline (pipeline, 0, 0, NULL);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -810,7 +866,7 @@ splitmuxsink_split_by_keyframe (gboolean send_keyframe_request,
* written, and causes test failures like buffers being output * written, and causes test failures like buffers being output
* multiple times by qtdemux as it loops through GOPs. Disable that * multiple times by qtdemux as it loops through GOPs. Disable that
* for now */ * for now */
test_playback (in_pattern, 0, 6 * GST_SECOND, FALSE); test_playback (in_pattern, 0, 6 * GST_SECOND, FALSE, 0, NULL, NULL);
g_free (in_pattern); g_free (in_pattern);
} }

View File

@ -103,19 +103,63 @@ dump_error (GstMessage * msg)
} }
static GstMessage * static GstMessage *
run_pipeline (GstElement * pipeline) run_pipeline (GstElement * pipeline, guint num_fragments_expected,
const GstClockTime * fragment_offsets,
const GstClockTime * fragment_durations)
{ {
GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline)); GstBus *bus = gst_element_get_bus (GST_ELEMENT (pipeline));
GstMessage *msg; GstMessage *msg;
guint fragment_number = 0;
gst_element_set_state (pipeline, GST_STATE_PLAYING); gst_element_set_state (pipeline, GST_STATE_PLAYING);
msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1); do {
msg =
gst_bus_poll (bus,
GST_MESSAGE_EOS | GST_MESSAGE_ERROR | GST_MESSAGE_ELEMENT, -1);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS
|| GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) {
break;
}
if (num_fragments_expected != 0) {
// Handle element message
const GstStructure *s = gst_message_get_structure (msg);
if (gst_structure_has_name (s, "splitmuxsrc-fragment-info") ||
gst_structure_has_name (s, "splitmuxsink-fragment-closed")) {
GstClockTime fragment_offset, fragment_duration;
fail_unless (gst_structure_get_clock_time (s, "fragment-offset",
&fragment_offset));
fail_unless (gst_structure_get_clock_time (s, "fragment-duration",
&fragment_duration));
if (fragment_offsets != NULL) {
fail_unless (fragment_offsets[fragment_number] == fragment_offset,
"Expected offset %" GST_TIME_FORMAT
" for fragment %u. Got offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (fragment_offsets[fragment_number]),
fragment_number, GST_TIME_ARGS (fragment_offset));
}
if (fragment_durations != NULL) {
fail_unless (fragment_durations[fragment_number] == fragment_duration,
"Expected duration %" GST_TIME_FORMAT
" for fragment %u. Got duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (fragment_durations[fragment_number]),
fragment_number, GST_TIME_ARGS (fragment_duration));
}
fragment_number++;
}
}
gst_message_unref (msg);
} while (TRUE);
gst_element_set_state (pipeline, GST_STATE_NULL); gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (bus); gst_object_unref (bus);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
else if (num_fragments_expected != 0) {
// Success. Check we got the expected number of fragment messages
fail_unless (fragment_number == num_fragments_expected);
}
return msg; return msg;
} }
@ -216,7 +260,9 @@ receive_sample (GstAppSink * appsink, gpointer user_data G_GNUC_UNUSED)
static void static void
test_playback (const gchar * in_pattern, GstClockTime exp_first_time, test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
GstClockTime exp_last_time, gboolean test_reverse) GstClockTime exp_last_time, gboolean test_reverse,
guint num_fragments_expected, const GstClockTime * fragment_offsets,
const GstClockTime * fragment_durations)
{ {
GstMessage *msg; GstMessage *msg;
GstElement *pipeline; GstElement *pipeline;
@ -250,7 +296,9 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
/* test forwards */ /* test forwards */
seek_pipeline (pipeline, 1.0, 0, -1); seek_pipeline (pipeline, 1.0, 0, -1);
fail_unless (first_ts == GST_CLOCK_TIME_NONE); fail_unless (first_ts == GST_CLOCK_TIME_NONE);
msg = run_pipeline (pipeline); msg =
run_pipeline (pipeline, num_fragments_expected, fragment_offsets,
fragment_durations);
fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
gst_message_unref (msg); gst_message_unref (msg);
@ -266,7 +314,9 @@ test_playback (const gchar * in_pattern, GstClockTime exp_first_time,
if (test_reverse) { if (test_reverse) {
/* Test backwards */ /* Test backwards */
seek_pipeline (pipeline, -1.0, 0, -1); seek_pipeline (pipeline, -1.0, 0, -1);
msg = run_pipeline (pipeline); msg =
run_pipeline (pipeline, num_fragments_expected, fragment_offsets,
fragment_durations);
fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS); fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
gst_message_unref (msg); gst_message_unref (msg);
/* Check we saw the entire range of values */ /* Check we saw the entire range of values */
@ -287,7 +337,10 @@ GST_START_TEST (test_splitmuxsrc)
{ {
gchar *in_pattern = gchar *in_pattern =
g_build_filename (GST_TEST_FILES_PATH, "splitvideo*.ogg", NULL); g_build_filename (GST_TEST_FILES_PATH, "splitvideo*.ogg", NULL);
test_playback (in_pattern, 0, 3 * GST_SECOND, TRUE);
GstClockTime offsets[] = { 0, GST_SECOND, 2 * GST_SECOND };
GstClockTime durations[] = { GST_SECOND, GST_SECOND, GST_SECOND };
test_playback (in_pattern, 0, 3 * GST_SECOND, TRUE, 3, offsets, durations);
g_free (in_pattern); g_free (in_pattern);
} }
@ -320,7 +373,7 @@ GST_START_TEST (test_splitmuxsrc_format_location)
(GCallback) src_format_location_cb, NULL); (GCallback) src_format_location_cb, NULL);
g_object_unref (src); g_object_unref (src);
msg = run_pipeline (pipeline); msg = run_pipeline (pipeline, 0, NULL, NULL);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -533,7 +586,14 @@ GST_START_TEST (test_splitmuxsrc_sparse_streams)
(GCallback) new_sample_verify_1sec_offset, &tsink_prev_ts); (GCallback) new_sample_verify_1sec_offset, &tsink_prev_ts);
g_clear_object (&element); g_clear_object (&element);
msg = run_pipeline (pipeline); /* Vorbis packet sizes cause some slightly strange fragment sizes */
GstClockTime offsets[] = { 0, 999666666, 2 * (GstClockTime) 999666666,
3 * (GstClockTime) 999666666, 4 * (GstClockTime) 999666666
};
GstClockTime durations[] =
{ 1017600000, GST_SECOND, GST_SECOND, GST_SECOND, 1107200000 };
msg = run_pipeline (pipeline, 5, offsets, durations);
} }
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
@ -608,7 +668,10 @@ GST_START_TEST (test_splitmuxsrc_caps_change)
gst_object_unref (sinkpad); gst_object_unref (sinkpad);
gst_object_unref (cf); gst_object_unref (cf);
msg = run_pipeline (pipeline); GstClockTime offsets[] = { 0, GST_SECOND / 2 };
GstClockTime durations[] = { GST_SECOND / 2, GST_SECOND / 2 };
msg = run_pipeline (pipeline, 2, offsets, durations);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -621,7 +684,7 @@ GST_START_TEST (test_splitmuxsrc_caps_change)
fail_unless (count == 2, "Expected 2 output files, got %d", count); fail_unless (count == 2, "Expected 2 output files, got %d", count);
in_pattern = g_build_filename (tmpdir, "out*.mp4", NULL); in_pattern = g_build_filename (tmpdir, "out*.mp4", NULL);
test_playback (in_pattern, 0, GST_SECOND, TRUE); test_playback (in_pattern, 0, GST_SECOND, TRUE, 2, offsets, durations);
g_free (in_pattern); g_free (in_pattern);
} }
@ -654,7 +717,7 @@ GST_START_TEST (test_splitmuxsrc_robust_mux)
g_free (dest_pattern); g_free (dest_pattern);
g_object_unref (sink); g_object_unref (sink);
msg = run_pipeline (pipeline); msg = run_pipeline (pipeline, 0, NULL, NULL);
if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR)
dump_error (msg); dump_error (msg);
@ -668,7 +731,7 @@ GST_START_TEST (test_splitmuxsrc_robust_mux)
* reserved duration property. All we care about is that the muxing didn't fail because space ran out */ * reserved duration property. All we care about is that the muxing didn't fail because space ran out */
in_pattern = g_build_filename (tmpdir, "out*.mp4", NULL); in_pattern = g_build_filename (tmpdir, "out*.mp4", NULL);
test_playback (in_pattern, 0, GST_SECOND, TRUE); test_playback (in_pattern, 0, GST_SECOND, TRUE, 0, NULL, NULL);
g_free (in_pattern); g_free (in_pattern);
} }

View File

@ -1,3 +1,9 @@
executable('splitmuxsink-fragment-info', 'splitmuxsink-fragment-info.c',
dependencies: [gst_dep],
c_args : gst_plugins_good_args,
include_directories : [configinc],
install: false)
executable('splitmuxsrc-extract', 'splitmuxsrc-extract.c', executable('splitmuxsrc-extract', 'splitmuxsrc-extract.c',
dependencies: [gst_dep], dependencies: [gst_dep],
c_args : gst_plugins_good_args, c_args : gst_plugins_good_args,

View File

@ -0,0 +1,125 @@
/* GStreamer
* Copyright (C) 2024 Jan Schmidt <jan@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/*
* This example creates a test recording using splitmuxsink,
* listening for the fragment-closed messages from splitmuxsink
* and writing a CSV file with the fragment offsets and durations
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <gst/gst.h>
GMainLoop *loop;
FILE *out_csv = NULL;
static gboolean
message_handler (GstBus * bus, GstMessage * message, gpointer data)
{
if (message->type == GST_MESSAGE_ELEMENT) {
const GstStructure *s = gst_message_get_structure (message);
const gchar *name = gst_structure_get_name (s);
if (strcmp (name, "splitmuxsink-fragment-closed") == 0) {
const gchar *fname = gst_structure_get_string (s, "location");
GstClockTime start_offset, duration;
if (!gst_structure_get_uint64 (s, "fragment-offset", &start_offset) ||
!gst_structure_get_uint64 (s, "fragment-duration", &duration)) {
g_assert_not_reached ();
}
fprintf (out_csv, "\"%s\",%" G_GUINT64_FORMAT ",%" G_GUINT64_FORMAT "\n",
fname, start_offset, duration);
}
} else if (message->type == GST_MESSAGE_EOS) {
g_main_loop_quit (loop);
} else if (message->type == GST_MESSAGE_ERROR) {
GError *err;
gchar *debug_info;
gst_message_parse_error (message, &err, &debug_info);
g_printerr ("Error received from element %s: %s\n",
GST_OBJECT_NAME (message->src), err->message);
g_printerr ("Debugging information: %s\n",
debug_info ? debug_info : "none");
g_main_loop_quit (loop);
}
return TRUE;
}
int
main (int argc, char *argv[])
{
GstElement *pipe;
GstBus *bus;
gst_init (&argc, &argv);
if (argc < 3) {
g_printerr
("Usage: %s target_dir out.csv\n Pass splitmuxsink target directory for generated recording, and out.csv to receive the fragment info\n",
argv[0]);
return 1;
}
out_csv = fopen (argv[2], "w");
if (out_csv == NULL) {
g_printerr ("Failed to open output file %s", argv[2]);
return 2;
}
GError *error = NULL;
pipe =
gst_parse_launch
("videotestsrc num-buffers=300 ! video/x-raw,framerate=30/1 ! timeoverlay ! x264enc key-int-max=30 ! "
"h264parse ! queue ! splitmuxsink name=sink "
"audiotestsrc samplesperbuffer=1600 num-buffers=300 ! audio/x-raw,rate=48000 ! opusenc ! queue ! sink.audio_0 ",
&error);
if (pipe == NULL || error != NULL) {
g_print ("Failed to create pipeline. Error %s\n", error->message);
return 3;
}
GstElement *splitmuxsink = gst_bin_get_by_name (GST_BIN (pipe), "sink");
/* Set the files glob on src */
gchar *file_pattern = g_strdup_printf ("%s/test%%05d.mp4", argv[1]);
g_object_set (splitmuxsink, "location", file_pattern, NULL);
g_object_set (splitmuxsink, "max-size-time", GST_SECOND, NULL);
gst_object_unref (splitmuxsink);
bus = gst_element_get_bus (pipe);
gst_bus_add_watch (bus, message_handler, NULL);
gst_object_unref (bus);
gst_element_set_state (pipe, GST_STATE_PLAYING);
loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (loop);
fclose (out_csv);
gst_element_set_state (pipe, GST_STATE_NULL);
gst_object_unref (pipe);
return 0;
}