diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c index ad8282cc70..dc3c777c12 100644 --- a/ext/pulse/pulsesink.c +++ b/ext/pulse/pulsesink.c @@ -54,6 +54,7 @@ #include #include #include +#include #include /* only used for GST_PLUGINS_BASE_VERSION_* */ @@ -138,7 +139,7 @@ struct _GstPulseRingBuffer pa_context *context; pa_stream *stream; -#if HAVE_PULSE_1_0 +#ifdef HAVE_PULSE_1_0 pa_format_info *format; guint channels; #else @@ -712,11 +713,60 @@ gst_pulsering_stream_event_cb (pa_stream * p, const char *name, gst_element_post_message (GST_ELEMENT_CAST (psink), gst_message_new_request_state (GST_OBJECT_CAST (psink), GST_STATE_PLAYING)); +#ifdef HAVE_PULSE_1_0 + } else if (!strcmp (name, PA_STREAM_EVENT_FORMAT_LOST)) { + GstEvent *renego; + + if (g_atomic_int_get (&psink->format_lost)) { + /* Duplicate event before we're done reconfiguring, discard */ + return; + } + + GST_DEBUG_OBJECT (psink, "got FORMAT LOST"); + g_atomic_int_set (&psink->format_lost, 1); + psink->format_lost_time = g_ascii_strtoull (pa_proplist_gets (pl, + "stream-time"), NULL, 0) * 1000; + + g_free (psink->device); + psink->device = g_strdup (pa_proplist_gets (pl, "device")); + + renego = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("pulse-format-lost", NULL)); + + if (!gst_pad_push_event (GST_BASE_SINK (psink)->sinkpad, renego)) { + /* Nobody handled the format change - emit an error */ + GST_ELEMENT_ERROR (psink, STREAM, FORMAT, ("Sink format changed"), + ("Sink format changed")); + } +#endif } else { GST_DEBUG_OBJECT (psink, "got unknown event %s", name); } } +/* Called with the mainloop locked */ +static gboolean +gst_pulsering_wait_for_stream_ready (GstPulseSink * psink, pa_stream * stream) +{ + pa_stream_state_t state; + + for (;;) { + state = pa_stream_get_state (stream); + + GST_LOG_OBJECT (psink, "stream state is now %d", state); + + if (!PA_STREAM_IS_GOOD (state)) + return FALSE; + + if (state == PA_STREAM_READY) + return TRUE; + + /* Wait until the stream is ready */ + pa_threaded_mainloop_wait (mainloop); + } +} + + /* This method should create a new stream of the given @spec. No playback should * start yet so we start in the corked state. */ static gboolean @@ -737,6 +787,9 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) GstAudioClock *clock; #ifdef HAVE_PULSE_1_0 pa_format_info *formats[1]; +#ifndef GST_DISABLE_GST_DEBUG + gchar print_buf[PA_FORMAT_INFO_SNPRINT_MAX]; +#endif #endif psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); @@ -833,6 +886,10 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) #ifdef HAVE_PULSE_1_0 if (pa_format_info_is_pcm (pbuf->format)) gst_pulse_cvolume_from_linear (pv, pbuf->channels, psink->volume); + else { + GST_DEBUG_OBJECT (psink, "passthrough stream, not setting volume"); + pv = NULL; + } #else gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels, psink->volume); @@ -863,22 +920,19 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock); gst_audio_clock_reset (clock, 0); - for (;;) { - pa_stream_state_t state; + if (!gst_pulsering_wait_for_stream_ready (psink, pbuf->stream)) + goto connect_failed; - state = pa_stream_get_state (pbuf->stream); +#ifdef HAVE_PULSE_1_0 + g_free (psink->device); + psink->device = g_strdup (pa_stream_get_device_name (pbuf->stream)); - GST_LOG_OBJECT (psink, "stream state is now %d", state); - - if (!PA_STREAM_IS_GOOD (state)) - goto connect_failed; - - if (state == PA_STREAM_READY) - break; - - /* Wait until the stream is ready */ - pa_threaded_mainloop_wait (mainloop); - } +#ifndef GST_DISABLE_GST_DEBUG + pa_format_info_snprint (print_buf, sizeof (print_buf), + pa_stream_get_format_info (pbuf->stream)); + GST_INFO_OBJECT (psink, "negotiated to: %s", print_buf); +#endif +#endif /* After we passed the volume off of to PA we never want to set it again, since it is PA's job to save/restore volumes. */ @@ -945,13 +999,20 @@ static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf) { GstPulseRingBuffer *pbuf; + GstPulseSink *psink; pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); pa_threaded_mainloop_lock (mainloop); gst_pulsering_destroy_stream (pbuf); pa_threaded_mainloop_unlock (mainloop); +#ifdef HAVE_PULSE_1_0 + g_atomic_int_set (&psink->format_lost, FALSE); + psink->format_lost_time = GST_CLOCK_TIME_NONE; +#endif + return TRUE; } @@ -973,6 +1034,13 @@ gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked, psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); +#ifdef HAVE_PULSE_1_0 + if (g_atomic_int_get (&psink->format_lost)) { + /* Sink format changed, stream's gone so fake being paused */ + return TRUE; + } +#endif + GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked); if (pbuf->corked != corked) { if (!(o = pa_stream_cork (pbuf->stream, corked, @@ -1146,13 +1214,22 @@ gst_pulseringbuffer_stop (GstRingBuffer * buf) psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); pa_threaded_mainloop_lock (mainloop); + pbuf->paused = TRUE; res = gst_pulsering_set_corked (pbuf, TRUE, TRUE); + /* Inform anyone waiting in _commit() call that it shall wakeup */ if (pbuf->in_commit) { GST_DEBUG_OBJECT (psink, "signal commit thread"); pa_threaded_mainloop_signal (mainloop, 0); } +#ifdef HAVE_PULSE_1_0 + if (g_atomic_int_get (&psink->format_lost)) { + /* Don't try to flush, the stream's probably gone by now */ + res = TRUE; + goto cleanup; + } +#endif /* then try to flush, it's not fatal when this fails */ GST_DEBUG_OBJECT (psink, "flushing"); @@ -1260,7 +1337,6 @@ G_STMT_START { \ GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess); \ } G_STMT_END - /* our custom commit function because we write into the buffer of pulseaudio * instead of keeping our own buffer */ static guint @@ -1299,6 +1375,7 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, } pa_threaded_mainloop_lock (mainloop); + GST_DEBUG_OBJECT (psink, "entering commit"); pbuf->in_commit = TRUE; @@ -1323,6 +1400,13 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, * needed to properly handle reverse playback: it points to the last sample. */ data_end = data + (bps * inr); +#ifdef HAVE_PULSE_1_0 + if (g_atomic_int_get (&psink->format_lost)) { + /* Sink format changed, drop the data and hope upstream renegotiates */ + goto fake_done; + } +#endif + if (pbuf->paused) goto was_paused; @@ -1372,6 +1456,13 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, for (;;) { pbuf->m_writable = pa_stream_writable_size (pbuf->stream); +#ifdef HAVE_PULSE_1_0 + if (g_atomic_int_get (&psink->format_lost)) { + /* Sink format changed, give up and hope upstream renegotiates */ + goto fake_done; + } +#endif + if (pbuf->m_writable == (size_t) - 1) goto writable_size_failed; @@ -1430,6 +1521,14 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT, (guint) avail, offset); +#ifdef HAVE_PULSE_1_0 + /* No trick modes for passthrough streams */ + if (G_UNLIKELY (inr != outr || reverse)) { + GST_WARNING_OBJECT (psink, "Passthrough stream can't run in trick mode"); + goto unlock_and_fail; + } +#endif + if (G_LIKELY (inr == outr && !reverse)) { /* no rate conversion, simply write out the samples */ /* copy the data into internal buffer */ @@ -1509,6 +1608,10 @@ gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, } } } + +#ifdef HAVE_PULSE_1_0 +fake_done: +#endif /* we consumed all samples here */ data = data_end + bps; @@ -1730,8 +1833,16 @@ gst_pulsesink_base_init (gpointer g_class) "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ];" "audio/x-mulaw, " - "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]") - ); + "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ];" +#ifdef HAVE_PULSE_1_0 + "audio/x-ac3, framed = (boolean) true;" + "audio/x-eac3, framed = (boolean) true; " + "audio/x-dts, framed = (boolean) true, " + " block_size = (int) { 512, 1024, 2048 }; " + "audio/mpeg, mpegversion = (int)1, " + " mpegaudioversion = (int) [ 1, 2 ], parsed = (boolean) true; " +#endif + )); GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); @@ -1754,6 +1865,40 @@ gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink) return buffer; } +static GstBuffer * +gst_pulsesink_payload (GstBaseAudioSink * sink, GstBuffer * buf) +{ + switch (sink->ringbuffer->spec.type) { + case GST_BUFTYPE_AC3: + case GST_BUFTYPE_EAC3: + case GST_BUFTYPE_DTS: + case GST_BUFTYPE_MPEG: + { + /* FIXME: alloc memory from PA if possible */ + gint framesize = gst_audio_iec61937_frame_size (&sink->ringbuffer->spec); + GstBuffer *out; + + if (framesize <= 0) + return NULL; + + out = gst_buffer_new_and_alloc (framesize); + + if (!gst_audio_iec61937_payload (GST_BUFFER_DATA (buf), + GST_BUFFER_SIZE (buf), GST_BUFFER_DATA (out), + GST_BUFFER_SIZE (out), &sink->ringbuffer->spec)) { + gst_buffer_unref (out); + return NULL; + } + + gst_buffer_copy_metadata (out, buf, GST_BUFFER_COPY_ALL); + return out; + } + + default: + return gst_buffer_ref (buf); + } +} + static void gst_pulsesink_class_init (GstPulseSinkClass * klass) { @@ -1778,6 +1923,7 @@ gst_pulsesink_class_init (GstPulseSinkClass * klass) gstaudiosink_class->create_ringbuffer = GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer); + gstaudiosink_class->payload = GST_DEBUG_FUNCPTR (gst_pulsesink_payload); /* Overwrite GObject fields */ g_object_class_install_property (gobject_class, @@ -1860,6 +2006,14 @@ gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink) pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer); psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); +#ifdef HAVE_PULSE_1_0 + if (g_atomic_int_get (&psink->format_lost)) { + /* Stream was lost in a format change, it'll get set up again once + * upstream renegotiates */ + return psink->format_lost_time; + } +#endif + pa_threaded_mainloop_lock (mainloop); if (gst_pulsering_is_dead (psink, pbuf, TRUE)) goto server_dead; @@ -1888,6 +2042,181 @@ server_dead: } } +static void +gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, + void *userdata) +{ + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; +#ifdef HAVE_PULSE_1_0 + GList *l; + guint8 j; +#endif + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + if (!i) + goto done; + + g_free (psink->device_description); + psink->device_description = g_strdup (i->description); + +#ifdef HAVE_PULSE_1_0 + g_mutex_lock (psink->sink_formats_lock); + + for (l = g_list_first (psink->sink_formats); l; l = g_list_next (l)) + pa_format_info_free ((pa_format_info *) l->data); + + g_list_free (psink->sink_formats); + psink->sink_formats = NULL; + + for (j = 0; j < i->n_formats; j++) + psink->sink_formats = g_list_prepend (psink->sink_formats, + pa_format_info_copy (i->formats[j])); + + g_mutex_unlock (psink->sink_formats_lock); +#endif + +done: + pa_threaded_mainloop_signal (mainloop, 0); +} + +#ifdef HAVE_PULSE_1_0 +static gboolean +gst_pulsesink_pad_acceptcaps (GstPad * pad, GstCaps * caps) +{ + GstPulseSink *psink = GST_PULSESINK (gst_pad_get_parent_element (pad)); + GstPulseRingBuffer *pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK + (psink)->ringbuffer); + GstCaps *pad_caps; + GstStructure *st; + gboolean ret = FALSE; + + GstRingBufferSpec spec = { 0 }; + pa_stream *stream = NULL; + pa_operation *o = NULL; + pa_channel_map channel_map; + pa_stream_flags_t flags; + pa_format_info *format = NULL, *formats[1]; + guint channels; + + pad_caps = gst_pad_get_caps_reffed (pad); + if (pad_caps) { + ret = gst_caps_can_intersect (pad_caps, caps); + gst_caps_unref (pad_caps); + } + + /* Either template caps didn't match, or we're still in NULL state */ + if (!ret || !pbuf->context) + goto done; + + /* If we've not got fixed caps, creating a stream might fail, so let's just + * return from here with default acceptcaps behaviour */ + if (!gst_caps_is_fixed (caps)) + goto done; + + ret = FALSE; + + pa_threaded_mainloop_lock (mainloop); + + spec.latency_time = GST_BASE_AUDIO_SINK (psink)->latency_time; + if (!gst_ring_buffer_parse_caps (&spec, caps)) + goto out; + + if (!gst_pulse_fill_format_info (&spec, &format, &channels)) + goto out; + + /* Make sure input is framed (one frame per buffer) and can be payloaded */ + if (!pa_format_info_is_pcm (format)) { + gboolean framed = FALSE, parsed = FALSE; + st = gst_caps_get_structure (caps, 0); + + gst_structure_get_boolean (st, "framed", &framed); + gst_structure_get_boolean (st, "parsed", &parsed); + if ((!framed && !parsed) || gst_audio_iec61937_frame_size (&spec) <= 0) + goto out; + } + + /* initialize the channel map */ + if (pa_format_info_is_pcm (format) && + gst_pulse_gst_to_channel_map (&channel_map, &spec)) + pa_format_info_set_channel_map (format, &channel_map); + + if (pbuf->stream) { + /* We're already in PAUSED or above, so just reuse this stream to query + * sink formats and use those. */ + GList *i; + + if (!(o = pa_context_get_sink_info_by_name (pbuf->context, psink->device, + gst_pulsesink_sink_info_cb, pbuf))) + goto info_failed; + + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait (mainloop); + if (gst_pulsering_is_dead (psink, pbuf, TRUE)) + goto out; + } + + g_mutex_lock (psink->sink_formats_lock); + for (i = g_list_first (psink->sink_formats); i; i = g_list_next (i)) { + if (pa_format_info_is_compatible ((pa_format_info *) i->data, format)) { + ret = TRUE; + break; + } + } + g_mutex_unlock (psink->sink_formats_lock); + } else { + /* We're in READY, let's connect a stream to see if the format is + * accpeted by whatever sink we're routed to */ + formats[0] = format; + + if (!(stream = pa_stream_new_extended (pbuf->context, "pulsesink probe", + formats, 1, psink->proplist))) + goto out; + + /* construct the flags */ + flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE | + PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED; + + pa_stream_set_state_callback (stream, gst_pulsering_stream_state_cb, pbuf); + + if (pa_stream_connect_playback (stream, psink->device, NULL, flags, NULL, + NULL) < 0) + goto out; + + ret = gst_pulsering_wait_for_stream_ready (psink, stream); + } + +out: + if (format) + pa_format_info_free (format); + + if (o) + pa_operation_unref (o); + + if (stream) { + pa_stream_set_state_callback (stream, NULL, NULL); + pa_stream_disconnect (stream); + pa_stream_unref (stream); + } + + pa_threaded_mainloop_unlock (mainloop); + +done: + gst_object_unref (psink); + return ret; + +info_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_context_get_sink_input_info() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto out; + } +} +#endif + static void gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) { @@ -1896,6 +2225,11 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) pulsesink->device_description = NULL; pulsesink->client_name = gst_pulse_client_name (); +#ifdef HAVE_PULSE_1_0 + pulsesink->sink_formats_lock = g_mutex_new (); + pulsesink->sink_formats = NULL; +#endif + pulsesink->volume = DEFAULT_VOLUME; pulsesink->volume_set = FALSE; @@ -1904,6 +2238,11 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) pulsesink->notify = 0; +#ifdef HAVE_PULSE_1_0 + g_atomic_int_set (&pulsesink->format_lost, FALSE); + pulsesink->format_lost_time = GST_CLOCK_TIME_NONE; +#endif + pulsesink->properties = NULL; pulsesink->proplist = NULL; @@ -1915,6 +2254,11 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) gst_audio_clock_new ("GstPulseSinkClock", (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink); +#ifdef HAVE_PULSE_1_0 + gst_pad_set_acceptcaps_function (GST_BASE_SINK (pulsesink)->sinkpad, + GST_DEBUG_FUNCPTR (gst_pulsesink_pad_acceptcaps)); +#endif + /* TRUE for sinks, FALSE for sources */ pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, @@ -1925,12 +2269,23 @@ static void gst_pulsesink_finalize (GObject * object) { GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); +#ifdef HAVE_PULSE_1_0 + GList *i; +#endif g_free (pulsesink->server); g_free (pulsesink->device); g_free (pulsesink->device_description); g_free (pulsesink->client_name); +#ifdef HAVE_PULSE_1_0 + for (i = g_list_first (pulsesink->sink_formats); i; i = g_list_next (i)) + pa_format_info_free ((pa_format_info *) i->data); + + g_list_free (pulsesink->sink_formats); + g_mutex_free (pulsesink->sink_formats_lock); +#endif + if (pulsesink->properties) gst_structure_free (pulsesink->properties); if (pulsesink->proplist) @@ -1969,6 +2324,10 @@ gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume) #ifdef HAVE_PULSE_1_0 if (pa_format_info_is_pcm (pbuf->format)) gst_pulse_cvolume_from_linear (&v, pbuf->channels, volume); + else + /* FIXME: this will eventually be superceded by checks to see if the volume + * is readable/writable */ + goto unlock; #else gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume); #endif @@ -2248,26 +2607,6 @@ info_failed: } } -static void -gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, - void *userdata) -{ - GstPulseRingBuffer *pbuf; - GstPulseSink *psink; - - pbuf = GST_PULSERING_BUFFER_CAST (userdata); - psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - - if (!i) - goto done; - - g_free (psink->device_description); - psink->device_description = g_strdup (i->description); - -done: - pa_threaded_mainloop_signal (mainloop, 0); -} - static gchar * gst_pulsesink_device_description (GstPulseSink * psink) { @@ -2664,6 +3003,7 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition) gst_message_new_clock_provide (GST_OBJECT_CAST (element), GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE)); break; + default: break; } @@ -2674,6 +3014,7 @@ gst_pulsesink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: + /* format_lost is reset in release() in baseaudiosink */ gst_element_post_message (element, gst_message_new_clock_lost (GST_OBJECT_CAST (element), GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)); diff --git a/ext/pulse/pulsesink.h b/ext/pulse/pulsesink.h index e3029295ef..ad8831f393 100644 --- a/ext/pulse/pulsesink.h +++ b/ext/pulse/pulsesink.h @@ -72,6 +72,13 @@ struct _GstPulseSink GstStructure *properties; pa_proplist *proplist; + +#ifdef HAVE_PULSE_1_0 + GMutex *sink_formats_lock; + GList *sink_formats; + volatile gint format_lost; + GstClockTime format_lost_time; +#endif }; struct _GstPulseSinkClass diff --git a/ext/pulse/pulseutil.c b/ext/pulse/pulseutil.c index a634b71f64..0d8af79d90 100644 --- a/ext/pulse/pulseutil.c +++ b/ext/pulse/pulseutil.c @@ -167,6 +167,14 @@ gst_pulse_fill_format_info (GstRingBufferSpec * spec, pa_format_info ** f, } else if (spec->format == GST_S24_BE && spec->width == 32) { format->encoding = PA_ENCODING_PCM; sf = PA_SAMPLE_S24_32BE; + } else if (spec->format == GST_AC3) { + format->encoding = PA_ENCODING_AC3_IEC61937; + } else if (spec->format == GST_EAC3) { + format->encoding = PA_ENCODING_EAC3_IEC61937; + } else if (spec->format == GST_DTS) { + format->encoding = PA_ENCODING_DTS_IEC61937; + } else if (spec->format == GST_MPEG) { + format->encoding = PA_ENCODING_MPEG_IEC61937; } else { goto fail; }