From dfadb5df47b6591507c393570d5f952e93046abb Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Tue, 8 Jun 2004 10:44:59 +0000 Subject: [PATCH] use streamheader Original commit message from CVS: use streamheader --- ChangeLog | 10 +++ gst/tcp/gsttcpclientsrc.c | 1 + gst/tcp/gsttcpserversink.c | 169 ++++++++++++++++++++++++++----------- gst/tcp/gsttcpserversink.h | 1 + 4 files changed, 130 insertions(+), 51 deletions(-) diff --git a/ChangeLog b/ChangeLog index 0b0fc38cb0..4fc6440811 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,13 @@ +2004-06-08 Thomas Vander Stichele + + * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get): + * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_client_remove), + (gst_tcpserversink_handle_client_read), (gst_tcp_buffer_write), + (gst_tcpserversink_handle_client_write), (gst_tcpserversink_chain), + (gst_tcpserversink_init_send), (gst_tcpserversink_close): + * gst/tcp/gsttcpserversink.h: + take streamheader into account + 2004-06-08 Thomas Vander Stichele * gst/level/Makefile.am: diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c index 45bd4f58a7..1431c49164 100644 --- a/gst/tcp/gsttcpclientsrc.c +++ b/gst/tcp/gsttcpclientsrc.c @@ -231,6 +231,7 @@ gst_tcpclientsrc_get (GstPad * pad) ("ioctl failed: %s", g_strerror (errno))); return NULL; } + GST_LOG_OBJECT (src, "ioctl says %d bytes available", readsize); buf = gst_buffer_new_and_alloc (readsize); break; case GST_TCP_PROTOCOL_TYPE_GDP: diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c index cc6d268b37..d9201862cf 100644 --- a/gst/tcp/gsttcpserversink.c +++ b/gst/tcp/gsttcpserversink.c @@ -229,8 +229,24 @@ gst_tcpserversink_handle_server_read (GstTCPServerSink * sink) return TRUE; } +static void +gst_tcpserversink_client_remove (GstTCPServerSink * sink, int fd) +{ + /* FIXME: if we keep track of ip we can log it here and signal */ + GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); + if (close (fd) != 0) { + GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); + } + FD_CLR (fd, &sink->clientfds); + FD_CLR (fd, &sink->caps_sent); + FD_CLR (fd, &sink->streamheader_sent); + g_signal_emit (G_OBJECT (sink), + gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); +} + /* handle a read on a client fd, - * which either indicates a close or should be ignored */ + * which either indicates a close or should be ignored + * returns FALSE if the client has been closed. */ static gboolean gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd) { @@ -241,17 +257,9 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd) ioctl (fd, FIONREAD, &nread); if (nread == 0) { /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); - if (close (fd) != 0) { - GST_ELEMENT_ERROR (sink, RESOURCE, CLOSE, (NULL), - ("error closing fd %d: %s", fd, g_strerror (errno))); - return FALSE; - } - FD_CLR (fd, &sink->clientfds); - FD_CLR (fd, &sink->caps_sent); - /* FIXME: we need to keep track of IP info so we can signal it here */ - g_signal_emit (G_OBJECT (sink), - gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); + GST_DEBUG_OBJECT (sink, "client asked for close, removing on fd %d", fd); + gst_tcpserversink_client_remove (sink, fd); + return FALSE; } else { /* FIXME: we should probably just Read 'n' Drop */ g_warning ("Don't know what to do with %d bytes to read", nread); @@ -259,40 +267,25 @@ gst_tcpserversink_handle_client_read (GstTCPServerSink * sink, int fd) return TRUE; } -/* handle a write on a client fd, - * which indicates a read request from a client */ -static gboolean -gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd, - GstPad * pad, GstBuffer * buf) +/* Write a buffer to the given fd for the given element using the given + * protocol. + * Return number of buffer bytes written. + */ +static gint +gst_tcp_buffer_write (GstBuffer * buf, int fd, GstElement * element, + GstTCPProtocolType protocol) { + gint wrote = 0; + /* write the buffer header if we have one */ - switch (sink->protocol) { + switch (protocol) { case GST_TCP_PROTOCOL_TYPE_NONE: break; case GST_TCP_PROTOCOL_TYPE_GDP: - /* if we haven't sent caps yet, send them first */ - if (!FD_ISSET (fd, &(sink->caps_sent))) { - const GstCaps *caps; - gchar *string; - - caps = GST_PAD_CAPS (GST_PAD_PEER (pad)); - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string, - fd); - /* FIXME: fix this again so that write_caps is non-fatal for multiple clients; also use a fd, host, port struct */ - if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE, - "unknown", 0)) { - g_free (string); - return FALSE; - } - g_free (string); - FD_SET (fd, &(sink->caps_sent)); - } - GST_LOG_OBJECT (sink, "Sending buffer header through GDP"); - if (!gst_tcp_gdp_write_header (GST_ELEMENT (sink), fd, buf, FALSE, - "unknown", 0)) - return FALSE; + GST_LOG_OBJECT (element, "Sending buffer header through GDP"); + if (!gst_tcp_gdp_write_header (element, fd, buf, FALSE, "unknown", 0)) + return 0; break; default: g_warning ("Unhandled protocol type"); @@ -300,11 +293,9 @@ gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd, } /* serve data to client */ - GST_LOG_OBJECT (sink, "serving data buffer of size %d to client on fd %d", + GST_LOG_OBJECT (element, "serving data buffer of size %d to client on fd %d", GST_BUFFER_SIZE (buf), fd); - int wrote = 0; - wrote = gst_tcp_socket_write (fd, GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf)); @@ -312,24 +303,78 @@ gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd, /* FIXME: keep track of client ip and port and so on */ /* GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, - (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port), + (_("Error while sending data to \"%s:%d\"."), + sink->host, sink->port), ("Only %d of %d bytes written: %s", bytes_written, GST_BUFFER_SIZE (buf), g_strerror (errno))); */ /* FIXME: there should be a better way to report problems, since we want to continue for other clients and just drop this particular one */ - GST_DEBUG_OBJECT (sink, "Write failed: %d of %d bytes written", wrote, + GST_DEBUG_OBJECT (element, "Write failed: %d of %d bytes written", wrote, GST_BUFFER_SIZE (buf)); + } + + return wrote; +} + +/* handle a write on a client fd, + * which indicates a read request from a client */ +static gboolean +gst_tcpserversink_handle_client_write (GstTCPServerSink * sink, int fd, + GstPad * pad, GstBuffer * buf) +{ + gint wrote = 0; + + /* when using GDP, first check if we have sent caps yet */ + if (sink->protocol == GST_TCP_PROTOCOL_TYPE_GDP) { + if (!FD_ISSET (fd, &(sink->caps_sent))) { + const GstCaps *caps; + gchar *string; + + caps = GST_PAD_CAPS (GST_PAD_PEER (pad)); + string = gst_caps_to_string (caps); + GST_DEBUG_OBJECT (sink, "Sending caps %s for fd %d through GDP", string, + fd); + if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), fd, caps, FALSE, + "unknown", 0)) { + GST_DEBUG_OBJECT (sink, "Failed sending caps, removing client"); + gst_tcpserversink_client_remove (sink, fd); + g_free (string); + return FALSE; + } + g_free (string); + FD_SET (fd, &(sink->caps_sent)); + } + } + /* if we have streamheader buffers, and haven't sent them to this client + * yet, send them out one by one */ + if (!FD_ISSET (fd, &(sink->streamheader_sent))) { + if (sink->streamheader) { + GList *l; + + for (l = sink->streamheader; l; l = l->next) { + wrote = gst_tcp_buffer_write (l->data, fd, GST_ELEMENT (sink), + sink->protocol); + if (wrote < GST_BUFFER_SIZE (l->data)) { + GST_DEBUG_OBJECT (sink, + "Failed sending streamheader, removing client"); + gst_tcpserversink_client_remove (sink, fd); + } + } + } + FD_SET (fd, &(sink->streamheader_sent)); + } + + /* now we write the data buffer */ + wrote = gst_tcp_buffer_write (buf, fd, GST_ELEMENT (sink), sink->protocol); + if (wrote < GST_BUFFER_SIZE (buf)) { + gst_tcpserversink_client_remove (sink, fd); /* write failed, so drop the client */ GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); if (close (fd) != 0) { GST_DEBUG_OBJECT (sink, "error closing fd %d after failed write: %s", fd, g_strerror (errno)); } - FD_CLR (fd, &sink->clientfds); - FD_CLR (fd, &sink->caps_sent); - g_signal_emit (G_OBJECT (sink), - gst_tcpserversink_signals[SIGNAL_CLIENT_REMOVED], 0, NULL, fd); return FALSE; } return TRUE; @@ -354,10 +399,23 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data) g_return_if_fail (GST_FLAG_IS_SET (sink, GST_TCPSERVERSINK_OPEN)); if (GST_IS_EVENT (buf)) { - g_warning ("FIXME: handl events"); + g_warning ("FIXME: handle events"); return; } + /* if the incoming buffer is marked as IN CAPS, then we assume for now + * it's a streamheader that needs to be sent to each new client, so we + * put it on our internal list of streamheader buffers. + * After that we return, since we only send these out when we get + * non IN_CAPS buffers so we properly keep track of clients that got + * streamheaders. */ + if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_IN_CAPS)) { + GST_DEBUG_OBJECT (sink, + "appending IN_CAPS buffer with length %d to streamheader", + GST_BUFFER_SIZE (buf)); + sink->streamheader = g_list_append (sink->streamheader, buf); + return; + } /* if the incoming buffer has a duration, we can use that as the timeout * value; otherwise, we block */ timeout.tv_sec = 0; @@ -428,7 +486,6 @@ gst_tcpserversink_chain (GstPad * pad, GstData * _data) gst_buffer_unref (buf); - /* FIXME: emit signal ? */ } static void @@ -551,8 +608,10 @@ gst_tcpserversink_init_send (GstTCPServerSink * this) FD_ZERO (&this->clientfds); FD_ZERO (&this->caps_sent); + FD_ZERO (&this->streamheader_sent); FD_SET (this->server_sock_fd, &this->clientfds); GST_FLAG_SET (this, GST_TCPSERVERSINK_OPEN); + this->streamheader = NULL; this->data_written = 0; @@ -567,6 +626,14 @@ gst_tcpserversink_close (GstTCPServerSink * this) this->server_sock_fd = -1; } + if (this->streamheader) { + GList *l; + + for (l = sink->streamheader; l; l = l->next) { + gst_buffer_unref (l->data); + } + g_list_free (this->streamheader); + } GST_FLAG_UNSET (this, GST_TCPSERVERSINK_OPEN); } diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h index fb496a6a11..4499623684 100644 --- a/gst/tcp/gsttcpserversink.h +++ b/gst/tcp/gsttcpserversink.h @@ -86,6 +86,7 @@ struct _GstTCPServerSink { fd_set streamheader_sent; /* all the client file descriptors that have had * streamheader sent */ + GList *streamheader; /* GList of GstBuffers to use as streamheader */ GstTCPProtocolType protocol; guint mtu; GstClock *clock;