diff --git a/subprojects/gst-plugins-bad/gst/meson.build b/subprojects/gst-plugins-bad/gst/meson.build index 4e5226b317..699560f9b9 100644 --- a/subprojects/gst-plugins-bad/gst/meson.build +++ b/subprojects/gst-plugins-bad/gst/meson.build @@ -11,7 +11,7 @@ foreach plugin : ['accurip', 'adpcmdec', 'adpcmenc', 'aiff', 'asfmux', 'mxf', 'netsim', 'onvif', 'pcapparse', 'pnm', 'proxy', 'rawparse', 'removesilence', 'rist', 'rtmp2', 'rtp', 'sdp', 'segmentclip', 'siren', 'smooth', 'speed', 'subenc', 'switchbin', - 'timecode', 'transcode', 'videofilters', + 'timecode', 'transcode', 'unixfd', 'videofilters', 'videoframe_audiolevel', 'videoparsers', 'videosignal', 'vmnc', 'y4m'] if not get_option(plugin).disabled() diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c new file mode 100644 index 0000000000..fdd56ea2d4 --- /dev/null +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.c @@ -0,0 +1,189 @@ +/* GStreamer unix file-descriptor source/sink + * + * Copyright (C) 2023 Netflix Inc. + * Author: Xavier Claessens + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * plugin-unixfd: + * + * Since: 1.24 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstunixfd.h" + +#include + +typedef struct +{ + guint32 type; + guint32 payload_size; +} Command; + +/* For backward compatibility, do not change the size of Command. It should have + * same size on 32bits and 64bits arches. */ +G_STATIC_ASSERT (sizeof (Command) == 8); +G_STATIC_ASSERT (sizeof (MemoryPayload) == 16); +G_STATIC_ASSERT (sizeof (NewBufferPayload) == 56); +G_STATIC_ASSERT (sizeof (ReleaseBufferPayload) == 8); + +gboolean +gst_unix_fd_send_command (GSocket * socket, CommandType type, GUnixFDList * fds, + const gchar * payload, gsize payload_size, GError ** error) +{ + Command command = { type, payload_size }; + GOutputVector vect[] = { + {&command, sizeof (Command)}, + {payload, payload_size}, + }; + GSocketControlMessage *msg = NULL; + gint num_msg = 0; + gboolean ret = TRUE; + + if (fds != NULL) { + msg = g_unix_fd_message_new_with_fd_list (fds); + num_msg++; + } + + if (g_socket_send_message (socket, NULL, vect, G_N_ELEMENTS (vect), + &msg, num_msg, G_SOCKET_MSG_NONE, NULL, error) < 0) { + ret = FALSE; + } + + g_clear_object (&msg); + return ret; +} + +gboolean +gst_unix_fd_receive_command (GSocket * socket, GCancellable * cancellable, + CommandType * type, GUnixFDList ** fds, gchar ** payload, + gsize * payload_size, GError ** error) +{ + Command command; + GInputVector vect = { &command, sizeof (Command) }; + GSocketControlMessage **msg = NULL; + gint num_msg = 0; + gint flags = G_SOCKET_MSG_NONE; + gboolean ret = TRUE; + + if (g_socket_receive_message (socket, NULL, &vect, 1, &msg, &num_msg, &flags, + cancellable, error) <= 0) { + return FALSE; + } + + *type = command.type; + *payload = NULL; + *payload_size = 0; + + if (command.payload_size > 0) { + *payload = g_malloc (command.payload_size); + *payload_size = command.payload_size; + if (g_socket_receive (socket, *payload, command.payload_size, cancellable, + error) < (gssize) command.payload_size) { + g_clear_pointer (payload, g_free); + ret = FALSE; + goto out; + } + } + + if (fds != NULL) { + *fds = NULL; + for (int i = 0; i < num_msg; i++) { + if (G_IS_UNIX_FD_MESSAGE (msg[i])) { + if (*fds != NULL) { + g_set_error (error, GST_STREAM_ERROR, GST_STREAM_ERROR_FAILED, + "Received more than one fd message"); + g_clear_pointer (payload, g_free); + g_clear_object (fds); + ret = FALSE; + goto out; + } + *fds = g_object_ref (g_unix_fd_message_get_fd_list ((GUnixFDMessage *) + msg[i])); + } + } + } + +out: + for (int i = 0; i < num_msg; i++) + g_object_unref (msg[i]); + g_free (msg); + + return ret; +} + +gboolean +gst_unix_fd_parse_new_buffer (gchar * payload, gsize payload_size, + NewBufferPayload ** new_buffer) +{ + if (payload == NULL || payload_size < sizeof (NewBufferPayload)) + return FALSE; + + *new_buffer = (NewBufferPayload *) payload; + gsize struct_size = + sizeof (NewBufferPayload) + + sizeof (MemoryPayload) * (*new_buffer)->n_memory; + if (payload_size < struct_size) + return FALSE; + + return TRUE; +} + +gboolean +gst_unix_fd_parse_release_buffer (gchar * payload, gsize payload_size, + ReleaseBufferPayload ** release_buffer) +{ + if (payload == NULL || payload_size < sizeof (ReleaseBufferPayload)) + return FALSE; + + *release_buffer = (ReleaseBufferPayload *) payload; + + return TRUE; +} + +gboolean +gst_unix_fd_parse_caps (gchar * payload, gsize payload_size, gchar ** caps_str) +{ + if (payload == NULL || payload_size < 1 || payload[payload_size - 1] != '\0') + return FALSE; + + *caps_str = payload; + + return TRUE; +} + +static gboolean +plugin_init (GstPlugin * plugin) +{ + gboolean ret = FALSE; + + ret |= GST_ELEMENT_REGISTER (unixfdsrc, plugin); + ret |= GST_ELEMENT_REGISTER (unixfdsink, plugin); + + return ret; +} + +GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, + GST_VERSION_MINOR, + unixfd, + "Unix file descriptor sink and source", + plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h new file mode 100644 index 0000000000..0eab2b46f7 --- /dev/null +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfd.h @@ -0,0 +1,78 @@ +/* GStreamer unix file-descriptor source/sink + * + * Copyright (C) 2023 Netflix Inc. + * Author: Xavier Claessens + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#pragma once + +#include +#include +#include + +G_BEGIN_DECLS + +typedef enum +{ + COMMAND_TYPE_NEW_BUFFER = 0, + COMMAND_TYPE_RELEASE_BUFFER = 1, + COMMAND_TYPE_CAPS = 2, + COMMAND_TYPE_EOS = 3, +} CommandType; + +typedef struct { + guint64 size; + guint64 offset; +} MemoryPayload; + +typedef struct { + guint64 id; + guint64 pts; + guint64 dts; + guint64 duration; + guint64 offset; + guint64 offset_end; + guint32 flags; + guint8 n_memory; + guint8 padding8; + guint16 padding16; + MemoryPayload memories[]; +} NewBufferPayload; + +typedef struct { + guint64 id; +} ReleaseBufferPayload; + +gboolean gst_unix_fd_send_command(GSocket * socket, CommandType type, + GUnixFDList * fds, const gchar * payload, gsize payload_size, + GError ** error); +gboolean gst_unix_fd_receive_command(GSocket * socket, + GCancellable * cancellable, CommandType *type, GUnixFDList ** fds, + gchar ** payload, gsize *payload_size, GError ** error); + +gboolean gst_unix_fd_parse_new_buffer(gchar *payload, gsize payload_size, + NewBufferPayload **new_buffer); +gboolean gst_unix_fd_parse_release_buffer(gchar *payload, gsize payload_size, + ReleaseBufferPayload **release_buffer); +gboolean gst_unix_fd_parse_caps(gchar *payload, gsize payload_size, + gchar **caps_str); + +GST_ELEMENT_REGISTER_DECLARE (unixfdsrc); +GST_ELEMENT_REGISTER_DECLARE (unixfdsink); + +G_END_DECLS diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c new file mode 100644 index 0000000000..b35d1c1314 --- /dev/null +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsink.c @@ -0,0 +1,596 @@ +/* GStreamer unix file-descriptor source/sink + * + * Copyright (C) 2023 Netflix Inc. + * Author: Xavier Claessens + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-unixfdsink + * @title: unixfdsink + * + * Send file-descriptor backed buffers (e.g. memfd, dmabuf) over unix socket to + * matching unixfdsrc. There can be any number of clients, if none are connected + * buffers are dropped. + * + * Buffers can have any number of #GstMemory, but it is an error if any one of + * them lacks a file-descriptor. + * + * #GstShmAllocator is added into the allocation proposition, which makes + * most sources write their data into shared memory automatically. + * + * ## Example launch lines + * |[ + * gst-launch-1.0 -v videotestsrc ! unixfdsink socket-path=/tmp/blah + * gst-launch-1.0 -v unixfdsrc socket-path=/tmp/blah ! autovideosink + * ]| + * + * Since: 1.24 + */ + +#include "gstunixfd.h" + +#include +#include + +#include +#include +#include + +GST_DEBUG_CATEGORY (unixfdsink_debug); +#define GST_CAT_DEFAULT (unixfdsink_debug) + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define GST_TYPE_UNIX_FD_SINK gst_unix_fd_sink_get_type() +G_DECLARE_FINAL_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST, UNIX_FD_SINK, + GstBaseSink); + +typedef struct +{ + GHashTable *buffers; + GSource *source; +} Client; + +struct _GstUnixFdSink +{ + GstBaseSink parent; + + GThread *thread; + GMainContext *context; + GMainLoop *loop; + + gchar *socket_path; + GSocket *socket; + GSource *source; + + /* GSocket -> Client */ + GHashTable *clients; + GstCaps *caps; + gboolean uses_monotonic_clock; +}; + +G_DEFINE_TYPE (GstUnixFdSink, gst_unix_fd_sink, GST_TYPE_BASE_SINK); +GST_ELEMENT_REGISTER_DEFINE (unixfdsink, "unixfdsink", GST_RANK_NONE, + GST_TYPE_UNIX_FD_SINK); + +enum +{ + PROP_0, + PROP_SOCKET_PATH, +}; + + +static void +client_free (Client * client) +{ + g_hash_table_unref (client->buffers); + g_source_destroy (client->source); + g_source_unref (client->source); + g_free (client); +} + +static void +gst_unix_fd_sink_init (GstUnixFdSink * self) +{ + g_return_if_fail (GST_IS_UNIX_FD_SINK (self)); + + self->context = g_main_context_new (); + self->loop = g_main_loop_new (self->context, FALSE); + self->clients = + g_hash_table_new_full (NULL, NULL, g_object_unref, + (GDestroyNotify) client_free); +} + +static void +gst_unix_fd_sink_finalize (GObject * object) +{ + GstUnixFdSink *self = GST_UNIX_FD_SINK (object); + + g_free (self->socket_path); + g_main_context_unref (self->context); + g_main_loop_unref (self->loop); + g_hash_table_unref (self->clients); + + G_OBJECT_CLASS (gst_unix_fd_sink_parent_class)->finalize (object); +} + +static void +gst_unix_fd_sink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstUnixFdSink *self = GST_UNIX_FD_SINK (object); + + GST_OBJECT_LOCK (self); + + switch (prop_id) { + case PROP_SOCKET_PATH: + if (self->socket) { + GST_WARNING_OBJECT (self, + "Can only change socket path in NULL or READY state"); + break; + } + g_free (self->socket_path); + self->socket_path = g_value_dup_string (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_OBJECT_UNLOCK (self); +} + +static void +gst_unix_fd_sink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstUnixFdSink *self = GST_UNIX_FD_SINK (object); + + GST_OBJECT_LOCK (self); + + switch (prop_id) { + case PROP_SOCKET_PATH: + g_value_set_string (value, self->socket_path); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_OBJECT_UNLOCK (self); +} + +static gboolean +incoming_command_cb (GSocket * socket, GIOCondition cond, gpointer user_data) +{ + GstUnixFdSink *self = user_data; + Client *client; + CommandType command; + gchar *payload = NULL; + gsize payload_size; + GError *error = NULL; + + GST_OBJECT_LOCK (self); + + client = g_hash_table_lookup (self->clients, socket); + + if (client == NULL) { + GST_ERROR_OBJECT (self, "Received data from unknown client"); + goto on_error; + } + + if (!gst_unix_fd_receive_command (socket, NULL, &command, NULL, &payload, + &payload_size, &error)) { + GST_DEBUG_OBJECT (self, "Failed to receive message from client %p: %s", + client, error != NULL ? error->message : "Connection closed by peer"); + goto on_error; + } + + switch (command) { + case COMMAND_TYPE_NEW_BUFFER: + case COMMAND_TYPE_CAPS: + GST_ERROR_OBJECT (self, "Received wrong command %d from client %p", + command, client); + goto on_error; + case COMMAND_TYPE_RELEASE_BUFFER:{ + ReleaseBufferPayload *release_buffer; + if (!gst_unix_fd_parse_release_buffer (payload, payload_size, + &release_buffer)) { + GST_ERROR_OBJECT (self, + "Received release-buffer with wrong payload size from client %p", + client); + goto on_error; + } + /* id is actually the GstBuffer pointer casted to guint64. + * We can now drop its reference kept for this client. */ + if (!g_hash_table_remove (client->buffers, (gpointer) release_buffer->id)) { + GST_ERROR_OBJECT (self, + "Received wrong id %" G_GUINT64_FORMAT + " in release-buffer command from client %p", release_buffer->id, + client); + goto on_error; + } + break; + } + default: + /* Protocol could have been extended with new command */ + GST_DEBUG_OBJECT (self, "Ignoring unknown command %d", command); + break; + } + + g_free (payload); + GST_OBJECT_UNLOCK (self); + + return G_SOURCE_CONTINUE; + +on_error: + g_hash_table_remove (self->clients, socket); + g_clear_error (&error); + g_free (payload); + GST_OBJECT_UNLOCK (self); + return G_SOURCE_REMOVE; +} + +static gchar * +caps_to_payload (GstCaps * caps, gsize * payload_size) +{ + gchar *payload = gst_caps_to_string (caps); + *payload_size = strlen (payload) + 1; + return payload; +} + +static gboolean +new_client_cb (GSocket * socket, GIOCondition cond, gpointer user_data) +{ + GstUnixFdSink *self = user_data; + Client *client; + GError *error = NULL; + + GSocket *client_socket = g_socket_accept (self->socket, NULL, &error); + if (client_socket == NULL) { + GST_ERROR_OBJECT (self, "Failed to accept connection: %s", error->message); + return G_SOURCE_CONTINUE; + } + + client = g_new0 (Client, 1); + client->buffers = + g_hash_table_new_full (NULL, NULL, (GDestroyNotify) gst_buffer_unref, + NULL); + client->source = g_socket_create_source (client_socket, G_IO_IN, NULL); + g_source_set_callback (client->source, (GSourceFunc) incoming_command_cb, + self, NULL); + g_source_attach (client->source, self->context); + + GST_OBJECT_LOCK (self); + + GST_DEBUG_OBJECT (self, "New client %p", client); + g_hash_table_insert (self->clients, client_socket, client); + + /* Start by sending our current caps. Keep the lock while doing that because + * we don't want this client to miss a caps event or receive a buffer while we + * send initial caps. */ + gsize payload_size; + gchar *payload = caps_to_payload (self->caps, &payload_size); + if (!gst_unix_fd_send_command (client_socket, COMMAND_TYPE_CAPS, NULL, + payload, payload_size, &error)) { + GST_ERROR_OBJECT (self, "Failed to send caps to new client %p: %s", client, + error->message); + g_hash_table_remove (self->clients, client_socket); + g_clear_error (&error); + } + g_free (payload); + + GST_OBJECT_UNLOCK (self); + + return G_SOURCE_CONTINUE; +} + +static gpointer +thread_cb (gpointer user_data) +{ + GstUnixFdSink *self = user_data; + g_main_loop_run (self->loop); + return NULL; +} + +static gboolean +gst_unix_fd_sink_start (GstBaseSink * bsink) +{ + GstUnixFdSink *self = (GstUnixFdSink *) bsink; + GSocketAddress *addr = NULL; + GError *error = NULL; + gboolean ret = TRUE; + + GST_OBJECT_LOCK (self); + + if (self->socket_path == NULL) { + GST_ERROR_OBJECT (self, "Socket path is NULL"); + ret = FALSE; + goto out; + } + + addr = g_unix_socket_address_new (self->socket_path); + + self->socket = + g_socket_new (G_SOCKET_FAMILY_UNIX, G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_DEFAULT, &error); + if (self->socket == NULL) { + GST_ERROR_OBJECT (self, "Failed to create UNIX socket: %s", error->message); + ret = FALSE; + goto out; + } + + if (!g_socket_bind (self->socket, addr, TRUE, &error)) { + GST_ERROR_OBJECT (self, "Failed to bind socket: %s", error->message); + g_clear_object (&self->socket); + ret = FALSE; + goto out; + } + + if (!g_socket_listen (self->socket, &error)) { + GST_ERROR_OBJECT (self, "Failed to listen socket: %s", error->message); + g_clear_object (&self->socket); + ret = FALSE; + goto out; + } + + self->source = g_socket_create_source (self->socket, G_IO_IN, NULL); + g_source_set_callback (self->source, (GSourceFunc) new_client_cb, self, NULL); + g_source_attach (self->source, self->context); + + self->thread = g_thread_new ("unixfdsink", thread_cb, self); + +out: + GST_OBJECT_UNLOCK (self); + g_clear_error (&error); + g_clear_object (&addr); + return ret; +} + +static gboolean +gst_unix_fd_sink_stop (GstBaseSink * bsink) +{ + GstUnixFdSink *self = (GstUnixFdSink *) bsink; + + g_main_loop_quit (self->loop); + g_thread_join (self->thread); + + g_source_destroy (self->source); + g_clear_pointer (&self->source, g_source_unref); + g_clear_object (&self->socket); + gst_clear_caps (&self->caps); + g_hash_table_remove_all (self->clients); + g_unlink (self->socket_path); + + return TRUE; +} + +static void +send_command_to_all (GstUnixFdSink * self, CommandType type, GUnixFDList * fds, + const gchar * payload, gsize payload_size, GstBuffer * buffer) +{ + GHashTableIter iter; + GSocket *socket; + Client *client; + GError *error = NULL; + + g_hash_table_iter_init (&iter, self->clients); + while (g_hash_table_iter_next (&iter, (gpointer) & socket, + (gpointer) & client)) { + if (!gst_unix_fd_send_command (socket, type, fds, payload, payload_size, + &error)) { + GST_ERROR_OBJECT (self, "Failed to send command %d to client %p: %s", + type, client, error->message); + g_clear_error (&error); + g_hash_table_iter_remove (&iter); + continue; + } + /* Keep a ref on this buffer until all clients released it. */ + if (buffer != NULL) + g_hash_table_add (client->buffers, gst_buffer_ref (buffer)); + } +} + +static GstClockTime +calculate_timestamp (GstClockTime timestamp, GstClockTime base_time, + GstClockTime latency, GstClockTimeDiff clock_diff) +{ + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* Convert running time to pipeline clock time */ + timestamp += base_time; + if (GST_CLOCK_TIME_IS_VALID (latency)) + timestamp += latency; + /* Convert to system monotonic clock time */ + if (clock_diff < 0 && -clock_diff > timestamp) + return 0; + timestamp += clock_diff; + } + return timestamp; +} + +static GstFlowReturn +gst_unix_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer) +{ + GstUnixFdSink *self = (GstUnixFdSink *) bsink; + GstFlowReturn ret = GST_FLOW_OK; + GError *error = NULL; + + /* Allocate payload */ + guint n_memory = gst_buffer_n_memory (buffer); + gsize payload_size = + sizeof (NewBufferPayload) + sizeof (MemoryPayload) * n_memory; + gchar *payload = g_malloc0 (payload_size); + + GstClockTime latency = gst_base_sink_get_latency (GST_BASE_SINK_CAST (self)); + GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT_CAST (self)); + GstClockTimeDiff clock_diff = 0; + if (!self->uses_monotonic_clock) { + clock_diff = GST_CLOCK_DIFF (g_get_monotonic_time () * GST_USECOND, + gst_clock_get_time (GST_ELEMENT_CLOCK (self))); + } + + NewBufferPayload *new_buffer = (NewBufferPayload *) payload; + /* Cast buffer pointer to guint64 identifier. Client will send us back that + * id so we know which buffer to unref. */ + new_buffer->id = (guint64) buffer; + new_buffer->pts = + calculate_timestamp (GST_BUFFER_PTS (buffer), base_time, latency, + clock_diff); + new_buffer->dts = + calculate_timestamp (GST_BUFFER_DTS (buffer), base_time, latency, + clock_diff); + new_buffer->duration = GST_BUFFER_DURATION (buffer); + new_buffer->offset = GST_BUFFER_OFFSET (buffer); + new_buffer->offset_end = GST_BUFFER_OFFSET_END (buffer); + new_buffer->flags = GST_BUFFER_FLAGS (buffer); + new_buffer->n_memory = n_memory; + + GUnixFDList *fds = g_unix_fd_list_new (); + for (int i = 0; i < n_memory; i++) { + GstMemory *mem = gst_buffer_peek_memory (buffer, i); + if (!gst_is_fd_memory (mem)) { + GST_ERROR_OBJECT (self, "Expecting buffers with FD memories"); + ret = GST_FLOW_ERROR; + goto out; + } + + if (g_unix_fd_list_append (fds, gst_fd_memory_get_fd (mem), &error) < 0) { + GST_ERROR_OBJECT (self, "Failed to append FD: %s", error->message); + ret = GST_FLOW_ERROR; + goto out; + } + + gsize offset; + new_buffer->memories[i].size = gst_memory_get_sizes (mem, &offset, NULL); + new_buffer->memories[i].offset = offset; + } + + GST_OBJECT_LOCK (self); + send_command_to_all (self, COMMAND_TYPE_NEW_BUFFER, fds, payload, + payload_size, buffer); + GST_OBJECT_UNLOCK (self); + +out: + g_clear_object (&fds); + g_clear_error (&error); + g_free (payload); + return ret; +} + +static gboolean +gst_unix_fd_sink_event (GstBaseSink * bsink, GstEvent * event) +{ + GstUnixFdSink *self = (GstUnixFdSink *) bsink; + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_CAPS:{ + GST_OBJECT_LOCK (self); + gst_clear_caps (&self->caps); + gst_event_parse_caps (event, &self->caps); + gst_caps_ref (self->caps); + GST_DEBUG_OBJECT (self, "Send new caps to all clients: %" GST_PTR_FORMAT, + self->caps); + gsize payload_size; + gchar *payload = caps_to_payload (self->caps, &payload_size); + send_command_to_all (self, COMMAND_TYPE_CAPS, NULL, payload, payload_size, + NULL); + g_free (payload); + GST_OBJECT_UNLOCK (self); + break; + } + case GST_EVENT_EOS:{ + GST_OBJECT_LOCK (self); + send_command_to_all (self, COMMAND_TYPE_EOS, NULL, NULL, 0, NULL); + GST_OBJECT_UNLOCK (self); + break; + } + default: + break; + } + + return GST_BASE_SINK_CLASS (gst_unix_fd_sink_parent_class)->event (bsink, + event); +} + +static gboolean +gst_unix_fd_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) +{ + GstAllocator *allocator = gst_shm_allocator_get (); + gst_query_add_allocation_param (query, allocator, NULL); + gst_object_unref (allocator); + + return TRUE; +} + +static gboolean +gst_unix_fd_sink_set_clock (GstElement * element, GstClock * clock) +{ + GstUnixFdSink *self = (GstUnixFdSink *) element; + + self->uses_monotonic_clock = FALSE; + if (clock != NULL && G_OBJECT_TYPE (clock) == GST_TYPE_SYSTEM_CLOCK) { + GstClockType clock_type; + g_object_get (clock, "clock-type", &clock_type, NULL); + self->uses_monotonic_clock = clock_type == GST_CLOCK_TYPE_MONOTONIC; + } + + return GST_ELEMENT_CLASS (gst_unix_fd_sink_parent_class)->set_clock (element, + clock); +} + +static void +gst_unix_fd_sink_class_init (GstUnixFdSinkClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstElementClass *gstelement_class = (GstElementClass *) klass; + GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass; + + GST_DEBUG_CATEGORY_INIT (unixfdsink_debug, "unixfdsink", 0, + "Unix file descriptor sink"); + gst_element_class_set_static_metadata (gstelement_class, + "Unix file descriptor sink", "Sink", "Unix file descriptor sink", + "Xavier Claessens "); + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); + + gst_shm_allocator_init_once (); + + gobject_class->finalize = gst_unix_fd_sink_finalize; + gobject_class->set_property = gst_unix_fd_sink_set_property; + gobject_class->get_property = gst_unix_fd_sink_get_property; + + gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_set_clock); + + gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_start); + gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_stop); + gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_render); + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_unix_fd_sink_event); + gstbasesink_class->propose_allocation = + GST_DEBUG_FUNCPTR (gst_unix_fd_sink_propose_allocation); + + g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, + g_param_spec_string ("socket-path", + "Path to the control socket", + "The path to the control socket used to control the shared memory " + "transport. This may be modified during the NULL->READY transition", + NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); +} diff --git a/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c new file mode 100644 index 0000000000..7d41cdf0ff --- /dev/null +++ b/subprojects/gst-plugins-bad/gst/unixfd/gstunixfdsrc.c @@ -0,0 +1,477 @@ +/* GStreamer unix file-descriptor source/sink + * + * Copyright (C) 2023 Netflix Inc. + * Author: Xavier Claessens + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-unixfdsrc + * @title: unixfdsrc + * + * Receive file-descriptor backed buffers (e.g. memfd, dmabuf) over unix socket + * from matching unixfdsink. + * + * ## Example launch lines + * |[ + * gst-launch-1.0 -v videotestsrc ! unixfdsink socket-path=/tmp/blah + * gst-launch-1.0 -v unixfdsrc socket-path=/tmp/blah ! autovideosink + * ]| + * + * Since: 1.24 + */ + +#include "gstunixfd.h" + +#include +#include + +#include +#include +#include +#include + +GST_DEBUG_CATEGORY (unixfdsrc_debug); +#define GST_CAT_DEFAULT (unixfdsrc_debug) + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define GST_TYPE_UNIX_FD_SRC gst_unix_fd_src_get_type() +G_DECLARE_FINAL_TYPE (GstUnixFdSrc, gst_unix_fd_src, GST, UNIX_FD_SRC, + GstPushSrc); + +struct _GstUnixFdSrc +{ + GstPushSrc parent; + + gchar *socket_path; + GSocket *socket; + GCancellable *cancellable; + + GstAllocator *allocator; + GHashTable *memories; + gboolean uses_monotonic_clock; +}; + +G_DEFINE_TYPE (GstUnixFdSrc, gst_unix_fd_src, GST_TYPE_PUSH_SRC); +GST_ELEMENT_REGISTER_DEFINE (unixfdsrc, "unixfdsrc", GST_RANK_NONE, + GST_TYPE_UNIX_FD_SRC); + +enum +{ + PROP_0, + PROP_SOCKET_PATH, +}; + +typedef struct +{ + guint64 id; + guint n_memory; +} BufferContext; + +static void +memory_weak_ref_cb (GstUnixFdSrc * self, GstMemory * mem) +{ + GST_OBJECT_LOCK (self); + + BufferContext *ctx = g_hash_table_lookup (self->memories, mem); + if (ctx == NULL) + goto out; + + if (--ctx->n_memory == 0) { + /* Notify that we are not using this buffer anymore */ + ReleaseBufferPayload payload = { ctx->id }; + GError *error = NULL; + if (!gst_unix_fd_send_command (self->socket, COMMAND_TYPE_RELEASE_BUFFER, + NULL, (const gchar *) &payload, sizeof (payload), &error)) { + GST_WARNING_OBJECT (self, "Failed to send release-buffer command: %s", + error->message); + g_clear_error (&error); + } + g_free (ctx); + } + + g_hash_table_remove (self->memories, mem); + +out: + GST_OBJECT_UNLOCK (self); +} + +static void +gst_unix_fd_src_init (GstUnixFdSrc * self) +{ + g_return_if_fail (GST_IS_UNIX_FD_SRC (self)); + + self->cancellable = g_cancellable_new (); + self->memories = g_hash_table_new (NULL, NULL); + self->allocator = gst_fd_allocator_new (); + gst_base_src_set_live (GST_BASE_SRC (self), TRUE); +} + +static void +gst_unix_fd_src_finalize (GObject * object) +{ + GstUnixFdSrc *self = GST_UNIX_FD_SRC (object); + + g_free (self->socket_path); + g_object_unref (self->cancellable); + g_hash_table_unref (self->memories); + gst_object_unref (self->allocator); + + G_OBJECT_CLASS (gst_unix_fd_src_parent_class)->finalize (object); +} + +static void +gst_unix_fd_src_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstUnixFdSrc *self = GST_UNIX_FD_SRC (object); + + GST_OBJECT_LOCK (self); + + switch (prop_id) { + case PROP_SOCKET_PATH: + if (self->socket) { + GST_WARNING_OBJECT (self, + "Can only change socket path in NULL or READY state"); + break; + } + g_free (self->socket_path); + self->socket_path = g_value_dup_string (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_OBJECT_UNLOCK (self); +} + +static void +gst_unix_fd_src_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstUnixFdSrc *self = GST_UNIX_FD_SRC (object); + + GST_OBJECT_LOCK (self); + + switch (prop_id) { + case PROP_SOCKET_PATH: + g_value_set_string (value, self->socket_path); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } + + GST_OBJECT_UNLOCK (self); +} + +static gboolean +gst_unix_fd_src_start (GstBaseSrc * bsrc) +{ + GstUnixFdSrc *self = (GstUnixFdSrc *) bsrc; + GSocketAddress *addr = NULL; + GError *error = NULL; + gboolean ret = TRUE; + + gst_base_src_set_format (bsrc, GST_FORMAT_TIME); + + GST_OBJECT_LOCK (self); + + if (self->socket_path == NULL) { + GST_ERROR_OBJECT (self, "Socket path is NULL"); + ret = FALSE; + goto out; + } + + addr = g_unix_socket_address_new (self->socket_path); + + self->socket = + g_socket_new (G_SOCKET_FAMILY_UNIX, G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_DEFAULT, &error); + if (self->socket == NULL) { + GST_ERROR_OBJECT (self, "Failed to create UNIX socket: %s", error->message); + ret = FALSE; + goto out; + } + + if (!g_socket_connect (self->socket, addr, NULL, &error)) { + GST_ERROR_OBJECT (self, "Failed to connect socket: %s", error->message); + g_clear_object (&self->socket); + ret = FALSE; + goto out; + } + +out: + GST_OBJECT_UNLOCK (self); + g_clear_error (&error); + g_clear_object (&addr); + return ret; +} + +static gboolean +gst_unix_fd_src_stop (GstBaseSrc * bsrc) +{ + GstUnixFdSrc *self = (GstUnixFdSrc *) bsrc; + + GST_OBJECT_LOCK (self); + + /* Remove all weak refs we could still have to not be called back later. + * Service side will cleanup pending buffers when socket gets closed. */ + GstMemory *mem; + BufferContext *ctx; + GHashTableIter iter; + g_hash_table_iter_init (&iter, self->memories); + while (g_hash_table_iter_next (&iter, (gpointer *) & mem, (gpointer *) & ctx)) { + gst_mini_object_weak_unref (GST_MINI_OBJECT_CAST (mem), + (GstMiniObjectNotify) memory_weak_ref_cb, self); + if (--ctx->n_memory == 0) + g_free (ctx); + } + g_hash_table_remove_all (self->memories); + g_clear_object (&self->socket); + + GST_OBJECT_UNLOCK (self); + + return TRUE; +} + +static gboolean +gst_unix_fd_src_unlock (GstBaseSrc * bsrc) +{ + GstUnixFdSrc *self = GST_UNIX_FD_SRC (bsrc); + g_cancellable_cancel (self->cancellable); + return TRUE; +} + +static gboolean +gst_unix_fd_src_unlock_stop (GstBaseSrc * bsrc) +{ + GstUnixFdSrc *self = GST_UNIX_FD_SRC (bsrc); + g_cancellable_reset (self->cancellable); + return TRUE; +} + +static GstClockTime +calculate_timestamp (GstClockTime timestamp, GstClockTime base_time, + GstClockTimeDiff clock_diff) +{ + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* Convert from system monotonic clock time to pipeline clock time */ + if (clock_diff > 0 && clock_diff > timestamp) + return 0; + timestamp -= clock_diff; + /* Convert to running time */ + if (base_time > timestamp) + return 0; + timestamp -= base_time; + } + return timestamp; +} + +static GstFlowReturn +gst_unix_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) +{ + GstUnixFdSrc *self = GST_UNIX_FD_SRC (psrc); + CommandType command; + GUnixFDList *fds = NULL; + gchar *payload = NULL; + gsize payload_size; + GError *error = NULL; + GstFlowReturn ret = GST_FLOW_OK; + +again: + /* Block until we receive a command */ + if (!gst_unix_fd_receive_command (self->socket, self->cancellable, &command, + &fds, &payload, &payload_size, &error)) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + ret = GST_FLOW_FLUSHING; + goto on_error; + } + GST_ERROR_OBJECT (self, "Failed to read from sink element: %s", + error != NULL ? error->message : "Connection closed by peer"); + ret = GST_FLOW_ERROR; + goto on_error; + } + + switch (command) { + case COMMAND_TYPE_RELEASE_BUFFER: + GST_ERROR_OBJECT (self, "Received wrong command %d", command); + ret = GST_FLOW_ERROR; + goto on_error; + case COMMAND_TYPE_NEW_BUFFER:{ + NewBufferPayload *new_buffer; + if (!gst_unix_fd_parse_new_buffer (payload, payload_size, &new_buffer)) { + GST_ERROR_OBJECT (self, "Received new-buffer with wrong payload size"); + ret = GST_FLOW_ERROR; + goto on_error; + } + + if (fds == NULL) { + GST_ERROR_OBJECT (self, + "Received new buffer command without file descriptors"); + return GST_FLOW_ERROR; + } + + if (g_unix_fd_list_get_length (fds) != new_buffer->n_memory) { + GST_ERROR_OBJECT (self, + "Received new buffer command with %d file descriptors instead of %d", + g_unix_fd_list_get_length (fds), new_buffer->n_memory); + ret = GST_FLOW_ERROR; + goto on_error; + } + + gint *fds_arr = g_unix_fd_list_steal_fds (fds, NULL); + + BufferContext *ctx = g_new0 (BufferContext, 1); + ctx->id = new_buffer->id; + ctx->n_memory = new_buffer->n_memory; + + *outbuf = gst_buffer_new (); + + GstClockTime base_time = + gst_element_get_base_time (GST_ELEMENT_CAST (self)); + GstClockTimeDiff clock_diff = 0; + if (!self->uses_monotonic_clock) { + clock_diff = GST_CLOCK_DIFF (g_get_monotonic_time () * GST_USECOND, + gst_clock_get_time (GST_ELEMENT_CLOCK (self))); + } + + GST_BUFFER_PTS (*outbuf) = + calculate_timestamp (new_buffer->pts, base_time, clock_diff); + GST_BUFFER_DTS (*outbuf) = + calculate_timestamp (new_buffer->dts, base_time, clock_diff); + GST_BUFFER_DURATION (*outbuf) = new_buffer->duration; + GST_BUFFER_OFFSET (*outbuf) = new_buffer->offset; + GST_BUFFER_OFFSET_END (*outbuf) = new_buffer->offset_end; + GST_BUFFER_FLAGS (*outbuf) = new_buffer->flags; + + GST_OBJECT_LOCK (self); + for (int i = 0; i < new_buffer->n_memory; i++) { + GstMemory *mem = gst_fd_allocator_alloc (self->allocator, fds_arr[i], + new_buffer->memories[i].size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (mem, new_buffer->memories[i].offset, + new_buffer->memories[i].size); + GST_MINI_OBJECT_FLAG_SET (mem, GST_MEMORY_FLAG_READONLY); + + g_hash_table_insert (self->memories, mem, ctx); + gst_mini_object_weak_ref (GST_MINI_OBJECT_CAST (mem), + (GstMiniObjectNotify) memory_weak_ref_cb, self); + + gst_buffer_append_memory (*outbuf, mem); + } + GST_OBJECT_UNLOCK (self); + + g_free (fds_arr); + + break; + } + case COMMAND_TYPE_CAPS:{ + gchar *caps_str; + if (!gst_unix_fd_parse_caps (payload, payload_size, &caps_str)) { + GST_ERROR_OBJECT (self, "Received caps string is not nul-terminated"); + ret = GST_FLOW_ERROR; + goto on_error; + } + GstCaps *caps = gst_caps_from_string (caps_str); + GST_DEBUG_OBJECT (self, "Received caps %" GST_PTR_FORMAT, caps); + gst_base_src_set_caps (GST_BASE_SRC_CAST (self), caps); + gst_caps_unref (caps); + break; + } + case COMMAND_TYPE_EOS:{ + GST_DEBUG_OBJECT (self, "Received EOS"); + ret = GST_FLOW_EOS; + break; + } + default: + /* Protocol could have been extended with new command */ + GST_DEBUG_OBJECT (self, "Ignoring unknown command %d", command); + break; + } + + if (*outbuf == NULL && ret == GST_FLOW_OK) { + g_clear_object (&fds); + g_clear_pointer (&payload, g_free); + goto again; + } + +on_error: + g_clear_error (&error); + g_clear_object (&fds); + g_free (payload); + return ret; +} + +static gboolean +gst_unix_fd_src_set_clock (GstElement * element, GstClock * clock) +{ + GstUnixFdSrc *self = (GstUnixFdSrc *) element; + + self->uses_monotonic_clock = FALSE; + if (clock != NULL && G_OBJECT_TYPE (clock) == GST_TYPE_SYSTEM_CLOCK) { + GstClockType clock_type; + g_object_get (clock, "clock-type", &clock_type, NULL); + self->uses_monotonic_clock = clock_type == GST_CLOCK_TYPE_MONOTONIC; + } + + return GST_ELEMENT_CLASS (gst_unix_fd_src_parent_class)->set_clock (element, + clock); +} + +static void +gst_unix_fd_src_class_init (GstUnixFdSrcClass * klass) +{ + GObjectClass *gobject_class = (GObjectClass *) klass; + GstElementClass *gstelement_class = (GstElementClass *) klass; + GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass; + GstPushSrcClass *gstpushsrc_class = (GstPushSrcClass *) klass; + + GST_DEBUG_CATEGORY_INIT (unixfdsrc_debug, "unixfdsrc", 0, + "Unix file descriptor source"); + gst_element_class_set_static_metadata (gstelement_class, + "Unix file descriptor source", "Src", "Unix file descriptor source", + "Xavier Claessens "); + gst_element_class_add_static_pad_template (gstelement_class, &srctemplate); + + gobject_class->finalize = gst_unix_fd_src_finalize; + gobject_class->set_property = gst_unix_fd_src_set_property; + gobject_class->get_property = gst_unix_fd_src_get_property; + + gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_unix_fd_src_set_clock); + + gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_unix_fd_src_start); + gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_unix_fd_src_stop); + gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_unix_fd_src_unlock); + gstbasesrc_class->unlock_stop = + GST_DEBUG_FUNCPTR (gst_unix_fd_src_unlock_stop); + + gstpushsrc_class->create = gst_unix_fd_src_create; + + g_object_class_install_property (gobject_class, PROP_SOCKET_PATH, + g_param_spec_string ("socket-path", + "Path to the control socket", + "The path to the control socket used to control the shared memory " + "transport. This may be modified during the NULL->READY transition", + NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); +} diff --git a/subprojects/gst-plugins-bad/gst/unixfd/meson.build b/subprojects/gst-plugins-bad/gst/unixfd/meson.build new file mode 100644 index 0000000000..4b3cab8b36 --- /dev/null +++ b/subprojects/gst-plugins-bad/gst/unixfd/meson.build @@ -0,0 +1,23 @@ +sources = [ + 'gstunixfd.c', + 'gstunixfdsink.c', + 'gstunixfdsrc.c', +] + +gstallocators_dep = dependency('gstreamer-allocators-1.0') + +gio_unix_dep = dependency('gio-unix-2.0', required: get_option('unixfd')) +if not gio_unix_dep.found() + subdir_done() +endif + +gstunixfd_plugin = library('gstunixfd', + sources, + c_args: gst_plugins_bad_args, + include_directories: [configinc], + dependencies : [gstbase_dep, gstallocators_dep, gio_dep, gio_unix_dep], + install : true, + install_dir : plugins_install_dir, +) + +plugins += [gstunixfd_plugin] diff --git a/subprojects/gst-plugins-bad/meson_options.txt b/subprojects/gst-plugins-bad/meson_options.txt index ba2b0cea5d..662b0246f3 100644 --- a/subprojects/gst-plugins-bad/meson_options.txt +++ b/subprojects/gst-plugins-bad/meson_options.txt @@ -63,6 +63,7 @@ option('speed', type : 'feature', value : 'auto') option('subenc', type : 'feature', value : 'auto') option('switchbin', type : 'feature', value : 'auto') option('timecode', type : 'feature', value : 'auto') +option('unixfd', type : 'feature', value : 'auto') option('videofilters', type : 'feature', value : 'auto') option('videoframe_audiolevel', type : 'feature', value : 'auto') option('videoparsers', type : 'feature', value : 'auto') diff --git a/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c b/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c new file mode 100644 index 0000000000..dcd0608c7b --- /dev/null +++ b/subprojects/gst-plugins-bad/tests/check/elements/unixfd.c @@ -0,0 +1,106 @@ +/* GStreamer unix file-descriptor source/sink tests + * + * Copyright (C) 2023 Netflix Inc. + * Author: Xavier Claessens + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include +#include + +static void +wait_preroll (GstElement * element) +{ + GstStateChangeReturn state_res = + gst_element_set_state (element, GST_STATE_PLAYING); + fail_unless (state_res != GST_STATE_CHANGE_FAILURE); + state_res = gst_element_get_state (element, NULL, NULL, GST_CLOCK_TIME_NONE); + fail_unless (state_res == GST_STATE_CHANGE_SUCCESS); +} + +GST_START_TEST (test_unixfd_videotestsrc) +{ + GError *error = NULL; + + /* Ensure we don't have socket from previous failed test */ + gchar *socket_path = + g_strdup_printf ("%s/unixfd-test-socket", g_get_user_runtime_dir ()); + if (g_file_test (socket_path, G_FILE_TEST_EXISTS)) { + g_unlink (socket_path); + } + + /* Setup source */ + gchar *pipeline_str = + g_strdup_printf ("videotestsrc ! unixfdsink socket-path=%s", socket_path); + GstElement *pipeline_service = gst_parse_launch (pipeline_str, &error); + g_assert_no_error (error); + g_free (pipeline_str); + wait_preroll (pipeline_service); + + /* Setup sink */ + pipeline_str = + g_strdup_printf ("unixfdsrc socket-path=%s ! fakesink", socket_path); + GstElement *pipeline_client_1 = gst_parse_launch (pipeline_str, &error); + g_assert_no_error (error); + wait_preroll (pipeline_client_1); + + /* disconnect, reconnect */ + fail_unless (gst_element_set_state (pipeline_client_1, + GST_STATE_READY) == GST_STATE_CHANGE_SUCCESS); + wait_preroll (pipeline_client_1); + + /* Connect 2nd sink */ + GstElement *pipeline_client_2 = gst_parse_launch (pipeline_str, &error); + g_assert_no_error (error); + wait_preroll (pipeline_client_2); + + /* Teardown */ + fail_unless (gst_element_set_state (pipeline_client_1, + GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); + fail_unless (gst_element_set_state (pipeline_client_2, + GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); + fail_unless (gst_element_set_state (pipeline_service, + GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS); + fail_if (g_file_test (socket_path, G_FILE_TEST_EXISTS)); + + gst_object_unref (pipeline_service); + gst_object_unref (pipeline_client_1); + gst_object_unref (pipeline_client_2); + g_free (socket_path); + g_free (pipeline_str); +} + +GST_END_TEST; + +static Suite * +unixfd_suite (void) +{ + Suite *s = suite_create ("unixfd"); + TCase *tc = tcase_create ("unixfd"); + + suite_add_tcase (s, tc); + tcase_add_test (tc, test_unixfd_videotestsrc); + + return s; +} + +GST_CHECK_MAIN (unixfd); diff --git a/subprojects/gst-plugins-bad/tests/check/meson.build b/subprojects/gst-plugins-bad/tests/check/meson.build index 522a4884cc..173722f44f 100644 --- a/subprojects/gst-plugins-bad/tests/check/meson.build +++ b/subprojects/gst-plugins-bad/tests/check/meson.build @@ -142,6 +142,7 @@ if host_machine.system() != 'windows' [['elements/jpegparse.c'], not cdata.has('HAVE_UNISTD_H')], [['elements/netsim.c']], [['elements/shm.c'], not shm_enabled, shm_deps], + [['elements/unixfd.c'], not gio_unix_dep.found()], [['elements/voaacenc.c'], not voaac_dep.found() or not cdata.has('HAVE_UNISTD_H'), [voaac_dep]], [['elements/webrtcbin.c'], not libnice_dep.found(), [gstwebrtc_dep]],