From e5ab7f33acc92ebbbbe95b539227d7e8c2943c68 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 22 Sep 2000 23:35:14 +0000 Subject: [PATCH] A first attempt to fix the queues in a cothreaded pipeline. Original commit message from CVS: A first attempt to fix the queues in a cothreaded pipeline. Some fixes to the thread handling. Fix a bug in gstreamer-config : gthread was not included. gst_bin_create_plan() is now done in the READY state. a bin with only another bin in it will now work with gst_bin_iterate. Added some examples for the queues. --- examples/helloworld/helloworld.c | 3 - examples/helloworld2/helloworld2.c | 3 - examples/queue/.gitignore | 2 + examples/queue/queue.c | 84 +++++++++++++ examples/queue2/.gitignore | 2 + examples/queue2/queue2.c | 81 ++++++++++++ examples/queue3/.gitignore | 2 + examples/queue3/queue3.c | 85 +++++++++++++ examples/thread/thread.c | 4 +- gst/cothreads.c | 20 +-- gst/cothreads.h | 3 +- gst/elements/Makefile.am | 2 +- gst/elements/gstqueue.c | 42 ++++--- gst/gstbin.c | 125 +++++++++++++------ gst/gstpad.c | 11 +- gst/gstpipeline.c | 20 ++- gst/gstthread.c | 15 ++- gstplay/Makefile.am | 8 +- gstplay/avi.c | 2 + gstplay/gstplay.c | 18 +-- gstplay/mpeg1.c | 2 + gstplay/mpeg2.c | 2 + gstreamer-config.in | 6 +- plugins/elements/Makefile.am | 2 +- plugins/elements/gstqueue.c | 42 ++++--- test/mp2tomp1.c | 2 +- tests/old/examples/helloworld/helloworld.c | 3 - tests/old/examples/helloworld2/helloworld2.c | 3 - tests/old/examples/queue/.gitignore | 2 + tests/old/examples/queue/queue.c | 84 +++++++++++++ tests/old/examples/queue2/.gitignore | 2 + tests/old/examples/queue2/queue2.c | 81 ++++++++++++ tests/old/examples/queue3/.gitignore | 2 + tests/old/examples/queue3/queue3.c | 85 +++++++++++++ tests/old/examples/thread/thread.c | 4 +- 35 files changed, 719 insertions(+), 135 deletions(-) create mode 100644 examples/queue/.gitignore create mode 100644 examples/queue/queue.c create mode 100644 examples/queue2/.gitignore create mode 100644 examples/queue2/queue2.c create mode 100644 examples/queue3/.gitignore create mode 100644 examples/queue3/queue3.c create mode 100644 tests/old/examples/queue/.gitignore create mode 100644 tests/old/examples/queue/queue.c create mode 100644 tests/old/examples/queue2/.gitignore create mode 100644 tests/old/examples/queue2/queue2.c create mode 100644 tests/old/examples/queue3/.gitignore create mode 100644 tests/old/examples/queue3/queue3.c diff --git a/examples/helloworld/helloworld.c b/examples/helloworld/helloworld.c index 2a3a4867b3..3527cc0b48 100644 --- a/examples/helloworld/helloworld.c +++ b/examples/helloworld/helloworld.c @@ -50,9 +50,6 @@ int main(int argc,char *argv[]) gst_pad_connect(gst_element_get_pad(decoder,"src"), gst_element_get_pad(audiosink,"sink")); - /* find out how to handle this bin */ - gst_bin_create_plan(GST_BIN(bin)); - /* make it ready */ gst_element_set_state(bin, GST_STATE_READY); /* start playing */ diff --git a/examples/helloworld2/helloworld2.c b/examples/helloworld2/helloworld2.c index f687ef6b5f..fab7b49138 100644 --- a/examples/helloworld2/helloworld2.c +++ b/examples/helloworld2/helloworld2.c @@ -46,9 +46,6 @@ int main(int argc,char *argv[]) exit(-1); } - /* find out how to handle this bin */ - gst_bin_create_plan(GST_BIN(pipeline)); - /* make it ready */ gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY); /* start playing */ diff --git a/examples/queue/.gitignore b/examples/queue/.gitignore new file mode 100644 index 0000000000..b7dd5275b2 --- /dev/null +++ b/examples/queue/.gitignore @@ -0,0 +1,2 @@ +Makefile +queue diff --git a/examples/queue/queue.c b/examples/queue/queue.c new file mode 100644 index 0000000000..9dc42d1f17 --- /dev/null +++ b/examples/queue/queue.c @@ -0,0 +1,84 @@ +#include + +gboolean playing; + +/* eos will be called when the src element has an end of stream */ +void eos(GstSrc *src, gpointer data) +{ + g_print("have eos, quitting\n"); + + playing = FALSE; +} + +int main(int argc,char *argv[]) +{ + GstElement *disksrc, *audiosink, *parse, *decode, *queue; + GstElement *bin; + GstElement *thread; + + if (argc != 2) { + g_print("usage: %s \n", argv[0]); + exit(-1); + } + + gst_init(&argc,&argv); + + /* create a new thread to hold the elements */ + thread = gst_thread_new("thread"); + g_assert(thread != NULL); + + /* create a new bin to hold the elements */ + bin = gst_bin_new("bin"); + g_assert(bin != NULL); + + /* create a disk reader */ + disksrc = gst_elementfactory_make("disksrc", "disk_source"); + g_assert(disksrc != NULL); + gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL); + gtk_signal_connect(GTK_OBJECT(disksrc),"eos", + GTK_SIGNAL_FUNC(eos), thread); + + parse = gst_elementfactory_make("mp3parse", "parse"); + decode = gst_elementfactory_make("mpg123", "decode"); + + queue = gst_elementfactory_make("queue", "queue"); + + /* and an audio sink */ + audiosink = gst_elementfactory_make("audiosink", "play_audio"); + g_assert(audiosink != NULL); + + /* add objects to the main pipeline */ + gst_bin_add(GST_BIN(bin), disksrc); + gst_bin_add(GST_BIN(bin), parse); + gst_bin_add(GST_BIN(bin), decode); + gst_bin_add(GST_BIN(bin), queue); + + gst_bin_add(GST_BIN(thread), audiosink); + + gst_bin_add(GST_BIN(bin), thread); + + gst_pad_connect(gst_element_get_pad(disksrc,"src"), + gst_element_get_pad(parse,"sink")); + gst_pad_connect(gst_element_get_pad(parse,"src"), + gst_element_get_pad(decode,"sink")); + gst_pad_connect(gst_element_get_pad(decode,"src"), + gst_element_get_pad(queue,"sink")); + gst_pad_connect(gst_element_get_pad(queue,"src"), + gst_element_get_pad(audiosink,"sink")); + + /* make it ready */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY); + /* start playing */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING); + + playing = TRUE; + + while (playing) { + gst_bin_iterate(GST_BIN(bin)); + } + + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL); + + exit(0); +} + diff --git a/examples/queue2/.gitignore b/examples/queue2/.gitignore new file mode 100644 index 0000000000..ca0136adcb --- /dev/null +++ b/examples/queue2/.gitignore @@ -0,0 +1,2 @@ +Makefile +queue2 diff --git a/examples/queue2/queue2.c b/examples/queue2/queue2.c new file mode 100644 index 0000000000..845f491f89 --- /dev/null +++ b/examples/queue2/queue2.c @@ -0,0 +1,81 @@ +#include + +gboolean playing; + +/* eos will be called when the src element has an end of stream */ +void eos(GstSrc *src, gpointer data) +{ + g_print("have eos, quitting\n"); + + playing = FALSE; +} + +int main(int argc,char *argv[]) +{ + GstElement *disksrc, *audiosink, *queue; + GstElement *pipeline; + GstElement *thread; + + if (argc != 2) { + g_print("usage: %s \n", argv[0]); + exit(-1); + } + + gst_init(&argc,&argv); + + /* create a new thread to hold the elements */ + thread = gst_thread_new("thread"); + g_assert(thread != NULL); + + /* create a new bin to hold the elements */ + pipeline = gst_pipeline_new("pipeline"); + g_assert(pipeline != NULL); + + /* create a disk reader */ + disksrc = gst_elementfactory_make("disksrc", "disk_source"); + g_assert(disksrc != NULL); + gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL); + gtk_signal_connect(GTK_OBJECT(disksrc),"eos", + GTK_SIGNAL_FUNC(eos), thread); + + queue = gst_elementfactory_make("queue", "queue"); + + /* and an audio sink */ + audiosink = gst_elementfactory_make("audiosink", "play_audio"); + g_assert(audiosink != NULL); + + /* add objects to the main pipeline */ + gst_pipeline_add_src(GST_PIPELINE(pipeline), disksrc); + gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue); + + gst_bin_add(GST_BIN(thread), audiosink); + + gst_pad_connect(gst_element_get_pad(queue,"src"), + gst_element_get_pad(audiosink,"sink")); + + gst_pad_set_type_id(gst_element_get_pad(queue, "sink"), + gst_pad_get_type_id(gst_element_get_pad(audiosink, "sink"))); + + if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) { + g_print("cannot autoplug pipeline\n"); + exit(-1); + } + + gst_bin_add(GST_BIN(pipeline), thread); + + /* make it ready */ + gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY); + /* start playing */ + gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING); + + playing = TRUE; + + while (playing) { + gst_bin_iterate(GST_BIN(pipeline)); + } + + gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL); + + exit(0); +} + diff --git a/examples/queue3/.gitignore b/examples/queue3/.gitignore new file mode 100644 index 0000000000..b89de200c9 --- /dev/null +++ b/examples/queue3/.gitignore @@ -0,0 +1,2 @@ +Makefile +queue3 diff --git a/examples/queue3/queue3.c b/examples/queue3/queue3.c new file mode 100644 index 0000000000..fa79a78d19 --- /dev/null +++ b/examples/queue3/queue3.c @@ -0,0 +1,85 @@ +#include + +gboolean playing; + +/* eos will be called when the src element has an end of stream */ +void eos(GstSrc *src, gpointer data) +{ + g_print("have eos, quitting\n"); + + playing = FALSE; +} + +int main(int argc,char *argv[]) +{ + GstElement *disksrc, *audiosink, *queue, *parse, *decode; + GstElement *bin; + GstElement *thread; + + if (argc != 2) { + g_print("usage: %s \n", argv[0]); + exit(-1); + } + + gst_init(&argc,&argv); + + /* create a new thread to hold the elements */ + thread = gst_thread_new("thread"); + g_assert(thread != NULL); + + /* create a new bin to hold the elements */ + bin = gst_bin_new("bin"); + g_assert(bin != NULL); + + /* create a disk reader */ + disksrc = gst_elementfactory_make("disksrc", "disk_source"); + g_assert(disksrc != NULL); + gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL); + gtk_signal_connect(GTK_OBJECT(disksrc),"eos", + GTK_SIGNAL_FUNC(eos), thread); + + queue = gst_elementfactory_make("queue", "queue"); + + /* and an audio sink */ + audiosink = gst_elementfactory_make("audiosink", "play_audio"); + g_assert(audiosink != NULL); + + parse = gst_elementfactory_make("mp3parse", "parse"); + decode = gst_elementfactory_make("mpg123", "decode"); + + /* add objects to the main bin */ + gst_bin_add(GST_BIN(bin), disksrc); + gst_bin_add(GST_BIN(bin), queue); + + gst_bin_add(GST_BIN(thread), parse); + gst_bin_add(GST_BIN(thread), decode); + gst_bin_add(GST_BIN(thread), audiosink); + + gst_pad_connect(gst_element_get_pad(disksrc,"src"), + gst_element_get_pad(queue,"sink")); + + gst_pad_connect(gst_element_get_pad(queue,"src"), + gst_element_get_pad(parse,"sink")); + gst_pad_connect(gst_element_get_pad(parse,"src"), + gst_element_get_pad(decode,"sink")); + gst_pad_connect(gst_element_get_pad(decode,"src"), + gst_element_get_pad(audiosink,"sink")); + + gst_bin_add(GST_BIN(bin), thread); + + /* make it ready */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY); + /* start playing */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING); + + playing = TRUE; + + while (playing) { + gst_bin_iterate(GST_BIN(bin)); + } + + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL); + + exit(0); +} + diff --git a/examples/thread/thread.c b/examples/thread/thread.c index 4d8073db73..4cd7721dbb 100644 --- a/examples/thread/thread.c +++ b/examples/thread/thread.c @@ -53,9 +53,9 @@ int main(int argc,char *argv[]) exit(-1); } - gst_bin_remove(GST_BIN(pipeline), disksrc); + //gst_bin_remove(GST_BIN(pipeline), disksrc); - gst_bin_add(GST_BIN(thread), disksrc); + //gst_bin_add(GST_BIN(thread), disksrc); gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline)); /* make it ready */ diff --git a/gst/cothreads.c b/gst/cothreads.c index 32062fe0eb..9ca1242d83 100644 --- a/gst/cothreads.c +++ b/gst/cothreads.c @@ -41,6 +41,7 @@ cothread_state *cothread_create(cothread_context *ctx) { s->threadnum = ctx->nthreads; s->flags = 0; s->sp = ((int *)s + COTHREAD_STACKSIZE); + s->top_sp = s->sp; ctx->threads[ctx->nthreads++] = s; @@ -103,6 +104,7 @@ void cothread_stub() { thread->func(thread->argc,thread->argv); thread->flags &= ~COTHREAD_STARTED; thread->pc = 0; + thread->sp = thread->top_sp; DEBUG("cothread: cothread_stub() exit\n"); //printf("uh, yeah, we shouldn't be here, but we should deal anyway\n"); } @@ -113,8 +115,10 @@ void cothread_switch(cothread_state *thread) { int enter; // int i; - if (thread == NULL) + if (thread == NULL) { + g_print("cothread: there's no thread, strange...\n"); return; + } ctx = thread->ctx; @@ -124,12 +128,10 @@ void cothread_switch(cothread_state *thread) { exit(2); } - /* if (current == thread) { g_print("cothread: trying to switch to same thread, legal but not necessary\n"); - //return; + return; } - */ // find the number of the thread to switch to ctx->current = thread->threadnum; @@ -137,11 +139,14 @@ void cothread_switch(cothread_state *thread) { /* save the current stack pointer, frame pointer, and pc */ GET_SP(current->sp); - enter = setjmp(current->jmp); - DEBUG("cothread: after thread #%d %d\n",ctx->current, enter); + enter = sigsetjmp(current->jmp, 1); if (enter != 0) { + DEBUG("cothread: enter thread #%d %d %p<->%p (%d)\n",current->threadnum, enter, + current->sp, current->top_sp, current->top_sp-current->sp); return; } + DEBUG("cothread: exit thread #%d %d %p<->%p (%d)\n",current->threadnum, enter, + current->sp, current->top_sp, current->top_sp-current->sp); enter = 1; DEBUG("cothread: set stack to %p\n", thread->sp); @@ -150,7 +155,7 @@ void cothread_switch(cothread_state *thread) { DEBUG("cothread: in thread \n"); SET_SP(thread->sp); // switch to it - longjmp(thread->jmp,1); + siglongjmp(thread->jmp,1); } else { SETUP_STACK(thread->sp); SET_SP(thread->sp); @@ -158,5 +163,6 @@ void cothread_switch(cothread_state *thread) { //JUMP(cothread_stub); cothread_stub(); DEBUG("cothread: exit thread \n"); + ctx->current = 0; } } diff --git a/gst/cothreads.h b/gst/cothreads.h index 63fef42901..3a40dd6e6a 100644 --- a/gst/cothreads.h +++ b/gst/cothreads.h @@ -29,8 +29,9 @@ struct _cothread_state { int flags; int *sp; + int *top_sp; int *pc; - jmp_buf jmp; + sigjmp_buf jmp; }; struct _cothread_context { diff --git a/gst/elements/Makefile.am b/gst/elements/Makefile.am index 420b551ef9..bf57b271ca 100644 --- a/gst/elements/Makefile.am +++ b/gst/elements/Makefile.am @@ -42,7 +42,7 @@ noinst_HEADERS = \ gsttypefind.h \ gstsinesrc.h -CFLAGS += -O2 -Wall +CFLAGS += -O2 -Wall libgstelements_la_LIBADD = $(GLIB_LIBS) $(GTK_LIBS) $(GHTTP_LIBS) libgstelements_la_LDFLAGS = -version-info $(STREAMER_CURRENT):$(STREAMER_REVISION):$(STREAMER_AGE) diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index 5352ded75e..b9af36de44 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -27,6 +27,7 @@ #include +#include GstElementDetails gst_queue_details = { "Queue", @@ -105,6 +106,7 @@ static void gst_queue_class_init(GstQueueClass *klass) { static void gst_queue_init(GstQueue *queue) { queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); + gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); queue->srcpad = gst_pad_new("src",GST_PAD_SRC); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); @@ -150,9 +152,10 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { /* we have to lock the queue since we span threads */ - DEBUG("queue: %s adding buffer %p\n", name, buf); + DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self()); GST_LOCK(queue); + DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); @@ -164,18 +167,17 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); - if (queue->level_buffers >= queue->max_buffers) { + g_mutex_lock(queue->fulllock); + while (queue->level_buffers >= queue->max_buffers) { DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); - while (queue->level_buffers >= queue->max_buffers) { - GST_UNLOCK(queue); - g_mutex_lock(queue->fulllock); - STATUS("%s: O\n"); - g_cond_wait(queue->fullcond,queue->fulllock); - g_mutex_unlock(queue->fulllock); - GST_LOCK(queue); - } + STATUS("%s: O\n"); + GST_UNLOCK(queue); + g_cond_wait(queue->fullcond,queue->fulllock); + GST_LOCK(queue); + STATUS("%s: O+\n"); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } + g_mutex_unlock(queue->fulllock); /* put the buffer on the head of the list */ @@ -192,18 +194,19 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { STATUS("%s: +\n"); /* if we were empty, but aren't any more, signal a condition */ - tosignal = (queue->level_buffers <= 0); + tosignal = (queue->level_buffers >= 0); queue->level_buffers++; /* we can unlock now */ - DEBUG("queue: %s chain %d end\n", name, queue->level_buffers); + DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); GST_UNLOCK(queue); if (tosignal) { g_mutex_lock(queue->emptylock); + STATUS("%s: >\n"); g_cond_signal(queue->emptycond); + STATUS("%s: >>\n"); g_mutex_unlock(queue->emptylock); - //g_print(">"); } } @@ -216,17 +219,20 @@ void gst_queue_push(GstConnection *connection) { name = gst_element_get_name(GST_ELEMENT(queue)); - DEBUG("queue: %s push %d\n", name, queue->level_buffers); + DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); /* have to lock for thread-safety */ + DEBUG("queue: try have queue lock\n"); GST_LOCK(queue); + DEBUG("queue: have queue lock\n"); while (!queue->level_buffers) { + STATUS("%s: U released lock\n"); GST_UNLOCK(queue); g_mutex_lock(queue->emptylock); - STATUS("%s: U\n"); g_cond_wait(queue->emptycond,queue->emptylock); g_mutex_unlock(queue->emptylock); GST_LOCK(queue); + STATUS("%s: U- getting lock\n"); } front = queue->queue; @@ -240,13 +246,15 @@ void gst_queue_push(GstConnection *connection) { if (tosignal) { g_mutex_lock(queue->fulllock); + STATUS("%s: < \n"); g_cond_signal(queue->fullcond); + STATUS("%s: << \n"); g_mutex_unlock(queue->fulllock); } - DEBUG("queue: %s pushing %d %p\n", name, queue->level_buffers, buf); + //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf); gst_pad_push(queue->srcpad,buf); - DEBUG("queue: %s pushing %d done\n", name, queue->level_buffers); + //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers); /* unlock now */ } diff --git a/gst/gstbin.c b/gst/gstbin.c index b2748c02ce..d10051c350 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -202,6 +202,9 @@ static GstElementStateReturn gst_bin_change_state(GstElement *element) { _gst_print_statename(GST_STATE(element)),GST_STATE_PENDING(element), _gst_print_statename(GST_STATE_PENDING(element))); + if (GST_STATE_PENDING(element) == GST_STATE_READY) { + gst_bin_create_plan(bin); + } // g_return_val_if_fail(bin->numchildren != 0, GST_STATE_FAILURE); // g_print("-->\n"); @@ -226,18 +229,12 @@ static GstElementStateReturn gst_bin_change_state(GstElement *element) { } // g_print("<-- \"%s\"\n",gst_object_get_name(GST_OBJECT(bin))); -// if (GST_STATE_PENDING(element), return gst_bin_change_state_norecurse(bin); } static GstElementStateReturn gst_bin_change_state_norecurse(GstBin *bin) { -/* - if ((state == GST_STATE_READY) && (GST_STATE(bin) < GST_STATE_READY)) { -// gst_bin_create_plan( - } -*/ if (GST_ELEMENT_CLASS(parent_class)->change_state) return GST_ELEMENT_CLASS(parent_class)->change_state(GST_ELEMENT(bin)); @@ -292,8 +289,8 @@ gboolean gst_bin_set_state_type(GstBin *bin, GtkType type) { GstBinClass *oclass; -// g_print("gst_bin_set_state_type(\"%s\",%d,%d)\n", -// gst_object_get_name(GST_OBJECT(bin)),state,type); + DEBUG("gst_bin_set_state_type(\"%s\",%d,%d)\n", + gst_element_get_name(GST_ELEMENT(bin)),state,type); g_return_val_if_fail(bin != NULL, FALSE); g_return_val_if_fail(GST_IS_BIN(bin), FALSE); @@ -310,7 +307,7 @@ void gst_bin_real_destroy(GtkObject *object) { GList *children; GstElement *child; -// g_print("in gst_bin_real_destroy()\n"); + DEBUG("in gst_bin_real_destroy()\n"); children = bin->children; while (children) { @@ -427,32 +424,38 @@ static int gst_bin_loopfunc_wrapper(int argc,char *argv[]) { GList *pads; GstPad *pad; GstBuffer *buf; + gchar *name = gst_element_get_name(element); -// g_print("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n", -// argc,gst_element_get_name(element)); + DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n", + argc,gst_element_get_name(element)); if (element->loopfunc != NULL) { while (1) { - DEBUG("** gst_bin_loopfunc_wrapper(): element has loop function, calling it\n"); + DEBUG("** gst_bin_loopfunc_wrapper(): element %s has loop function, calling it\n", name); (element->loopfunc)(element); - DEBUG("** gst_bin_loopfunc_wrapper(): element ended loop function\n"); + DEBUG("** gst_bin_loopfunc_wrapper(): element %s ended loop function\n", name); } } else { - DEBUG("** gst_bin_loopfunc_wrapper(): element is chain-based, calling in infinite loop\n"); + DEBUG("** gst_bin_loopfunc_wrapper(): element %s is chain-based, calling in infinite loop\n", name); if (GST_IS_SRC(element)) { - //while (1) { - DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source\n"); - gst_src_push(GST_SRC(element)); - //} + DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s\n", name); + gst_src_push(GST_SRC(element)); + DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of source %s done\n", name); + } else if (GST_IS_CONNECTION(element) && argc == 1) { + while (1) { + DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s\n", name); + gst_connection_push(GST_CONNECTION(element)); + DEBUG("** gst_bin_loopfunc_wrapper(): calling push function of connection %s done\n", name); + } } else { while (1) { pads = element->pads; while (pads) { pad = GST_PAD(pads->data); if (pad->direction == GST_PAD_SINK) { - DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer\n"); - buf = gst_pad_pull(pad); - DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function\n"); + DEBUG("** gst_bin_loopfunc_wrapper(): pulling a buffer from %s:%s\n", name, gst_pad_get_name(pad)); + buf = gst_pad_pull(pad); + DEBUG("** gst_bin_loopfunc_wrapper(): calling chain function of %s:%s\n", name, gst_pad_get_name(pad)); (pad->chainfunc)(pad,buf); } pads = g_list_next(pads); @@ -484,7 +487,7 @@ static void gst_bin_create_plan_func(GstBin *bin) { GstElement *element; int sink_pads; GList *pads; - GstPad *pad, *peer; + GstPad *pad, *opad, *peer; GstElement *outside; g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin))); @@ -499,13 +502,11 @@ static void gst_bin_create_plan_func(GstBin *bin) { if (element->loopfunc != NULL) { g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); bin->need_cothreads = TRUE; - break; } // if it's a complex element, use cothreads - if (GST_ELEMENT_IS_MULTI_IN(element)) { + else if (GST_ELEMENT_IS_MULTI_IN(element)) { g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); bin->need_cothreads = TRUE; - break; } // if it has more than one input pad, use cothreads sink_pads = 0; @@ -527,6 +528,11 @@ static void gst_bin_create_plan_func(GstBin *bin) { // FIXME bin->need_cothreads &= bin->use_cothreads; + // clear previous plan state + g_list_free(bin->entries); + bin->entries = NULL; + bin->numentries = 0; + if (bin->need_cothreads) { g_print("gstbin: need cothreads\n"); @@ -551,22 +557,56 @@ static void gst_bin_create_plan_func(GstBin *bin) { pads = gst_element_get_pad_list(element); while (pads) { pad = GST_PAD(pads->data); -g_print("gstbin: setting push&pull handlers for %s:%s\n", -gst_element_get_name(element),gst_pad_get_name(pad)); -// if (pad->direction == GST_PAD_SRC) + g_print("gstbin: setting push&pull handlers for %s:%s\n", + gst_element_get_name(element),gst_pad_get_name(pad)); + + // an internal connection will push outside this bin. + if (!GST_IS_CONNECTION(element)) { pad->pushfunc = gst_bin_pushfunc_wrapper; -// else - pad->pullfunc = gst_bin_pullfunc_wrapper; + } + pad->pullfunc = gst_bin_pullfunc_wrapper; + + /* we only worry about sink pads */ + if (gst_pad_get_direction(pad) == GST_PAD_SINK) { + /* get the pad's peer */ + peer = gst_pad_get_peer(pad); + if (!peer) break; + /* get the parent of the peer of the pad */ + outside = GST_ELEMENT(gst_pad_get_parent(peer)); + if (!outside) break; + /* if it's a connection and it's not ours... */ + if (GST_IS_CONNECTION(outside) && + (gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) { + GList *connection_pads = gst_element_get_pad_list(outside); + while (connection_pads) { + opad = GST_PAD(connection_pads->data); + if (gst_pad_get_direction(opad) == GST_PAD_SRC) { + g_print("gstbin: setting push&pull handlers for %s:%s SRC connection\n", + gst_element_get_name(outside),gst_pad_get_name(opad)); + opad->pushfunc = gst_bin_pushfunc_wrapper; + opad->pullfunc = gst_bin_pullfunc_wrapper; + if (outside->threadstate == NULL) { + outside->threadstate = cothread_create(bin->threadcontext); + cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper, + 1,(char **)outside); + } + } + connection_pads = g_list_next(connection_pads); + } + gst_info("gstbin: element \"%s\" is the external source Connection " + "for internal element \"%s\"\n", + gst_element_get_name(GST_ELEMENT(outside)), + gst_element_get_name(GST_ELEMENT(element))); + bin->entries = g_list_prepend(bin->entries,outside); + bin->numentries++; + } + } pads = g_list_next(pads); } elements = g_list_next(elements); } } else { g_print("gstbin: don't need cothreads, looking for entry points\n"); - // clear previous plan state - g_list_free(bin->entries); - bin->entries = NULL; - bin->numentries = 0; // we have to find which elements will drive an iteration elements = bin->children; while (elements) { @@ -592,8 +632,8 @@ gst_element_get_name(element),gst_pad_get_name(pad)); /* if it's a connection and it's not ours... */ if (GST_IS_CONNECTION(outside) && (gst_object_get_parent(GST_OBJECT(outside)) != GST_OBJECT(bin))) { - gst_info("gstbin: element \"%s\" is the external source Connection \ - for internal element \"%s\"\n", + gst_info("gstbin: element \"%s\" is the external source Connection " + "for internal element \"%s\"\n", gst_element_get_name(GST_ELEMENT(outside)), gst_element_get_name(GST_ELEMENT(element))); bin->entries = g_list_prepend(bin->entries,outside); @@ -617,18 +657,23 @@ void gst_bin_iterate_func(GstBin *bin) { g_return_if_fail(bin != NULL); g_return_if_fail(GST_IS_BIN(bin)); g_return_if_fail(GST_STATE(bin) == GST_STATE_PLAYING); - g_return_if_fail(bin->numentries > 0); DEBUG("GstBin: iterating\n"); if (bin->need_cothreads) { // all we really have to do is switch to the first child // FIXME this should be lots more intelligent about where to start - DEBUG("** in gst_bin_iterate_func()==================================%s\n", + DEBUG("** in gst_bin_iterate_func()==================================%s\n", gst_element_get_name(GST_ELEMENT(bin->children->data))); cothread_switch(GST_ELEMENT(bin->children->data)->threadstate); } else { - entries = bin->entries; + if (bin->numentries <= 0) { + printf("gstbin: no elements in bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin))); + entries = bin->children; + } + else { + entries = bin->entries; + } while (entries) { entry = GST_ELEMENT(entries->data); @@ -636,6 +681,8 @@ void gst_bin_iterate_func(GstBin *bin) { gst_src_push(GST_SRC(entry)); else if (GST_IS_CONNECTION(entry)) gst_connection_push(GST_CONNECTION(entry)); + else if (GST_IS_BIN(entry)) + gst_bin_iterate(GST_BIN(entry)); else g_assert_not_reached(); entries = g_list_next(entries); diff --git a/gst/gstpad.c b/gst/gstpad.c index cf3bf9d3e3..ad3f5dd3f0 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -183,16 +183,11 @@ void gst_pad_set_qos_function(GstPad *pad,GstPadQoSFunction qos) { void gst_pad_push(GstPad *pad,GstBuffer *buffer) { g_return_if_fail(pad != NULL); g_return_if_fail(GST_IS_PAD(pad)); -// g_return_if_fail(GST_PAD_CONNECTED(pad)); + g_return_if_fail(GST_PAD_CONNECTED(pad)); g_return_if_fail(buffer != NULL); gst_trace_add_entry(NULL,0,buffer,"push buffer"); - // FIXME we should probably make some noise here... - if (!GST_PAD_CONNECTED(pad)) return; - -// g_return_if_fail(pad->pushfunc != NULL); - // first check to see if there's a push handler if (pad->pushfunc != NULL) { //g_print("-- gst_pad_push(): putting buffer in pen and calling push handler\n"); @@ -230,7 +225,7 @@ GstBuffer *gst_pad_pull(GstPad *pad) { // g_print("-- gst_pad_pull(): calling pull handler\n"); (pad->pullfunc)(pad->peer); } else { -// g_print("-- gst_pad_pull(): no buffer in pen, and no handler to get one there!!!\n"); + g_print("-- gst_pad_pull(): no buffer in pen, and no handler to get one there!!!\n"); } } @@ -242,7 +237,7 @@ GstBuffer *gst_pad_pull(GstPad *pad) { return buf; // else we have a big problem... } else { -// g_print("-- gst_pad_pull(): uh, nothing in pen and no handler\n"); + g_print("-- gst_pad_pull(): uh, nothing in pen and no handler\n"); return NULL; } diff --git a/gst/gstpipeline.c b/gst/gstpipeline.c index adcc0103b0..3b520842cc 100644 --- a/gst/gstpipeline.c +++ b/gst/gstpipeline.c @@ -347,10 +347,27 @@ gboolean gst_pipeline_autoplug(GstPipeline *pipeline) { elements = pipeline->sinks; - // fase 2, find all the sinks.. + // fase 2, loop over all the sinks.. while (elements) { + GList *pads; + GstPad *pad; + element = GST_ELEMENT(elements->data); + pads = gst_element_get_pad_list(element); + + while (pads) { + pad = (GstPad *)pads->data; + + if (pad->direction == GST_PAD_SINK) { + sink_type = gst_pad_get_type_id(pad); + sinkelement = element; + break; + } + + pads = g_list_next(pads); + } + /* if (GST_IS_SINK(element)) { g_print("GstPipeline: found sink \"%s\"\n", gst_element_get_name(element)); @@ -369,6 +386,7 @@ gboolean gst_pipeline_autoplug(GstPipeline *pipeline) { gst_element_get_name(element), sink_type); } } + */ elements = g_list_next(elements); } diff --git a/gst/gstthread.c b/gst/gstthread.c index 41adceb055..b84533df3e 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -105,9 +105,6 @@ gst_thread_class_init(GstThreadClass *klass) { static void gst_thread_init(GstThread *thread) { GST_FLAG_SET(thread,GST_THREAD_CREATE); -// thread->entries = NULL; -// thread->numentries = 0; - thread->lock = g_mutex_new(); thread->cond = g_cond_new(); } @@ -178,10 +175,13 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) { pending = GST_STATE_PENDING(element); + if (pending == GST_STATE(element)) return GST_STATE_SUCCESS; + if (GST_ELEMENT_CLASS(parent_class)->change_state) stateset = GST_ELEMENT_CLASS(parent_class)->change_state(element); - gst_info("gstthread: stateset %d %d\n", stateset, GST_STATE_PENDING(element)); + gst_info("gstthread: stateset %d %d %d\n", GST_STATE(element), stateset, GST_STATE_PENDING(element)); + switch (pending) { case GST_STATE_READY: @@ -189,13 +189,10 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) { // we want to prepare our internal state for doing the iterations gst_info("gstthread: preparing thread \"%s\" for iterations:\n", gst_element_get_name(GST_ELEMENT(element))); - //gst_thread_prepare(thread); - gst_bin_create_plan(GST_BIN(thread)); -// if (thread->numentries == 0) -// return FALSE; // set the state to idle GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING); + GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING); // create the thread if that's what we're supposed to do gst_info("gstthread: flags are 0x%08x\n",GST_FLAGS(thread)); if (GST_FLAG_IS_SET(thread,GST_THREAD_CREATE)) { @@ -214,12 +211,14 @@ static GstElementStateReturn gst_thread_change_state(GstElement *element) { gst_info("gstthread: starting thread \"%s\"\n", gst_element_get_name(GST_ELEMENT(element))); GST_FLAG_SET(thread,GST_THREAD_STATE_SPINNING); + GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING); gst_thread_signal_thread(thread); break; case GST_STATE_PAUSED: gst_info("gstthread: pausing thread \"%s\"\n", gst_element_get_name(GST_ELEMENT(element))); GST_FLAG_UNSET(thread,GST_THREAD_STATE_SPINNING); + GST_FLAG_UNSET(thread,GST_THREAD_STATE_REAPING); gst_thread_signal_thread(thread); break; case GST_STATE_NULL: diff --git a/gstplay/Makefile.am b/gstplay/Makefile.am index 63981b300f..5147c0ff3a 100644 --- a/gstplay/Makefile.am +++ b/gstplay/Makefile.am @@ -1,7 +1,7 @@ ## Process this file with automake to produce Makefile.in INCLUDES = $(GLIB_CFLAGS) $(GTK_CFLAGS) -I$(top_srcdir) \ - $(shell gnome-config --cflags gnomeui) + $(shell gnome-config --cflags gnomeui) $(shell gstreamer-config --cflags) bin_PROGRAMS = gstplay @@ -19,8 +19,10 @@ noinst_HEADERS = codecs.h CFLAGS += -O2 -Wall -DDATADIR=\""$(gladedir)/"\" -gstplay_CFLAGS = $(shell gnome-config --cflags gnomeui) $(shell libglade-config --cflags gnome) -gstplay_LDFLAGS = $(shell gnome-config --libs gnomeui) $(shell libglade-config --libs gnome) +gstplay_CFLAGS = $(shell gnome-config --cflags gnomeui) $(shell libglade-config --cflags gnome) \ + $(shell gstreamer-config --cflags ) +gstplay_LDFLAGS = $(shell gnome-config --libs gnomeui) $(shell libglade-config --libs gnome) \ + $(shell gstreamer-config --libs ) if HAVE_LIBXV xvlibs=-lXv diff --git a/gstplay/avi.c b/gstplay/avi.c index 034e9aff12..2b22bbfd7d 100644 --- a/gstplay/avi.c +++ b/gstplay/avi.c @@ -17,12 +17,14 @@ void avi_new_pad_created(GstElement *parse,GstPad *pad,GstElement *pipeline) //if (0) { if (strncmp(gst_pad_get_name(pad), "audio_", 6) == 0) { + gst_bin_add(GST_BIN(pipeline), audio_render_queue); gst_pad_connect(pad, gst_element_get_pad(audio_render_queue,"sink")); } else if (strncmp(gst_pad_get_name(pad), "video_", 6) == 0) { //} else if (0) { + gst_bin_add(GST_BIN(pipeline), video_render_queue); gst_pad_connect(pad, gst_element_get_pad(video_render_queue,"sink")); } diff --git a/gstplay/gstplay.c b/gstplay/gstplay.c index 3d3ba93f75..8ffd8ffebe 100644 --- a/gstplay/gstplay.c +++ b/gstplay/gstplay.c @@ -27,7 +27,7 @@ gboolean idle_func(gpointer data); GstElement *show, *video_render_queue; GstElement *audio_play, *audio_render_queue; GstElement *src; -GstPipeline *pipeline; +GstElement *pipeline; GstElement *parse = NULL; GstElement *typefind; GstElement *video_render_thread; @@ -254,6 +254,9 @@ static void have_type(GstSink *sink) { } gtk_object_set(GTK_OBJECT(src),"offset",0,NULL); + gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_thread)); + gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_thread)); + g_print("setting to READY state\n"); gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY); g_print("setting to PLAYING state\n"); @@ -286,7 +289,6 @@ gint start_from_file(guchar *filename) g_print("setting to READY state\n"); - gst_bin_create_plan(GST_BIN(pipeline)); gst_element_set_state(GST_ELEMENT(pipeline),GST_STATE_READY); state = GSTPLAY_STOPPED; @@ -363,17 +365,13 @@ main (int argc, char *argv[]) gnome_dock_set_client_area(GNOME_DOCK(glade_xml_get_widget(xml, "dock1")), gst_util_get_widget_arg(GTK_OBJECT(show),"widget")); gst_bin_add(GST_BIN(video_render_thread),GST_ELEMENT(show)); - gst_element_add_ghost_pad(GST_ELEMENT(video_render_thread), - gst_element_get_pad(show,"sink")); glade_xml_signal_autoconnect(xml); video_render_queue = gst_elementfactory_make("queue","video_render_queue"); gtk_object_set(GTK_OBJECT(video_render_queue),"max_level",BUFFER,NULL); - gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_queue)); - gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(video_render_thread)); gst_pad_connect(gst_element_get_pad(video_render_queue,"src"), - gst_element_get_pad(video_render_thread,"sink")); + gst_element_get_pad(show,"sink")); gtk_object_set(GTK_OBJECT(video_render_thread),"create_thread",TRUE,NULL); @@ -381,15 +379,11 @@ main (int argc, char *argv[]) g_return_val_if_fail(audio_render_thread != NULL, -1); audio_play = gst_elementfactory_make("audiosink","play_audio"); gst_bin_add(GST_BIN(audio_render_thread),GST_ELEMENT(audio_play)); - gst_element_add_ghost_pad(GST_ELEMENT(audio_render_thread), - gst_element_get_pad(audio_play,"sink")); audio_render_queue = gst_elementfactory_make("queue","audio_render_queue"); gtk_object_set(GTK_OBJECT(audio_render_queue),"max_level",BUFFER,NULL); - gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_queue)); - gst_bin_add(GST_BIN(pipeline),GST_ELEMENT(audio_render_thread)); gst_pad_connect(gst_element_get_pad(audio_render_queue,"src"), - gst_element_get_pad(audio_render_thread,"sink")); + gst_element_get_pad(audio_play,"sink")); gtk_object_set(GTK_OBJECT(audio_render_thread),"create_thread",TRUE,NULL); if (argc > 1) { diff --git a/gstplay/mpeg1.c b/gstplay/mpeg1.c index 4febea6064..4c04b5cd42 100644 --- a/gstplay/mpeg1.c +++ b/gstplay/mpeg1.c @@ -56,6 +56,7 @@ void mpeg1_setup_audio_thread(GstPad *pad, GstElement *audio_render_queue, GstEl g_return_if_fail(audio_thread != NULL); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio)); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode)); + gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(audio_render_queue)); // set up pad connections gst_element_add_ghost_pad(GST_ELEMENT(audio_thread), @@ -100,6 +101,7 @@ void mpeg1_setup_video_thread(GstPad *pad, GstElement *video_render_queue, GstEl g_return_if_fail(video_thread != NULL); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video)); + gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(video_render_queue)); // set up pad connections gst_element_add_ghost_pad(GST_ELEMENT(video_thread), diff --git a/gstplay/mpeg2.c b/gstplay/mpeg2.c index 990f3185e1..aefdd4a783 100644 --- a/gstplay/mpeg2.c +++ b/gstplay/mpeg2.c @@ -66,6 +66,7 @@ void mpeg2_new_pad_created(GstElement *parse,GstPad *pad,GstElement *pipeline) g_return_if_fail(audio_thread != NULL); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(parse_audio)); gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(decode)); + gst_bin_add(GST_BIN(audio_thread),GST_ELEMENT(audio_render_queue)); // set up pad connections gst_element_add_ghost_pad(GST_ELEMENT(audio_thread), @@ -116,6 +117,7 @@ void mpeg2_setup_video_thread(GstPad *pad, GstElement *show, GstElement *pipelin gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(parse_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(decode_video)); gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(merge_subtitles)); + gst_bin_add(GST_BIN(video_thread),GST_ELEMENT(video_render_queue)); gst_bin_use_cothreads(GST_BIN(video_thread), FALSE); // set up pad connections diff --git a/gstreamer-config.in b/gstreamer-config.in index ad9c04863f..6811a7feee 100644 --- a/gstreamer-config.in +++ b/gstreamer-config.in @@ -47,14 +47,14 @@ while test $# -gt 0; do elif test @includedir@ != /usr/include ; then includes=-I@includedir@ fi - echo $includes `gtk-config --cflags` + echo $includes `gtk-config --cflags gtk gthread` ;; --libs) if test $prefix -ef @builddir@ ; then - echo @builddir@/libgst.la `gtk-config --libs` + echo @builddir@/libgst.la `gtk-config --libs gtk gthread` else libdirs=-L@libdir@ - echo $libdirs -lgst `gtk-config --libs` + echo $libdirs -lgst `gtk-config --libs gtk gthread` fi ;; *) diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am index 420b551ef9..bf57b271ca 100644 --- a/plugins/elements/Makefile.am +++ b/plugins/elements/Makefile.am @@ -42,7 +42,7 @@ noinst_HEADERS = \ gsttypefind.h \ gstsinesrc.h -CFLAGS += -O2 -Wall +CFLAGS += -O2 -Wall libgstelements_la_LIBADD = $(GLIB_LIBS) $(GTK_LIBS) $(GHTTP_LIBS) libgstelements_la_LDFLAGS = -version-info $(STREAMER_CURRENT):$(STREAMER_REVISION):$(STREAMER_AGE) diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 5352ded75e..b9af36de44 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -27,6 +27,7 @@ #include +#include GstElementDetails gst_queue_details = { "Queue", @@ -105,6 +106,7 @@ static void gst_queue_class_init(GstQueueClass *klass) { static void gst_queue_init(GstQueue *queue) { queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); + gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); queue->srcpad = gst_pad_new("src",GST_PAD_SRC); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); @@ -150,9 +152,10 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { /* we have to lock the queue since we span threads */ - DEBUG("queue: %s adding buffer %p\n", name, buf); + DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self()); GST_LOCK(queue); + DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); @@ -164,18 +167,17 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); - if (queue->level_buffers >= queue->max_buffers) { + g_mutex_lock(queue->fulllock); + while (queue->level_buffers >= queue->max_buffers) { DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); - while (queue->level_buffers >= queue->max_buffers) { - GST_UNLOCK(queue); - g_mutex_lock(queue->fulllock); - STATUS("%s: O\n"); - g_cond_wait(queue->fullcond,queue->fulllock); - g_mutex_unlock(queue->fulllock); - GST_LOCK(queue); - } + STATUS("%s: O\n"); + GST_UNLOCK(queue); + g_cond_wait(queue->fullcond,queue->fulllock); + GST_LOCK(queue); + STATUS("%s: O+\n"); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } + g_mutex_unlock(queue->fulllock); /* put the buffer on the head of the list */ @@ -192,18 +194,19 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { STATUS("%s: +\n"); /* if we were empty, but aren't any more, signal a condition */ - tosignal = (queue->level_buffers <= 0); + tosignal = (queue->level_buffers >= 0); queue->level_buffers++; /* we can unlock now */ - DEBUG("queue: %s chain %d end\n", name, queue->level_buffers); + DEBUG("queue: %s chain %d end signal(%d,%p)\n", name, queue->level_buffers, tosignal, queue->emptycond); GST_UNLOCK(queue); if (tosignal) { g_mutex_lock(queue->emptylock); + STATUS("%s: >\n"); g_cond_signal(queue->emptycond); + STATUS("%s: >>\n"); g_mutex_unlock(queue->emptylock); - //g_print(">"); } } @@ -216,17 +219,20 @@ void gst_queue_push(GstConnection *connection) { name = gst_element_get_name(GST_ELEMENT(queue)); - DEBUG("queue: %s push %d\n", name, queue->level_buffers); + DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); /* have to lock for thread-safety */ + DEBUG("queue: try have queue lock\n"); GST_LOCK(queue); + DEBUG("queue: have queue lock\n"); while (!queue->level_buffers) { + STATUS("%s: U released lock\n"); GST_UNLOCK(queue); g_mutex_lock(queue->emptylock); - STATUS("%s: U\n"); g_cond_wait(queue->emptycond,queue->emptylock); g_mutex_unlock(queue->emptylock); GST_LOCK(queue); + STATUS("%s: U- getting lock\n"); } front = queue->queue; @@ -240,13 +246,15 @@ void gst_queue_push(GstConnection *connection) { if (tosignal) { g_mutex_lock(queue->fulllock); + STATUS("%s: < \n"); g_cond_signal(queue->fullcond); + STATUS("%s: << \n"); g_mutex_unlock(queue->fulllock); } - DEBUG("queue: %s pushing %d %p\n", name, queue->level_buffers, buf); + //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf); gst_pad_push(queue->srcpad,buf); - DEBUG("queue: %s pushing %d done\n", name, queue->level_buffers); + //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers); /* unlock now */ } diff --git a/test/mp2tomp1.c b/test/mp2tomp1.c index 0e27bdfea8..144027a008 100644 --- a/test/mp2tomp1.c +++ b/test/mp2tomp1.c @@ -158,7 +158,7 @@ void mp2tomp1(GstElement *parser,GstPad *pad, GstElement *pipeline) { gtk_object_set(GTK_OBJECT(smooth),"active",FALSE,NULL); encode = gst_elementfactory_make("mpeg2enc","encode"); g_return_if_fail(encode != NULL); - gtk_object_set(GTK_OBJECT(encode),"frames_per_second",25.0,NULL); + gtk_object_set(GTK_OBJECT(encode),"frames_per_second",29.97,NULL); //encode = gst_elementfactory_make("mpeg1encoder","encode"); //gtk_object_set(GTK_OBJECT(show),"width",640, "height", 480,NULL); diff --git a/tests/old/examples/helloworld/helloworld.c b/tests/old/examples/helloworld/helloworld.c index 2a3a4867b3..3527cc0b48 100644 --- a/tests/old/examples/helloworld/helloworld.c +++ b/tests/old/examples/helloworld/helloworld.c @@ -50,9 +50,6 @@ int main(int argc,char *argv[]) gst_pad_connect(gst_element_get_pad(decoder,"src"), gst_element_get_pad(audiosink,"sink")); - /* find out how to handle this bin */ - gst_bin_create_plan(GST_BIN(bin)); - /* make it ready */ gst_element_set_state(bin, GST_STATE_READY); /* start playing */ diff --git a/tests/old/examples/helloworld2/helloworld2.c b/tests/old/examples/helloworld2/helloworld2.c index f687ef6b5f..fab7b49138 100644 --- a/tests/old/examples/helloworld2/helloworld2.c +++ b/tests/old/examples/helloworld2/helloworld2.c @@ -46,9 +46,6 @@ int main(int argc,char *argv[]) exit(-1); } - /* find out how to handle this bin */ - gst_bin_create_plan(GST_BIN(pipeline)); - /* make it ready */ gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY); /* start playing */ diff --git a/tests/old/examples/queue/.gitignore b/tests/old/examples/queue/.gitignore new file mode 100644 index 0000000000..b7dd5275b2 --- /dev/null +++ b/tests/old/examples/queue/.gitignore @@ -0,0 +1,2 @@ +Makefile +queue diff --git a/tests/old/examples/queue/queue.c b/tests/old/examples/queue/queue.c new file mode 100644 index 0000000000..9dc42d1f17 --- /dev/null +++ b/tests/old/examples/queue/queue.c @@ -0,0 +1,84 @@ +#include + +gboolean playing; + +/* eos will be called when the src element has an end of stream */ +void eos(GstSrc *src, gpointer data) +{ + g_print("have eos, quitting\n"); + + playing = FALSE; +} + +int main(int argc,char *argv[]) +{ + GstElement *disksrc, *audiosink, *parse, *decode, *queue; + GstElement *bin; + GstElement *thread; + + if (argc != 2) { + g_print("usage: %s \n", argv[0]); + exit(-1); + } + + gst_init(&argc,&argv); + + /* create a new thread to hold the elements */ + thread = gst_thread_new("thread"); + g_assert(thread != NULL); + + /* create a new bin to hold the elements */ + bin = gst_bin_new("bin"); + g_assert(bin != NULL); + + /* create a disk reader */ + disksrc = gst_elementfactory_make("disksrc", "disk_source"); + g_assert(disksrc != NULL); + gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL); + gtk_signal_connect(GTK_OBJECT(disksrc),"eos", + GTK_SIGNAL_FUNC(eos), thread); + + parse = gst_elementfactory_make("mp3parse", "parse"); + decode = gst_elementfactory_make("mpg123", "decode"); + + queue = gst_elementfactory_make("queue", "queue"); + + /* and an audio sink */ + audiosink = gst_elementfactory_make("audiosink", "play_audio"); + g_assert(audiosink != NULL); + + /* add objects to the main pipeline */ + gst_bin_add(GST_BIN(bin), disksrc); + gst_bin_add(GST_BIN(bin), parse); + gst_bin_add(GST_BIN(bin), decode); + gst_bin_add(GST_BIN(bin), queue); + + gst_bin_add(GST_BIN(thread), audiosink); + + gst_bin_add(GST_BIN(bin), thread); + + gst_pad_connect(gst_element_get_pad(disksrc,"src"), + gst_element_get_pad(parse,"sink")); + gst_pad_connect(gst_element_get_pad(parse,"src"), + gst_element_get_pad(decode,"sink")); + gst_pad_connect(gst_element_get_pad(decode,"src"), + gst_element_get_pad(queue,"sink")); + gst_pad_connect(gst_element_get_pad(queue,"src"), + gst_element_get_pad(audiosink,"sink")); + + /* make it ready */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY); + /* start playing */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING); + + playing = TRUE; + + while (playing) { + gst_bin_iterate(GST_BIN(bin)); + } + + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL); + + exit(0); +} + diff --git a/tests/old/examples/queue2/.gitignore b/tests/old/examples/queue2/.gitignore new file mode 100644 index 0000000000..ca0136adcb --- /dev/null +++ b/tests/old/examples/queue2/.gitignore @@ -0,0 +1,2 @@ +Makefile +queue2 diff --git a/tests/old/examples/queue2/queue2.c b/tests/old/examples/queue2/queue2.c new file mode 100644 index 0000000000..845f491f89 --- /dev/null +++ b/tests/old/examples/queue2/queue2.c @@ -0,0 +1,81 @@ +#include + +gboolean playing; + +/* eos will be called when the src element has an end of stream */ +void eos(GstSrc *src, gpointer data) +{ + g_print("have eos, quitting\n"); + + playing = FALSE; +} + +int main(int argc,char *argv[]) +{ + GstElement *disksrc, *audiosink, *queue; + GstElement *pipeline; + GstElement *thread; + + if (argc != 2) { + g_print("usage: %s \n", argv[0]); + exit(-1); + } + + gst_init(&argc,&argv); + + /* create a new thread to hold the elements */ + thread = gst_thread_new("thread"); + g_assert(thread != NULL); + + /* create a new bin to hold the elements */ + pipeline = gst_pipeline_new("pipeline"); + g_assert(pipeline != NULL); + + /* create a disk reader */ + disksrc = gst_elementfactory_make("disksrc", "disk_source"); + g_assert(disksrc != NULL); + gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL); + gtk_signal_connect(GTK_OBJECT(disksrc),"eos", + GTK_SIGNAL_FUNC(eos), thread); + + queue = gst_elementfactory_make("queue", "queue"); + + /* and an audio sink */ + audiosink = gst_elementfactory_make("audiosink", "play_audio"); + g_assert(audiosink != NULL); + + /* add objects to the main pipeline */ + gst_pipeline_add_src(GST_PIPELINE(pipeline), disksrc); + gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue); + + gst_bin_add(GST_BIN(thread), audiosink); + + gst_pad_connect(gst_element_get_pad(queue,"src"), + gst_element_get_pad(audiosink,"sink")); + + gst_pad_set_type_id(gst_element_get_pad(queue, "sink"), + gst_pad_get_type_id(gst_element_get_pad(audiosink, "sink"))); + + if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) { + g_print("cannot autoplug pipeline\n"); + exit(-1); + } + + gst_bin_add(GST_BIN(pipeline), thread); + + /* make it ready */ + gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_READY); + /* start playing */ + gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING); + + playing = TRUE; + + while (playing) { + gst_bin_iterate(GST_BIN(pipeline)); + } + + gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL); + + exit(0); +} + diff --git a/tests/old/examples/queue3/.gitignore b/tests/old/examples/queue3/.gitignore new file mode 100644 index 0000000000..b89de200c9 --- /dev/null +++ b/tests/old/examples/queue3/.gitignore @@ -0,0 +1,2 @@ +Makefile +queue3 diff --git a/tests/old/examples/queue3/queue3.c b/tests/old/examples/queue3/queue3.c new file mode 100644 index 0000000000..fa79a78d19 --- /dev/null +++ b/tests/old/examples/queue3/queue3.c @@ -0,0 +1,85 @@ +#include + +gboolean playing; + +/* eos will be called when the src element has an end of stream */ +void eos(GstSrc *src, gpointer data) +{ + g_print("have eos, quitting\n"); + + playing = FALSE; +} + +int main(int argc,char *argv[]) +{ + GstElement *disksrc, *audiosink, *queue, *parse, *decode; + GstElement *bin; + GstElement *thread; + + if (argc != 2) { + g_print("usage: %s \n", argv[0]); + exit(-1); + } + + gst_init(&argc,&argv); + + /* create a new thread to hold the elements */ + thread = gst_thread_new("thread"); + g_assert(thread != NULL); + + /* create a new bin to hold the elements */ + bin = gst_bin_new("bin"); + g_assert(bin != NULL); + + /* create a disk reader */ + disksrc = gst_elementfactory_make("disksrc", "disk_source"); + g_assert(disksrc != NULL); + gtk_object_set(GTK_OBJECT(disksrc),"location", argv[1],NULL); + gtk_signal_connect(GTK_OBJECT(disksrc),"eos", + GTK_SIGNAL_FUNC(eos), thread); + + queue = gst_elementfactory_make("queue", "queue"); + + /* and an audio sink */ + audiosink = gst_elementfactory_make("audiosink", "play_audio"); + g_assert(audiosink != NULL); + + parse = gst_elementfactory_make("mp3parse", "parse"); + decode = gst_elementfactory_make("mpg123", "decode"); + + /* add objects to the main bin */ + gst_bin_add(GST_BIN(bin), disksrc); + gst_bin_add(GST_BIN(bin), queue); + + gst_bin_add(GST_BIN(thread), parse); + gst_bin_add(GST_BIN(thread), decode); + gst_bin_add(GST_BIN(thread), audiosink); + + gst_pad_connect(gst_element_get_pad(disksrc,"src"), + gst_element_get_pad(queue,"sink")); + + gst_pad_connect(gst_element_get_pad(queue,"src"), + gst_element_get_pad(parse,"sink")); + gst_pad_connect(gst_element_get_pad(parse,"src"), + gst_element_get_pad(decode,"sink")); + gst_pad_connect(gst_element_get_pad(decode,"src"), + gst_element_get_pad(audiosink,"sink")); + + gst_bin_add(GST_BIN(bin), thread); + + /* make it ready */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_READY); + /* start playing */ + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_PLAYING); + + playing = TRUE; + + while (playing) { + gst_bin_iterate(GST_BIN(bin)); + } + + gst_element_set_state(GST_ELEMENT(bin), GST_STATE_NULL); + + exit(0); +} + diff --git a/tests/old/examples/thread/thread.c b/tests/old/examples/thread/thread.c index 4d8073db73..4cd7721dbb 100644 --- a/tests/old/examples/thread/thread.c +++ b/tests/old/examples/thread/thread.c @@ -53,9 +53,9 @@ int main(int argc,char *argv[]) exit(-1); } - gst_bin_remove(GST_BIN(pipeline), disksrc); + //gst_bin_remove(GST_BIN(pipeline), disksrc); - gst_bin_add(GST_BIN(thread), disksrc); + //gst_bin_add(GST_BIN(thread), disksrc); gst_bin_add(GST_BIN(thread), GST_ELEMENT(pipeline)); /* make it ready */