diff --git a/subprojects/gst-plugins-base/gst-libs/gst/audio/gstaudioaggregator.c b/subprojects/gst-plugins-base/gst-libs/gst/audio/gstaudioaggregator.c index a811c4789d..784c047cf3 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/audio/gstaudioaggregator.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/audio/gstaudioaggregator.c @@ -537,6 +537,7 @@ static GstSample *gst_audio_aggregator_peek_next_sample (GstAggregator * agg, #define DEFAULT_DISCONT_WAIT (1 * GST_SECOND) #define DEFAULT_OUTPUT_BUFFER_DURATION_N (1) #define DEFAULT_OUTPUT_BUFFER_DURATION_D (100) +#define DEFAULT_FORCE_LIVE FALSE enum { @@ -546,6 +547,7 @@ enum PROP_DISCONT_WAIT, PROP_OUTPUT_BUFFER_DURATION_FRACTION, PROP_IGNORE_INACTIVE_PADS, + PROP_FORCE_LIVE, }; G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator, @@ -728,6 +730,23 @@ gst_audio_aggregator_class_init (GstAudioAggregatorClass * klass) "Ignore inactive pads", "Avoid timing out waiting for inactive pads", FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstAudioAggregator:force-live: + * + * Causes the element to aggregate on a timeout even when no live source is + * connected to its sinks. See #GstAggregator:min-upstream-latency for a + * companion property: in the vast majority of cases where you plan to plug in + * live sources with a non-zero latency, you should set it to a non-zero value. + * + * Since: 1.22 + */ + g_object_class_install_property (gobject_class, PROP_FORCE_LIVE, + g_param_spec_boolean ("force-live", "Force live", + "Always operate in live mode and aggregate on timeout regardless of " + "whether any live sources are linked upstream", + DEFAULT_FORCE_LIVE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY)); } static void @@ -797,6 +816,10 @@ gst_audio_aggregator_set_property (GObject * object, guint prop_id, gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (object), g_value_get_boolean (value)); break; + case PROP_FORCE_LIVE: + gst_aggregator_set_force_live (GST_AGGREGATOR (object), + g_value_get_boolean (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -829,6 +852,10 @@ gst_audio_aggregator_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, gst_aggregator_get_ignore_inactive_pads (GST_AGGREGATOR (object))); break; + case PROP_FORCE_LIVE: + g_value_set_boolean (value, + gst_aggregator_get_force_live (GST_AGGREGATOR (object))); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2195,7 +2222,7 @@ gst_audio_aggregator_aggregate (GstAggregator * agg, gboolean timeout) gint64 next_timestamp; gint rate, bpf; gboolean dropped = FALSE; - gboolean is_eos = TRUE; + gboolean is_eos = !gst_aggregator_get_force_live (agg); gboolean is_done = TRUE; guint blocksize; GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad); diff --git a/subprojects/gst-plugins-base/gst-libs/gst/video/gstvideoaggregator.c b/subprojects/gst-plugins-base/gst-libs/gst/video/gstvideoaggregator.c index 31e9d700e6..ffbf69d401 100644 --- a/subprojects/gst-plugins-base/gst-libs/gst/video/gstvideoaggregator.c +++ b/subprojects/gst-plugins-base/gst-libs/gst/video/gstvideoaggregator.c @@ -955,7 +955,13 @@ static void g_thread_self()); \ } G_STMT_END +enum +{ + PROP_0, + PROP_FORCE_LIVE, +}; +#define DEFAULT_FORCE_LIVE FALSE /* Can't use the G_DEFINE_TYPE macros because we need the * videoaggregator class in the _init to be able to set @@ -1717,7 +1723,7 @@ gst_video_aggregator_fill_queues (GstVideoAggregator * vagg, GstClockTime output_end_running_time, gboolean timeout) { GList *l; - gboolean eos = TRUE; + gboolean eos = !gst_aggregator_get_force_live (GST_AGGREGATOR (vagg)); gboolean repeat_pad_eos = FALSE; gboolean has_no_repeat_pads = FALSE; gboolean need_more_data = FALSE; @@ -3002,6 +3008,10 @@ gst_video_aggregator_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { switch (prop_id) { + case PROP_FORCE_LIVE: + g_value_set_boolean (value, + gst_aggregator_get_force_live (GST_AGGREGATOR (object))); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3013,6 +3023,10 @@ gst_video_aggregator_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { switch (prop_id) { + case PROP_FORCE_LIVE: + gst_aggregator_set_force_live (GST_AGGREGATOR (object), + g_value_get_boolean (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3070,6 +3084,23 @@ gst_video_aggregator_class_init (GstVideoAggregatorClass * klass) /* Register the pad class */ g_type_class_ref (GST_TYPE_VIDEO_AGGREGATOR_PAD); + + /** + * GstVideoAggregator:force-live: + * + * Causes the element to aggregate on a timeout even when no live source is + * connected to its sinks. See #GstAggregator:min-upstream-latency for a + * companion property: in the vast majority of cases where you plan to plug in + * live sources with a non-zero latency, you should set it to a non-zero value. + * + * Since: 1.22 + */ + g_object_class_install_property (gobject_class, PROP_FORCE_LIVE, + g_param_spec_boolean ("force-live", "Force live", + "Always operate in live mode and aggregate on timeout regardless of " + "whether any live sources are linked upstream", + DEFAULT_FORCE_LIVE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY)); } static void diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.c b/subprojects/gstreamer/libs/gst/base/gstaggregator.c index d51ac5647d..abd711052d 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.c +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.c @@ -371,6 +371,7 @@ struct _GstAggregatorPrivate gboolean send_segment; gboolean flushing; gboolean send_eos; /* protected by srcpad stream lock */ + gboolean got_eos_event; /* protected by srcpad stream lock */ GstCaps *srccaps; /* protected by the srcpad stream lock */ @@ -409,8 +410,16 @@ struct _GstAggregatorPrivate gint64 latency; /* protected by both src_lock and all pad locks */ gboolean emit_signals; gboolean ignore_inactive_pads; + gboolean force_live; /* Construct only, doesn't need any locking */ }; +/* With SRC_LOCK */ +static gboolean +is_live_unlocked (GstAggregator * self) +{ + return self->priv->peer_latency_live || self->priv->force_live; +} + /* Seek event forwarding helper */ typedef struct { @@ -429,6 +438,7 @@ typedef struct #define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO #define DEFAULT_START_TIME (-1) #define DEFAULT_EMIT_SIGNALS FALSE +#define DEFAULT_FORCE_LIVE FALSE enum { @@ -501,7 +511,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self, break; } - if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live && + if (self->priv->ignore_inactive_pads && is_live_unlocked (self) && pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) { PAD_UNLOCK (pad); GST_LOG_OBJECT (pad, "Ignoring inactive pad"); @@ -528,7 +538,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self, } else { GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers", GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers); - if (self->priv->peer_latency_live) { + if (is_live_unlocked (self)) { /* In live mode, having a single pad with buffers is enough to * generate a start time from it. In non-live mode all pads need * to have a buffer @@ -541,7 +551,7 @@ gst_aggregator_check_pads_ready (GstAggregator * self, PAD_UNLOCK (pad); } - if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live + if (self->priv->ignore_inactive_pads && is_live_unlocked (self) && n_ready == 0) goto no_sinkpads; @@ -1388,10 +1398,15 @@ gst_aggregator_aggregate_func (GstAggregator * self) if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK) goto handle_error; - if (self->priv->peer_latency_live) + if (is_live_unlocked (self)) gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self), gst_aggregator_pad_skip_buffers, NULL); + if (self->priv->got_eos_event) { + gst_aggregator_push_eos (self); + continue; + } + /* Ensure we have buffers ready (either in clipped_buffer or at the head of * the queue */ if (!gst_aggregator_wait_and_check (self, &timeout)) { @@ -1478,6 +1493,7 @@ gst_aggregator_start (GstAggregator * self) self->priv->send_stream_start = TRUE; self->priv->send_segment = TRUE; self->priv->send_eos = TRUE; + self->priv->got_eos_event = FALSE; self->priv->srccaps = NULL; self->priv->has_peer_latency = FALSE; @@ -1699,6 +1715,7 @@ gst_aggregator_default_sink_event (GstAggregator * self, event = NULL; SRC_LOCK (self); priv->send_eos = TRUE; + priv->got_eos_event = FALSE; SRC_BROADCAST (self); SRC_UNLOCK (self); @@ -1977,6 +1994,14 @@ gst_aggregator_change_state (GstElement * element, GstStateChange transition) SRC_LOCK (self); SRC_BROADCAST (self); SRC_UNLOCK (self); + if (self->priv->force_live) { + ret = GST_STATE_CHANGE_NO_PREROLL; + } + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + if (self->priv->force_live) { + ret = GST_STATE_CHANGE_NO_PREROLL; + } break; default: break; @@ -2195,12 +2220,18 @@ gst_aggregator_get_latency_unlocked (GstAggregator * self) ret = gst_aggregator_query_latency_unlocked (self, query); gst_query_unref (query); - if (!ret) + /* If we've been set to live, we don't wait for a peer latency, we will + * simply query it again next time around */ + if (!ret && !self->priv->force_live) return GST_CLOCK_TIME_NONE; } - if (!self->priv->has_peer_latency || !self->priv->peer_latency_live) - return GST_CLOCK_TIME_NONE; + /* If we've been set to live, we don't wait for a peer latency, we will + * simply query it again next time around */ + if (!self->priv->force_live) { + if (!self->priv->has_peer_latency || !self->priv->peer_latency_live) + return GST_CLOCK_TIME_NONE; + } /* latency_min is never GST_CLOCK_TIME_NONE by construction */ latency = self->priv->peer_latency_min; @@ -2262,8 +2293,17 @@ gst_aggregator_send_event (GstElement * element, GstEvent * event) GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event); } + GST_STATE_UNLOCK (element); + + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + SRC_LOCK (self); + self->priv->got_eos_event = TRUE; + SRC_BROADCAST (self); + SRC_UNLOCK (self); + } + return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element, event); } @@ -2627,6 +2667,16 @@ flushing: return FALSE; } +static void +gst_aggregator_constructed (GObject * object) +{ + GstAggregator *agg = GST_AGGREGATOR (object); + + if (agg->priv->force_live) { + GST_OBJECT_FLAG_SET (agg, GST_ELEMENT_FLAG_SOURCE); + } +} + static void gst_aggregator_finalize (GObject * object) { @@ -2818,6 +2868,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass) gobject_class->set_property = gst_aggregator_set_property; gobject_class->get_property = gst_aggregator_get_property; + gobject_class->constructed = gst_aggregator_constructed; gobject_class->finalize = gst_aggregator_finalize; g_object_class_install_property (gobject_class, PROP_LATENCY, @@ -2955,6 +3006,7 @@ gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass) self->priv->latency = DEFAULT_LATENCY; self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION; self->priv->start_time = DEFAULT_START_TIME; + self->priv->force_live = DEFAULT_FORCE_LIVE; g_mutex_init (&self->priv->src_lock); g_cond_init (&self->priv->src_cond); @@ -3007,7 +3059,7 @@ gst_aggregator_pad_has_space (GstAggregator * self, GstAggregatorPad * aggpad) /* We also want at least two buffers, one is being processed and one is ready * for the next iteration when we operate in live mode. */ - if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2) + if (is_live_unlocked (self) && aggpad->priv->num_buffers < 2) return TRUE; /* On top of our latency, we also want to allow buffering up to the @@ -3648,7 +3700,7 @@ gst_aggregator_pad_is_inactive (GstAggregatorPad * pad) g_assert_nonnull (self); PAD_LOCK (pad); - inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live + inactive = self->priv->ignore_inactive_pads && is_live_unlocked (self) && pad->priv->first_buffer; PAD_UNLOCK (pad); @@ -3933,3 +3985,33 @@ gst_aggregator_get_ignore_inactive_pads (GstAggregator * self) return ret; } + +/** + * gst_aggregator_get_force_live: + * + * Subclasses may use the return value to inform whether they should return + * %GST_FLOW_EOS from their aggregate implementation. + * + * Returns: whether live status was forced on @self. + * + * Since: 1.22 + */ +gboolean +gst_aggregator_get_force_live (GstAggregator * self) +{ + return self->priv->force_live; +} + +/** + * gst_aggregator_set_force_live: + * + * Subclasses should call this at construction time in order for @self to + * aggregate on a timeout even when no live source is connected. + * + * Since: 1.22 + */ +void +gst_aggregator_set_force_live (GstAggregator * self, gboolean force_live) +{ + self->priv->force_live = force_live; +} diff --git a/subprojects/gstreamer/libs/gst/base/gstaggregator.h b/subprojects/gstreamer/libs/gst/base/gstaggregator.h index 45ced12d39..bbac9e4c04 100644 --- a/subprojects/gstreamer/libs/gst/base/gstaggregator.h +++ b/subprojects/gstreamer/libs/gst/base/gstaggregator.h @@ -434,6 +434,13 @@ void gst_aggregator_set_ignore_inactive_pads (GstAggregator * self, GST_BASE_API gboolean gst_aggregator_get_ignore_inactive_pads (GstAggregator * self); +GST_BASE_API +gboolean gst_aggregator_get_force_live (GstAggregator *self); + +GST_BASE_API +void gst_aggregator_set_force_live (GstAggregator *self, + gboolean force_live); + /** * GstAggregatorStartTimeSelection: * @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0. diff --git a/subprojects/gstreamer/tests/check/libs/aggregator.c b/subprojects/gstreamer/tests/check/libs/aggregator.c index 23ca1adb51..215b06186c 100644 --- a/subprojects/gstreamer/tests/check/libs/aggregator.c +++ b/subprojects/gstreamer/tests/check/libs/aggregator.c @@ -26,6 +26,7 @@ #include #include +#include #include /* dummy aggregator based element */ @@ -143,10 +144,12 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout) } gst_iterator_free (iter); - if (all_eos == TRUE) { - GST_INFO_OBJECT (testagg, "no data available, must be EOS"); - gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ()); - return GST_FLOW_EOS; + if (!gst_aggregator_get_force_live (aggregator)) { + if (all_eos == TRUE) { + GST_INFO_OBJECT (testagg, "no data available, must be EOS"); + gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ()); + return GST_FLOW_EOS; + } } buf = gst_buffer_new (); @@ -188,6 +191,8 @@ gst_test_aggregator_class_init (GstTestAggregatorClass * klass) base_aggregator_class->aggregate = GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate); + + base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time; } static void @@ -846,8 +851,8 @@ GST_START_TEST (test_flushing_seek) GST_BUFFER_TIMESTAMP (buf) = 0; _chain_data_init (&data2, test.aggregator, buf, NULL); - gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test. - aggregator)->srcpad)->segment, GST_FORMAT_TIME); + gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.aggregator)-> + srcpad)->segment, GST_FORMAT_TIME); /* now do a successful flushing seek */ event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, @@ -1353,6 +1358,29 @@ GST_START_TEST (test_remove_pad_on_aggregate) GST_END_TEST; +GST_START_TEST (test_force_live) +{ + GstElement *agg; + GstHarness *h; + GstBuffer *buf; + + agg = gst_check_setup_element ("testaggregator"); + g_object_set (agg, "latency", GST_USECOND, NULL); + gst_aggregator_set_force_live (GST_AGGREGATOR (agg), TRUE); + h = gst_harness_new_with_element (agg, NULL, "src"); + + gst_harness_play (h); + + gst_harness_crank_single_clock_wait (h); + buf = gst_harness_pull (h); + + gst_buffer_unref (buf); + gst_harness_teardown (h); + gst_object_unref (agg); +} + +GST_END_TEST; + static Suite * gst_aggregator_suite (void) { @@ -1382,6 +1410,7 @@ gst_aggregator_suite (void) tcase_add_test (general, test_change_state_intensive); tcase_add_test (general, test_flush_on_aggregate); tcase_add_test (general, test_remove_pad_on_aggregate); + tcase_add_test (general, test_force_live); return suite; }