diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 830ee1df58..6ed3f73c76 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -145,6 +145,7 @@ struct _GstRtpSsrcDemuxPad }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found + * MUST be called with object lock */ static GstRtpSsrcDemuxPad * find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) @@ -160,6 +161,38 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) return NULL; } +/* returns a reference to the pad if found, %NULL otherwise */ +static GstPad * +get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype) +{ + GstRtpSsrcDemuxPad *demuxpad; + GstPad *retpad; + + GST_OBJECT_LOCK (demux); + + demuxpad = find_demux_pad_for_ssrc (demux, ssrc); + if (!demuxpad) { + GST_OBJECT_UNLOCK (demux); + return NULL; + } + + switch (padtype) { + case RTP_PAD: + retpad = gst_object_ref (demuxpad->rtp_pad); + break; + case RTCP_PAD: + retpad = gst_object_ref (demuxpad->rtcp_pad); + break; + default: + retpad = NULL; + g_assert_not_reached (); + } + + GST_OBJECT_UNLOCK (demux); + + return retpad; +} + static GstEvent * add_ssrc_and_ref (GstEvent * event, guint32 ssrc) { @@ -229,6 +262,7 @@ forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad, gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata); } +/* MUST only be called from streaming thread */ static GstPad * find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype) @@ -242,22 +276,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, INTERNAL_STREAM_LOCK (demux); - demuxpad = find_demux_pad_for_ssrc (demux, ssrc); - if (demuxpad != NULL) { - switch (padtype) { - case RTP_PAD: - retpad = gst_object_ref (demuxpad->rtp_pad); - break; - case RTCP_PAD: - retpad = gst_object_ref (demuxpad->rtcp_pad); - break; - default: - retpad = NULL; - g_assert_not_reached (); - } - + retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype); + if (retpad != NULL) { INTERNAL_STREAM_UNLOCK (demux); - return retpad; } @@ -283,7 +304,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_pad_set_element_private (rtp_pad, demuxpad); gst_pad_set_element_private (rtcp_pad, demuxpad); + GST_OBJECT_LOCK (demux); demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + GST_OBJECT_UNLOCK (demux); gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query); gst_pad_set_iterate_internal_links_function (rtp_pad, @@ -316,17 +339,11 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, g_assert_not_reached (); } - gst_object_ref (rtp_pad); - gst_object_ref (rtcp_pad); - g_signal_emit (G_OBJECT (demux), gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); INTERNAL_STREAM_UNLOCK (demux); - gst_object_unref (rtp_pad); - gst_object_unref (rtcp_pad); - return retpad; } @@ -486,17 +503,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GstRtpSsrcDemuxPad *dpad; - INTERNAL_STREAM_LOCK (demux); + GST_OBJECT_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { - INTERNAL_STREAM_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); goto unknown_pad; } GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc); demux->srcpads = g_slist_remove (demux->srcpads, dpad); - INTERNAL_STREAM_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); gst_pad_set_active (dpad->rtp_pad, FALSE); gst_pad_set_active (dpad->rtcp_pad, FALSE); @@ -535,7 +552,7 @@ forward_event (GstPad * pad, gpointer user_data) GSList *walk = NULL; GstEvent *newevent = NULL; - INTERNAL_STREAM_LOCK (fdata->demux); + GST_OBJECT_LOCK (fdata->demux); for (walk = fdata->demux->srcpads; walk; walk = walk->next) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; @@ -544,7 +561,7 @@ forward_event (GstPad * pad, gpointer user_data) break; } } - INTERNAL_STREAM_UNLOCK (fdata->demux); + GST_OBJECT_UNLOCK (fdata->demux); if (newevent) fdata->res &= gst_pad_push_event (pad, newevent); @@ -582,7 +599,6 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) guint32 ssrc; GstRTPBuffer rtp = { NULL }; GstPad *srcpad; - GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (parent); @@ -602,14 +618,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) ret = gst_pad_push (srcpad, buf); if (ret != GST_FLOW_OK) { + GstPad *active_pad; + /* check if the ssrc still there, may have been removed */ - INTERNAL_STREAM_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL || dpad->rtp_pad != srcpad) { + active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD); + + if (active_pad == NULL || active_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - INTERNAL_STREAM_UNLOCK (demux); + + g_clear_object (&active_pad); } gst_object_unref (srcpad); @@ -644,7 +663,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, GstRTCPPacket packet; GstRTCPBuffer rtcp = { NULL, }; GstPad *srcpad; - GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (parent); @@ -689,14 +707,16 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, ret = gst_pad_push (srcpad, buf); if (ret != GST_FLOW_OK) { + GstPad *active_pad; + /* check if the ssrc still there, may have been removed */ - INTERNAL_STREAM_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL || dpad->rtcp_pad != srcpad) { + active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD); + if (active_pad == NULL || active_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - INTERNAL_STREAM_UNLOCK (demux); + + g_clear_object (&active_pad); } gst_object_unref (srcpad); @@ -786,7 +806,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) demux = GST_RTP_SSRC_DEMUX (parent); - INTERNAL_STREAM_LOCK (demux); + GST_OBJECT_LOCK (demux); for (current = demux->srcpads; current; current = g_slist_next (current)) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data; @@ -807,7 +827,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) g_value_unset (&val); } - INTERNAL_STREAM_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); return it; } diff --git a/tests/check/elements/rtpssrcdemux.c b/tests/check/elements/rtpssrcdemux.c index a8b093b1cf..179d68e4fb 100644 --- a/tests/check/elements/rtpssrcdemux.c +++ b/tests/check/elements/rtpssrcdemux.c @@ -174,6 +174,72 @@ GST_START_TEST (test_event_forwarding) GST_END_TEST; +typedef struct +{ + gint ready; + GMutex mutex; + GCond cond; +} LockTestContext; + +static void +new_ssrc_pad_cb (GstElement * element, guint ssrc, GstPad * pad, + LockTestContext * ctx) +{ + g_message ("Signalling ready"); + g_atomic_int_set (&ctx->ready, 1); + + g_message ("Waiting no more ready"); + while (g_atomic_int_get (&ctx->ready)) + g_usleep (G_USEC_PER_SEC / 100); + + g_mutex_lock (&ctx->mutex); + g_mutex_unlock (&ctx->mutex); +} + +static gpointer +push_buffer_func (gpointer user_data) +{ + GstHarness *h = user_data; + gst_harness_push (h, create_buffer (0, 0xdeadbeef)); + return NULL; +} + +GST_START_TEST (test_oob_event_locking) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL); + LockTestContext ctx = { FALSE, }; + GThread *thread; + + g_mutex_init (&ctx.mutex); + g_cond_init (&ctx.cond); + + gst_harness_set_src_caps_str (h, "application/x-rtp"); + g_signal_connect (h->element, + "new-ssrc-pad", G_CALLBACK (new_ssrc_pad_cb), &ctx); + + thread = g_thread_new ("streaming-thread", push_buffer_func, h); + + g_mutex_lock (&ctx.mutex); + + g_message ("Waiting for ready"); + while (!g_atomic_int_get (&ctx.ready)) + g_usleep (G_USEC_PER_SEC / 100); + g_message ("Signal no more ready"); + g_atomic_int_set (&ctx.ready, 0); + + gst_harness_push_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL)); + + g_mutex_unlock (&ctx.mutex); + + g_thread_join (thread); + g_mutex_clear (&ctx.mutex); + g_cond_clear (&ctx.cond); + gst_harness_teardown (h); +} + +GST_END_TEST; + static Suite * rtpssrcdemux_suite (void) { @@ -182,6 +248,7 @@ rtpssrcdemux_suite (void) suite_add_tcase (s, tc_chain); tcase_add_test (tc_chain, test_event_forwarding); + tcase_add_test (tc_chain, test_oob_event_locking); return s; }