From 990baba8e36f1f41373ec258c01f3e02c747448c Mon Sep 17 00:00:00 2001 From: Erik Walthinsen Date: Fri, 8 Dec 2000 10:33:01 +0000 Subject: [PATCH] Another big set of changes. Connections are now also pullfunc based. gstqueue has been updated, I don't know of any ... Original commit message from CVS: Another big set of changes. Connections are now also pullfunc based. gstqueue has been updated, I don't know of any other connections offhand. There are still a few things that need doing, specifically the concept of a source or connection with connections to multiple thread contexts is not dealt with. This may force us to move the threadstate from the element to the pad, maybe keeping the element's copy for simple cases. Then the Bin would create a structure to pass to the cothreaded _wrappers of any such elements, which would detail the pads that are to be dealt with by this particular cothread context. That will speed things up to, since we don't have to look through the list of all pads for every Src or Connection element for every iteration, we can simply step through the list provided by the plan. Special case might even have a single pad pointer sitting there to trump the list, if there's only one (the common case anyway). Task 23098 is tracking these changes. The main task 22588 depends on that subtask, as well as 22240, which is a consistency check on PAD_DISABLED. --- gst/elements/gstqueue.c | 11 +- gst/gstbin.c | 198 +++++++++++++++++++++--------------- gst/gstconnection.c | 21 ---- gst/gstconnection.h | 5 - gst/gstdebug.h | 21 +++- gst/gstelement.c | 1 + gst/gstpad.c | 4 +- plugins/elements/gstqueue.c | 11 +- 8 files changed, 148 insertions(+), 124 deletions(-) diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index ec142722f9..7fc2d0a242 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -58,7 +58,7 @@ static void gst_queue_init (GstQueue *queue); static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_queue_push (GstConnection *connection); +static void gst_queue_pull (GstPad *pad); static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static void gst_queue_flush (GstQueue *queue); @@ -107,8 +107,6 @@ gst_queue_class_init (GstQueueClass *klass) gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT, GTK_ARG_READWRITE, ARG_MAX_LEVEL); - gstconnection_class->push = gst_queue_push; - gtkobject_class->set_arg = gst_queue_set_arg; gtkobject_class->get_arg = gst_queue_get_arg; @@ -123,6 +121,7 @@ gst_queue_init (GstQueue *queue) gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain); queue->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function (queue->srcpad, gst_queue_pull); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); queue->queue = NULL; @@ -219,14 +218,14 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) } static void -gst_queue_push (GstConnection *connection) +gst_queue_pull (GstPad *pad) { - GstQueue *queue = GST_QUEUE (connection); + GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad)); GstBuffer *buf = NULL; GSList *front; gboolean tosignal = FALSE; const guchar *name; - + name = gst_element_get_name (GST_ELEMENT (queue)); /* have to lock for thread-safety */ diff --git a/gst/gstbin.c b/gst/gstbin.c index 84d912c0ba..6aa516305a 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -479,14 +479,14 @@ gst_bin_iterate (GstBin *bin) { GstBinClass *oclass; - DEBUG_ENTER("('%s')",gst_element_get_name(GST_ELEMENT(bin))); + DEBUG_ENTER("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass); if (oclass->iterate) (oclass->iterate) (bin); - DEBUG_LEAVE("('%s')",gst_element_get_name(GST_ELEMENT(bin))); + DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); } /** @@ -531,29 +531,21 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[]) DEBUG("element %s ended loop function\n", name); } else { DEBUG("element %s is chain-based\n", name); - if (GST_IS_CONNECTION (element) && argc == 1) { - while (1) { - DEBUG("calling push function of connection %s\n", name); - gst_connection_push (GST_CONNECTION (element)); - DEBUG("calling push function of connection %s done\n", name); - } - } else { - DEBUG("stepping through pads\n"); - do { - pads = element->pads; - while (pads) { - pad = GST_PAD (pads->data); - if (pad->direction == GST_PAD_SINK) { - DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); - buf = gst_pad_pull (pad); - DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); - (pad->chainfunc) (pad,buf); - DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); - } - pads = g_list_next (pads); + DEBUG("stepping through pads\n"); + do { + pads = element->pads; + while (pads) { + pad = GST_PAD (pads->data); + if (pad->direction == GST_PAD_SINK) { + DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); + buf = gst_pad_pull (pad); + DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); + (pad->chainfunc) (pad,buf); + DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); - } + pads = g_list_next (pads); + } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); } GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); @@ -562,39 +554,57 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[]) } static int -gst_bin_pullsrc_wrapper (int argc,char *argv[]) +gst_bin_src_wrapper (int argc,char *argv[]) { GstElement *element = GST_ELEMENT (argv); GList *pads; GstPad *pad; G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); - DEBUG("entering gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name); + DEBUG_ENTER("(%d,\"%s\")",argc,name); do { pads = element->pads; while (pads) { pad = GST_PAD (pads->data); if (pad->direction == GST_PAD_SRC) { - region_struct *region = cothread_get_data (element->threadstate, "region"); - if (region) { - //gst_src_push_region (GST_SRC (element), region->offset, region->size); - if (pad->pullregionfunc == NULL) - fprintf(stderr,"error, no pullregionfunc in \"%s\"\n", name); - (pad->pullregionfunc)(pad, region->offset, region->size); - } - else { - if (pad->pullfunc == NULL) - fprintf(stderr,"error, no pullfunc in \"%s\"\n", name); - (pad->pullfunc)(pad); - } + if (pad->pullfunc == NULL) DEBUG("error, no pullfunc in \"%s\"\n", name); + (pad->pullfunc)(pad); } pads = g_list_next(pads); } } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); - DEBUG("leaving gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name); + DEBUG_LEAVE(""); + return 0; +} + +static int +gst_bin_connection_wrapper (int argc,char *argv[]) +{ + GstElement *element = GST_ELEMENT (argv); + GList *pads; + GstPad *pad; + G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); + + DEBUG_ENTER("(%d,\"%s\")",argc,name); + + do { + pads = element->pads; + while (pads) { + pad = GST_PAD (pads->data); + if (pad->direction == GST_PAD_SRC) { + DEBUG("pullfunc for %s:%s(%p) at %p is set to %p\n",GST_DEBUG_PAD_NAME(pad),pad,&pad->pullfunc,pad->pullfunc); + if (pad->pullfunc == NULL) fprintf(stderr,"error, no pullfunc in \"%s\"\n", name); + (pad->pullfunc)(pad); + } + pads = g_list_next(pads); + } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG_LEAVE(""); return 0; } @@ -632,14 +642,17 @@ gst_bin_pushfunc_proxy (GstPad *pad) static void gst_bin_create_plan_func (GstBin *bin) { + const gchar *binname = gst_element_get_name(GST_ELEMENT(bin)); GList *elements; GstElement *element; int sink_pads; GList *pads; - GstPad *pad, *opad, *peer; + GstPad *pad, *peer; GstElement *outside; - g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin))); + DEBUG_SET_STRING("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); + DEBUG_ENTER_STRING; +// g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin))); // first loop through all children to see if we need cothreads // we break immediately when we find we need to, why keep searching? @@ -647,25 +660,25 @@ gst_bin_create_plan_func (GstBin *bin) while (elements) { element = GST_ELEMENT (elements->data); - g_print("gstbin: found element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); + DEBUG("found element \"%s\" in bin \"%s\"\n", + gst_element_get_name (element), + gst_element_get_name (GST_ELEMENT (bin))); // if it's a loop-based element, use cothreads if (element->loopfunc != NULL) { - g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); + DEBUG("loop based element \"%s\" in bin \"%s\"\n", + gst_element_get_name (element), + gst_element_get_name (GST_ELEMENT (bin))); bin->need_cothreads = TRUE; break; } // if it's a complex element, use cothreads else if (GST_ELEMENT_IS_MULTI_IN (element)) { - g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); - + DEBUG("complex element \"%s\" in bin \"%s\"\n", + gst_element_get_name (element), + gst_element_get_name (GST_ELEMENT (bin))); + bin->need_cothreads = TRUE; break; } @@ -679,9 +692,9 @@ gst_bin_create_plan_func (GstBin *bin) pads = g_list_next (pads); } if (sink_pads > 1) { - g_print("gstbin: more than 1 sinkpad for element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); + DEBUG("more than 1 sinkpad for element \"%s\" in bin \"%s\"\n", + gst_element_get_name (element), + gst_element_get_name (GST_ELEMENT (bin))); bin->need_cothreads = TRUE; break; @@ -700,12 +713,12 @@ gst_bin_create_plan_func (GstBin *bin) bin->numentries = 0; if (bin->need_cothreads) { - g_print("gstbin: need cothreads\n"); + DEBUG("NEED COTHREADS\n"); // first create thread context if (bin->threadcontext == NULL) { bin->threadcontext = cothread_init (); - g_print("gstbin: initialized cothread context\n"); + DEBUG("initialized cothread context\n"); } // walk through all the children @@ -718,59 +731,68 @@ gst_bin_create_plan_func (GstBin *bin) element->threadstate = cothread_create (bin->threadcontext); cothread_setfunc (element->threadstate, gst_bin_loopfunc_wrapper, 0, (char **)element); + DEBUG("created element threadstate %p for \"%s\"\n",element->threadstate, + gst_element_get_name(element)); } if (GST_IS_BIN (element)) { gst_bin_create_plan (GST_BIN (element)); } if (GST_IS_SRC (element)) { - g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name (element)); + DEBUG("adding '%s' as entry point\n",gst_element_get_name (element)); bin->entries = g_list_prepend (bin->entries,element); bin->numentries++; - cothread_setfunc(element->threadstate,gst_bin_pullsrc_wrapper,0,(char **)element); + cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element); } pads = gst_element_get_pad_list (element); while (pads) { pad = GST_PAD(pads->data); - g_print("gstbin: setting push&pull handlers for %s:%s\n", - gst_element_get_name (element), gst_pad_get_name (pad)); +// DEBUG("setting push&pull handlers for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - // an internal connection will push outside this bin. - if (!GST_IS_CONNECTION (element)) { - pad->pushfunc = gst_bin_pushfunc_proxy; - } - if (!pad->pullfunc) - pad->pullfunc = gst_bin_pullfunc_proxy; - if (!pad->pullregionfunc) - pad->pullregionfunc = gst_bin_pullregionfunc_proxy; + if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + DEBUG("checking/setting push proxy for srcpad %s:%s\n", + GST_DEBUG_PAD_NAME(pad)); + // set the proxy functions + if (!pad->pushfunc) + pad->pushfunc = gst_bin_pushfunc_proxy; - /* we only worry about sink pads */ - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - /* get the pad's peer */ + } else if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("checking/setting pull proxies for sinkpad %s:%s\n", + GST_DEBUG_PAD_NAME(pad)); + // set the proxy functions + if (!pad->pullfunc) + pad->pullfunc = gst_bin_pullfunc_proxy; + if (!pad->pullfunc) + pad->pullregionfunc = gst_bin_pullregionfunc_proxy; + + // ***** check for possible connections outside + // get the pad's peer peer = gst_pad_get_peer (pad); + // FIXME this should be an error condition, if not disabled if (!peer) break; - /* get the parent of the peer of the pad */ + // get the parent of the peer of the pad outside = GST_ELEMENT (gst_pad_get_parent (peer)); + // FIXME this should *really* be an error condition if (!outside) break; /* if it's a connection and it's not ours... */ if (GST_IS_CONNECTION (outside) && (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { + +/***** this is irrelevant now ***** GList *connection_pads = gst_element_get_pad_list (outside); while (connection_pads) { opad = GST_PAD (connection_pads->data); if (gst_pad_get_direction (opad) == GST_PAD_SRC) { g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n", - gst_element_get_name (outside),gst_pad_get_name (opad), opad, opad->pullfunc); + GST_DEBUG_PAD_NAME(pad), opad, opad->pullfunc); opad->pushfunc = gst_bin_pushfunc_proxy; opad->pullfunc = gst_bin_pullfunc_proxy; opad->pullregionfunc = gst_bin_pullregionfunc_proxy; - if (outside->threadstate == NULL) { - outside->threadstate = cothread_create (bin->threadcontext); - cothread_setfunc (outside->threadstate, gst_bin_loopfunc_wrapper, - 1, (char **)outside); + cothread_setfunc (outside->threadstate, gst_bin_loopfunc_wrapper, + 1, (char **)outside); } } connection_pads = g_list_next (connection_pads); @@ -779,8 +801,19 @@ gst_bin_create_plan_func (GstBin *bin) "for internal element \"%s\"\n", gst_element_get_name (GST_ELEMENT (outside)), gst_element_get_name (GST_ELEMENT (element))); +*****/ + DEBUG("connection '%s' outside bin is an entry\n",gst_element_get_name(outside)); bin->entries = g_list_prepend (bin->entries,outside); bin->numentries++; +// FIXME: this won't work in the case where a connection feeds multiple contexts. +// have to have a list of internal structures of connection entries, struct passed to +// the cothread wrapper, struct listing pads that enter that context. + if (outside->threadstate == NULL) { + outside->threadstate = cothread_create (bin->threadcontext); + DEBUG("created element threadstate %p for \"%s\"\n",outside->threadstate, + gst_element_get_name(outside)); + } + cothread_setfunc(outside->threadstate,gst_bin_connection_wrapper,0,(char **)outside); } } pads = g_list_next (pads); @@ -808,7 +841,7 @@ gst_bin_create_plan_func (GstBin *bin) pad = GST_PAD (pads->data); /* we only worry about sink pads */ if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - g_print("gstbin: found SINK pad %s\n", gst_pad_get_name (pad)); + g_print("gstbin '%s': found SINK pad %s:%s\n", binname, GST_DEBUG_PAD_NAME(pad)); /* get the pad's peer */ peer = gst_pad_get_peer (pad); if (!peer) { @@ -848,7 +881,8 @@ gst_bin_iterate_func (GstBin *bin) GList *pads; GstPad *pad; - DEBUG_ENTER("(%s)", gst_element_get_name (GST_ELEMENT (bin))); + DEBUG_SET_STRING("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin))); + DEBUG_ENTER_STRING; g_return_if_fail (bin != NULL); g_return_if_fail (GST_IS_BIN (bin)); @@ -887,9 +921,9 @@ gst_bin_iterate_func (GstBin *bin) } pads = g_list_next (pads); } - } else if (GST_IS_CONNECTION (entry)) - gst_connection_push (GST_CONNECTION (entry)); - else if (GST_IS_BIN (entry)) +// } else if (GST_IS_CONNECTION (entry)) { +// gst_connection_push (GST_CONNECTION (entry)); + } else if (GST_IS_BIN (entry)) gst_bin_iterate (GST_BIN (entry)); else { fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry)); diff --git a/gst/gstconnection.c b/gst/gstconnection.c index b4037cb9d2..540375580c 100644 --- a/gst/gstconnection.c +++ b/gst/gstconnection.c @@ -91,24 +91,3 @@ gst_connection_new (gchar *name) return connection; } - -/** - * gst_connection_push: - * @connection: the connection to push - * - * Push a buffer along a connection - */ -void -gst_connection_push (GstConnection *connection) -{ - GstConnectionClass *oclass; - - g_return_if_fail (connection != NULL); - g_return_if_fail (GST_IS_CONNECTION (connection)); - - oclass = (GstConnectionClass *)(GTK_OBJECT (connection)->klass); - - g_return_if_fail (oclass->push != NULL); - - (oclass->push)(connection); -} diff --git a/gst/gstconnection.h b/gst/gstconnection.h index 6295fe4833..1640ff4ee0 100644 --- a/gst/gstconnection.h +++ b/gst/gstconnection.h @@ -50,16 +50,11 @@ struct _GstConnection { struct _GstConnectionClass { GstElementClass parent_class; - - /* push function */ - void (*push) (GstConnection *connection); }; GtkType gst_connection_get_type (void); GstElement* gst_connection_new (gchar *name); -void gst_connection_push (GstConnection *connection); - #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/gst/gstdebug.h b/gst/gstdebug.h index b168e928f9..5b5894e682 100644 --- a/gst/gstdebug.h +++ b/gst/gstdebug.h @@ -92,13 +92,28 @@ G_GNUC_UNUSED static GModule *_debug_self_module = NULL; } #ifdef GST_DEBUG_ENABLED -#define DEBUG(format,args...) fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args )) -#define DEBUG_ENTER(format, args...) fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args )) -#define DEBUG_LEAVE(format, args...) fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args )) +#define DEBUG(format,args...) \ + (_debug_string != NULL) ? \ + fprintf(stderr,GST_DEBUG_PREFIX("%s: "format , _debug_string , ## args )) : \ + fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args )) +#define DEBUG_ENTER(format, args...) \ + fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args )) +#define DEBUG_SET_STRING(format, args...) \ + gchar *_debug_string = g_strdup_printf(format , ## args ) +#define DEBUG_ENTER_STRING DEBUG_ENTER("%s",_debug_string) +#define DEBUG_LEAVE(format, args...) \ + if (_debug_string != NULL) g_free(_debug_string),\ +fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args )) #else #define DEBUG(format, args...) #define DEBUG_ENTER(format, args...) #define DEBUG_LEAVE(format, args...) #endif + + +/********** some convenience macros for debugging **********/ +#define GST_DEBUG_PAD_NAME(pad) \ + ((pad)->parent != NULL) ? gst_element_get_name(GST_ELEMENT((pad)->parent)) : "''", gst_pad_get_name(pad) + #endif /* __GST_H__ */ diff --git a/gst/gstelement.c b/gst/gstelement.c index 9a565145da..c931af0301 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -111,6 +111,7 @@ gst_element_init (GstElement *element) element->numpads = 0; element->pads = NULL; element->loopfunc = NULL; + element->threadstate = NULL; } /** diff --git a/gst/gstpad.c b/gst/gstpad.c index 86be1ba110..f9732f2ff7 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -240,9 +240,11 @@ gst_pad_set_pull_function (GstPad *pad, g_return_if_fail (pad != NULL); g_return_if_fail (GST_IS_PAD (pad)); - g_print("gstpad: pad setting pull function\n"); + // the if and such should optimize out when DEBUG is off + DEBUG("setting pull function for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); pad->pullfunc = pull; + DEBUG("pullfunc for %s:%s(%p) at %p is set to %p\n",GST_DEBUG_PAD_NAME(pad),pad,&pad->pullfunc,pull); } /** diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index ec142722f9..7fc2d0a242 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -58,7 +58,7 @@ static void gst_queue_init (GstQueue *queue); static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_queue_push (GstConnection *connection); +static void gst_queue_pull (GstPad *pad); static void gst_queue_chain (GstPad *pad, GstBuffer *buf); static void gst_queue_flush (GstQueue *queue); @@ -107,8 +107,6 @@ gst_queue_class_init (GstQueueClass *klass) gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT, GTK_ARG_READWRITE, ARG_MAX_LEVEL); - gstconnection_class->push = gst_queue_push; - gtkobject_class->set_arg = gst_queue_set_arg; gtkobject_class->get_arg = gst_queue_get_arg; @@ -123,6 +121,7 @@ gst_queue_init (GstQueue *queue) gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain); queue->srcpad = gst_pad_new ("src", GST_PAD_SRC); + gst_pad_set_pull_function (queue->srcpad, gst_queue_pull); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); queue->queue = NULL; @@ -219,14 +218,14 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) } static void -gst_queue_push (GstConnection *connection) +gst_queue_pull (GstPad *pad) { - GstQueue *queue = GST_QUEUE (connection); + GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad)); GstBuffer *buf = NULL; GSList *front; gboolean tosignal = FALSE; const guchar *name; - + name = gst_element_get_name (GST_ELEMENT (queue)); /* have to lock for thread-safety */