From 8345caf6e05f20c878f98e6c74a4812270b53b9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 29 Jun 2020 10:10:09 +0300 Subject: [PATCH] imagefreeze: Add a live mode Previously imagefreeze would always operate as non-live element and output frames as fast as possible according to the configured segment (via SEEK events) and the negotiated framerate from start to stop or the other way around. With the new live mode (enabled via the is-live property) it would only output frames in PLAYING. Frames would be output according to the negotiated framerate unless it would be too late, in which case it would jump ahead and skip over the requirement amount of frames. This makes it possible to actually use imagefreeze in live pipelines without having to manually ensure somehow that it would start outputting at the current running time and without still risking to fall behind without recovery. Part-of: --- docs/gst_plugins_cache.json | 12 ++ gst/imagefreeze/gstimagefreeze.c | 194 ++++++++++++++++++++++++++++--- gst/imagefreeze/gstimagefreeze.h | 5 + 3 files changed, 197 insertions(+), 14 deletions(-) diff --git a/docs/gst_plugins_cache.json b/docs/gst_plugins_cache.json index 7a2de1f35b..c02e205074 100644 --- a/docs/gst_plugins_cache.json +++ b/docs/gst_plugins_cache.json @@ -7341,6 +7341,18 @@ "type": "gboolean", "writable": true }, + "is-live": { + "blurb": "Whether to output a live video stream", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "null", + "readable": true, + "type": "gboolean", + "writable": true + }, "num-buffers": { "blurb": "Number of buffers to output before sending EOS (-1 = unlimited)", "conditionally-available": false, diff --git a/gst/imagefreeze/gstimagefreeze.c b/gst/imagefreeze/gstimagefreeze.c index 873ba4afa9..12d83b249b 100644 --- a/gst/imagefreeze/gstimagefreeze.c +++ b/gst/imagefreeze/gstimagefreeze.c @@ -1,6 +1,7 @@ /* GStreamer * Copyright (c) 2005 Edward Hervey * Copyright (C) 2010 Sebastian Dröge + * Copyright (C) 2020 Sebastian Dröge * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -47,12 +48,14 @@ #define DEFAULT_NUM_BUFFERS -1 #define DEFAULT_ALLOW_REPLACE FALSE +#define DEFAULT_IS_LIVE FALSE enum { PROP_0, PROP_NUM_BUFFERS, PROP_ALLOW_REPLACE, + PROP_IS_LIVE, }; static void gst_image_freeze_finalize (GObject * object); @@ -61,6 +64,7 @@ static void gst_image_freeze_reset (GstImageFreeze * self); static GstStateChangeReturn gst_image_freeze_change_state (GstElement * element, GstStateChange transition); +static GstClock *gst_image_freeze_provide_clock (GstElement * element); static void gst_image_freeze_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -119,8 +123,24 @@ gst_image_freeze_class_init (GstImageFreezeClass * klass) "Allow replacing the input buffer and always output the latest", DEFAULT_ALLOW_REPLACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstImageFreeze:is-live + * + * Selects whether the output stream should be a non-live stream based on + * the segment configured via a %GST_EVENT_SEEK, or whether the output + * stream should be a live stream with the negotiated framerate. + * + * Since: 1.18 + */ + g_object_class_install_property (gobject_class, PROP_IS_LIVE, + g_param_spec_boolean ("is-live", "Is Live", + "Whether to output a live video stream", + DEFAULT_IS_LIVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_image_freeze_change_state); + gstelement_class->provide_clock = + GST_DEBUG_FUNCPTR (gst_image_freeze_provide_clock); gst_element_class_set_static_metadata (gstelement_class, "Still frame stream generator", @@ -156,9 +176,11 @@ gst_image_freeze_init (GstImageFreeze * self) gst_element_add_pad (GST_ELEMENT (self), self->srcpad); g_mutex_init (&self->lock); + g_cond_init (&self->blocked_cond); self->num_buffers = DEFAULT_NUM_BUFFERS; self->allow_replace = DEFAULT_ALLOW_REPLACE; + self->is_live = DEFAULT_IS_LIVE; gst_image_freeze_reset (self); } @@ -173,6 +195,7 @@ gst_image_freeze_finalize (GObject * object) gst_image_freeze_reset (self); g_mutex_clear (&self->lock); + g_cond_clear (&self->blocked_cond); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -529,17 +552,30 @@ gst_image_freeze_src_query (GstPad * pad, GstObject * parent, GstQuery * query) gboolean seekable; gst_query_parse_seeking (query, &format, NULL, NULL, NULL); - seekable = (format == GST_FORMAT_TIME || format == GST_FORMAT_DEFAULT); + seekable = !self->is_live && (format == GST_FORMAT_TIME + || format == GST_FORMAT_DEFAULT); gst_query_set_seeking (query, format, seekable, (seekable ? 0 : -1), -1); ret = TRUE; break; } case GST_QUERY_LATENCY: - /* We never run as a live element, even if upstream is live, and never - * output any buffers with latency but immediately generate buffers as - * fast as we can according to the negotiated framerate */ - gst_query_set_latency (query, FALSE, 0, GST_CLOCK_TIME_NONE); + if (self->is_live) { + /* If we run live, we output the buffer without any latency but allow + * for at most one frame of latency. If downstream takes longer to + * consume out frame we would skip ahead */ + if (self->fps_n > 0 && self->fps_d > 0) + gst_query_set_latency (query, TRUE, 0, + gst_util_uint64_scale_ceil (GST_SECOND, self->fps_d, + self->fps_n)); + else + gst_query_set_latency (query, TRUE, 0, GST_CLOCK_TIME_NONE); + } else { + /* If we don't run live, even if upstream is live, we never output any + * buffers with latency but immediately generate buffers as fast as we + * can according to the negotiated framerate */ + gst_query_set_latency (query, FALSE, 0, GST_CLOCK_TIME_NONE); + } break; default: ret = FALSE; @@ -626,6 +662,13 @@ gst_image_freeze_src_event (GstPad * pad, GstObject * parent, GstEvent * event) gboolean flush; guint32 seqnum; + if (self->is_live) { + GST_ERROR_OBJECT (pad, "Can't seek in live mode"); + ret = FALSE; + gst_event_unref (event); + break; + } + seqnum = gst_event_get_seqnum (event); gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, &stop_type, &stop); @@ -751,6 +794,9 @@ gst_image_freeze_set_property (GObject * object, guint prop_id, case PROP_ALLOW_REPLACE: self->allow_replace = g_value_get_boolean (value); break; + case PROP_IS_LIVE: + self->is_live = g_value_get_boolean (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -772,6 +818,9 @@ gst_image_freeze_get_property (GObject * object, guint prop_id, GValue * value, case PROP_ALLOW_REPLACE: g_value_set_boolean (value, self->allow_replace); break; + case PROP_IS_LIVE: + g_value_set_boolean (value, self->is_live); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -904,16 +953,97 @@ gst_image_freeze_src_loop (GstPad * pad) g_mutex_lock (&self->lock); offset = self->offset; + if (self->is_live) { + GstClockTime base_time, clock_time; + GstClockTimeDiff jitter; + GstClockReturn clock_ret; + GstClock *clock; - if (self->fps_n != 0) { - timestamp = - gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, self->fps_n); - timestamp_end = - gst_util_uint64_scale (offset + 1, self->fps_d * GST_SECOND, - self->fps_n); + /* Wait until the element went to PLAYING or flushing */ + while (self->blocked && !self->flushing) + g_cond_wait (&self->blocked_cond, &self->lock); + + if (self->flushing) { + g_mutex_unlock (&self->lock); + gst_buffer_unref (buffer); + flow_ret = GST_FLOW_FLUSHING; + goto pause_task; + } + + /* Wait on the clock until the time for our current frame is reached */ + clock = gst_element_get_clock (GST_ELEMENT (self)); + base_time = gst_element_get_base_time (GST_ELEMENT (self)); + if (self->fps_n != 0) { + clock_time = + base_time + gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, + self->fps_n); + } else { + clock_time = base_time; + } + + self->clock_id = gst_clock_new_single_shot_id (clock, clock_time); + g_mutex_unlock (&self->lock); + GST_TRACE_OBJECT (self, + "Waiting for %" GST_TIME_FORMAT ", now %" GST_TIME_FORMAT, + GST_TIME_ARGS (clock_time), GST_TIME_ARGS (gst_clock_get_time (clock))); + clock_ret = gst_clock_id_wait (self->clock_id, &jitter); + GST_TRACE_OBJECT (self, + "Waited for %" GST_TIME_FORMAT ", clock ret %d, jitter %" + GST_STIME_FORMAT, GST_TIME_ARGS (clock_time), clock_ret, + GST_STIME_ARGS (jitter)); + g_mutex_lock (&self->lock); + gst_clock_id_unref (self->clock_id); + self->clock_id = NULL; + gst_object_unref (clock); + + if (self->flushing || clock_ret == GST_CLOCK_UNSCHEDULED) { + g_mutex_unlock (&self->lock); + gst_buffer_unref (buffer); + flow_ret = GST_FLOW_FLUSHING; + goto pause_task; + } + + /* If we were late, adjust our offset and jump ahead if needed */ + if (self->fps_n != 0) { + if (jitter > 0) { + guint64 new_offset = + gst_util_uint64_scale (clock_time + jitter - base_time, self->fps_n, + self->fps_d * GST_SECOND); + + if (new_offset != offset) { + GST_INFO_OBJECT (self, + "Late by %" GST_TIME_FORMAT ", old offset %" G_GUINT64_FORMAT + ", new offset %" G_GUINT64_FORMAT, GST_TIME_ARGS (jitter), offset, + new_offset); + self->offset = offset = new_offset; + } + } + + timestamp = + gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, self->fps_n); + timestamp_end = + gst_util_uint64_scale (offset + 1, self->fps_d * GST_SECOND, + self->fps_n); + } else { + /* If we have no framerate then we output a single frame now */ + if (jitter > 0) + timestamp = jitter; + else + timestamp = 0; + + timestamp_end = GST_CLOCK_TIME_NONE; + } } else { - timestamp = self->segment.start; - timestamp_end = GST_CLOCK_TIME_NONE; + if (self->fps_n != 0) { + timestamp = + gst_util_uint64_scale (offset, self->fps_d * GST_SECOND, self->fps_n); + timestamp_end = + gst_util_uint64_scale (offset + 1, self->fps_d * GST_SECOND, + self->fps_n); + } else { + timestamp = self->segment.start; + timestamp_end = GST_CLOCK_TIME_NONE; + } } eos = (self->fps_n == 0 && offset > 0) || @@ -1022,17 +1152,36 @@ gst_image_freeze_change_state (GstElement * element, GstStateChange transition) { GstImageFreeze *self = GST_IMAGE_FREEZE (element); GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + gboolean no_preroll = FALSE; switch (transition) { case GST_STATE_CHANGE_READY_TO_PAUSED: gst_image_freeze_reset (self); g_mutex_lock (&self->lock); self->flushing = FALSE; + self->blocked = TRUE; + g_mutex_unlock (&self->lock); + if (self->is_live) + no_preroll = TRUE; + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + g_mutex_lock (&self->lock); + self->blocked = FALSE; + g_cond_signal (&self->blocked_cond); g_mutex_unlock (&self->lock); break; case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_pad_stop_task (self->srcpad); + g_mutex_lock (&self->lock); + self->flushing = TRUE; + if (self->clock_id) { + GST_DEBUG_OBJECT (self, "unlock clock wait"); + gst_clock_id_unschedule (self->clock_id); + } + self->blocked = FALSE; + g_cond_signal (&self->blocked_cond); + g_mutex_unlock (&self->lock); gst_image_freeze_reset (self); + gst_pad_stop_task (self->srcpad); break; default: break; @@ -1042,13 +1191,30 @@ gst_image_freeze_change_state (GstElement * element, GstStateChange transition) ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + g_mutex_lock (&self->lock); + self->blocked = TRUE; + g_mutex_unlock (&self->lock); + if (self->is_live) + no_preroll = TRUE; + break; default: break; } + if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS) + ret = GST_STATE_CHANGE_NO_PREROLL; + return ret; } +/* FIXME: GStreamer 2.0 */ +static GstClock * +gst_image_freeze_provide_clock (GstElement * element) +{ + return gst_system_clock_obtain (); +} + static gboolean plugin_init (GstPlugin * plugin) { diff --git a/gst/imagefreeze/gstimagefreeze.h b/gst/imagefreeze/gstimagefreeze.h index 08af41f8c3..d23feaa973 100644 --- a/gst/imagefreeze/gstimagefreeze.h +++ b/gst/imagefreeze/gstimagefreeze.h @@ -63,6 +63,11 @@ struct _GstImageFreeze gboolean allow_replace; + gboolean is_live; + gboolean blocked; + GCond blocked_cond; + GstClockID clock_id; + guint64 offset; gboolean flushing;