From dcb2be32821cb18f36ff2041f3ff36e0a462595b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 20 Mar 2020 19:28:37 +0200 Subject: [PATCH] filesink: Add a new full buffer mode to filesink Previously the default and full modes were the same. Now the default mode is like before: it accumulates all buffers in a buffer list until the threshold is reached and then writes them all out, potentially in multiple writes. The new full mode works by always copying memory to a single memory area and writing everything out with a single write once the threshold is reached. --- plugins/elements/gstfilesink.c | 161 ++++++++++++++++++++++++++------- plugins/elements/gstfilesink.h | 11 ++- 2 files changed, 139 insertions(+), 33 deletions(-) diff --git a/plugins/elements/gstfilesink.c b/plugins/elements/gstfilesink.c index aab3c9858c..3381d0ad4d 100644 --- a/plugins/elements/gstfilesink.c +++ b/plugins/elements/gstfilesink.c @@ -431,15 +431,24 @@ gst_file_sink_open_file (GstFileSink * sink) sink->seekable = gst_file_sink_do_seek (sink, 0); if (sink->buffer) - gst_buffer_list_unref (sink->buffer); + g_free (sink->buffer); sink->buffer = NULL; + if (sink->buffer_list) + gst_buffer_list_unref (sink->buffer_list); + sink->buffer_list = NULL; + if (sink->buffer_mode != GST_FILE_SINK_BUFFER_MODE_UNBUFFERED) { if (sink->buffer_size == 0) { sink->buffer_size = DEFAULT_BUFFER_SIZE; g_object_notify (G_OBJECT (sink), "buffer-size"); } - sink->buffer = gst_buffer_list_new (); + if (sink->buffer_mode == GST_FILE_SINK_BUFFER_MODE_FULL) { + sink->buffer = g_malloc (sink->buffer_size); + sink->allocated_buffer_size = sink->buffer_size; + } else { + sink->buffer_list = gst_buffer_list_new (); + } sink->current_buffer_size = 0; } @@ -481,9 +490,15 @@ gst_file_sink_close_file (GstFileSink * sink) } if (sink->buffer) { - gst_buffer_list_unref (sink->buffer); + g_free (sink->buffer); sink->buffer = NULL; } + sink->allocated_buffer_size = 0; + + if (sink->buffer_list) { + gst_buffer_list_unref (sink->buffer_list); + sink->buffer_list = NULL; + } sink->current_buffer_size = 0; } @@ -633,11 +648,11 @@ gst_file_sink_event (GstBaseSink * sink, GstEvent * event) if (ftruncate (fileno (filesink->file), 0)) goto truncate_failed; } - if (filesink->buffer) { - gst_buffer_list_unref (filesink->buffer); - filesink->buffer = gst_buffer_list_new (); - filesink->current_buffer_size = 0; + if (filesink->buffer_list) { + gst_buffer_list_unref (filesink->buffer_list); + filesink->buffer_list = gst_buffer_list_new (); } + filesink->current_buffer_size = 0; break; case GST_EVENT_EOS: if (gst_file_sink_flush_buffer (filesink) != GST_FLOW_OK) @@ -736,23 +751,36 @@ gst_file_sink_flush_buffer (GstFileSink * filesink) { GstFlowReturn flow_ret = GST_FLOW_OK; - if (filesink->buffer) { + GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %" G_GSIZE_FORMAT, + filesink->current_buffer_size); + + if (filesink->buffer && filesink->current_buffer_size) { + guint64 bytes_written = 0; + + flow_ret = + gst_writev_mem (GST_OBJECT_CAST (filesink), fileno (filesink->file), + NULL, filesink->buffer, filesink->current_buffer_size, &bytes_written, + 0, filesink->max_transient_error_timeout, filesink->current_pos, + &filesink->flushing); + + filesink->current_pos += bytes_written; + + } else if (filesink->buffer_list && filesink->current_buffer_size) { guint length; - length = gst_buffer_list_length (filesink->buffer); + length = gst_buffer_list_length (filesink->buffer_list); if (length > 0) { - GST_DEBUG_OBJECT (filesink, "Flushing out buffer of size %u", - filesink->current_buffer_size); flow_ret = - gst_file_sink_render_list_internal (filesink, filesink->buffer); + gst_file_sink_render_list_internal (filesink, filesink->buffer_list); /* Remove all buffers from the list but keep the list. This ensures that * we don't re-allocate the array storing the buffers all the time */ - gst_buffer_list_remove (filesink->buffer, 0, length); - filesink->current_buffer_size = 0; + gst_buffer_list_remove (filesink->buffer_list, 0, length); } } + filesink->current_buffer_size = 0; + return flow_ret; } @@ -796,7 +824,7 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list) gst_buffer_list_foreach (buffer_list, has_sync_after_buffer, &sync_after); - if (sync_after || !sink->buffer) { + if (sync_after || (!sink->buffer && !sink->buffer_list)) { flow = gst_file_sink_flush_buffer (sink); if (flow == GST_FLOW_OK) flow = gst_file_sink_render_list_internal (sink, buffer_list); @@ -809,15 +837,52 @@ gst_file_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list) G_GUINT64_FORMAT, size, num_buffers, sink->current_pos + sink->current_buffer_size); - for (i = 0; i < num_buffers; ++i) - gst_buffer_list_add (sink->buffer, - gst_buffer_ref (gst_buffer_list_get (buffer_list, i))); - sink->current_buffer_size += size; - - if (sink->current_buffer_size > sink->buffer_size) - flow = gst_file_sink_flush_buffer (sink); - else + if (sink->buffer) { flow = GST_FLOW_OK; + for (i = 0; i < num_buffers && flow == GST_FLOW_OK; i++) { + GstBuffer *buffer = gst_buffer_list_get (buffer_list, i); + gsize buffer_size = gst_buffer_get_size (buffer); + + if (sink->current_buffer_size + buffer_size > + sink->allocated_buffer_size) { + flow = gst_file_sink_flush_buffer (sink); + if (flow != GST_FLOW_OK) + return flow; + } + + if (buffer_size > sink->allocated_buffer_size) { + guint64 bytes_written = 0; + + GST_DEBUG_OBJECT (sink, + "writing buffer ( %" G_GSIZE_FORMAT + " bytes) at position %" G_GUINT64_FORMAT, + buffer_size, sink->current_pos); + + flow = + gst_writev_buffer (GST_OBJECT_CAST (sink), + fileno (sink->file), NULL, buffer, &bytes_written, 0, + sink->max_transient_error_timeout, sink->current_pos, + &sink->flushing); + + sink->current_pos += bytes_written; + } else { + sink->current_buffer_size += + gst_buffer_extract (buffer, 0, + sink->buffer + sink->current_buffer_size, buffer_size); + flow = GST_FLOW_OK; + } + } + } else { + for (i = 0; i < num_buffers; ++i) + gst_buffer_list_add (sink->buffer_list, + gst_buffer_ref (gst_buffer_list_get (buffer_list, i))); + sink->current_buffer_size += size; + + if (sink->current_buffer_size > sink->buffer_size) + flow = gst_file_sink_flush_buffer (sink); + else + flow = GST_FLOW_OK; + } } if (flow == GST_FLOW_OK && sync_after) { @@ -856,7 +921,8 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer) n_mem = gst_buffer_n_memory (buffer); - if (n_mem > 0 && (sync_after || !filesink->buffer)) { + if (n_mem > 0 && (sync_after || (!filesink->buffer + && !filesink->buffer_list))) { flow = gst_file_sink_flush_buffer (filesink); if (flow == GST_FLOW_OK) { guint64 bytes_written = 0; @@ -875,18 +941,51 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer) filesink->current_pos += bytes_written; } } else if (n_mem > 0) { + gsize size = gst_buffer_get_size (buffer); + GST_DEBUG_OBJECT (filesink, "Queueing buffer of %" G_GSIZE_FORMAT " bytes at offset %" - G_GUINT64_FORMAT, gst_buffer_get_size (buffer), + G_GUINT64_FORMAT, size, filesink->current_pos + filesink->current_buffer_size); - filesink->current_buffer_size += gst_buffer_get_size (buffer); - gst_buffer_list_add (filesink->buffer, gst_buffer_ref (buffer)); + if (filesink->buffer) { + if (filesink->current_buffer_size + size > + filesink->allocated_buffer_size) { + flow = gst_file_sink_flush_buffer (filesink); + if (flow != GST_FLOW_OK) + return flow; + } - if (filesink->current_buffer_size > filesink->buffer_size) - flow = gst_file_sink_flush_buffer (filesink); - else - flow = GST_FLOW_OK; + if (size > filesink->allocated_buffer_size) { + guint64 bytes_written = 0; + + GST_DEBUG_OBJECT (sink, + "writing buffer ( %" G_GSIZE_FORMAT + " bytes) at position %" G_GUINT64_FORMAT, + size, filesink->current_pos); + + flow = + gst_writev_buffer (GST_OBJECT_CAST (filesink), + fileno (filesink->file), NULL, buffer, &bytes_written, 0, + filesink->max_transient_error_timeout, filesink->current_pos, + &filesink->flushing); + + filesink->current_pos += bytes_written; + } else { + filesink->current_buffer_size += + gst_buffer_extract (buffer, 0, + filesink->buffer + filesink->current_buffer_size, size); + flow = GST_FLOW_OK; + } + } else { + filesink->current_buffer_size += gst_buffer_get_size (buffer); + gst_buffer_list_add (filesink->buffer_list, gst_buffer_ref (buffer)); + + if (filesink->current_buffer_size > filesink->buffer_size) + flow = gst_file_sink_flush_buffer (filesink); + else + flow = GST_FLOW_OK; + } } else { flow = GST_FLOW_OK; } diff --git a/plugins/elements/gstfilesink.h b/plugins/elements/gstfilesink.h index a5dc90bfb9..bc479c5430 100644 --- a/plugins/elements/gstfilesink.h +++ b/plugins/elements/gstfilesink.h @@ -81,8 +81,15 @@ struct _GstFileSink { gint buffer_mode; guint buffer_size; - GstBufferList *buffer; - guint current_buffer_size; + /* For default buffer mode */ + GstBufferList *buffer_list; + + /* For full buffer mode */ + guint8 *buffer; + gsize allocated_buffer_size; + + /* For default/full buffer mode */ + gsize current_buffer_size; gboolean append; gboolean o_sync;