diff --git a/gst-libs/gst/video/gstvideoaggregator.c b/gst-libs/gst/video/gstvideoaggregator.c index 4e186e05f5..d0f0a1ab99 100644 --- a/gst-libs/gst/video/gstvideoaggregator.c +++ b/gst-libs/gst/video/gstvideoaggregator.c @@ -415,6 +415,8 @@ struct _GstVideoAggregatorConvertPadPrivate * and as such are protected with the object lock */ GstStructure *converter_config; gboolean converter_config_changed; + + GstTaskPool *task_pool; }; G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorConvertPad, @@ -433,6 +435,11 @@ gst_video_aggregator_convert_pad_finalize (GObject * o) gst_structure_free (vaggpad->priv->converter_config); vaggpad->priv->converter_config = NULL; + if (vaggpad->priv->task_pool) + gst_task_pool_cleanup (vaggpad->priv->task_pool); + + gst_object_replace ((GstObject **) & vaggpad->priv->task_pool, NULL); + G_OBJECT_CLASS (gst_video_aggregator_pad_parent_class)->finalize (o); } @@ -447,6 +454,15 @@ static void GST_OBJECT_UNLOCK (pad); } +static guint +get_opt_uint (const GstStructure * config, const gchar * opt, guint def) +{ + guint res; + if (!gst_structure_get_uint (config, opt, &res)) + res = def; + return res; +} + static gboolean gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad, GstVideoAggregator * vagg, GstBuffer * buffer, @@ -475,10 +491,22 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad, pad->priv->convert = NULL; if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) { + if (pad->priv->converter_config) { + guint n_threads = get_opt_uint (pad->priv->converter_config, + GST_VIDEO_CONVERTER_OPT_THREADS, 1); + + if (n_threads == 0 || n_threads > g_get_num_processors ()) + n_threads = g_get_num_processors (); + + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pad->priv-> + task_pool), n_threads); + } + pad->priv->convert = - gst_video_converter_new (&vpad->info, &pad->priv->conversion_info, - pad->priv->converter_config ? gst_structure_copy (pad->priv-> - converter_config) : NULL); + gst_video_converter_new_with_pool (&vpad->info, + &pad->priv->conversion_info, + pad->priv->converter_config ? gst_structure_copy (pad-> + priv->converter_config) : NULL, pad->priv->task_pool); if (!pad->priv->convert) { GST_WARNING_OBJECT (pad, "No path found for conversion"); return FALSE; @@ -486,8 +514,8 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad, GST_DEBUG_OBJECT (pad, "This pad will be converted from %s to %s", gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&vpad->info)), - gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad-> - priv->conversion_info))); + gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->priv-> + conversion_info))); } else { GST_DEBUG_OBJECT (pad, "This pad will not need conversion"); } @@ -689,8 +717,10 @@ gst_video_aggregator_convert_pad_init (GstVideoAggregatorConvertPad * vaggpad) vaggpad->priv->convert = NULL; vaggpad->priv->converter_config = NULL; vaggpad->priv->converter_config_changed = FALSE; -} + vaggpad->priv->task_pool = gst_shared_task_pool_new (); + gst_task_pool_prepare (vaggpad->priv->task_pool, NULL); +} /** * gst_video_aggregator_convert_pad_update_conversion_info: diff --git a/gst-libs/gst/video/video-converter.c b/gst-libs/gst/video/video-converter.c index 7a8a6e20b7..3bfee92517 100644 --- a/gst-libs/gst/video/video-converter.c +++ b/gst-libs/gst/video/video-converter.c @@ -34,6 +34,7 @@ #include #include #include +#include #include "video-orc.h" @@ -120,161 +121,107 @@ typedef void (*GstParallelizedTaskFunc) (gpointer user_data); typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner; typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread; -struct _GstParallelizedTaskThread -{ - GstParallelizedTaskRunner *runner; - guint idx; - GThread *thread; -}; - struct _GstParallelizedTaskRunner { + GstTaskPool *pool; + gboolean own_pool; guint n_threads; - GstParallelizedTaskThread *threads; + GstQueueArray *tasks; GstParallelizedTaskFunc func; gpointer *task_data; GMutex lock; - GCond cond_todo, cond_done; - gint n_todo, n_done; - gboolean quit; + gint n_todo; }; -static gpointer +static void gst_parallelized_task_thread_func (gpointer data) { - GstParallelizedTaskThread *self = data; + GstParallelizedTaskRunner *runner = data; + gint idx; -#if 0 -#ifdef HAVE_PTHREAD - { - pthread_t thread = pthread_self (); - cpu_set_t cpuset; - int r; + g_mutex_lock (&runner->lock); + idx = runner->n_todo--; + g_assert (runner->n_todo >= -1); + g_mutex_unlock (&runner->lock); - CPU_ZERO (&cpuset); - CPU_SET (self->idx, &cpuset); - if ((r = pthread_setaffinity_np (thread, sizeof (cpuset), &cpuset)) != 0) - GST_ERROR ("Failed to set thread affinity for thread %d: %s", self->idx, - g_strerror (r)); + g_assert (runner->func != NULL); + + runner->func (runner->task_data[idx]); +} + +static void +gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self) +{ + gboolean joined = FALSE; + + while (!joined) { + g_mutex_lock (&self->lock); + if (!(joined = gst_queue_array_is_empty (self->tasks))) { + gpointer task = gst_queue_array_pop_head (self->tasks); + g_mutex_unlock (&self->lock); + gst_task_pool_join (self->pool, task); + } else { + g_mutex_unlock (&self->lock); + } } -#endif -#endif - - g_mutex_lock (&self->runner->lock); - self->runner->n_done++; - if (self->runner->n_done == self->runner->n_threads - 1) - g_cond_signal (&self->runner->cond_done); - - do { - gint idx; - - while (self->runner->n_todo == -1 && !self->runner->quit) - g_cond_wait (&self->runner->cond_todo, &self->runner->lock); - - if (self->runner->quit) - break; - - idx = self->runner->n_todo--; - g_assert (self->runner->n_todo >= -1); - g_mutex_unlock (&self->runner->lock); - - g_assert (self->runner->func != NULL); - - self->runner->func (self->runner->task_data[idx]); - - g_mutex_lock (&self->runner->lock); - self->runner->n_done++; - if (self->runner->n_done == self->runner->n_threads - 1) - g_cond_signal (&self->runner->cond_done); - } while (TRUE); - - g_mutex_unlock (&self->runner->lock); - - return NULL; } static void gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self) { - guint i; - - g_mutex_lock (&self->lock); - self->quit = TRUE; - g_cond_broadcast (&self->cond_todo); - g_mutex_unlock (&self->lock); - - for (i = 1; i < self->n_threads; i++) { - if (!self->threads[i].thread) - continue; - - g_thread_join (self->threads[i].thread); - } + gst_parallelized_task_runner_join (self); + gst_queue_array_free (self->tasks); + if (self->own_pool) + gst_task_pool_cleanup (self->pool); + gst_object_unref (self->pool); g_mutex_clear (&self->lock); - g_cond_clear (&self->cond_todo); - g_cond_clear (&self->cond_done); - g_free (self->threads); g_free (self); } static GstParallelizedTaskRunner * -gst_parallelized_task_runner_new (guint n_threads) +gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool) { GstParallelizedTaskRunner *self; - guint i; - GError *err = NULL; if (n_threads == 0) n_threads = g_get_num_processors (); self = g_new0 (GstParallelizedTaskRunner, 1); - self->n_threads = n_threads; - self->threads = g_new0 (GstParallelizedTaskThread, n_threads); - self->quit = FALSE; + if (pool) { + self->pool = g_object_ref (pool); + self->own_pool = FALSE; + + /* No reason to split up the work between more threads than the + * pool can spawn */ + if (GST_IS_SHARED_TASK_POOL (pool)) + n_threads = + MIN (n_threads, + gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool))); + } else { + self->pool = gst_shared_task_pool_new (); + self->own_pool = TRUE; + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool), + n_threads); + gst_task_pool_prepare (self->pool, NULL); + } + + self->tasks = gst_queue_array_new (n_threads); + + self->n_threads = n_threads; + self->n_todo = -1; - self->n_done = 0; g_mutex_init (&self->lock); - g_cond_init (&self->cond_todo); - g_cond_init (&self->cond_done); /* Set when scheduling a job */ self->func = NULL; self->task_data = NULL; - for (i = 0; i < n_threads; i++) { - self->threads[i].runner = self; - self->threads[i].idx = i; - - /* First thread is the one calling run() */ - if (i > 0) { - self->threads[i].thread = - g_thread_try_new ("videoconvert", gst_parallelized_task_thread_func, - &self->threads[i], &err); - if (!self->threads[i].thread) - goto error; - } - } - - g_mutex_lock (&self->lock); - while (self->n_done < self->n_threads - 1) - g_cond_wait (&self->cond_done, &self->lock); - self->n_done = 0; - g_mutex_unlock (&self->lock); - return self; - -error: - { - GST_ERROR ("Failed to start thread %u: %s", i, err->message); - g_clear_error (&err); - - gst_parallelized_task_runner_free (self); - return NULL; - } } static void @@ -287,22 +234,24 @@ gst_parallelized_task_runner_run (GstParallelizedTaskRunner * self, self->task_data = task_data; if (n_threads > 1) { + guint i = 0; g_mutex_lock (&self->lock); self->n_todo = self->n_threads - 2; - self->n_done = 0; - g_cond_broadcast (&self->cond_todo); + for (i = 1; i < n_threads; i++) { + gpointer task = + gst_task_pool_push (self->pool, gst_parallelized_task_thread_func, + self, NULL); + + /* The return value of push() is unfortunately nullable, and we can't deal with that */ + g_assert (task != NULL); + gst_queue_array_push_tail (self->tasks, task); + } g_mutex_unlock (&self->lock); } self->func (self->task_data[self->n_threads - 1]); - if (n_threads > 1) { - g_mutex_lock (&self->lock); - while (self->n_done < self->n_threads - 1) - g_cond_wait (&self->cond_done, &self->lock); - self->n_done = 0; - g_mutex_unlock (&self->lock); - } + gst_parallelized_task_runner_join (self); self->func = NULL; self->task_data = NULL; @@ -2281,21 +2230,25 @@ convert_get_alpha_mode (GstVideoConverter * convert) } /** - * gst_video_converter_new: (skip) + * gst_video_converter_new_with_pool: (skip) * @in_info: a #GstVideoInfo * @out_info: a #GstVideoInfo * @config: (transfer full): a #GstStructure with configuration options + * @pool: (nullable): a #GstTaskPool to spawn threads from * * Create a new converter object to convert between @in_info and @out_info * with @config. * + * The optional @pool can be used to spawn threads, this is useful when + * creating new converters rapidly, for example when updating cropping. + * * Returns: a #GstVideoConverter or %NULL if conversion is not possible. * - * Since: 1.6 + * Since: 1.20 */ GstVideoConverter * -gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info, - GstStructure * config) +gst_video_converter_new_with_pool (GstVideoInfo * in_info, + GstVideoInfo * out_info, GstStructure * config, GstTaskPool * pool) { GstVideoConverter *convert; GstLineCache *prev; @@ -2431,7 +2384,8 @@ gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info, if (n_threads < 1) n_threads = 1; - convert->conversion_runner = gst_parallelized_task_runner_new (n_threads); + convert->conversion_runner = + gst_parallelized_task_runner_new (n_threads, pool); if (video_converter_lookup_fastpath (convert)) goto done; @@ -2520,6 +2474,26 @@ no_pack_func: } } +/** + * gst_video_converter_new: (skip) + * @in_info: a #GstVideoInfo + * @out_info: a #GstVideoInfo + * @config: (transfer full): a #GstStructure with configuration options + * + * Create a new converter object to convert between @in_info and @out_info + * with @config. + * + * Returns: a #GstVideoConverter or %NULL if conversion is not possible. + * + * Since: 1.6 + */ +GstVideoConverter * +gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info, + GstStructure * config) +{ + return gst_video_converter_new_with_pool (in_info, out_info, config, NULL); +} + static void clear_matrix_data (MatrixData * data) { diff --git a/gst-libs/gst/video/video-converter.h b/gst-libs/gst/video/video-converter.h index bdf03af9e7..5c82d4486e 100644 --- a/gst-libs/gst/video/video-converter.h +++ b/gst-libs/gst/video/video-converter.h @@ -280,6 +280,12 @@ GstVideoConverter * gst_video_converter_new (GstVideoInfo *in_info, GstVideoInfo *out_info, GstStructure *config); +GST_VIDEO_API +GstVideoConverter * gst_video_converter_new_with_pool (GstVideoInfo * in_info, + GstVideoInfo * out_info, + GstStructure * config, + GstTaskPool * pool); + GST_VIDEO_API void gst_video_converter_free (GstVideoConverter * convert); diff --git a/tests/check/libs/video.c b/tests/check/libs/video.c index acea722b78..b9b9ec78a3 100644 --- a/tests/check/libs/video.c +++ b/tests/check/libs/video.c @@ -2719,6 +2719,84 @@ GST_START_TEST (test_video_convert) GST_END_TEST; +GST_START_TEST (test_video_convert_multithreading) +{ + GstVideoInfo ininfo, outinfo; + GstVideoFrame inframe, outframe, refframe; + GstBuffer *inbuffer, *outbuffer, *refbuffer; + GstVideoConverter *convert; + GstMapInfo info; + GstTaskPool *pool; + + /* Large enough input resolution for video-converter to actually use + * 4 threads if required */ + fail_unless (gst_video_info_set_format (&ininfo, GST_VIDEO_FORMAT_ARGB, 1280, + 720)); + inbuffer = gst_buffer_new_and_alloc (ininfo.size); + gst_buffer_memset (inbuffer, 0, 0, -1); + gst_video_frame_map (&inframe, &ininfo, inbuffer, GST_MAP_READ); + + fail_unless (gst_video_info_set_format (&outinfo, GST_VIDEO_FORMAT_BGRx, 400, + 300)); + outbuffer = gst_buffer_new_and_alloc (outinfo.size); + refbuffer = gst_buffer_new_and_alloc (outinfo.size); + + gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE); + gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE); + + /* Single threaded-conversion */ + convert = gst_video_converter_new (&ininfo, &outinfo, + gst_structure_new_empty ("options")); + gst_video_converter_frame (convert, &inframe, &refframe); + gst_video_converter_free (convert); + + /* Multithreaded conversion, converter creates pool */ + convert = gst_video_converter_new (&ininfo, &outinfo, + gst_structure_new ("options", + GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL) + ); + gst_video_converter_frame (convert, &inframe, &outframe); + gst_video_converter_free (convert); + + gst_video_frame_unmap (&outframe); + gst_video_frame_unmap (&refframe); + + gst_buffer_map (outbuffer, &info, GST_MAP_READ); + fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0); + gst_buffer_unmap (outbuffer, &info); + + gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE); + gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE); + + /* Multi-threaded conversion, user-provided pool */ + pool = gst_shared_task_pool_new (); + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 4); + gst_task_pool_prepare (pool, NULL); + convert = gst_video_converter_new_with_pool (&ininfo, &outinfo, + gst_structure_new ("options", + GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL), pool); + gst_video_converter_frame (convert, &inframe, &outframe); + gst_video_converter_free (convert); + gst_task_pool_cleanup (pool); + gst_object_unref (pool); + + gst_video_frame_unmap (&outframe); + gst_video_frame_unmap (&refframe); + + gst_buffer_map (outbuffer, &info, GST_MAP_READ); + fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0); + gst_buffer_unmap (outbuffer, &info); + + + gst_buffer_unref (refbuffer); + gst_buffer_unref (outbuffer); + gst_video_frame_unmap (&inframe); + gst_buffer_unref (inbuffer); + +} + +GST_END_TEST; + GST_START_TEST (test_video_transfer) { gint i, j; @@ -3889,6 +3967,7 @@ video_suite (void) tcase_add_test (tc_chain, test_video_color_convert_other); tcase_add_test (tc_chain, test_video_size_convert); tcase_add_test (tc_chain, test_video_convert); + tcase_add_test (tc_chain, test_video_convert_multithreading); tcase_add_test (tc_chain, test_video_transfer); tcase_add_test (tc_chain, test_overlay_blend); tcase_add_test (tc_chain, test_video_center_rect);