From f087fddb7e101388f2739436fce4680552e95d00 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 20 Jul 2004 09:55:04 +0000 Subject: [PATCH] gst/tcp/gstmultifdsink.*: Added more stats, added timeout for a client, fixed some typos and added some comments. Original commit message from CVS: * gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init), (gst_multifdsink_init), (gst_multifdsink_add), (gst_multifdsink_client_remove), (gst_multifdsink_handle_client_write), (gst_multifdsink_queue_buffer), (gst_multifdsink_chain), (gst_multifdsink_set_property), (gst_multifdsink_get_property), (gst_multifdsink_init_send): * gst/tcp/gstmultifdsink.h: Added more stats, added timeout for a client, fixed some typos and added some comments. --- ChangeLog | 13 +++++ gst/tcp/gstmultifdsink.c | 112 +++++++++++++++++++++++++++++---------- gst/tcp/gstmultifdsink.h | 6 ++- 3 files changed, 101 insertions(+), 30 deletions(-) diff --git a/ChangeLog b/ChangeLog index 380d56ecbd..00b57af4b0 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +2004-07-20 Wim Taymans + + * gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init), + (gst_multifdsink_init), (gst_multifdsink_add), + (gst_multifdsink_client_remove), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_queue_buffer), (gst_multifdsink_chain), + (gst_multifdsink_set_property), (gst_multifdsink_get_property), + (gst_multifdsink_init_send): + * gst/tcp/gstmultifdsink.h: + Added more stats, added timeout for a client, fixed some typos + and added some comments. + 2004-07-20 Wim Taymans * gst/tcp/gstmultifdsink.c: (gst_multifdsink_class_init), diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c index 9915c7490d..15cf270007 100644 --- a/gst/tcp/gstmultifdsink.c +++ b/gst/tcp/gstmultifdsink.c @@ -32,6 +32,8 @@ #include "gstmultifdsink.h" #include "gsttcp-marshal.h" +/* the select call is also performed on the control sockets, that way + * we can send special commands to unblock or restart the select call */ #define CONTROL_RESTART 'R' /* restart the select call */ #define CONTROL_STOP 'S' /* stop the select call */ #define CONTROL_SOCKETS(sink) sink->control_sock @@ -50,10 +52,11 @@ G_STMT_START { \ /* elementfactory information */ static GstElementDetails gst_multifdsink_details = -GST_ELEMENT_DETAILS ("TCP Server sink", +GST_ELEMENT_DETAILS ("MultiFd sink", "Sink/Network", - "Send data as a server over the network via TCP", - "Thomas Vander Stichele "); + "Send data to multiple filedescriptors", + "Thomas Vander Stichele , " + "Wim Taymans "); GST_DEBUG_CATEGORY (multifdsink_debug); #define GST_CAT_DEFAULT (multifdsink_debug) @@ -66,15 +69,20 @@ enum SIGNAL_REMOVE, SIGNAL_CLEAR, SIGNAL_GET_STATS, + /* signals */ SIGNAL_CLIENT_ADDED, SIGNAL_CLIENT_REMOVED, + LAST_SIGNAL }; /* this is really arbitrary choosen */ +#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 +#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE +#define DEFAULT_TIMEOUT 0 enum { @@ -84,6 +92,9 @@ enum ARG_BUFFERS_SOFT_MAX, ARG_BUFFERS_QUEUED, ARG_RECOVER_POLICY, + ARG_TIMEOUT, + ARG_BYTES_TO_SERVE, + ARG_BYTES_SERVED, }; #define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) @@ -179,8 +190,7 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) g_object_class_install_property (gobject_class, ARG_PROTOCOL, g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", - GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, - G_PARAM_READWRITE)); + GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, g_param_spec_int ("buffers-max", "Buffers max", "max number of buffers to queue (-1 = no limit)", -1, G_MAXINT, @@ -191,12 +201,24 @@ gst_multifdsink_class_init (GstMultiFdSinkClass * klass) G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, g_param_spec_int ("buffers-queued", "Buffers queued", - "Number of buffers current queued", 0, G_MAXINT, 0, + "Number of buffers currently queued", 0, G_MAXINT, 0, G_PARAM_READABLE)); g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, g_param_spec_enum ("recover-policy", "Recover Policy", "How to recover when client reaches the soft max", - GST_TYPE_RECOVER_POLICY, GST_RECOVER_POLICY_NONE, G_PARAM_READWRITE)); + GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_TIMEOUT, + g_param_spec_uint64 ("timeout", "Timeout", + "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)", + 0, G_MAXUINT64, DEFAULT_TIMEOUT, G_PARAM_READABLE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_TO_SERVE, + g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve", + "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0, + G_PARAM_READABLE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_SERVED, + g_param_spec_uint64 ("bytes-served", "Bytes served", + "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, + G_PARAM_READABLE)); gst_multifdsink_signals[SIGNAL_ADD] = g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, @@ -250,16 +272,17 @@ gst_multifdsink_init (GstMultiFdSink * this) GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); - this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; + this->protocol = DEFAULT_PROTOCOL; + this->clientslock = g_mutex_new (); this->clients = NULL; this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); this->buffers_max = DEFAULT_BUFFERS_MAX; this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX; + this->recover_policy = DEFAULT_RECOVER_POLICY; - this->clientslock = g_mutex_new (); - this->recover_policy = GST_RECOVER_POLICY_NONE; + this->timeout = DEFAULT_TIMEOUT; } static void @@ -293,6 +316,8 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd) /* update start time */ g_get_current_time (&now); client->connect_time = GST_TIMEVAL_TO_TIME (now); + /* send last activity time to connect time */ + client->last_activity_time = GST_TIMEVAL_TO_TIME (now); g_mutex_lock (sink->clientslock); @@ -404,8 +429,6 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) } SEND_COMMAND (sink, CONTROL_RESTART); - sink->clients = g_list_remove (sink->clients, client); - g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); client->connect_interval = client->disconnect_time = client->connect_time; @@ -413,6 +436,8 @@ gst_multifdsink_client_remove (GstMultiFdSink * sink, GstTCPClient * client) g_signal_emit (G_OBJECT (sink), gst_multifdsink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); + sink->clients = g_list_remove (sink->clients, client); + g_free (client); } @@ -552,6 +577,11 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, int fd = client->fd; gboolean more; gboolean res; + GstClockTime now; + GTimeVal nowtv; + + g_get_current_time (&nowtv); + now = GST_TIMEVAL_TO_TIME (nowtv); /* when using GDP, first check if we have queued caps yet */ if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { @@ -643,20 +673,24 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, fd); return FALSE; } - } else if (wrote < maxsize) { - /* partial write means that the client cannot read more and we should - * stop sending more */ - GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote); - client->bufoffset += wrote; - client->bytes_sent += wrote; - more = FALSE; } else { - /* complete buffer was written, we can proceed to the next one */ - client->sending = g_list_remove (client->sending, head); - gst_buffer_unref (head); - /* make sure we start from byte 0 for the next buffer */ - client->bufoffset = 0; + if (wrote < maxsize) { + /* partial write means that the client cannot read more and we should + * stop sending more */ + GST_LOG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote); + client->bufoffset += wrote; + more = FALSE; + } else { + /* complete buffer was written, we can proceed to the next one */ + client->sending = g_list_remove (client->sending, head); + gst_buffer_unref (head); + /* make sure we start from byte 0 for the next buffer */ + client->bufoffset = 0; + } + /* update stats */ client->bytes_sent += wrote; + client->last_activity_time = now; + sink->bytes_served += wrote; } } } while (more); @@ -732,6 +766,11 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) gboolean need_signal = FALSE; gint max_buffer_usage; gint i; + GTimeVal nowtv; + GstClockTime now; + + g_get_current_time (&nowtv); + now = GST_TIMEVAL_TO_TIME (nowtv); g_mutex_lock (sink->clientslock); /* add buffer to queue */ @@ -763,8 +802,10 @@ gst_multifdsink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) "client %p with fd %d not recovering position", client, client->fd); } } - /* check hard max, remove client */ - if (sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) { + /* check hard max and timeout, remove client */ + if ((sink->buffers_max > 0 && client->bufpos >= sink->buffers_max) || + (sink->timeout > 0 + && now - client->last_activity_time > sink->timeout)) { /* remove client */ GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing", client, client->fd); @@ -943,6 +984,8 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) g_mutex_unlock (sink->clientslock); } +/* we handle the client communication in another thread so that we do not block + * the gstreamer thread while we select() on the client fds */ static gpointer gst_multifdsink_thread (GstMultiFdSink * sink) { @@ -986,7 +1029,7 @@ gst_multifdsink_chain (GstPad * pad, GstData * _data) /* queue the buffer */ gst_multifdsink_queue_buffer (sink, buf); - sink->data_written += GST_BUFFER_SIZE (buf); + sink->bytes_to_serve += GST_BUFFER_SIZE (buf); } static void @@ -1011,6 +1054,9 @@ gst_multifdsink_set_property (GObject * object, guint prop_id, case ARG_RECOVER_POLICY: multifdsink->recover_policy = g_value_get_enum (value); break; + case ARG_TIMEOUT: + multifdsink->timeout = g_value_get_uint64 (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1043,6 +1089,15 @@ gst_multifdsink_get_property (GObject * object, guint prop_id, GValue * value, case ARG_RECOVER_POLICY: g_value_set_enum (value, multifdsink->recover_policy); break; + case ARG_TIMEOUT: + g_value_set_uint64 (value, multifdsink->timeout); + break; + case ARG_BYTES_TO_SERVE: + g_value_set_uint64 (value, multifdsink->bytes_to_serve); + break; + case ARG_BYTES_SERVED: + g_value_set_uint64 (value, multifdsink->bytes_served); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1070,7 +1125,8 @@ gst_multifdsink_init_send (GstMultiFdSink * this) fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK); this->streamheader = NULL; - this->data_written = 0; + this->bytes_to_serve = 0; + this->bytes_served = 0; if (fclass->init) { fclass->init (this); diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h index a05653da8d..6efa9acbe5 100644 --- a/gst/tcp/gstmultifdsink.h +++ b/gst/tcp/gstmultifdsink.h @@ -96,6 +96,7 @@ typedef struct { guint64 connect_time; guint64 disconnect_time; guint64 connect_interval; + guint64 last_activity_time; guint64 dropped_buffers; guint64 avg_queue_size; @@ -107,7 +108,8 @@ struct _GstMultiFdSink { /* pad */ GstPad *sinkpad; - size_t data_written; /* how much bytes have we written ? */ + guint64 bytes_to_serve; /* how much bytes we must serve */ + guint64 bytes_served; /* how much bytes have we served */ GMutex *clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ @@ -127,7 +129,7 @@ struct _GstMultiFdSink { GThread *thread; /* the sender thread */ gint buffers_max; /* max buffers to queue */ - gint buffers_soft_max; /* max buffers a client can lay before recoevery starts */ + gint buffers_soft_max; /* max buffers a client can lag before recovery starts */ GstRecoverPolicy recover_policy; GstClockTime timeout; /* max amount of nanoseconds to remain idle */ /* stats */