From f7ddf811d76285229f6e0a4a5ce8cce83e1729a9 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 6 Apr 2011 15:49:01 +0200 Subject: [PATCH] rtspsrc: fix and improve async handling Simplify the command handling; passing a command to thread means we really want it to get the message, which means to always flush provided the command can handle being interrupted. Command thread indicates whether command allows interruption and ensure non-flushing connection as it subsequently needs it. In particular, this also makes the TEARDOWN sequence interruptable and also prevents races where _loop_ could miss a command and would continue receiving (or at least trying to). See #632504. --- gst/rtsp/gstrtspsrc.c | 67 +++++++++++++++---------------------------- gst/rtsp/gstrtspsrc.h | 1 - 2 files changed, 23 insertions(+), 45 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index f5bbcf898c..82743147fa 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -3540,23 +3540,11 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec", tv_timeout.tv_sec, tv_timeout.tv_usec); - GST_OBJECT_LOCK (src); - if (!src->flushing) { - src->waiting = TRUE; - GST_OBJECT_UNLOCK (src); - - /* protect the connection with the connection lock so that we can see when - * we are finished doing server communication */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, src->ptcp_timeout); - - GST_OBJECT_LOCK (src); - src->waiting = FALSE; - } else { - res = GST_RTSP_EINTR; - } - GST_OBJECT_UNLOCK (src); + /* protect the connection with the connection lock so that we can see when + * we are finished doing server communication */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, src->ptcp_timeout); switch (res) { case GST_RTSP_OK: @@ -3786,24 +3774,11 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) gst_rtsp_message_unset (&message); - GST_OBJECT_LOCK (src); - if (!src->flushing) { - src->waiting = TRUE; - GST_OBJECT_UNLOCK (src); - - /* we should continue reading the TCP socket because the server might - * send us requests. When the session timeout expires, we need to send a - * keep-alive request to keep the session open. */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, &tv_timeout); - - GST_OBJECT_LOCK (src); - src->waiting = FALSE; - } else { - res = GST_RTSP_EINTR; - } - GST_OBJECT_UNLOCK (src); + /* we should continue reading the TCP socket because the server might + * send us requests. When the session timeout expires, we need to send a + * keep-alive request to keep the session open. */ + res = gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, &tv_timeout); switch (res) { case GST_RTSP_OK: @@ -3942,7 +3917,6 @@ gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async) /* only restart when the pads were not yet activated, else we were * streaming over UDP */ restart = src->need_activate; - src->flushing = FALSE; GST_OBJECT_UNLOCK (src); /* no need to restart, we're done */ @@ -4102,6 +4076,8 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) { gint old; + /* FIXME flush param mute; remove at discretion */ + /* start new request */ gst_rtspsrc_loop_start_cmd (src, cmd); @@ -4115,14 +4091,10 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) GST_OBJECT_LOCK (src); } src->loop_cmd = cmd; - src->flushing = flush; - if (flush) { - if (src->waiting) { - GST_DEBUG_OBJECT (src, "start connection flush"); - gst_rtspsrc_connection_flush (src, TRUE); - } - } else { - GST_DEBUG_OBJECT (src, "stop connection flush"); + /* interrupt if allowed */ + if (src->waiting) { + GST_DEBUG_OBJECT (src, "start connection flush"); + gst_rtspsrc_connection_flush (src, TRUE); } if (src->task) gst_task_start (src->task); @@ -6426,6 +6398,13 @@ gst_rtspsrc_thread (GstRTSPSrc * src) cmd = src->loop_cmd; src->loop_cmd = CMD_WAIT; GST_DEBUG_OBJECT (src, "got command %d", cmd); + + /* we got the message command, so ensure communication is possible again */ + gst_rtspsrc_connection_flush (src, FALSE); + + /* we allow these to be interrupted */ + if (cmd == CMD_LOOP || cmd == CMD_CLOSE) + src->waiting = TRUE; GST_OBJECT_UNLOCK (src); switch (cmd) { diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index f69a3edd27..1d6f84950b 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -180,7 +180,6 @@ struct _GstRTSPSrc { /* UDP mode loop */ gint loop_cmd; gboolean ignore_timeout; - gboolean flushing; gboolean waiting; gboolean open_error;