diff --git a/gst/udp/gstmultiudpsink.c b/gst/udp/gstmultiudpsink.c index 2b346a1f93..d2fbca7933 100644 --- a/gst/udp/gstmultiudpsink.c +++ b/gst/udp/gstmultiudpsink.c @@ -120,6 +120,8 @@ static void gst_multiudpsink_finalize (GObject * object); static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink, GstBuffer * buffer); +static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink, + GstBufferList * buffer_list); static gboolean gst_multiudpsink_start (GstBaseSink * bsink); static gboolean gst_multiudpsink_stop (GstBaseSink * bsink); @@ -359,6 +361,7 @@ gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass) "Wim Taymans "); gstbasesink_class->render = gst_multiudpsink_render; + gstbasesink_class->render_list = gst_multiudpsink_render_list; gstbasesink_class->start = gst_multiudpsink_start; gstbasesink_class->stop = gst_multiudpsink_stop; gstbasesink_class->unlock = gst_multiudpsink_unlock; @@ -408,6 +411,9 @@ gst_multiudpsink_init (GstMultiUDPSink * sink) sink->vec = g_new (GOutputVector, max_mem); sink->map = g_new (GstMapInfo, max_mem); + + /* we assume that the number of memories per buffer can fit into a guint8 */ + g_warn_if_fail (max_mem <= G_MAXUINT8); } static GstUDPClient * @@ -533,6 +539,375 @@ gst_multiudpsink_finalize (GObject * object) G_OBJECT_CLASS (parent_class)->finalize (object); } +/* replacement until we can depend unconditionally on the real one in GLib */ +#ifndef HAVE_G_SOCKET_SEND_MESSAGES +#define g_socket_send_messages gst_socket_send_messages + +static gint +gst_socket_send_messages (GSocket * socket, GstOutputMessage * messages, + guint num_messages, gint flags, GCancellable * cancellable, GError ** error) +{ + gssize result; + gint i; + + for (i = 0; i < num_messages; ++i) { + GstOutputMessage *msg = &messages[i]; + GError *msg_error = NULL; + + result = g_socket_send_message (socket, msg->address, + msg->vectors, msg->num_vectors, + msg->control_messages, msg->num_control_messages, + flags, cancellable, &msg_error); + + if (result < 0) { + /* if we couldn't send all messages, just return how many we did + * manage to send, provided we managed to send at least one */ + if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0) { + g_error_free (msg_error); + return i; + } else { + g_propagate_error (error, msg_error); + return -1; + } + } + + msg->bytes_sent = result; + } + + return i; +} +#endif /* HAVE_G_SOCKET_SEND_MESSAGES */ + +static gsize +fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf) +{ + GstMemory *mem; + gsize size = 0; + guint i; + + g_assert (gst_buffer_n_memory (buf) == n); + + for (i = 0; i < n; ++i) { + mem = gst_buffer_peek_memory (buf, i); + if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) { + vecs[i].buffer = maps[i].data; + vecs[i].size = maps[i].size; + } else { + GST_WARNING ("Failed to map memory %p for reading", mem); + vecs[i].buffer = ""; + vecs[i].size = 0; + } + size += vecs[i].size; + } + + return size; +} + +static gsize +gst_udp_calc_message_size (GstOutputMessage * msg) +{ + gsize size = 0; + guint i; + + for (i = 0; i < msg->num_vectors; ++i) + size += msg->vectors[i].size; + + return size; +} + +static gint +gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages, + guint num_messages) +{ + guint i; + + for (i = 0; i < num_messages; ++i) { + GstOutputMessage *msg = &messages[i]; + + if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0) + return i; + } + + return -1; +} + +static inline gchar * +gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size) +{ + GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr); + GInetAddress *ia; + gchar *addr_str; + + ia = g_inet_socket_address_get_address (isa); + addr_str = g_inet_address_to_string (ia); + g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa)); + g_free (addr_str); + g_object_unref (ia); + + return s; +} + +/* Wrapper around g_socket_send_messages() plus error handling (ignoring). + * Returns FALSE if we got cancelled, otherwise TRUE. */ +static gboolean +gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket, + GstOutputMessage * messages, guint num_messages) +{ + gboolean sent_max_size_warning = FALSE; + + while (num_messages > 0) { + gchar astr[64] G_GNUC_UNUSED; + GError *err = NULL; + guint msg_size, skip, i; + gint ret, err_idx; + + ret = g_socket_send_messages (socket, messages, num_messages, 0, + sink->cancellable, &err); + + if (G_UNLIKELY (ret < 0)) { + GstOutputMessage *msg; + + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_clear_error (&err); + return FALSE; + } + + err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages); + if (err_idx < 0) + break; + + msg = &messages[err_idx]; + msg_size = gst_udp_calc_message_size (msg); + + GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size, + gst_udp_address_get_string (msg->address, astr, sizeof (astr)), + err->message); + + skip = 1; + if (msg_size > UDP_MAX_SIZE) { + if (!sent_max_size_warning) { + GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, + ("Attempting to send a UDP packets larger than maximum size " + "(%u > %d)", msg_size, UDP_MAX_SIZE), + ("Reason: %s", err ? err->message : "unknown reason")); + sent_max_size_warning = FALSE; + } + } else { + GST_ELEMENT_WARNING (sink, RESOURCE, WRITE, + ("Error sending UDP packets"), ("client %s, reason: %s", + gst_udp_address_get_string (msg->address, astr, sizeof (astr)), + (err != NULL) ? err->message : "unknown reason")); + + for (i = err_idx + 1; i < num_messages; ++i, ++skip) { + if (messages[i].address != msg->address) + break; + } + GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip); + } + + /* ignore any errors and try sending the rest */ + g_clear_error (&err); + ret = skip; + } + + g_assert (ret <= num_messages); + + messages += ret; + num_messages -= ret; + } + + return TRUE; +} + +static GstFlowReturn +gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers, + guint num_buffers, guint8 * mem_nums, guint total_mem_num) +{ + GstOutputMessage *msgs; + gboolean send_duplicates; + GstUDPClient **clients; + GOutputVector *vecs; + GstMapInfo *map_infos; + GstFlowReturn flow_ret; + guint num_addr_v4, num_addr_v6; + guint num_addr, num_msgs; + GError *err = NULL; + guint i, j, mem; + gsize size = 0; + GList *l; + + send_duplicates = sink->send_duplicates; + + g_mutex_lock (&sink->client_lock); + + if (send_duplicates) { + num_addr_v4 = sink->num_v4_all; + num_addr_v6 = sink->num_v6_all; + } else { + num_addr_v4 = sink->num_v4_unique; + num_addr_v6 = sink->num_v6_unique; + } + num_addr = num_addr_v4 + num_addr_v6; + + if (num_addr == 0) + goto no_clients; + + clients = g_newa (GstUDPClient *, num_addr); + for (l = sink->clients, i = 0; l != NULL; l = l->next) { + GstUDPClient *client = l->data; + + clients[i++] = gst_udp_client_ref (client); + for (j = 1; send_duplicates && j < client->add_count; ++j) + clients[i++] = gst_udp_client_ref (client); + } + g_assert_cmpuint (i, ==, num_addr); + + g_mutex_unlock (&sink->client_lock); + + GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients", + num_buffers, total_mem_num, num_addr); + + vecs = g_newa (GOutputVector, total_mem_num); + map_infos = g_newa (GstMapInfo, total_mem_num); + + num_msgs = num_addr * num_buffers; + msgs = g_newa (GstOutputMessage, num_msgs); + + /* populate first num_buffers messages with output vectors for the buffers */ + for (i = 0, mem = 0; i < num_buffers; ++i) { + size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]); + msgs[i].vectors = &vecs[mem]; + msgs[i].num_vectors = mem_nums[i]; + msgs[i].num_control_messages = 0; + msgs[i].control_messages = NULL; + msgs[i].address = clients[0]->addr; + mem += mem_nums[i]; + } + + /* FIXME: how about some locking? (there wasn't any before either, but..) */ + sink->bytes_to_serve += size; + + /* now copy the pre-filled num_buffer messages over to the next num_buffer + * messages for the next client, where we also change the target adddress */ + for (i = 1; i < num_addr; ++i) { + for (j = 0; j < num_buffers; ++j) { + msgs[i * num_buffers + j] = msgs[j]; + msgs[i * num_buffers + j].address = clients[i]->addr; + } + } + + /* now send it! */ + { + gboolean ret; + + /* no IPv4 socket? Send it all from the IPv6 socket then.. */ + if (sink->used_socket == NULL) { + ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6, + msgs, num_msgs); + } else { + guint num_msgs_v4 = num_buffers * num_addr_v4; + guint num_msgs_v6 = num_buffers * num_addr_v6; + + /* FIXME: assumes clients are sorted in our list! */ + ret = gst_multiudpsink_send_messages (sink, sink->used_socket, + msgs, num_msgs_v4); + + if (!ret) + goto cancelled; + + ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6, + msgs + num_msgs_v4, num_msgs_v6); + } + + if (!ret) + goto cancelled; + } + + flow_ret = GST_FLOW_OK; + + /* now update stats */ + g_mutex_lock (&sink->client_lock); + + for (i = 0; i < num_addr; ++i) { + GstUDPClient *client = clients[i]; + + for (j = 0; j < num_buffers; ++j) { + gsize bytes_sent; + + bytes_sent = msgs[i * num_buffers + j].bytes_sent; + + client->bytes_sent += bytes_sent; + client->packets_sent++; + sink->bytes_served += bytes_sent; + } + gst_udp_client_unref (client); + } + + g_mutex_unlock (&sink->client_lock); + +out: + + for (i = 0; i < mem; ++i) + gst_memory_unmap (map_infos[i].memory, &map_infos[i]); + + return flow_ret;; + +no_clients: + { + g_mutex_unlock (&sink->client_lock); + GST_LOG_OBJECT (sink, "no clients"); + return GST_FLOW_OK; + } +cancelled: + { + GST_INFO_OBJECT (sink, "cancelled"); + g_clear_error (&err); + flow_ret = GST_FLOW_FLUSHING; + + g_mutex_lock (&sink->client_lock); + for (i = 0; i < num_addr; ++i) + gst_udp_client_unref (clients[i]); + g_mutex_unlock (&sink->client_lock); + goto out; + } +} + +static GstFlowReturn +gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list) +{ + GstMultiUDPSink *sink; + GstBuffer **buffers; + GstFlowReturn flow; + guint8 *mem_nums; + guint total_mems; + guint i, num_buffers; + + sink = GST_MULTIUDPSINK_CAST (bsink); + + num_buffers = gst_buffer_list_length (buffer_list); + if (num_buffers == 0) + goto no_data; + + buffers = g_newa (GstBuffer *, num_buffers); + mem_nums = g_newa (guint8, num_buffers); + for (i = 0, total_mems = 0; i < num_buffers; ++i) { + buffers[i] = gst_buffer_list_get (buffer_list, i); + mem_nums[i] = gst_buffer_n_memory (buffers[i]); + total_mems += mem_nums[i]; + } + + flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers, + mem_nums, total_mems); + + return flow; + +no_data: + { + GST_LOG_OBJECT (sink, "empty buffer"); + return GST_FLOW_OK; + } +} + static GstFlowReturn gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer) { diff --git a/gst/udp/gstmultiudpsink.h b/gst/udp/gstmultiudpsink.h index ee9776dde8..f443dd9f98 100644 --- a/gst/udp/gstmultiudpsink.h +++ b/gst/udp/gstmultiudpsink.h @@ -38,6 +38,28 @@ G_BEGIN_DECLS typedef struct _GstMultiUDPSink GstMultiUDPSink; typedef struct _GstMultiUDPSinkClass GstMultiUDPSinkClass; +#if GLIB_CHECK_VERSION (2, 43, 2) +#define HAVE_G_SOCKET_SEND_MESSAGES +#endif + +#ifndef HAVE_G_SOCKET_SEND_MESSAGES +/* same as GOutputMessage used for g_socket_send_messages() */ +typedef struct { + /*< private >*/ + GSocketAddress *address; + + GOutputVector *vectors; + guint num_vectors; + + guint bytes_sent; + + GSocketControlMessage **control_messages; + guint num_control_messages; +} GstOutputMessage; +#else +typedef GOutputMessage GstOutputMessage; +#endif /* HAVE_G_SOCKET_SEND_MESSAGES*/ + typedef struct { gint ref_count; /* for memory management */ gint add_count; /* how often this address has been added */ @@ -61,6 +83,7 @@ struct _GstMultiUDPSink { GSocket *used_socket, *used_socket_v6; GCancellable *cancellable; + /* client management */ GMutex client_lock; GList *clients; guint num_v4_unique; /* number IPv4 clients (excluding duplicates) */ @@ -68,6 +91,7 @@ struct _GstMultiUDPSink { guint num_v6_unique; /* number IPv6 clients (excluding duplicates) */ guint num_v6_all; /* number IPv6 clients (including duplicates) */ + /* pre-allocated scrap space for render function */ GOutputVector *vec; GstMapInfo *map;