tcpserversink: Port to GIO

And change multifdsink to GIO too and rename it to multisocketsink
because it only works on GSockets now, not generic fds.
This commit is contained in:
Sebastian Dröge 2012-01-16 10:08:24 +01:00
parent d29c7826ab
commit 17254bbd23
10 changed files with 949 additions and 1193 deletions

View File

@ -82,7 +82,7 @@ EXTRA_HFILES = \
$(top_srcdir)/gst/playback/gstsubtitleoverlay.h \ $(top_srcdir)/gst/playback/gstsubtitleoverlay.h \
$(top_srcdir)/gst/audiorate/gstaudiorate.h \ $(top_srcdir)/gst/audiorate/gstaudiorate.h \
$(top_srcdir)/gst/audioresample/gstaudioresample.h \ $(top_srcdir)/gst/audioresample/gstaudioresample.h \
$(top_srcdir)/gst/tcp/gstmultifdsink.h \ $(top_srcdir)/gst/tcp/gstmultisocketsink.h \
$(top_srcdir)/gst/tcp/gsttcpclientsrc.h \ $(top_srcdir)/gst/tcp/gsttcpclientsrc.h \
$(top_srcdir)/gst/tcp/gsttcpclientsink.h \ $(top_srcdir)/gst/tcp/gsttcpclientsink.h \
$(top_srcdir)/gst/tcp/gsttcpserversrc.h \ $(top_srcdir)/gst/tcp/gsttcpserversrc.h \

View File

@ -16,8 +16,8 @@ BUILT_SOURCES = $(built_sources) $(built_headers)
libgsttcp_la_SOURCES = \ libgsttcp_la_SOURCES = \
gsttcpplugin.c \ gsttcpplugin.c \
gsttcp.c \ gsttcp.c \
gstmultifdsink.c \
gsttcpclientsrc.c gsttcpclientsink.c \ gsttcpclientsrc.c gsttcpclientsink.c \
gstmultisocketsink.c \
gsttcpserversrc.c gsttcpserversink.c gsttcpserversrc.c gsttcpserversink.c
nodist_libgsttcp_la_SOURCES = \ nodist_libgsttcp_la_SOURCES = \
@ -29,10 +29,9 @@ libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_LIBS) $(GIO_LIBS)
libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static
noinst_HEADERS = \ noinst_HEADERS = \
gsttcpplugin.h \
gsttcp.h \ gsttcp.h \
gstmultifdsink.h \
gsttcpclientsrc.h gsttcpclientsink.h \ gsttcpclientsrc.h gsttcpclientsink.h \
gstmultisocketsink.h \
gsttcpserversrc.h gsttcpserversink.h gsttcpserversrc.h gsttcpserversink.h
CLEANFILES = $(BUILT_SOURCES) CLEANFILES = $(BUILT_SOURCES)

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,8 @@
/* GStreamer /* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
* Copyright (C) <2011> Collabora Ltd.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
@ -19,38 +21,37 @@
*/ */
#ifndef __GST_MULTI_FD_SINK_H__ #ifndef __GST_MULTI_SOCKET_SINK_H__
#define __GST_MULTI_FD_SINK_H__ #define __GST_MULTI_SOCKET_SINK_H__
#include <gst/gst.h> #include <gst/gst.h>
#include <gst/base/gstbasesink.h> #include <gst/base/gstbasesink.h>
#include <gio/gio.h>
G_BEGIN_DECLS G_BEGIN_DECLS
#include "gsttcp.h" #define GST_TYPE_MULTI_SOCKET_SINK \
(gst_multi_socket_sink_get_type())
#define GST_TYPE_MULTI_FD_SINK \ #define GST_MULTI_SOCKET_SINK(obj) \
(gst_multi_fd_sink_get_type()) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_SOCKET_SINK,GstMultiSocketSink))
#define GST_MULTI_FD_SINK(obj) \ #define GST_MULTI_SOCKET_SINK_CLASS(klass) \
(G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_FD_SINK,GstMultiFdSink)) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_SOCKET_SINK,GstMultiSocketSinkClass))
#define GST_MULTI_FD_SINK_CLASS(klass) \ #define GST_IS_MULTI_SOCKET_SINK(obj) \
(G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_FD_SINK,GstMultiFdSinkClass)) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_SOCKET_SINK))
#define GST_IS_MULTI_FD_SINK(obj) \ #define GST_IS_MULTI_SOCKET_SINK_CLASS(klass) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_FD_SINK)) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_SOCKET_SINK))
#define GST_IS_MULTI_FD_SINK_CLASS(klass) \ #define GST_MULTI_SOCKET_SINK_GET_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_FD_SINK)) (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_MULTI_SOCKET_SINK, GstMultiSocketSinkClass))
#define GST_MULTI_FD_SINK_GET_CLASS(klass) \
(G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_MULTI_FD_SINK, GstMultiFdSinkClass))
typedef struct _GstMultiFdSink GstMultiFdSink; typedef struct _GstMultiSocketSink GstMultiSocketSink;
typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; typedef struct _GstMultiSocketSinkClass GstMultiSocketSinkClass;
typedef enum { typedef enum {
GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), GST_MULTI_SOCKET_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0),
GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) GST_MULTI_SOCKET_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2)
} GstMultiFdSinkFlags; } GstMultiSocketSinkFlags;
/** /**
* GstRecoverPolicy: * GstRecoverPolicy:
@ -95,23 +96,6 @@ typedef enum
GST_SYNC_METHOD_BURST_WITH_KEYFRAME GST_SYNC_METHOD_BURST_WITH_KEYFRAME
} GstSyncMethod; } GstSyncMethod;
/**
* GstTCPUnitType:
* @GST_TCP_UNIT_TYPE_UNDEFINED: undefined
* @GST_TCP_UNIT_TYPE_BUFFERS : buffers
* @GST_TCP_UNIT_TYPE_TIME : timeunits (in nanoseconds)
* @GST_TCP_UNIT_TYPE_BYTES : bytes
*
* The units used to specify limits.
*/
typedef enum
{
GST_TCP_UNIT_TYPE_UNDEFINED,
GST_TCP_UNIT_TYPE_BUFFERS,
GST_TCP_UNIT_TYPE_TIME,
GST_TCP_UNIT_TYPE_BYTES
} GstTCPUnitType;
/** /**
* GstClientStatus: * GstClientStatus:
* @GST_CLIENT_STATUS_OK : client is ok * @GST_CLIENT_STATUS_OK : client is ok
@ -123,7 +107,7 @@ typedef enum
* @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers. * @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers.
* *
* This specifies the reason why a client was removed from * This specifies the reason why a client was removed from
* multifdsink and is received in the "client-removed" signal. * multisocketsink and is received in the "client-removed" signal.
*/ */
typedef enum typedef enum
{ {
@ -139,30 +123,29 @@ typedef enum
/* structure for a client /* structure for a client
*/ */
typedef struct { typedef struct {
GstPollFD fd; GSocket *socket;
GSource *source;
gint bufpos; /* position of this client in the global queue */ gint bufpos; /* position of this client in the global queue */
gint flushcount; /* the remaining number of buffers to flush out or -1 if the gint flushcount; /* the remaining number of buffers to flush out or -1 if the
client is not flushing. */ client is not flushing. */
GstClientStatus status; GstClientStatus status;
gboolean is_socket;
GSList *sending; /* the buffers we need to send */ GSList *sending; /* the buffers we need to send */
gint bufoffset; /* offset in the first buffer */ gint bufoffset; /* offset in the first buffer */
gboolean discont; gboolean discont;
gboolean caps_sent;
gboolean new_connection; gboolean new_connection;
gboolean currently_removing; gboolean currently_removing;
/* method to sync client when connecting */ /* method to sync client when connecting */
GstSyncMethod sync_method; GstSyncMethod sync_method;
GstTCPUnitType burst_min_unit; GstFormat burst_min_format;
guint64 burst_min_value; guint64 burst_min_value;
GstTCPUnitType burst_max_unit; GstFormat burst_max_format;
guint64 burst_max_value; guint64 burst_max_value;
GstCaps *caps; /* caps of last queued buffer */ GstCaps *caps; /* caps of last queued buffer */
@ -176,19 +159,19 @@ typedef struct {
guint64 avg_queue_size; guint64 avg_queue_size;
guint64 first_buffer_ts; guint64 first_buffer_ts;
guint64 last_buffer_ts; guint64 last_buffer_ts;
} GstTCPClient; } GstSocketClient;
#define CLIENTS_LOCK_INIT(fdsink) (g_static_rec_mutex_init(&fdsink->clientslock)) #define CLIENTS_LOCK_INIT(socketsink) (g_static_rec_mutex_init(&socketsink->clientslock))
#define CLIENTS_LOCK_FREE(fdsink) (g_static_rec_mutex_free(&fdsink->clientslock)) #define CLIENTS_LOCK_FREE(socketsink) (g_static_rec_mutex_free(&socketsink->clientslock))
#define CLIENTS_LOCK(fdsink) (g_static_rec_mutex_lock(&fdsink->clientslock)) #define CLIENTS_LOCK(socketsink) (g_static_rec_mutex_lock(&socketsink->clientslock))
#define CLIENTS_UNLOCK(fdsink) (g_static_rec_mutex_unlock(&fdsink->clientslock)) #define CLIENTS_UNLOCK(socketsink) (g_static_rec_mutex_unlock(&socketsink->clientslock))
/** /**
* GstMultiFdSink: * GstMultiSocketSink:
* *
* The multifdsink object structure. * The multisocketsink object structure.
*/ */
struct _GstMultiFdSink { struct _GstMultiSocketSink {
GstBaseSink element; GstBaseSink element;
/*< private >*/ /*< private >*/
@ -197,17 +180,16 @@ struct _GstMultiFdSink {
GStaticRecMutex clientslock; /* lock to protect the clients list */ GStaticRecMutex clientslock; /* lock to protect the clients list */
GList *clients; /* list of clients we are serving */ GList *clients; /* list of clients we are serving */
GHashTable *fd_hash; /* index on fd to client */ GHashTable *socket_hash; /* index on socket to client */
guint clients_cookie; /* Cookie to detect changes to the clients list */ guint clients_cookie; /* Cookie to detect changes to the clients list */
gint mode; GMainContext *main_context;
GstPoll *fdset; GCancellable *cancellable;
GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ GSList *streamheader; /* GSList of GstBuffers to use as streamheader */
gboolean previous_buffer_in_caps; gboolean previous_buffer_in_caps;
guint mtu; guint mtu;
gint qos_dscp;
gboolean handle_read; gboolean handle_read;
GArray *bufqueue; /* global queue of buffers */ GArray *bufqueue; /* global queue of buffers */
@ -217,14 +199,14 @@ struct _GstMultiFdSink {
/* these values are used to check if a client is reading fast /* these values are used to check if a client is reading fast
* enough and to control receovery */ * enough and to control receovery */
GstTCPUnitType unit_type;/* the type of the units */ GstFormat unit_type;/* the format of the units */
gint64 units_max; /* max units to queue for a client */ gint64 units_max; /* max units to queue for a client */
gint64 units_soft_max; /* max units a client can lag before recovery starts */ gint64 units_soft_max; /* max units a client can lag before recovery starts */
GstRecoverPolicy recover_policy; GstRecoverPolicy recover_policy;
GstClockTime timeout; /* max amount of nanoseconds to remain idle */ GstClockTime timeout; /* max amount of nanoseconds to remain idle */
GstSyncMethod def_sync_method; /* what method to use for connecting clients */ GstSyncMethod def_sync_method; /* what method to use for connecting clients */
GstTCPUnitType def_burst_unit; GstFormat def_burst_format;
guint64 def_burst_value; guint64 def_burst_value;
/* these values are used to control the amount of data /* these values are used to control the amount of data
@ -244,42 +226,41 @@ struct _GstMultiFdSink {
guint8 header_flags; guint8 header_flags;
}; };
struct _GstMultiFdSinkClass { struct _GstMultiSocketSinkClass {
GstBaseSinkClass parent_class; GstBaseSinkClass parent_class;
/* element methods */ /* element methods */
void (*add) (GstMultiFdSink *sink, int fd); void (*add) (GstMultiSocketSink *sink, GSocket *socket);
void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync, void (*add_full) (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
GstTCPUnitType format, guint64 value, GstFormat format, guint64 value,
GstTCPUnitType max_unit, guint64 max_value); GstFormat max_format, guint64 max_value);
void (*remove) (GstMultiFdSink *sink, int fd); void (*remove) (GstMultiSocketSink *sink, GSocket *socket);
void (*remove_flush) (GstMultiFdSink *sink, int fd); void (*remove_flush) (GstMultiSocketSink *sink, GSocket *socket);
void (*clear) (GstMultiFdSink *sink); void (*clear) (GstMultiSocketSink *sink);
GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); GstStructure* (*get_stats) (GstMultiSocketSink *sink, GSocket *socket);
/* vtable */ /* vtable */
gboolean (*init) (GstMultiFdSink *sink); gboolean (*init) (GstMultiSocketSink *sink);
gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set); gboolean (*close) (GstMultiSocketSink *sink);
gboolean (*close) (GstMultiFdSink *sink); void (*removed) (GstMultiSocketSink *sink, GSocket *socket);
void (*removed) (GstMultiFdSink *sink, int fd);
/* signals */ /* signals */
void (*client_added) (GstElement *element, gint fd); void (*client_added) (GstElement *element, GSocket *socket);
void (*client_removed) (GstElement *element, gint fd, GstClientStatus status); void (*client_removed) (GstElement *element, GSocket *socket, GstClientStatus status);
void (*client_fd_removed) (GstElement *element, gint fd); void (*client_socket_removed) (GstElement *element, GSocket *socket);
}; };
GType gst_multi_fd_sink_get_type (void); GType gst_multi_socket_sink_get_type (void);
void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); void gst_multi_socket_sink_add (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, void gst_multi_socket_sink_add_full (GstMultiSocketSink *sink, GSocket *socket, GstSyncMethod sync,
GstTCPUnitType min_unit, guint64 min_value, GstFormat min_format, guint64 min_value,
GstTCPUnitType max_unit, guint64 max_value); GstFormat max_format, guint64 max_value);
void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); void gst_multi_socket_sink_remove (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); void gst_multi_socket_sink_remove_flush (GstMultiSocketSink *sink, GSocket *socket);
void gst_multi_fd_sink_clear (GstMultiFdSink *sink); void gst_multi_socket_sink_clear (GstMultiSocketSink *sink);
GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); GstStructure* gst_multi_socket_sink_get_stats (GstMultiSocketSink *sink, GSocket *socket);
G_END_DECLS G_END_DECLS
#endif /* __GST_MULTI_FD_SINK_H__ */ #endif /* __GST_MULTI_SOCKET_SINK_H__ */

View File

@ -1,5 +1,4 @@
VOID:STRING,UINT VOID:STRING,UINT
VOID:INT VOID:OBJECT,ENUM
VOID:INT,BOXED VOID:OBJECT,ENUM,ENUM,UINT64,ENUM,UINT64
VOID:INT,ENUM,INT,UINT64,INT,UINT64 BOXED:OBJECT
BOXED:INT

View File

@ -25,7 +25,6 @@
#include "gsttcp-enumtypes.h" #include "gsttcp-enumtypes.h"
#include <gst/gst.h> #include <gst/gst.h>
#undef GST_DISABLE_DEPRECATED
#define TCP_HIGHEST_PORT 65535 #define TCP_HIGHEST_PORT 65535
#define TCP_DEFAULT_HOST "localhost" #define TCP_DEFAULT_HOST "localhost"

View File

@ -25,7 +25,7 @@
#include "gsttcpclientsink.h" #include "gsttcpclientsink.h"
#include "gsttcpserversrc.h" #include "gsttcpserversrc.h"
#include "gsttcpserversink.h" #include "gsttcpserversink.h"
#include "gstmultifdsink.h" #include "gstmultisocketsink.h"
GST_DEBUG_CATEGORY (tcp_debug); GST_DEBUG_CATEGORY (tcp_debug);
@ -44,8 +44,8 @@ plugin_init (GstPlugin * plugin)
if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE, if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE,
GST_TYPE_TCP_SERVER_SRC)) GST_TYPE_TCP_SERVER_SRC))
return FALSE; return FALSE;
if (!gst_element_register (plugin, "multifdsink", GST_RANK_NONE, if (!gst_element_register (plugin, "multisocketsink", GST_RANK_NONE,
GST_TYPE_MULTI_FD_SINK)) GST_TYPE_MULTI_SOCKET_SINK))
return FALSE; return FALSE;
GST_DEBUG_CATEGORY_INIT (tcp_debug, "tcp", 0, "TCP calls"); GST_DEBUG_CATEGORY_INIT (tcp_debug, "tcp", 0, "TCP calls");

View File

@ -1,40 +0,0 @@
/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __GST_TCP_H__
#define __GST_TCP_H__
#ifdef __cplusplus
extern "C"
{
#endif /* __cplusplus */
typedef enum
{
CONTROL_ZERO,
CONTROL_NONE,
CONTROL_TCP
} Gst_TCP_Control;
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GST_TCP_H__ */

View File

@ -41,10 +41,6 @@
#include <sys/ioctl.h> #include <sys/ioctl.h>
#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif
#include "gsttcp.h" #include "gsttcp.h"
#include "gsttcpserversink.h" #include "gsttcpserversink.h"
#include "gsttcp-marshal.h" #include "gsttcp-marshal.h"
@ -56,18 +52,17 @@ GST_DEBUG_CATEGORY_STATIC (tcpserversink_debug);
enum enum
{ {
ARG_0, PROP_0,
ARG_HOST, PROP_HOST,
ARG_PORT, PROP_PORT,
}; };
static void gst_tcp_server_sink_finalize (GObject * gobject); static void gst_tcp_server_sink_finalize (GObject * gobject);
static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, static gboolean gst_tcp_server_sink_init_send (GstMultiSocketSink * this);
GstPoll * set); static gboolean gst_tcp_server_sink_close (GstMultiSocketSink * this);
static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this); static void gst_tcp_server_sink_removed (GstMultiSocketSink * sink,
static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this); GSocket * socket);
static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd);
static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id, static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec); const GValue * value, GParamSpec * pspec);
@ -75,27 +70,28 @@ static void gst_tcp_server_sink_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec); GValue * value, GParamSpec * pspec);
#define gst_tcp_server_sink_parent_class parent_class #define gst_tcp_server_sink_parent_class parent_class
G_DEFINE_TYPE (GstTCPServerSink, gst_tcp_server_sink, GST_TYPE_MULTI_FD_SINK); G_DEFINE_TYPE (GstTCPServerSink, gst_tcp_server_sink,
GST_TYPE_MULTI_SOCKET_SINK);
static void static void
gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass) gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass)
{ {
GObjectClass *gobject_class; GObjectClass *gobject_class;
GstElementClass *gstelement_class; GstElementClass *gstelement_class;
GstMultiFdSinkClass *gstmultifdsink_class; GstMultiSocketSinkClass *gstmultifdsink_class;
gobject_class = (GObjectClass *) klass; gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass; gstelement_class = (GstElementClass *) klass;
gstmultifdsink_class = (GstMultiFdSinkClass *) klass; gstmultifdsink_class = (GstMultiSocketSinkClass *) klass;
gobject_class->set_property = gst_tcp_server_sink_set_property; gobject_class->set_property = gst_tcp_server_sink_set_property;
gobject_class->get_property = gst_tcp_server_sink_get_property; gobject_class->get_property = gst_tcp_server_sink_get_property;
gobject_class->finalize = gst_tcp_server_sink_finalize; gobject_class->finalize = gst_tcp_server_sink_finalize;
g_object_class_install_property (gobject_class, ARG_HOST, g_object_class_install_property (gobject_class, PROP_HOST,
g_param_spec_string ("host", "host", "The host/IP to send the packets to", g_param_spec_string ("host", "host", "The host/IP to send the packets to",
TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, ARG_PORT, g_object_class_install_property (gobject_class, PROP_PORT,
g_param_spec_int ("port", "port", "The port to send the packets to", g_param_spec_int ("port", "port", "The port to send the packets to",
0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
@ -106,7 +102,6 @@ gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass)
"Thomas Vander Stichele <thomas at apestaart dot org>"); "Thomas Vander Stichele <thomas at apestaart dot org>");
gstmultifdsink_class->init = gst_tcp_server_sink_init_send; gstmultifdsink_class->init = gst_tcp_server_sink_init_send;
gstmultifdsink_class->wait = gst_tcp_server_sink_handle_wait;
gstmultifdsink_class->close = gst_tcp_server_sink_close; gstmultifdsink_class->close = gst_tcp_server_sink_close;
gstmultifdsink_class->removed = gst_tcp_server_sink_removed; gstmultifdsink_class->removed = gst_tcp_server_sink_removed;
@ -121,7 +116,7 @@ gst_tcp_server_sink_init (GstTCPServerSink * this)
/* this->mtu = 1500; */ /* this->mtu = 1500; */
this->host = g_strdup (TCP_DEFAULT_HOST); this->host = g_strdup (TCP_DEFAULT_HOST);
this->server_sock.fd = -1; this->server_socket = NULL;
} }
static void static void
@ -129,7 +124,11 @@ gst_tcp_server_sink_finalize (GObject * gobject)
{ {
GstTCPServerSink *this = GST_TCP_SERVER_SINK (gobject); GstTCPServerSink *this = GST_TCP_SERVER_SINK (gobject);
if (this->server_socket)
g_object_unref (this->server_socket);
this->server_socket = NULL;
g_free (this->host); g_free (this->host);
this->host = NULL;
G_OBJECT_CLASS (parent_class)->finalize (gobject); G_OBJECT_CLASS (parent_class)->finalize (gobject);
} }
@ -139,26 +138,31 @@ gst_tcp_server_sink_finalize (GObject * gobject)
static gboolean static gboolean
gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink) gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink)
{ {
/* new client */ GSocket *client_socket;
int client_sock_fd; GError *err = NULL;
struct sockaddr_in client_address;
socklen_t client_address_len;
/* For some stupid reason, client_address and client_address_len has to be /* wait on server socket for connections */
* zeroed */ client_socket =
memset (&client_address, 0, sizeof (client_address)); g_socket_accept (sink->server_socket, sink->element.cancellable, &err);
client_address_len = 0; if (!client_socket)
client_sock_fd =
accept (sink->server_sock.fd, (struct sockaddr *) &client_address,
&client_address_len);
if (client_sock_fd == -1)
goto accept_failed; goto accept_failed;
gst_multi_fd_sink_add (GST_MULTI_FD_SINK (sink), client_sock_fd); gst_multi_socket_sink_add (GST_MULTI_SOCKET_SINK (sink), client_socket);
GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", #ifndef GST_DISABLE_GST_DEBUG
inet_ntoa (client_address.sin_addr), client_sock_fd); {
GInetSocketAddress *addr =
G_INET_SOCKET_ADDRESS (g_socket_get_remote_address (client_socket,
NULL));
gchar *ip =
g_inet_address_to_string (g_inet_socket_address_get_address (addr));
GST_DEBUG_OBJECT (sink, "added new client ip %s:%u with socket %p",
ip, g_inet_socket_address_get_port (addr), client_socket);
g_free (ip);
}
#endif
return TRUE; return TRUE;
@ -166,45 +170,47 @@ gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink)
accept_failed: accept_failed:
{ {
GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL), GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
("Could not accept client on server socket %d: %s (%d)", ("Could not accept client on server socket %p: %s",
sink->server_sock.fd, g_strerror (errno), errno)); sink->server_socket, err->message));
g_clear_error (&err);
return FALSE; return FALSE;
} }
} }
static void static void
gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd) gst_tcp_server_sink_removed (GstMultiSocketSink * sink, GSocket * socket)
{ {
#ifndef GST_DISABLE_GST_DEBUG #ifndef GST_DISABLE_GST_DEBUG
GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink);
#endif #endif
GError *err = NULL;
GST_LOG_OBJECT (this, "closing fd %d", fd); GST_DEBUG_OBJECT (this, "closing socket");
if (close (fd) < 0) {
GST_WARNING_OBJECT (this, "error closing fd %d: %s", fd, if (!g_socket_close (socket, &err)) {
g_strerror (errno)); GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
g_clear_error (&err);
} }
} }
static gboolean static gboolean
gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set) gst_tcp_server_sink_socket_condition (GSocket * socket, GIOCondition condition,
GstTCPServerSink * sink)
{ {
GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); if ((condition & G_IO_ERR)) {
goto error;
if (gst_poll_fd_can_read (set, &this->server_sock)) { } else if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
/* handle new client connection on server socket */ if (!gst_tcp_server_sink_handle_server_read (sink))
if (!gst_tcp_server_sink_handle_server_read (this)) return FALSE;
goto connection_failed;
} }
return TRUE; return TRUE;
/* ERRORS */ error:
connection_failed: GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
{ ("client connection failed"));
GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL),
("client connection failed: %s", g_strerror (errno))); return FALSE;
return FALSE;
}
} }
static void static void
@ -217,7 +223,7 @@ gst_tcp_server_sink_set_property (GObject * object, guint prop_id,
sink = GST_TCP_SERVER_SINK (object); sink = GST_TCP_SERVER_SINK (object);
switch (prop_id) { switch (prop_id) {
case ARG_HOST: case PROP_HOST:
if (!g_value_get_string (value)) { if (!g_value_get_string (value)) {
g_warning ("host property cannot be NULL"); g_warning ("host property cannot be NULL");
break; break;
@ -225,10 +231,9 @@ gst_tcp_server_sink_set_property (GObject * object, guint prop_id,
g_free (sink->host); g_free (sink->host);
sink->host = g_strdup (g_value_get_string (value)); sink->host = g_strdup (g_value_get_string (value));
break; break;
case ARG_PORT: case PROP_PORT:
sink->server_port = g_value_get_int (value); sink->server_port = g_value_get_int (value);
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -245,13 +250,12 @@ gst_tcp_server_sink_get_property (GObject * object, guint prop_id,
sink = GST_TCP_SERVER_SINK (object); sink = GST_TCP_SERVER_SINK (object);
switch (prop_id) { switch (prop_id) {
case ARG_HOST: case PROP_HOST:
g_value_set_string (value, sink->host); g_value_set_string (value, sink->host);
break; break;
case ARG_PORT: case PROP_PORT:
g_value_set_int (value, sink->server_port); g_value_set_int (value, sink->server_port);
break; break;
default: default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break; break;
@ -261,111 +265,152 @@ gst_tcp_server_sink_get_property (GObject * object, guint prop_id,
/* create a socket for sending to remote machine */ /* create a socket for sending to remote machine */
static gboolean static gboolean
gst_tcp_server_sink_init_send (GstMultiFdSink * parent) gst_tcp_server_sink_init_send (GstMultiSocketSink * parent)
{ {
int ret;
GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);
GError *err = NULL;
GInetAddress *addr;
GSocketAddress *saddr;
/* create sending server socket */ /* create the server listener socket */
if ((this->server_sock.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) this->server_socket =
g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
G_SOCKET_PROTOCOL_TCP, &err);
if (!this->server_socket)
goto no_socket; goto no_socket;
GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d", GST_DEBUG_OBJECT (this, "opened sending server socket with socket %p",
this->server_sock.fd); this->server_socket);
/* make address reusable */ /* look up name if we need to */
ret = 1; addr = g_inet_address_new_from_string (this->host);
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, if (!addr) {
(void *) &ret, sizeof (ret)) < 0) GResolver *resolver = g_resolver_get_default ();
goto reuse_failed; GList *results;
/* keep connection alive; avoids SIGPIPE during write */ results =
ret = 1; g_resolver_lookup_by_name (resolver, this->host,
if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, this->element.cancellable, &err);
(void *) &ret, sizeof (ret)) < 0) if (!results)
goto keepalive_failed; goto name_resolve;
addr = G_INET_ADDRESS (g_object_ref (results->data));
/* name the socket */ g_resolver_free_addresses (results);
memset (&this->server_sin, 0, sizeof (this->server_sin)); g_object_unref (resolver);
this->server_sin.sin_family = AF_INET; /* network socket */ }
this->server_sin.sin_port = htons (this->server_port); /* on port */ #ifndef GST_DISABLE_GST_DEBUG
this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); /* for hosts */ {
gchar *ip = g_inet_address_to_string (addr);
GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
g_free (ip);
}
#endif
g_socket_set_blocking (this->server_socket, FALSE);
/* bind it */ /* bind it */
saddr = g_inet_socket_address_new (addr, this->server_port);
GST_DEBUG_OBJECT (this, "binding server socket to address"); GST_DEBUG_OBJECT (this, "binding server socket to address");
ret = bind (this->server_sock.fd, (struct sockaddr *) &this->server_sin, if (!g_socket_bind (this->server_socket, saddr, TRUE, &err))
sizeof (this->server_sin));
if (ret)
goto bind_failed; goto bind_failed;
/* set the server socket to nonblocking */ GST_DEBUG_OBJECT (this, "listening on server socket");
fcntl (this->server_sock.fd, F_SETFL, O_NONBLOCK); g_socket_set_listen_backlog (this->server_socket, TCP_BACKLOG);
GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", if (!g_socket_listen (this->server_socket, &err))
this->server_sock.fd, TCP_BACKLOG);
if (listen (this->server_sock.fd, TCP_BACKLOG) == -1)
goto listen_failed; goto listen_failed;
GST_DEBUG_OBJECT (this, GST_DEBUG_OBJECT (this,
"listened on server socket %d, returning from connection setup", "listened on server socket %p, returning from connection setup",
this->server_sock.fd); this->server_socket);
gst_poll_add_fd (parent->fdset, &this->server_sock); this->server_source =
gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); g_socket_create_source (this->server_socket,
G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP,
this->element.cancellable);
g_source_set_callback (this->server_source,
(GSourceFunc) gst_tcp_server_sink_socket_condition, gst_object_ref (this),
(GDestroyNotify) gst_object_unref);
g_source_attach (this->server_source, this->element.main_context);
return TRUE; return TRUE;
/* ERRORS */ /* ERRORS */
no_socket: no_socket:
{ {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM);
return FALSE;
}
reuse_failed:
{
gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
keepalive_failed:
{
gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL),
("Could not setsockopt: %s", g_strerror (errno)));
return FALSE;
}
listen_failed:
{
gst_tcp_socket_close (&this->server_sock);
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Could not listen on server socket: %s", g_strerror (errno))); ("Failed to create socket: %s", err->message));
g_clear_error (&err);
return FALSE;
}
name_resolve:
{
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (this, "Cancelled name resolval");
} else {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Failed to resolve host '%s': %s", this->host, err->message));
}
g_clear_error (&err);
gst_tcp_server_sink_close (&this->element);
return FALSE; return FALSE;
} }
bind_failed: bind_failed:
{ {
gst_tcp_socket_close (&this->server_sock); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
switch (errno) { GST_DEBUG_OBJECT (this, "Cancelled binding");
default: } else {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("bind on port %d failed: %s", this->server_port, ("Failed to bind on host '%s:%d': %s", this->host, this->server_port,
g_strerror (errno))); err->message));
break;
} }
g_clear_error (&err);
g_object_unref (saddr);
g_object_unref (addr);
gst_tcp_server_sink_close (&this->element);
return FALSE;
}
listen_failed:
{
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_DEBUG_OBJECT (this, "Cancelled listening");
} else {
GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
("Failed to listen on host '%s:%d': %s", this->host,
this->server_port, err->message));
}
g_clear_error (&err);
g_object_unref (saddr);
g_object_unref (addr);
gst_tcp_server_sink_close (&this->element);
return FALSE; return FALSE;
} }
} }
static gboolean static gboolean
gst_tcp_server_sink_close (GstMultiFdSink * parent) gst_tcp_server_sink_close (GstMultiSocketSink * parent)
{ {
GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent);
if (this->server_sock.fd != -1) { if (this->server_source) {
gst_poll_remove_fd (parent->fdset, &this->server_sock); g_source_destroy (this->server_source);
g_source_unref (this->server_source);
close (this->server_sock.fd); this->server_source = NULL;
this->server_sock.fd = -1;
} }
if (this->server_socket) {
GError *err = NULL;
GST_DEBUG_OBJECT (this, "closing socket");
if (!g_socket_close (this->server_socket, &err)) {
GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
g_clear_error (&err);
}
g_object_unref (this->server_socket);
this->server_socket = NULL;
}
return TRUE; return TRUE;
} }

View File

@ -24,23 +24,11 @@
#include <gst/gst.h> #include <gst/gst.h>
#include <gio/gio.h>
G_BEGIN_DECLS G_BEGIN_DECLS
#include <stdio.h> #include "gstmultisocketsink.h"
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include "gstmultifdsink.h"
#define GST_TYPE_TCP_SERVER_SINK \ #define GST_TYPE_TCP_SERVER_SINK \
(gst_tcp_server_sink_get_type()) (gst_tcp_server_sink_get_type())
@ -68,19 +56,17 @@ typedef enum {
* Opaque data structure. * Opaque data structure.
*/ */
struct _GstTCPServerSink { struct _GstTCPServerSink {
GstMultiFdSink element; GstMultiSocketSink element;
/* server information */ /* server information */
int server_port; gint server_port;
gchar *host; gchar *host;
struct sockaddr_in server_sin; GSocket *server_socket;
GSource *server_source;
/* socket */
GstPollFD server_sock;
}; };
struct _GstTCPServerSinkClass { struct _GstTCPServerSinkClass {
GstMultiFdSinkClass parent_class; GstMultiSocketSinkClass parent_class;
}; };
GType gst_tcp_server_sink_get_type (void); GType gst_tcp_server_sink_get_type (void);