From 4421c3de75d44f83c28590d510dee81b762e0918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 17 Apr 2024 13:30:42 +0300 Subject: [PATCH] rtpbin: Handle ntp-sync=true before everything else This simplifies the code as it's a much simpler case than the normal inter-stream synchronization, and interleaving it with that only reduces readability of the code. Also improve some debug output in this code path. Part-of: --- .../gst/rtpmanager/gstrtpbin.c | 406 ++++++++++-------- 1 file changed, 226 insertions(+), 180 deletions(-) diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpbin.c b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpbin.c index 708757ddc4..89690397fe 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpbin.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpbin.c @@ -1481,6 +1481,88 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, return; } + /* In case of NTP sync we can directly calculate the offset for this stream + * here and return immediately */ + if (bin->ntp_sync) { + GstClockTime rtp_running_time, diff_rtp; + GstClockTime local_running_time, local_ntpnstime; + gint64 ntpdiff, rtdiff; + + GST_DEBUG_OBJECT (bin, "Doing NTP sync"); + + if (!GST_CLOCK_TIME_IS_VALID (extrtptime) + || !GST_CLOCK_TIME_IS_VALID (ntpnstime) + || extrtptime < base_rtptime) { + GST_DEBUG_OBJECT (bin, "invalidated sync data, bailing out"); + return; + } + + /* Take the extended rtptime we found in the SR packet and map it to the + * local rtptime. The local rtp time is used to construct timestamps on the + * buffers so we will calculate what running_time corresponds to the RTP + * timestamp in the SR packet. */ + diff_rtp = extrtptime - base_rtptime; + + GST_DEBUG_OBJECT (bin, + "base RTP time %" G_GUINT64_FORMAT ", SR RTP time %" G_GUINT64_FORMAT + ", RTP time difference %" G_GUINT64_FORMAT ", clock-rate %d", + base_rtptime, extrtptime, diff_rtp, clock_rate); + + /* calculate local RTP time in GStreamer timestamp units, we essentially + * perform the same conversion that a jitterbuffer would use to convert an + * rtp timestamp into a corresponding gstreamer timestamp. Note that the + * base_time also contains the drift between sender and receiver. */ + rtp_running_time = + gst_util_uint64_scale_int (diff_rtp, GST_SECOND, clock_rate); + rtp_running_time += base_time; + + GST_DEBUG_OBJECT (bin, + "RTP running time %" GST_TIME_FORMAT ", SR NTP time %" GST_TIME_FORMAT, + GST_TIME_ARGS (rtp_running_time), GST_TIME_ARGS (ntpnstime)); + + /* For NTP sync we need to first get a snapshot of running_time and NTP + * time. We know at what running_time we play a certain RTP time, we also + * calculated when we would play the RTP time in the SR packet. Now we need + * to know how the running_time and the NTP time relate to each other. */ + get_current_times (bin, &local_running_time, &local_ntpnstime); + + /* see how far away the NTP time is. This is the difference between the + * current NTP time and the NTP time in the last SR packet. */ + ntpdiff = local_ntpnstime - ntpnstime; + + /* see how far away the running_time is. This is the difference between the + * current running_time and the running_time of the RTP timestamp in the + * last SR packet. */ + rtdiff = local_running_time - rtp_running_time; + + GST_DEBUG_OBJECT (bin, + "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT, + local_ntpnstime, ntpnstime); + GST_DEBUG_OBJECT (bin, + "local running time %" G_GUINT64_FORMAT ", SR RTP running time %" + G_GUINT64_FORMAT, local_running_time, rtp_running_time); + GST_DEBUG_OBJECT (bin, + "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff, + rtdiff); + + /* combine to get the final diff to apply to the running_time */ + stream->rt_delta = rtdiff - ntpdiff; + + GST_DEBUG_OBJECT (bin, + "Calculated ts-offset %" GST_STIME_FORMAT " for SSRC %08x", + GST_STIME_ARGS (stream->rt_delta), stream->ssrc); + + stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset, + bin->min_ts_offset, FALSE); + + gst_rtp_bin_send_sync_event (stream); + + return; + } + + /* For all other cases (not RFC7273 and not NTP sync) we have to look how + * all streams of a client relate to each other */ + GstRtpBinClient *client; gboolean created; GSList *walk; @@ -1548,195 +1630,159 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, "SR RTP running time %" G_GUINT64_FORMAT ", SR NTP %" G_GUINT64_FORMAT, running_time, ntpnstime); - /* recalc inter stream playout offset, but only if there is more than one - * stream or we're doing NTP sync. */ - if (bin->ntp_sync) { - gint64 ntpdiff, rtdiff; - guint64 local_ntpnstime; - GstClockTime local_running_time; + /* recalc inter stream playout offset, but only if there is more than one stream. */ + gint64 min, rtp_min, clock_base; + gboolean all_sync, use_rtp; + gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync); - /* For NTP sync we need to first get a snapshot of running_time and NTP - * time. We know at what running_time we play a certain RTP time, we also - * calculated when we would play the RTP time in the SR packet. Now we need - * to know how the running_time and the NTP time relate to each other. */ - get_current_times (bin, &local_running_time, &local_ntpnstime); + /* calculate delta between server and receiver. ntpnstime is created by + * converting the ntptime in the last SR packet to a gstreamer timestamp. This + * delta expresses the difference to our timeline and the server timeline. The + * difference in itself doesn't mean much but we can combine the delta of + * multiple streams to create a stream specific offset. */ + stream->rt_delta = ntpnstime - running_time; - /* see how far away the NTP time is. This is the difference between the - * current NTP time and the NTP time in the last SR packet. */ - ntpdiff = local_ntpnstime - ntpnstime; - /* see how far away the running_time is. This is the difference between the - * current running_time and the running_time of the RTP timestamp in the - * last SR packet. */ - rtdiff = local_running_time - running_time; + /* calculate the min of all deltas, ignoring streams that did not yet have a + * valid rt_delta because we did not yet receive an SR packet for those + * streams. + * We calculate the minimum because we would like to only apply positive + * offsets to streams, delaying their playback instead of trying to speed up + * other streams (which might be impossible when we have to create negative + * latencies). + * The stream that has the smallest diff is selected as the reference stream, + * all other streams will have a positive offset to this difference. */ - GST_DEBUG_OBJECT (bin, - "local NTP time %" G_GUINT64_FORMAT ", SR NTP time %" G_GUINT64_FORMAT, - local_ntpnstime, ntpnstime); - GST_DEBUG_OBJECT (bin, - "local running time %" G_GUINT64_FORMAT ", SR RTP running time %" - G_GUINT64_FORMAT, local_running_time, running_time); - GST_DEBUG_OBJECT (bin, - "NTP diff %" G_GINT64_FORMAT ", RT diff %" G_GINT64_FORMAT, ntpdiff, - rtdiff); + /* some alternative setting allow ignoring RTCP as much as possible, + * for servers generating bogus ntp timeline */ + min = rtp_min = G_MAXINT64; + use_rtp = FALSE; + if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) { + guint64 ext_base = -1; + gint64 rtp_delta = 0; - /* combine to get the final diff to apply to the running_time */ - stream->rt_delta = rtdiff - ntpdiff; - - stream_set_ts_offset (bin, stream, stream->rt_delta, bin->max_ts_offset, - bin->min_ts_offset, FALSE); - } else { - gint64 min, rtp_min, clock_base; - gboolean all_sync, use_rtp; - gboolean rtcp_sync = g_atomic_int_get (&bin->rtcp_sync); - - /* calculate delta between server and receiver. ntpnstime is created by - * converting the ntptime in the last SR packet to a gstreamer timestamp. This - * delta expresses the difference to our timeline and the server timeline. The - * difference in itself doesn't mean much but we can combine the delta of - * multiple streams to create a stream specific offset. */ - stream->rt_delta = ntpnstime - running_time; - - /* calculate the min of all deltas, ignoring streams that did not yet have a - * valid rt_delta because we did not yet receive an SR packet for those - * streams. - * We calculate the minimum because we would like to only apply positive - * offsets to streams, delaying their playback instead of trying to speed up - * other streams (which might be impossible when we have to create negative - * latencies). - * The stream that has the smallest diff is selected as the reference stream, - * all other streams will have a positive offset to this difference. */ - - /* some alternative setting allow ignoring RTCP as much as possible, - * for servers generating bogus ntp timeline */ - min = rtp_min = G_MAXINT64; - use_rtp = FALSE; - if (rtcp_sync == GST_RTP_BIN_RTCP_SYNC_RTP) { - guint64 ext_base = -1; - gint64 rtp_delta = 0; - - use_rtp = TRUE; - /* convert to extended RTP time */ - rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base); - /* sanity check; base rtp and provided clock_base should be close */ - if (rtp_clock_base >= base_rtptime) { - if (rtp_clock_base - base_rtptime < 10 * clock_rate) { - rtp_delta = base_time + - gst_util_uint64_scale_int (rtp_clock_base - base_rtptime, - GST_SECOND, clock_rate); - } else { - use_rtp = FALSE; - } + use_rtp = TRUE; + /* convert to extended RTP time */ + rtp_clock_base = gst_rtp_buffer_ext_timestamp (&ext_base, rtp_clock_base); + /* sanity check; base rtp and provided clock_base should be close */ + if (rtp_clock_base >= base_rtptime) { + if (rtp_clock_base - base_rtptime < 10 * clock_rate) { + rtp_delta = base_time + + gst_util_uint64_scale_int (rtp_clock_base - base_rtptime, + GST_SECOND, clock_rate); } else { - if (base_rtptime - rtp_clock_base < 10 * clock_rate) { - rtp_delta = base_time - - gst_util_uint64_scale_int (base_rtptime - rtp_clock_base, - GST_SECOND, clock_rate); - } else { - use_rtp = FALSE; - } + use_rtp = FALSE; } - /* warn and bail for clarity out if no sane values */ - if (!use_rtp) { - GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime"); + } else { + if (base_rtptime - rtp_clock_base < 10 * clock_rate) { + rtp_delta = base_time - + gst_util_uint64_scale_int (base_rtptime - rtp_clock_base, + GST_SECOND, clock_rate); + } else { + use_rtp = FALSE; + } + } + /* warn and bail for clarity out if no sane values */ + if (!use_rtp) { + GST_WARNING_OBJECT (bin, "unable to sync to provided rtptime"); + return; + } + /* store to track changes */ + clock_base = rtp_delta; + /* generate a fake as before, + * now equating rtptime obtained from RTP-Info, + * where the large time represent the otherwise irrelevant npt/ntp time */ + stream->rtp_delta = (GST_SECOND << 28) - rtp_delta; + } else { + clock_base = rtp_clock_base; + } + + all_sync = TRUE; + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + + if (!ostream->have_sync) { + all_sync = FALSE; + continue; + } + + /* change in current stream's base from previously init'ed value + * leads to reset of all stream's base */ + if (stream != ostream && stream->clock_base >= 0 && + (stream->clock_base != clock_base)) { + GST_DEBUG_OBJECT (bin, "reset upon clock base change"); + ostream->clock_base = -100 * GST_SECOND; + ostream->rtp_delta = 0; + } + + if (ostream->rt_delta < min) + min = ostream->rt_delta; + if (ostream->rtp_delta < rtp_min) + rtp_min = ostream->rtp_delta; + } + + /* arrange to re-sync for each stream upon significant change, + * e.g. post-seek */ + all_sync = all_sync && (stream->clock_base == clock_base); + stream->clock_base = clock_base; + + /* may need init performed above later on, but nothing more to do now */ + if (client->nstreams <= 1) + return; + + GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT + " all sync %d", client, min, all_sync); + GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp); + + switch (rtcp_sync) { + case GST_RTP_BIN_RTCP_SYNC_RTP: + if (!use_rtp) + break; + GST_DEBUG_OBJECT (bin, "using rtp generated reports; " + "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min); + /* fall-through */ + case GST_RTP_BIN_RTCP_SYNC_INITIAL: + /* if all have been synced already, do not bother further */ + if (all_sync) { + GST_DEBUG_OBJECT (bin, "all streams already synced; done"); return; } - /* store to track changes */ - clock_base = rtp_delta; - /* generate a fake as before, - * now equating rtptime obtained from RTP-Info, - * where the large time represent the otherwise irrelevant npt/ntp time */ - stream->rtp_delta = (GST_SECOND << 28) - rtp_delta; - } else { - clock_base = rtp_clock_base; - } - - all_sync = TRUE; - for (walk = client->streams; walk; walk = g_slist_next (walk)) { - GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; - - if (!ostream->have_sync) { - all_sync = FALSE; - continue; - } - - /* change in current stream's base from previously init'ed value - * leads to reset of all stream's base */ - if (stream != ostream && stream->clock_base >= 0 && - (stream->clock_base != clock_base)) { - GST_DEBUG_OBJECT (bin, "reset upon clock base change"); - ostream->clock_base = -100 * GST_SECOND; - ostream->rtp_delta = 0; - } - - if (ostream->rt_delta < min) - min = ostream->rt_delta; - if (ostream->rtp_delta < rtp_min) - rtp_min = ostream->rtp_delta; - } - - /* arrange to re-sync for each stream upon significant change, - * e.g. post-seek */ - all_sync = all_sync && (stream->clock_base == clock_base); - stream->clock_base = clock_base; - - /* may need init performed above later on, but nothing more to do now */ - if (client->nstreams <= 1) - return; - - GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT - " all sync %d", client, min, all_sync); - GST_DEBUG_OBJECT (bin, "rtcp sync mode %d, use_rtp %d", rtcp_sync, use_rtp); - - switch (rtcp_sync) { - case GST_RTP_BIN_RTCP_SYNC_RTP: - if (!use_rtp) - break; - GST_DEBUG_OBJECT (bin, "using rtp generated reports; " - "client %p min rtp delta %" G_GINT64_FORMAT, client, rtp_min); - /* fall-through */ - case GST_RTP_BIN_RTCP_SYNC_INITIAL: - /* if all have been synced already, do not bother further */ - if (all_sync) { - GST_DEBUG_OBJECT (bin, "all streams already synced; done"); - return; - } - break; - default: - break; - } - - /* bail out if we adjusted recently enough */ - if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) < - bin->rtcp_sync_interval * GST_MSECOND) { - GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; " - "previous sender info too recent " - "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime); - return; - } - bin->priv->last_ntpnstime = ntpnstime; - - /* calculate offsets for each stream */ - for (walk = client->streams; walk; walk = g_slist_next (walk)) { - GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; - gint64 ts_offset; - - /* ignore streams for which we didn't receive an SR packet yet, we - * can't synchronize them yet. We can however sync other streams just - * fine. */ - if (!ostream->have_sync) - continue; - - /* calculate offset to our reference stream, this should always give a - * positive number. */ - if (use_rtp) - ts_offset = ostream->rtp_delta - rtp_min; - else - ts_offset = ostream->rt_delta - min; - - stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset, - bin->min_ts_offset, TRUE); - } + break; + default: + break; } + + /* bail out if we adjusted recently enough */ + if (all_sync && (ntpnstime - bin->priv->last_ntpnstime) < + bin->rtcp_sync_interval * GST_MSECOND) { + GST_DEBUG_OBJECT (bin, "discarding RTCP sender packet for sync; " + "previous sender info too recent " + "(previous NTP %" G_GUINT64_FORMAT ")", bin->priv->last_ntpnstime); + return; + } + bin->priv->last_ntpnstime = ntpnstime; + + /* calculate offsets for each stream */ + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + gint64 ts_offset; + + /* ignore streams for which we didn't receive an SR packet yet, we + * can't synchronize them yet. We can however sync other streams just + * fine. */ + if (!ostream->have_sync) + continue; + + /* calculate offset to our reference stream, this should always give a + * positive number. */ + if (use_rtp) + ts_offset = ostream->rtp_delta - rtp_min; + else + ts_offset = ostream->rt_delta - min; + + stream_set_ts_offset (bin, ostream, ts_offset, bin->max_ts_offset, + bin->min_ts_offset, TRUE); + } + gst_rtp_bin_send_sync_event (stream); return;