diff --git a/ChangeLog b/ChangeLog index 55c7259579..0fc1abc6fb 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,23 @@ +2004-06-25 Wim Taymans + + * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init), + (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read), + (gst_tcpserversink_client_remove), + (gst_tcpserversink_handle_client_read), + (gst_tcpserversink_client_queue_data), + (gst_tcpserversink_client_queue_caps), + (gst_tcpserversink_client_queue_buffer), + (gst_tcpserversink_handle_client_write), + (gst_tcpserversink_queue_buffer), + (gst_tcpserversink_handle_clients), (gst_tcpserversink_thread), + (gst_tcpserversink_chain), (gst_tcpserversink_set_property), + (gst_tcpserversink_get_property), (gst_tcpserversink_init_send), + (gst_tcpserversink_close): + * gst/tcp/gsttcpserversink.h: + Serversink rewrite. Really do non blocking writes to clients and + maintain an internal queue to handle slower clients while not + disturbing fast clients. + 2004-06-25 Thomas Vander Stichele * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get): diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index 42780a1ac4..3c11d13971 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -36,6 +36,19 @@ #define TCP_DEFAULT_PORT 4953 #define TCP_BACKLOG 5 +#define CONTROL_RESTART 'R' /* restart the select call */ +#define CONTROL_STOP 'S' /* stop the select call */ +#define SEND_COMMAND(sink, command) \ +G_STMT_START { \ + unsigned char c; c = command; \ + write (sink->control_sock[1], &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(sink, command) \ +G_STMT_START { \ + read(sink->control_sock[0], &command, 1); \ +} G_STMT_END + /* elementfactory information */ static GstElementDetails gst_tcpserversink_details = GST_ELEMENT_DETAILS ("TCP Server sink", @@ -46,6 +59,19 @@ GST_ELEMENT_DETAILS ("TCP Server sink", GST_DEBUG_CATEGORY (tcpserversink_debug); #define GST_CAT_DEFAULT (tcpserversink_debug) +typedef struct +{ + int fd; + int bufpos; /* position of this client in the global queue */ + + GList *sending; /* the buffers we need to send */ + int bufoffset; /* offset in the first buffer */ + + gboolean caps_sent; + gboolean streamheader_sent; +} +GstTCPClient; + /* TCPServerSink signals and args */ enum { @@ -54,12 +80,17 @@ enum LAST_SIGNAL }; +#define DEFAULT_BUFFERS_MAX 25 +#define DEFAULT_BUFFERS_SOFT_MAX 20 + enum { ARG_0, ARG_HOST, ARG_PORT, - ARG_PROTOCOL + ARG_PROTOCOL, + ARG_BUFFERS_MAX, + ARG_BUFFERS_SOFT_MAX, }; static void gst_tcpserversink_base_init (gpointer g_class); @@ -139,6 +170,14 @@ gst_tcpserversink_class_init (GstTCPServerSink * klass) g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", GST_TYPE_TCP_PROTOCOL_TYPE, GST_TCP_PROTOCOL_TYPE_NONE, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, + g_param_spec_int ("buffers-max", "Buffers max", + "max number of buffers to queue (0 = no limit)", 0, G_MAXINT, + DEFAULT_BUFFERS_MAX, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_SOFT_MAX, + g_param_spec_int ("buffers-soft-max", "Buffers soft max", + "Recover client when going over this limit (0 = no limit)", 0, + G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); gst_tcpserversink_signals[SIGNAL_CLIENT_ADDED] = g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), @@ -187,6 +226,17 @@ gst_tcpserversink_init (GstTCPServerSink * this) this->protocol = GST_TCP_PROTOCOL_TYPE_NONE; this->clock = NULL; + + this->clients = NULL; + + this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); + this->queuelock = g_mutex_new (); + this->queuecond = g_cond_new (); + this->buffers_max = DEFAULT_BUFFERS_MAX; + this->buffers_soft_max = DEFAULT_BUFFERS_SOFT_MAX; + + this->clientslock = g_mutex_new (); + } static void @@ -210,6 +260,7 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) int client_sock_fd; struct sockaddr_in client_address; int client_address_len; + GstTCPClient *client; client_sock_fd = accept (sink->server_sock_fd, (struct sockaddr *) &client_address, @@ -219,7 +270,24 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) ("Could not accept client on server socket: %s", g_strerror (errno))); return FALSE; } - FD_SET (client_sock_fd, &(sink->clientfds)); + + /* create client datastructure */ + client = g_new0 (GstTCPClient, 1); + client->fd = client_sock_fd; + client->bufpos = -1; + client->bufoffset = 0; + client->sending = NULL; + + g_mutex_lock (sink->clientslock); + sink->clients = g_list_prepend (sink->clients, client); + g_mutex_unlock (sink->clientslock); + + /* we always read from a client */ + FD_SET (client_sock_fd, &sink->readfds); + + /* set the socket to non blocking */ + fcntl (client_sock_fd, F_SETFL, O_NONBLOCK); + GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", inet_ntoa (client_address.sin_addr), client_sock_fd); g_signal_emit (G_OBJECT (sink), @@ -230,16 +298,22 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) } static void -gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd) +gst_tcpserversink_client_remove (GstTCPServerSink * sink, GstTCPClient * client) { + int fd = client->fd; + /* FIXME: if we keep track of ip we can log it here and signal */ GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); if (close (fd) != 0) { GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); } - FD_CLR (fd, &sink->clientfds); - FD_CLR (fd, &sink->caps_sent); - FD_CLR (fd, &sink->streamheader_sent); + FD_CLR (fd, &sink->readfds); + FD_CLR (fd, &sink->writefds); + + sink->clients = g_list_remove (sink->clients, client); + + g_free (client); + g_signal_emit (G_OBJECT (sink), gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); } @@ -248,9 +322,12 @@ gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd) * which either indicates a close or should be ignored * returns FALSE if the client has been closed. */ static gboolean -gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd) +gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, + GstTCPClient * client) { - int nread; + int nread, fd; + + fd = client->fd; GST_LOG_OBJECT (sink, "select reports client read on fd %d", fd); @@ -258,7 +335,6 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd) if (nread == 0) { /* client sent close, so remove it */ GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); - gst_tcpserversink_client_remove (sink, fd); return FALSE; } else { /* FIXME: we should probably just Read 'n' Drop */ @@ -267,128 +343,435 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd) return TRUE; } -/* Write a buffer to the given fd for the given element using the given - * protocol. - * Return number of buffer bytes written. - */ -static gint -gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element, - GstTCPProtocolType protocol) +static gboolean +gst_tcpserversink_client_queue_data (GstTCPServerSink * sink, + GstTCPClient * client, gchar * data, gint len) { - gint wrote = 0; + GstBuffer *buf; - /* write the buffer header if we have one */ - switch (protocol) { - case GST_TCP_PROTOCOL_TYPE_NONE: - break; + buf = gst_buffer_new (); + GST_BUFFER_DATA (buf) = data; + GST_BUFFER_SIZE (buf) = len; - case GST_TCP_PROTOCOL_TYPE_GDP: - GST_LOG_OBJECT (element, "Sending buffer header through GDP"); - if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0)) - return 0; - break; - default: - g_warning ("Unhandled protocol type"); - break; - } + GST_DEBUG_OBJECT (sink, "Queueing data of length %d for fd %d", + len, client->fd); + client->sending = g_list_append (client->sending, buf); - /* serve data to client */ - GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d", - GST_BUFFER_SIZE (buf), fd); - - wrote = - gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); - - if (wrote < GST_BUFFER_SIZE (buf)) { -/* FIXME: keep track of client ip and port and so on */ -/* - GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, - (_("Error while sending data to \"%s:%d\"."), - sink->host, sink->port), - ("Only %d of %d bytes written: %s", - bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno))); -*/ - /* FIXME: there should be a better way to report problems, since we - want to continue for other clients and just drop this particular one */ - GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote, - GST_BUFFER_SIZE (buf)); - } - - return wrote; + return TRUE; } -/* handle a write on a client fd, - * which indicates a read request from a client */ static gboolean -gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd, - GstPad * pad, GstBuffer * buf) +gst_tcpserversink_client_queue_caps (GstTCPServerSink * sink, + GstTCPClient * client, const GstCaps * caps) { - gint wrote = 0; + guint8 *header; + guint8 *payload; + guint length; + gchar *string; - /* when using GDP, first check if we have sent caps yet */ + string = gst_caps_to_string (caps); + GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string, + client->fd); + g_free (string); + + if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { + GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps"); + return FALSE; + } + gst_tcpserversink_client_queue_data (sink, client, header, length); + + length = gst_dp_header_payload_length (header); + gst_tcpserversink_client_queue_data (sink, client, payload, length); + + return TRUE; +} + +static gboolean +gst_tcpserversink_client_queue_buffer (GstTCPServerSink * sink, + GstTCPClient * client, GstBuffer * buffer) +{ if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { - if (!FD_ISSET (fd, &(sink->caps_sent))) { - const GstCaps *caps; - gchar *string; + guint8 *header; + guint len; - caps = GST_PAD_CAPS (GST_PAD_PEER (pad)); - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string, - fd); - if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE, - "unknown", 0)) { - GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client"); - gst_tcpserversink_client_remove (sink, fd); - g_free (string); + if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) { + GST_DEBUG_OBJECT (sink, + "could not create header, removing client on fd %d", client->fd); + return FALSE; + } + gst_tcpserversink_client_queue_data (sink, client, header, len); + } + + gst_buffer_ref (buffer); + client->sending = g_list_append (client->sending, buffer); + + return TRUE; +} + + +/* handle a write on a client, + * which indicates a read request from a client. + * + * The strategy is as follows, for each client we maintain a queue of GstBuffers + * that contain the raw bytes we need to send to the client. In the case of the + * GDP protocol, we create buffers out of the header bytes so that we can only focus + * on sending buffers. + * + * We first check to see if we need to send caps (in GDP) and streamheaders. If so, + * we queue them. + * + * Then we run into the main loop that tries to send as many buffers as possible. It + * will first exhaust the client->sending queue and if the queue is empty, it will + * pick a buffer from the global queue. + * + * Sending the Buffers from the client->sending queue is basically writing the bytes + * to the socket and maintaining a count of the bytes that were sent. When the buffer + * is completely sent, it is removed from the client->sending queue and we try to pick + * a new buffer for sending. + * + * When the sending returns a partial buffer we stop sending more data as the next send + * operation could block. + */ +static gboolean +gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, + GstTCPClient * client) +{ + int fd = client->fd; + gboolean more; + gboolean res; + + /* when using GDP, first check if we have queued caps yet */ + if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { + if (!client->caps_sent) { + const GstCaps *caps = GST_PAD_CAPS (GST_PAD_PEER (sink->sinkpad)); + + /* queue caps for sending */ + res = gst_tcpserversink_client_queue_caps (sink, client, caps); + if (!res) { + GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client"); return FALSE; } - g_free (string); - FD_SET (fd, &(sink->caps_sent)); + client->caps_sent = TRUE; } } /* if we have streamheader buffers, and haven't sent them to this client * yet, send them out one by one */ - if (!FD_ISSET (fd, &(sink->streamheader_sent))) { + if (!client->streamheader_sent) { if (sink->streamheader) { GList *l; for (l = sink->streamheader; l; l = l->next) { - wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink), - sink->protocol); - if (wrote < GST_BUFFER_SIZE (l->data)) { + /* queue stream headers for sending */ + res = + gst_tcpserversink_client_queue_buffer (sink, client, + GST_BUFFER (l->data)); + if (!res) { GST_DEBUG_OBJECT (sink, - "Failed sending streamheader, removing client"); - gst_tcpserversink_client_remove (sink, fd); + "Failed queueing streamheader, removing client"); + return FALSE; } } } - FD_SET (fd, &(sink->streamheader_sent)); + client->streamheader_sent = TRUE; } - /* now we write the data buffer */ - wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol); - if (wrote < GST_BUFFER_SIZE (buf)) { - gst_tcpserversink_client_remove (sink, fd); - /* write failed, so drop the client */ - GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); - if (close (fd) != 0) { - GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s", - fd, g_strerror (errno)); + more = TRUE; + do { + gint maxsize; + + if (!client->sending) { + /* client is not working on a buffer */ + if (client->bufpos == -1) { + /* client is too fast, remove from write queue until new buffer is + * available */ + FD_CLR (fd, &sink->writefds); + return TRUE; + } else { + /* client can pick a buffer from the global queue */ + GstBuffer *buf; + + /* grab buffer and ref, we need to ref since it could be unreffed in + * another thread when we unlock the queuelock */ + g_mutex_lock (sink->queuelock); + buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); + client->bufpos--; + gst_buffer_ref (buf); + g_mutex_unlock (sink->queuelock); + + gst_tcpserversink_client_queue_buffer (sink, client, buf); + /* it is safe to unref now as queueing a buffer will ref it */ + gst_buffer_unref (buf); + /* need to start from the first byte for this new buffer */ + client->bufoffset = 0; + } } - return FALSE; - } + + /* see if we need to send something */ + if (client->sending) { + ssize_t wrote; + GstBuffer *head; + + /* pick first buffer from list */ + head = GST_BUFFER (client->sending->data); + maxsize = GST_BUFFER_SIZE (head) - client->bufoffset; + + /* try to write the complete buffer */ + wrote = + send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize, + MSG_NOSIGNAL); + if (wrote < 0) { + /* hmm error.. */ + if (errno == EAGAIN) { + /* nothing serious, resource was unavailable, try again later */ + more = FALSE; + } else { + GST_DEBUG_OBJECT (sink, "could not write, removing client on fd %d", + fd); + return FALSE; + } + } else if (wrote < maxsize) { + /* partial write means that the client cannot read more and we should + * stop sending more */ + GST_DEBUG_OBJECT (sink, "partial write on %d of %d bytes", fd, wrote); + client->bufoffset += wrote; + more = FALSE; + } else { + /* complete buffer was written, we can proceed to the next one */ + client->sending = g_list_remove (client->sending, head); + gst_buffer_unref (head); + /* make sure we start from byte 0 for the next buffer */ + client->bufoffset = 0; + } + } + } while (more); + return TRUE; } +/* Queue a buffer on the global queue. + * + * This functions adds the buffer to the front of a GArray. It removes the + * tail buffer if the max queue size is exceeded. Unreffing the buffer that + * is queued. Note that unreffing the buffer is not a problem as clients who + * started writing out this buffer will still have a reference to it in the + * client->sending queue. + * + * After adding the buffer, we update all client positions in the queue. If + * a client moves of the soft max, we start the recovery procedure for this + * slow client. If it goes over the hard max, it is put into the slow list + * and removed. + * + * Special care is taken of clients that were waiting for a new buffer (they + * had a position of -1) because they can proceed after adding this new buffer. + * This is done by adding the client back into the write fd_set and signalling + * the select thread that the fd_set changed. + * + */ +static void +gst_tcpserversink_queue_buffer (GstTCPServerSink * sink, GstBuffer * buf) +{ + GList *clients; + gint queuelen; + GList *slow = NULL; + gboolean need_signal = FALSE; + + g_mutex_lock (sink->queuelock); + /* add buffer to queue */ + g_array_prepend_val (sink->bufqueue, buf); + queuelen = sink->bufqueue->len; + if (queuelen > sink->buffers_max) { + GstBuffer *old; + + /* queue exceeded max size */ + queuelen--; + old = g_array_index (sink->bufqueue, GstBuffer *, queuelen); + sink->bufqueue = g_array_remove_index (sink->bufqueue, queuelen); + + /* unref tail buffer */ + gst_buffer_unref (old); + } + g_mutex_unlock (sink->queuelock); + + /* then loop over the clients and update the positions */ + g_mutex_lock (sink->clientslock); + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) clients->data; + + client->bufpos++; + GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", + client, client->fd, client->bufpos); + if (client->bufpos >= sink->buffers_soft_max) { + if (client->bufpos == sink->buffers_soft_max) { + g_warning ("client %p with fd %d is lagging...", client, client->fd); + } + GST_LOG_OBJECT (sink, "client %p with fd %d is lagging", + client, client->fd); + } + if (client->bufpos >= queuelen) { + /* remove client */ + GST_LOG_OBJECT (sink, "client %p with fd %d is too slow, removing", + client, client->fd); + g_warning ("client %p with fd %d too slow, removing", client, client->fd); + FD_CLR (client->fd, &sink->readfds); + FD_CLR (client->fd, &sink->writefds); + slow = g_list_prepend (slow, client); + /* cannot send data to this client anymore. need to signal the select thread that + * the fd_set changed */ + need_signal = TRUE; + } else if (client->bufpos == 0) { + /* can send data to this client now. need to signal the select thread that + * the fd_set changed */ + FD_SET (client->fd, &sink->writefds); + need_signal = TRUE; + } + } + /* remove crap clients */ + for (clients = slow; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) slow->data; + + gst_tcpserversink_client_remove (sink, client); + } + g_list_free (slow); + g_mutex_unlock (sink->clientslock); + + /* and send a signal to thread if fd_set changed */ + if (need_signal) { + SEND_COMMAND (sink, CONTROL_RESTART); + } +} + +/* Handle the clients. Basically does a blocking select for one + * of the client fds to become read or writable. We also have a + * filedescriptor to receive commands on that we need to check. + * + * After going out of the select call, we read and write to all + * clients that can do so. Badly behaving clients are put on a + * garbage list and removed. + */ +static void +gst_tcpserversink_handle_clients (GstTCPServerSink * sink) +{ + int result; + fd_set testreadfds, testwritefds; + GList *clients, *error = NULL; + gboolean try_again; + + do { + try_again = FALSE; + + /* check for: + * - server socket input (ie, new client connections) + * - client socket input (ie, clients saying goodbye) + * - client socket output (ie, client reads) */ + testwritefds = sink->writefds; + testreadfds = sink->readfds; + FD_SET (sink->server_sock_fd, &testreadfds); + FD_SET (sink->control_sock[0], &testreadfds); + + GST_LOG_OBJECT (sink, "doing select on server + client fds for reads"); + gst_tcpserversink_debug_fdset (sink, &testreadfds); + GST_LOG_OBJECT (sink, "doing select on client fds for writes"); + gst_tcpserversink_debug_fdset (sink, &testwritefds); + + result = + select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL); + + /* < 0 is an error, 0 just means a timeout happened */ + if (result < 0) { + GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), + ("select failed: %s", g_strerror (errno))); + return; + } + + GST_LOG_OBJECT (sink, "%d sockets had action", result); + GST_LOG_OBJECT (sink, "done select on server/client fds for reads"); + gst_tcpserversink_debug_fdset (sink, &testreadfds); + GST_LOG_OBJECT (sink, "done select on client fds for writes"); + gst_tcpserversink_debug_fdset (sink, &testwritefds); + + if (FD_ISSET (sink->control_sock[0], &testreadfds)) { + gchar command; + + READ_COMMAND (sink, command); + + switch (command) { + case CONTROL_RESTART: + /* need to restart the select call as the fd_set changed */ + try_again = TRUE; + break; + case CONTROL_STOP: + /* stop this function */ + return; + default: + g_warning ("tcpserversink: unknown control message received"); + break; + } + } + } while (try_again); + + if (FD_ISSET (sink->server_sock_fd, &testreadfds)) { + /* handle new client connection on server socket */ + if (!gst_tcpserversink_handle_server_read (sink)) { + GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), + ("client connection failed: %s", g_strerror (errno))); + return; + } + } + + /* Check the reads */ + g_mutex_lock (sink->clientslock); + for (clients = sink->clients; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + int fd; + + client = (GstTCPClient *) clients->data; + fd = client->fd; + + if (FD_ISSET (fd, &testreadfds)) { + /* handle client read */ + if (!gst_tcpserversink_handle_client_read (sink, client)) { + error = g_list_prepend (error, client); + continue; + } + } + if (FD_ISSET (fd, &testwritefds)) { + /* handle client write */ + if (!gst_tcpserversink_handle_client_write (sink, client)) { + error = g_list_prepend (error, client); + continue; + } + } + } + /* remove crappy clients */ + for (clients = error; clients; clients = g_list_next (clients)) { + GstTCPClient *client; + + client = (GstTCPClient *) error->data; + + GST_LOG_OBJECT (sink, "removing client %p with fd %d with errors", client, + client->fd); + gst_tcpserversink_client_remove (sink, client); + } + g_list_free (error); + g_mutex_unlock (sink->clientslock); +} + +static gpointer +gst_tcpserversink_thread (GstTCPServerSink * sink) +{ + while (sink->running) { + gst_tcpserversink_handle_clients (sink); + } + return NULL; +} + static void gst_tcpserversink_chain (GstPad * pad, GstData * _data) { - int result; - int fd; - fd_set testreadfds, testwritefds; - struct timeval timeout; - struct timeval *timeoutp; - GstBuffer *buf = GST_BUFFER (_data); GstTCPServerSink *sink; @@ -416,76 +799,11 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data) sink->streamheader = g_list_append (sink->streamheader, buf); return; } - /* if the incoming buffer has a duration, we can use that as the timeout - * value; otherwise, we block */ - timeout.tv_sec = 0; - timeout.tv_usec = 0; - timeoutp = NULL; - GST_LOG_OBJECT (sink, "incoming buffer duration: %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_DURATION (buf))); - if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DURATION (buf))) { - GST_TIME_TO_TIMEVAL (GST_BUFFER_DURATION (buf), timeout); - timeoutp = &timeout; - GST_LOG_OBJECT (sink, "select will be with timeout %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_DURATION (buf))); - GST_LOG_OBJECT (sink, "select will be with timeout %d.%d", - timeout.tv_sec, timeout.tv_usec); - } - /* check for: - * - server socket input (ie, new client connections) - * - client socket input (ie, clients saying goodbye) - * - client socket output (ie, client reads) */ - testwritefds = sink->clientfds; - testreadfds = sink->clientfds; - FD_SET (sink->server_sock_fd, &testreadfds); - GST_LOG_OBJECT (sink, "doing select on server + client fds for reads"); - gst_tcpserversink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "doing select on client fds for writes"); - gst_tcpserversink_debug_fdset (sink, &testwritefds); + /* queue the buffer */ + gst_tcpserversink_queue_buffer (sink, buf); - result = select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, - timeoutp); - /* < 0 is an error, 0 just means a timeout happened */ - if (result < 0) { - GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return; - } - GST_LOG_OBJECT (sink, "%d sockets had action", result); - GST_LOG_OBJECT (sink, "done select on server/client fds for reads"); - gst_tcpserversink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "done select on client fds for writes"); - gst_tcpserversink_debug_fdset (sink, &testwritefds); - - /* Check the reads */ - for (fd = 0; fd < FD_SETSIZE; fd++) { - if (FD_ISSET (fd, &testreadfds)) { - if (fd == sink->server_sock_fd) { - /* handle new client connection on server socket */ - if (!gst_tcpserversink_handle_server_read (sink)) - return; - } else { - /* handle client read */ - if (!gst_tcpserversink_handle_client_read (sink, fd)) - return; - } - } - } - - /* Check the writes */ - for (fd = 0; fd < FD_SETSIZE; fd++) { - if (FD_ISSET (fd, &testwritefds)) { - if (!gst_tcpserversink_handle_client_write (sink, fd, pad, buf)) { - gst_buffer_unref (buf); - return; - } - } - } sink->data_written += GST_BUFFER_SIZE (buf); - - gst_buffer_unref (buf); - } static void @@ -508,6 +826,12 @@ gst_tcpserversink_set_property (GObject * object, guint prop_id, case ARG_PROTOCOL: tcpserversink->protocol = g_value_get_enum (value); break; + case ARG_BUFFERS_MAX: + tcpserversink->buffers_max = g_value_get_int (value); + break; + case ARG_BUFFERS_SOFT_MAX: + tcpserversink->buffers_soft_max = g_value_get_int (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -534,6 +858,12 @@ gst_tcpserversink_get_property (GObject * object, guint prop_id, GValue * value, case ARG_PROTOCOL: g_value_set_enum (value, tcpserversink->protocol); break; + case ARG_BUFFERS_MAX: + g_value_set_int (value, tcpserversink->buffers_max); + break; + case ARG_BUFFERS_SOFT_MAX: + g_value_set_int (value, tcpserversink->buffers_soft_max); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -606,10 +936,18 @@ gst_tcpserversink_init_send (GstTCPServerSink * this) "listened on server socket %d, returning from connection setup", this->server_sock_fd); - FD_ZERO (&this->clientfds); - FD_ZERO (&this->caps_sent); - FD_ZERO (&this->streamheader_sent); - FD_SET (this->server_sock_fd, &this->clientfds); + FD_ZERO (&this->readfds); + FD_ZERO (&this->writefds); + FD_SET (this->server_sock_fd, &this->readfds); + + if (socketpair (PF_UNIX, SOCK_STREAM, 0, this->control_sock) < 0) { + perror ("creating socket pair"); + } + + this->running = TRUE; + this->thread = g_thread_create ((GThreadFunc) gst_tcpserversink_thread, + this, TRUE, NULL); + GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN); this->streamheader = NULL; @@ -621,6 +959,15 @@ gst_tcpserversink_init_send (GstTCPServerSink * this) static void gst_tcpserversink_close (GstTCPServerSink * this) { + this->running = FALSE; + + SEND_COMMAND (this, CONTROL_STOP); + + g_thread_join (this->thread); + + close (this->control_sock[0]); + close (this->control_sock[1]); + if (this->server_sock_fd != -1) { close (this->server_sock_fd); this->server_sock_fd = -1; diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index 4499623684..903b62c80e 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -80,16 +80,28 @@ struct _GstTCPServerSink { size_t data_written; /* how much bytes have we written ? */ - fd_set clientfds; /* all the client file descriptors that are open */ - fd_set caps_sent; /* all the client file descriptors - * that have had caps sent */ - fd_set streamheader_sent; /* all the client file descriptors that have had - * streamheader sent */ + GMutex *clientslock; + GList *clients; /* list of clients we are serving */ + + fd_set readfds; /* all the client file descriptors that we can read from */ + fd_set writefds; /* all the client file descriptors that we can write to */ + + int control_sock[2]; /* sockets for controlling the select call */ GList *streamheader; /* GList of GstBuffers to use as streamheader */ GstTCPProtocolType protocol; guint mtu; GstClock *clock; + + GArray *bufqueue; + GMutex *queuelock; + GCond *queuecond; + + gboolean running; + GThread *thread; + + gint buffers_max; + gint buffers_soft_max; }; struct _GstTCPServerSinkClass {