diff --git a/gst-libs/gst/rtsp/gstrtspconnection.c b/gst-libs/gst/rtsp/gstrtspconnection.c index b2a30d87e8..18f83890a5 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.c +++ b/gst-libs/gst/rtsp/gstrtspconnection.c @@ -1834,7 +1834,7 @@ normalize_line (guint8 * buffer) /* returns: * GST_RTSP_OK when a complete message was read. - * GST_RTSP_EEOF: when the socket is closed + * GST_RTSP_EEOF: when the read socket is closed * GST_RTSP_EINTR: when more data is needed. * GST_RTSP_..: some other error occured. */ @@ -3006,8 +3006,10 @@ gst_rtsp_connection_do_tunnel (GstRTSPConnection * conn, return GST_RTSP_OK; } -#define READ_COND (G_IO_IN | G_IO_HUP | G_IO_ERR) -#define WRITE_COND (G_IO_OUT | G_IO_ERR) +#define READ_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define READ_COND (G_IO_IN | READ_ERR) +#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL) +#define WRITE_COND (G_IO_OUT | WRITE_ERR) typedef struct { @@ -3028,7 +3030,6 @@ struct _GstRTSPWatch GPollFD readfd; GPollFD writefd; - gboolean write_added; /* queued message for transmission */ guint id; @@ -3077,17 +3078,35 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, gpointer user_data G_GNUC_UNUSED) { GstRTSPWatch *watch = (GstRTSPWatch *) source; - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_ERROR; + gboolean keep_running = TRUE; /* first read as much as we can */ if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) { do { + if (watch->readfd.revents & READ_ERR) + goto read_error; + res = build_next (&watch->builder, &watch->message, watch->conn); if (res == GST_RTSP_EINTR) break; - else if (G_UNLIKELY (res == GST_RTSP_EEOF)) - goto eof; - else if (G_LIKELY (res == GST_RTSP_OK)) { + else if (G_UNLIKELY (res == GST_RTSP_EEOF)) { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + /* When we are in tunnelled mode, the read socket can be closed and we + * should be prepared for a new POST method to reopen it */ + if (watch->conn->tstate == TUNNEL_STATE_COMPLETE) { + /* remove the read connection for the tunnel */ + /* we accept a new POST request */ + watch->conn->tstate = TUNNEL_STATE_GET; + /* and signal that we lost our tunnel */ + if (watch->funcs.tunnel_lost) + res = watch->funcs.tunnel_lost (watch, watch->user_data); + goto read_done; + } else + goto eof; + } else if (G_LIKELY (res == GST_RTSP_OK)) { if (!watch->conn->manual_http && watch->message.type == GST_RTSP_MESSAGE_HTTP_REQUEST) { if (watch->conn->tstate == TUNNEL_STATE_NONE && @@ -3144,11 +3163,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, watch->funcs.message_received (watch, &watch->message, watch->user_data); } else { - if (watch->funcs.error_full) - GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, - 0, watch->user_data), error); - else - goto error; + goto read_error; } read_done: @@ -3158,6 +3173,9 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, } if (watch->writefd.revents & WRITE_COND) { + if (watch->writefd.revents & WRITE_ERR) + goto write_error; + g_mutex_lock (watch->mutex); do { if (watch->write_data == NULL) { @@ -3166,7 +3184,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, /* get a new message from the queue */ rec = g_queue_pop_tail (watch->messages); if (rec == NULL) - goto done; + break; watch->write_off = 0; watch->write_data = rec->data; @@ -3179,17 +3197,14 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, res = write_bytes (watch->writefd.fd, watch->write_data, &watch->write_off, watch->write_size); g_mutex_unlock (watch->mutex); + if (res == GST_RTSP_EINTR) goto write_blocked; else if (G_LIKELY (res == GST_RTSP_OK)) { if (watch->funcs.message_sent) watch->funcs.message_sent (watch, watch->write_id, watch->user_data); } else { - if (watch->funcs.error_full) - GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, - watch->write_id, watch->user_data), error); - else - goto error; + goto write_error; } g_mutex_lock (watch->mutex); @@ -3197,31 +3212,61 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED, watch->write_data = NULL; } while (TRUE); - done: - if (watch->write_added) { - g_source_remove_poll ((GSource *) watch, &watch->writefd); - watch->write_added = FALSE; - watch->writefd.revents = 0; - } + watch->writefd.events = WRITE_ERR; g_mutex_unlock (watch->mutex); } write_blocked: - return TRUE; + return keep_running; /* ERRORS */ eof: { if (watch->funcs.closed) watch->funcs.closed (watch, watch->user_data); + + /* always stop when the readfd returns EOF in non-tunneled mode */ return FALSE; } +read_error: + { + watch->readfd.events = 0; + watch->readfd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->readfd); + keep_running = (watch->writefd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, &watch->message, + 0, watch->user_data), error); + else + goto error; + } else + goto eof; + } +write_error: + { + watch->writefd.events = 0; + watch->writefd.revents = 0; + g_source_remove_poll ((GSource *) watch, &watch->writefd); + keep_running = (watch->readfd.events != 0); + + if (keep_running) { + if (watch->funcs.error_full) + GST_RTSP_CHECK (watch->funcs.error_full (watch, res, NULL, + watch->write_id, watch->user_data), error); + else + goto error; + } else + goto eof; + } error: { if (watch->funcs.error) watch->funcs.error (watch, res, watch->user_data); - return FALSE; + + return keep_running; } } @@ -3245,11 +3290,10 @@ gst_rtsp_source_finalize (GSource * source) g_queue_foreach (watch->messages, (GFunc) gst_rtsp_rec_free, NULL); g_queue_free (watch->messages); watch->messages = NULL; + g_free (watch->write_data); g_mutex_free (watch->mutex); - g_free (watch->write_data); - if (watch->notify) watch->notify (watch->user_data); } @@ -3312,10 +3356,6 @@ gst_rtsp_watch_new (GstRTSPConnection * conn, result->user_data = user_data; result->notify = notify; - /* only add the read fd, the write fd is only added when we have data - * to send. */ - g_source_add_poll ((GSource *) result, &result->readfd); - return result; } @@ -3341,11 +3381,13 @@ gst_rtsp_watch_reset (GstRTSPWatch * watch) watch->readfd.revents = 0; watch->writefd.fd = watch->conn->writefd->fd; - watch->writefd.events = WRITE_COND; + watch->writefd.events = WRITE_ERR; watch->writefd.revents = 0; - watch->write_added = FALSE; - g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->readfd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->readfd); + if (watch->writefd.fd != -1) + g_source_add_poll ((GSource *) watch, &watch->writefd); } /** @@ -3411,6 +3453,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, GstRTSPResult res; GstRTSPRec *rec; guint off = 0; + GMainContext *context = NULL; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); @@ -3418,6 +3461,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, g_mutex_lock (watch->mutex); + /* try to send the message synchronously first */ if (watch->messages->length == 0) { res = write_bytes (watch->writefd.fd, data, &off, size); if (res != GST_RTSP_EINTR) { @@ -3428,7 +3472,7 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, } } - /* make a record with the data and id */ + /* make a record with the data and id for sending async */ rec = g_slice_new (GstRTSPRec); if (off == 0) { rec->data = (guint8 *) data; @@ -3449,9 +3493,9 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, /* make sure the main context will now also check for writability on the * socket */ - if (!watch->write_added) { - g_source_add_poll ((GSource *) watch, &watch->writefd); - watch->write_added = TRUE; + if (watch->writefd.events != WRITE_COND) { + watch->writefd.events = WRITE_COND; + context = ((GSource *) watch)->context; } if (id != NULL) @@ -3460,6 +3504,10 @@ gst_rtsp_watch_write_data (GstRTSPWatch * watch, const guint8 * data, done: g_mutex_unlock (watch->mutex); + + if (context) + g_main_context_wakeup (context); + return res; } @@ -3528,6 +3576,7 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, guint size) { GstRTSPRec *rec; + GMainContext *context = NULL; g_return_val_if_fail (watch != NULL, GST_RTSP_EINVAL); g_return_val_if_fail (data != NULL, GST_RTSP_EINVAL); @@ -3549,12 +3598,15 @@ gst_rtsp_watch_queue_data (GstRTSPWatch * watch, const guint8 * data, /* make sure the main context will now also check for writability on the * socket */ - if (!watch->write_added) { - g_source_add_poll ((GSource *) watch, &watch->writefd); - watch->write_added = TRUE; + if (watch->writefd.events != WRITE_COND) { + watch->writefd.events = WRITE_COND; + context = ((GSource *) watch)->context; } - g_mutex_unlock (watch->mutex); + + if (context) + g_main_context_wakeup (context); + return rec->id; } #endif /* GST_REMOVE_DEPRECATED */ diff --git a/gst-libs/gst/rtsp/gstrtspconnection.h b/gst-libs/gst/rtsp/gstrtspconnection.h index c811b127b9..2a34abd0f2 100644 --- a/gst-libs/gst/rtsp/gstrtspconnection.h +++ b/gst-libs/gst/rtsp/gstrtspconnection.h @@ -153,6 +153,8 @@ typedef struct _GstRTSPWatch GstRTSPWatch; * gst_rtsp_connection_do_tunnel(). * @error_full: callback when an error occured with more information than * the @error callback. Since 0.10.25 + * @tunnel_lost: callback when the post connection of a tunnel is closed. + * Since 0.10.29 * * Callback functions from a #GstRTSPWatch. * @@ -171,9 +173,10 @@ typedef struct { GstRTSPResult (*error_full) (GstRTSPWatch *watch, GstRTSPResult result, GstRTSPMessage *message, guint id, gpointer user_data); + GstRTSPResult (*tunnel_lost) (GstRTSPWatch *watch, gpointer user_data); /*< private >*/ - gpointer _gst_reserved[GST_PADDING - 1]; + gpointer _gst_reserved[GST_PADDING - 2]; } GstRTSPWatchFuncs; GstRTSPWatch * gst_rtsp_watch_new (GstRTSPConnection *conn,