From f076a9bf2f47a20a1025fe09f3071c47e4416365 Mon Sep 17 00:00:00 2001 From: Xavier Claessens Date: Fri, 22 Sep 2023 14:32:27 -0400 Subject: [PATCH] unixfd: Serialize buffer metas Serialize every GstMeta that supports serialization into the NEW_BUFFER payload. This is especially important for GstVideoMeta in the case of multiplanar buffers, or if stride!=width. Sponsored-by: Netflix Inc. Part-of: --- .../gst-plugins-bad/gst/unixfd/gstunixfd.c | 20 ++++---- .../gst-plugins-bad/gst/unixfd/gstunixfd.h | 18 +++---- .../gst/unixfd/gstunixfdsink.c | 50 ++++++++++++++----- .../gst-plugins-bad/gst/unixfd/gstunixfdsrc.c | 24 +++++++-- .../tests/check/elements/unixfd.c | 47 ++++++++++++++++- 5 files changed, 121 insertions(+), 38 deletions(-) diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c index b1f119a8f6..35555cd5a0 100644 --- a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c @@ -49,7 +49,7 @@ G_STATIC_ASSERT (sizeof (ReleaseBufferPayload) == 8); gboolean gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds, - const gchar * payload, gsize payload_size, GError ** error) + const guint8 * payload, gsize payload_size, GError ** error) { Command command = { type, payload_size }; GOutputVector vect[] = { @@ -76,7 +76,7 @@ gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds, gboolean gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable, - CommandType * type, GUnixFDList ** fds, gchar ** payload, + CommandType * type, GUnixFDList ** fds, guint8 ** payload, gsize * payload_size, GError ** error) { Command command; @@ -98,8 +98,8 @@ gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable, if (command.payload_size > 0) { *payload = g_malloc (command.payload_size); *payload_size = command.payload_size; - if (g_socket_receive (socket, *payload, command.payload_size, cancellable, - error) < (gssize) command.payload_size) { + if (g_socket_receive (socket, (gchar *) * payload, command.payload_size, + cancellable, error) < (gssize) command.payload_size) { g_clear_pointer (payload, g_free); ret = FALSE; goto out; @@ -133,8 +133,8 @@ out: } gboolean -gst_unix_fd_parse_new_buffer (gchar * payload, gsize payload_size, - NewBufferPayload ** new_buffer) +gst_unix_fd_parse_new_buffer (guint8 * payload, gsize payload_size, + NewBufferPayload ** new_buffer, guint32 * consumed) { if (payload == NULL || payload_size < sizeof (NewBufferPayload)) return FALSE; @@ -146,11 +146,13 @@ gst_unix_fd_parse_new_buffer (gchar * payload, gsize payload_size, if (payload_size < struct_size) return FALSE; + *consumed = struct_size; + return TRUE; } gboolean -gst_unix_fd_parse_release_buffer (gchar * payload, gsize payload_size, +gst_unix_fd_parse_release_buffer (guint8 * payload, gsize payload_size, ReleaseBufferPayload ** release_buffer) { if (payload == NULL || payload_size < sizeof (ReleaseBufferPayload)) @@ -162,12 +164,12 @@ gst_unix_fd_parse_release_buffer (gchar * payload, gsize payload_size, } gboolean -gst_unix_fd_parse_caps (gchar * payload, gsize payload_size, gchar ** caps_str) +gst_unix_fd_parse_caps (guint8 * payload, gsize payload_size, gchar ** caps_str) { if (payload == NULL || payload_size < 1 || payload[payload_size - 1] != '\0') return FALSE; - *caps_str = payload; + *caps_str = (gchar *) payload; return TRUE; } diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h index 47b2353a1f..849474ae11 100644 --- a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h @@ -57,7 +57,7 @@ typedef struct { guint32 flags; guint8 type; guint8 n_memory; - guint16 padding; + guint16 n_meta; MemoryPayload memories[]; } NewBufferPayload; @@ -66,17 +66,17 @@ typedef struct { } ReleaseBufferPayload; gboolean gst_unix_fd_send_command(GSocket * socket, CommandType type, - GUnixFDList * fds, const gchar * payload, gsize payload_size, + GUnixFDList * fds, const guint8 * payload, gsize payload_size, GError ** error); -gboolean gst_unix_fd_receive_command(GSocket * socket, - GCancellable * cancellable, CommandType *type, GUnixFDList ** fds, - gchar ** payload, gsize *payload_size, GError ** error); +gboolean gst_unix_fd_receive_command (GSocket *socket, + GCancellable *cancellable, CommandType *type, GUnixFDList **fds, + guint8 **payload, gsize *payload_size, GError **error); -gboolean gst_unix_fd_parse_new_buffer(gchar *payload, gsize payload_size, - NewBufferPayload **new_buffer); -gboolean gst_unix_fd_parse_release_buffer(gchar *payload, gsize payload_size, +gboolean gst_unix_fd_parse_new_buffer (guint8 *payload, gsize payload_size, + NewBufferPayload **new_buffer, guint32 *consumed); +gboolean gst_unix_fd_parse_release_buffer(guint8 *payload, gsize payload_size, ReleaseBufferPayload **release_buffer); -gboolean gst_unix_fd_parse_caps(gchar *payload, gsize payload_size, +gboolean gst_unix_fd_parse_caps (guint8 *payload, gsize payload_size, gchar **caps_str); GSocket *gst_unix_fd_socket_new(const gchar *socket_path, diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c index 8514c59143..0e7dd8cde8 100644 --- a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c @@ -86,6 +86,7 @@ struct _GstUnixFdSink GHashTable *clients; GstCaps *caps; gboolean uses_monotonic_clock; + GByteArray *payload; }; G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK); @@ -199,7 +200,7 @@ incoming_command_cb (GSocket * socket, GIOCondition cond, gpointer user_data) GstUnixFdSink *self = user_data; Client *client; CommandType command; - gchar *payload = NULL; + guint8 *payload = NULL; gsize payload_size; GError *error = NULL; @@ -264,12 +265,12 @@ on_error: return G_SOURCE_REMOVE; } -static gchar * +static guint8 * caps_to_payload (GstCaps * caps, gsize * payload_size) { gchar *payload = gst_caps_to_string (caps); *payload_size = strlen (payload) + 1; - return payload; + return (guint8 *) payload; } static gboolean @@ -303,7 +304,7 @@ new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data) * we don't want this client to miss a caps event or receive a buffer while we * send initial caps. */ gsize payload_size; - gchar *payload = caps_to_payload (self->caps, &payload_size); + guint8 *payload = caps_to_payload (self->caps, &payload_size); if (!gst_unix_fd_send_command (client_socket, COMMAND_TYPE_CAPS, NULL, payload, payload_size, &error)) { GST_ERROR_OBJECT (self, "Failed to send caps to new client %p: %s", client, @@ -365,6 +366,13 @@ gst_unix_fd_sink_start (GstBaseSink * bsink) self->thread = g_thread_new ("unixfdsink", thread_cb, self); + /* Preallocate the minimum payload size for a buffer with a single memory and + * no metas. Chances are that every buffer will require roughly the same + * payload size, by reusing the same GByteArray we avoid reallocations. */ + self->payload = + g_byte_array_sized_new (sizeof (NewBufferPayload) + + sizeof (MemoryPayload)); + out: GST_OBJECT_UNLOCK (self); g_clear_error (&error); @@ -385,6 +393,7 @@ gst_unix_fd_sink_stop (GstBaseSink * bsink) g_clear_object (&self->socket); gst_clear_caps (&self->caps); g_hash_table_remove_all (self->clients); + g_clear_pointer (&self->payload, g_byte_array_unref); if (self->socket_type == G_UNIX_SOCKET_ADDRESS_PATH) g_unlink (self->socket_path); @@ -394,7 +403,7 @@ gst_unix_fd_sink_stop (GstBaseSink * bsink) static void send_command_to_all (GstUnixFdSink * self, CommandType type, GUnixFDList * fds, - const gchar * payload, gsize payload_size, GstBuffer * buffer) + const guint8 * payload, gsize payload_size, GstBuffer * buffer) { GHashTableIter iter; GSocket *socket; @@ -435,6 +444,21 @@ calculate_timestamp (GstClockTime timestamp, GstClockTime base_time, return timestamp; } +static guint16 +serialize_metas (GstBuffer * buffer, GByteArray * payload) +{ + gpointer state = NULL; + GstMeta *meta; + guint16 n_meta = 0; + + while ((meta = gst_buffer_iterate_meta (buffer, &state)) != NULL) { + if (gst_meta_serialize_simple (meta, payload)) + n_meta++; + } + + return n_meta; +} + static GstFlowReturn gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) { @@ -442,11 +466,11 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GstFlowReturn ret = GST_FLOW_OK; GError *error = NULL; - /* Allocate payload */ guint n_memory = gst_buffer_n_memory (buffer); - gsize payload_size = + gsize struct_size = sizeof (NewBufferPayload) + sizeof (MemoryPayload) * n_memory; - gchar *payload = g_malloc0 (payload_size); + g_byte_array_set_size (self->payload, struct_size); + guint32 n_meta = serialize_metas (buffer, self->payload); GstClockTime latency = gst_base_sink_get_latency (GST_BASE_SINK_CAST (self)); GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (self)); @@ -456,7 +480,7 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_clock_get_time (GST_ELEMENT_CLOCK (self))); } - NewBufferPayload *new_buffer = (NewBufferPayload *) payload; + NewBufferPayload *new_buffer = (NewBufferPayload *) self->payload->data; /* Cast buffer pointer to guint64 identifier. Client will send us back that * id so we know which buffer to unref. */ new_buffer->id = (guint64) buffer; @@ -472,6 +496,7 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) new_buffer->flags = GST_BUFFER_FLAGS (buffer); new_buffer->type = MEMORY_TYPE_DEFAULT; new_buffer->n_memory = n_memory; + new_buffer->n_meta = n_meta; gboolean dmabuf_count = 0; GUnixFDList *fds = g_unix_fd_list_new (); @@ -507,14 +532,13 @@ gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) new_buffer->type = MEMORY_TYPE_DMABUF; GST_OBJECT_LOCK (self); - send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, payload, - payload_size, buffer); + send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, + self->payload->data, self->payload->len, buffer); GST_OBJECT_UNLOCK (self); out: g_clear_object (&fds); g_clear_error (&error); - g_free (payload); return ret; } @@ -532,7 +556,7 @@ gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event) GST_DEBUG_OBJECT (self, "Send new caps to all clients: %" GST_PTR_FORMAT, self->caps); gsize payload_size; - gchar *payload = caps_to_payload (self->caps, &payload_size); + guint8 *payload = caps_to_payload (self->caps, &payload_size); send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload, payload_size, NULL); g_free (payload); diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c index cb433803bc..39f375382d 100644 --- a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c @@ -103,7 +103,7 @@ memory_weak_ref_cb (GstUnixFdSrc * self, GstMemory * mem) ReleaseBufferPayload payload = { ctx->id }; GError *error = NULL; if (!gst_unix_fd_send_command (self->socket, COMMAND_TYPE_RELEASE_BUFFER, - NULL, (const gchar *) &payload, sizeof (payload), &error)) { + NULL, (guint8 *) & payload, sizeof (payload), &error)) { GST_WARNING_OBJECT (self, "Failed to send release-buffer command: %s", error->message); g_clear_error (&error); @@ -301,7 +301,7 @@ gst_unix_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GstUnixFdSrc *self = GST_UNIX_FD_SRC (psrc); CommandType command; GUnixFDList *fds = NULL; - gchar *payload = NULL; + guint8 *payload = NULL; gsize payload_size; GError *error = NULL; GstFlowReturn ret = GST_FLOW_OK; @@ -327,7 +327,9 @@ again: goto on_error; case COMMAND_TYPE_NEW_BUFFER:{ NewBufferPayload *new_buffer; - if (!gst_unix_fd_parse_new_buffer (payload, payload_size, &new_buffer)) { + guint32 payload_off = 0; + if (!gst_unix_fd_parse_new_buffer (payload, payload_size, &new_buffer, + &payload_off)) { GST_ERROR_OBJECT (self, "Received new-buffer with wrong payload size"); ret = GST_FLOW_ERROR; goto on_error; @@ -341,8 +343,8 @@ again: if (g_unix_fd_list_get_length (fds) != new_buffer->n_memory) { GST_ERROR_OBJECT (self, - "Received new buffer command with %d file descriptors instead of %d", - g_unix_fd_list_get_length (fds), new_buffer->n_memory); + "Received new buffer command with %d file descriptors instead of " + "%d", g_unix_fd_list_get_length (fds), new_buffer->n_memory); ret = GST_FLOW_ERROR; goto on_error; } @@ -379,6 +381,18 @@ again: GST_BUFFER_OFFSET_END (*outbuf) = new_buffer->offset_end; GST_BUFFER_FLAGS (*outbuf) = new_buffer->flags; + for (int i = 0; i < new_buffer->n_meta; i++) { + guint32 consumed = 0; + gst_meta_deserialize (*outbuf, (guint8 *) payload + payload_off, + payload_size - payload_off, &consumed); + if (consumed == 0) { + GST_ERROR_OBJECT (self, "Malformed meta serialization"); + ret = GST_FLOW_ERROR; + goto on_error; + } + payload_off += consumed; + } + GST_OBJECT_LOCK (self); for (int i = 0; i < new_buffer->n_memory; i++) { GstMemory *mem = gst_fd_allocator_alloc (allocator, fds_arr[i], diff --git a/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c b/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c index dcd0608c7b..ec98805dda 100644 --- a/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c +++ b/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c @@ -37,10 +37,26 @@ wait_preroll (GstElement * element) fail_unless (state_res == GST_STATE_CHANGE_SUCCESS); } +static GstPadProbeReturn +buffer_pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info); + if (buffer != NULL) { + GstCustomMeta *cmeta = + gst_buffer_add_custom_meta (buffer, "unix-fd-custom-meta"); + GstStructure *s = gst_custom_meta_get_structure (cmeta); + gst_structure_set (s, "field", G_TYPE_INT, 42, NULL); + } + return GST_PAD_PROBE_OK; +} + GST_START_TEST (test_unixfd_videotestsrc) { GError *error = NULL; + const gchar *tags[] = { NULL }; + gst_meta_register_custom ("unix-fd-custom-meta", tags, NULL, NULL, NULL); + /* Ensure we don't have socket from previous failed test */ gchar *socket_path = g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ()); @@ -50,15 +66,26 @@ GST_START_TEST (test_unixfd_videotestsrc) /* Setup source */ gchar *pipeline_str = - g_strdup_printf ("videotestsrc ! unixfdsink socket-path=%s", socket_path); + g_strdup_printf ("videotestsrc name=src ! unixfdsink socket-path=%s", + socket_path); GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error); g_assert_no_error (error); g_free (pipeline_str); + + /* Add a custom meta on each buffer */ + GstElement *src = gst_bin_get_by_name (GST_BIN (pipeline_service), "src"); + GstPad *pad = gst_element_get_static_pad (src, "src"); + gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER, buffer_pad_probe_cb, NULL, + NULL); + gst_object_unref (src); + gst_object_unref (pad); + wait_preroll (pipeline_service); /* Setup sink */ pipeline_str = - g_strdup_printf ("unixfdsrc socket-path=%s ! fakesink", socket_path); + g_strdup_printf ("unixfdsrc socket-path=%s ! fakesink name=sink", + socket_path); GstElement *pipeline_client_1 = gst_parse_launch (pipeline_str, &error); g_assert_no_error (error); wait_preroll (pipeline_client_1); @@ -73,6 +100,22 @@ GST_START_TEST (test_unixfd_videotestsrc) g_assert_no_error (error); wait_preroll (pipeline_client_2); + /* Check we received our custom meta */ + GstSample *sample; + GstElement *sink = gst_bin_get_by_name (GST_BIN (pipeline_client_2), "sink"); + g_object_get (sink, "last-sample", &sample, NULL); + fail_unless (sample); + GstBuffer *buffer = gst_sample_get_buffer (sample); + GstCustomMeta *cmeta = + gst_buffer_get_custom_meta (buffer, "unix-fd-custom-meta"); + fail_unless (cmeta); + GstStructure *s = gst_custom_meta_get_structure (cmeta); + gint value; + fail_unless (gst_structure_get_int (s, "field", &value)); + fail_unless_equals_int (value, 42); + gst_object_unref (sink); + gst_sample_unref (sample); + /* Teardown */ fail_unless (gst_element_set_state (pipeline_client_1, GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);