From 4df5d896f096c6ab730ba8f7cb2e9af6ff412b03 Mon Sep 17 00:00:00 2001 From: Debarshi Ray Date: Sun, 15 May 2011 23:25:15 +0300 Subject: [PATCH 01/19] matroskaparse: calculate segment duration after parsing all the IDs Since the segment duration is given in terms of the GST_MATROSKA_ID_TIMECODESCALE we should only convert it into nanoseconds when we are sure that any scale specified in the file has been read. https://bugzilla.gnome.org/show_bug.cgi?id=650258 --- gst/matroska/matroska-parse.c | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/gst/matroska/matroska-parse.c b/gst/matroska/matroska-parse.c index 24a95a9cc1..7d488494cf 100644 --- a/gst/matroska/matroska-parse.c +++ b/gst/matroska/matroska-parse.c @@ -2808,6 +2808,7 @@ static GstFlowReturn gst_matroska_parse_parse_info (GstMatroskaParse * parse, GstEbmlRead * ebml) { GstFlowReturn ret = GST_FLOW_OK; + gdouble dur_f = -1.0; guint32 id; DEBUG_ELEMENT_START (parse, ebml, "SegmentInfo"); @@ -2836,23 +2837,15 @@ gst_matroska_parse_parse_info (GstMatroskaParse * parse, GstEbmlRead * ebml) } case GST_MATROSKA_ID_DURATION:{ - gdouble num; - GstClockTime dur; - - if ((ret = gst_ebml_read_float (ebml, &id, &num)) != GST_FLOW_OK) + if ((ret = gst_ebml_read_float (ebml, &id, &dur_f)) != GST_FLOW_OK) break; - if (num <= 0.0) { - GST_WARNING_OBJECT (parse, "Invalid duration %lf", num); + if (dur_f <= 0.0) { + GST_WARNING_OBJECT (parse, "Invalid duration %lf", dur_f); break; } - GST_DEBUG_OBJECT (parse, "Duration: %lf", num); - - dur = gst_gdouble_to_guint64 (num * - gst_guint64_to_gdouble (parse->time_scale)); - if (GST_CLOCK_TIME_IS_VALID (dur) && dur <= G_MAXINT64) - gst_segment_set_duration (&parse->segment, GST_FORMAT_TIME, dur); + GST_DEBUG_OBJECT (parse, "Duration: %lf", dur_f); break; } @@ -2923,6 +2916,15 @@ gst_matroska_parse_parse_info (GstMatroskaParse * parse, GstEbmlRead * ebml) } } + if (dur_f > 0.0) { + GstClockTime dur_u; + + dur_u = gst_gdouble_to_guint64 (dur_f * + gst_guint64_to_gdouble (parse->time_scale)); + if (GST_CLOCK_TIME_IS_VALID (dur_u) && dur_u <= G_MAXINT64) + gst_segment_set_duration (&parse->segment, GST_FORMAT_TIME, dur_u); + } + DEBUG_ELEMENT_STOP (parse, ebml, "SegmentInfo", ret); parse->segmentinfo_parsed = TRUE; From 1bf94a92b04e4b0ff83fd015daa46e880cc4d920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Fri, 6 May 2011 18:12:53 -0400 Subject: [PATCH 02/19] rtpssrcdemux: Release lock before emitting signal If the lock is not released before emitting a signal, it may cause a deadlock if any other function in the element is called. Also removed an unused timestamp parameter https://bugzilla.gnome.org/show_bug.cgi?id=649617 --- gst/rtpmanager/gstrtpssrcdemux.c | 48 ++++++++++++-------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 2688775e71..b8a83b197b 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -153,10 +153,8 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) return NULL; } -/* with PAD_LOCK */ static GstRtpSsrcDemuxPad * -create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, - GstClockTime timestamp) +find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GstPad *rtp_pad, *rtcp_pad; GstElementClass *klass; @@ -166,6 +164,14 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + GST_OBJECT_LOCK (demux); + + demuxpad = find_demux_pad_for_ssrc (demux, ssrc); + if (demuxpad != NULL) { + GST_OBJECT_UNLOCK (demux); + return demuxpad; + } + klass = GST_ELEMENT_GET_CLASS (demux); templ = gst_element_class_get_pad_template (klass, "src_%d"); padname = g_strdup_printf ("src_%d", ssrc); @@ -177,20 +183,12 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, rtcp_pad = gst_pad_new_from_template (templ, padname); g_free (padname); - /* we use the first timestamp received to calculate the difference between - * timestamps on all streams */ - GST_DEBUG_OBJECT (demux, "SSRC %08x, first timestamp %" GST_TIME_FORMAT, - ssrc, GST_TIME_ARGS (timestamp)); - /* wrap in structure and add to list */ demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; demuxpad->rtp_pad = rtp_pad; demuxpad->rtcp_pad = rtcp_pad; - GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT, - GST_TIME_ARGS (timestamp)); - gst_pad_set_element_private (rtp_pad, demuxpad); gst_pad_set_element_private (rtcp_pad, demuxpad); @@ -213,6 +211,8 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_rtp_ssrc_demux_iterate_internal_links); gst_pad_set_active (rtcp_pad, TRUE); + GST_OBJECT_UNLOCK (demux); + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); @@ -521,15 +521,9 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); - GST_PAD_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL) { - if (!(dpad = - create_demux_pad_for_ssrc (demux, ssrc, - GST_BUFFER_TIMESTAMP (buf)))) - goto create_failed; - } - GST_PAD_UNLOCK (demux); + dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) + goto create_failed; /* push to srcpad */ ret = gst_pad_push (dpad->rtp_pad, buf); @@ -549,7 +543,6 @@ create_failed: { GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); - GST_PAD_UNLOCK (demux); gst_buffer_unref (buf); return GST_FLOW_ERROR; } @@ -585,14 +578,10 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc); - GST_PAD_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL) { - GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); - if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc, -1))) - goto create_failed; - } - GST_PAD_UNLOCK (demux); + dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) + goto create_failed; + /* push to srcpad */ ret = gst_pad_push (dpad->rtcp_pad, buf); @@ -618,7 +607,6 @@ create_failed: { GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); - GST_PAD_UNLOCK (demux); gst_buffer_unref (buf); return GST_FLOW_ERROR; } From 23b6c8febc71b3303ae6bca19ae9ef9a7f2ca1e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Fri, 6 May 2011 18:41:01 -0400 Subject: [PATCH 03/19] rtpssrcdemux: iterate pad function is only valid for src pads The iterate function is only used for src pads, so mark it as such and remove dead code. https://bugzilla.gnome.org/show_bug.cgi?id=649617 --- gst/rtpmanager/gstrtpssrcdemux.c | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index b8a83b197b..32bc559824 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -121,7 +121,8 @@ static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); -static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links (GstPad * pad); +static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * + pad); static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query); static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 }; @@ -203,12 +204,12 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query); gst_pad_set_iterate_internal_links_function (rtp_pad, - gst_rtp_ssrc_demux_iterate_internal_links); + gst_rtp_ssrc_demux_iterate_internal_links_src); gst_pad_set_active (rtp_pad, TRUE); gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event); gst_pad_set_iterate_internal_links_function (rtcp_pad, - gst_rtp_ssrc_demux_iterate_internal_links); + gst_rtp_ssrc_demux_iterate_internal_links_src); gst_pad_set_active (rtcp_pad, TRUE); GST_OBJECT_UNLOCK (demux); @@ -652,25 +653,23 @@ gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event) } static GstIterator * -gst_rtp_ssrc_demux_iterate_internal_links (GstPad * pad) +gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad) { GstRtpSsrcDemux *demux; GstPad *otherpad = NULL; - GstIterator *it; + GstIterator *it = NULL; GSList *current; demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + if (!demux) + return NULL; + GST_PAD_LOCK (demux); for (current = demux->srcpads; current; current = g_slist_next (current)) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data; - if (pad == demux->rtp_sink) { - otherpad = dpad->rtp_pad; - break; - } else if (pad == demux->rtcp_sink) { - otherpad = dpad->rtcp_pad; - } else if (pad == dpad->rtp_pad) { + if (pad == dpad->rtp_pad) { otherpad = demux->rtp_sink; break; } else if (pad == dpad->rtcp_pad) { From b6bfc512e8c3781b272732b9b342a225bb32412a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20Cr=C3=AAte?= Date: Fri, 6 May 2011 19:09:17 -0400 Subject: [PATCH 04/19] ssrcdemux: Implement iterate internal links for sink pads https://bugzilla.gnome.org/show_bug.cgi?id=649617 --- gst/rtpmanager/gstrtpssrcdemux.c | 44 ++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 32bc559824..54c14fb8b6 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -118,6 +118,8 @@ static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event); +static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * + pad); /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); @@ -320,6 +322,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux, "sink"), "sink"); gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain); gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event); + gst_pad_set_iterate_internal_links_function (demux->rtp_sink, + gst_rtp_ssrc_demux_iterate_internal_links_sink); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink); demux->rtcp_sink = @@ -328,6 +332,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux, gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain); gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_sink_event); + gst_pad_set_iterate_internal_links_function (demux->rtcp_sink, + gst_rtp_ssrc_demux_iterate_internal_links_sink); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); demux->padlock = g_mutex_new (); @@ -685,6 +691,44 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad) return it; } +/* Should return 0 for elements to be included */ +static gint +src_pad_compare_func (gconstpointer a, gconstpointer b) +{ + GstPad *pad = GST_PAD (a); + const gchar *prefix = b; + gint res = 1; + + GST_OBJECT_LOCK (pad); + res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix); + GST_OBJECT_UNLOCK (pad); + + return res; +} + +static GstIterator * +gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad) +{ + GstRtpSsrcDemux *demux; + GstIterator *it = NULL; + const gchar *prefix; + + demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + + if (!demux) + return NULL; + + if (pad == demux->rtp_sink) + prefix = "src_"; + else if (pad == demux->rtcp_sink) + prefix = "rtcp_src_"; + + it = gst_element_iterate_src_pads (GST_ELEMENT (demux)); + + return gst_iterator_filter (it, src_pad_compare_func, (gpointer) prefix); +} + + static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query) { From 0f05d3e5a50b9142b8b2439940705be4c30e8809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 17 May 2011 09:24:08 +0200 Subject: [PATCH 05/19] rtpssrcdemux: Fix uninitialized variable compiler warning --- gst/rtpmanager/gstrtpssrcdemux.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 54c14fb8b6..41910b5075 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -722,6 +722,8 @@ gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad) prefix = "src_"; else if (pad == demux->rtcp_sink) prefix = "rtcp_src_"; + else + g_assert_not_reached (); it = gst_element_iterate_src_pads (GST_ELEMENT (demux)); From 1a310d558c6c123f73593009200589f252d2d949 Mon Sep 17 00:00:00 2001 From: Edward Hervey Date: Thu, 28 Apr 2011 15:57:04 +0200 Subject: [PATCH 06/19] v4l2objects: Only allow mpeg-ts on source objects Ugly fix for #648312 --- sys/v4l2/gstv4l2object.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sys/v4l2/gstv4l2object.c b/sys/v4l2/gstv4l2object.c index 52961974d5..e2713fc251 100644 --- a/sys/v4l2/gstv4l2object.c +++ b/sys/v4l2/gstv4l2object.c @@ -2049,7 +2049,9 @@ gst_v4l2_object_set_format (GstV4l2Object * v4l2object, guint32 pixelformat, GST_V4L2_CHECK_OPEN (v4l2object); GST_V4L2_CHECK_NOT_ACTIVE (v4l2object); - if (pixelformat == GST_MAKE_FOURCC ('M', 'P', 'E', 'G')) + /* Only unconditionally accept mpegts for sources */ + if ((v4l2object->type == V4L2_BUF_TYPE_VIDEO_CAPTURE) && + (pixelformat == GST_MAKE_FOURCC ('M', 'P', 'E', 'G'))) return TRUE; memset (&format, 0x00, sizeof (struct v4l2_format)); From b694bfeca36ff035bd7d619405584dc95afe6514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 17 May 2011 10:47:32 +0200 Subject: [PATCH 07/19] ssrcdemux: Fix uninitialized variable compiler warning for (pre-) releases too --- gst/rtpmanager/gstrtpssrcdemux.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 41910b5075..25ab9c3485 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -711,7 +711,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad) { GstRtpSsrcDemux *demux; GstIterator *it = NULL; - const gchar *prefix; + const gchar *prefix = NULL; demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); From dae679e5602884e135a72e9e277211016da97e54 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 11:40:11 +0100 Subject: [PATCH 08/19] rtspsrc: small header cleanups --- gst/rtsp/gstrtspsrc.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index 460f29a0f5..c09442bc24 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -131,7 +131,7 @@ struct _GstRTSPStream { gboolean container; /* original control url */ gchar *control_url; - guint32 ssrc; + guint32 ssrc; guint32 seqbase; guint64 timebase; @@ -201,7 +201,7 @@ struct _GstRTSPSrc { /* properties */ GstRTSPLowerTrans protocols; gboolean debug; - guint retry; + guint retry; guint64 udp_timeout; GTimeVal tcp_timeout; GTimeVal *ptcp_timeout; From 2873585238280a150558919a217e554326ed8395 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 11:40:32 +0100 Subject: [PATCH 09/19] rtspsrc: first attempt at async implementation --- gst/rtsp/gstrtspsrc.c | 703 +++++++++++++++++++++++++----------------- 1 file changed, 422 insertions(+), 281 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 8990d6ffb8..404a362152 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -246,25 +246,41 @@ static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, static GstRTSPResult gst_rtspsrc_send_cb (GstRTSPExtension * ext, GstRTSPMessage * request, GstRTSPMessage * response, GstRTSPSrc * src); -static gboolean gst_rtspsrc_open (GstRTSPSrc * src); -static gboolean gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment); -static gboolean gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle); -static gboolean gst_rtspsrc_close (GstRTSPSrc * src); +static GstRTSPResult gst_rtspsrc_open (GstRTSPSrc * src, gboolean async); +static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, + gboolean async); +static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, + gboolean async); +static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async); static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri); static gboolean gst_rtspsrc_activate_streams (GstRTSPSrc * src); -static void gst_rtspsrc_loop (GstRTSPSrc * src); +static gboolean gst_rtspsrc_loop (GstRTSPSrc * src); static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, GstEvent * event, gboolean source); static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source); /* commands we send to out loop to notify it of events */ -#define CMD_WAIT 0 -#define CMD_RECONNECT 1 -#define CMD_STOP 2 +#define CMD_OPEN 0 +#define CMD_PLAY 1 +#define CMD_PAUSE 2 +#define CMD_CLOSE 3 +#define CMD_WAIT 4 +#define CMD_RECONNECT 5 +#define CMD_STOP 6 +#define CMD_LOOP 7 + +#define GST_ELEMENT_PROGRESS(el, type, code, text) \ +G_STMT_START { \ + gchar *__txt = _gst_element_error_printf text; \ + gst_element_post_message (GST_ELEMENT_CAST (el), \ + gst_message_new_progress (GST_OBJECT_CAST (el), \ + GST_PROGRESS_TYPE_ ##type, code, __txt)); \ + g_free (__txt); \ +} G_STMT_END /*static guint gst_rtspsrc_signals[LAST_SIGNAL] = { 0 }; */ @@ -1657,7 +1673,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush) } else { event = gst_event_new_flush_stop (); GST_DEBUG_OBJECT (src, "stop flush"); - cmd = CMD_WAIT; + cmd = CMD_LOOP; state = GST_STATE_PLAYING; clock = gst_element_get_clock (GST_ELEMENT_CAST (src)); if (clock) { @@ -1850,14 +1866,14 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event) if (playing) { /* obtain current position in case seek fails */ gst_rtspsrc_get_position (src); - gst_rtspsrc_pause (src, FALSE); + gst_rtspsrc_pause (src, FALSE, FALSE); } gst_rtspsrc_do_seek (src, &seeksegment); /* and continue playing */ if (playing) - gst_rtspsrc_play (src, &seeksegment); + gst_rtspsrc_play (src, &seeksegment, FALSE); /* prepare for streaming again */ if (flush) { @@ -3307,6 +3323,8 @@ gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info) if (!info->connected) { /* connect */ + GST_ELEMENT_PROGRESS (src, CONTINUE, "connect", + ("Connecting to %s", info->location)); GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location); if ((res = gst_rtsp_connection_connect (info->connection, @@ -3710,7 +3728,6 @@ interrupt: { gst_rtsp_message_unset (&message); GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); - /* unset flushing so we can do something else */ gst_rtspsrc_connection_flush (src, FALSE); return GST_FLOW_WRONG_STATE; } @@ -3752,108 +3769,101 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) GstRTSPMessage message = { 0 }; gint retry = 0; + while (TRUE) { + GTimeVal tv_timeout; + + /* get the next timeout interval */ + gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); + + GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", + (gint) tv_timeout.tv_sec); + + gst_rtsp_message_unset (&message); + /* we should continue reading the TCP socket because the server might + * send us requests. When the session timeout expires, we need to send a + * keep-alive request to keep the session open. */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, &tv_timeout); + + switch (res) { + case GST_RTSP_OK: + GST_DEBUG_OBJECT (src, "we received a server message"); + break; + case GST_RTSP_EINTR: + /* we got interrupted, see what we have to do */ + GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); + goto interrupt; + case GST_RTSP_ETIMEOUT: + /* send keep-alive, ignore the result, a warning will be posted. */ + GST_DEBUG_OBJECT (src, "timeout, sending keep-alive"); + gst_rtspsrc_send_keep_alive (src); + continue; + case GST_RTSP_EEOF: + /* server closed the connection. not very fatal for UDP, reconnect and + * see what happens. */ + GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), + ("The server closed the connection.")); + if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) < 0) + goto connect_error; + + continue; + default: + goto receive_error; + } + + switch (message.type) { + case GST_RTSP_MESSAGE_REQUEST: + /* server sends us a request message, handle it */ + res = + gst_rtspsrc_handle_request (src, src->conninfo.connection, + &message); + if (res == GST_RTSP_EEOF) + goto server_eof; + else if (res < 0) + goto handle_request_failed; + break; + case GST_RTSP_MESSAGE_RESPONSE: + /* we ignore response and data messages */ + GST_DEBUG_OBJECT (src, "ignoring response message"); + if (src->debug) + gst_rtsp_message_dump (&message); + if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) { + GST_DEBUG_OBJECT (src, "but is Unauthorized response ..."); + if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) { + GST_DEBUG_OBJECT (src, "so retrying keep-alive"); + gst_rtspsrc_send_keep_alive (src); + } + } else { + retry = 0; + } + break; + case GST_RTSP_MESSAGE_DATA: + /* we ignore response and data messages */ + GST_DEBUG_OBJECT (src, "ignoring data message"); + break; + default: + GST_WARNING_OBJECT (src, "ignoring unknown message type %d", + message.type); + break; + } + } + +interrupt: + /* we get here when the connection got interrupted */ GST_OBJECT_LOCK (src); - if (src->loop_cmd == CMD_STOP) + gst_rtspsrc_connection_flush (src, FALSE); + GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd); + if (src->loop_cmd != CMD_RECONNECT) goto stopping; - while (src->loop_cmd == CMD_WAIT) { - GST_OBJECT_UNLOCK (src); - while (TRUE) { - GTimeVal tv_timeout; + /* when we get here we have to reconnect using tcp */ + src->loop_cmd = CMD_LOOP; - /* get the next timeout interval */ - gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); - - GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", - (gint) tv_timeout.tv_sec); - - gst_rtsp_message_unset (&message); - /* we should continue reading the TCP socket because the server might - * send us requests. When the session timeout expires, we need to send a - * keep-alive request to keep the session open. */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, &tv_timeout); - - switch (res) { - case GST_RTSP_OK: - GST_DEBUG_OBJECT (src, "we received a server message"); - break; - case GST_RTSP_EINTR: - /* we got interrupted, see what we have to do */ - GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); - /* unset flushing so we can do something else */ - gst_rtspsrc_connection_flush (src, FALSE); - goto interrupt; - case GST_RTSP_ETIMEOUT: - /* send keep-alive, ignore the result, a warning will be posted. */ - GST_DEBUG_OBJECT (src, "timeout, sending keep-alive"); - gst_rtspsrc_send_keep_alive (src); - continue; - case GST_RTSP_EEOF: - /* server closed the connection. not very fatal for UDP, reconnect and - * see what happens. */ - GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), - ("The server closed the connection.")); - if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) < 0) - goto connect_error; - - continue; - default: - goto receive_error; - } - - switch (message.type) { - case GST_RTSP_MESSAGE_REQUEST: - /* server sends us a request message, handle it */ - res = - gst_rtspsrc_handle_request (src, src->conninfo.connection, - &message); - if (res == GST_RTSP_EEOF) - goto server_eof; - else if (res < 0) - goto handle_request_failed; - break; - case GST_RTSP_MESSAGE_RESPONSE: - /* we ignore response and data messages */ - GST_DEBUG_OBJECT (src, "ignoring response message"); - if (src->debug) - gst_rtsp_message_dump (&message); - if (message.type_data.response.code == GST_RTSP_STS_UNAUTHORIZED) { - GST_DEBUG_OBJECT (src, "but is Unauthorized response ..."); - if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) { - GST_DEBUG_OBJECT (src, "so retrying keep-alive"); - gst_rtspsrc_send_keep_alive (src); - } - } else { - retry = 0; - } - break; - case GST_RTSP_MESSAGE_DATA: - /* we ignore response and data messages */ - GST_DEBUG_OBJECT (src, "ignoring data message"); - break; - default: - GST_WARNING_OBJECT (src, "ignoring unknown message type %d", - message.type); - break; - } - } - interrupt: - GST_OBJECT_LOCK (src); - GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd); - if (src->loop_cmd == CMD_STOP) - goto stopping; - } - if (src->loop_cmd == CMD_RECONNECT) { - /* when we get here we have to reconnect using tcp */ - src->loop_cmd = CMD_WAIT; - - /* only restart when the pads were not yet activated, else we were - * streaming over UDP */ - restart = src->need_activate; - } + /* only restart when the pads were not yet activated, else we were + * streaming over UDP */ + restart = src->need_activate; GST_OBJECT_UNLOCK (src); /* no need to restart, we're done */ @@ -3864,18 +3874,10 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP; /* pause to prepare for a restart */ - gst_rtspsrc_pause (src, FALSE); + gst_rtspsrc_pause (src, FALSE, FALSE); - if (src->task) { - /* stop task, we cannot join as this would deadlock, the task will stop when - * we exit this function below. */ - gst_task_stop (src->task); - /* and free the task so that _close will not stop/join it again. */ - gst_object_unref (GST_OBJECT (src->task)); - src->task = NULL; - } /* close and cleanup our state */ - gst_rtspsrc_close (src); + gst_rtspsrc_close (src, FALSE); /* see if we have TCP left to try. Also don't try TCP when we were configured * with an SDP. */ @@ -3890,11 +3892,11 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) gst_guint64_to_gdouble (src->udp_timeout / 1000000.0))); /* open new connection using tcp */ - if (!gst_rtspsrc_open (src)) + if (!gst_rtspsrc_open (src, FALSE)) goto open_failed; /* start playback */ - if (!gst_rtspsrc_play (src, &src->segment)) + if (!gst_rtspsrc_play (src, &src->segment, FALSE)) goto play_failed; done: @@ -3979,10 +3981,12 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) GST_DEBUG_OBJECT (src, "stop connection flush"); gst_rtspsrc_connection_flush (src, FALSE); } + if (src->task) + gst_task_start (src->task); GST_OBJECT_UNLOCK (src); } -static void +static gboolean gst_rtspsrc_loop (GstRTSPSrc * src) { GstFlowReturn ret; @@ -3995,7 +3999,7 @@ gst_rtspsrc_loop (GstRTSPSrc * src) if (ret != GST_FLOW_OK) goto pause; - return; + return TRUE; /* ERRORS */ pause: @@ -4004,10 +4008,6 @@ pause: GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); src->running = FALSE; - if (src->task) { - /* can be NULL when we stopped and unreffed already */ - gst_task_pause (src->task); - } if (ret == GST_FLOW_UNEXPECTED) { /* perform EOS logic */ if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { @@ -4025,7 +4025,7 @@ pause: ("streaming task paused, reason %s (%d)", reason, ret)); gst_rtspsrc_push_event (src, gst_event_new_eos (), FALSE); } - return; + return FALSE; } } @@ -4400,7 +4400,6 @@ send_error: } receive_error: { - switch (res) { case GST_RTSP_EEOF: GST_WARNING_OBJECT (src, "server closed connection, doing reconnect"); @@ -4857,8 +4856,8 @@ gst_rtspsrc_stream_is_real_media (GstRTSPStream * stream) * This function will also configure the stream for the selected transport, * which basically means creating the pipeline. */ -static gboolean -gst_rtspsrc_setup_streams (GstRTSPSrc * src) +static GstRTSPResult +gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async) { GList *walk; GstRTSPResult res; @@ -5009,6 +5008,10 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) g_free (hval); } + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "request", ("SETUP stream %d", + stream->id)); + /* handle the code ourselves */ if ((res = gst_rtspsrc_send (src, conn, &request, &response, &code) < 0)) goto send_error; @@ -5131,7 +5134,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src) if (!src->need_activate) goto nothing_to_activate; - return TRUE; + return res; /* ERRORS */ no_protocols: @@ -5139,7 +5142,7 @@ no_protocols: /* no transport possible, post an error and stop */ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Could not connect to server, no protocols left")); - return FALSE; + return GST_RTSP_ERROR; } create_request_failed: { @@ -5154,6 +5157,7 @@ setup_transport_failed: { GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), ("Could not setup transport.")); + res = GST_RTSP_ERROR; goto cleanup_error; } response_error: @@ -5162,6 +5166,7 @@ response_error: GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), ("Error (%d): %s", code, GST_STR_NULL (str))); + res = GST_RTSP_ERROR; goto cleanup_error; } send_error: @@ -5177,6 +5182,7 @@ no_transport: { GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), ("Server did not select transport.")); + res = GST_RTSP_ERROR; goto cleanup_error; } nothing_to_activate: @@ -5193,13 +5199,13 @@ nothing_to_activate: "more transport protocols or may otherwise be missing " "the right GStreamer RTSP extension plugin.")), (NULL)); } - return FALSE; + return GST_RTSP_ERROR; } cleanup_error: { gst_rtsp_message_unset (&request); gst_rtsp_message_unset (&response); - return FALSE; + return res; } } @@ -5272,9 +5278,11 @@ gst_rtspsrc_parse_range (GstRTSPSrc * src, const gchar * range, } /* must be called with the RTSP state lock */ -static gboolean -gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp) +static GstRTSPResult +gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp, + gboolean async) { + GstRTSPResult res; gint i, n_streams; /* prepare global stream caps properties */ @@ -5344,7 +5352,7 @@ gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp) GST_OBJECT_FLAG_SET (src, GST_ELEMENT_IS_SOURCE); /* setup streams */ - if (!gst_rtspsrc_setup_streams (src)) + if ((res = gst_rtspsrc_setup_streams (src, async)) < 0) goto setup_failed; /* reset our state */ @@ -5353,18 +5361,19 @@ gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp) src->state = GST_RTSP_STATE_READY; - return TRUE; + return res; /* ERRORS */ setup_failed: { GST_ERROR_OBJECT (src, "setup failed"); - return FALSE; + return res; } } -static gboolean -gst_rtspsrc_retrieve_sdp (GstRTSPSrc * src, GstSDPMessage ** sdp) +static GstRTSPResult +gst_rtspsrc_retrieve_sdp (GstRTSPSrc * src, GstSDPMessage ** sdp, + gboolean async) { GstRTSPResult res; GstRTSPMessage request = { 0 }; @@ -5394,8 +5403,13 @@ restart: /* send OPTIONS */ GST_DEBUG_OBJECT (src, "send options..."); - if (gst_rtspsrc_send (src, src->conninfo.connection, &request, &response, - NULL) < 0) + + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving server options")); + + if ((res = + gst_rtspsrc_send (src, src->conninfo.connection, &request, &response, + NULL)) < 0) goto send_error; /* parse OPTIONS */ @@ -5416,8 +5430,13 @@ restart: /* send DESCRIBE */ GST_DEBUG_OBJECT (src, "send describe..."); - if (gst_rtspsrc_send (src, src->conninfo.connection, &request, &response, - NULL) < 0) + + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving media info")); + + if ((res = + gst_rtspsrc_send (src, src->conninfo.connection, &request, &response, + NULL)) < 0) goto send_error; /* we only perform redirect for the describe, currently */ @@ -5460,7 +5479,7 @@ restart: gst_rtsp_message_unset (&request); gst_rtsp_message_unset (&response); - return TRUE; + return res; /* ERRORS */ no_url: @@ -5491,23 +5510,27 @@ send_error: { /* Don't post a message - the rtsp_send method will have * taken care of it because we passed NULL for the response code */ + goto cleanup_error; } methods_error: { /* error was posted */ + res = GST_RTSP_ERROR; goto cleanup_error; } wrong_content_type: { GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), ("Server does not support SDP, got %s.", respcont)); + res = GST_RTSP_ERROR; goto cleanup_error; } no_describe: { GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), ("Server can not provide an SDP.")); + res = GST_RTSP_ERROR; goto cleanup_error; } cleanup_error: @@ -5518,105 +5541,75 @@ cleanup_error: } gst_rtsp_message_unset (&request); gst_rtsp_message_unset (&response); - return FALSE; + return res; } } -static gboolean -gst_rtspsrc_open (GstRTSPSrc * src) +static void +gst_rtspsrc_open_async (GstRTSPSrc * src) { - gboolean res; + GST_ELEMENT_PROGRESS (src, START, "open", ("Opening Stream")); + gst_rtspsrc_loop_send_cmd (src, CMD_OPEN, FALSE); +} + +static GstRTSPResult +gst_rtspsrc_open (GstRTSPSrc * src, gboolean async) +{ + GstRTSPResult ret; src->methods = GST_RTSP_SETUP | GST_RTSP_PLAY | GST_RTSP_PAUSE | GST_RTSP_TEARDOWN; - GST_RTSP_STATE_LOCK (src); - if (src->sdp == NULL) { - if (!(res = gst_rtspsrc_retrieve_sdp (src, &src->sdp))) + if ((ret = gst_rtspsrc_retrieve_sdp (src, &src->sdp, async)) < 0) goto no_sdp; } - if (!(res = gst_rtspsrc_open_from_sdp (src, src->sdp))) + if ((ret = gst_rtspsrc_open_from_sdp (src, src->sdp, async)) < 0) goto open_failed; - GST_RTSP_STATE_UNLOCK (src); - - return res; +done: + if (async) { + if (ret == GST_RTSP_OK) + GST_ELEMENT_PROGRESS (src, COMPLETE, "open", ("Opened stream")); + else if (ret == GST_RTSP_EINTR) + GST_ELEMENT_PROGRESS (src, CANCELED, "open", ("Open canceled")); + else + GST_ELEMENT_PROGRESS (src, ERROR, "open", ("Open failed")); + } + return ret; /* ERRORS */ no_sdp: { GST_WARNING_OBJECT (src, "can't get sdp"); - GST_RTSP_STATE_UNLOCK (src); - return FALSE; + goto done; } open_failed: { GST_WARNING_OBJECT (src, "can't setup streaming from sdp"); - GST_RTSP_STATE_UNLOCK (src); - return FALSE; + goto done; } } -#if 0 -static gboolean -gst_rtspsrc_async_open (GstRTSPSrc * src) +static void +gst_rtspsrc_close_async (GstRTSPSrc * src) { - GError *error = NULL; - gboolean res = TRUE; - - src->thread = - g_thread_create ((GThreadFunc) gst_rtspsrc_open, src, TRUE, &error); - if (error != NULL) { - GST_ELEMENT_ERROR (src, RESOURCE, INIT, (NULL), - ("Could not start async thread (%s).", error->message)); - } - return res; + GST_ELEMENT_PROGRESS (src, START, "close", ("Closing Stream")); + gst_rtspsrc_loop_send_cmd (src, CMD_CLOSE, FALSE); } -#endif - -static gboolean -gst_rtspsrc_close (GstRTSPSrc * src) +static GstRTSPResult +gst_rtspsrc_close (GstRTSPSrc * src, gboolean async) { GstRTSPMessage request = { 0 }; GstRTSPMessage response = { 0 }; - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_OK; GList *walk; - gboolean ret = FALSE; gchar *control; GST_DEBUG_OBJECT (src, "TEARDOWN..."); - GST_RTSP_STATE_LOCK (src); - - gst_rtspsrc_loop_send_cmd (src, CMD_STOP, TRUE); - - /* stop task if any */ - if (src->task) { - /* release lock before trying to get the streamlock */ - GST_RTSP_STATE_UNLOCK (src); - - gst_task_stop (src->task); - - /* make sure it is not running */ - GST_RTSP_STREAM_LOCK (src); - GST_RTSP_STREAM_UNLOCK (src); - - /* now wait for the task to finish */ - gst_task_join (src->task); - - /* and free the task */ - gst_object_unref (GST_OBJECT (src->task)); - src->task = NULL; - - GST_RTSP_STATE_LOCK (src); - } - - /* make sure we're not flushing anymore */ - gst_rtspsrc_connection_flush (src, FALSE); - if (src->state < GST_RTSP_STATE_READY) { GST_DEBUG_OBJECT (src, "not ready, doing cleanup"); goto close; @@ -5658,7 +5651,12 @@ gst_rtspsrc_close (GstRTSPSrc * src) if (res < 0) goto create_request_failed; - if (gst_rtspsrc_send (src, info->connection, &request, &response, NULL) < 0) + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "close", ("Closing stream")); + + if ((res = + gst_rtspsrc_send (src, info->connection, &request, &response, + NULL)) < 0) goto send_error; /* FIXME, parse result? */ @@ -5684,24 +5682,35 @@ close: gst_rtspsrc_cleanup (src); src->state = GST_RTSP_STATE_INVALID; - GST_RTSP_STATE_UNLOCK (src); - return ret; + if (async) { + if (res == GST_RTSP_OK) + GST_ELEMENT_PROGRESS (src, COMPLETE, "close", ("Closed stream")); + else if (res == GST_RTSP_EINTR) + GST_ELEMENT_PROGRESS (src, CANCELED, "close", ("Close canceled")); + else + GST_ELEMENT_PROGRESS (src, ERROR, "close", ("Close failed")); + } + return res; /* ERRORS */ create_request_failed: { + gchar *str = gst_rtsp_strresult (res); + GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), - ("Could not create request.")); - ret = FALSE; + ("Could not create request. (%s)", str)); + g_free (str); goto close; } send_error: { + gchar *str = gst_rtsp_strresult (res); + gst_rtsp_message_unset (&request); GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message.")); - ret = FALSE; + ("Could not send message. (%s)", str)); + g_free (str); goto close; } not_supported: @@ -5824,19 +5833,24 @@ clear_rtp_base (GstRTSPSrc * src, GstRTSPStream * stream) } } -static gboolean -gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) +static void +gst_rtspsrc_play_async (GstRTSPSrc * src) +{ + GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PLAY request")); + gst_rtspsrc_loop_send_cmd (src, CMD_PLAY, FALSE); +} + +static GstRTSPResult +gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) { GstRTSPMessage request = { 0 }; GstRTSPMessage response = { 0 }; - GstRTSPResult res; + GstRTSPResult res = GST_RTSP_OK; GList *walk; gchar *hval; gint hval_idx; gchar *control; - GST_RTSP_STATE_LOCK (src); - GST_DEBUG_OBJECT (src, "PLAY..."); if (!(src->methods & GST_RTSP_PLAY)) @@ -5855,8 +5869,12 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) GST_DEBUG_OBJECT (src, "connection is idle now"); GST_RTSP_CONN_UNLOCK (src); - GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtspsrc_connection_flush (src, FALSE); + /* send some dummy packets before we activate the receive in the + * udp sources */ + gst_rtspsrc_send_dummy_packets (src); + + /* activate receive elements */ + gst_element_set_state (GST_ELEMENT_CAST (src), GST_STATE_PLAYING); /* construct a control url */ if (src->control) @@ -5905,7 +5923,10 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) gst_rtsp_message_add_header (&request, GST_RTSP_HDR_SPEED, hval); } - if (gst_rtspsrc_send (src, conn, &request, &response, NULL) < 0) + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "request", ("Sending PLAY request")); + + if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0) goto send_error; /* seek may have silently failed as it is not supported */ @@ -5971,18 +5992,10 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) /* configure the caps of the streams after we parsed all headers. */ gst_rtspsrc_configure_caps (src, segment); - /* for interleaved transport, we receive the data on the RTSP connection - * instead of UDP. We start a task to select and read from that connection. - * For UDP we start the task as well to look for server info and UDP timeouts. */ - if (src->task == NULL) { - src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_loop, src); - gst_task_set_lock (src->task, GST_RTSP_STREAM_GET_LOCK (src)); - } src->running = TRUE; src->base_time = -1; src->state = GST_RTSP_STATE_PLAYING; - gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, FALSE); - gst_task_start (src->task); + src->loop_cmd = CMD_LOOP; /* mark discont */ GST_DEBUG_OBJECT (src, "mark DISCONT, we did a seek to another position"); @@ -5992,9 +6005,16 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment) } done: - GST_RTSP_STATE_UNLOCK (src); - - return TRUE; + if (async) { + if (res == GST_RTSP_OK) + GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("PLAY request sent")); + else if (res == GST_RTSP_EINTR) + GST_ELEMENT_PROGRESS (src, CANCELED, "request", + ("PLAY request canceled")); + else + GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PLAY request failed")); + } + return res; /* ERRORS */ not_supported: @@ -6009,31 +6029,41 @@ was_playing: } create_request_failed: { - GST_RTSP_STATE_UNLOCK (src); + gchar *str = gst_rtsp_strresult (res); + GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), - ("Could not create request.")); - return FALSE; + ("Could not create request. (%s)", str)); + g_free (str); + goto done; } send_error: { - GST_RTSP_STATE_UNLOCK (src); + gchar *str = gst_rtsp_strresult (res); + gst_rtsp_message_unset (&request); GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message.")); - return FALSE; + ("Could not send message. (%s)", str)); + g_free (str); + goto done; } } -static gboolean -gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle) +static void +gst_rtspsrc_pause_async (GstRTSPSrc * src) { + GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PAUSE request")); + gst_rtspsrc_loop_send_cmd (src, CMD_PAUSE, FALSE); +} + +static GstRTSPResult +gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) +{ + GstRTSPResult res = GST_RTSP_OK; GstRTSPMessage request = { 0 }; GstRTSPMessage response = { 0 }; GList *walk; gchar *control; - GST_RTSP_STATE_LOCK (src); - GST_DEBUG_OBJECT (src, "PAUSE..."); if (!(src->methods & GST_RTSP_PAUSE)) @@ -6052,9 +6082,6 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle) if (!src->conninfo.connection || !src->conninfo.connected) goto no_connection; - GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtspsrc_connection_flush (src, FALSE); - /* construct a control url */ if (src->control) control = src->control; @@ -6082,10 +6109,16 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle) continue; } - if (gst_rtsp_message_init_request (&request, GST_RTSP_PAUSE, setup_url) < 0) + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "request", + ("Sending PAUSE request")); + + if ((res = + gst_rtsp_message_init_request (&request, GST_RTSP_PAUSE, + setup_url)) < 0) goto create_request_failed; - if (gst_rtspsrc_send (src, conn, &request, &response, NULL) < 0) + if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0) goto send_error; gst_rtsp_message_unset (&request); @@ -6099,17 +6132,23 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle) if (idle && src->task) { GST_DEBUG_OBJECT (src, "starting idle task again"); src->base_time = -1; - gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, FALSE); - gst_task_start (src->task); + src->loop_cmd = CMD_LOOP; } no_connection: src->state = GST_RTSP_STATE_READY; done: - GST_RTSP_STATE_UNLOCK (src); - - return TRUE; + if (async) { + if (res == GST_RTSP_OK) + GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("PAUSE request sent")); + else if (res == GST_RTSP_EINTR) + GST_ELEMENT_PROGRESS (src, CANCELED, "request", + ("PAUSE request canceled")); + else + GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PAUSE request failed")); + } + return res; /* ERRORS */ not_supported: @@ -6124,18 +6163,22 @@ was_paused: } create_request_failed: { - GST_RTSP_STATE_UNLOCK (src); + gchar *str = gst_rtsp_strresult (res); + GST_ELEMENT_ERROR (src, LIBRARY, INIT, (NULL), - ("Could not create request.")); - return FALSE; + ("Could not create request. (%s)", str)); + g_free (str); + goto done; } send_error: { - GST_RTSP_STATE_UNLOCK (src); + gchar *str = gst_rtsp_strresult (res); + gst_rtsp_message_unset (&request); GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message.")); - return FALSE; + ("Could not send message. (%s)", str)); + g_free (str); + goto done; } } @@ -6219,6 +6262,111 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message) } } +/* the thread where everything happens */ +static void +gst_rtspsrc_thread (GstRTSPSrc * src) +{ + gint cmd; + GstRTSPResult ret; + gboolean running = FALSE; + + GST_OBJECT_LOCK (src); + cmd = src->loop_cmd; + src->loop_cmd = CMD_WAIT; + GST_DEBUG_OBJECT (src, "got command %d", cmd); + GST_OBJECT_UNLOCK (src); + + switch (cmd) { + case CMD_OPEN: + src->cur_protocols = src->protocols; + /* first attempt, don't ignore timeouts */ + src->ignore_timeout = FALSE; + ret = gst_rtspsrc_open (src, TRUE); + break; + case CMD_PLAY: + ret = gst_rtspsrc_play (src, &src->segment, TRUE); + break; + case CMD_PAUSE: + ret = gst_rtspsrc_pause (src, TRUE, TRUE); + break; + case CMD_CLOSE: + ret = gst_rtspsrc_close (src, TRUE); + break; + case CMD_LOOP: + running = gst_rtspsrc_loop (src); + break; + default: + break; + } + + GST_OBJECT_LOCK (src); + /* and go back to sleep */ + if (!running && src->loop_cmd == CMD_WAIT && src->task) + gst_task_pause (src->task); + GST_OBJECT_UNLOCK (src); +} + +static gboolean +gst_rtspsrc_start (GstRTSPSrc * src) +{ + GST_DEBUG_OBJECT (src, "starting"); + + GST_OBJECT_LOCK (src); + + src->loop_cmd = CMD_WAIT; + + if (src->task == NULL) { + src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_thread, src); + if (src->task == NULL) + goto task_error; + + gst_task_set_lock (src->task, GST_RTSP_STREAM_GET_LOCK (src)); + } + GST_OBJECT_UNLOCK (src); + + return TRUE; + + /* ERRORS */ +task_error: + { + GST_ERROR_OBJECT (src, "failed to create task"); + return FALSE; + } +} + +static gboolean +gst_rtspsrc_stop (GstRTSPSrc * src) +{ + GstTask *task; + + GST_DEBUG_OBJECT (src, "stopping"); + + gst_rtspsrc_connection_flush (src, TRUE); + + GST_OBJECT_LOCK (src); + if ((task = src->task)) { + src->task = NULL; + GST_OBJECT_UNLOCK (src); + + gst_task_stop (task); + + /* make sure it is not running */ + GST_RTSP_STREAM_LOCK (src); + GST_RTSP_STREAM_UNLOCK (src); + + /* now wait for the task to finish */ + gst_task_join (task); + + /* and free the task */ + gst_object_unref (GST_OBJECT (task)); + + GST_OBJECT_LOCK (src); + } + GST_OBJECT_UNLOCK (src); + + return TRUE; +} + static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) { @@ -6229,25 +6377,18 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: + if (!gst_rtspsrc_start (rtspsrc)) + goto start_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: - rtspsrc->cur_protocols = rtspsrc->protocols; - /* first attempt, don't ignore timeouts */ - rtspsrc->ignore_timeout = FALSE; - if (!gst_rtspsrc_open (rtspsrc)) - goto open_failed; + gst_rtspsrc_open_async (rtspsrc); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - GST_DEBUG_OBJECT (rtspsrc, "PAUSED->PLAYING: stop connection flush"); - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_STOP, TRUE); - /* send some dummy packets before we chain up to the parent to activate - * the receive in the udp sources */ - gst_rtspsrc_send_dummy_packets (rtspsrc); - break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* unblock the tcp tasks and make the loop waiting */ + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_WAIT, TRUE); + break; case GST_STATE_CHANGE_PAUSED_TO_READY: - GST_DEBUG_OBJECT (rtspsrc, "state change: sending stop command"); - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_STOP, TRUE); break; default: break; @@ -6259,21 +6400,21 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - /* chained up to parent so the udp sources are activated and receiving */ - gst_rtspsrc_play (rtspsrc, &rtspsrc->segment); + gst_rtspsrc_play_async (rtspsrc); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* send pause request and keep the idle task around */ - gst_rtspsrc_pause (rtspsrc, TRUE); + gst_rtspsrc_pause_async (rtspsrc); ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_READY_TO_PAUSED: ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_rtspsrc_close (rtspsrc); + gst_rtspsrc_close_async (rtspsrc); break; case GST_STATE_CHANGE_READY_TO_NULL: + gst_rtspsrc_stop (rtspsrc); break; default: break; @@ -6282,9 +6423,9 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) done: return ret; -open_failed: +start_failed: { - GST_DEBUG_OBJECT (rtspsrc, "open failed"); + GST_DEBUG_OBJECT (rtspsrc, "start failed"); return GST_STATE_CHANGE_FAILURE; } } From 220e47adcf3c6471bbcb17e85b9fa124ba35ae8b Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 13:43:06 +0100 Subject: [PATCH 10/19] rtspsrc: implement more async handling Remove some old locks. Make sure we never go into the loop function when flushing. --- gst/rtsp/gstrtspsrc.c | 111 +++++++++++++++++++++++------------------- gst/rtsp/gstrtspsrc.h | 9 +--- 2 files changed, 63 insertions(+), 57 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 404a362152..b2c1c42bbb 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -270,8 +270,7 @@ static gboolean gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, #define CMD_CLOSE 3 #define CMD_WAIT 4 #define CMD_RECONNECT 5 -#define CMD_STOP 6 -#define CMD_LOOP 7 +#define CMD_LOOP 6 #define GST_ELEMENT_PROGRESS(el, type, code, text) \ G_STMT_START { \ @@ -527,10 +526,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class) src->state_rec_lock = g_new (GStaticRecMutex, 1); g_static_rec_mutex_init (src->state_rec_lock); - /* protects access to the server connection */ - src->conn_rec_lock = g_new (GStaticRecMutex, 1); - g_static_rec_mutex_init (src->conn_rec_lock); - src->state = GST_RTSP_STATE_INVALID; } @@ -558,8 +553,6 @@ gst_rtspsrc_finalize (GObject * object) g_free (rtspsrc->stream_rec_lock); g_static_rec_mutex_free (rtspsrc->state_rec_lock); g_free (rtspsrc->state_rec_lock); - g_static_rec_mutex_free (rtspsrc->conn_rec_lock); - g_free (rtspsrc->conn_rec_lock); #ifdef G_OS_WIN32 WSACleanup (); @@ -1668,7 +1661,7 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush) if (flush) { event = gst_event_new_flush_start (); GST_DEBUG_OBJECT (src, "start flush"); - cmd = CMD_STOP; + cmd = CMD_WAIT; state = GST_STATE_PAUSED; } else { event = gst_event_new_flush_stop (); @@ -1718,12 +1711,10 @@ gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn, { GstRTSPResult ret; - GST_RTSP_CONN_LOCK (src); if (conn) ret = gst_rtsp_connection_send (conn, message, timeout); else ret = GST_RTSP_ERROR; - GST_RTSP_CONN_UNLOCK (src); return ret; } @@ -1734,12 +1725,10 @@ gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn, { GstRTSPResult ret; - GST_RTSP_CONN_LOCK (src); if (conn) ret = gst_rtsp_connection_receive (conn, message, timeout); else ret = GST_RTSP_ERROR; - GST_RTSP_CONN_UNLOCK (src); return ret; } @@ -3537,8 +3526,10 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) /* see if the timeout period expired */ if ((tv_timeout.tv_sec | tv_timeout.tv_usec) == 0) { GST_DEBUG_OBJECT (src, "timout, sending keep-alive"); - /* send keep-alive, ignore the result, a warning will be posted. */ - gst_rtspsrc_send_keep_alive (src); + /* send keep-alive, only act on interrupt, a warning will be posted for + * other errors. */ + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; /* get new timeout */ gst_rtsp_connection_next_timeout (src->conninfo.connection, &tv_timeout); } @@ -3546,11 +3537,23 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec", tv_timeout.tv_sec, tv_timeout.tv_usec); - /* protect the connection with the connection lock so that we can see when - * we are finished doing server communication */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, &message, - src->ptcp_timeout); + GST_OBJECT_LOCK (src); + if (src->loop_cmd == CMD_LOOP && !src->flushing) { + src->waiting = TRUE; + GST_OBJECT_UNLOCK (src); + + /* protect the connection with the connection lock so that we can see when + * we are finished doing server communication */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, src->ptcp_timeout); + + GST_OBJECT_LOCK (src); + src->waiting = FALSE; + } else { + res = GST_RTSP_EINTR; + } + GST_OBJECT_UNLOCK (src); switch (res) { case GST_RTSP_OK: @@ -3562,7 +3565,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) case GST_RTSP_ETIMEOUT: /* no reply, send keep alive */ GST_DEBUG_OBJECT (src, "timeout, sending keep-alive"); - gst_rtspsrc_send_keep_alive (src); + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; continue; case GST_RTSP_EEOF: /* go EOS when the server closed the connection */ @@ -3779,12 +3783,25 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) (gint) tv_timeout.tv_sec); gst_rtsp_message_unset (&message); - /* we should continue reading the TCP socket because the server might - * send us requests. When the session timeout expires, we need to send a - * keep-alive request to keep the session open. */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, &tv_timeout); + + GST_OBJECT_LOCK (src); + if (src->loop_cmd == CMD_LOOP && !src->flushing) { + src->waiting = TRUE; + GST_OBJECT_UNLOCK (src); + + /* we should continue reading the TCP socket because the server might + * send us requests. When the session timeout expires, we need to send a + * keep-alive request to keep the session open. */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, &tv_timeout); + + GST_OBJECT_LOCK (src); + src->waiting = FALSE; + } else { + res = GST_RTSP_EINTR; + } + GST_OBJECT_UNLOCK (src); switch (res) { case GST_RTSP_OK: @@ -3797,7 +3814,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) case GST_RTSP_ETIMEOUT: /* send keep-alive, ignore the result, a warning will be posted. */ GST_DEBUG_OBJECT (src, "timeout, sending keep-alive"); - gst_rtspsrc_send_keep_alive (src); + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; continue; case GST_RTSP_EEOF: /* server closed the connection. not very fatal for UDP, reconnect and @@ -3832,7 +3850,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "but is Unauthorized response ..."); if (gst_rtspsrc_setup_auth (src, &message) && !(retry++)) { GST_DEBUG_OBJECT (src, "so retrying keep-alive"); - gst_rtspsrc_send_keep_alive (src); + if ((res = gst_rtspsrc_send_keep_alive (src)) == GST_RTSP_EINTR) + goto interrupt; } } else { retry = 0; @@ -3857,7 +3876,6 @@ interrupt: if (src->loop_cmd != CMD_RECONNECT) goto stopping; - /* when we get here we have to reconnect using tcp */ src->loop_cmd = CMD_LOOP; @@ -3974,12 +3992,14 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) { GST_OBJECT_LOCK (src); src->loop_cmd = cmd; + src->flushing = flush; if (flush) { - GST_DEBUG_OBJECT (src, "start connection flush"); - gst_rtspsrc_connection_flush (src, TRUE); + if (src->waiting) { + GST_DEBUG_OBJECT (src, "start connection flush"); + gst_rtspsrc_connection_flush (src, TRUE); + } } else { GST_DEBUG_OBJECT (src, "stop connection flush"); - gst_rtspsrc_connection_flush (src, FALSE); } if (src->task) gst_task_start (src->task); @@ -5862,13 +5882,6 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) if (!src->conninfo.connection || !src->conninfo.connected) goto done; - /* waiting for connection idle, we were flushing so any attempt at doing data - * transfer will result in pausing the tasks. */ - GST_DEBUG_OBJECT (src, "wait for connection idle"); - GST_RTSP_CONN_LOCK (src); - GST_DEBUG_OBJECT (src, "connection is idle now"); - GST_RTSP_CONN_UNLOCK (src); - /* send some dummy packets before we activate the receive in the * udp sources */ gst_rtspsrc_send_dummy_packets (src); @@ -6072,13 +6085,6 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) if (src->state == GST_RTSP_STATE_READY) goto was_paused; - /* waiting for connection idle, we were flushing so any attempt at doing data - * transfer will result in pausing the tasks. */ - GST_DEBUG_OBJECT (src, "wait for connection idle"); - GST_RTSP_CONN_LOCK (src); - GST_DEBUG_OBJECT (src, "connection is idle now"); - GST_RTSP_CONN_UNLOCK (src); - if (!src->conninfo.connection || !src->conninfo.connected) goto no_connection; @@ -6272,7 +6278,8 @@ gst_rtspsrc_thread (GstRTSPSrc * src) GST_OBJECT_LOCK (src); cmd = src->loop_cmd; - src->loop_cmd = CMD_WAIT; + if (cmd != CMD_LOOP || src->flushing) + src->loop_cmd = CMD_WAIT; GST_DEBUG_OBJECT (src, "got command %d", cmd); GST_OBJECT_UNLOCK (src); @@ -6301,8 +6308,12 @@ gst_rtspsrc_thread (GstRTSPSrc * src) GST_OBJECT_LOCK (src); /* and go back to sleep */ - if (!running && src->loop_cmd == CMD_WAIT && src->task) - gst_task_pause (src->task); + if (src->loop_cmd == CMD_WAIT) { + if (running) + src->loop_cmd = CMD_LOOP; + else if (src->task) + gst_task_pause (src->task); + } GST_OBJECT_UNLOCK (src); } diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index c09442bc24..b5744248cc 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -79,10 +79,6 @@ typedef struct _GstRTSPSrcClass GstRTSPSrcClass; #define GST_RTSP_STREAM_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp))) #define GST_RTSP_STREAM_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp))) -#define GST_RTSP_CONN_GET_LOCK(rtsp) (GST_RTSPSRC_CAST(rtsp)->conn_rec_lock) -#define GST_RTSP_CONN_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_CONN_GET_LOCK(rtsp))) -#define GST_RTSP_CONN_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_CONN_GET_LOCK(rtsp))) - typedef struct _GstRTSPConnInfo GstRTSPConnInfo; struct _GstRTSPConnInfo { @@ -184,13 +180,12 @@ struct _GstRTSPSrc { /* UDP mode loop */ gint loop_cmd; gboolean ignore_timeout; + gboolean flushing; + gboolean waiting; /* mutex for protecting state changes */ GStaticRecMutex *state_rec_lock; - /* mutex for protecting the connection */ - GStaticRecMutex *conn_rec_lock; - GstSDPMessage *sdp; gboolean from_sdp; gint numstreams; From 852c6e11cd3f0f7e3911b5dcf35578bd36d54dd8 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 15:15:49 +0100 Subject: [PATCH 11/19] rtspsrc: don't post errors when interrupting --- gst/rtsp/gstrtspsrc.c | 57 +++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index b2c1c42bbb..a0e11bed47 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -4413,8 +4413,12 @@ send_error: { gchar *str = gst_rtsp_strresult (res); - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not send message. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "send interrupted"); + } g_free (str); return res; } @@ -4434,8 +4438,12 @@ receive_error: { gchar *str = gst_rtsp_strresult (res); - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not receive message. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Could not receive message. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "receive interrupted"); + } g_free (str); break; } @@ -5193,8 +5201,12 @@ send_error: { gchar *str = gst_rtsp_strresult (res); - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not send message. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "send interrupted"); + } g_free (str); goto cleanup_error; } @@ -5512,8 +5524,12 @@ connect_failed: { gchar *str = gst_rtsp_strresult (res); - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - ("Failed to connect. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + ("Failed to connect. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "connect interrupted"); + } g_free (str); goto cleanup_error; } @@ -5530,7 +5546,6 @@ send_error: { /* Don't post a message - the rtsp_send method will have * taken care of it because we passed NULL for the response code */ - goto cleanup_error; } methods_error: @@ -5728,8 +5743,12 @@ send_error: gchar *str = gst_rtsp_strresult (res); gst_rtsp_message_unset (&request); - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not send message. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "TEARDOWN interrupted"); + } g_free (str); goto close; } @@ -6054,8 +6073,12 @@ send_error: gchar *str = gst_rtsp_strresult (res); gst_rtsp_message_unset (&request); - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not send message. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "PLAY interrupted"); + } g_free (str); goto done; } @@ -6181,8 +6204,12 @@ send_error: gchar *str = gst_rtsp_strresult (res); gst_rtsp_message_unset (&request); - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message. (%s)", str)); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not send message. (%s)", str)); + } else { + GST_WARNING_OBJECT (src, "PAUSE interrupted"); + } g_free (str); goto done; } From c27c10f8f49379d6f9d7aad165ae2b9af0379c93 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 17:19:59 +0100 Subject: [PATCH 12/19] rtspsrc: small cleanups Make sure we cancel the previous task when queuing a new one. Move the messages to a central place so we can more easily post them. --- gst/rtsp/gstrtspsrc.c | 207 ++++++++++++++++++++++++++---------------- 1 file changed, 130 insertions(+), 77 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index a0e11bed47..f372156f01 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -3876,9 +3876,6 @@ interrupt: if (src->loop_cmd != CMD_RECONNECT) goto stopping; - /* when we get here we have to reconnect using tcp */ - src->loop_cmd = CMD_LOOP; - /* only restart when the pads were not yet activated, else we were * streaming over UDP */ restart = src->need_activate; @@ -3987,10 +3984,118 @@ server_eof: } } +static void +gst_rtspsrc_loop_start_cmd (GstRTSPSrc * src, gint cmd) +{ + switch (cmd) { + case CMD_OPEN: + GST_ELEMENT_PROGRESS (src, START, "open", ("Opening Stream")); + break; + case CMD_PLAY: + GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PLAY request")); + break; + case CMD_PAUSE: + GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PAUSE request")); + break; + case CMD_CLOSE: + GST_ELEMENT_PROGRESS (src, START, "close", ("Closing Stream")); + break; + default: + break; + } +} + +static void +gst_rtspsrc_loop_complete_cmd (GstRTSPSrc * src, gint cmd) +{ + switch (cmd) { + case CMD_OPEN: + GST_ELEMENT_PROGRESS (src, COMPLETE, "open", ("Opened Stream")); + break; + case CMD_PLAY: + GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("Sent PLAY request")); + break; + case CMD_PAUSE: + GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("Sent PAUSE request")); + break; + case CMD_CLOSE: + GST_ELEMENT_PROGRESS (src, COMPLETE, "close", ("Closed Stream")); + break; + default: + break; + } +} + +static void +gst_rtspsrc_loop_cancel_cmd (GstRTSPSrc * src, gint cmd) +{ + switch (cmd) { + case CMD_OPEN: + GST_ELEMENT_PROGRESS (src, CANCELED, "open", ("Open canceled")); + break; + case CMD_PLAY: + GST_ELEMENT_PROGRESS (src, CANCELED, "request", ("PLAY canceled")); + break; + case CMD_PAUSE: + GST_ELEMENT_PROGRESS (src, CANCELED, "request", ("PAUSE canceled")); + break; + case CMD_CLOSE: + GST_ELEMENT_PROGRESS (src, CANCELED, "close", ("Close canceled")); + break; + default: + break; + } +} + +static void +gst_rtspsrc_loop_error_cmd (GstRTSPSrc * src, gint cmd) +{ + switch (cmd) { + case CMD_OPEN: + GST_ELEMENT_PROGRESS (src, ERROR, "open", ("Open failed")); + break; + case CMD_PLAY: + GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PLAY failed")); + break; + case CMD_PAUSE: + GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PAUSE failed")); + break; + case CMD_CLOSE: + GST_ELEMENT_PROGRESS (src, ERROR, "close", ("Close failed")); + break; + default: + break; + } +} + +static void +gst_rtspsrc_loop_end_cmd (GstRTSPSrc * src, gint cmd, GstRTSPResult ret) +{ + if (ret == GST_RTSP_OK) + gst_rtspsrc_loop_complete_cmd (src, cmd); + else if (ret == GST_RTSP_EINTR) + gst_rtspsrc_loop_cancel_cmd (src, cmd); + else + gst_rtspsrc_loop_error_cmd (src, cmd); +} + static void gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) { + gint old; + + /* start new request */ + gst_rtspsrc_loop_start_cmd (src, cmd); + GST_OBJECT_LOCK (src); + old = src->loop_cmd; + if (old != CMD_WAIT) { + src->loop_cmd = CMD_WAIT; + GST_OBJECT_UNLOCK (src); + /* cancel previous request */ + gst_rtspsrc_loop_cancel_cmd (src, old); + GST_OBJECT_LOCK (src); + } src->loop_cmd = cmd; src->flushing = flush; if (flush) { @@ -5580,13 +5685,6 @@ cleanup_error: } } -static void -gst_rtspsrc_open_async (GstRTSPSrc * src) -{ - GST_ELEMENT_PROGRESS (src, START, "open", ("Opening Stream")); - gst_rtspsrc_loop_send_cmd (src, CMD_OPEN, FALSE); -} - static GstRTSPResult gst_rtspsrc_open (GstRTSPSrc * src, gboolean async) { @@ -5604,14 +5702,9 @@ gst_rtspsrc_open (GstRTSPSrc * src, gboolean async) goto open_failed; done: - if (async) { - if (ret == GST_RTSP_OK) - GST_ELEMENT_PROGRESS (src, COMPLETE, "open", ("Opened stream")); - else if (ret == GST_RTSP_EINTR) - GST_ELEMENT_PROGRESS (src, CANCELED, "open", ("Open canceled")); - else - GST_ELEMENT_PROGRESS (src, ERROR, "open", ("Open failed")); - } + if (async) + gst_rtspsrc_loop_end_cmd (src, CMD_OPEN, ret); + return ret; /* ERRORS */ @@ -5627,13 +5720,6 @@ open_failed: } } -static void -gst_rtspsrc_close_async (GstRTSPSrc * src) -{ - GST_ELEMENT_PROGRESS (src, START, "close", ("Closing Stream")); - gst_rtspsrc_loop_send_cmd (src, CMD_CLOSE, FALSE); -} - static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async) { @@ -5718,14 +5804,9 @@ close: src->state = GST_RTSP_STATE_INVALID; - if (async) { - if (res == GST_RTSP_OK) - GST_ELEMENT_PROGRESS (src, COMPLETE, "close", ("Closed stream")); - else if (res == GST_RTSP_EINTR) - GST_ELEMENT_PROGRESS (src, CANCELED, "close", ("Close canceled")); - else - GST_ELEMENT_PROGRESS (src, ERROR, "close", ("Close failed")); - } + if (async) + gst_rtspsrc_loop_end_cmd (src, CMD_CLOSE, res); + return res; /* ERRORS */ @@ -5872,13 +5953,6 @@ clear_rtp_base (GstRTSPSrc * src, GstRTSPStream * stream) } } -static void -gst_rtspsrc_play_async (GstRTSPSrc * src) -{ - GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PLAY request")); - gst_rtspsrc_loop_send_cmd (src, CMD_PLAY, FALSE); -} - static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) { @@ -6027,7 +6101,6 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) src->running = TRUE; src->base_time = -1; src->state = GST_RTSP_STATE_PLAYING; - src->loop_cmd = CMD_LOOP; /* mark discont */ GST_DEBUG_OBJECT (src, "mark DISCONT, we did a seek to another position"); @@ -6037,15 +6110,9 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) } done: - if (async) { - if (res == GST_RTSP_OK) - GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("PLAY request sent")); - else if (res == GST_RTSP_EINTR) - GST_ELEMENT_PROGRESS (src, CANCELED, "request", - ("PLAY request canceled")); - else - GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PLAY request failed")); - } + if (async) + gst_rtspsrc_loop_end_cmd (src, CMD_PLAY, res); + return res; /* ERRORS */ @@ -6084,13 +6151,6 @@ send_error: } } -static void -gst_rtspsrc_pause_async (GstRTSPSrc * src) -{ - GST_ELEMENT_PROGRESS (src, START, "request", ("Sending PAUSE request")); - gst_rtspsrc_loop_send_cmd (src, CMD_PAUSE, FALSE); -} - static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) { @@ -6158,25 +6218,13 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) break; } - if (idle && src->task) { - GST_DEBUG_OBJECT (src, "starting idle task again"); - src->base_time = -1; - src->loop_cmd = CMD_LOOP; - } - no_connection: src->state = GST_RTSP_STATE_READY; done: - if (async) { - if (res == GST_RTSP_OK) - GST_ELEMENT_PROGRESS (src, COMPLETE, "request", ("PAUSE request sent")); - else if (res == GST_RTSP_EINTR) - GST_ELEMENT_PROGRESS (src, CANCELED, "request", - ("PAUSE request canceled")); - else - GST_ELEMENT_PROGRESS (src, ERROR, "request", ("PAUSE request failed")); - } + if (async) + gst_rtspsrc_loop_end_cmd (src, CMD_PAUSE, res); + return res; /* ERRORS */ @@ -6319,9 +6367,13 @@ gst_rtspsrc_thread (GstRTSPSrc * src) break; case CMD_PLAY: ret = gst_rtspsrc_play (src, &src->segment, TRUE); + if (ret == GST_RTSP_OK) + running = TRUE; break; case CMD_PAUSE: ret = gst_rtspsrc_pause (src, TRUE, TRUE); + if (ret == GST_RTSP_OK) + running = TRUE; break; case CMD_CLOSE: ret = gst_rtspsrc_close (src, TRUE); @@ -6379,7 +6431,8 @@ gst_rtspsrc_stop (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "stopping"); - gst_rtspsrc_connection_flush (src, TRUE); + /* also cancels pending task */ + gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, TRUE); GST_OBJECT_LOCK (src); if ((task = src->task)) { @@ -6419,7 +6472,7 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) goto start_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: - gst_rtspsrc_open_async (rtspsrc); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_OPEN, FALSE); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: case GST_STATE_CHANGE_PLAYING_TO_PAUSED: @@ -6438,18 +6491,18 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - gst_rtspsrc_play_async (rtspsrc); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PLAY, FALSE); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* send pause request and keep the idle task around */ - gst_rtspsrc_pause_async (rtspsrc); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_PAUSE, FALSE); ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_READY_TO_PAUSED: ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PAUSED_TO_READY: - gst_rtspsrc_close_async (rtspsrc); + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_CLOSE, FALSE); break; case GST_STATE_CHANGE_READY_TO_NULL: gst_rtspsrc_stop (rtspsrc); From 25132074330af7e52cc4c80cc465385364120f01 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jan 2011 18:02:49 +0100 Subject: [PATCH 13/19] rtspsrc: rework reconnect code Use the same async code path to implement reconnects. Make sure we only post progress messages when doing async things. --- gst/rtsp/gstrtspsrc.c | 163 +++++++++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 67 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index f372156f01..e838726c51 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -3277,7 +3277,8 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event, gboolean source) } static GstRTSPResult -gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info) +gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info, + gboolean async) { GstRTSPResult res; @@ -3312,8 +3313,9 @@ gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info) if (!info->connected) { /* connect */ - GST_ELEMENT_PROGRESS (src, CONTINUE, "connect", - ("Connecting to %s", info->location)); + if (async) + GST_ELEMENT_PROGRESS (src, CONTINUE, "connect", + ("Connecting to %s", info->location)); GST_DEBUG_OBJECT (src, "connecting (%s)...", info->location); if ((res = gst_rtsp_connection_connect (info->connection, @@ -3365,13 +3367,14 @@ gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info, } static GstRTSPResult -gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info) +gst_rtsp_conninfo_reconnect (GstRTSPSrc * src, GstRTSPConnInfo * info, + gboolean async) { GstRTSPResult res; GST_DEBUG_OBJECT (src, "reconnecting connection..."); gst_rtsp_conninfo_close (src, info, FALSE); - res = gst_rtsp_conninfo_connect (src, info); + res = gst_rtsp_conninfo_connect (src, info, async); return res; } @@ -3768,7 +3771,6 @@ invalid_length: static GstFlowReturn gst_rtspsrc_loop_udp (GstRTSPSrc * src) { - gboolean restart = FALSE; GstRTSPResult res; GstRTSPMessage message = { 0 }; gint retry = 0; @@ -3809,7 +3811,6 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) break; case GST_RTSP_EINTR: /* we got interrupted, see what we have to do */ - GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); goto interrupt; case GST_RTSP_ETIMEOUT: /* send keep-alive, ignore the result, a warning will be posted. */ @@ -3822,7 +3823,8 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) * see what happens. */ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("The server closed the connection.")); - if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) < 0) + if ((res = + gst_rtsp_conninfo_reconnect (src, &src->conninfo, FALSE)) < 0) goto connect_error; continue; @@ -3868,17 +3870,79 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) } } -interrupt: /* we get here when the connection got interrupted */ - GST_OBJECT_LOCK (src); - gst_rtspsrc_connection_flush (src, FALSE); - GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd); - if (src->loop_cmd != CMD_RECONNECT) - goto stopping; +interrupt: + { + gst_rtsp_message_unset (&message); + GST_DEBUG_OBJECT (src, "got interrupted: stop connection flush"); + gst_rtspsrc_connection_flush (src, FALSE); + return GST_FLOW_WRONG_STATE; + } +connect_error: + { + gchar *str = gst_rtsp_strresult (res); + GstFlowReturn ret; + src->conninfo.connected = FALSE; + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + ("Could not connect to server. (%s)", str)); + g_free (str); + ret = GST_FLOW_ERROR; + } else { + ret = GST_FLOW_WRONG_STATE; + } + return ret; + } +receive_error: + { + gchar *str = gst_rtsp_strresult (res); + + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Could not receive message. (%s)", str)); + g_free (str); + return GST_FLOW_ERROR; + } +handle_request_failed: + { + gchar *str = gst_rtsp_strresult (res); + GstFlowReturn ret; + + gst_rtsp_message_unset (&message); + if (res != GST_RTSP_EINTR) { + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not handle server message. (%s)", str)); + g_free (str); + ret = GST_FLOW_ERROR; + } else { + ret = GST_FLOW_WRONG_STATE; + } + return ret; + } +server_eof: + { + GST_DEBUG_OBJECT (src, "we got an eof from the server"); + GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), + ("The server closed the connection.")); + src->conninfo.connected = FALSE; + gst_rtsp_message_unset (&message); + return GST_FLOW_UNEXPECTED; + } +} + +static GstRTSPResult +gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async) +{ + GstRTSPResult res = GST_RTSP_OK; + gboolean restart; + + GST_DEBUG_OBJECT (src, "doing reconnect"); + + GST_OBJECT_LOCK (src); /* only restart when the pads were not yet activated, else we were * streaming over UDP */ restart = src->need_activate; + src->flushing = FALSE; GST_OBJECT_UNLOCK (src); /* no need to restart, we're done */ @@ -3889,10 +3953,12 @@ interrupt: src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP; /* pause to prepare for a restart */ - gst_rtspsrc_pause (src, FALSE, FALSE); + if ((res = gst_rtspsrc_pause (src, FALSE, async)) < 0) + goto done; /* close and cleanup our state */ - gst_rtspsrc_close (src, FALSE); + if ((res = gst_rtspsrc_close (src, async)) < 0) + goto done; /* see if we have TCP left to try. Also don't try TCP when we were configured * with an SDP. */ @@ -3907,52 +3973,17 @@ interrupt: gst_guint64_to_gdouble (src->udp_timeout / 1000000.0))); /* open new connection using tcp */ - if (!gst_rtspsrc_open (src, FALSE)) + if (gst_rtspsrc_open (src, async) < 0) goto open_failed; /* start playback */ - if (!gst_rtspsrc_play (src, &src->segment, FALSE)) + if (gst_rtspsrc_play (src, &src->segment, async) < 0) goto play_failed; done: - return GST_FLOW_OK; + return res; /* ERRORS */ -stopping: - { - GST_DEBUG_OBJECT (src, "we are stopping"); - GST_OBJECT_UNLOCK (src); - return GST_FLOW_WRONG_STATE; - } -receive_error: - { - gchar *str = gst_rtsp_strresult (res); - - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not receive message. (%s)", str)); - g_free (str); - return GST_FLOW_ERROR; - } -handle_request_failed: - { - gchar *str = gst_rtsp_strresult (res); - - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not handle server message. (%s)", str)); - g_free (str); - gst_rtsp_message_unset (&message); - return GST_FLOW_ERROR; - } -connect_error: - { - gchar *str = gst_rtsp_strresult (res); - - src->conninfo.connected = FALSE; - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - ("Could not connect to server. (%s)", str)); - g_free (str); - return GST_FLOW_ERROR; - } no_protocols: { src->cur_protocols = 0; @@ -3973,15 +4004,6 @@ play_failed: GST_DEBUG_OBJECT (src, "play failed"); return GST_FLOW_OK; } -server_eof: - { - GST_DEBUG_OBJECT (src, "we got an eof from the server"); - GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), - ("The server closed the connection.")); - src->conninfo.connected = FALSE; - gst_rtsp_message_unset (&message); - return GST_FLOW_UNEXPECTED; - } } static void @@ -4535,7 +4557,9 @@ receive_error: if (try == 0) { try++; /* if reconnect succeeds, try again */ - if ((res = gst_rtsp_conninfo_reconnect (src, &src->conninfo)) == 0) + if ((res = + gst_rtsp_conninfo_reconnect (src, &src->conninfo, + FALSE)) == 0) goto again; } /* only try once after reconnect, then fallthrough and error out */ @@ -5068,7 +5092,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async) } if (src->conninfo.connection == NULL) { - if (!gst_rtsp_conninfo_connect (src, &stream->conninfo)) { + if (!gst_rtsp_conninfo_connect (src, &stream->conninfo, async)) { GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream); continue; } @@ -5469,7 +5493,7 @@ gst_rtspsrc_open_from_sdp (GstRTSPSrc * src, GstSDPMessage * sdp, src->conninfo.location = g_strdup (control); /* make a connection for this, if there was a connection already, nothing * happens. */ - if (gst_rtsp_conninfo_connect (src, &src->conninfo) < 0) { + if (gst_rtsp_conninfo_connect (src, &src->conninfo, async) < 0) { GST_ERROR_OBJECT (src, "could not connect"); } } @@ -5527,7 +5551,7 @@ restart: goto no_url; src->tried_url_auth = FALSE; - if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo)) < 0) + if ((res = gst_rtsp_conninfo_connect (src, &src->conninfo, async)) < 0) goto connect_failed; /* create OPTIONS */ @@ -6381,6 +6405,11 @@ gst_rtspsrc_thread (GstRTSPSrc * src) case CMD_LOOP: running = gst_rtspsrc_loop (src); break; + case CMD_RECONNECT: + ret = gst_rtspsrc_reconnect (src, FALSE); + if (ret == GST_RTSP_OK) + running = TRUE; + break; default: break; } From 6fe680934ae4f33c46f60478cb7023cea4c6b240 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 10 Jan 2011 11:45:03 +0100 Subject: [PATCH 14/19] rtspsrc: improve async handling Simplify the command handling, only continue looping when we have not received another command or when the previous loop was successfull. Avoid looping on a disconnected socket. --- gst/rtsp/gstrtspsrc.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index e838726c51..c8fb921370 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -3541,7 +3541,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) tv_timeout.tv_sec, tv_timeout.tv_usec); GST_OBJECT_LOCK (src); - if (src->loop_cmd == CMD_LOOP && !src->flushing) { + if (!src->flushing) { src->waiting = TRUE; GST_OBJECT_UNLOCK (src); @@ -3787,7 +3787,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) gst_rtsp_message_unset (&message); GST_OBJECT_LOCK (src); - if (src->loop_cmd == CMD_LOOP && !src->flushing) { + if (!src->flushing) { src->waiting = TRUE; GST_OBJECT_UNLOCK (src); @@ -4138,6 +4138,9 @@ gst_rtspsrc_loop (GstRTSPSrc * src) { GstFlowReturn ret; + if (!src->conninfo.connection || !src->conninfo.connected) + goto no_connection; + if (src->interleaved) ret = gst_rtspsrc_loop_interleaved (src); else @@ -4149,6 +4152,12 @@ gst_rtspsrc_loop (GstRTSPSrc * src) return TRUE; /* ERRORS */ +no_connection: + { + GST_WARNING_OBJECT (src, "we are not connected"); + ret = GST_FLOW_WRONG_STATE; + goto pause; + } pause: { const gchar *reason = gst_flow_get_name (ret); @@ -6377,8 +6386,7 @@ gst_rtspsrc_thread (GstRTSPSrc * src) GST_OBJECT_LOCK (src); cmd = src->loop_cmd; - if (cmd != CMD_LOOP || src->flushing) - src->loop_cmd = CMD_WAIT; + src->loop_cmd = CMD_WAIT; GST_DEBUG_OBJECT (src, "got command %d", cmd); GST_OBJECT_UNLOCK (src); From ddfcd8bbfdf58884674397b7c9015c31c6c9ab56 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 10 Jan 2011 12:46:37 +0100 Subject: [PATCH 15/19] rtspsrc: open on play and pause when not done yet With the async state changes, it is possible that we need to open the stream before play and pause. Also make sure we remember a previous open failure so that we don't keep trying again. --- gst/rtsp/gstrtspsrc.c | 47 +++++++++++++++++++++++++++++++++++++++---- gst/rtsp/gstrtspsrc.h | 1 + 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index c8fb921370..832656a054 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -3952,10 +3952,6 @@ gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async) /* we can try only TCP now */ src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP; - /* pause to prepare for a restart */ - if ((res = gst_rtspsrc_pause (src, FALSE, async)) < 0) - goto done; - /* close and cleanup our state */ if ((res = gst_rtspsrc_close (src, async)) < 0) goto done; @@ -5744,11 +5740,13 @@ done: no_sdp: { GST_WARNING_OBJECT (src, "can't get sdp"); + src->open_error = TRUE; goto done; } open_failed: { GST_WARNING_OBJECT (src, "can't setup streaming from sdp"); + src->open_error = TRUE; goto done; } } @@ -5986,6 +5984,30 @@ clear_rtp_base (GstRTSPSrc * src, GstRTSPStream * stream) } } +static GstRTSPResult +gst_rtspsrc_ensure_open (GstRTSPSrc * src, gboolean async) +{ + GstRTSPResult res = GST_RTSP_OK; + + if (src->state < GST_RTSP_STATE_READY) { + res = GST_RTSP_ERROR; + if (src->open_error) { + GST_DEBUG_OBJECT (src, "the stream was in error"); + goto done; + } + if (async) + gst_rtspsrc_loop_start_cmd (src, CMD_OPEN); + + if ((res = gst_rtspsrc_open (src, async)) < 0) { + GST_DEBUG_OBJECT (src, "failed to open stream"); + goto done; + } + } + +done: + return res; +} + static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) { @@ -5999,6 +6021,9 @@ gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async) GST_DEBUG_OBJECT (src, "PLAY..."); + if ((res = gst_rtspsrc_ensure_open (src, async)) < 0) + goto open_failed; + if (!(src->methods & GST_RTSP_PLAY)) goto not_supported; @@ -6149,6 +6174,11 @@ done: return res; /* ERRORS */ +open_failed: + { + GST_DEBUG_OBJECT (src, "failed to open stream"); + goto done; + } not_supported: { GST_DEBUG_OBJECT (src, "PLAY is not supported"); @@ -6195,6 +6225,9 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async) GST_DEBUG_OBJECT (src, "PAUSE..."); + if ((res = gst_rtspsrc_ensure_open (src, async)) < 0) + goto open_failed; + if (!(src->methods & GST_RTSP_PAUSE)) goto not_supported; @@ -6261,6 +6294,11 @@ done: return res; /* ERRORS */ +open_failed: + { + GST_DEBUG_OBJECT (src, "failed to open stream"); + goto done; + } not_supported: { GST_DEBUG_OBJECT (src, "PAUSE is not supported"); @@ -6395,6 +6433,7 @@ gst_rtspsrc_thread (GstRTSPSrc * src) src->cur_protocols = src->protocols; /* first attempt, don't ignore timeouts */ src->ignore_timeout = FALSE; + src->open_error = FALSE; ret = gst_rtspsrc_open (src, TRUE); break; case CMD_PLAY: diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index b5744248cc..f69a3edd27 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -182,6 +182,7 @@ struct _GstRTSPSrc { gboolean ignore_timeout; gboolean flushing; gboolean waiting; + gboolean open_error; /* mutex for protecting state changes */ GStaticRecMutex *state_rec_lock; From e6798ad54ccf6100c9eb37106526005291a1f340 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 6 Apr 2011 14:53:27 +0200 Subject: [PATCH 16/19] rtspsrc: tweak post-seek loop handling --- gst/rtsp/gstrtspsrc.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 832656a054..f5bbcf898c 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -1827,9 +1827,6 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event) * with the above flush/pause code */ GST_RTSP_STREAM_LOCK (src); - /* stop flushing state */ - gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, FALSE); - GST_DEBUG_OBJECT (src, "stopped streaming"); /* copy segment, we need this because we still need the old @@ -1870,6 +1867,9 @@ gst_rtspsrc_perform_seek (GstRTSPSrc * src, GstEvent * event) GST_DEBUG_OBJECT (src, "stopping flush"); gst_rtspsrc_flush (src, FALSE); } else if (src->running) { + /* re-engage loop */ + gst_rtspsrc_loop_send_cmd (src, CMD_LOOP, FALSE); + /* we are running the current segment and doing a non-flushing seek, * close the segment first based on the previous last_stop. */ GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT From f7ddf811d76285229f6e0a4a5ce8cce83e1729a9 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 6 Apr 2011 15:49:01 +0200 Subject: [PATCH 17/19] rtspsrc: fix and improve async handling Simplify the command handling; passing a command to thread means we really want it to get the message, which means to always flush provided the command can handle being interrupted. Command thread indicates whether command allows interruption and ensure non-flushing connection as it subsequently needs it. In particular, this also makes the TEARDOWN sequence interruptable and also prevents races where _loop_ could miss a command and would continue receiving (or at least trying to). See #632504. --- gst/rtsp/gstrtspsrc.c | 67 +++++++++++++++---------------------------- gst/rtsp/gstrtspsrc.h | 1 - 2 files changed, 23 insertions(+), 45 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index f5bbcf898c..82743147fa 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -3540,23 +3540,11 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GST_DEBUG_OBJECT (src, "doing receive with timeout %ld seconds, %ld usec", tv_timeout.tv_sec, tv_timeout.tv_usec); - GST_OBJECT_LOCK (src); - if (!src->flushing) { - src->waiting = TRUE; - GST_OBJECT_UNLOCK (src); - - /* protect the connection with the connection lock so that we can see when - * we are finished doing server communication */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, src->ptcp_timeout); - - GST_OBJECT_LOCK (src); - src->waiting = FALSE; - } else { - res = GST_RTSP_EINTR; - } - GST_OBJECT_UNLOCK (src); + /* protect the connection with the connection lock so that we can see when + * we are finished doing server communication */ + res = + gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, src->ptcp_timeout); switch (res) { case GST_RTSP_OK: @@ -3786,24 +3774,11 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) gst_rtsp_message_unset (&message); - GST_OBJECT_LOCK (src); - if (!src->flushing) { - src->waiting = TRUE; - GST_OBJECT_UNLOCK (src); - - /* we should continue reading the TCP socket because the server might - * send us requests. When the session timeout expires, we need to send a - * keep-alive request to keep the session open. */ - res = - gst_rtspsrc_connection_receive (src, src->conninfo.connection, - &message, &tv_timeout); - - GST_OBJECT_LOCK (src); - src->waiting = FALSE; - } else { - res = GST_RTSP_EINTR; - } - GST_OBJECT_UNLOCK (src); + /* we should continue reading the TCP socket because the server might + * send us requests. When the session timeout expires, we need to send a + * keep-alive request to keep the session open. */ + res = gst_rtspsrc_connection_receive (src, src->conninfo.connection, + &message, &tv_timeout); switch (res) { case GST_RTSP_OK: @@ -3942,7 +3917,6 @@ gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async) /* only restart when the pads were not yet activated, else we were * streaming over UDP */ restart = src->need_activate; - src->flushing = FALSE; GST_OBJECT_UNLOCK (src); /* no need to restart, we're done */ @@ -4102,6 +4076,8 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) { gint old; + /* FIXME flush param mute; remove at discretion */ + /* start new request */ gst_rtspsrc_loop_start_cmd (src, cmd); @@ -4115,14 +4091,10 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) GST_OBJECT_LOCK (src); } src->loop_cmd = cmd; - src->flushing = flush; - if (flush) { - if (src->waiting) { - GST_DEBUG_OBJECT (src, "start connection flush"); - gst_rtspsrc_connection_flush (src, TRUE); - } - } else { - GST_DEBUG_OBJECT (src, "stop connection flush"); + /* interrupt if allowed */ + if (src->waiting) { + GST_DEBUG_OBJECT (src, "start connection flush"); + gst_rtspsrc_connection_flush (src, TRUE); } if (src->task) gst_task_start (src->task); @@ -6426,6 +6398,13 @@ gst_rtspsrc_thread (GstRTSPSrc * src) cmd = src->loop_cmd; src->loop_cmd = CMD_WAIT; GST_DEBUG_OBJECT (src, "got command %d", cmd); + + /* we got the message command, so ensure communication is possible again */ + gst_rtspsrc_connection_flush (src, FALSE); + + /* we allow these to be interrupted */ + if (cmd == CMD_LOOP || cmd == CMD_CLOSE) + src->waiting = TRUE; GST_OBJECT_UNLOCK (src); switch (cmd) { diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index f69a3edd27..1d6f84950b 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -180,7 +180,6 @@ struct _GstRTSPSrc { /* UDP mode loop */ gint loop_cmd; gboolean ignore_timeout; - gboolean flushing; gboolean waiting; gboolean open_error; From 283e4e4afdf7c5a6b2a93ee95d6b3de0aebf1120 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 6 Apr 2011 15:51:49 +0200 Subject: [PATCH 18/19] rtspsrc: ensure proper closing and cleanup ... since the TEARDOWN sequence might not have had a chance to even start, but at least connections should be closed (synchronously) and state cleaned up. See #632504. --- gst/rtsp/gstrtspsrc.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 82743147fa..46996c9087 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -251,7 +251,8 @@ static GstRTSPResult gst_rtspsrc_play (GstRTSPSrc * src, GstSegment * segment, gboolean async); static GstRTSPResult gst_rtspsrc_pause (GstRTSPSrc * src, gboolean idle, gboolean async); -static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async); +static GstRTSPResult gst_rtspsrc_close (GstRTSPSrc * src, gboolean async, + gboolean only_close); static gboolean gst_rtspsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri); @@ -3927,7 +3928,7 @@ gst_rtspsrc_reconnect (GstRTSPSrc * src, gboolean async) src->cur_protocols = GST_RTSP_LOWER_TRANS_TCP; /* close and cleanup our state */ - if ((res = gst_rtspsrc_close (src, async)) < 0) + if ((res = gst_rtspsrc_close (src, async, FALSE)) < 0) goto done; /* see if we have TCP left to try. Also don't try TCP when we were configured @@ -5724,7 +5725,7 @@ open_failed: } static GstRTSPResult -gst_rtspsrc_close (GstRTSPSrc * src, gboolean async) +gst_rtspsrc_close (GstRTSPSrc * src, gboolean async, gboolean only_close) { GstRTSPMessage request = { 0 }; GstRTSPMessage response = { 0 }; @@ -5739,6 +5740,9 @@ gst_rtspsrc_close (GstRTSPSrc * src, gboolean async) goto close; } + if (only_close) + goto close; + /* construct a control url */ if (src->control) control = src->control; @@ -6426,7 +6430,7 @@ gst_rtspsrc_thread (GstRTSPSrc * src) running = TRUE; break; case CMD_CLOSE: - ret = gst_rtspsrc_close (src, TRUE); + ret = gst_rtspsrc_close (src, TRUE, FALSE); break; case CMD_LOOP: running = gst_rtspsrc_loop (src); @@ -6510,6 +6514,9 @@ gst_rtspsrc_stop (GstRTSPSrc * src) } GST_OBJECT_UNLOCK (src); + /* ensure synchronously all is closed and clean */ + gst_rtspsrc_close (src, FALSE, TRUE); + return TRUE; } From dc2ddea91befb4bc38116f4e08e459f99e0b3329 Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 6 Apr 2011 16:05:55 +0200 Subject: [PATCH 19/19] rtspsrc: also allow PAUSE to be interrupted ... as it is on the way out to NULL. See #632504. --- gst/rtsp/gstrtspsrc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index 46996c9087..86ba69bf54 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -6407,7 +6407,7 @@ gst_rtspsrc_thread (GstRTSPSrc * src) gst_rtspsrc_connection_flush (src, FALSE); /* we allow these to be interrupted */ - if (cmd == CMD_LOOP || cmd == CMD_CLOSE) + if (cmd == CMD_LOOP || cmd == CMD_CLOSE || cmd == CMD_PAUSE) src->waiting = TRUE; GST_OBJECT_UNLOCK (src);