diff --git a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c index 7979b6d7a9..1c646d2b36 100644 --- a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c +++ b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.c @@ -509,8 +509,6 @@ static GstFlowReturn gst_rtspsrc_push_backchannel_buffer (GstRTSPSrc * src, static GstFlowReturn gst_rtspsrc_push_backchannel_sample (GstRTSPSrc * src, guint id, GstSample * sample); -static void gst_rtspsrc_reset_flows (GstRTSPSrc * src); - static GstCaps *signal_get_srtcp_params (GstRTSPSrc * src, GstRTSPStream * stream); @@ -1811,6 +1809,9 @@ gst_rtspsrc_init (GstRTSPSrc * src) g_mutex_init (&src->group_lock); + src->flow_combiner = gst_flow_combiner_new (); + g_mutex_init (&src->flow_combiner_lock); + GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE); gst_bin_set_suppressed_flags (GST_BIN (src), GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK); @@ -1866,6 +1867,9 @@ gst_rtspsrc_finalize (GObject * object) rtspsrc->prop_extra_http_request_headers = NULL; } + gst_flow_combiner_unref (rtspsrc->flow_combiner); + g_mutex_clear (&rtspsrc->flow_combiner_lock); + /* free locks */ g_rec_mutex_clear (&rtspsrc->stream_rec_lock); g_rec_mutex_clear (&rtspsrc->state_rec_lock); @@ -2367,19 +2371,6 @@ find_stream_by_channel (GstRTSPStream * stream, gint * channel) return -1; } -static gint -find_stream_by_udpsrc (GstRTSPStream * stream, gconstpointer a) -{ - GstElement *src = (GstElement *) a; - - if (stream->udpsrc[0] == src) - return 0; - if (stream->udpsrc[1] == src) - return 0; - - return -1; -} - static gint find_stream_by_setup (GstRTSPStream * stream, gconstpointer a) { @@ -2805,8 +2796,12 @@ gst_rtspsrc_stream_free (GstRTSPSrc * src, GstRTSPStream * stream) } if (stream->srcpad) { gst_pad_set_active (stream->srcpad, FALSE); - if (stream->added) + if (stream->added) { + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_remove_pad (src->flow_combiner, stream->srcpad); + g_mutex_unlock (&src->flow_combiner_lock); gst_element_remove_pad (GST_ELEMENT_CAST (src), stream->srcpad); + } } if (stream->srtpenc) gst_object_unref (stream->srtpenc); @@ -2844,6 +2839,9 @@ gst_rtspsrc_cleanup (GstRTSPSrc * src) } g_list_free (src->streams); src->streams = NULL; + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_reset (src->flow_combiner); + g_mutex_unlock (&src->flow_combiner_lock); if (src->manager) { if (src->manager_sig_id) { g_signal_handler_disconnect (src->manager, src->manager_sig_id); @@ -3110,7 +3108,9 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing) state = GST_STATE_PAUSED; } gst_rtspsrc_push_event (src, event); - gst_rtspsrc_reset_flows (src); + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_reset (src->flow_combiner); + g_mutex_unlock (&src->flow_combiner_lock); gst_rtspsrc_loop_send_cmd (src, cmd, CMD_LOOP); gst_rtspsrc_set_state (src, state); } @@ -3465,6 +3465,44 @@ gst_rtspsrc_handle_src_sink_event (GstPad * pad, GstObject * parent, return gst_pad_push_event (stream->srcpad, event); } +static GstFlowReturn +gst_rtspsrc_handle_src_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstFlowReturn ret; + GstRTSPStream *stream; + + stream = gst_pad_get_element_private (pad); + + ret = gst_pad_push (stream->srcpad, buffer); + g_mutex_lock (&stream->parent->flow_combiner_lock); + ret = + gst_flow_combiner_update_pad_flow (stream->parent->flow_combiner, + stream->srcpad, ret); + g_mutex_unlock (&stream->parent->flow_combiner_lock); + + return ret; +} + +static GstFlowReturn +gst_rtspsrc_handle_src_sink_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstFlowReturn ret; + GstRTSPStream *stream; + + stream = gst_pad_get_element_private (pad); + + ret = gst_pad_push_list (stream->srcpad, list); + g_mutex_lock (&stream->parent->flow_combiner_lock); + ret = + gst_flow_combiner_update_pad_flow (stream->parent->flow_combiner, + stream->srcpad, ret); + g_mutex_unlock (&stream->parent->flow_combiner_lock); + + return ret; +} + /* this is the final event function we receive on the internal source pad when * we deal with TCP connections */ static gboolean @@ -3889,6 +3927,9 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) GST_PAD (gst_proxy_pad_get_internal (GST_PROXY_PAD (stream->srcpad))); gst_pad_set_element_private (internal_src, stream); gst_pad_set_event_function (internal_src, gst_rtspsrc_handle_src_sink_event); + gst_pad_set_chain_function (internal_src, gst_rtspsrc_handle_src_sink_chain); + gst_pad_set_chain_list_function (internal_src, + gst_rtspsrc_handle_src_sink_chain_list); gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event); gst_pad_set_query_function (stream->srcpad, gst_rtspsrc_handle_src_query); @@ -3902,10 +3943,14 @@ new_manager_pad (GstElement * manager, GstPad * pad, GstRTSPSrc * src) gst_object_unref (internal_src); /* don't add the srcpad if this is a sendonly stream */ - if (stream->is_backchannel) + if (stream->is_backchannel) { add_backchannel_fakesink (src, stream, stream->srcpad); - else + } else { + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_add_pad (src->flow_combiner, stream->srcpad); + g_mutex_unlock (&src->flow_combiner_lock); gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); + } if (all_added) { GST_DEBUG_OBJECT (src, "We added all streams"); @@ -5373,6 +5418,10 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, gst_pad_set_element_private (internal_src, stream); gst_pad_set_event_function (internal_src, gst_rtspsrc_handle_src_sink_event); + gst_pad_set_chain_function (internal_src, + gst_rtspsrc_handle_src_sink_chain); + gst_pad_set_chain_list_function (internal_src, + gst_rtspsrc_handle_src_sink_chain_list); gst_object_unref (internal_src); gst_pad_set_event_function (stream->srcpad, gst_rtspsrc_handle_src_event); @@ -5470,10 +5519,14 @@ gst_rtspsrc_activate_streams (GstRTSPSrc * src) /* add the pad */ if (!stream->added) { GST_DEBUG_OBJECT (src, "adding stream pad %p", stream); - if (stream->is_backchannel) + if (stream->is_backchannel) { add_backchannel_fakesink (src, stream, stream->srcpad); - else + } else { + g_mutex_lock (&src->flow_combiner_lock); + gst_flow_combiner_add_pad (src->flow_combiner, stream->srcpad); + g_mutex_unlock (&src->flow_combiner_lock); gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); + } stream->added = TRUE; } } @@ -5556,49 +5609,6 @@ gst_rtspsrc_configure_caps (GstRTSPSrc * src, GstSegment * segment, } } -static GstFlowReturn -gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream, - GstFlowReturn ret) -{ - GList *streams; - - /* store the value */ - stream->last_ret = ret; - - /* if it's success we can return the value right away */ - if (ret == GST_FLOW_OK) - goto done; - - /* any other error that is not-linked can be returned right - * away */ - if (ret != GST_FLOW_NOT_LINKED) - goto done; - - /* only return NOT_LINKED if all other pads returned NOT_LINKED */ - for (streams = src->streams; streams; streams = g_list_next (streams)) { - GstRTSPStream *ostream = (GstRTSPStream *) streams->data; - - ret = ostream->last_ret; - /* some other return value (must be SUCCESS but we can return - * other values as well) */ - if (ret != GST_FLOW_NOT_LINKED) - goto done; - } - /* if we get here, all other pads were unlinked and we return - * NOT_LINKED then */ -done: - return ret; -} - -static void -gst_rtspsrc_reset_flows (GstRTSPSrc * src) -{ - for (GList * streams = src->streams; streams; streams = g_list_next (streams)) { - GstRTSPStream *ostream = (GstRTSPStream *) streams->data; - ostream->last_ret = GST_FLOW_OK; - } -} - static gboolean gst_rtspsrc_stream_push_event (GstRTSPSrc * src, GstRTSPStream * stream, GstEvent * event) @@ -6237,10 +6247,6 @@ gst_rtspsrc_handle_data (GstRTSPSrc * src, GstRTSPMessage * message) else ret = gst_pad_push (outpad, buf); - if (!is_rtcp) { - /* combine all stream flows for the data transport */ - ret = gst_rtspsrc_combine_flows (src, stream, ret); - } return ret; /* ERRORS */ @@ -9986,42 +9992,6 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message) GST_BIN_CLASS (parent_class)->handle_message (bin, message); break; } - case GST_MESSAGE_ERROR: - { - GstObject *udpsrc; - GstRTSPStream *stream; - GstFlowReturn ret; - - udpsrc = GST_MESSAGE_SRC (message); - - GST_DEBUG_OBJECT (rtspsrc, "got error from %s", - GST_ELEMENT_NAME (udpsrc)); - - stream = find_stream (rtspsrc, udpsrc, (gpointer) find_stream_by_udpsrc); - if (!stream) - goto forward; - - /* we ignore the RTCP udpsrc */ - if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc)) - goto done; - - /* if we get error messages from the udp sources, that's not a problem as - * long as not all of them error out. We also don't really know what the - * problem is, the message does not give enough detail... */ - ret = gst_rtspsrc_combine_flows (rtspsrc, stream, GST_FLOW_NOT_LINKED); - GST_DEBUG_OBJECT (rtspsrc, "combined flows: %s", gst_flow_get_name (ret)); - if (ret != GST_FLOW_OK) - goto forward; - - done: - gst_message_unref (message); - break; - - forward: - /* fatal but not our message, forward */ - GST_BIN_CLASS (parent_class)->handle_message (bin, message); - break; - } default: { GST_BIN_CLASS (parent_class)->handle_message (bin, message); diff --git a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h index 93b53c58c7..cbe1a1e693 100644 --- a/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h +++ b/subprojects/gst-plugins-good/gst/rtsp/gstrtspsrc.h @@ -45,6 +45,7 @@ #define __GST_RTSPSRC_H__ #include +#include G_BEGIN_DECLS @@ -227,6 +228,8 @@ struct _GstRTSPSrc { GstSDPMessage *sdp; gboolean from_sdp; GList *streams; + GMutex flow_combiner_lock; + GstFlowCombiner *flow_combiner; GstStructure *props; gboolean need_activate;