diff --git a/gst/tcp/gstsocketsrc.c b/gst/tcp/gstsocketsrc.c index cb70254d55..7263e40f10 100644 --- a/gst/tcp/gstsocketsrc.c +++ b/gst/tcp/gstsocketsrc.c @@ -76,8 +76,8 @@ G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC); static void gst_socket_src_finalize (GObject * gobject); -static GstFlowReturn gst_socket_src_create (GstPushSrc * psrc, - GstBuffer ** outbuf); +static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc, + GstBuffer * outbuf); static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc); static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc); @@ -120,7 +120,7 @@ gst_socket_src_class_init (GstSocketSrcClass * klass) gstbasesrc_class->unlock = gst_socket_src_unlock; gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop; - gstpush_src_class->create = gst_socket_src_create; + gstpush_src_class->fill = gst_socket_src_fill; GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source"); } @@ -148,14 +148,13 @@ gst_socket_src_finalize (GObject * gobject) } static GstFlowReturn -gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) +gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf) { GstSocketSrc *src; GstFlowReturn ret = GST_FLOW_OK; gssize rret; GError *err = NULL; GstMapInfo map; - gssize avail, read; GSocket *socket; src = GST_SOCKET_SRC (psrc); @@ -173,60 +172,14 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_LOG_OBJECT (src, "asked for a buffer"); - /* read the buffer header */ - avail = g_socket_get_available_bytes (socket); - if (avail < 0) { - goto get_available_error; - } else if (avail == 0) { - GIOCondition condition; - - if (!g_socket_condition_wait (socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err)) - goto select_error; - - condition = - g_socket_condition_check (socket, - G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); - - if ((condition & G_IO_ERR)) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Socket in error state")); - *outbuf = NULL; - ret = GST_FLOW_ERROR; - goto done; - } else if ((condition & G_IO_HUP)) { - GST_DEBUG_OBJECT (src, "Connection closed"); - *outbuf = NULL; - ret = GST_FLOW_EOS; - goto done; - } - avail = g_socket_get_available_bytes (socket); - if (avail < 0) - goto get_available_error; - } - - if (avail > 0) { - read = MIN (avail, MAX_READ_SIZE); - *outbuf = gst_buffer_new_and_alloc (read); - gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE); - rret = - g_socket_receive (socket, (gchar *) map.data, read, - src->cancellable, &err); - } else { - /* Connection closed */ - *outbuf = NULL; - read = 0; - rret = 0; - } + gst_buffer_map (outbuf, &map, GST_MAP_READWRITE); + rret = g_socket_receive_with_blocking (socket, (gchar *) map.data, + map.size, TRUE, src->cancellable, &err); + gst_buffer_unmap (outbuf, &map); if (rret == 0) { GST_DEBUG_OBJECT (src, "Connection closed"); ret = GST_FLOW_EOS; - if (*outbuf) { - gst_buffer_unmap (*outbuf, &map); - gst_buffer_unref (*outbuf); - } - *outbuf = NULL; } else if (rret < 0) { if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { ret = GST_FLOW_FLUSHING; @@ -236,50 +189,24 @@ gst_socket_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Failed to read from socket: %s", err->message)); } - gst_buffer_unmap (*outbuf, &map); - gst_buffer_unref (*outbuf); - *outbuf = NULL; } else { ret = GST_FLOW_OK; - gst_buffer_unmap (*outbuf, &map); - gst_buffer_resize (*outbuf, 0, rret); + gst_buffer_resize (outbuf, 0, rret); GST_LOG_OBJECT (src, "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, - gst_buffer_get_size (*outbuf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), - GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); + gst_buffer_get_size (outbuf), + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)), + GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)), + GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf)); } g_clear_error (&err); -done: g_object_unref (socket); return ret; -select_error: - { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - GST_DEBUG_OBJECT (src, "Cancelled"); - ret = GST_FLOW_FLUSHING; - } else { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Select failed: %s", err->message)); - ret = GST_FLOW_ERROR; - } - g_clear_error (&err); - g_object_unref (socket); - return ret; - } -get_available_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Failed to get available bytes from socket")); - g_object_unref (socket); - return GST_FLOW_ERROR; - } no_socket: { GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),