diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index 2abf235a0d..61d1eaa7a8 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -100,12 +100,16 @@ static void gst_queue_init(GstQueue *queue) { queue->queue = NULL; queue->tail = NULL; queue->level_buffers = 0; + queue->max_buffers = 5; queue->level_bytes = 0; queue->size_buffers = 0; queue->size_bytes = 0; - queue->waiterlock = g_mutex_new(); - queue->waitercond = g_cond_new(); + queue->emptylock = g_mutex_new(); + queue->emptycond = g_cond_new(); + + queue->fulllock = g_mutex_new(); + queue->fullcond = g_cond_new(); } GstElement *gst_queue_new(gchar *name) { @@ -125,8 +129,21 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { queue = GST_QUEUE(pad->parent); /* we have to lock the queue since we span threads */ + GST_LOCK(queue); + if (queue->level_buffers >= queue->max_buffers) { + GST_UNLOCK(queue); + while (queue->level_buffers >= queue->max_buffers) { + g_mutex_lock(queue->fulllock); +// g_print("0"); + g_cond_wait(queue->fullcond,queue->fulllock); + g_mutex_unlock(queue->fulllock); + } + GST_LOCK(queue); + } + + /* put the buffer on the head of the list */ /* if the queue is NULL, start a new list and make this the tail */ if (!queue->queue) { @@ -147,9 +164,9 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { GST_UNLOCK(queue); if (tosignal) { - g_mutex_lock(queue->waiterlock); - g_cond_signal(queue->waitercond); - g_mutex_unlock(queue->waiterlock); + g_mutex_lock(queue->emptylock); + g_cond_signal(queue->emptycond); + g_mutex_unlock(queue->emptylock); // g_print(">"); } } @@ -158,6 +175,7 @@ void gst_queue_push(GstConnection *connection) { GstQueue *queue = GST_QUEUE(connection); GstBuffer *buf = NULL; GList *front; + gboolean tosignal = FALSE; /* have to lock for thread-safety */ GST_LOCK(queue); @@ -165,10 +183,10 @@ void gst_queue_push(GstConnection *connection) { if (!queue->level_buffers) { GST_UNLOCK(queue); while (!queue->level_buffers) { - g_mutex_lock(queue->waiterlock); + g_mutex_lock(queue->emptylock); // g_print("0"); - g_cond_wait(queue->waitercond,queue->waiterlock); - g_mutex_unlock(queue->waiterlock); + g_cond_wait(queue->emptycond,queue->emptylock); + g_mutex_unlock(queue->emptylock); } GST_LOCK(queue); } @@ -180,8 +198,17 @@ void gst_queue_push(GstConnection *connection) { queue->level_buffers--; // g_print("-"); - /* unlock now */ + tosignal = queue->level_buffers < queue->max_buffers; GST_UNLOCK(queue); + + if (tosignal) { + g_mutex_lock(queue->fulllock); + g_cond_signal(queue->fullcond); + g_mutex_unlock(queue->fulllock); + } + + + /* unlock now */ } diff --git a/gst/elements/gstqueue.h b/gst/elements/gstqueue.h index 5ce8d6f57b..b0ea7680b1 100644 --- a/gst/elements/gstqueue.h +++ b/gst/elements/gstqueue.h @@ -59,12 +59,15 @@ struct _GstQueue { GList *tail; /* have to keep track of this myself */ gint level_buffers; /* number of buffers queued here */ + gint max_buffers; /* maximum number of buffers queued here */ gint level_bytes; /* number of bytes queued here */ gint size_buffers; /* size of queue in buffers */ gint size_bytes; /* size of queue in bytes */ - GMutex *waiterlock; /* used when the queue is empty */ - GCond *waitercond; + GMutex *emptylock; /* used when the queue is empty */ + GCond *emptycond; + GMutex *fulllock; /* used when the queue is full */ + GCond *fullcond; }; struct _GstQueueClass { diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 2abf235a0d..61d1eaa7a8 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -100,12 +100,16 @@ static void gst_queue_init(GstQueue *queue) { queue->queue = NULL; queue->tail = NULL; queue->level_buffers = 0; + queue->max_buffers = 5; queue->level_bytes = 0; queue->size_buffers = 0; queue->size_bytes = 0; - queue->waiterlock = g_mutex_new(); - queue->waitercond = g_cond_new(); + queue->emptylock = g_mutex_new(); + queue->emptycond = g_cond_new(); + + queue->fulllock = g_mutex_new(); + queue->fullcond = g_cond_new(); } GstElement *gst_queue_new(gchar *name) { @@ -125,8 +129,21 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { queue = GST_QUEUE(pad->parent); /* we have to lock the queue since we span threads */ + GST_LOCK(queue); + if (queue->level_buffers >= queue->max_buffers) { + GST_UNLOCK(queue); + while (queue->level_buffers >= queue->max_buffers) { + g_mutex_lock(queue->fulllock); +// g_print("0"); + g_cond_wait(queue->fullcond,queue->fulllock); + g_mutex_unlock(queue->fulllock); + } + GST_LOCK(queue); + } + + /* put the buffer on the head of the list */ /* if the queue is NULL, start a new list and make this the tail */ if (!queue->queue) { @@ -147,9 +164,9 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { GST_UNLOCK(queue); if (tosignal) { - g_mutex_lock(queue->waiterlock); - g_cond_signal(queue->waitercond); - g_mutex_unlock(queue->waiterlock); + g_mutex_lock(queue->emptylock); + g_cond_signal(queue->emptycond); + g_mutex_unlock(queue->emptylock); // g_print(">"); } } @@ -158,6 +175,7 @@ void gst_queue_push(GstConnection *connection) { GstQueue *queue = GST_QUEUE(connection); GstBuffer *buf = NULL; GList *front; + gboolean tosignal = FALSE; /* have to lock for thread-safety */ GST_LOCK(queue); @@ -165,10 +183,10 @@ void gst_queue_push(GstConnection *connection) { if (!queue->level_buffers) { GST_UNLOCK(queue); while (!queue->level_buffers) { - g_mutex_lock(queue->waiterlock); + g_mutex_lock(queue->emptylock); // g_print("0"); - g_cond_wait(queue->waitercond,queue->waiterlock); - g_mutex_unlock(queue->waiterlock); + g_cond_wait(queue->emptycond,queue->emptylock); + g_mutex_unlock(queue->emptylock); } GST_LOCK(queue); } @@ -180,8 +198,17 @@ void gst_queue_push(GstConnection *connection) { queue->level_buffers--; // g_print("-"); - /* unlock now */ + tosignal = queue->level_buffers < queue->max_buffers; GST_UNLOCK(queue); + + if (tosignal) { + g_mutex_lock(queue->fulllock); + g_cond_signal(queue->fullcond); + g_mutex_unlock(queue->fulllock); + } + + + /* unlock now */ } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 5ce8d6f57b..b0ea7680b1 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -59,12 +59,15 @@ struct _GstQueue { GList *tail; /* have to keep track of this myself */ gint level_buffers; /* number of buffers queued here */ + gint max_buffers; /* maximum number of buffers queued here */ gint level_bytes; /* number of bytes queued here */ gint size_buffers; /* size of queue in buffers */ gint size_bytes; /* size of queue in bytes */ - GMutex *waiterlock; /* used when the queue is empty */ - GCond *waitercond; + GMutex *emptylock; /* used when the queue is empty */ + GCond *emptycond; + GMutex *fulllock; /* used when the queue is full */ + GCond *fullcond; }; struct _GstQueueClass {