diff --git a/gst/elements/gstasyncdisksrc.c b/gst/elements/gstasyncdisksrc.c index dd3f60d865..06961c083a 100644 --- a/gst/elements/gstasyncdisksrc.c +++ b/gst/elements/gstasyncdisksrc.c @@ -57,8 +57,8 @@ static void gst_asyncdisksrc_init (GstAsyncDiskSrc *asyncdisksrc); static void gst_asyncdisksrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_asyncdisksrc_get (GstPad *pad); -static void gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size); +static GstBuffer * gst_asyncdisksrc_get (GstPad *pad); +static GstBuffer * gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size); static GstElementStateReturn gst_asyncdisksrc_change_state (GstElement *element); @@ -209,35 +209,34 @@ gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) * * Push a new buffer from the asyncdisksrc at the current offset. */ -static void +static GstBuffer * gst_asyncdisksrc_get (GstPad *pad) { GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); - g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); - + g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL); + /* deal with EOF state */ if (src->curoffset >= src->size) { gst_src_signal_eos (GST_SRC (src)); - return; + return NULL; } /* create the buffer */ // FIXME: should eventually use a bufferpool for this buf = gst_buffer_new (); - - g_return_if_fail (buf != NULL); + + g_return_val_if_fail (buf != NULL, NULL); /* simply set the buffer to point to the correct region of the file */ GST_BUFFER_DATA (buf) = src->map + src->curoffset; GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE); - if ((src->curoffset + src->bytes_per_read) > - src->size) { + if ((src->curoffset + src->bytes_per_read) > src->size) { GST_BUFFER_SIZE (buf) = src->size - src->curoffset; // FIXME: set the buffer's EOF bit here } else @@ -250,8 +249,8 @@ gst_asyncdisksrc_get (GstPad *pad) src->new_seek = FALSE; } - /* we're done, push the buffer off now */ - gst_pad_push (pad, buf); + /* we're done, return the buffer */ + return buf; } /** @@ -262,29 +261,29 @@ gst_asyncdisksrc_get (GstPad *pad) * * Push a new buffer from the asyncdisksrc of given size at given offset. */ -static void +static GstBuffer * gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size) { GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); - g_return_if_fail (GST_IS_ASYNCDISKSRC (src)); - g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); + g_return_val_if_fail (GST_IS_ASYNCDISKSRC (src), NULL); + g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL); /* deal with EOF state */ if (offset >= src->size) { gst_src_signal_eos (GST_SRC (src)); - return; + return NULL; } /* create the buffer */ // FIXME: should eventually use a bufferpool for this buf = gst_buffer_new (); - g_return_if_fail (buf); + g_return_val_if_fail (buf != NULL, NULL); /* simply set the buffer to point to the correct region of the file */ GST_BUFFER_DATA (buf) = src->map + offset; diff --git a/gst/elements/gstaudiosrc.c b/gst/elements/gstaudiosrc.c index 91a3a2bf18..1cae833421 100644 --- a/gst/elements/gstaudiosrc.c +++ b/gst/elements/gstaudiosrc.c @@ -64,7 +64,7 @@ static void gst_audiosrc_close_audio (GstAudioSrc *src); static gboolean gst_audiosrc_open_audio (GstAudioSrc *src); static void gst_audiosrc_sync_parms (GstAudioSrc *audiosrc); -static void gst_audiosrc_get (GstPad *pad); +static GstBuffer * gst_audiosrc_get (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_audiosrc_signals[LAST_SIGNAL] = { 0 }; @@ -140,27 +140,29 @@ gst_audiosrc_init (GstAudioSrc *audiosrc) audiosrc->seq = 0; } -void gst_audiosrc_get(GstPad *pad) { +static GstBuffer * +gst_audiosrc_get (GstPad *pad) +{ GstAudioSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_AUDIOSRC(gst_pad_get_parent(pad)); // g_print("attempting to read something from soundcard\n"); buf = gst_buffer_new (); - g_return_if_fail (buf); + g_return_val_if_fail (buf, NULL); GST_BUFFER_DATA (buf) = (gpointer)g_malloc (src->bytes_per_read); - + readbytes = read (src->fd,GST_BUFFER_DATA (buf), src->bytes_per_read); if (readbytes == 0) { gst_src_signal_eos (GST_SRC (src)); - return; + return NULL; } GST_BUFFER_SIZE (buf) = readbytes; @@ -170,8 +172,8 @@ void gst_audiosrc_get(GstPad *pad) { // gst_buffer_add_meta(buf,GST_META(newmeta)); - gst_pad_push (pad,buf); // g_print("pushed buffer from soundcard of %d bytes\n",readbytes); + return buf; } static void diff --git a/gst/elements/gstdisksrc.c b/gst/elements/gstdisksrc.c index ea49fd769d..7df94eb4f7 100644 --- a/gst/elements/gstdisksrc.c +++ b/gst/elements/gstdisksrc.c @@ -58,7 +58,7 @@ static void gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_disksrc_close_file (GstDiskSrc *src); -static void gst_disksrc_get (GstPad *pad); +static GstBuffer * gst_disksrc_get (GstPad *pad); static GstElementStateReturn gst_disksrc_change_state (GstElement *element); @@ -199,7 +199,7 @@ gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } } -static void +static GstBuffer * gst_disksrc_get (GstPad *pad) { GstDiskSrc *src; @@ -250,8 +250,8 @@ gst_disksrc_get (GstPad *pad) DEBUG("pushing %d bytes with offset %d\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf)); /* we're done, push the buffer off now */ - gst_pad_push (pad, buf); - DEBUG("pushing %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf)); + DEBUG("returning %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf)); + return buf; } diff --git a/gst/elements/gstfakesrc.c b/gst/elements/gstfakesrc.c index 306460f34e..c5c28c1b52 100644 --- a/gst/elements/gstfakesrc.c +++ b/gst/elements/gstfakesrc.c @@ -39,14 +39,17 @@ enum { enum { ARG_0, - /* FILL ME */ + ARG_NUM_SOURCES, }; -static void gst_fakesrc_class_init (GstFakeSrcClass *klass); -static void gst_fakesrc_init (GstFakeSrc *fakesrc); +static void gst_fakesrc_class_init (GstFakeSrcClass *klass); +static void gst_fakesrc_init (GstFakeSrc *fakesrc); -static void gst_fakesrc_get (GstPad *pad); +static void gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id); + +static GstBuffer * gst_fakesrc_get (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_fakesrc_signals[LAST_SIGNAL] = { 0 }; @@ -75,44 +78,95 @@ gst_fakesrc_get_type (void) static void gst_fakesrc_class_init (GstFakeSrcClass *klass) { + GtkObjectClass *gtkobject_class; GstSrcClass *gstsrc_class; + gtkobject_class = (GtkObjectClass*)klass; gstsrc_class = (GstSrcClass*)klass; parent_class = gtk_type_class (GST_TYPE_SRC); + + gtk_object_add_arg_type ("GstFakeSrc::num_sources", GTK_TYPE_INT, + GTK_ARG_READWRITE, ARG_NUM_SOURCES); + + gtkobject_class->set_arg = gst_fakesrc_set_arg; + gtkobject_class->get_arg = gst_fakesrc_get_arg; } static void gst_fakesrc_init(GstFakeSrc *fakesrc) { - // create our output pad - fakesrc->srcpad = gst_pad_new("src",GST_PAD_SRC); - gst_pad_set_get_function(fakesrc->srcpad,gst_fakesrc_get); - gst_element_add_pad(GST_ELEMENT(fakesrc),fakesrc->srcpad); + GstPad *pad; + + // set the default number of + fakesrc->numsrcpads = 1; + + // create our first output pad + pad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_get_function(pad,gst_fakesrc_get); + gst_element_add_pad(GST_ELEMENT(fakesrc),pad); + fakesrc->srcpads = g_slist_append(NULL,pad); // we're ready right away, since we don't have any args... // gst_element_set_state(GST_ELEMENT(fakesrc),GST_STATE_READY); } -/** - * gst_fakesrc_new: - * @name: then name of the fakse source - * - * create a new fakesrc - * - * Returns: The new element. - */ -GstElement *gst_fakesrc_new(gchar *name) { - GstElement *fakesrc = GST_ELEMENT(gtk_type_new(GST_TYPE_FAKESRC)); - gst_element_set_name(GST_ELEMENT(fakesrc),name); - return fakesrc; +static void +gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id) +{ + GstFakeSrc *src; + gint new_numsrcs; + GstPad *pad; + + /* it's not null if we got it, but it might not be ours */ + src = GST_FAKESRC (object); + + switch(id) { + case ARG_NUM_SOURCES: + new_numsrcs = GTK_VALUE_INT (*arg); + if (new_numsrcs > src->numsrcpads) { + while (src->numsrcpads != new_numsrcs) { + pad = gst_pad_new(g_strdup_printf("src%d",src->numsrcpads),GST_PAD_SRC); + gst_pad_set_get_function(pad,gst_fakesrc_get); + gst_element_add_pad(GST_ELEMENT(src),pad); + src->srcpads = g_slist_append(src->srcpads,pad); + src->numsrcpads; + } + } + break; + default: + break; + } } +static void +gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id) +{ + GstFakeSrc *src; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail (GST_IS_FAKESRC (object)); + + src = GST_FAKESRC (object); + + switch (id) { + case ARG_NUM_SOURCES: + GTK_VALUE_INT (*arg) = src->numsrcpads; + break; + default: + arg->type = GTK_TYPE_INVALID; + break; + } +} + + /** * gst_fakesrc_get: * @src: the faksesrc to get * * generate an empty buffer and push it to the next element. */ -void gst_fakesrc_get(GstPad *pad) { +static GstBuffer * +gst_fakesrc_get(GstPad *pad) +{ GstFakeSrc *src; GstBuffer *buf; @@ -122,5 +176,5 @@ void gst_fakesrc_get(GstPad *pad) { g_print("(%s:%s)> ",GST_DEBUG_PAD_NAME(pad)); buf = gst_buffer_new(); - gst_pad_push(pad,buf); + return buf; } diff --git a/gst/elements/gstfakesrc.h b/gst/elements/gstfakesrc.h index 242c258133..bbda3fda45 100644 --- a/gst/elements/gstfakesrc.h +++ b/gst/elements/gstfakesrc.h @@ -51,7 +51,8 @@ typedef struct _GstFakeSrcClass GstFakeSrcClass; struct _GstFakeSrc { GstSrc src; - GstPad *srcpad; + gint numsrcpads; + GSList *srcpads; }; struct _GstFakeSrcClass { diff --git a/gst/elements/gstfdsrc.c b/gst/elements/gstfdsrc.c index dfb2447997..e3df978262 100644 --- a/gst/elements/gstfdsrc.c +++ b/gst/elements/gstfdsrc.c @@ -51,13 +51,13 @@ enum { }; -static void gst_fdsrc_class_init (GstFdSrcClass *klass); -static void gst_fdsrc_init (GstFdSrc *fdsrc); +static void gst_fdsrc_class_init (GstFdSrcClass *klass); +static void gst_fdsrc_init (GstFdSrc *fdsrc); -static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_fdsrc_get (GstPad *pad); +static GstBuffer * gst_fdsrc_get (GstPad *pad); static GstSrcClass *parent_class = NULL; @@ -175,28 +175,30 @@ gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } } -void gst_fdsrc_get(GstPad *pad) { +static GstBuffer * +gst_fdsrc_get(GstPad *pad) +{ GstFdSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_FDSRC(gst_pad_get_parent(pad)); /* create the buffer */ // FIXME: should eventually use a bufferpool for this buf = gst_buffer_new (); - g_return_if_fail (buf); + g_return_val_if_fail (buf, NULL); /* allocate the space for the buffer data */ GST_BUFFER_DATA(buf) = g_malloc(src->bytes_per_read); - g_return_if_fail(GST_BUFFER_DATA(buf) != NULL); + g_return_val_if_fail(GST_BUFFER_DATA(buf) != NULL, NULL); /* read it in from the file */ readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read); if (readbytes == 0) { gst_src_signal_eos(GST_SRC(src)); - return; + return NULL; } /* if we didn't get as many bytes as we asked for, we're at EOF */ @@ -208,6 +210,6 @@ void gst_fdsrc_get(GstPad *pad) { GST_BUFFER_SIZE(buf) = readbytes; src->curoffset += readbytes; - /* we're done, push the buffer off now */ - gst_pad_push(pad,buf); + /* we're done, return the buffer */ + return buf; } diff --git a/gst/elements/gsthttpsrc.c b/gst/elements/gsthttpsrc.c index 15eb2a9efc..0559e39f55 100644 --- a/gst/elements/gsthttpsrc.c +++ b/gst/elements/gsthttpsrc.c @@ -55,7 +55,7 @@ static void gst_httpsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_httpsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); static GstElementStateReturn gst_httpsrc_change_state (GstElement *element); -static void gst_httpsrc_get (GstPad *pad); +static GstBuffer * gst_httpsrc_get (GstPad *pad); static gboolean gst_httpsrc_open_url (GstHttpSrc *src); static void gst_httpsrc_close_url (GstHttpSrc *src); @@ -122,12 +122,14 @@ static void gst_httpsrc_init(GstHttpSrc *httpsrc) { httpsrc->bytes_per_read = 4096; } -static void gst_httpsrc_get(GstPad *pad) { +static GstBuffer * +gst_httpsrc_get(GstPad *pad) +{ GstHttpSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_HTTPSRC(gst_pad_get_parent(pad)); buf = gst_buffer_new(); @@ -136,7 +138,7 @@ static void gst_httpsrc_get(GstPad *pad) { if (readbytes == 0) { gst_src_signal_eos(GST_SRC(src)); - return; + return NULL; } if (readbytes < src->bytes_per_read) { @@ -146,7 +148,7 @@ static void gst_httpsrc_get(GstPad *pad) { GST_BUFFER_SIZE(buf) = readbytes; src->curoffset += readbytes; - gst_pad_push(pad,buf); + return buf; } static gboolean diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index 2f29e4b90c..e9c7a67486 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -116,12 +116,14 @@ gst_queue_class_init (GstQueueClass *klass) static void gst_queue_init (GstQueue *queue) { + GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY); + queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain); + gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); queue->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_pad_set_get_function (queue->srcpad, gst_queue_get); + gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); queue->queue = NULL; @@ -173,19 +175,19 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) /* we have to lock the queue since we span threads */ - DEBUG("queue: try have queue lock\n"); +// DEBUG("queue: try have queue lock\n"); GST_LOCK (queue); - DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ()); - DEBUG("queue: have queue lock\n"); +// DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ()); +// DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) { gst_queue_flush (queue); } - DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); +// DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); while (queue->level_buffers >= queue->max_buffers) { - DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); +// DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); GST_UNLOCK (queue); g_mutex_lock (queue->fulllock); @@ -193,26 +195,27 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) g_mutex_unlock (queue->fulllock); GST_LOCK (queue); STATUS("%s: O+\n"); - DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); +// DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } /* put the buffer on the tail of the list */ queue->queue = g_slist_append (queue->queue, buf); - STATUS("%s: +\n"); +// STATUS("%s: +\n"); + g_print("(%s:%s)+ ",GST_DEBUG_PAD_NAME(pad)); /* if we were empty, but aren't any more, signal a condition */ tosignal = (queue->level_buffers >= 0); queue->level_buffers++; /* we can unlock now */ - DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); +// DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); GST_UNLOCK (queue); if (tosignal) { g_mutex_lock (queue->emptylock); - STATUS("%s: >\n"); +// STATUS("%s: >\n"); g_cond_signal (queue->emptycond); - STATUS("%s: >>\n"); +// STATUS("%s: >>\n"); g_mutex_unlock (queue->emptylock); } } @@ -229,19 +232,19 @@ gst_queue_get (GstPad *pad) name = gst_element_get_name (GST_ELEMENT (queue)); /* have to lock for thread-safety */ - DEBUG("queue: %s try have queue lock\n", name); +// DEBUG("queue: %s try have queue lock\n", name); GST_LOCK (queue); - DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); - DEBUG("queue: %s have queue lock\n", name); +// DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); +// DEBUG("queue: %s have queue lock\n", name); while (!queue->level_buffers) { - STATUS("queue: %s U released lock\n"); - GST_UNLOCK (queue); +// STATUS("queue: %s U released lock\n"); +// GST_UNLOCK (queue); g_mutex_lock (queue->emptylock); g_cond_wait (queue->emptycond, queue->emptylock); g_mutex_unlock (queue->emptylock); GST_LOCK (queue); - STATUS("queue: %s U- getting lock\n"); +// STATUS("queue: %s U- getting lock\n"); } front = queue->queue; @@ -250,21 +253,22 @@ gst_queue_get (GstPad *pad) g_slist_free (front); queue->level_buffers--; - STATUS("%s: -\n"); +// STATUS("%s: -\n"); + g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); tosignal = queue->level_buffers < queue->max_buffers; GST_UNLOCK(queue); if (tosignal) { g_mutex_lock (queue->fulllock); - STATUS("%s: < \n"); +// STATUS("%s: < \n"); g_cond_signal (queue->fullcond); - STATUS("%s: << \n"); +// STATUS("%s: << \n"); g_mutex_unlock (queue->fulllock); } - DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); +// DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); gst_pad_push (queue->srcpad, buf); - DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); +// DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); /* unlock now */ } diff --git a/gst/elements/gstsinesrc.c b/gst/elements/gstsinesrc.c index e15c6c2bee..5527847dd9 100644 --- a/gst/elements/gstsinesrc.c +++ b/gst/elements/gstsinesrc.c @@ -62,7 +62,7 @@ static void gst_sinesrc_get_arg(GtkObject *object,GtkArg *arg,guint id); //static gboolean gst_sinesrc_open_audio(GstSineSrc *src); void gst_sinesrc_sync_parms(GstSineSrc *sinesrc); -void gst_sinesrc_get(GstPad *pad); +static GstBuffer * gst_sinesrc_get(GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_sinesrc_signals[LAST_SIGNAL] = { 0 }; @@ -142,7 +142,9 @@ GstElement *gst_sinesrc_new_with_fd(gchar *name,gchar *filename) { return sinesrc; } -void gst_sinesrc_get(GstPad *pad) { +static GstBuffer * +gst_sinesrc_get(GstPad *pad) +{ GstSineSrc *src; GstBuffer *buf; gint16 *samples; @@ -150,11 +152,11 @@ void gst_sinesrc_get(GstPad *pad) { gint volume; gdouble val; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_SINESRC(gst_pad_get_parent(pad)); buf = gst_buffer_new(); - g_return_if_fail(buf); + g_return_val_if_fail (buf, NULL); GST_BUFFER_DATA(buf) = (gpointer)malloc(4096); samples = (gint16*)GST_BUFFER_DATA(buf); GST_BUFFER_SIZE(buf) = 4096; @@ -173,8 +175,8 @@ void gst_sinesrc_get(GstPad *pad) { src->sentmeta = TRUE; } - gst_pad_push(pad,buf); g_print(">"); + return buf; } static void gst_sinesrc_set_arg(GtkObject *object,GtkArg *arg,guint id) { diff --git a/gst/gstbin.c b/gst/gstbin.c index ac24737612..37d4ff5105 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -121,6 +121,9 @@ gst_bin_class_init (GstBinClass *klass) static void gst_bin_init (GstBin *bin) { + // in general, we prefer to use cothreads for most things + GST_FLAG_SET (bin, GST_BIN_FLAG_PREFER_COTHREADS); + bin->numchildren = 0; bin->children = NULL; // FIXME temporary testing measure @@ -213,12 +216,13 @@ gst_bin_change_state (GstElement *element) GList *children; GstElement *child; + DEBUG_ENTER("(\"%s\")",gst_element_get_name (element)); + g_return_val_if_fail (GST_IS_BIN (element), GST_STATE_FAILURE); bin = GST_BIN (element); - g_print("gst_bin_change_state(\"%s\"): currently %d(%s), %d(%s) pending\n", - gst_element_get_name (element), GST_STATE (element), + DEBUG("currently %d(%s), %d(%s) pending\n", GST_STATE (element), _gst_print_statename (GST_STATE (element)), GST_STATE_PENDING (element), _gst_print_statename (GST_STATE_PENDING (element))); @@ -228,17 +232,16 @@ gst_bin_change_state (GstElement *element) children = bin->children; while (children) { child = GST_ELEMENT (children->data); - g_print("gst_bin_change_state: setting state on '%s'\n", - gst_element_get_name (child)); + DEBUG("setting state on '%s'\n",gst_element_get_name (child)); switch (gst_element_set_state (child, GST_STATE_PENDING (element))) { case GST_STATE_FAILURE: GST_STATE_PENDING (element) = GST_STATE_NONE_PENDING; - g_print("gstbin: child '%s' failed to go to state %d(%s)\n", gst_element_get_name (child), - GST_STATE_PENDING (element), _gst_print_statename (GST_STATE_PENDING (element))); + DEBUG("child '%s' failed to go to state %d(%s)\n", gst_element_get_name (child), + GST_STATE_PENDING (element), _gst_print_statename (GST_STATE_PENDING (element))); return GST_STATE_FAILURE; break; case GST_STATE_ASYNC: - g_print("gstbin: child '%s' is changing state asynchronously\n", gst_element_get_name (child)); + DEBUG("child '%s' is changing state asynchronously\n", gst_element_get_name (child)); break; } // g_print("\n"); @@ -492,7 +495,7 @@ gst_bin_iterate (GstBin *bin) /** * gst_bin_create_plan: - * @bin: #Gstbin to create the plan for + * @bin: #GstBin to create the plan for * * let the bin figure out how to handle the plugins in it. */ @@ -516,38 +519,47 @@ static int gst_bin_loopfunc_wrapper (int argc,char *argv[]) { GstElement *element = GST_ELEMENT (argv); - GList *pads; - GstPad *pad; - GstBuffer *buf; G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); DEBUG_ENTER("(%d,'%s')",argc,name); -// DEBUG("entering gst_bin_loopfunc_wrapper(%d,\"%s\")\n", -// argc,gst_element_get_name (element)); - - if (element->loopfunc != NULL) { - DEBUG("element %s has loop function, calling it\n", name); + do { + DEBUG("calling loopfunc %s for element %s\n", + GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name); (element->loopfunc) (element); DEBUG("element %s ended loop function\n", name); - } else { - DEBUG("element %s is chain-based\n", name); - DEBUG("stepping through pads\n"); - do { - pads = element->pads; - while (pads) { - pad = GST_PAD (pads->data); - if (pad->direction == GST_PAD_SINK) { - DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); - buf = gst_pad_pull (pad); - DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); - (pad->chainfunc) (pad,buf); - DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); - } - pads = g_list_next (pads); + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG_LEAVE("(%d,'%s')",argc,name); + return 0; +} + +static int +gst_bin_chain_wrapper (int argc,char *argv[]) +{ + GstElement *element = GST_ELEMENT (argv); + G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); + GList *pads; + GstPad *pad; + GstBuffer *buf; + + DEBUG_ENTER("(\"%s\")",name); + DEBUG("stepping through pads\n"); + do { + pads = element->pads; + while (pads) { + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + if (pad->direction == GST_PAD_SINK) { + DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); + buf = gst_pad_pull (pad); + DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); + (pad->chainfunc) (pad,buf); + DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); - } + } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); DEBUG_LEAVE("(%d,'%s')",argc,name); @@ -560,6 +572,7 @@ gst_bin_src_wrapper (int argc,char *argv[]) GstElement *element = GST_ELEMENT (argv); GList *pads; GstPad *pad; + GstBuffer *buf; G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); DEBUG_ENTER("(%d,\"%s\")",argc,name); @@ -575,12 +588,15 @@ gst_bin_src_wrapper (int argc,char *argv[]) //gst_src_push_region (GST_SRC (element), region->offset, region->size); if (pad->getregionfunc == NULL) fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); - (pad->getregionfunc)(pad, region->offset, region->size); + buf = (pad->getregionfunc)(pad, region->offset, region->size); } else { if (pad->getfunc == NULL) fprintf(stderr,"error, no getfunc in \"%s\"\n", name); - (pad->getfunc)(pad); + buf = (pad->getfunc)(pad); } + + DEBUG("calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + gst_pad_push (pad, buf); } pads = g_list_next(pads); } @@ -591,25 +607,6 @@ gst_bin_src_wrapper (int argc,char *argv[]) return 0; } -static void -gst_bin_pullregionfunc_proxy (GstPad *pad, - gulong offset, - gulong size) -{ - region_struct region; - cothread_state *threadstate; - - DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size); - - region.offset = offset; - region.size = size; - - threadstate = GST_ELEMENT(pad->parent)->threadstate; - cothread_set_data (threadstate, "region", ®ion); - cothread_switch (threadstate); - cothread_set_data (threadstate, "region", NULL); -} - static void gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf) { @@ -639,202 +636,258 @@ gst_bin_pullfunc_proxy (GstPad *pad) return buf; } -static void -gst_bin_pushfunc_fake_proxy (GstPad *pad) +static GstBuffer * +gst_bin_chainfunc_proxy (GstPad *pad) { + GstBuffer *buf; } +// FIXME!!! +static void +gst_bin_pullregionfunc_proxy (GstPad *pad, + gulong offset, + gulong size) +{ + region_struct region; + cothread_state *threadstate; + + DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size); + + region.offset = offset; + region.size = size; + + threadstate = GST_ELEMENT(pad->parent)->threadstate; + cothread_set_data (threadstate, "region", ®ion); + cothread_switch (threadstate); + cothread_set_data (threadstate, "region", NULL); +} + + static void gst_bin_create_plan_func (GstBin *bin) { + GstElement *manager; GList *elements; GstElement *element; - int sink_pads; + const gchar *elementname; + GSList *pending_bins = NULL; + GstBin *pending_bin; GList *pads; - GstPad *pad, *peer; - GstElement *outside; + GstPad *pad; + GstElement *peer_manager; + cothread_func wrapper_function; - DEBUG_SET_STRING("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); + DEBUG_SET_STRING("(\"%s\")",gst_element_get_name (GST_ELEMENT (bin))); DEBUG_ENTER_STRING; - // first loop through all children to see if we need cothreads - // we break immediately when we find we need to, why keep searching? + // first figure out which element is the manager of this and all child elements + // if we're a managing bin ourselves, that'd be us + if (GST_FLAG_IS_SET (bin, GST_BIN_FLAG_MANAGER)) { + manager = GST_ELEMENT (bin); + DEBUG("setting manager to self\n"); + // otherwise, it's what our parent says it is + } else { + manager = gst_element_get_manager (GST_ELEMENT (bin)); + DEBUG("setting manager to \"%s\"\n", gst_element_get_name (manager)); + } + + // perform the first recursive pass of plan generation + // we set the manager of every element but those who manage themselves + // the need for cothreads is also determined recursively + DEBUG("performing first-phase recursion\n"); + bin->need_cothreads = bin->use_cothreads; + if (bin->need_cothreads) + DEBUG("requiring cothreads because we're forced to\n"); + elements = bin->children; while (elements) { element = GST_ELEMENT (elements->data); - - DEBUG("found element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); - - // if it's a loop-based element, use cothreads - if (element->loopfunc != NULL) { - DEBUG("loop based element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); - - bin->need_cothreads = TRUE; - DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element)); - break; - } - - // if it's a complex element, use cothreads - else if (GST_ELEMENT_IS_MULTI_IN (element)) { - DEBUG("complex element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); - - bin->need_cothreads = TRUE; - DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element)); - break; - } - - // if it has more than one input pad, use cothreads - sink_pads = 0; - pads = gst_element_get_pad_list (element); - while (pads) { - pad = GST_PAD (pads->data); - if (pad->direction == GST_PAD_SINK) - sink_pads++; - pads = g_list_next (pads); - } - if (sink_pads > 1) { - DEBUG("more than 1 sinkpad for element \"%s\" in bin \"%s\"\n", - gst_element_get_name (element), - gst_element_get_name (GST_ELEMENT (bin))); - - bin->need_cothreads = TRUE; - DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element)); - break; - } - elements = g_list_next (elements); +#ifdef GST_DEBUG_ENABLED + elementname = gst_element_get_name (element); +#endif + DEBUG("have element \"%s\"\n",elementname); + + // first set their manager + DEBUG("setting manager of \"%s\" to \"%s\"\n",elementname,gst_element_get_name(manager)); + gst_element_set_manager (element, manager); + + // we do recursion and such for Bins + if (GST_IS_BIN (element)) { + // recurse into the child Bin + DEBUG("recursing into child Bin \"%s\"\n",elementname); + gst_bin_create_plan (GST_BIN (element)); + // check to see if it needs cothreads and isn't self-managing + if (((GST_BIN (element))->need_cothreads) && !GST_FLAG_IS_SET(element,GST_BIN_FLAG_MANAGER)) { + DEBUG("requiring cothreads because child bin \"%s\" does\n",elementname); + bin->need_cothreads = TRUE; + } + } else { + // then we need to determine whether they need cothreads + // if it's a loop-based element, use cothreads + if (element->loopfunc != NULL) { + DEBUG("requiring cothreads because \"%s\" is a loop-based element\n",elementname); + bin->need_cothreads = TRUE; + // if it's a 'complex' element, use cothreads + } else if (GST_FLAG_IS_SET (element, GST_ELEMENT_COMPLEX)) { + DEBUG("requiring cothreads because \"%s\" is complex\n",elementname); + bin->need_cothreads = TRUE; + // if the element has more than one sink pad, use cothreads + } else if (element->numsinkpads > 1) { + DEBUG("requiring cothreads because \"%s\" has more than one sink pad\n",elementname); + bin->need_cothreads = TRUE; + } + } } - // FIXME -// bin->need_cothreads &= bin->use_cothreads; - // FIXME temporary testing measure - if (bin->use_cothreads) bin->need_cothreads = TRUE; + + // if we're not a manager thread, we're done. + if (!GST_FLAG_IS_SET (bin, GST_BIN_FLAG_MANAGER)) { + DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); + return; + } // clear previous plan state + g_list_free (bin->managed_elements); + bin->managed_elements = NULL; + bin->num_managed_elements = 0; g_list_free (bin->entries); bin->entries = NULL; - bin->numentries = 0; + bin->num_entries = 0; + // find all the managed children + // here we pull off the trick of walking an entire arbitrary tree without recursion + DEBUG("attempting to find all the elements to manage\n"); + pending_bins = g_slist_prepend (pending_bins, bin); + do { + // retrieve the top of the stack and pop it + pending_bin = GST_BIN (pending_bins->data); + pending_bins = g_slist_remove (pending_bins, pending_bin); + + // walk the list of elements, find bins, and do stuff + DEBUG("checking Bin \"%s\" for managed elements\n", + gst_element_get_name (GST_ELEMENT (pending_bin))); + elements = pending_bin->children; + while (elements) { + element = GST_ELEMENT (elements->data); + elements = g_list_next (elements); +#ifdef GST_DEBUG_ENABLED + elementname = gst_element_get_name (element); +#endif + + // if it's ours, add it to the list + if (element->manager == GST_ELEMENT(bin)) { + // if it's a Bin, add it to the list of Bins to check + if (GST_IS_BIN (element)) { + DEBUG("flattened recurse into \"%s\"\n",elementname); + pending_bins = g_slist_prepend (pending_bins, element); + // otherwise add it to the list of elements + } else { + DEBUG("found element \"%s\" that I manage\n",elementname); + bin->managed_elements = g_list_prepend (bin->managed_elements, element); + bin->num_managed_elements++; + } + } + } + } while (pending_bins); + + DEBUG("have %d elements to manage, implementing plan\n",bin->num_managed_elements); + + // If cothreads are needed, we need to not only find elements but + // set up cothread states and various proxy functions. if (bin->need_cothreads) { // first create thread context if (bin->threadcontext == NULL) { + DEBUG("initializing cothread context\n"); bin->threadcontext = cothread_init (); - DEBUG("initialized cothread context\n"); } // walk through all the children - elements = bin->children; + elements = bin->managed_elements; while (elements) { element = GST_ELEMENT (elements->data); - - // start by creating thread state for the element - if (element->threadstate == NULL) { - element->threadstate = cothread_create (bin->threadcontext); - cothread_setfunc (element->threadstate, gst_bin_loopfunc_wrapper, - 0, (char **)element); - DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate, - &element->threadstate,gst_element_get_name(element)); - } - - if (GST_IS_BIN (element)) { - gst_bin_create_plan (GST_BIN (element)); - - } else if (GST_IS_SRC (element)) { - DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); - bin->entries = g_list_prepend (bin->entries,element); - bin->numentries++; - cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element); - } - - pads = gst_element_get_pad_list (element); - while (pads) { - pad = GST_PAD(pads->data); - - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - // set the proxy functions - pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); - DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy); - } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { - DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - // set the proxy functions - pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); - DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n", - &pad->pullfunc,gst_bin_pullfunc_proxy); - pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy); - } - pads = g_list_next (pads); - } elements = g_list_next (elements); - // if there are no entries, we have to pick one at random - if (bin->numentries == 0) - bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data)); - } - } else { - DEBUG("don't need cothreads, looking for entry points\n"); - // we have to find which elements will drive an iteration - elements = bin->children; - while (elements) { - element = GST_ELEMENT (elements->data); - DEBUG("found element \"%s\"\n", gst_element_get_name (element)); - if (GST_IS_BIN (element)) { - gst_bin_create_plan (GST_BIN (element)); - } - if (GST_IS_SRC (element)) { - DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); - bin->entries = g_list_prepend (bin->entries, element); - bin->numentries++; + // start out with a NULL warpper function, we'll set it if we want a cothread + wrapper_function = NULL; + + // have to decide if we need to or can use a cothreads, and if so which wrapper + // first of all, if there's a loopfunc, the decision's already made + if (element->loopfunc != NULL) { + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper); + DEBUG("element %s is a loopfunc, must use a cothread\n",gst_element_get_name(element)); + } else { + // otherwise we need to decide if it needs a cothread + // if it's complex, or cothreads are preferred and it's *not* passive, cothread it + if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) || + (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) && + !GST_FLAG_IS_SET (element,GST_ELEMENT_SCHEDULE_PASSIVELY))) { + // base it on whether we're going to loop through source or sink pads + if (element->numsinkpads == 0) + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper); + else + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper); + } } - // go through all the pads, set pointers, and check for connections + // walk through the all the pads for this element, setting proxy functions + // the selection of proxy functions depends on whether we're in a cothread or not pads = gst_element_get_pad_list (element); while (pads) { pad = GST_PAD (pads->data); + pads = g_list_next (pads); - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad)); - - // copy the peer's chain function, easy enough - DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad)); - pad->pushfunc = pad->peer->chainfunc; - - // need to walk through and check for outside connections -//FIXME need to do this for all pads - /* get the pad's peer */ - peer = gst_pad_get_peer (pad); - if (!peer) { - DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad)); - break; - } - /* get the parent of the peer of the pad */ - outside = GST_ELEMENT (gst_pad_get_parent (peer)); - if (!outside) break; - /* if it's a connection and it's not ours... */ - if (GST_IS_CONNECTION (outside) && - (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { - gst_info("gstbin: element \"%s\" is the external source Connection " - "for internal element \"%s\"\n", - gst_element_get_name (GST_ELEMENT (outside)), - gst_element_get_name (GST_ELEMENT (element))); - bin->entries = g_list_prepend (bin->entries, outside); - bin->numentries++; - } - } - else { - DEBUG("found pad %s\n", gst_pad_get_name (pad)); - } - pads = g_list_next (pads); + // check to see if someone else gets to set up the element + peer_manager = GST_ELEMENT((pad)->peer->parent)->manager; + if (peer_manager != GST_ELEMENT(bin)) + DEBUG("WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad)); + // if the wrapper_function is set, we need to use the proxy functions + if (wrapper_function != NULL) { + // set up proxy functions + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); + } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + DEBUG("setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); + } + } else { + // otherwise we need to set up for 'traditional' chaining + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + // we can just copy the chain function, since it shares the prototype + DEBUG("copying chain function into push proxy for %s:%s\n", + GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = pad->chainfunc; + } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + // we can just copy the get function, since it shares the prototype + DEBUG("copying get function into pull proxy for %s:%s\n", + GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = pad->getfunc; + } + } } - elements = g_list_next (elements); + + // if a loopfunc has been specified, create and set up a cothread + if (wrapper_function != NULL) { + if (element->threadstate == NULL) { + element->threadstate = cothread_create (bin->threadcontext); + DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate, + &element->threadstate,gst_element_get_name(element)); + } + cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element); + DEBUG("set wrapper function for \"%s\" to &%s\n",gst_element_get_name(element), + GST_DEBUG_FUNCPTR_NAME(wrapper_function)); + } + +// // HACK: if the element isn't passive, it's an entry +// if (!GST_FLAG_IS_SET(element,GST_ELEMENT_SCHEDULE_PASSIVELY)) +// bin->entries = g_list_append(bin->entries, element); } + + // otherwise, cothreads are not needed + } else { + ; } DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); @@ -847,7 +900,7 @@ gst_bin_iterate_func (GstBin *bin) GstElement *entry; GList *pads; GstPad *pad; - _GstBinOutsideSchedule *sched; + GstBuffer *buf; DEBUG_SET_STRING("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin))); DEBUG_ENTER_STRING; @@ -861,24 +914,16 @@ gst_bin_iterate_func (GstBin *bin) // FIXME this should be lots more intelligent about where to start DEBUG("starting iteration via cothreads\n"); - if (GST_IS_ELEMENT(bin->entries->data)) { - entry = GST_ELEMENT (bin->entries->data); - GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); - DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", - gst_element_get_name(entry),entry); - cothread_switch (entry->threadstate); - } else { - sched = (_GstBinOutsideSchedule *) (bin->entries->data); - sched->flags |= GST_ELEMENT_COTHREAD_STOPPING; - DEBUG("set COTHREAD STOPPING flag on sched for \"%s\"(@%p)\n", - gst_element_get_name(sched->element),sched->element); - cothread_switch (sched->threadstate); - } + entry = GST_ELEMENT (bin->managed_elements->data); + GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); + DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", + gst_element_get_name(entry),entry); + cothread_switch (entry->threadstate); } else { DEBUG("starting iteration via chain-functions\n"); - if (bin->numentries <= 0) { + if (bin->num_entries <= 0) { //printf("gstbin: no entries in bin \"%s\" trying children...\n", gst_element_get_name(GST_ELEMENT(bin))); // we will try loop over the elements then... entries = bin->children; @@ -900,7 +945,8 @@ gst_bin_iterate_func (GstBin *bin) if (pad->getfunc == NULL) fprintf(stderr, "error, no getfunc in \"%s\"\n", gst_element_get_name (entry)); else - (pad->getfunc)(pad); + buf = (pad->getfunc)(pad); + gst_pad_push(pad,buf); } pads = g_list_next (pads); } @@ -945,3 +991,101 @@ gst_bin_iterate_func (GstBin *bin) } } else { */ + + + + + +/* + } else if (GST_IS_SRC (element)) { + DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); + bin->entries = g_list_prepend (bin->entries,element); + bin->num_entries++; + cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element); + } + + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD(pads->data); + + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + // set the proxy functions + pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); + DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy); + } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + // set the proxy functions + pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); + DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n", + &pad->pullfunc,gst_bin_pullfunc_proxy); + pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy); + } + pads = g_list_next (pads); + } + elements = g_list_next (elements); + + // if there are no entries, we have to pick one at random + if (bin->num_entries == 0) + bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data)); + } + } else { + DEBUG("don't need cothreads, looking for entry points\n"); + // we have to find which elements will drive an iteration + elements = bin->children; + while (elements) { + element = GST_ELEMENT (elements->data); + DEBUG("found element \"%s\"\n", gst_element_get_name (element)); + if (GST_IS_BIN (element)) { + gst_bin_create_plan (GST_BIN (element)); + } + if (GST_IS_SRC (element)) { + DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); + bin->entries = g_list_prepend (bin->entries, element); + bin->num_entries++; + } + + // go through all the pads, set pointers, and check for connections + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD (pads->data); + + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad)); + + // copy the peer's chain function, easy enough + DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = GST_DEBUG_FUNCPTR(pad->peer->chainfunc); + + // need to walk through and check for outside connections +//FIXME need to do this for all pads + // get the pad's peer + peer = gst_pad_get_peer (pad); + if (!peer) { + DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad)); + break; + } + // get the parent of the peer of the pad + outside = GST_ELEMENT (gst_pad_get_parent (peer)); + if (!outside) break; + // if it's a connection and it's not ours... + if (GST_IS_CONNECTION (outside) && + (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { + gst_info("gstbin: element \"%s\" is the external source Connection " + "for internal element \"%s\"\n", + gst_element_get_name (GST_ELEMENT (outside)), + gst_element_get_name (GST_ELEMENT (element))); + bin->entries = g_list_prepend (bin->entries, outside); + bin->num_entries++; + } + } + else { + DEBUG("found pad %s\n", gst_pad_get_name (pad)); + } + pads = g_list_next (pads); + + } + elements = g_list_next (elements); + } +*/ + diff --git a/gst/gstbin.h b/gst/gstbin.h index 1212d77f07..c34434a124 100644 --- a/gst/gstbin.h +++ b/gst/gstbin.h @@ -41,7 +41,16 @@ extern GstElementDetails gst_bin_details; #define GST_IS_BIN_CLASS(obj) \ (GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_BIN)) -#define GST_BIN_FLAG_LAST (GST_ELEMENT_FLAG_LAST + 2) +typedef enum { + // this bin is a manager of child elements, i.e. a pipeline or thread + GST_BIN_FLAG_MANAGER = GST_ELEMENT_FLAG_LAST, + + // we prefer to have cothreads when its an option, over chain-based + GST_BIN_FLAG_PREFER_COTHREADS, + + /* padding */ + GST_BIN_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 4, +} GstBinFlags; typedef struct _GstBin GstBin; typedef struct _GstBinClass GstBinClass; @@ -55,8 +64,10 @@ struct _GstBin { // iteration state gboolean need_cothreads; + GList *managed_elements; + gint num_managed_elements; GList *entries; - gint numentries; + gint num_entries; cothread_context *threadcontext; gboolean use_cothreads; @@ -66,29 +77,19 @@ struct _GstBin { struct _GstBinClass { GstElementClass parent_class; + /* signals */ void (*object_added) (GstObject *object, GstObject *child); /* change the state of elements of the given type */ gboolean (*change_state_type) (GstBin *bin, GstElementState state, GtkType type); - /* create a plan for the execution of the bin */ void (*create_plan) (GstBin *bin); - /* run a full iteration of operation */ void (*iterate) (GstBin *bin); }; -/* this struct is used for odd scheduling cases */ -typedef struct __GstBinOutsideSchedule { - guint32 flags; - GstElement *element; - GstBin *bin; - cothread_state *threadstate; - GSList *padlist; -} _GstBinOutsideSchedule; - GtkType gst_bin_get_type (void); @@ -96,22 +97,26 @@ GstElement* gst_bin_new (gchar *name); #define gst_bin_destroy(bin) gst_object_destroy(GST_OBJECT(bin)) /* add and remove elements from the bin */ -void gst_bin_add (GstBin *bin, GstElement *element); -void gst_bin_remove (GstBin *bin, GstElement *element); +void gst_bin_add (GstBin *bin, + GstElement *element); +void gst_bin_remove (GstBin *bin, + GstElement *element); /* retrieve a single element or the list of children */ -GstElement* gst_bin_get_by_name (GstBin *bin, gchar *name); +GstElement* gst_bin_get_by_name (GstBin *bin, + gchar *name); GList* gst_bin_get_list (GstBin *bin); void gst_bin_create_plan (GstBin *bin); gboolean gst_bin_set_state_type (GstBin *bin, - GstElementState state, - GtkType type); + GstElementState state, + GtkType type); void gst_bin_iterate (GstBin *bin); // hack FIXME -void gst_bin_use_cothreads (GstBin *bin, gboolean enabled); +void gst_bin_use_cothreads (GstBin *bin, + gboolean enabled); #ifdef __cplusplus } diff --git a/gst/gstelement.c b/gst/gstelement.c index f07cc73470..f68099ec46 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -109,6 +109,8 @@ gst_element_init (GstElement *element) element->current_state = GST_STATE_NULL; element->pending_state = -1; element->numpads = 0; + element->numsrcpads = 0; + element->numsinkpads = 0; element->pads = NULL; element->loopfunc = NULL; element->threadstate = NULL; @@ -151,6 +153,10 @@ gst_element_add_pad (GstElement *element, GstPad *pad) /* add it to the list */ element->pads = g_list_append (element->pads, pad); element->numpads++; + if (gst_pad_get_direction (pad) == GST_PAD_SRC) + element->numsrcpads++; + else + element->numsinkpads++; /* emit the NEW_PAD signal */ // g_print("emitting NEW_PAD signal, \"%s\"!\n",gst_pad_get_name(pad)); diff --git a/gst/gstelement.h b/gst/gstelement.h index 8e8bfe8b6e..468bf66227 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -75,18 +75,24 @@ static inline char *_gst_print_statename(int state) { (GTK_CHECK_CLASS_TYPE((klass),GST_TYPE_ELEMENT)) typedef enum { - GST_ELEMENT_MULTI_IN = GST_OBJECT_FLAG_LAST, + // element is complex (for some def.) and generally require a cothread + GST_ELEMENT_COMPLEX = GST_OBJECT_FLAG_LAST, + // not to be scheduled directly, let others trigger all events + GST_ELEMENT_SCHEDULE_PASSIVELY, + // this element should be placed in a thread if at all possible GST_ELEMENT_THREAD_SUGGESTED, + // this element is incable of seeking (FIXME: does this apply to filters?) GST_ELEMENT_NO_SEEK, + // there is a new loopfunction ready for placement GST_ELEMENT_NEW_LOOPFUNC, + // the cothread holding this element needs to be stopped GST_ELEMENT_COTHREAD_STOPPING, /* use some padding for future expansion */ GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 8, } GstElementFlags; -#define GST_ELEMENT_IS_MULTI_IN(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_MULTI_IN)) #define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED)) #define GST_ELEMENT_IS_COTHREAD_STOPPING(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING)) @@ -110,6 +116,8 @@ struct _GstElement { cothread_state *threadstate; guint16 numpads; + guint16 numsrcpads; + guint16 numsinkpads; GList *pads; GstElement *manager; diff --git a/gst/gstpad.h b/gst/gstpad.h index 0db874dfe2..b40c37e819 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -52,8 +52,8 @@ typedef struct _GstPadClass GstPadClass; * pad is the sink pad (so the same chain function can be used for N pads) * buf is the buffer being passed */ typedef void (*GstPadChainFunction) (GstPad *pad,GstBuffer *buf); -typedef void (*GstPadGetFunction) (GstPad *pad); -typedef void (*GstPadGetRegionFunction) (GstPad *pad, gulong offset, gulong size); +typedef GstBuffer *(*GstPadGetFunction) (GstPad *pad); +typedef GstBuffer *(*GstPadGetRegionFunction) (GstPad *pad, gulong offset, gulong size); typedef void (*GstPadQoSFunction) (GstPad *pad, glong qos_message); typedef void (*GstPadPushFunction) (GstPad *pad, GstBuffer *buf); diff --git a/gst/gstpipeline.c b/gst/gstpipeline.c index cc0163607d..45593d2250 100644 --- a/gst/gstpipeline.c +++ b/gst/gstpipeline.c @@ -94,6 +94,9 @@ gst_pipeline_class_init (GstPipelineClass *klass) static void gst_pipeline_init (GstPipeline *pipeline) { + // we're a manager by default + GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER); + pipeline->src = NULL; pipeline->sinks = NULL; } @@ -345,7 +348,7 @@ gst_pipeline_autoplug (GstPipeline *pipeline) base_factories = g_new0(GList *, numsinks); i = 0; - // fase 2, loop over all the sinks.. + // fase 2, loop over all the sinks.. while (elements) { GList *pads; GstPad *pad; diff --git a/gst/gstthread.c b/gst/gstthread.c index 7e8232df08..07172eec6b 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -116,6 +116,10 @@ gst_thread_class_init (GstThreadClass *klass) static void gst_thread_init (GstThread *thread) { + // we're a manager by default + GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER); + + // default is to create a thread GST_FLAG_SET (thread, GST_THREAD_CREATE); thread->lock = g_mutex_new(); diff --git a/plugins/elements/gstasyncdisksrc.c b/plugins/elements/gstasyncdisksrc.c index dd3f60d865..06961c083a 100644 --- a/plugins/elements/gstasyncdisksrc.c +++ b/plugins/elements/gstasyncdisksrc.c @@ -57,8 +57,8 @@ static void gst_asyncdisksrc_init (GstAsyncDiskSrc *asyncdisksrc); static void gst_asyncdisksrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_asyncdisksrc_get (GstPad *pad); -static void gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size); +static GstBuffer * gst_asyncdisksrc_get (GstPad *pad); +static GstBuffer * gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size); static GstElementStateReturn gst_asyncdisksrc_change_state (GstElement *element); @@ -209,35 +209,34 @@ gst_asyncdisksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) * * Push a new buffer from the asyncdisksrc at the current offset. */ -static void +static GstBuffer * gst_asyncdisksrc_get (GstPad *pad) { GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); - g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); - + g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL); + /* deal with EOF state */ if (src->curoffset >= src->size) { gst_src_signal_eos (GST_SRC (src)); - return; + return NULL; } /* create the buffer */ // FIXME: should eventually use a bufferpool for this buf = gst_buffer_new (); - - g_return_if_fail (buf != NULL); + + g_return_val_if_fail (buf != NULL, NULL); /* simply set the buffer to point to the correct region of the file */ GST_BUFFER_DATA (buf) = src->map + src->curoffset; GST_BUFFER_OFFSET (buf) = src->curoffset; GST_BUFFER_FLAG_SET (buf, GST_BUFFER_DONTFREE); - if ((src->curoffset + src->bytes_per_read) > - src->size) { + if ((src->curoffset + src->bytes_per_read) > src->size) { GST_BUFFER_SIZE (buf) = src->size - src->curoffset; // FIXME: set the buffer's EOF bit here } else @@ -250,8 +249,8 @@ gst_asyncdisksrc_get (GstPad *pad) src->new_seek = FALSE; } - /* we're done, push the buffer off now */ - gst_pad_push (pad, buf); + /* we're done, return the buffer */ + return buf; } /** @@ -262,29 +261,29 @@ gst_asyncdisksrc_get (GstPad *pad) * * Push a new buffer from the asyncdisksrc of given size at given offset. */ -static void +static GstBuffer * gst_asyncdisksrc_get_region (GstPad *pad, gulong offset, gulong size) { GstAsyncDiskSrc *src; GstBuffer *buf; - g_return_if_fail (pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_ASYNCDISKSRC (gst_pad_get_parent(pad)); - g_return_if_fail (GST_IS_ASYNCDISKSRC (src)); - g_return_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN)); + g_return_val_if_fail (GST_IS_ASYNCDISKSRC (src), NULL); + g_return_val_if_fail (GST_FLAG_IS_SET (src, GST_ASYNCDISKSRC_OPEN), NULL); /* deal with EOF state */ if (offset >= src->size) { gst_src_signal_eos (GST_SRC (src)); - return; + return NULL; } /* create the buffer */ // FIXME: should eventually use a bufferpool for this buf = gst_buffer_new (); - g_return_if_fail (buf); + g_return_val_if_fail (buf != NULL, NULL); /* simply set the buffer to point to the correct region of the file */ GST_BUFFER_DATA (buf) = src->map + offset; diff --git a/plugins/elements/gstaudiosrc.c b/plugins/elements/gstaudiosrc.c index 91a3a2bf18..1cae833421 100644 --- a/plugins/elements/gstaudiosrc.c +++ b/plugins/elements/gstaudiosrc.c @@ -64,7 +64,7 @@ static void gst_audiosrc_close_audio (GstAudioSrc *src); static gboolean gst_audiosrc_open_audio (GstAudioSrc *src); static void gst_audiosrc_sync_parms (GstAudioSrc *audiosrc); -static void gst_audiosrc_get (GstPad *pad); +static GstBuffer * gst_audiosrc_get (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_audiosrc_signals[LAST_SIGNAL] = { 0 }; @@ -140,27 +140,29 @@ gst_audiosrc_init (GstAudioSrc *audiosrc) audiosrc->seq = 0; } -void gst_audiosrc_get(GstPad *pad) { +static GstBuffer * +gst_audiosrc_get (GstPad *pad) +{ GstAudioSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_AUDIOSRC(gst_pad_get_parent(pad)); // g_print("attempting to read something from soundcard\n"); buf = gst_buffer_new (); - g_return_if_fail (buf); + g_return_val_if_fail (buf, NULL); GST_BUFFER_DATA (buf) = (gpointer)g_malloc (src->bytes_per_read); - + readbytes = read (src->fd,GST_BUFFER_DATA (buf), src->bytes_per_read); if (readbytes == 0) { gst_src_signal_eos (GST_SRC (src)); - return; + return NULL; } GST_BUFFER_SIZE (buf) = readbytes; @@ -170,8 +172,8 @@ void gst_audiosrc_get(GstPad *pad) { // gst_buffer_add_meta(buf,GST_META(newmeta)); - gst_pad_push (pad,buf); // g_print("pushed buffer from soundcard of %d bytes\n",readbytes); + return buf; } static void diff --git a/plugins/elements/gstdisksrc.c b/plugins/elements/gstdisksrc.c index ea49fd769d..7df94eb4f7 100644 --- a/plugins/elements/gstdisksrc.c +++ b/plugins/elements/gstdisksrc.c @@ -58,7 +58,7 @@ static void gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_disksrc_close_file (GstDiskSrc *src); -static void gst_disksrc_get (GstPad *pad); +static GstBuffer * gst_disksrc_get (GstPad *pad); static GstElementStateReturn gst_disksrc_change_state (GstElement *element); @@ -199,7 +199,7 @@ gst_disksrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } } -static void +static GstBuffer * gst_disksrc_get (GstPad *pad) { GstDiskSrc *src; @@ -250,8 +250,8 @@ gst_disksrc_get (GstPad *pad) DEBUG("pushing %d bytes with offset %d\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf)); /* we're done, push the buffer off now */ - gst_pad_push (pad, buf); - DEBUG("pushing %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf)); + DEBUG("returning %d bytes with offset %d done\n", GST_BUFFER_SIZE(buf), GST_BUFFER_OFFSET (buf)); + return buf; } diff --git a/plugins/elements/gstfakesrc.c b/plugins/elements/gstfakesrc.c index 306460f34e..c5c28c1b52 100644 --- a/plugins/elements/gstfakesrc.c +++ b/plugins/elements/gstfakesrc.c @@ -39,14 +39,17 @@ enum { enum { ARG_0, - /* FILL ME */ + ARG_NUM_SOURCES, }; -static void gst_fakesrc_class_init (GstFakeSrcClass *klass); -static void gst_fakesrc_init (GstFakeSrc *fakesrc); +static void gst_fakesrc_class_init (GstFakeSrcClass *klass); +static void gst_fakesrc_init (GstFakeSrc *fakesrc); -static void gst_fakesrc_get (GstPad *pad); +static void gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id); + +static GstBuffer * gst_fakesrc_get (GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_fakesrc_signals[LAST_SIGNAL] = { 0 }; @@ -75,44 +78,95 @@ gst_fakesrc_get_type (void) static void gst_fakesrc_class_init (GstFakeSrcClass *klass) { + GtkObjectClass *gtkobject_class; GstSrcClass *gstsrc_class; + gtkobject_class = (GtkObjectClass*)klass; gstsrc_class = (GstSrcClass*)klass; parent_class = gtk_type_class (GST_TYPE_SRC); + + gtk_object_add_arg_type ("GstFakeSrc::num_sources", GTK_TYPE_INT, + GTK_ARG_READWRITE, ARG_NUM_SOURCES); + + gtkobject_class->set_arg = gst_fakesrc_set_arg; + gtkobject_class->get_arg = gst_fakesrc_get_arg; } static void gst_fakesrc_init(GstFakeSrc *fakesrc) { - // create our output pad - fakesrc->srcpad = gst_pad_new("src",GST_PAD_SRC); - gst_pad_set_get_function(fakesrc->srcpad,gst_fakesrc_get); - gst_element_add_pad(GST_ELEMENT(fakesrc),fakesrc->srcpad); + GstPad *pad; + + // set the default number of + fakesrc->numsrcpads = 1; + + // create our first output pad + pad = gst_pad_new("src",GST_PAD_SRC); + gst_pad_set_get_function(pad,gst_fakesrc_get); + gst_element_add_pad(GST_ELEMENT(fakesrc),pad); + fakesrc->srcpads = g_slist_append(NULL,pad); // we're ready right away, since we don't have any args... // gst_element_set_state(GST_ELEMENT(fakesrc),GST_STATE_READY); } -/** - * gst_fakesrc_new: - * @name: then name of the fakse source - * - * create a new fakesrc - * - * Returns: The new element. - */ -GstElement *gst_fakesrc_new(gchar *name) { - GstElement *fakesrc = GST_ELEMENT(gtk_type_new(GST_TYPE_FAKESRC)); - gst_element_set_name(GST_ELEMENT(fakesrc),name); - return fakesrc; +static void +gst_fakesrc_set_arg (GtkObject *object, GtkArg *arg, guint id) +{ + GstFakeSrc *src; + gint new_numsrcs; + GstPad *pad; + + /* it's not null if we got it, but it might not be ours */ + src = GST_FAKESRC (object); + + switch(id) { + case ARG_NUM_SOURCES: + new_numsrcs = GTK_VALUE_INT (*arg); + if (new_numsrcs > src->numsrcpads) { + while (src->numsrcpads != new_numsrcs) { + pad = gst_pad_new(g_strdup_printf("src%d",src->numsrcpads),GST_PAD_SRC); + gst_pad_set_get_function(pad,gst_fakesrc_get); + gst_element_add_pad(GST_ELEMENT(src),pad); + src->srcpads = g_slist_append(src->srcpads,pad); + src->numsrcpads; + } + } + break; + default: + break; + } } +static void +gst_fakesrc_get_arg (GtkObject *object, GtkArg *arg, guint id) +{ + GstFakeSrc *src; + + /* it's not null if we got it, but it might not be ours */ + g_return_if_fail (GST_IS_FAKESRC (object)); + + src = GST_FAKESRC (object); + + switch (id) { + case ARG_NUM_SOURCES: + GTK_VALUE_INT (*arg) = src->numsrcpads; + break; + default: + arg->type = GTK_TYPE_INVALID; + break; + } +} + + /** * gst_fakesrc_get: * @src: the faksesrc to get * * generate an empty buffer and push it to the next element. */ -void gst_fakesrc_get(GstPad *pad) { +static GstBuffer * +gst_fakesrc_get(GstPad *pad) +{ GstFakeSrc *src; GstBuffer *buf; @@ -122,5 +176,5 @@ void gst_fakesrc_get(GstPad *pad) { g_print("(%s:%s)> ",GST_DEBUG_PAD_NAME(pad)); buf = gst_buffer_new(); - gst_pad_push(pad,buf); + return buf; } diff --git a/plugins/elements/gstfakesrc.h b/plugins/elements/gstfakesrc.h index 242c258133..bbda3fda45 100644 --- a/plugins/elements/gstfakesrc.h +++ b/plugins/elements/gstfakesrc.h @@ -51,7 +51,8 @@ typedef struct _GstFakeSrcClass GstFakeSrcClass; struct _GstFakeSrc { GstSrc src; - GstPad *srcpad; + gint numsrcpads; + GSList *srcpads; }; struct _GstFakeSrcClass { diff --git a/plugins/elements/gstfdsrc.c b/plugins/elements/gstfdsrc.c index dfb2447997..e3df978262 100644 --- a/plugins/elements/gstfdsrc.c +++ b/plugins/elements/gstfdsrc.c @@ -51,13 +51,13 @@ enum { }; -static void gst_fdsrc_class_init (GstFdSrcClass *klass); -static void gst_fdsrc_init (GstFdSrc *fdsrc); +static void gst_fdsrc_class_init (GstFdSrcClass *klass); +static void gst_fdsrc_init (GstFdSrc *fdsrc); -static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_fdsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); +static void gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); -static void gst_fdsrc_get (GstPad *pad); +static GstBuffer * gst_fdsrc_get (GstPad *pad); static GstSrcClass *parent_class = NULL; @@ -175,28 +175,30 @@ gst_fdsrc_get_arg (GtkObject *object, GtkArg *arg, guint id) } } -void gst_fdsrc_get(GstPad *pad) { +static GstBuffer * +gst_fdsrc_get(GstPad *pad) +{ GstFdSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_FDSRC(gst_pad_get_parent(pad)); /* create the buffer */ // FIXME: should eventually use a bufferpool for this buf = gst_buffer_new (); - g_return_if_fail (buf); + g_return_val_if_fail (buf, NULL); /* allocate the space for the buffer data */ GST_BUFFER_DATA(buf) = g_malloc(src->bytes_per_read); - g_return_if_fail(GST_BUFFER_DATA(buf) != NULL); + g_return_val_if_fail(GST_BUFFER_DATA(buf) != NULL, NULL); /* read it in from the file */ readbytes = read(src->fd,GST_BUFFER_DATA(buf),src->bytes_per_read); if (readbytes == 0) { gst_src_signal_eos(GST_SRC(src)); - return; + return NULL; } /* if we didn't get as many bytes as we asked for, we're at EOF */ @@ -208,6 +210,6 @@ void gst_fdsrc_get(GstPad *pad) { GST_BUFFER_SIZE(buf) = readbytes; src->curoffset += readbytes; - /* we're done, push the buffer off now */ - gst_pad_push(pad,buf); + /* we're done, return the buffer */ + return buf; } diff --git a/plugins/elements/gsthttpsrc.c b/plugins/elements/gsthttpsrc.c index 15eb2a9efc..0559e39f55 100644 --- a/plugins/elements/gsthttpsrc.c +++ b/plugins/elements/gsthttpsrc.c @@ -55,7 +55,7 @@ static void gst_httpsrc_set_arg (GtkObject *object, GtkArg *arg, guint id); static void gst_httpsrc_get_arg (GtkObject *object, GtkArg *arg, guint id); static GstElementStateReturn gst_httpsrc_change_state (GstElement *element); -static void gst_httpsrc_get (GstPad *pad); +static GstBuffer * gst_httpsrc_get (GstPad *pad); static gboolean gst_httpsrc_open_url (GstHttpSrc *src); static void gst_httpsrc_close_url (GstHttpSrc *src); @@ -122,12 +122,14 @@ static void gst_httpsrc_init(GstHttpSrc *httpsrc) { httpsrc->bytes_per_read = 4096; } -static void gst_httpsrc_get(GstPad *pad) { +static GstBuffer * +gst_httpsrc_get(GstPad *pad) +{ GstHttpSrc *src; GstBuffer *buf; glong readbytes; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_HTTPSRC(gst_pad_get_parent(pad)); buf = gst_buffer_new(); @@ -136,7 +138,7 @@ static void gst_httpsrc_get(GstPad *pad) { if (readbytes == 0) { gst_src_signal_eos(GST_SRC(src)); - return; + return NULL; } if (readbytes < src->bytes_per_read) { @@ -146,7 +148,7 @@ static void gst_httpsrc_get(GstPad *pad) { GST_BUFFER_SIZE(buf) = readbytes; src->curoffset += readbytes; - gst_pad_push(pad,buf); + return buf; } static gboolean diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 2f29e4b90c..e9c7a67486 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -116,12 +116,14 @@ gst_queue_class_init (GstQueueClass *klass) static void gst_queue_init (GstQueue *queue) { + GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY); + queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain); + gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); queue->srcpad = gst_pad_new ("src", GST_PAD_SRC); - gst_pad_set_get_function (queue->srcpad, gst_queue_get); + gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); queue->queue = NULL; @@ -173,19 +175,19 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) /* we have to lock the queue since we span threads */ - DEBUG("queue: try have queue lock\n"); +// DEBUG("queue: try have queue lock\n"); GST_LOCK (queue); - DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ()); - DEBUG("queue: have queue lock\n"); +// DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self ()); +// DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLUSH)) { gst_queue_flush (queue); } - DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); +// DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); while (queue->level_buffers >= queue->max_buffers) { - DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); +// DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); GST_UNLOCK (queue); g_mutex_lock (queue->fulllock); @@ -193,26 +195,27 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) g_mutex_unlock (queue->fulllock); GST_LOCK (queue); STATUS("%s: O+\n"); - DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); +// DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } /* put the buffer on the tail of the list */ queue->queue = g_slist_append (queue->queue, buf); - STATUS("%s: +\n"); +// STATUS("%s: +\n"); + g_print("(%s:%s)+ ",GST_DEBUG_PAD_NAME(pad)); /* if we were empty, but aren't any more, signal a condition */ tosignal = (queue->level_buffers >= 0); queue->level_buffers++; /* we can unlock now */ - DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); +// DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); GST_UNLOCK (queue); if (tosignal) { g_mutex_lock (queue->emptylock); - STATUS("%s: >\n"); +// STATUS("%s: >\n"); g_cond_signal (queue->emptycond); - STATUS("%s: >>\n"); +// STATUS("%s: >>\n"); g_mutex_unlock (queue->emptylock); } } @@ -229,19 +232,19 @@ gst_queue_get (GstPad *pad) name = gst_element_get_name (GST_ELEMENT (queue)); /* have to lock for thread-safety */ - DEBUG("queue: %s try have queue lock\n", name); +// DEBUG("queue: %s try have queue lock\n", name); GST_LOCK (queue); - DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); - DEBUG("queue: %s have queue lock\n", name); +// DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); +// DEBUG("queue: %s have queue lock\n", name); while (!queue->level_buffers) { - STATUS("queue: %s U released lock\n"); - GST_UNLOCK (queue); +// STATUS("queue: %s U released lock\n"); +// GST_UNLOCK (queue); g_mutex_lock (queue->emptylock); g_cond_wait (queue->emptycond, queue->emptylock); g_mutex_unlock (queue->emptylock); GST_LOCK (queue); - STATUS("queue: %s U- getting lock\n"); +// STATUS("queue: %s U- getting lock\n"); } front = queue->queue; @@ -250,21 +253,22 @@ gst_queue_get (GstPad *pad) g_slist_free (front); queue->level_buffers--; - STATUS("%s: -\n"); +// STATUS("%s: -\n"); + g_print("(%s:%s)- ",GST_DEBUG_PAD_NAME(pad)); tosignal = queue->level_buffers < queue->max_buffers; GST_UNLOCK(queue); if (tosignal) { g_mutex_lock (queue->fulllock); - STATUS("%s: < \n"); +// STATUS("%s: < \n"); g_cond_signal (queue->fullcond); - STATUS("%s: << \n"); +// STATUS("%s: << \n"); g_mutex_unlock (queue->fulllock); } - DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); +// DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); gst_pad_push (queue->srcpad, buf); - DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); +// DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); /* unlock now */ } diff --git a/plugins/elements/gstsinesrc.c b/plugins/elements/gstsinesrc.c index e15c6c2bee..5527847dd9 100644 --- a/plugins/elements/gstsinesrc.c +++ b/plugins/elements/gstsinesrc.c @@ -62,7 +62,7 @@ static void gst_sinesrc_get_arg(GtkObject *object,GtkArg *arg,guint id); //static gboolean gst_sinesrc_open_audio(GstSineSrc *src); void gst_sinesrc_sync_parms(GstSineSrc *sinesrc); -void gst_sinesrc_get(GstPad *pad); +static GstBuffer * gst_sinesrc_get(GstPad *pad); static GstSrcClass *parent_class = NULL; //static guint gst_sinesrc_signals[LAST_SIGNAL] = { 0 }; @@ -142,7 +142,9 @@ GstElement *gst_sinesrc_new_with_fd(gchar *name,gchar *filename) { return sinesrc; } -void gst_sinesrc_get(GstPad *pad) { +static GstBuffer * +gst_sinesrc_get(GstPad *pad) +{ GstSineSrc *src; GstBuffer *buf; gint16 *samples; @@ -150,11 +152,11 @@ void gst_sinesrc_get(GstPad *pad) { gint volume; gdouble val; - g_return_if_fail(pad != NULL); + g_return_val_if_fail (pad != NULL, NULL); src = GST_SINESRC(gst_pad_get_parent(pad)); buf = gst_buffer_new(); - g_return_if_fail(buf); + g_return_val_if_fail (buf, NULL); GST_BUFFER_DATA(buf) = (gpointer)malloc(4096); samples = (gint16*)GST_BUFFER_DATA(buf); GST_BUFFER_SIZE(buf) = 4096; @@ -173,8 +175,8 @@ void gst_sinesrc_get(GstPad *pad) { src->sentmeta = TRUE; } - gst_pad_push(pad,buf); g_print(">"); + return buf; } static void gst_sinesrc_set_arg(GtkObject *object,GtkArg *arg,guint id) {