diff --git a/sys/shm/gstshmsink.c b/sys/shm/gstshmsink.c index a7d7e52073..cb6c92bfa6 100644 --- a/sys/shm/gstshmsink.c +++ b/sys/shm/gstshmsink.c @@ -45,7 +45,8 @@ enum PROP_SOCKET_PATH, PROP_PERMS, PROP_SHM_SIZE, - PROP_WAIT_FOR_CONNECTION + PROP_WAIT_FOR_CONNECTION, + PROP_BUFFER_TIME }; struct GstShmClient @@ -162,6 +163,13 @@ gst_shm_sink_class_init (GstShmSinkClass * klass) DEFAULT_WAIT_FOR_CONNECTION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_BUFFER_TIME, + g_param_spec_uint64 ("buffer-time", + "Buffer Time of the shm buffer", + "Maximum Size of the shm buffer in nanoseconds (-1 to disable)", + 0, G_MAXUINT64, GST_CLOCK_TIME_NONE, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + signals[SIGNAL_CLIENT_CONNECTED] = g_signal_new ("client-connected", GST_TYPE_SHM_SINK, G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); @@ -229,6 +237,12 @@ gst_shm_sink_set_property (GObject * object, guint prop_id, GST_OBJECT_UNLOCK (object); g_cond_broadcast (self->cond); break; + case PROP_BUFFER_TIME: + GST_OBJECT_LOCK (object); + self->buffer_time = g_value_get_uint64 (value); + GST_OBJECT_UNLOCK (object); + g_cond_broadcast (self->cond); + break; default: break; } @@ -263,6 +277,9 @@ gst_shm_sink_get_property (GObject * object, guint prop_id, case PROP_WAIT_FOR_CONNECTION: g_value_set_boolean (value, self->wait_for_connection); break; + case PROP_BUFFER_TIME: + g_value_set_uint64 (value, self->buffer_time); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -358,6 +375,24 @@ gst_shm_sink_stop (GstBaseSink * bsink) return TRUE; } +static gboolean +gst_shm_sink_can_render (GstShmSink * self, GstClockTime time) +{ + ShmBuffer *b; + + if (time == GST_CLOCK_TIME_NONE || self->buffer_time == GST_CLOCK_TIME_NONE) + return TRUE; + + b = sp_writer_get_pending_buffers (self->pipe); + for (; b != NULL; b = sp_writer_get_next_buffer (b)) { + GstClockTime t = sp_writer_buf_get_tag (b); + if (GST_CLOCK_DIFF (time, t) > self->buffer_time) + return FALSE; + } + + return TRUE; +} + static GstFlowReturn gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) { @@ -373,8 +408,16 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) } } + while (!gst_shm_sink_can_render (self, GST_BUFFER_TIMESTAMP (buf))) { + g_cond_wait (self->cond, GST_OBJECT_GET_LOCK (self)); + if (self->unlock) { + GST_OBJECT_UNLOCK (self); + return GST_FLOW_WRONG_STATE; + } + } + rv = sp_writer_send_buf (self->pipe, (char *) GST_BUFFER_DATA (buf), - GST_BUFFER_SIZE (buf)); + GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); if (rv == -1) { ShmBlock *block = NULL; @@ -396,10 +439,10 @@ gst_shm_sink_render (GstBaseSink * bsink, GstBuffer * buf) } } - shmbuf = sp_writer_block_get_buf (block); memcpy (shmbuf, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); - sp_writer_send_buf (self->pipe, shmbuf, GST_BUFFER_SIZE (buf)); + sp_writer_send_buf (self->pipe, shmbuf, GST_BUFFER_SIZE (buf), + GST_BUFFER_TIMESTAMP (buf)); sp_writer_free_block (block); } diff --git a/sys/shm/gstshmsink.h b/sys/shm/gstshmsink.h index bc6da74bc7..83dad393e5 100644 --- a/sys/shm/gstshmsink.h +++ b/sys/shm/gstshmsink.h @@ -61,6 +61,7 @@ struct _GstShmSink gboolean wait_for_connection; gboolean stop; gboolean unlock; + GstClockTime buffer_time; GCond *cond; };