diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 9ffa501320..13057729be 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -236,6 +236,7 @@ enum #define DEFAULT_SDES NULL #define DEFAULT_DO_LOST FALSE #define DEFAULT_IGNORE_PT FALSE +#define DEFAULT_NTP_SYNC FALSE #define DEFAULT_AUTOREMOVE FALSE #define DEFAULT_BUFFER_MODE RTP_JITTER_BUFFER_MODE_SLAVE #define DEFAULT_USE_PIPELINE_CLOCK FALSE @@ -247,6 +248,7 @@ enum PROP_SDES, PROP_DO_LOST, PROP_IGNORE_PT, + PROP_NTP_SYNC, PROP_AUTOREMOVE, PROP_BUFFER_MODE, PROP_USE_PIPELINE_CLOCK, @@ -299,10 +301,10 @@ struct _GstRtpBinStream gulong demux_ptreq_sig; gulong demux_ptchange_sig; - /* if we have calculated a valid unix_delta for this stream */ + /* if we have calculated a valid rt_delta for this stream */ gboolean have_sync; /* mapping to local RTP and NTP time */ - gint64 unix_delta; + gint64 rt_delta; }; #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock) @@ -767,7 +769,7 @@ gst_rtp_bin_reset_sync (GstRtpBin * rtpbin) /* make use require a new SR packet for this stream before we attempt new * lip-sync */ stream->have_sync = FALSE; - stream->unix_delta = 0; + stream->rt_delta = 0; } } GST_RTP_BIN_UNLOCK (rtpbin); @@ -887,19 +889,102 @@ free_client (GstRtpBinClient * client, GstRtpBin * bin) g_free (client); } +static void +get_current_times (GstRtpBin * bin, GstClockTime * running_time, + guint64 * ntpnstime) +{ + guint64 ntpns; + GstClock *clock; + GstClockTime base_time, rt, clock_time; + + GST_OBJECT_LOCK (bin); + if ((clock = GST_ELEMENT_CLOCK (bin))) { + base_time = GST_ELEMENT_CAST (bin)->base_time; + gst_object_ref (clock); + GST_OBJECT_UNLOCK (bin); + + clock_time = gst_clock_get_time (clock); + + if (bin->use_pipeline_clock) { + ntpns = clock_time; + } else { + GTimeVal current; + + /* get current NTP time */ + g_get_current_time (¤t); + ntpns = GST_TIMEVAL_TO_TIME (current); + } + + /* add constant to convert from 1970 based time to 1900 based time */ + ntpns += (2208988800LL * GST_SECOND); + + /* get current clock time and convert to running time */ + rt = clock_time - base_time; + + gst_object_unref (clock); + } else { + GST_OBJECT_UNLOCK (bin); + rt = -1; + ntpns = -1; + } + if (running_time) + *running_time = rt; + if (ntpnstime) + *ntpnstime = ntpns; +} + +static void +stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream, + gint64 ts_offset) +{ + gint64 prev_ts_offset; + + g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL); + + /* delta changed, see how much */ + if (prev_ts_offset != ts_offset) { + gint64 diff; + + diff = prev_ts_offset - ts_offset; + + GST_DEBUG_OBJECT (bin, + "ts-offset %" G_GINT64_FORMAT ", prev %" G_GINT64_FORMAT + ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff); + + /* only change diff when it changed more than 4 milliseconds. This + * compensates for rounding errors in NTP to RTP timestamp + * conversions */ + if (ABS (diff) > 4 * GST_MSECOND) { + if (ABS (diff) < (3 * GST_SECOND)) { + g_object_set (stream->buffer, "ts-offset", ts_offset, NULL); + } else { + GST_WARNING_OBJECT (bin, "offset unusually large, ignoring"); + } + } else { + GST_DEBUG_OBJECT (bin, "offset too small, ignoring"); + } + } + GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, + stream->ssrc, ts_offset); +} + /* associate a stream to the given CNAME. This will make sure all streams for * that CNAME are synchronized together. * Must be called with GST_RTP_BIN_LOCK */ static void gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, - guint8 * data, guint64 last_unix, guint64 last_extrtptime, + guint8 * data, guint64 ntptime, guint64 last_extrtptime, guint64 base_rtptime, guint64 base_time, guint clock_rate) { GstRtpBinClient *client; gboolean created; GSList *walk; - guint64 local_unix; + guint64 local_rt; guint64 local_rtp; + GstClockTime running_time; + guint64 ntpnstime; + gint64 ntpdiff, rtdiff; + guint64 last_unix; /* first find or create the CNAME */ client = get_client (bin, len, data, &created); @@ -924,9 +1009,10 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, stream->ssrc, client, client->cname); } - /* take the extended rtptime we found in the SR packet and map it to the + /* Take the extended rtptime we found in the SR packet and map it to the * local rtptime. The local rtp time is used to construct timestamps on the - * buffers. */ + * buffers so we will calculate what running_time corresponds to the RTP + * timestamp in the SR packet. */ local_rtp = last_extrtptime - base_rtptime; GST_DEBUG_OBJECT (bin, @@ -934,29 +1020,63 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", base_rtptime, last_extrtptime, local_rtp, clock_rate); - /* calculate local NTP time in gstreamer timestamp, we essentially perform the + /* calculate local RTP time in gstreamer timestamp, we essentially perform the * same conversion that a jitterbuffer would use to convert an rtp timestamp - * into a corresponding gstreamer timestamp. */ - local_unix = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate); - local_unix += base_time; + * into a corresponding gstreamer timestamp. Note that the base_time also + * contains the drift between sender and receiver. */ + local_rt = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate); + local_rt += base_time; + + /* convert ntptime to unix time since 1900 */ + last_unix = gst_util_uint64_scale (ntptime, GST_SECOND, + (G_GINT64_CONSTANT (1) << 32)); - /* calculate delta between server and receiver. last_unix is created by - * converting the ntptime in the last SR packet to a gstreamer timestamp. This - * delta expresses the difference to our timeline and the server timeline. */ - stream->unix_delta = last_unix - local_unix; stream->have_sync = TRUE; GST_DEBUG_OBJECT (bin, - "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT - ", delta %" G_GINT64_FORMAT, local_unix, last_unix, stream->unix_delta); + "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT, + local_rt, last_unix); /* recalc inter stream playout offset, but only if there is more than one - * stream. */ - if (client->nstreams > 1) { + * stream or we're doing NTP sync. */ + if (bin->ntp_sync) { + /* For NTP sync we need to first get a snapshot of running_time and NTP + * time. We know at what running_time we play a certain RTP time, we also + * calculated when we would play the RTP time in the SR packet. Now we need + * to know how the running_time and the NTP time relate to eachother. */ + get_current_times (bin, &running_time, &ntpnstime); + + /* see how far away the NTP time is. This is the difference between the + * current NTP time and the NTP time in the last SR packet. */ + ntpdiff = ntpnstime - last_unix; + /* see how far away the running_time is. This is the difference between the + * current running_time and the running_time of the RTP timestamp in the + * last SR packet. */ + rtdiff = running_time - local_rt; + + GST_DEBUG_OBJECT (bin, + "NTP time %" G_GUINT64_FORMAT ", last unix %" G_GUINT64_FORMAT, + ntpnstime, last_unix); + GST_DEBUG_OBJECT (bin, + "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff, + rtdiff); + + /* combine to get the final diff to apply to the running_time */ + stream->rt_delta = rtdiff - ntpdiff; + + stream_set_ts_offset (bin, stream, stream->rt_delta); + } else if (client->nstreams > 1) { gint64 min; + /* calculate delta between server and receiver. last_unix is created by + * converting the ntptime in the last SR packet to a gstreamer timestamp. This + * delta expresses the difference to our timeline and the server timeline. The + * difference in itself doesn't mean much but we can combine the delta of + * multiple streams to create a stream specific offset. */ + stream->rt_delta = last_unix - local_rt; + /* calculate the min of all deltas, ignoring streams that did not yet have a - * valid unix_delta because we did not yet receive an SR packet for those + * valid rt_delta because we did not yet receive an SR packet for those * streams. * We calculate the mininum because we would like to only apply positive * offsets to streams, delaying their playback instead of trying to speed up @@ -971,8 +1091,8 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, if (!ostream->have_sync) continue; - if (ostream->unix_delta < min) - min = ostream->unix_delta; + if (ostream->rt_delta < min) + min = ostream->rt_delta; } GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client, @@ -981,7 +1101,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, /* calculate offsets for each stream */ for (walk = client->streams; walk; walk = g_slist_next (walk)) { GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; - gint64 ts_offset, prev_ts_offset; + gint64 ts_offset; /* ignore streams for which we didn't receive an SR packet yet, we * can't synchronize them yet. We can however sync other streams just @@ -991,32 +1111,9 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, /* calculate offset to our reference stream, this should always give a * positive number. */ - ts_offset = ostream->unix_delta - min; + ts_offset = ostream->rt_delta - min; - g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL); - - /* delta changed, see how much */ - if (prev_ts_offset != ts_offset) { - gint64 diff; - - if (prev_ts_offset > ts_offset) - diff = prev_ts_offset - ts_offset; - else - diff = ts_offset - prev_ts_offset; - - GST_DEBUG_OBJECT (bin, - "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT - ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff); - - /* only change diff when it changed more than 4 milliseconds. This - * compensates for rounding errors in NTP to RTP timestamp - * conversions */ - if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) { - g_object_set (ostream->buffer, "ts-offset", ts_offset, NULL); - } - } - GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, - ostream->ssrc, ts_offset); + stream_set_ts_offset (bin, ostream, ts_offset); } } return; @@ -1114,8 +1211,7 @@ gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s, GST_RTP_BIN_LOCK (bin); /* associate the stream to CNAME */ gst_rtp_bin_associate (bin, stream, len, data, - gst_rtcp_ntp_to_unix (ntptime), extrtptime, - base_rtptime, base_time, clock_rate); + ntptime, extrtptime, base_rtptime, base_time, clock_rate); GST_RTP_BIN_UNLOCK (bin); } } @@ -1158,7 +1254,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) stream->demux = demux; stream->have_sync = FALSE; - stream->unix_delta = 0; + stream->rt_delta = 0; stream->percent = 100; session->streams = g_slist_prepend (session->streams, stream); @@ -1547,7 +1643,6 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) g_param_spec_boolean ("use-pipeline-clock", "Use pipeline clock", "Use the pipeline clock to set the NTP time in the RTCP SR messages", DEFAULT_AUTOREMOVE, G_PARAM_READWRITE)); - /** * GstRtpBin::buffer-mode: * @@ -1559,6 +1654,19 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) g_param_spec_enum ("buffer-mode", "Buffer Mode", "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE, DEFAULT_BUFFER_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin::ntp-sync: + * + * Synchronize received streams to the NTP clock. When the NTP clock is shared + * between the receivers and the senders (such as when using ntpd) this option + * can be used to synchronize receivers on multiple machines. + * + * Since: 0.10.21 + */ + g_object_class_install_property (gobject_class, PROP_NTP_SYNC, + g_param_spec_boolean ("ntp-sync", "Sync on NTP clock", + "Synchronize received streams to the NTP clock", DEFAULT_NTP_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = @@ -1588,6 +1696,7 @@ gst_rtp_bin_init (GstRtpBin * rtpbin, GstRtpBinClass * klass) rtpbin->latency_ns = DEFAULT_LATENCY_MS * GST_MSECOND; rtpbin->do_lost = DEFAULT_DO_LOST; rtpbin->ignore_pt = DEFAULT_IGNORE_PT; + rtpbin->ntp_sync = DEFAULT_NTP_SYNC; rtpbin->priv->autoremove = DEFAULT_AUTOREMOVE; rtpbin->buffer_mode = DEFAULT_BUFFER_MODE; rtpbin->use_pipeline_clock = DEFAULT_USE_PIPELINE_CLOCK; @@ -1698,6 +1807,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, GST_RTP_BIN_UNLOCK (rtpbin); gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin, "do-lost", value); break; + case PROP_NTP_SYNC: + rtpbin->ntp_sync = g_value_get_boolean (value); + break; case PROP_IGNORE_PT: rtpbin->ignore_pt = g_value_get_boolean (value); break; @@ -1755,6 +1867,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_IGNORE_PT: g_value_set_boolean (value, rtpbin->ignore_pt); break; + case PROP_NTP_SYNC: + g_value_set_boolean (value, rtpbin->ntp_sync); + break; case PROP_AUTOREMOVE: g_value_set_boolean (value, rtpbin->priv->autoremove); break; diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index b64f86eb09..74aaac2453 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -49,6 +49,7 @@ struct _GstRtpBin { guint64 latency_ns; gboolean do_lost; gboolean ignore_pt; + gboolean ntp_sync; RTPJitterBufferMode buffer_mode; gboolean buffering; gboolean use_pipeline_clock;