From 8e589eec081fdbf640316649f1e88bdf361223f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 18 Apr 2024 17:08:36 +0300 Subject: [PATCH] bufferpool: Switch from GstAtomicQueue to GstVecDeque and a mutex/cond While the atomic queue itself is lock-free, all its usage had to be synchronized externally via a GstPoll and gst_poll_read_control() / gst_poll_write_control(). Both functions were always taking a mutex internally since cd06aea1, so the implementation was just very complicated but not lock-free at all. Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/2714 Part-of: --- subprojects/gstreamer/gst/gstbufferpool.c | 131 ++++++++-------------- 1 file changed, 48 insertions(+), 83 deletions(-) diff --git a/subprojects/gstreamer/gst/gstbufferpool.c b/subprojects/gstreamer/gst/gstbufferpool.c index d7da0cd4ad..911d1f7f33 100644 --- a/subprojects/gstreamer/gst/gstbufferpool.c +++ b/subprojects/gstreamer/gst/gstbufferpool.c @@ -70,8 +70,7 @@ #endif #include -#include "gstatomicqueue.h" -#include "gstpoll.h" +#include "gstvecdeque.h" #include "gstinfo.h" #include "gstquark.h" #include "gstvalue.h" @@ -92,8 +91,9 @@ GST_DEBUG_CATEGORY_STATIC (gst_buffer_pool_debug); struct _GstBufferPoolPrivate { - GstAtomicQueue *queue; - GstPoll *poll; + GMutex queue_lock; + GCond queue_cond; + GstVecDeque *queue; GRecMutex rec_lock; @@ -159,8 +159,10 @@ gst_buffer_pool_init (GstBufferPool * pool) g_rec_mutex_init (&priv->rec_lock); - priv->poll = gst_poll_new_timer (); - priv->queue = gst_atomic_queue_new (16); + priv->queue = gst_vec_deque_new (16); + g_mutex_init (&priv->queue_lock); + g_cond_init (&priv->queue_cond); + pool->flushing = 1; priv->active = FALSE; priv->configured = FALSE; @@ -171,10 +173,6 @@ gst_buffer_pool_init (GstBufferPool * pool) gst_allocation_params_init (&priv->params); gst_buffer_pool_config_set_allocator (priv->config, priv->allocator, &priv->params); - /* 1 control write for flushing - the flush token */ - gst_poll_write_control (priv->poll); - /* 1 control write for marking that we are not waiting for poll - the wait token */ - gst_poll_write_control (priv->poll); GST_DEBUG_OBJECT (pool, "created"); } @@ -207,8 +205,9 @@ gst_buffer_pool_finalize (GObject * object) GST_DEBUG_OBJECT (pool, "%p finalize", pool); - gst_atomic_queue_unref (priv->queue); - gst_poll_free (priv->poll); + gst_vec_deque_free (priv->queue); + g_mutex_clear (&priv->queue_lock); + g_cond_clear (&priv->queue_cond); gst_structure_free (priv->config); g_rec_mutex_clear (&priv->rec_lock); @@ -397,6 +396,10 @@ do_free_buffer (GstBufferPool * pool, GstBuffer * buffer) if (G_LIKELY (pclass->free_buffer)) pclass->free_buffer (pool, buffer); + + g_mutex_lock (&priv->queue_lock); + g_cond_signal (&priv->queue_cond); + g_mutex_unlock (&priv->queue_lock); } /* must be called with the lock */ @@ -405,23 +408,18 @@ default_stop (GstBufferPool * pool) { GstBufferPoolPrivate *priv = pool->priv; GstBuffer *buffer; + gboolean cleared; /* clear the pool */ - while ((buffer = gst_atomic_queue_pop (priv->queue))) { - while (!gst_poll_read_control (priv->poll)) { - if (errno == EWOULDBLOCK) { - /* We put the buffer into the queue but did not finish writing control - * yet, let's wait a bit and retry */ - g_thread_yield (); - continue; - } else { - /* Critical error but GstPoll already complained */ - break; - } - } + g_mutex_lock (&priv->queue_lock); + while ((buffer = gst_vec_deque_pop_head (priv->queue))) { + g_mutex_unlock (&priv->queue_lock); do_free_buffer (pool, buffer); + g_mutex_lock (&priv->queue_lock); } - return priv->cur_buffers == 0; + cleared = priv->cur_buffers == 0; + g_mutex_unlock (&priv->queue_lock); + return cleared; } /* must be called with the lock */ @@ -458,9 +456,11 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing) return; if (flushing) { + /* Wake up any waiters */ + g_mutex_lock (&priv->queue_lock); g_atomic_int_set (&pool->flushing, 1); - /* Write the flush token to wake up any waiters */ - gst_poll_write_control (priv->poll); + g_cond_broadcast (&priv->queue_cond); + g_mutex_unlock (&priv->queue_lock); if (pclass->flush_start) pclass->flush_start (pool); @@ -468,19 +468,6 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing) if (pclass->flush_stop) pclass->flush_stop (pool); - while (!gst_poll_read_control (priv->poll)) { - if (errno == EWOULDBLOCK) { - /* This should not really happen unless flushing and unflushing - * happens on different threads. Let's wait a bit to get back flush - * token from the thread that was setting it to flushing */ - g_thread_yield (); - continue; - } else { - /* Critical error but GstPoll already complained */ - break; - } - } - g_atomic_int_set (&pool->flushing, 0); } } @@ -1112,24 +1099,16 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, GstFlowReturn result; GstBufferPoolPrivate *priv = pool->priv; + g_mutex_lock (&priv->queue_lock); while (TRUE) { if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool))) goto flushing; /* try to get a buffer from the queue */ - *buffer = gst_atomic_queue_pop (priv->queue); + *buffer = gst_vec_deque_pop_head (priv->queue); + g_mutex_unlock (&priv->queue_lock); + if (G_LIKELY (*buffer)) { - while (!gst_poll_read_control (priv->poll)) { - if (errno == EWOULDBLOCK) { - /* We put the buffer into the queue but did not finish writing control - * yet, let's wait a bit and retry */ - g_thread_yield (); - continue; - } else { - /* Critical error but GstPoll already complained */ - break; - } - } result = GST_FLOW_OK; GST_LOG_OBJECT (pool, "acquired buffer %p", *buffer); break; @@ -1152,34 +1131,14 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, break; } - /* now we release the control socket, we wait for a buffer release or - * flushing */ - if (!gst_poll_read_control (pool->priv->poll)) { - if (errno == EWOULDBLOCK) { - /* This means that we have two threads trying to allocate buffers - * already, and the other one already got the wait token. This - * means that we only have to wait for the poll now and not write the - * token afterwards: we will be woken up once the other thread is - * woken up and that one will write the wait token it removed */ - GST_LOG_OBJECT (pool, "waiting for free buffers or flushing"); - gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE); - } else { - /* This is a critical error, GstPoll already gave a warning */ - result = GST_FLOW_ERROR; - break; - } - } else { - /* We're the first thread waiting, we got the wait token and have to - * write it again later - * OR - * We're a second thread and just consumed the flush token and block all - * other threads, in which case we must not wait and give it back - * immediately */ - if (!GST_BUFFER_POOL_IS_FLUSHING (pool)) { - GST_LOG_OBJECT (pool, "waiting for free buffers or flushing"); - gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE); - } - gst_poll_write_control (pool->priv->poll); + /* now we wait for a buffer release or flushing */ + g_mutex_lock (&priv->queue_lock); + while (gst_vec_deque_get_length (priv->queue) == 0 + && !GST_BUFFER_POOL_IS_FLUSHING (pool) + && g_atomic_int_get (&priv->cur_buffers) >= priv->max_buffers) { + GST_LOG_OBJECT (pool, "waiting for free buffers or flushing"); + g_cond_wait (&priv->queue_cond, &priv->queue_lock); + GST_LOG_OBJECT (pool, "waited for free buffers or flushing"); } } @@ -1188,6 +1147,7 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, /* ERRORS */ flushing: { + g_mutex_unlock (&priv->queue_lock); GST_DEBUG_OBJECT (pool, "we are flushing"); return GST_FLOW_FLUSHING; } @@ -1331,8 +1291,10 @@ default_release_buffer (GstBufferPool * pool, GstBuffer * buffer) goto not_writable; /* keep it around in our queue */ - gst_atomic_queue_push (pool->priv->queue, buffer); - gst_poll_write_control (pool->priv->poll); + g_mutex_lock (&pool->priv->queue_lock); + gst_vec_deque_push_tail (pool->priv->queue, buffer); + g_cond_signal (&pool->priv->queue_cond); + g_mutex_unlock (&pool->priv->queue_lock); return; @@ -1358,7 +1320,10 @@ not_writable: discard: { do_free_buffer (pool, buffer); - gst_poll_write_control (pool->priv->poll); + + g_mutex_lock (&pool->priv->queue_lock); + g_cond_signal (&pool->priv->queue_cond); + g_mutex_unlock (&pool->priv->queue_lock); return; } }