From 4316be657102fb9d00388339f863db79cf297f16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 21 Sep 2021 13:37:35 +0300 Subject: [PATCH] splitmuxsink: Keep track of the pending input GOPs in a queue This cleans up input GOP handling and makes it possible to handle more than 2 pending GOPs, which could happen before if keyframes are arriving with too short of a distance between them. Part-of: --- .../gst/multifile/gstsplitmuxsink.c | 499 ++++++++++-------- .../gst/multifile/gstsplitmuxsink.h | 57 +- 2 files changed, 316 insertions(+), 240 deletions(-) diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c index 6063b43953..05bf741a86 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.c @@ -244,7 +244,10 @@ static GstElement *create_element (GstSplitMuxSink * splitmux, const gchar * factory, const gchar * name, gboolean locked); static void do_async_done (GstSplitMuxSink * splitmux); -static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux); + +static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux, + const GstVideoTimeCode * cur_tc, GstClockTime running_time, + GstVideoTimeCode ** next_tc); static MqStreamBuf * mq_stream_buf_new (void) @@ -270,6 +273,13 @@ out_cmd_buf_free (SplitMuxOutputCommand * data) g_slice_free (SplitMuxOutputCommand, data); } +static void +input_gop_free (InputGop * gop) +{ + g_clear_pointer (&gop->start_tc, gst_video_time_code_free); + g_slice_free (InputGop, gop); +} + static void gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) { @@ -600,7 +610,6 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux) splitmux->reset_muxer = DEFAULT_RESET_MUXER; splitmux->threshold_timecode_str = NULL; - gst_splitmux_reset_timecode (splitmux); splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE; splitmux->muxer_factory = g_strdup (DEFAULT_MUXER); @@ -613,6 +622,8 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux) splitmux->do_split_next_gop = FALSE; splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8); splitmux->next_fku_time = GST_CLOCK_TIME_NONE; + + g_queue_init (&splitmux->pending_input_gops); } static void @@ -632,15 +643,6 @@ gst_splitmux_reset_elements (GstSplitMuxSink * splitmux) splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL; } -static void -gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux) -{ - g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free); - g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free); - g_clear_pointer (&splitmux->next_gop_start_tc, gst_video_time_code_free); - splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE; -} - static void gst_splitmux_sink_dispose (GObject * object) { @@ -656,12 +658,17 @@ static void gst_splitmux_sink_finalize (GObject * object) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); + g_cond_clear (&splitmux->input_cond); g_cond_clear (&splitmux->output_cond); g_mutex_clear (&splitmux->lock); g_mutex_clear (&splitmux->state_lock); g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL); g_queue_clear (&splitmux->out_cmd_q); + g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL); + g_queue_clear (&splitmux->pending_input_gops); + + g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free); if (splitmux->muxerpad_map) gst_structure_free (splitmux->muxerpad_map); @@ -698,7 +705,6 @@ gst_splitmux_sink_finalize (GObject * object) * because the dispose will have freed all request pads though */ g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL); g_list_free (splitmux->contexts); - gst_splitmux_reset_timecode (splitmux); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -773,7 +779,6 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id, /* will be calculated later */ g_clear_pointer (&splitmux->tc_interval, gst_video_time_code_interval_free); - gst_splitmux_reset_timecode (splitmux); splitmux->threshold_timecode_str = g_value_dup_string (value); if (splitmux->threshold_timecode_str) { @@ -787,6 +792,12 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id, splitmux->threshold_timecode_str = NULL; } } + splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE; + if (splitmux->tc_interval && splitmux->fragment_start_tc) { + splitmux->next_fragment_start_tc_time = + calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc, + splitmux->fragment_start_time, NULL); + } GST_OBJECT_UNLOCK (splitmux); break; case PROP_SEND_KEYFRAME_REQUESTS: @@ -1435,9 +1446,24 @@ calculate_next_max_timecode (GstSplitMuxSink * splitmux, next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time; } - GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT - " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time), - GST_TIME_ARGS (cur_tc_time)); +#ifndef GST_DISABLE_GST_DEBUG + { + gchar *next_max_tc_str, *cur_tc_str; + + cur_tc_str = gst_video_time_code_to_string (cur_tc); + next_max_tc_str = gst_video_time_code_to_string (target_tc); + + GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT + " from ref timecode %s time: %" GST_TIME_FORMAT, + next_max_tc_str, + GST_TIME_ARGS (next_max_tc_time), + cur_tc_str, GST_TIME_ARGS (cur_tc_time)); + + g_free (next_max_tc_str); + g_free (cur_tc_str); + } +#endif + if (next_tc) *next_tc = target_tc; else @@ -1457,41 +1483,51 @@ request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer, GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE; GstClockTime next_fku_time = GST_CLOCK_TIME_NONE; GstClockTime tc_rounding_error = 5 * GST_USECOND; - - GstClockTimeDiff gop_start_time, gop_start_time_pts; - const GstVideoTimeCode *gop_start_tc; - - if (splitmux->next_gop_start_time != GST_CLOCK_STIME_NONE) { - GST_DEBUG_OBJECT (splitmux, "Using next GOP"); - gop_start_time = splitmux->next_gop_start_time; - gop_start_time_pts = splitmux->next_gop_start_time_pts; - gop_start_tc = splitmux->next_gop_start_tc; - } else { - GST_DEBUG_OBJECT (splitmux, "Using current GOP"); - gop_start_time = splitmux->gop_start_time; - gop_start_time_pts = splitmux->gop_start_time_pts; - gop_start_tc = splitmux->gop_start_tc; - } + InputGop *newest_gop = NULL; + GList *l; if (!splitmux->send_keyframe_requests) return TRUE; - if (running_time_dts != GST_CLOCK_STIME_NONE - && gop_start_time_pts != GST_CLOCK_STIME_NONE - && running_time_dts < gop_start_time_pts) { - GST_DEBUG_OBJECT (splitmux, - "Waiting until DTS (%" GST_STIME_FORMAT - ") has passed GOP start PTS (%" GST_STIME_FORMAT ")", - GST_STIME_ARGS (running_time_dts), GST_STIME_ARGS (gop_start_time_pts)); + /* Find the newest GOP where we passed in DTS the start PTS */ + for (l = splitmux->pending_input_gops.tail; l; l = l->prev) { + InputGop *tmp = l->data; + + GST_TRACE_OBJECT (splitmux, + "Having pending input GOP with start PTS %" GST_STIME_FORMAT + " and start time %" GST_STIME_FORMAT, + GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time)); + + if (tmp->sent_fku) { + GST_DEBUG_OBJECT (splitmux, + "Already checked for a keyframe request for this GOP"); + return TRUE; + } + + if (running_time_dts == GST_CLOCK_STIME_NONE || + tmp->start_time_pts == GST_CLOCK_STIME_NONE || + running_time_dts >= tmp->start_time_pts) { + GST_DEBUG_OBJECT (splitmux, + "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %" + GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts), + GST_STIME_ARGS (tmp->start_time)); + newest_gop = tmp; + break; + } + } + + if (!newest_gop) { + GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP"); return TRUE; } if (splitmux->tc_interval) { - if (gop_start_tc && gst_video_time_code_is_valid (gop_start_tc)) { + if (newest_gop->start_tc + && gst_video_time_code_is_valid (newest_gop->start_tc)) { GstVideoTimeCode *next_tc = NULL; max_tc_time = - calculate_next_max_timecode (splitmux, gop_start_tc, - gop_start_time, &next_tc); + calculate_next_max_timecode (splitmux, newest_gop->start_tc, + newest_gop->start_time, &next_tc); /* calculate the next expected keyframe time to prevent too early fku * event */ @@ -1538,7 +1574,7 @@ request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer, next_fku_time = 0; } } else { - target_time = gop_start_time + splitmux->threshold_time; + target_time = newest_gop->start_time + splitmux->threshold_time; } if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) { @@ -1579,13 +1615,15 @@ request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer, next_fku_time = target_time + splitmux->threshold_time; } - splitmux->next_fku_time = next_fku_time; - - ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0); GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT ", the next expected keyframe request time is %" GST_TIME_FORMAT, GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time)); + newest_gop->sent_fku = TRUE; + + splitmux->next_fku_time = next_fku_time; + ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0); + return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev); } @@ -2293,6 +2331,7 @@ need_new_fragment (GstSplitMuxSink * splitmux, gboolean check_robust_muxing; GstClockTime time_to_split = GST_CLOCK_TIME_NONE; GstClockTime *ptr_to_time; + const InputGop *gop, *next_gop; GST_OBJECT_LOCK (splitmux); thresh_bytes = splitmux->threshold_bytes; @@ -2320,8 +2359,14 @@ need_new_fragment (GstSplitMuxSink * splitmux, return TRUE; } + gop = g_queue_peek_head (&splitmux->pending_input_gops); + /* We need a full GOP queued up at this point */ + g_assert (gop != NULL); + next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1); + /* And the beginning of the next GOP or otherwise EOS */ + /* User told us to split at this running time */ - if (splitmux->gop_start_time >= time_to_split) { + if (gop->start_time >= time_to_split) { GST_OBJECT_LOCK (splitmux); /* Dequeue running time */ gst_queue_array_pop_head_struct (splitmux->times_to_split); @@ -2329,7 +2374,7 @@ need_new_fragment (GstSplitMuxSink * splitmux, ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split); while (ptr_to_time) { time_to_split = *ptr_to_time; - if (splitmux->gop_start_time < time_to_split) { + if (gop->start_time < time_to_split) { break; } gst_queue_array_pop_head_struct (splitmux->times_to_split); @@ -2337,7 +2382,7 @@ need_new_fragment (GstSplitMuxSink * splitmux, } GST_TRACE_OBJECT (splitmux, "GOP start time %" GST_STIME_FORMAT " is after requested split point %" - GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time), + GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time), GST_STIME_ARGS (time_to_split)); GST_OBJECT_UNLOCK (splitmux); return TRUE; @@ -2356,17 +2401,20 @@ need_new_fragment (GstSplitMuxSink * splitmux, return TRUE; /* Would overrun time limit */ } - if (splitmux->tc_interval && - GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) && - GST_CLOCK_STIME_IS_VALID (splitmux->next_gop_start_time) && - splitmux->next_gop_start_time > - splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) { - GST_TRACE_OBJECT (splitmux, - "in running time %" GST_STIME_FORMAT " overruns time limit %" - GST_TIME_FORMAT, - GST_STIME_ARGS (splitmux->next_gop_start_time), - GST_TIME_ARGS (splitmux->next_fragment_start_tc_time)); - return TRUE; + if (splitmux->tc_interval) { + GstClockTime next_gop_start_time = + next_gop ? next_gop->start_time : splitmux->max_in_running_time; + + if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) && + GST_CLOCK_STIME_IS_VALID (next_gop_start_time) && + next_gop_start_time > + splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) { + GST_TRACE_OBJECT (splitmux, + "in running time %" GST_STIME_FORMAT " overruns time limit %" + GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time), + GST_TIME_ARGS (splitmux->next_fragment_start_tc_time)); + return TRUE; + } } if (check_robust_muxing) { @@ -2421,12 +2469,12 @@ video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc) * go to COLLECTING_GOP_START state */ static void -handle_gathered_gop (GstSplitMuxSink * splitmux) +handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop, + GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time) { guint64 queued_bytes; GstClockTimeDiff queued_time = 0; GstClockTimeDiff queued_gop_time = 0; - GstClockTimeDiff new_out_ts = splitmux->next_gop_start_time; SplitMuxOutputCommand *cmd; /* Assess if the multiqueue contents overflowed the current file */ @@ -2435,25 +2483,21 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) * reference stream. Other streams might have run ahead a little bit, * but extra pieces won't be released to the muxer beyond the reference * stream cut-off anyway - so it forms the limit. */ - queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes; - queued_time = - (splitmux->next_gop_start_time == - GST_CLOCK_STIME_NONE) ? splitmux-> - reference_ctx->in_running_time : splitmux->next_gop_start_time; + queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes; + queued_time = next_gop_start_time; /* queued_gop_time tracks how much unwritten data there is waiting to * be written to this fragment including this GOP */ if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE) queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time; else - queued_gop_time = queued_time - splitmux->gop_start_time; + queued_gop_time = queued_time - gop->start_time; GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes); GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT " gop start time %" GST_STIME_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes, - GST_STIME_ARGS (splitmux->next_gop_start_time), - GST_STIME_ARGS (splitmux->gop_start_time)); + GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time)); if (queued_gop_time < 0) goto error_gop_duration; @@ -2482,38 +2526,33 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) GST_INFO_OBJECT (splitmux, "This GOP (dur %" GST_STIME_FORMAT ") would overflow the fragment, Sending start_new_fragment cmd", - GST_STIME_ARGS (splitmux->reference_ctx->in_running_time - - splitmux->gop_start_time)); + GST_STIME_ARGS (queued_gop_time)); cmd = out_cmd_buf_new (); cmd->start_new_fragment = TRUE; g_queue_push_head (&splitmux->out_cmd_q, cmd); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); - splitmux->fragment_start_time = splitmux->gop_start_time; - splitmux->fragment_start_time_pts = splitmux->gop_start_time_pts; + splitmux->fragment_start_time = gop->start_time; + splitmux->fragment_start_time_pts = gop->start_time_pts; splitmux->fragment_total_bytes = 0; splitmux->fragment_reference_bytes = 0; - video_time_code_replace (&splitmux->fragment_start_tc, - splitmux->gop_start_tc); + video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc); splitmux->next_fragment_start_tc_time = calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc, splitmux->fragment_start_time, NULL); if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) { GST_WARNING_OBJECT (splitmux, "Couldn't calculate next fragment start time for timecode mode"); - /* shouldn't happen, but reset all and try again with next buffers */ - gst_splitmux_reset_timecode (splitmux); } } /* And set up to collect the next GOP */ - if (!splitmux->reference_ctx->in_eos) { + if (max_out_running_time != G_MAXINT64) { splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START; } else { /* This is probably already the current state, but just in case: */ splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP; - new_out_ts = G_MAXINT64; /* EOS runs until forever */ } /* And wake all input contexts to send a wake-up event */ @@ -2521,19 +2560,10 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) GST_SPLITMUX_BROADCAST_INPUT (splitmux); /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */ - splitmux->fragment_total_bytes += splitmux->gop_total_bytes; - splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes; + splitmux->fragment_total_bytes += gop->total_bytes; + splitmux->fragment_reference_bytes += gop->reference_bytes; - splitmux->gop_start_time = splitmux->next_gop_start_time; - splitmux->gop_start_time_pts = splitmux->next_gop_start_time_pts; - video_time_code_replace (&splitmux->gop_start_tc, - splitmux->next_gop_start_tc); - - splitmux->next_gop_start_time = GST_CLOCK_STIME_NONE; - splitmux->next_gop_start_time_pts = GST_CLOCK_STIME_NONE; - video_time_code_replace (&splitmux->next_gop_start_tc, NULL); - - if (splitmux->gop_total_bytes > 0) { + if (gop->total_bytes > 0) { GST_LOG_OBJECT (splitmux, "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT, @@ -2542,19 +2572,14 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Send this GOP to the output command queue */ cmd = out_cmd_buf_new (); cmd->start_new_fragment = FALSE; - cmd->max_output_ts = new_out_ts; + cmd->max_output_ts = max_out_running_time; GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %" - GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts)); + GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time)); g_queue_push_head (&splitmux->out_cmd_q, cmd); GST_SPLITMUX_BROADCAST_OUTPUT (splitmux); } - splitmux->gop_total_bytes = splitmux->next_gop_total_bytes; - splitmux->gop_reference_bytes = splitmux->next_gop_reference_bytes; - - splitmux->next_gop_total_bytes = 0; - splitmux->next_gop_reference_bytes = 0; return; error_gop_duration: @@ -2609,14 +2634,59 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) } do { + GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE; + if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) { + GstClockTimeDiff max_out_running_time; gboolean ready = TRUE; + InputGop *gop; + const InputGop *next_gop; + + gop = g_queue_peek_head (&splitmux->pending_input_gops); + next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1); + + /* If we have no GOP or no next GOP here then the reference context is + * at EOS, otherwise use the start time of the next GOP if we're far + * enough in the GOP to know it */ + if (gop && next_gop) { + if (!splitmux->reference_ctx->in_eos + && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE + && splitmux->max_in_running_time_dts < next_gop->start_time_pts) { + GST_LOG_OBJECT (splitmux, + "No further GOPs finished collecting, waiting until current DTS %" + GST_STIME_FORMAT " has passed next GOP start PTS %" + GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_in_running_time_dts), + GST_STIME_ARGS (next_gop->start_time_pts)); + break; + } + + GST_LOG_OBJECT (splitmux, + "Finished collecting GOP with start time %" GST_STIME_FORMAT + ", next GOP start time %" GST_STIME_FORMAT, + GST_STIME_ARGS (gop->start_time), + GST_STIME_ARGS (next_gop->start_time)); + next_gop_start = next_gop->start_time; + max_out_running_time = + splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time; + } else if (!next_gop) { + GST_LOG_OBJECT (splitmux, "Reference context is EOS"); + next_gop_start = splitmux->max_in_running_time; + max_out_running_time = G_MAXINT64; + } else if (!gop) { + GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting"); + break; + } else { + g_assert_not_reached (); + } + + g_assert (gop != NULL); /* Iterate each pad, and check that the input running time is at least - * up to the reference running time, and if so handle the collected GOP */ - GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" - GST_STIME_FORMAT " ctx %p", - GST_STIME_ARGS (splitmux->max_in_running_time), ctx); + * up to the start running time of the next GOP or EOS, and if so handle + * the collected GOP */ + GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %" + GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx); for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); @@ -2626,9 +2696,8 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) " EOS %d", tmpctx, tmpctx->sinkpad, GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); - if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE && - tmpctx->in_running_time < splitmux->max_in_running_time && - !tmpctx->in_eos) { + if (next_gop_start != GST_CLOCK_STIME_NONE && + tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep", tmpctx, tmpctx->sinkpad); @@ -2640,7 +2709,11 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_DEBUG_OBJECT (splitmux, "Collected GOP is complete. Processing (ctx %p)", ctx); /* All pads have a complete GOP, release it into the multiqueue */ - handle_gathered_gop (splitmux); + handle_gathered_gop (splitmux, gop, next_gop_start, + max_out_running_time); + + g_queue_pop_head (&splitmux->pending_input_gops); + input_gop_free (gop); /* The user has requested a split, we can split now that the previous GOP * has been collected to the correct location */ @@ -2658,8 +2731,8 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT && !ctx->flushing && - (ctx->in_running_time >= splitmux->max_in_running_time) && - (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) { + ctx->in_running_time >= next_gop_start && + next_gop_start != GST_CLOCK_STIME_NONE) { /* Some pad is not yet ready, or GOP is being pushed * either way, sleep and wait to get woken */ GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx); @@ -2767,14 +2840,24 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) * so that we start counting overflow from the first frame */ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) splitmux->max_in_running_time = rtime; + if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts)) + splitmux->max_in_running_time_dts = rtime; } /* Similarly take it as fragment start PTS and GOP start time if * these are not set */ if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts)) splitmux->fragment_start_time_pts = rtime; - if (!GST_CLOCK_STIME_IS_VALID (splitmux->gop_start_time)) - splitmux->gop_start_time = rtime; + + if (g_queue_is_empty (&splitmux->pending_input_gops)) { + InputGop *gop = g_slice_new0 (InputGop); + + gop->from_gap = TRUE; + gop->start_time = rtime; + gop->start_time_pts = rtime; + + g_queue_push_tail (&splitmux->pending_input_gops, gop); + } } GST_SPLITMUX_UNLOCK (splitmux); @@ -2855,6 +2938,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) buf_info->duration = GST_BUFFER_DURATION (buf); if (ctx->is_reference) { + InputGop *gop = NULL; GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf); /* initialize fragment_start_time if it was not set yet (i.e. for the @@ -2876,6 +2960,9 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) || splitmux->max_in_running_time < splitmux->fragment_start_time) splitmux->max_in_running_time = splitmux->fragment_start_time; + if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts)) + splitmux->max_in_running_time_dts = running_time_dts; + if (tc_meta) { video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc); @@ -2889,80 +2976,74 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) tc_str = gst_video_time_code_to_string (&tc_meta->tc); GST_DEBUG_OBJECT (splitmux, - "Initialize next fragment start tc %s time %" GST_TIME_FORMAT, - tc_str, GST_TIME_ARGS (splitmux->next_fragment_start_tc_time)); + "Initialize fragment start timecode %s, next fragment start timecode time %" + GST_TIME_FORMAT, tc_str, + GST_TIME_ARGS (splitmux->next_fragment_start_tc_time)); g_free (tc_str); } #endif } } - /* If this buffer is a keyframe and a keyframe is already queued (i.e. - * time tracking for the current GOP is already initialized), then - * initialize the time tracking for the next GOP. - * - * If time tracking for the next GOP is already initialized but the - * running time is before the previously known start running time, set it - * again. */ - if ((!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) - && GST_CLOCK_STIME_IS_VALID (splitmux->gop_start_time)) - || (GST_CLOCK_STIME_IS_VALID (splitmux->next_gop_start_time) - && splitmux->next_gop_start_time > running_time)) { - if (!GST_CLOCK_STIME_IS_VALID (splitmux->next_gop_start_time)) - splitmux->next_gop_start_time_pts = running_time_pts; - splitmux->next_gop_start_time = running_time; + + /* First check if we're at the very first GOP and the tracking was created + * from a GAP event. In that case don't start a new GOP on keyframes but + * just updated it as needed */ + gop = g_queue_peek_tail (&splitmux->pending_input_gops); + + if (!gop || (!gop->from_gap + && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) { + gop = g_slice_new0 (InputGop); + + gop->start_time = running_time; + gop->start_time_pts = running_time_pts; GST_LOG_OBJECT (splitmux, "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %" - GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->next_gop_start_time), - GST_STIME_ARGS (splitmux->next_gop_start_time_pts)); + GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time), + GST_STIME_ARGS (gop->start_time_pts)); if (tc_meta) { - video_time_code_replace (&splitmux->next_gop_start_tc, &tc_meta->tc); + video_time_code_replace (&gop->start_tc, &tc_meta->tc); #ifndef GST_DISABLE_GST_DEBUG { gchar *tc_str; tc_str = gst_video_time_code_to_string (&tc_meta->tc); - GST_DEBUG_OBJECT (splitmux, "Initialize next GOP start tc %s", - tc_str); + GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str); g_free (tc_str); } #endif } - } - /* similarly initialize the current GOP start time if it was not - * initialized yet, i.e. for the first buffer, or if the current running - * time is smaller. - * - * This is the GOP that would be drained out next. If we started queueing - * the next GOP already then this code would not trigger unless the next - * GOP has a smaller PTS, which should not happen. */ - if (!GST_CLOCK_STIME_IS_VALID (splitmux->gop_start_time) - || splitmux->gop_start_time > running_time) { - if (!GST_CLOCK_STIME_IS_VALID (splitmux->gop_start_time)) - splitmux->gop_start_time_pts = running_time_pts; - splitmux->gop_start_time = running_time; + g_queue_push_tail (&splitmux->pending_input_gops, gop); + } else { + gop->from_gap = FALSE; - GST_LOG_OBJECT (splitmux, - "GOP start time now %" GST_STIME_FORMAT " (initial PTS %" - GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->gop_start_time), - GST_STIME_ARGS (splitmux->gop_start_time_pts)); + if (!GST_CLOCK_STIME_IS_VALID (gop->start_time) + || gop->start_time > running_time) { + gop->start_time = running_time; - if (tc_meta) { - video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc); + GST_LOG_OBJECT (splitmux, + "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %" + GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time), + GST_STIME_ARGS (gop->start_time_pts)); + + if (tc_meta) { + video_time_code_replace (&gop->start_tc, &tc_meta->tc); #ifndef GST_DISABLE_GST_DEBUG - { - gchar *tc_str; + { + gchar *tc_str; - tc_str = gst_video_time_code_to_string (&tc_meta->tc); - GST_DEBUG_OBJECT (splitmux, "Initialize GOP start tc %s", tc_str); - g_free (tc_str); - } + tc_str = gst_video_time_code_to_string (&tc_meta->tc); + GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s", + tc_str); + g_free (tc_str); + } #endif + } } } @@ -2974,10 +3055,16 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } } - GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT - " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %" - G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts), - splitmux->gop_total_bytes, splitmux->next_gop_total_bytes); + { + InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops); + + if (gop) { + GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT + " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %" + G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts), + gop->total_bytes, gop->total_bytes); + } + } loop_again = TRUE; do { @@ -2992,6 +3079,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) "running. Data might not drain correctly"); loop_again = FALSE; } else if (ctx->is_reference) { + const InputGop *gop, *next_gop; + /* This is the reference context. If it's a keyframe, * it marks the start of a new GOP and we should wait in * check_completed_gop before continuing, but either way @@ -2999,39 +3088,41 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) * so set loop_again to FALSE */ loop_again = FALSE; + gop = g_queue_peek_head (&splitmux->pending_input_gops); + g_assert (gop != NULL); + next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1); + if (ctx->in_running_time > splitmux->max_in_running_time) splitmux->max_in_running_time = ctx->in_running_time; + if (running_time_dts > splitmux->max_in_running_time_dts) + splitmux->max_in_running_time_dts = running_time_dts; + GST_LOG_OBJECT (splitmux, - "Max in running time now %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_in_running_time)); + "Max in running time now %" GST_STIME_FORMAT ", DTS %" + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time), + GST_STIME_ARGS (splitmux->max_in_running_time_dts)); - if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) { - GST_INFO_OBJECT (pad, - "Have keyframe with running time %" GST_STIME_FORMAT, - GST_STIME_ARGS (ctx->in_running_time)); - keyframe = TRUE; - } else if (splitmux->next_gop_start_time_pts == GST_CLOCK_STIME_NONE) { - /* We didn't get the keyframe after the current GOP yet, so - * allow other input pads to catch up to here too */ - GST_SPLITMUX_BROADCAST_INPUT (splitmux); - break; - } - - - if (splitmux->next_gop_start_time_pts == GST_CLOCK_STIME_NONE) { + if (!next_gop) { GST_DEBUG_OBJECT (pad, "Waiting for end of GOP"); /* Allow other input pads to catch up to here too */ GST_SPLITMUX_BROADCAST_INPUT (splitmux); break; } + if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) { + GST_INFO_OBJECT (pad, + "Have keyframe with running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); + keyframe = TRUE; + } + if (running_time_dts != GST_CLOCK_STIME_NONE - && running_time_dts < splitmux->next_gop_start_time_pts) { + && running_time_dts < next_gop->start_time_pts) { GST_DEBUG_OBJECT (splitmux, "Waiting until DTS (%" GST_STIME_FORMAT ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")", GST_STIME_ARGS (running_time_dts), - GST_STIME_ARGS (splitmux->next_gop_start_time_pts)); + GST_STIME_ARGS (next_gop->start_time_pts)); /* Allow other input pads to catch up to here too */ GST_SPLITMUX_BROADCAST_INPUT (splitmux); break; @@ -3057,17 +3148,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } break; case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{ - /* We're collecting a GOP. If this is the reference context, - * we need to check if this is a keyframe that marks the start - * of the next GOP. If it is, it marks the end of the GOP we're - * collecting, so sleep and wait until all the other pads also - * reach that timestamp - at which point, we have an entire GOP - * and either go to ENDING_FILE or release this GOP to the muxer and - * go back to COLLECT_GOP_START. */ + /* We're collecting a GOP, this is only ever called for non-reference + * contexts as the reference context would be waiting inside + * check_completed_gop() */ + + g_assert (!ctx->is_reference); /* If we overran the target timestamp, it might be time to process - * the GOP, otherwise bail out for more data - */ + * the GOP, otherwise bail out for more data. */ GST_LOG_OBJECT (pad, "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time), @@ -3097,16 +3185,17 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) splitmux->queued_keyframes++; buf_info->keyframe = keyframe; - /* Update total input byte counter for overflow detect */ - if (GST_CLOCK_STIME_IS_VALID (splitmux->next_gop_start_time)) { - splitmux->next_gop_total_bytes += buf_info->buf_size; + /* Update total input byte counter for overflow detect unless we're after + * EOS now */ + if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP) { + InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops); + + /* We must have a GOP at this point */ + g_assert (gop != NULL); + + gop->total_bytes += buf_info->buf_size; if (ctx->is_reference) { - splitmux->next_gop_reference_bytes += buf_info->buf_size; - } - } else { - splitmux->gop_total_bytes += buf_info->buf_size; - if (ctx->is_reference) { - splitmux->gop_reference_bytes += buf_info->buf_size; + gop->reference_bytes += buf_info->buf_size; } } @@ -3810,18 +3899,18 @@ static void gst_splitmux_sink_reset (GstSplitMuxSink * splitmux) { splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; - splitmux->next_gop_start_time = splitmux->gop_start_time = - splitmux->fragment_start_time = GST_CLOCK_STIME_NONE; - splitmux->next_gop_start_time_pts = splitmux->gop_start_time_pts = - splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE; + splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE; + + splitmux->fragment_start_time = GST_CLOCK_STIME_NONE; + splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE; + g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free); + + g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL); + g_queue_clear (&splitmux->pending_input_gops); splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; splitmux->fragment_total_bytes = 0; splitmux->fragment_reference_bytes = 0; - splitmux->gop_total_bytes = 0; - splitmux->gop_reference_bytes = 0; - splitmux->next_gop_total_bytes = 0; - splitmux->next_gop_reference_bytes = 0; splitmux->muxed_out_bytes = 0; splitmux->ready_for_output = FALSE; @@ -3836,8 +3925,6 @@ gst_splitmux_sink_reset (GstSplitMuxSink * splitmux) g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL); g_queue_clear (&splitmux->out_cmd_q); - - gst_splitmux_reset_timecode (splitmux); } static GstStateChangeReturn diff --git a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h index 5b5262e44c..7cf8c45124 100644 --- a/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h +++ b/subprojects/gst-plugins-good/gst/multifile/gstsplitmuxsink.h @@ -67,6 +67,26 @@ typedef struct _MqStreamBuf GstClockTime duration; } MqStreamBuf; +typedef struct { + /* For the very first GOP if it was created from a GAP event */ + gboolean from_gap; + + /* Minimum start time (PTS or DTS) of the GOP */ + GstClockTimeDiff start_time; + /* Start time (PTS) of the GOP */ + GstClockTimeDiff start_time_pts; + /* Minimum start timecode of the GOP */ + GstVideoTimeCode *start_tc; + + /* Number of bytes we've collected into the GOP */ + guint64 total_bytes; + /* Number of bytes from the reference context + * that we've collected into the GOP */ + guint64 reference_bytes; + + gboolean sent_fku; +} InputGop; + typedef struct _MqStreamCtx { GstSplitMuxSink *splitmux; @@ -146,6 +166,7 @@ struct _GstSplitMuxSink SplitMuxInputState input_state; GstClockTimeDiff max_in_running_time; + GstClockTimeDiff max_in_running_time_dts; /* Number of bytes sent to the * current fragment */ @@ -154,20 +175,6 @@ struct _GstSplitMuxSink * stream in this fragment */ guint64 fragment_reference_bytes; - /* Number of bytes we've collected into - * the GOP that's being collected */ - guint64 gop_total_bytes; - /* Number of bytes from the reference context - * that we've collected into the current GOP */ - guint64 gop_reference_bytes; - - /* Number of bytes we've collected into - * the next GOP that's being collected */ - guint64 next_gop_total_bytes; - /* Number of bytes from the reference context - * that we've collected into the next GOP */ - guint64 next_gop_reference_bytes; - /* Minimum start time (PTS or DTS) of the current fragment */ GstClockTimeDiff fragment_start_time; /* Start time (PTS) of the current fragment */ @@ -175,26 +182,8 @@ struct _GstSplitMuxSink /* Minimum start timecode of the current fragment */ GstVideoTimeCode *fragment_start_tc; - /* Current GOP is the oldest GOP that is currently queued, i.e. the one that - * would be drained out next */ - - /* Minimum start time (PTS or DTS) of the current GOP */ - GstClockTimeDiff gop_start_time; - /* Start time (PTS) of the next GOP */ - GstClockTimeDiff gop_start_time_pts; - /* Minimum start timecode of the current GOP */ - GstVideoTimeCode *gop_start_tc; - - /* Next GOP is the next GOP that comes after the current GOP. Only its - * start is queued before draining the current GOP to accurately determine - * the end time of the current GOP. */ - - /* Minimum start time (PTS or DTS) of the next GOP */ - GstClockTimeDiff next_gop_start_time; - /* Start time (PTS) of the current GOP */ - GstClockTimeDiff next_gop_start_time_pts; - /* Minimum start timecode of the next GOP */ - GstVideoTimeCode *next_gop_start_tc; + /* Oldest GOP at head, newest GOP at tail */ + GQueue pending_input_gops; /* expected running time of next fragment in timecode mode */ GstClockTime next_fragment_start_tc_time;