unifxfdsink: Add an property to allow copying

By design, unixfd is meant to be used for zero-copy and failing when the data is
not FD based memory is wanted to help debug pipelines. Though, there exists
cases, notably with RTP payloader and demuxers, where its not possible
to get all the data into FD memory through allocation queries.

To allow using unixfd for these cases, introduce a property on the unixfdsink
that enable copying the non FD data into freshly allocated memfd.

Co-authored-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/8861>
This commit is contained in:
Xavier Claessens 2025-04-17 15:41:05 -04:00 committed by GStreamer Marge Bot
parent d71d26d901
commit 352a8a8b12
9 changed files with 429 additions and 10 deletions

View File

@ -512,6 +512,10 @@ gst_shm_allocator_init_once() has not been previously called.</doc>
<type name="none" c:type="void"/>
</return-value>
</function>
<field name="parent_instance" version="1.28">
<doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/allocators/gstshmallocator.h">Parent instance.</doc>
<type name="FdAllocator" c:type="GstFdAllocator"/>
</field>
</class>
<record name="ShmAllocatorClass" c:type="GstShmAllocatorClass" glib:is-gtype-struct-for="ShmAllocator">
<source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/allocators/gstshmallocator.h"/>

View File

@ -249203,6 +249203,20 @@
}
},
"properties": {
"min-memory-size": {
"blurb": "Minimum size to allocate in the case a copy into shared memory is needed.",
"conditionally-available": false,
"construct": true,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "9223372036854775807",
"min": "-1",
"mutable": "null",
"readable": true,
"type": "gint64",
"writable": true
},
"socket-path": {
"blurb": "The path to the control socket used to control the shared memory transport. This may be modified during the NULL->READY transition",
"conditionally-available": false,

View File

@ -0,0 +1,150 @@
/* GStreamer unix file-descriptor source/sink
*
* Copyright (C) 2025 Netflix Inc.
* Author: Xavier Claessens <xclaessens@netflix.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "gstunixfdallocator.h"
struct _GstUnixFdAllocator
{
GstShmAllocator parent;
GMutex lock;
GList *pool;
gboolean flush;
};
G_DEFINE_TYPE (GstUnixFdAllocator, gst_unix_fd_allocator,
GST_TYPE_SHM_ALLOCATOR);
static gboolean
gst_unix_fd_allocator_mem_dispose (GstMiniObject * obj)
{
GstMemory *mem = GST_MEMORY_CAST (obj);
GstUnixFdAllocator *self = GST_UNIX_FD_ALLOCATOR (mem->allocator);
g_mutex_lock (&self->lock);
if (self->flush) {
g_mutex_unlock (&self->lock);
return TRUE;
}
gsize offset, maxsize;
gst_memory_get_sizes (mem, &offset, &maxsize);
gst_memory_resize (mem, -offset, maxsize);
self->pool = g_list_prepend (self->pool, gst_memory_ref (mem));
g_mutex_unlock (&self->lock);
return FALSE;
}
static GstMemory *
gst_unix_fd_allocator_alloc (GstAllocator * allocator, gsize size,
GstAllocationParams * params)
{
GstUnixFdAllocator *self = GST_UNIX_FD_ALLOCATOR (allocator);
gsize smallest_size = G_MAXSIZE;
GList *smallest_link = NULL;
/* Check if we have a memory big enough in our pool. */
g_mutex_lock (&self->lock);
for (GList * l = self->pool; l != NULL; l = l->next) {
GstMemory *mem = l->data;
gsize maxsize;
gst_memory_get_sizes (mem, NULL, &maxsize);
if (maxsize >= size) {
self->pool = g_list_delete_link (self->pool, l);
g_mutex_unlock (&self->lock);
return mem;
}
if (maxsize < smallest_size) {
smallest_size = maxsize;
smallest_link = l;
}
}
/* All our memories are too small. Delete the smallest one to converge to a
* size that will avoid re-allocations in the future. */
if (smallest_link != NULL) {
GstMemory *mem = smallest_link->data;
self->pool = g_list_delete_link (self->pool, smallest_link);
GST_MINI_OBJECT_CAST (mem)->dispose = NULL;
gst_memory_unref (mem);
}
g_mutex_unlock (&self->lock);
/* Allocate a new memory */
GstMemory *mem =
GST_ALLOCATOR_CLASS (gst_unix_fd_allocator_parent_class)->alloc
(allocator, size, params);
if (mem != NULL)
GST_MINI_OBJECT_CAST (mem)->dispose = gst_unix_fd_allocator_mem_dispose;
return mem;
}
static void
gst_unix_fd_allocator_finalize (GObject * object)
{
GstUnixFdAllocator *self = GST_UNIX_FD_ALLOCATOR (object);
g_mutex_clear (&self->lock);
G_OBJECT_CLASS (gst_unix_fd_allocator_parent_class)->finalize (object);
}
static void
gst_unix_fd_allocator_class_init (GstUnixFdAllocatorClass * klass)
{
GstAllocatorClass *alloc_class = (GstAllocatorClass *) klass;
GObjectClass *object_class = (GObjectClass *) klass;
object_class->finalize = gst_unix_fd_allocator_finalize;
alloc_class->alloc = GST_DEBUG_FUNCPTR (gst_unix_fd_allocator_alloc);
}
static void
gst_unix_fd_allocator_init (GstUnixFdAllocator * self)
{
g_mutex_init (&self->lock);
}
GstUnixFdAllocator *
gst_unix_fd_allocator_new (void)
{
return g_object_new (GST_TYPE_UNIX_FD_ALLOCATOR, NULL);
}
void
gst_unix_fd_allocator_flush (GstUnixFdAllocator * self)
{
g_return_if_fail (GST_IS_UNIX_FD_ALLOCATOR (self));
g_mutex_lock (&self->lock);
GList *pool = self->pool;
self->pool = NULL;
self->flush = TRUE;
g_mutex_unlock (&self->lock);
g_list_free_full (pool, (GDestroyNotify) gst_memory_unref);
}

View File

@ -0,0 +1,29 @@
/* GStreamer unix file-descriptor source/sink
*
* Copyright (C) 2025 Netflix Inc.
* Author: Xavier Claessens <xclaessens@netflix.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <gst/allocators/allocators.h>
#define GST_TYPE_UNIX_FD_ALLOCATOR gst_unix_fd_allocator_get_type()
G_DECLARE_FINAL_TYPE (GstUnixFdAllocator, gst_unix_fd_allocator,
GST, UNIX_FD_ALLOCATOR, GstShmAllocator);
GstUnixFdAllocator *gst_unix_fd_allocator_new (void);
void gst_unix_fd_allocator_flush (GstUnixFdAllocator * self);

View File

@ -44,6 +44,8 @@
#include "gstunixfd.h"
#include "gstunixfdallocator.h"
#include <gst/base/base.h>
#include <gst/allocators/allocators.h>
@ -91,6 +93,9 @@ struct _GstUnixFdSink
gboolean wait_for_connection;
GCond wait_for_connection_cond;
gboolean unlock;
GstUnixFdAllocator *allocator;
gint64 min_memory_size;
};
G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK);
@ -99,6 +104,7 @@ GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE,
#define DEFAULT_SOCKET_TYPE G_UNIX_SOCKET_ADDRESS_PATH
#define DEFAULT_WAIT_FOR_CONNECTION FALSE
#define DEFAULT_MIN_MEMORY_SIZE 0
enum
{
@ -106,6 +112,7 @@ enum
PROP_SOCKET_PATH,
PROP_SOCKET_TYPE,
PROP_WAIT_FOR_CONNECTION,
PROP_MIN_MEMORY_SIZE,
};
@ -118,6 +125,66 @@ client_free (Client * client)
g_free (client);
}
static GstMemory *
copy_to_shm (GstUnixFdSink * self, GstMemory * mem)
{
GST_OBJECT_LOCK (self);
if (self->min_memory_size < 0) {
GST_ERROR_OBJECT (self,
"Buffer has non-FD memories and copying is disabled. Set min-memory-size to a value >= 0 to allow copying.");
GST_OBJECT_UNLOCK (self);
return NULL;
}
if (self->allocator == NULL)
self->allocator = gst_unix_fd_allocator_new ();
gsize size = gst_memory_get_sizes (mem, NULL, NULL);
gsize alloc_size = MAX (size, self->min_memory_size);
GstMemory *fd_mem =
gst_allocator_alloc (GST_ALLOCATOR_CAST (self->allocator), alloc_size,
NULL);
GST_OBJECT_UNLOCK (self);
if (fd_mem == NULL) {
GST_ERROR_OBJECT (self, "Shared memory allocation failed.");
return NULL;
}
gst_memory_resize (fd_mem, 0, size);
GstMapInfo src_map, dst_map;
if (!gst_memory_map (mem, &src_map, GST_MAP_READ)) {
GST_ERROR_OBJECT (self, "Mapping of source memory failed.");
gst_memory_unref (fd_mem);
return NULL;
}
if (!gst_memory_map (fd_mem, &dst_map, GST_MAP_WRITE)) {
GST_ERROR_OBJECT (self, "Mapping of shared memory failed.");
gst_memory_unmap (mem, &src_map);
gst_memory_unref (fd_mem);
return NULL;
}
memcpy (dst_map.data, src_map.data, src_map.size);
gst_memory_unmap (mem, &src_map);
gst_memory_unmap (fd_mem, &dst_map);
return fd_mem;
}
static void
allocator_unref (GstUnixFdAllocator * allocator)
{
gst_unix_fd_allocator_flush (allocator);
g_object_unref (allocator);
}
static void
gst_unix_fd_sink_init (GstUnixFdSink * self)
{
@ -175,6 +242,10 @@ gst_unix_fd_sink_set_property (GObject * object, guint prop_id,
self->wait_for_connection = g_value_get_boolean (value);
g_cond_signal (&self->wait_for_connection_cond);
break;
case PROP_MIN_MEMORY_SIZE:
self->min_memory_size = g_value_get_int64 (value);
g_clear_pointer (&self->allocator, allocator_unref);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -201,6 +272,9 @@ gst_unix_fd_sink_get_property (GObject * object, guint prop_id,
case PROP_WAIT_FOR_CONNECTION:
g_value_set_boolean (value, self->wait_for_connection);
break;
case PROP_MIN_MEMORY_SIZE:
g_value_set_int64 (value, self->min_memory_size);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -534,14 +608,29 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
return GST_FLOW_ERROR;
}
gboolean dmabuf_count = 0;
/* dst_buffer is used to hold reference on new GstMemory we'll create, if any.
* ref_original_buffer is set to TRUE if dst_buffer also needs to hold
* reference on the original buffer. */
GstBuffer *dst_buffer = NULL;
gboolean ref_original_buffer = FALSE;
gint dmabuf_count = 0;
GUnixFDList *fds = g_unix_fd_list_new ();
for (int i = 0; i < n_memory; i++) {
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
if (!gst_is_fd_memory (mem)) {
GST_ERROR_OBJECT (self, "Expecting buffers with FD memories");
ret = GST_FLOW_ERROR;
goto out;
if (dst_buffer == NULL)
dst_buffer = gst_buffer_new ();
mem = copy_to_shm (self, mem);
if (mem == NULL) {
ret = GST_FLOW_ERROR;
goto out;
}
gst_buffer_append_memory (dst_buffer, mem);
} else {
ref_original_buffer = TRUE;
}
if (gst_is_dmabuf_memory (mem))
@ -567,6 +656,13 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
if (dmabuf_count > 0)
new_buffer->type = MEMORY_TYPE_DMABUF;
if (dst_buffer != NULL) {
new_buffer->id = (guint64) dst_buffer;
if (ref_original_buffer)
gst_buffer_add_parent_buffer_meta (dst_buffer, buffer);
buffer = dst_buffer;
}
GST_OBJECT_LOCK (self);
while (self->wait_for_connection && g_hash_table_size (self->clients) == 0) {
@ -586,6 +682,7 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GST_OBJECT_UNLOCK (self);
out:
gst_clear_buffer (&dst_buffer);
g_clear_object (&fds);
g_clear_error (&error);
return ret;
@ -634,6 +731,9 @@ gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event)
send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload, payload_size,
NULL);
g_free (payload);
/* New caps could mean new buffer size, or even no copies needed anymore.
* We'll create a new pool if still needed. */
g_clear_pointer (&self->allocator, allocator_unref);
GST_OBJECT_UNLOCK (self);
break;
}
@ -677,6 +777,26 @@ gst_unix_fd_sink_set_clock (GstElement * element, GstClock * clock)
clock);
}
static GstStateChangeReturn
gst_unix_fd_sink_change_state (GstElement * element, GstStateChange transition)
{
GstUnixFdSink *self = (GstUnixFdSink *) element;
GstStateChangeReturn ret =
GST_ELEMENT_CLASS (gst_unix_fd_sink_parent_class)->change_state (element,
transition);
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
g_clear_pointer (&self->allocator, allocator_unref);
break;
default:
break;
}
return ret;
}
static void
gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
{
@ -698,6 +818,8 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
gobject_class->get_property = gst_unix_fd_sink_get_property;
gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_set_clock);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_unix_fd_sink_change_state);
gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_start);
gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_stop);
@ -738,4 +860,25 @@ gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass)
"Block the stream until a least one client is connected",
DEFAULT_WAIT_FOR_CONNECTION,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
/**
* GstUnixFdSink:min-memory-size:
*
* Minimum size to allocate in the case a copy into shared memory is needed.
* Memories are kept in a pool and reused when possible.
*
* A value of 0 (the default) means only the needed size is allocated which
* reduces the possibility of reusing the memory in the case not all buffers
* need the same size.
*
* A negative value disables copying and the pipeline will stop with an error
* in the case a copy into shared memory is needed.
*
* Since: 1.28
*/
g_object_class_install_property (gobject_class, PROP_MIN_MEMORY_SIZE,
g_param_spec_int64 ("min-memory-size", "Minimum memory size",
"Minimum size to allocate in the case a copy into shared memory is needed.",
-1, G_MAXINT64, DEFAULT_MIN_MEMORY_SIZE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
}

View File

@ -1,5 +1,6 @@
unixfd_sources = [
'gstunixfd.c',
'gstunixfdallocator.c',
'gstunixfdsink.c',
'gstunixfdsrc.c',
]

View File

@ -224,6 +224,76 @@ GST_START_TEST (test_unixfd_segment)
GST_END_TEST;
GST_START_TEST (test_unixfd_copy)
{
GError *error = NULL;
/* Ensure we don't have socket from previous failed test */
gchar *tempdir = g_dir_make_tmp ("unixfd-test-XXXXXX", &error);
g_assert_no_error (error);
gchar *socket_path = g_strdup_printf ("%s/socket", tempdir);
GstCaps *caps = gst_caps_new_empty_simple ("video/x-raw");
/* Setup service */
gchar *pipeline_str =
g_strdup_printf
("appsrc name=src format=time ! unixfdsink socket-path=%s sync=false async=false wait-for-connection=true",
socket_path);
GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error);
g_assert_no_error (error);
fail_unless (gst_element_set_state (pipeline_service,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS);
GstElement *appsrc = gst_bin_get_by_name (GST_BIN (pipeline_service), "src");
gst_object_unref (appsrc);
g_free (pipeline_str);
/* Setup client */
pipeline_str =
g_strdup_printf
("unixfdsrc socket-path=%s ! appsink name=sink sync=false async=false",
socket_path);
GstElement *pipeline_client = gst_parse_launch (pipeline_str, &error);
g_assert_no_error (error);
fail_unless (gst_element_set_state (pipeline_client,
GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS);
GstElement *appsink = gst_bin_get_by_name (GST_BIN (pipeline_client), "sink");
gst_object_unref (appsink);
g_free (pipeline_str);
/* Send a buffer with system memory */
GstSegment segment;
gst_segment_init (&segment, GST_FORMAT_TIME);
const char content[] = "Hello world!";
GstBuffer *buf = gst_buffer_new_memdup (content, strlen (content));
GstSample *sample = gst_sample_new (buf, caps, &segment, NULL);
gst_app_src_push_sample (GST_APP_SRC (appsrc), sample);
gst_sample_unref (sample);
gst_buffer_unref (buf);
/* Wait for it */
sample = gst_app_sink_pull_sample (GST_APP_SINK (appsink));
buf = gst_sample_get_buffer (sample);
fail_unless (gst_buffer_memcmp (buf, 0, content, strlen (content)) == 0);
gst_sample_unref (sample);
/* Teardown */
fail_unless (gst_element_set_state (pipeline_client,
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
fail_unless (gst_element_set_state (pipeline_service,
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
g_rmdir (tempdir);
g_free (tempdir);
gst_object_unref (pipeline_service);
gst_object_unref (pipeline_client);
g_free (socket_path);
gst_caps_unref (caps);
}
GST_END_TEST;
static Suite *
unixfd_suite (void)
{
@ -233,6 +303,7 @@ unixfd_suite (void)
suite_add_tcase (s, tc);
tcase_add_test (tc, test_unixfd_videotestsrc);
tcase_add_test (tc, test_unixfd_segment);
tcase_add_test (tc, test_unixfd_copy);
return s;
}

View File

@ -54,11 +54,6 @@
#include <sys/mman.h>
#endif
struct _GstShmAllocator
{
GstFdAllocator parent;
};
#define GST_CAT_DEFAULT gst_shm_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);

View File

@ -46,6 +46,14 @@ G_BEGIN_DECLS
*/
/**
* GstShmAllocator.parent_instance:
*
* Parent instance.
*
* Since: 1.28
*/
/**
* GstShmAllocatorClass.parent_class:
*
* Parent Class.
@ -62,7 +70,11 @@ G_BEGIN_DECLS
*/
#define GST_TYPE_SHM_ALLOCATOR gst_shm_allocator_get_type ()
GST_ALLOCATORS_API
G_DECLARE_FINAL_TYPE (GstShmAllocator, gst_shm_allocator, GST, SHM_ALLOCATOR, GstFdAllocator)
G_DECLARE_DERIVABLE_TYPE (GstShmAllocator, gst_shm_allocator, GST, SHM_ALLOCATOR, GstFdAllocator)
struct _GstShmAllocatorClass {
GstFdAllocatorClass parent_class;
};
GST_ALLOCATORS_API void gst_shm_allocator_init_once (void);
GST_ALLOCATORS_API GstAllocator* gst_shm_allocator_get (void);