sctpdec: Use a flow combiner for the source pad flow returns and propagate errors upstream
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/issues/1180
This commit is contained in:
parent
8d522bf3e6
commit
1f9c1aa489
@ -143,6 +143,7 @@ static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
|
|||||||
const GValue * value, GParamSpec * pspec);
|
const GValue * value, GParamSpec * pspec);
|
||||||
static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
|
static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
|
||||||
GValue * value, GParamSpec * pspec);
|
GValue * value, GParamSpec * pspec);
|
||||||
|
static void gst_sctp_dec_finalize (GObject * object);
|
||||||
static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
|
static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
|
||||||
GstStateChange transition);
|
GstStateChange transition);
|
||||||
static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
|
static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
|
||||||
@ -160,7 +161,7 @@ static void stop_srcpad_task (GstPad * pad);
|
|||||||
static void stop_all_srcpad_tasks (GstSctpDec * self);
|
static void stop_all_srcpad_tasks (GstSctpDec * self);
|
||||||
static void sctpdec_cleanup (GstSctpDec * self);
|
static void sctpdec_cleanup (GstSctpDec * self);
|
||||||
static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
|
static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
|
||||||
static void remove_pad (GstElement * element, GstPad * pad);
|
static void remove_pad (GstSctpDec * self, GstPad * pad);
|
||||||
static void on_reset_stream (GstSctpDec * self, guint stream_id);
|
static void on_reset_stream (GstSctpDec * self, guint stream_id);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -182,6 +183,7 @@ gst_sctp_dec_class_init (GstSctpDecClass * klass)
|
|||||||
|
|
||||||
gobject_class->set_property = gst_sctp_dec_set_property;
|
gobject_class->set_property = gst_sctp_dec_set_property;
|
||||||
gobject_class->get_property = gst_sctp_dec_get_property;
|
gobject_class->get_property = gst_sctp_dec_get_property;
|
||||||
|
gobject_class->finalize = gst_sctp_dec_finalize;
|
||||||
|
|
||||||
element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
|
element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
|
||||||
|
|
||||||
@ -223,6 +225,8 @@ gst_sctp_dec_init (GstSctpDec * self)
|
|||||||
self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
|
self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
|
||||||
self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
|
self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
|
||||||
|
|
||||||
|
self->flow_combiner = gst_flow_combiner_new ();
|
||||||
|
|
||||||
self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
|
self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
|
||||||
gst_pad_set_chain_function (self->sink_pad,
|
gst_pad_set_chain_function (self->sink_pad,
|
||||||
GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
|
GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
|
||||||
@ -270,6 +274,17 @@ gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
gst_sctp_dec_finalize (GObject * object)
|
||||||
|
{
|
||||||
|
GstSctpDec *self = GST_SCTP_DEC (object);
|
||||||
|
|
||||||
|
gst_flow_combiner_free (self->flow_combiner);
|
||||||
|
self->flow_combiner = NULL;
|
||||||
|
|
||||||
|
G_OBJECT_CLASS (parent_class)->finalize (object);
|
||||||
|
}
|
||||||
|
|
||||||
static GstStateChangeReturn
|
static GstStateChangeReturn
|
||||||
gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
|
gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
|
||||||
{
|
{
|
||||||
@ -278,11 +293,13 @@ gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
|
|||||||
|
|
||||||
switch (transition) {
|
switch (transition) {
|
||||||
case GST_STATE_CHANGE_READY_TO_PAUSED:
|
case GST_STATE_CHANGE_READY_TO_PAUSED:
|
||||||
|
gst_flow_combiner_reset (self->flow_combiner);
|
||||||
if (!configure_association (self))
|
if (!configure_association (self))
|
||||||
ret = GST_STATE_CHANGE_FAILURE;
|
ret = GST_STATE_CHANGE_FAILURE;
|
||||||
break;
|
break;
|
||||||
case GST_STATE_CHANGE_PAUSED_TO_READY:
|
case GST_STATE_CHANGE_PAUSED_TO_READY:
|
||||||
sctpdec_cleanup (self);
|
sctpdec_cleanup (self);
|
||||||
|
gst_flow_combiner_reset (self->flow_combiner);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
@ -297,6 +314,7 @@ gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
|
|||||||
static GstFlowReturn
|
static GstFlowReturn
|
||||||
gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
|
gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
|
||||||
{
|
{
|
||||||
|
GstFlowReturn flow_ret;
|
||||||
GstMapInfo map;
|
GstMapInfo map;
|
||||||
|
|
||||||
if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
|
if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
|
||||||
@ -310,7 +328,16 @@ gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
|
|||||||
gst_buffer_unmap (buf, &map);
|
gst_buffer_unmap (buf, &map);
|
||||||
gst_buffer_unref (buf);
|
gst_buffer_unref (buf);
|
||||||
|
|
||||||
return GST_FLOW_OK;
|
GST_OBJECT_LOCK (self);
|
||||||
|
/* This gets the last combined flow return from all source pads */
|
||||||
|
flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
|
if (flow_ret != GST_FLOW_OK) {
|
||||||
|
GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
return flow_ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -373,14 +400,25 @@ gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
|
|||||||
static void
|
static void
|
||||||
gst_sctp_data_srcpad_loop (GstPad * pad)
|
gst_sctp_data_srcpad_loop (GstPad * pad)
|
||||||
{
|
{
|
||||||
|
GstSctpDec *self;
|
||||||
GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
|
GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
|
||||||
GstDataQueueItem *item;
|
GstDataQueueItem *item;
|
||||||
|
|
||||||
|
self = GST_SCTP_DEC (gst_pad_get_parent (pad));
|
||||||
|
|
||||||
if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
|
if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
|
||||||
|
GstBuffer *buffer;
|
||||||
GstFlowReturn flow_ret;
|
GstFlowReturn flow_ret;
|
||||||
|
|
||||||
flow_ret = gst_pad_push (pad, GST_BUFFER (item->object));
|
buffer = GST_BUFFER (item->object);
|
||||||
|
|
||||||
|
flow_ret = gst_pad_push (pad, buffer);
|
||||||
item->object = NULL;
|
item->object = NULL;
|
||||||
|
|
||||||
|
GST_OBJECT_LOCK (self);
|
||||||
|
gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
|
if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
|
||||||
|| flow_ret == GST_FLOW_NOT_LINKED)) {
|
|| flow_ret == GST_FLOW_NOT_LINKED)) {
|
||||||
GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
|
GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
|
||||||
@ -399,9 +437,16 @@ gst_sctp_data_srcpad_loop (GstPad * pad)
|
|||||||
|
|
||||||
item->destroy (item);
|
item->destroy (item);
|
||||||
} else {
|
} else {
|
||||||
|
GST_OBJECT_LOCK (self);
|
||||||
|
gst_flow_combiner_update_pad_flow (self->flow_combiner, pad,
|
||||||
|
GST_FLOW_FLUSHING);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
|
GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
|
||||||
gst_pad_pause_task (pad);
|
gst_pad_pause_task (pad);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gst_object_unref (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
@ -523,6 +568,10 @@ get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
|
|||||||
if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
|
if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
|
||||||
goto error_cleanup;
|
goto error_cleanup;
|
||||||
|
|
||||||
|
GST_OBJECT_LOCK (self);
|
||||||
|
gst_flow_combiner_add_pad (self->flow_combiner, new_pad);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
|
|
||||||
gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
|
gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
|
||||||
new_pad, NULL);
|
new_pad, NULL);
|
||||||
|
|
||||||
@ -536,11 +585,14 @@ error_cleanup:
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
remove_pad (GstElement * element, GstPad * pad)
|
remove_pad (GstSctpDec * self, GstPad * pad)
|
||||||
{
|
{
|
||||||
stop_srcpad_task (pad);
|
stop_srcpad_task (pad);
|
||||||
gst_pad_set_active (pad, FALSE);
|
gst_pad_set_active (pad, FALSE);
|
||||||
gst_element_remove_pad (element, pad);
|
gst_element_remove_pad (GST_ELEMENT (self), pad);
|
||||||
|
GST_OBJECT_LOCK (self);
|
||||||
|
gst_flow_combiner_remove_pad (self->flow_combiner, pad);
|
||||||
|
GST_OBJECT_UNLOCK (self);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -557,7 +609,7 @@ on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
|
|||||||
GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
|
GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
remove_pad (GST_ELEMENT (self), srcpad);
|
remove_pad (self, srcpad);
|
||||||
gst_object_unref (srcpad);
|
gst_object_unref (srcpad);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -615,7 +667,7 @@ remove_pad_it (const GValue * item, gpointer user_data)
|
|||||||
GstPad *pad = g_value_get_object (item);
|
GstPad *pad = g_value_get_object (item);
|
||||||
GstSctpDec *self = user_data;
|
GstSctpDec *self = user_data;
|
||||||
|
|
||||||
remove_pad (GST_ELEMENT (self), pad);
|
remove_pad (self, pad);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
#define __GST_SCTP_DEC_H__
|
#define __GST_SCTP_DEC_H__
|
||||||
|
|
||||||
#include <gst/gst.h>
|
#include <gst/gst.h>
|
||||||
|
#include <gst/base/base.h>
|
||||||
|
|
||||||
#include "sctpassociation.h"
|
#include "sctpassociation.h"
|
||||||
|
|
||||||
@ -44,6 +45,8 @@ struct _GstSctpDec
|
|||||||
{
|
{
|
||||||
GstElement element;
|
GstElement element;
|
||||||
|
|
||||||
|
GstFlowCombiner *flow_combiner;
|
||||||
|
|
||||||
GstPad *sink_pad;
|
GstPad *sink_pad;
|
||||||
guint sctp_association_id;
|
guint sctp_association_id;
|
||||||
guint local_sctp_port;
|
guint local_sctp_port;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user