gst/tcp/: Some extra checks in gstfdset.

Original commit message from CVS:
* gst/tcp/gstfdset.c: (gst_fdset_free), (gst_fdset_set_mode),
(gst_fdset_get_mode), (gst_fdset_add_fd), (gst_fdset_remove_fd),
(gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
(gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
(gst_fdset_fd_can_read), (gst_fdset_fd_can_write),
(gst_fdset_wait):
* gst/tcp/gstfdset.h:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write):
* gst/tcp/gstmultifdsink.h:
Some extra checks in gstfdset.
Only use send() when the fd is a socket. Don't try to
read from write only fds.
This commit is contained in:
Wim Taymans 2004-08-18 16:13:19 +00:00
parent 36db5cb890
commit 5df309dd0f
5 changed files with 94 additions and 13 deletions

View File

@ -1,3 +1,20 @@
2004-08-18 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstfdset.c: (gst_fdset_free), (gst_fdset_set_mode),
(gst_fdset_get_mode), (gst_fdset_add_fd), (gst_fdset_remove_fd),
(gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read),
(gst_fdset_fd_has_closed), (gst_fdset_fd_has_error),
(gst_fdset_fd_can_read), (gst_fdset_fd_can_write),
(gst_fdset_wait):
* gst/tcp/gstfdset.h:
* gst/tcp/gstmultifdsink.c: (gst_multifdsink_add),
(gst_multifdsink_client_queue_buffer),
(gst_multifdsink_handle_client_write):
* gst/tcp/gstmultifdsink.h:
Some extra checks in gstfdset.
Only use send() when the fd is a socket. Don't try to
read from write only fds.
2004-08-18 Wim Taymans <wim@fluendo.com> 2004-08-18 Wim Taymans <wim@fluendo.com>
* gst/tcp/gstfdset.c: (gst_fdset_add_fd), (gst_fdset_remove_fd), * gst/tcp/gstfdset.c: (gst_fdset_add_fd), (gst_fdset_remove_fd),

View File

@ -128,6 +128,8 @@ gst_fdset_new (GstFDSetMode mode)
void void
gst_fdset_free (GstFDSet * set) gst_fdset_free (GstFDSet * set)
{ {
g_return_if_fail (set != NULL);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
break; break;
@ -147,21 +149,30 @@ gst_fdset_free (GstFDSet * set)
void void
gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode) gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode)
{ {
g_return_if_fail (set != NULL);
g_warning ("implement me"); g_warning ("implement me");
} }
GstFDSetMode GstFDSetMode
gst_fdset_get_mode (GstFDSet * set) gst_fdset_get_mode (GstFDSet * set)
{ {
g_return_val_if_fail (set != NULL, FALSE);
return set->mode; return set->mode;
} }
void gboolean
gst_fdset_add_fd (GstFDSet * set, GstFD * fd) gst_fdset_add_fd (GstFDSet * set, GstFD * fd)
{ {
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
/* nothing */ res = TRUE;
break; break;
case GST_FDSET_MODE_POLL: case GST_FDSET_MODE_POLL:
{ {
@ -193,26 +204,36 @@ gst_fdset_add_fd (GstFDSet * set, GstFD * fd)
set->free = -1; set->free = -1;
g_mutex_unlock (set->poll_lock); g_mutex_unlock (set->poll_lock);
res = TRUE;
break; break;
} }
case GST_FDSET_MODE_EPOLL: case GST_FDSET_MODE_EPOLL:
break; break;
} }
return res;
} }
void gboolean
gst_fdset_remove_fd (GstFDSet * set, GstFD * fd) gst_fdset_remove_fd (GstFDSet * set, GstFD * fd)
{ {
gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
/* nothing */ /* nothing */
FD_CLR (fd->fd, &set->writefds); FD_CLR (fd->fd, &set->writefds);
FD_CLR (fd->fd, &set->readfds); FD_CLR (fd->fd, &set->readfds);
res = TRUE;
break; break;
case GST_FDSET_MODE_POLL: case GST_FDSET_MODE_POLL:
{ {
g_mutex_lock (set->poll_lock); g_mutex_lock (set->poll_lock);
/* FIXME on some platforms poll doesn't ignore the fd
* when set to -1 */
set->pollfds[fd->idx].fd = -1; set->pollfds[fd->idx].fd = -1;
set->pollfds[fd->idx].events = 0; set->pollfds[fd->idx].events = 0;
set->pollfds[fd->idx].revents = 0; set->pollfds[fd->idx].revents = 0;
@ -229,16 +250,21 @@ gst_fdset_remove_fd (GstFDSet * set, GstFD * fd)
set->free = MIN (set->free, fd->idx); set->free = MIN (set->free, fd->idx);
} }
g_mutex_unlock (set->poll_lock); g_mutex_unlock (set->poll_lock);
res = TRUE;
break; break;
} }
case GST_FDSET_MODE_EPOLL: case GST_FDSET_MODE_EPOLL:
break; break;
} }
return res;
} }
void void
gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active) gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
{ {
g_return_if_fail (set != NULL);
g_return_if_fail (fd != NULL);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
if (active) if (active)
@ -266,6 +292,9 @@ gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active)
void void
gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active) gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active)
{ {
g_return_if_fail (set != NULL);
g_return_if_fail (fd != NULL);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
if (active) if (active)
@ -295,6 +324,9 @@ gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd)
{ {
gboolean res = FALSE; gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
res = FALSE; res = FALSE;
@ -320,6 +352,9 @@ gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd)
{ {
gboolean res = FALSE; gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
res = FALSE; res = FALSE;
@ -345,6 +380,9 @@ gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd)
{ {
gboolean res = FALSE; gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
res = FD_ISSET (fd->fd, &set->testreadfds); res = FD_ISSET (fd->fd, &set->testreadfds);
@ -370,6 +408,9 @@ gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
{ {
gboolean res = FALSE; gboolean res = FALSE;
g_return_val_if_fail (set != NULL, FALSE);
g_return_val_if_fail (fd != NULL, FALSE);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
res = FD_ISSET (fd->fd, &set->testwritefds); res = FD_ISSET (fd->fd, &set->testwritefds);
@ -390,11 +431,13 @@ gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd)
return res; return res;
} }
int gint
gst_fdset_wait (GstFDSet * set, int timeout) gst_fdset_wait (GstFDSet * set, int timeout)
{ {
int res = -1; int res = -1;
g_return_val_if_fail (set != NULL, -1);
switch (set->mode) { switch (set->mode) {
case GST_FDSET_MODE_SELECT: case GST_FDSET_MODE_SELECT:
{ {

View File

@ -51,8 +51,8 @@ void gst_fdset_free (GstFDSet *set);
void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode); void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode);
GstFDSetMode gst_fdset_get_mode (GstFDSet *set); GstFDSetMode gst_fdset_get_mode (GstFDSet *set);
void gst_fdset_add_fd (GstFDSet *set, GstFD *fd); gboolean gst_fdset_add_fd (GstFDSet *set, GstFD *fd);
void gst_fdset_remove_fd (GstFDSet *set, GstFD *fd); gboolean gst_fdset_remove_fd (GstFDSet *set, GstFD *fd);
void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active); void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active);
void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active); void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active);
@ -62,7 +62,7 @@ gboolean gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd); gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd);
gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd); gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd);
int gst_fdset_wait (GstFDSet *set, int timeout); gint gst_fdset_wait (GstFDSet *set, int timeout);
G_END_DECLS G_END_DECLS

View File

@ -28,6 +28,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h>
#ifdef HAVE_FIONREAD_IN_SYS_FILIO #ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h> #include <sys/filio.h>
@ -391,6 +392,8 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
{ {
GstTCPClient *client; GstTCPClient *client;
GTimeVal now; GTimeVal now;
gint flags, res;
struct stat statbuf;
GST_DEBUG_OBJECT (sink, "adding client on fd %d", fd); GST_DEBUG_OBJECT (sink, "adding client on fd %d", fd);
@ -418,10 +421,20 @@ gst_multifdsink_add (GstMultiFdSink * sink, int fd)
sink->clients = g_list_prepend (sink->clients, client); sink->clients = g_list_prepend (sink->clients, client);
/* set the socket to non blocking */ /* set the socket to non blocking */
fcntl (fd, F_SETFL, O_NONBLOCK); res = fcntl (fd, F_SETFL, O_NONBLOCK);
/* we always read from a client */ /* we always read from a client */
gst_fdset_add_fd (sink->fdset, &client->fd); gst_fdset_add_fd (sink->fdset, &client->fd);
gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
/* we don't try to read from write only fds */
flags = fcntl (fd, F_GETFL, 0);
if ((flags & O_ACCMODE) != O_WRONLY) {
gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE);
}
/* figure out the mode, can't use send() for non sockets */
res = fstat (fd, &statbuf);
if (S_ISSOCK (statbuf.st_mode)) {
client->is_socket = TRUE;
}
SEND_COMMAND (sink, CONTROL_RESTART); SEND_COMMAND (sink, CONTROL_RESTART);
@ -858,16 +871,23 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink,
#else #else
#define FLAGS 0 #define FLAGS 0
#endif #endif
wrote = if (client->is_socket) {
send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize, FLAGS); wrote =
send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize,
FLAGS);
} else {
wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize);
}
if (wrote < 0) { if (wrote < 0) {
/* hmm error.. */ /* hmm error.. */
if (errno == EAGAIN) { if (errno == EAGAIN) {
/* nothing serious, resource was unavailable, try again later */ /* nothing serious, resource was unavailable, try again later */
more = FALSE; more = FALSE;
} else { } else {
GST_WARNING_OBJECT (sink, "could not write, removing client on fd %d", GST_WARNING_OBJECT (sink,
fd); "could not write, removing client on fd %d: %s", fd,
g_strerror (errno));
client->status = GST_CLIENT_STATUS_ERROR; client->status = GST_CLIENT_STATUS_ERROR;
return FALSE; return FALSE;
} }

View File

@ -87,6 +87,7 @@ typedef struct {
gint bufpos; /* position of this client in the global queue */ gint bufpos; /* position of this client in the global queue */
GstClientStatus status; GstClientStatus status;
gboolean is_socket;
GSList *sending; /* the buffers we need to send */ GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */ gint bufoffset; /* offset in the first buffer */