From f62dc6976b611384c98efb37d407b5299daf8c17 Mon Sep 17 00:00:00 2001 From: Stefan Kost Date: Mon, 13 Sep 2010 16:24:26 +0300 Subject: [PATCH] pulsesink: rework context sharing We also need to share the main-loop threads as this owns the context. Thus have a class wide main-loop thread. From this we create a context per client-name. Instead of always looking up the context, we keep this with the instance. The reverse mapping is only needed in pulse singal handlers. This saves a lot of locking. Also one signal handler becomes simpler as ther eis only one mainloop to notify. Now valgind happy - no leaks, no bad reads/writes. This reverts major parts of commit 69a397c32f4baf07a7b2937c610f9e8f383e9ae9. Fixes #628996 --- ext/pulse/pulsesink.c | 260 +++++++++++++++++++----------------------- ext/pulse/pulsesink.h | 6 + 2 files changed, 126 insertions(+), 140 deletions(-) diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c index d22dc72c9d..093b91dac8 100644 --- a/ext/pulse/pulsesink.c +++ b/ext/pulse/pulsesink.c @@ -115,6 +115,8 @@ struct _GstPulseContext * (strings) and values should be GstPulseContext pointers. */ static GHashTable *gst_pulse_shared_contexts = NULL; +static GMutex *pa_shared_ressource_mutex = NULL; + /* We keep a custom ringbuffer that is backed up by data allocated by * pulseaudio. We must also overide the commit function to write into * pulseaudio memory instead. */ @@ -125,6 +127,7 @@ struct _GstPulseRingBuffer gchar *context_name; gchar *stream_name; + pa_context *context; pa_stream *stream; pa_sample_spec sample_spec; @@ -166,13 +169,13 @@ static guint gst_pulseringbuffer_commit (GstRingBuffer * buf, G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER); -static GMutex *pa_ring_buffer_mutex = NULL; static void -gst_pulseringbuffer_init_contexts (void) +gst_pulsesink_init_contexts (void) { - g_assert (pa_ring_buffer_mutex == NULL); - pa_ring_buffer_mutex = g_mutex_new (); - gst_pulse_shared_contexts = g_hash_table_new (g_str_hash, g_str_equal); + g_assert (pa_shared_ressource_mutex == NULL); + pa_shared_ressource_mutex = g_mutex_new (); + gst_pulse_shared_contexts = g_hash_table_new_full (g_str_hash, g_str_equal, + g_free, NULL); } static void @@ -210,6 +213,7 @@ static void gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf) { pbuf->stream_name = NULL; + pbuf->context = NULL; pbuf->stream = NULL; #ifdef HAVE_PULSE_0_9_13 @@ -268,38 +272,35 @@ gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf) pbuf->stream_name = NULL; } -static GstPulseContext * -gst_pulsering_get_context (GstPulseRingBuffer * pbuf) -{ - GstPulseContext *pctx; - - g_mutex_lock (pa_ring_buffer_mutex); - pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name); - g_mutex_unlock (pa_ring_buffer_mutex); - return pctx; -} - static void gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf) { - GstPulseSink *psink; + g_mutex_lock (pa_shared_ressource_mutex); - g_mutex_lock (pa_ring_buffer_mutex); - psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + GST_DEBUG_OBJECT (pbuf, "destroying ringbuffer %p", pbuf); + + gst_pulsering_destroy_stream (pbuf); + + if (pbuf->context) { + pa_context_unref (pbuf->context); + pbuf->context = NULL; + } if (pbuf->context_name) { GstPulseContext *pctx; - GST_DEBUG_OBJECT (psink, "destroying context for pbuf=%p '%s'", - pbuf, pbuf->context_name); - pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name); - gst_pulsering_destroy_stream (pbuf); + GST_DEBUG_OBJECT (pbuf, "releasing context with name %s, pbuf=%p, pctx=%p", + pbuf->context_name, pbuf, pctx); if (pctx) { pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf); if (pctx->ring_buffers == NULL) { + GST_DEBUG_OBJECT (pbuf, + "destroying final context with name %s, pbuf=%p, pctx=%p", + pbuf->context_name, pbuf, pctx); + pa_context_disconnect (pctx->context); /* Make sure we don't get any further callbacks */ @@ -309,15 +310,15 @@ gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf) #endif g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name); - g_free (pbuf->context_name); - pbuf->context_name = NULL; pa_context_unref (pctx->context); g_slice_free (GstPulseContext, pctx); } } + g_free (pbuf->context_name); + pbuf->context_name = NULL; } - g_mutex_unlock (pa_ring_buffer_mutex); + g_mutex_unlock (pa_shared_ressource_mutex); } static void @@ -328,26 +329,18 @@ gst_pulseringbuffer_finalize (GObject * object) ringbuffer = GST_PULSERING_BUFFER_CAST (object); gst_pulsering_destroy_context (ringbuffer); - G_OBJECT_CLASS (ring_parent_class)->finalize (object); } static gboolean gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf) { - GstPulseContext *pctx = gst_pulsering_get_context (pbuf); - - if (!pctx) { - GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected"), (NULL)); - return TRUE; - } - - if (!pctx->context - || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pctx->context)) + if (!pbuf->context + || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context)) || !pbuf->stream || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) { const gchar *err_str = - pctx->context ? pa_strerror (pa_context_errno (pctx->context)) : NULL; + pbuf->context ? pa_strerror (pa_context_errno (pbuf->context)) : NULL; GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s", err_str), (NULL)); return TRUE; @@ -358,38 +351,26 @@ gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf) static void gst_pulsering_context_state_cb (pa_context * c, void *userdata) { - GstPulseSink *psink; pa_context_state_t state; - - GstPulseContext *pctx = (GstPulseContext *) userdata; - GSList *walk; + pa_threaded_mainloop *mainloop = (pa_threaded_mainloop *) userdata; state = pa_context_get_state (c); - for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) { - GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data; - psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - GST_LOG_OBJECT (psink, "got new context state %d", state); + GST_LOG ("got new context state %d", state); - /* psink can be null when we are shutting down and the ringbuffer is already - * unparented */ - if (psink == NULL) - continue; + switch (state) { + case PA_CONTEXT_READY: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + GST_LOG ("signaling"); + pa_threaded_mainloop_signal (mainloop, 0); + break; - switch (state) { - case PA_CONTEXT_READY: - case PA_CONTEXT_TERMINATED: - case PA_CONTEXT_FAILED: - GST_LOG_OBJECT (psink, "signaling"); - pa_threaded_mainloop_signal (psink->mainloop, 0); - break; - - case PA_CONTEXT_UNCONNECTED: - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; - } + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; } } @@ -443,7 +424,6 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf) pbuf = GST_PULSERING_BUFFER_CAST (buf); g_assert (!pbuf->stream); - g_assert (psink->client_name); if (psink->server) @@ -453,25 +433,25 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf) pbuf->context_name = g_strdup (psink->client_name); pa_threaded_mainloop_lock (psink->mainloop); - g_mutex_lock (pa_ring_buffer_mutex); + g_mutex_lock (pa_shared_ressource_mutex); pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name); if (pctx == NULL) { pctx = g_slice_new0 (GstPulseContext); /* get the mainloop api and create a context */ - GST_LOG_OBJECT (psink, "new context with name %s", - GST_STR_NULL (pbuf->context_name)); + GST_INFO_OBJECT (psink, "new context with name %s, pbuf=%p, pctx=%p", + pbuf->context_name, pbuf, pctx); api = pa_threaded_mainloop_get_api (psink->mainloop); if (!(pctx->context = pa_context_new (api, pbuf->context_name))) goto create_failed; pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf); - g_hash_table_insert (gst_pulse_shared_contexts, pbuf->context_name, - (gpointer) pctx); + g_hash_table_insert (gst_pulse_shared_contexts, + g_strdup (pbuf->context_name), (gpointer) pctx); /* register some essential callbacks */ pa_context_set_state_callback (pctx->context, - gst_pulsering_context_state_cb, pctx); + gst_pulsering_context_state_cb, psink->mainloop); #ifdef HAVE_PULSE_0_9_12 pa_context_set_subscribe_callback (pctx->context, gst_pulsering_context_subscribe_cb, pctx); @@ -485,15 +465,19 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf) PA_CONTEXT_NOAUTOSPAWN, NULL) < 0) goto connect_failed; } else { - GST_LOG_OBJECT (psink, "reusing shared pulseaudio context with name %s", - GST_STR_NULL (pbuf->context_name)); + GST_INFO_OBJECT (psink, + "reusing shared context with name %s, pbuf=%p, pctx=%p", + pbuf->context_name, pbuf, pctx); pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf); } + /* context created or shared okay */ + pbuf->context = pa_context_ref (pctx->context); + for (;;) { pa_context_state_t state; - state = pa_context_get_state (pctx->context); + state = pa_context_get_state (pbuf->context); GST_LOG_OBJECT (psink, "context state is now %d", state); @@ -510,7 +494,7 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf) GST_LOG_OBJECT (psink, "opened the device"); - g_mutex_unlock (pa_ring_buffer_mutex); + g_mutex_unlock (pa_shared_ressource_mutex); pa_threaded_mainloop_unlock (psink->mainloop); return TRUE; @@ -518,7 +502,7 @@ gst_pulseringbuffer_open_device (GstRingBuffer * buf) /* ERRORS */ unlock_and_fail: { - g_mutex_unlock (pa_ring_buffer_mutex); + g_mutex_unlock (pa_shared_ressource_mutex); gst_pulsering_destroy_context (pbuf); pa_threaded_mainloop_unlock (psink->mainloop); @@ -528,6 +512,7 @@ create_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to create context"), (NULL)); + g_slice_free (GstPulseContext, pctx); goto unlock_and_fail; } connect_failed: @@ -725,7 +710,6 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) { GstPulseSink *psink; GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; pa_buffer_attr wanted; const pa_buffer_attr *actual; pa_channel_map channel_map; @@ -749,12 +733,12 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) pa_threaded_mainloop_lock (psink->mainloop); /* we need a context and a no stream */ - pctx = gst_pulsering_get_context (pbuf); + g_assert (pbuf->context); g_assert (!pbuf->stream); /* enable event notifications */ GST_LOG_OBJECT (psink, "subscribing to context events"); - if (!(o = pa_context_subscribe (pctx->context, + if (!(o = pa_context_subscribe (pbuf->context, PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) goto subscribe_failed; @@ -772,10 +756,10 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) /* create a stream */ GST_LOG_OBJECT (psink, "creating stream with name %s", name); if (psink->proplist) { - if (!(pbuf->stream = pa_stream_new_with_proplist (pctx->context, + if (!(pbuf->stream = pa_stream_new_with_proplist (pbuf->context, name, &pbuf->sample_spec, &channel_map, psink->proplist))) goto stream_failed; - } else if (!(pbuf->stream = pa_stream_new (pctx->context, + } else if (!(pbuf->stream = pa_stream_new (pbuf->context, name, &pbuf->sample_spec, &channel_map))) goto stream_failed; @@ -910,21 +894,21 @@ subscribe_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_context_subscribe() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock_and_fail; } stream_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to create stream: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock_and_fail; } connect_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect stream: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock_and_fail; } } @@ -966,7 +950,6 @@ gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked, { pa_operation *o = NULL; GstPulseSink *psink; - GstPulseContext *pctx = NULL; gboolean res = FALSE; psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); @@ -1002,10 +985,9 @@ server_dead: } cork_failed: { - pctx = gst_pulsering_get_context (pbuf); GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_cork() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto cleanup; } } @@ -1264,7 +1246,6 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, { GstPulseSink *psink; GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; guint result; guint8 *data_end; gboolean reverse; @@ -1636,20 +1617,16 @@ was_paused: } writable_size_failed: { - pctx = gst_pulsering_get_context (pbuf); - GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_writable_size() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock_and_fail; } write_failed: { - pctx = gst_pulsering_get_context (pbuf); - GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_write() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock_and_fail; } } @@ -1676,7 +1653,7 @@ static void gst_pulsesink_init_interfaces (GType type); GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink); #define _do_init(type) \ - gst_pulseringbuffer_init_contexts (); \ + gst_pulsesink_init_contexts (); \ gst_pulsesink_init_interfaces (type); GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink, @@ -2007,7 +1984,6 @@ gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume) pa_cvolume v; pa_operation *o = NULL; GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; uint32_t idx; if (!psink->mainloop) @@ -2026,9 +2002,7 @@ gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume) gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume); - pctx = gst_pulsering_get_context (pbuf); - - if (!(o = pa_context_set_sink_input_volume (pctx->context, idx, + if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx, &v, NULL, NULL))) goto volume_failed; @@ -2068,7 +2042,7 @@ volume_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_set_sink_input_volume() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2078,7 +2052,6 @@ gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute) { pa_operation *o = NULL; GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; uint32_t idx; if (!psink->mainloop) @@ -2095,9 +2068,7 @@ gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute) if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX) goto no_index; - pctx = gst_pulsering_get_context (pbuf); - - if (!(o = pa_context_set_sink_input_mute (pctx->context, idx, + if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx, mute, NULL, NULL))) goto mute_failed; @@ -2137,7 +2108,7 @@ mute_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_set_sink_input_mute() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2174,7 +2145,6 @@ static gdouble gst_pulsesink_get_volume (GstPulseSink * psink) { GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; pa_operation *o = NULL; gdouble v = DEFAULT_VOLUME; uint32_t idx; @@ -2191,9 +2161,7 @@ gst_pulsesink_get_volume (GstPulseSink * psink) if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX) goto no_index; - pctx = gst_pulsering_get_context (pbuf); - - if (!(o = pa_context_get_sink_input_info (pctx->context, idx, + if (!(o = pa_context_get_sink_input_info (pbuf->context, idx, gst_pulsesink_sink_input_info_cb, pbuf))) goto info_failed; @@ -2239,7 +2207,7 @@ info_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_context_get_sink_input_info() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2248,7 +2216,6 @@ static gboolean gst_pulsesink_get_mute (GstPulseSink * psink) { GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; pa_operation *o = NULL; uint32_t idx; gboolean mute = FALSE; @@ -2266,9 +2233,7 @@ gst_pulsesink_get_mute (GstPulseSink * psink) if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX) goto no_index; - pctx = gst_pulsering_get_context (pbuf); - - if (!(o = pa_context_get_sink_input_info (pctx->context, idx, + if (!(o = pa_context_get_sink_input_info (pbuf->context, idx, gst_pulsesink_sink_input_info_cb, pbuf))) goto info_failed; @@ -2279,7 +2244,6 @@ gst_pulsesink_get_mute (GstPulseSink * psink) } unlock: - if (o) pa_operation_unref (o); @@ -2308,7 +2272,7 @@ info_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_context_get_sink_input_info() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2343,7 +2307,6 @@ static gchar * gst_pulsesink_device_description (GstPulseSink * psink) { GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; pa_operation *o = NULL; gchar *t; @@ -2355,9 +2318,7 @@ gst_pulsesink_device_description (GstPulseSink * psink) if (pbuf == NULL || pbuf->stream == NULL) goto no_buffer; - pctx = gst_pulsering_get_context (pbuf); - - if (!(o = pa_context_get_sink_info_by_index (pctx->context, + if (!(o = pa_context_get_sink_info_by_index (pbuf->context, pa_stream_get_device_index (pbuf->stream), gst_pulsesink_sink_info_cb, pbuf))) goto info_failed; @@ -2369,7 +2330,6 @@ gst_pulsesink_device_description (GstPulseSink * psink) } unlock: - if (o) pa_operation_unref (o); @@ -2393,7 +2353,7 @@ info_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_context_get_sink_info_by_index() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2489,7 +2449,6 @@ gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t) { pa_operation *o = NULL; GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; pa_threaded_mainloop_lock (psink->mainloop); @@ -2501,8 +2460,6 @@ gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t) g_free (pbuf->stream_name); pbuf->stream_name = g_strdup (t); - pctx = gst_pulsering_get_context (pbuf); - if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL))) goto name_failed; @@ -2525,7 +2482,7 @@ name_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_set_name() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2551,7 +2508,6 @@ gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l) gboolean empty = TRUE; pa_operation *o = NULL; GstPulseRingBuffer *pbuf; - GstPulseContext *pctx; pl = pa_proplist_new (); @@ -2572,13 +2528,10 @@ gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l) goto finish; pa_threaded_mainloop_lock (psink->mainloop); - pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); if (pbuf == NULL || pbuf->stream == NULL) goto no_buffer; - pctx = gst_pulsering_get_context (pbuf); - if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE, pl, NULL, NULL))) goto update_failed; @@ -2608,7 +2561,7 @@ update_failed: { GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("pa_stream_proplist_update() failed: %s", - pa_strerror (pa_context_errno (pctx->context))), (NULL)); + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); goto unlock; } } @@ -2672,18 +2625,30 @@ static GstStateChangeReturn gst_pulsesink_change_state (GstElement * element, GstStateChange transition) { GstPulseSink *pulsesink = GST_PULSESINK (element); + GstPulseSinkClass *klass = GST_PULSESINK_GET_CLASS (pulsesink); GstStateChangeReturn ret; guint res; switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: - g_assert (pulsesink->mainloop == NULL); - pulsesink->mainloop = pa_threaded_mainloop_new (); - if (!pulsesink->mainloop) - goto mainloop_failed; - res = pa_threaded_mainloop_start (pulsesink->mainloop); - g_assert (res == 0); + g_mutex_lock (pa_shared_ressource_mutex); + if (!klass->main_loop_ref_ct) { + GST_INFO_OBJECT (element, "new pa main loop thread"); + klass->mainloop = pa_threaded_mainloop_new (); + if (!klass->mainloop) + goto mainloop_failed; + klass->main_loop_ref_ct = 1; + res = pa_threaded_mainloop_start (klass->mainloop); + g_assert (res == 0); + g_mutex_unlock (pa_shared_ressource_mutex); + pulsesink->mainloop = klass->mainloop; + } else { + GST_INFO_OBJECT (element, "reusing pa main loop thread"); + pulsesink->mainloop = klass->mainloop; + klass->main_loop_ref_ct++; + g_mutex_unlock (pa_shared_ressource_mutex); + } break; case GST_STATE_CHANGE_READY_TO_PAUSED: gst_element_post_message (element, @@ -2706,9 +2671,16 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_READY_TO_NULL: if (pulsesink->mainloop) { - pa_threaded_mainloop_stop (pulsesink->mainloop); - pa_threaded_mainloop_free (pulsesink->mainloop); + g_mutex_lock (pa_shared_ressource_mutex); + klass->main_loop_ref_ct--; pulsesink->mainloop = NULL; + if (!klass->main_loop_ref_ct) { + GST_INFO_OBJECT (element, "terminating pa main loop thread"); + pa_threaded_mainloop_stop (klass->mainloop); + pa_threaded_mainloop_free (klass->mainloop); + klass->mainloop = NULL; + } + g_mutex_unlock (pa_shared_ressource_mutex); } break; default: @@ -2720,6 +2692,7 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition) /* ERRORS */ mainloop_failed: { + g_mutex_unlock (pa_shared_ressource_mutex); GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("pa_threaded_mainloop_new() failed"), (NULL)); return GST_STATE_CHANGE_FAILURE; @@ -2729,9 +2702,16 @@ state_failure: if (transition == GST_STATE_CHANGE_NULL_TO_READY) { /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */ g_assert (pulsesink->mainloop); - pa_threaded_mainloop_stop (pulsesink->mainloop); - pa_threaded_mainloop_free (pulsesink->mainloop); + g_mutex_lock (pa_shared_ressource_mutex); + klass->main_loop_ref_ct--; pulsesink->mainloop = NULL; + if (!klass->main_loop_ref_ct) { + GST_INFO_OBJECT (element, "terminating pa main loop thread"); + pa_threaded_mainloop_stop (klass->mainloop); + pa_threaded_mainloop_free (klass->mainloop); + klass->mainloop = NULL; + } + g_mutex_unlock (pa_shared_ressource_mutex); } return ret; } diff --git a/ext/pulse/pulsesink.h b/ext/pulse/pulsesink.h index 83a0bed2af..6ddade2f34 100644 --- a/ext/pulse/pulsesink.h +++ b/ext/pulse/pulsesink.h @@ -46,6 +46,9 @@ G_BEGIN_DECLS (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSESINK)) #define GST_PULSESINK_CAST(obj) \ ((GstPulseSink *)(obj)) +#define GST_PULSESINK_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS((obj),GST_TYPE_PULSESINK,GstPulseSinkClass)) + typedef struct _GstPulseSink GstPulseSink; typedef struct _GstPulseSinkClass GstPulseSinkClass; @@ -79,6 +82,9 @@ struct _GstPulseSink struct _GstPulseSinkClass { GstBaseAudioSinkClass parent_class; + + pa_threaded_mainloop *mainloop; + guint main_loop_ref_ct; }; GType gst_pulsesink_get_type (void);