From d4c4af699eb3a728e84add8c4ea413e4f5d8c75c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 17 Dec 2014 17:54:09 +0100 Subject: [PATCH] aggregator: Add a timeout parameter to ::aggregate() When this is TRUE, we really have to produce output. This happens in live mixing mode when we have to output something for the current time, no matter if we have enough input or not. --- gst-libs/gst/base/gstaggregator.c | 10 +++++++--- gst-libs/gst/base/gstaggregator.h | 3 ++- gst-libs/gst/video/gstvideoaggregator.c | 4 ++-- gst/audiomixer/gstaudiomixer.c | 20 +++++++++++--------- gst/compositor/compositor.c | 3 +++ tests/check/libs/aggregator.c | 2 +- 6 files changed, 26 insertions(+), 16 deletions(-) diff --git a/gst-libs/gst/base/gstaggregator.c b/gst-libs/gst/base/gstaggregator.c index bb80527f35..16690f4568 100644 --- a/gst-libs/gst/base/gstaggregator.c +++ b/gst-libs/gst/base/gstaggregator.c @@ -486,12 +486,14 @@ gst_aggregator_get_next_time (GstAggregator * self) /* called with the src STREAM lock */ static gboolean -_wait_and_check (GstAggregator * self) +_wait_and_check (GstAggregator * self, gboolean * timeout) { GstClockTime latency_max, latency_min; GstClockTime start; gboolean live; + *timeout = FALSE; + gst_aggregator_get_latency (self, &live, &latency_min, &latency_max); if (gst_aggregator_iterate_sinkpads (self, @@ -561,6 +563,7 @@ _wait_and_check (GstAggregator * self) /* we timed out */ if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) { SRC_STREAM_UNLOCK (self); + *timeout = TRUE; return TRUE; } } @@ -575,6 +578,7 @@ aggregate_func (GstAggregator * self) { GstAggregatorPrivate *priv = self->priv; GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self); + gboolean timeout = FALSE; if (self->priv->running == FALSE) { GST_DEBUG_OBJECT (self, "Not running anymore"); @@ -583,12 +587,12 @@ aggregate_func (GstAggregator * self) GST_LOG_OBJECT (self, "Checking aggregate"); while (priv->send_eos && priv->running) { - if (!_wait_and_check (self)) + if (!_wait_and_check (self, &timeout)) continue; GST_TRACE_OBJECT (self, "Actually aggregating!"); - priv->flow_return = klass->aggregate (self); + priv->flow_return = klass->aggregate (self, timeout); if (priv->flow_return == GST_FLOW_EOS) { _push_eos (self); diff --git a/gst-libs/gst/base/gstaggregator.h b/gst-libs/gst/base/gstaggregator.h index 19b04e982c..d11545cb5d 100644 --- a/gst-libs/gst/base/gstaggregator.h +++ b/gst-libs/gst/base/gstaggregator.h @@ -237,7 +237,8 @@ struct _GstAggregatorClass { GstPadMode mode, gboolean active); - GstFlowReturn (*aggregate) (GstAggregator * aggregator); + GstFlowReturn (*aggregate) (GstAggregator * aggregator, + gboolean timeout); gboolean (*stop) (GstAggregator * aggregator); diff --git a/gst-libs/gst/video/gstvideoaggregator.c b/gst-libs/gst/video/gstvideoaggregator.c index a711beea8c..898de598f2 100644 --- a/gst-libs/gst/video/gstvideoaggregator.c +++ b/gst-libs/gst/video/gstvideoaggregator.c @@ -1230,7 +1230,7 @@ gst_videoaggregator_get_next_time (GstAggregator * agg) } static GstFlowReturn -gst_videoaggregator_aggregate (GstAggregator * agg) +gst_videoaggregator_aggregate (GstAggregator * agg, gboolean timeout) { GstFlowReturn ret; GstVideoAggregator *vagg = GST_VIDEO_AGGREGATOR (agg); @@ -1275,7 +1275,7 @@ gst_videoaggregator_aggregate (GstAggregator * agg) output_end_time); } - if (res == GST_FLOW_NEEDS_DATA) { + if (res == GST_FLOW_NEEDS_DATA && !timeout) { GST_DEBUG_OBJECT (vagg, "Need more data for decisions"); ret = GST_FLOW_OK; goto done; diff --git a/gst/audiomixer/gstaudiomixer.c b/gst/audiomixer/gstaudiomixer.c index 9cac2d7b92..5929b2dcc2 100644 --- a/gst/audiomixer/gstaudiomixer.c +++ b/gst/audiomixer/gstaudiomixer.c @@ -233,7 +233,8 @@ static void gst_audiomixer_release_pad (GstElement * element, GstPad * pad); static GstFlowReturn gst_audiomixer_do_clip (GstAggregator * agg, GstAggregatorPad * bpad, GstBuffer * buffer, GstBuffer ** outbuf); -static GstFlowReturn gst_audiomixer_aggregate (GstAggregator * agg); +static GstFlowReturn gst_audiomixer_aggregate (GstAggregator * agg, + gboolean timeout); static GstClockTime gst_audiomixer_get_next_time (GstAggregator * agg) @@ -1327,7 +1328,7 @@ gst_audio_mixer_mix_buffer (GstAudioMixer * audiomixer, GstAudioMixerPad * pad, } static GstFlowReturn -gst_audiomixer_aggregate (GstAggregator * agg) +gst_audiomixer_aggregate (GstAggregator * agg, gboolean timeout) { /* Get all pads that have data for us and store them in a * new list. @@ -1401,7 +1402,6 @@ gst_audiomixer_aggregate (GstAggregator * agg) } else { next_offset = audiomixer->offset - audiomixer->blocksize; } - next_timestamp = gst_util_uint64_scale (next_offset, GST_SECOND, rate); if (audiomixer->current_buffer) { @@ -1428,13 +1428,14 @@ gst_audiomixer_aggregate (GstAggregator * agg) GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (iter->data); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (iter->data); - inbuf = gst_aggregator_pad_get_buffer (aggpad); if (!inbuf) continue; + g_assert (!pad->buffer || pad->buffer == inbuf); + /* New buffer? */ - if (!pad->buffer || pad->buffer != inbuf) { + if (!pad->buffer) { /* Takes ownership of buffer */ if (!gst_audio_mixer_fill_buffer (audiomixer, pad, inbuf)) { dropped = TRUE; @@ -1451,11 +1452,13 @@ gst_audiomixer_aggregate (GstAggregator * agg) } /* At this point adata->output_offset >= audiomixer->offset or we have no buffer anymore */ + g_assert (!pad->buffer || pad->output_offset >= audiomixer->offset); if (pad->output_offset >= audiomixer->offset && pad->output_offset < audiomixer->offset + audiomixer->blocksize && pad->buffer) { GST_LOG_OBJECT (aggpad, "Mixing buffer for current offset"); gst_audio_mixer_mix_buffer (audiomixer, pad, &outmap); + if (pad->output_offset >= next_offset) { GST_DEBUG_OBJECT (pad, "Pad is after current offset: %" G_GUINT64_FORMAT " >= %" @@ -1469,17 +1472,17 @@ gst_audiomixer_aggregate (GstAggregator * agg) gst_buffer_unmap (outbuf, &outmap); - if (dropped) { + if (dropped && !timeout) { /* We dropped a buffer, retry */ GST_INFO_OBJECT (audiomixer, "A pad dropped a buffer, wait for the next one"); return GST_FLOW_OK; } - if (!is_done && !is_eos) { + if (!is_done && !is_eos && !timeout) { /* Get more buffers */ GST_INFO_OBJECT (audiomixer, - "We're not done yet for the current offset," " waiting for more data"); + "We're not done yet for the current offset, waiting for more data"); return GST_FLOW_OK; } @@ -1489,7 +1492,6 @@ gst_audiomixer_aggregate (GstAggregator * agg) GST_DEBUG_OBJECT (audiomixer, "We're EOS"); - GST_OBJECT_LOCK (agg); for (iter = GST_ELEMENT (agg)->sinkpads; iter; iter = iter->next) { GstAudioMixerPad *pad = GST_AUDIO_MIXER_PAD (iter->data); diff --git a/gst/compositor/compositor.c b/gst/compositor/compositor.c index 04d01a14cc..c5d09011c5 100644 --- a/gst/compositor/compositor.c +++ b/gst/compositor/compositor.c @@ -280,6 +280,9 @@ gst_compositor_pad_prepare_frame (GstVideoAggregatorPad * pad, static GstAllocationParams params = { 0, 15, 0, 0, }; gint width, height; + if (!pad->buffer) + return TRUE; + if (!gst_video_frame_map (frame, &pad->buffer_vinfo, pad->buffer, GST_MAP_READ)) { GST_WARNING_OBJECT (vagg, "Could not map input buffer"); diff --git a/tests/check/libs/aggregator.c b/tests/check/libs/aggregator.c index 46a657fa17..5e0c388d2f 100644 --- a/tests/check/libs/aggregator.c +++ b/tests/check/libs/aggregator.c @@ -63,7 +63,7 @@ struct _GstTestAggregatorClass }; static GstFlowReturn -gst_test_aggregator_aggregate (GstAggregator * aggregator) +gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) { GstIterator *iter; gboolean all_eos = TRUE;