From 738eb0d8edd72eb5e76a5c1cbd9f2672c314840c Mon Sep 17 00:00:00 2001 From: Petr Kulhavy Date: Tue, 6 Mar 2018 02:14:34 +0100 Subject: [PATCH] udpsrc: switch to using a buffer pool This exposes a new property, mtu, which is used to determine the initial size of buffers from the buffer pool. If received data exceeds this, the element gracefully handles that in a manner similar to what we had previously: a large memory gets filled and reallocated at the next call to "fill". The default size is set to 1500, which should cover most use cases. With contributions from Mathieu Duponchelle https://bugzilla.gnome.org/show_bug.cgi?id=772841 --- gst/udp/gstudpsrc.c | 284 ++++++++++++++++++++------------------------ gst/udp/gstudpsrc.h | 12 +- 2 files changed, 136 insertions(+), 160 deletions(-) diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index fdeca5dadd..bf9de10f76 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -407,6 +407,43 @@ gst_ip_recvdstaddr_message_class_init (GstIPRecvdstaddrMessageClass * class) } #endif +static gboolean +gst_udpsrc_decide_allocation (GstBaseSrc * bsrc, GstQuery * query) +{ + GstUDPSrc *udpsrc; + GstBufferPool *pool; + gboolean update; + GstStructure *config; + GstCaps *caps = NULL; + + udpsrc = GST_UDPSRC (bsrc); + + if (gst_query_get_n_allocation_pools (query) > 0) { + update = TRUE; + } else { + update = FALSE; + } + + pool = gst_buffer_pool_new (); + + config = gst_buffer_pool_get_config (pool); + + gst_query_parse_allocation (query, &caps, NULL); + + gst_buffer_pool_config_set_params (config, caps, udpsrc->mtu, 0, 0); + + gst_buffer_pool_set_config (pool, config); + + if (update) + gst_query_set_nth_allocation_pool (query, 0, pool, udpsrc->mtu, 0, 0); + else + gst_query_add_allocation_pool (query, pool, udpsrc->mtu, 0, 0); + + gst_object_unref (pool); + + return TRUE; +} + /* not 100% correct, but a good upper bound for memory allocation purposes */ #define MAX_IPV4_UDP_PACKET_SIZE (65536 - 8) @@ -433,6 +470,7 @@ static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", #define UDP_DEFAULT_REUSE TRUE #define UDP_DEFAULT_LOOP TRUE #define UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS TRUE +#define UDP_DEFAULT_MTU (1492) enum { @@ -453,17 +491,17 @@ enum PROP_REUSE, PROP_ADDRESS, PROP_LOOP, - PROP_RETRIEVE_SENDER_ADDRESS + PROP_RETRIEVE_SENDER_ADDRESS, + PROP_MTU, }; static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter); -static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf); static gboolean gst_udpsrc_close (GstUDPSrc * src); static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc); static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc); -static gboolean gst_udpsrc_negotiate (GstBaseSrc * basesrc); +static GstFlowReturn gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf); static void gst_udpsrc_finalize (GObject * object); @@ -602,6 +640,24 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass) "meta. Disabling this might result in minor performance improvements " "in certain scenarios", UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstUDPSrc::mtu: + * + * Maximum expected packet size. This directly defines the allocation + * size of the receive buffer pool. + * + * In case more data is received, a new #GstMemory is appended to the + * output buffer, ensuring no data is lost, this however leads to that + * buffer being freed and reallocated. + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_MTU, + g_param_spec_uint ("mtu", "Expected Maximum Transmission Unit", + "Maximum expected packet size. This directly defines the allocation" + "size of the receive buffer pool.", + 0, G_MAXINT, UDP_DEFAULT_MTU, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_static_pad_template (gstelement_class, &src_template); @@ -616,9 +672,9 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass) gstbasesrc_class->unlock = gst_udpsrc_unlock; gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop; gstbasesrc_class->get_caps = gst_udpsrc_getcaps; - gstbasesrc_class->negotiate = gst_udpsrc_negotiate; + gstbasesrc_class->decide_allocation = gst_udpsrc_decide_allocation; - gstpushsrc_class->create = gst_udpsrc_create; + gstpushsrc_class->fill = gst_udpsrc_fill; } static void @@ -642,6 +698,7 @@ gst_udpsrc_init (GstUDPSrc * udpsrc) udpsrc->reuse = UDP_DEFAULT_REUSE; udpsrc->loop = UDP_DEFAULT_LOOP; udpsrc->retrieve_sender_address = UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS; + udpsrc->mtu = UDP_DEFAULT_MTU; /* configure basesrc to be a live source */ gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE); @@ -680,6 +737,10 @@ gst_udpsrc_finalize (GObject * object) g_object_unref (udpsrc->used_socket); udpsrc->used_socket = NULL; + if (udpsrc->extra_mem) + gst_memory_unref (udpsrc->extra_mem); + udpsrc->extra_mem = NULL; + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -709,112 +770,6 @@ gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter) return result; } -static void -gst_udpsrc_reset_memory_allocator (GstUDPSrc * src) -{ - if (src->mem != NULL) { - gst_memory_unmap (src->mem, &src->map); - gst_memory_unref (src->mem); - src->mem = NULL; - } - if (src->mem_max != NULL) { - gst_memory_unmap (src->mem_max, &src->map_max); - gst_memory_unref (src->mem_max); - src->mem_max = NULL; - } - - src->vec[0].buffer = NULL; - src->vec[0].size = 0; - src->vec[1].buffer = NULL; - src->vec[1].size = 0; - - if (src->allocator != NULL) { - gst_object_unref (src->allocator); - src->allocator = NULL; - } -} - -static gboolean -gst_udpsrc_negotiate (GstBaseSrc * basesrc) -{ - GstUDPSrc *src = GST_UDPSRC_CAST (basesrc); - gboolean ret; - - /* just chain up to the default implementation, we just want to - * retrieve the allocator at the end of it (if there is one) */ - ret = GST_BASE_SRC_CLASS (parent_class)->negotiate (basesrc); - - if (ret) { - GstAllocationParams new_params; - GstAllocator *new_allocator = NULL; - - /* retrieve new allocator */ - gst_base_src_get_allocator (basesrc, &new_allocator, &new_params); - - if (src->allocator != new_allocator || - memcmp (&src->params, &new_params, sizeof (GstAllocationParams)) != 0) { - /* drop old allocator and throw away any memory allocated with it */ - gst_udpsrc_reset_memory_allocator (src); - - /* and save the new allocator and/or new allocation parameters */ - src->allocator = new_allocator; - src->params = new_params; - - GST_INFO_OBJECT (src, "new allocator: %" GST_PTR_FORMAT, new_allocator); - } - } - - return ret; -} - -static gboolean -gst_udpsrc_alloc_mem (GstUDPSrc * src, GstMemory ** p_mem, GstMapInfo * map, - gsize size) -{ - GstMemory *mem; - - mem = gst_allocator_alloc (src->allocator, size, &src->params); - - if (!gst_memory_map (mem, map, GST_MAP_WRITE)) { - gst_memory_unref (mem); - memset (map, 0, sizeof (GstMapInfo)); - return FALSE; - } - *p_mem = mem; - return TRUE; -} - -static gboolean -gst_udpsrc_ensure_mem (GstUDPSrc * src) -{ - if (src->mem == NULL) { - gsize mem_size = 1500; /* typical max. MTU */ - - /* if packets are likely to be smaller, just use that size, otherwise - * default to assuming incoming packets are around MTU size */ - if (src->max_size > 0 && src->max_size < mem_size) - mem_size = src->max_size; - - if (!gst_udpsrc_alloc_mem (src, &src->mem, &src->map, mem_size)) - return FALSE; - - src->vec[0].buffer = src->map.data; - src->vec[0].size = src->map.size; - } - - if (src->mem_max == NULL) { - gsize max_size = MAX_IPV4_UDP_PACKET_SIZE; - - if (!gst_udpsrc_alloc_mem (src, &src->mem_max, &src->map_max, max_size)) - return FALSE; - - src->vec[1].buffer = src->map_max.data; - src->vec[1].size = src->map_max.size; - } - - return TRUE; -} - static void gst_udpsrc_create_cancellable (GstUDPSrc * src) { @@ -836,10 +791,9 @@ gst_udpsrc_free_cancellable (GstUDPSrc * src) } static GstFlowReturn -gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) +gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf) { GstUDPSrc *udpsrc; - GstBuffer *outbuf = NULL; GSocketAddress *saddr = NULL; GSocketAddress **p_saddr; gint flags = G_SOCKET_MSG_NONE; @@ -850,12 +804,12 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) GSocketControlMessage **msgs = NULL; GSocketControlMessage ***p_msgs; gint n_msgs = 0, i; + GstMapInfo info; + GstMapInfo extra_info; + GInputVector ivec[2]; udpsrc = GST_UDPSRC_CAST (psrc); - if (!gst_udpsrc_ensure_mem (udpsrc)) - goto memory_alloc_error; - /* optimization: use messages only in multicast mode and * if we can't let the kernel do the filtering for us */ p_msgs = @@ -870,6 +824,38 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) /* Retrieve sender address unless we've been configured not to do so */ p_saddr = (udpsrc->retrieve_sender_address) ? &saddr : NULL; + if (!gst_buffer_map (outbuf, &info, GST_MAP_READWRITE)) + goto buffer_map_error; + + ivec[0].buffer = info.data; + ivec[0].size = info.size; + + /* Prepare memory in case the data size exceeds mtu */ + if (udpsrc->extra_mem == NULL) { + GstBufferPool *pool; + GstStructure *config; + GstAllocator *allocator = NULL; + GstAllocationParams params; + + pool = gst_base_src_get_buffer_pool (GST_BASE_SRC_CAST (psrc)); + config = gst_buffer_pool_get_config (pool); + gst_buffer_pool_config_get_allocator (config, &allocator, ¶ms); + + udpsrc->extra_mem = + gst_allocator_alloc (allocator, MAX_IPV4_UDP_PACKET_SIZE, ¶ms); + + gst_object_unref (pool); + gst_structure_free (config); + if (allocator) + gst_object_unref (allocator); + } + + if (!gst_memory_map (udpsrc->extra_mem, &extra_info, GST_MAP_READWRITE)) + goto memory_map_error; + + ivec[1].buffer = extra_info.data; + ivec[1].size = extra_info.size; + retry: if (saddr != NULL) { g_object_unref (saddr); @@ -909,7 +895,7 @@ retry: } while (G_UNLIKELY (try_again)); res = - g_socket_receive_message (udpsrc->used_socket, p_saddr, udpsrc->vec, 2, + g_socket_receive_message (udpsrc->used_socket, p_saddr, ivec, 2, p_msgs, &n_msgs, &flags, udpsrc->cancellable, &err); if (G_UNLIKELY (res < 0)) { @@ -929,10 +915,6 @@ retry: goto receive_error; } - /* remember maximum packet size */ - if (res > udpsrc->max_size) - udpsrc->max_size = res; - /* Retry if multicast and the destination address is not ours. We don't want * to receive arbitrary packets */ if (p_msgs) { @@ -983,27 +965,17 @@ retry: } } - outbuf = gst_buffer_new (); + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); - /* append first memory chunk to buffer */ - gst_buffer_append_memory (outbuf, udpsrc->mem); - - /* if the packet didn't fit into the first chunk, add second one as well */ - if (res > udpsrc->map.size) { - gst_buffer_append_memory (outbuf, udpsrc->mem_max); - gst_memory_unmap (udpsrc->mem_max, &udpsrc->map_max); - udpsrc->vec[1].buffer = NULL; - udpsrc->vec[1].size = 0; - udpsrc->mem_max = NULL; + /* If this is the case, the buffer will be freed once unreffed, + * and the buffer pool will have to reallocate a new one. + */ + if (res > udpsrc->mtu) { + gst_buffer_append_memory (outbuf, udpsrc->extra_mem); + udpsrc->extra_mem = NULL; } - /* make sure we allocate a new chunk next time (we do this only here because - * we look at map.size to see if the second memory chunk is needed above) */ - gst_memory_unmap (udpsrc->mem, &udpsrc->map); - udpsrc->vec[0].buffer = NULL; - udpsrc->vec[0].size = 0; - udpsrc->mem = NULL; - offset = udpsrc->skip_first_bytes; if (G_UNLIKELY (offset > 0 && res < offset)) @@ -1020,19 +992,26 @@ retry: GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res); - *buf = GST_BUFFER_CAST (outbuf); - return GST_FLOW_OK; /* ERRORS */ -memory_alloc_error: +buffer_map_error: { GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), - ("Failed to allocate or map memory")); + ("Failed to map memory")); + return GST_FLOW_ERROR; + } +memory_map_error: + { + gst_buffer_unmap (outbuf, &info); + GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), + ("Failed to map memory")); return GST_FLOW_ERROR; } select_error: { + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), ("select error: %s", err->message)); g_clear_error (&err); @@ -1040,12 +1019,16 @@ select_error: } stopped: { + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); GST_DEBUG ("stop called"); g_clear_error (&err); return GST_FLOW_FLUSHING; } receive_error: { + gst_buffer_unmap (outbuf, &info); + gst_memory_unmap (udpsrc->extra_mem, &extra_info); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_clear_error (&err); @@ -1059,8 +1042,6 @@ receive_error: } skip_error: { - gst_buffer_unref (outbuf); - GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL), ("UDP buffer to small to skip header")); return GST_FLOW_ERROR; @@ -1201,6 +1182,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, case PROP_RETRIEVE_SENDER_ADDRESS: udpsrc->retrieve_sender_address = g_value_get_boolean (value); break; + case PROP_MTU: + udpsrc->mtu = g_value_get_uint (value); + break; default: break; } @@ -1261,6 +1245,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_RETRIEVE_SENDER_ADDRESS: g_value_set_boolean (value, udpsrc->retrieve_sender_address); break; + case PROP_MTU: + g_value_set_uint (value, udpsrc->mtu); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -1535,11 +1522,6 @@ gst_udpsrc_open (GstUDPSrc * src) g_object_unref (addr); } - src->allocator = NULL; - gst_allocation_params_init (&src->params); - - src->max_size = 0; - return TRUE; /* ERRORS */ @@ -1666,8 +1648,6 @@ gst_udpsrc_close (GstUDPSrc * src) src->addr = NULL; } - gst_udpsrc_reset_memory_allocator (src); - gst_udpsrc_free_cancellable (src); return TRUE; diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h index 6c512562ee..456bddc38d 100644 --- a/gst/udp/gstudpsrc.h +++ b/gst/udp/gstudpsrc.h @@ -74,15 +74,11 @@ struct _GstUDPSrc { gboolean external_socket; gboolean made_cancel_fd; - /* memory management */ - GstAllocator *allocator; - GstAllocationParams params; + /* Initial size of buffers in the buffer pool */ + guint mtu; - GstMemory *mem; - GstMapInfo map; - GstMemory *mem_max; - GstMapInfo map_max; - GInputVector vec[2]; + /* Extra memory for buffers with a size superior to max_packet_size */ + GstMemory *extra_mem; gchar *uri; };