From 1f9c1aa489dd0168261a35a8384533009fa1d1c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 30 Jan 2020 15:56:36 +0200 Subject: [PATCH] sctpdec: Use a flow combiner for the source pad flow returns and propagate errors upstream Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/1180 --- ext/sctp/gstsctpdec.c | 66 ++++++++++++++++++++++++++++++++++++++----- ext/sctp/gstsctpdec.h | 3 ++ 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/ext/sctp/gstsctpdec.c b/ext/sctp/gstsctpdec.c index 48a0175874..51a1638214 100644 --- a/ext/sctp/gstsctpdec.c +++ b/ext/sctp/gstsctpdec.c @@ -143,6 +143,7 @@ static void gst_sctp_dec_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); +static void gst_sctp_dec_finalize (GObject * object); static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element, GstStateChange transition); static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, @@ -160,7 +161,7 @@ static void stop_srcpad_task (GstPad * pad); static void stop_all_srcpad_tasks (GstSctpDec * self); static void sctpdec_cleanup (GstSctpDec * self); static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id); -static void remove_pad (GstElement * element, GstPad * pad); +static void remove_pad (GstSctpDec * self, GstPad * pad); static void on_reset_stream (GstSctpDec * self, guint stream_id); static void @@ -182,6 +183,7 @@ gst_sctp_dec_class_init (GstSctpDecClass * klass) gobject_class->set_property = gst_sctp_dec_set_property; gobject_class->get_property = gst_sctp_dec_get_property; + gobject_class->finalize = gst_sctp_dec_finalize; element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state); @@ -223,6 +225,8 @@ gst_sctp_dec_init (GstSctpDec * self) self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID; self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT; + self->flow_combiner = gst_flow_combiner_new (); + self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink"); gst_pad_set_chain_function (self->sink_pad, GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain)); @@ -270,6 +274,17 @@ gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value, } } +static void +gst_sctp_dec_finalize (GObject * object) +{ + GstSctpDec *self = GST_SCTP_DEC (object); + + gst_flow_combiner_free (self->flow_combiner); + self->flow_combiner = NULL; + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element, GstStateChange transition) { @@ -278,11 +293,13 @@ gst_sctp_dec_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_flow_combiner_reset (self->flow_combiner); if (!configure_association (self)) ret = GST_STATE_CHANGE_FAILURE; break; case GST_STATE_CHANGE_PAUSED_TO_READY: sctpdec_cleanup (self); + gst_flow_combiner_reset (self->flow_combiner); break; default: break; @@ -297,6 +314,7 @@ gst_sctp_dec_change_state (GstElement * element, GstStateChange transition) static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf) { + GstFlowReturn flow_ret; GstMapInfo map; if (!gst_buffer_map (buf, &map, GST_MAP_READ)) { @@ -310,7 +328,16 @@ gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf) gst_buffer_unmap (buf, &map); gst_buffer_unref (buf); - return GST_FLOW_OK; + GST_OBJECT_LOCK (self); + /* This gets the last combined flow return from all source pads */ + flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK); + GST_OBJECT_UNLOCK (self); + + if (flow_ret != GST_FLOW_OK) { + GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret)); + } + + return flow_ret; } static void @@ -373,14 +400,25 @@ gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event) static void gst_sctp_data_srcpad_loop (GstPad * pad) { + GstSctpDec *self; GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad); GstDataQueueItem *item; + self = GST_SCTP_DEC (gst_pad_get_parent (pad)); + if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) { + GstBuffer *buffer; GstFlowReturn flow_ret; - flow_ret = gst_pad_push (pad, GST_BUFFER (item->object)); + buffer = GST_BUFFER (item->object); + + flow_ret = gst_pad_push (pad, buffer); item->object = NULL; + + GST_OBJECT_LOCK (self); + gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret); + GST_OBJECT_UNLOCK (self); + if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING || flow_ret == GST_FLOW_NOT_LINKED)) { GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s", @@ -399,9 +437,16 @@ gst_sctp_data_srcpad_loop (GstPad * pad) item->destroy (item); } else { + GST_OBJECT_LOCK (self); + gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, + GST_FLOW_FLUSHING); + GST_OBJECT_UNLOCK (self); + GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing"); gst_pad_pause_task (pad); } + + gst_object_unref (self); } static gboolean @@ -523,6 +568,10 @@ get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id) if (!gst_element_add_pad (GST_ELEMENT (self), new_pad)) goto error_cleanup; + GST_OBJECT_LOCK (self); + gst_flow_combiner_add_pad (self->flow_combiner, new_pad); + GST_OBJECT_UNLOCK (self); + gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, new_pad, NULL); @@ -536,11 +585,14 @@ error_cleanup: } static void -remove_pad (GstElement * element, GstPad * pad) +remove_pad (GstSctpDec * self, GstPad * pad) { stop_srcpad_task (pad); gst_pad_set_active (pad, FALSE); - gst_element_remove_pad (element, pad); + gst_element_remove_pad (GST_ELEMENT (self), pad); + GST_OBJECT_LOCK (self); + gst_flow_combiner_remove_pad (self->flow_combiner, pad); + GST_OBJECT_UNLOCK (self); } static void @@ -557,7 +609,7 @@ on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association, GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad"); return; } - remove_pad (GST_ELEMENT (self), srcpad); + remove_pad (self, srcpad); gst_object_unref (srcpad); } @@ -615,7 +667,7 @@ remove_pad_it (const GValue * item, gpointer user_data) GstPad *pad = g_value_get_object (item); GstSctpDec *self = user_data; - remove_pad (GST_ELEMENT (self), pad); + remove_pad (self, pad); } static void diff --git a/ext/sctp/gstsctpdec.h b/ext/sctp/gstsctpdec.h index 845fac4d41..6a5591f551 100644 --- a/ext/sctp/gstsctpdec.h +++ b/ext/sctp/gstsctpdec.h @@ -27,6 +27,7 @@ #define __GST_SCTP_DEC_H__ #include +#include #include "sctpassociation.h" @@ -44,6 +45,8 @@ struct _GstSctpDec { GstElement element; + GstFlowCombiner *flow_combiner; + GstPad *sink_pad; guint sctp_association_id; guint local_sctp_port;