avtp: Use the DTS as the AVTP base time

Make it work a little more like RTP. Have the source interact with the
clock and set the capture time on each packet. Then the other elements
can use that to do adjustments. Since AVTP is always very low latency,
it can be assumed that the gPTP clock at the packet reception is very
close to the sending time, never more than 2 seconds off, so the
timestamps can be compared directly.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/9412>
This commit is contained in:
Olivier Crête 2025-03-06 18:01:23 -05:00 committed by GStreamer Marge Bot
parent 134ff5b45f
commit a6a38dcab9
8 changed files with 76 additions and 93 deletions

View File

@ -284,10 +284,6 @@ gst_avtp_aaf_depay_process (GstAvtpBaseDepayload * avtpbasedepayload,
gst_buffer_unref (buffer);
return GST_FLOW_NOT_NEGOTIATED;
}
if (!gst_avtp_base_depayload_push_segment_event (avtpbasedepayload, tstamp)) {
gst_buffer_unref (buffer);
return GST_FLOW_ERROR;
}
avtpbasedepayload->seqnum = seqnum;
}
@ -304,16 +300,16 @@ gst_avtp_aaf_depay_process (GstAvtpBaseDepayload * avtpbasedepayload,
avtpbasedepayload->seqnum++;
ptime = gst_avtp_base_depayload_tstamp_to_ptime (avtpbasedepayload, tstamp,
avtpbasedepayload->prev_ptime);
avtpbasedepayload->last_dts);
subbuffer = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_ALL,
sizeof (struct avtp_stream_pdu), data_len);
GST_BUFFER_PTS (subbuffer) = ptime;
GST_BUFFER_DTS (subbuffer) = ptime;
avtpbasedepayload->prev_ptime = ptime;
gst_buffer_unref (buffer);
return gst_pad_push (avtpbasedepayload->srcpad, subbuffer);
return gst_avtp_base_depayload_push (avtpbasedepayload, subbuffer);
discard:
gst_buffer_unref (buffer);

View File

@ -56,6 +56,9 @@ static GstFlowReturn avtp_base_depayload_chain (GstPad * pad,
static gboolean avtp_base_depayload_sink_event (GstPad * pad,
GstObject * parent, GstEvent * event);
static gboolean gst_avtp_base_depayload_push_segment_event (GstAvtpBaseDepayload
* avtpbasedepayload);
GType
gst_avtp_base_depayload_get_type (void)
{
@ -133,7 +136,6 @@ gst_avtp_base_depayload_init (GstAvtpBaseDepayload * avtpbasedepayload,
avtpbasedepayload->streamid = DEFAULT_STREAMID;
avtpbasedepayload->prev_ptime = 0;
avtpbasedepayload->seqnum = 0;
}
@ -180,6 +182,8 @@ avtp_base_depayload_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
GstAvtpBaseDepayloadClass *klass =
GST_AVTP_BASE_DEPAYLOAD_GET_CLASS (avtpbasedepayload);
avtpbasedepayload->last_dts = GST_BUFFER_DTS (buffer);
return klass->process (avtpbasedepayload, buffer);
}
@ -201,6 +205,7 @@ gst_avtp_base_depayload_sink_event (GstAvtpBaseDepayload * avtpbasedepayload,
* gst_avtp_base_depayload_push_segment_event() for more information.
*/
gst_event_unref (event);
avtpbasedepayload->segment_sent = FALSE;
return TRUE;
default:
return gst_pad_event_default (avtpbasedepayload->sinkpad,
@ -231,41 +236,44 @@ gst_avtp_base_depayload_tstamp_to_ptime (GstAvtpBaseDepayload *
avtpbasedepayload, guint32 tstamp, GstClockTime ref)
{
GstClockTime ptime;
guint32 ref_low;
ref += gst_element_get_base_time (GST_ELEMENT (avtpbasedepayload));
GST_LOG_OBJECT (avtpbasedepayload, "dts: %" GST_TIME_FORMAT " tstamp: %u",
GST_TIME_ARGS (ref), tstamp);
ref_low = ref & 0xFFFFFFFFULL;
ptime = (ref & 0xFFFFFFFF00000000ULL) | tstamp;
/* If 'ptime' is less than the our reference time, it means the higher part
* from 'ptime' needs to be incremented by 1 in order reflect the correct
* presentation time.
*/
if (ptime < ref)
ptime += (1ULL << 32);
if (tstamp < G_MAXINT32 && ref_low > G_MAXINT32)
ptime += G_MAXUINT32 + 1;
if (tstamp < G_MAXINT32 && ref_low > G_MAXINT32 && ptime > G_MAXUINT32)
ptime -= G_MAXUINT32 + 1;
GST_LOG_OBJECT (avtpbasedepayload, "AVTP presentation time %" GST_TIME_FORMAT,
GST_TIME_ARGS (ptime));
return ptime;
}
gboolean
static gboolean
gst_avtp_base_depayload_push_segment_event (GstAvtpBaseDepayload *
avtpbasedepayload, guint32 avtp_tstamp)
avtpbasedepayload)
{
GstClock *clock;
GstEvent *event;
GstSegment segment;
GstClockTime now, base_time, avtp_ptime;
GstClockTime base_time;
clock = GST_ELEMENT_CLOCK (avtpbasedepayload);
now = gst_clock_get_time (clock);
avtp_ptime =
gst_avtp_base_depayload_tstamp_to_ptime (avtpbasedepayload, avtp_tstamp,
now);
base_time = gst_element_get_base_time (GST_ELEMENT (avtpbasedepayload));
gst_segment_init (&segment, GST_FORMAT_TIME);
segment.base = avtp_ptime - base_time;
segment.start = avtp_ptime;
segment.base = 0;
segment.start = base_time;
segment.stop = -1;
event = gst_event_new_segment (&segment);
@ -282,6 +290,16 @@ gst_avtp_base_depayload_push_segment_event (GstAvtpBaseDepayload *
GST_DEBUG_OBJECT (avtpbasedepayload, "SEGMENT event pushed: %"
GST_SEGMENT_FORMAT, &segment);
avtpbasedepayload->prev_ptime = avtp_ptime;
avtpbasedepayload->segment_sent = TRUE;
return TRUE;
}
GstFlowReturn
gst_avtp_base_depayload_push (GstAvtpBaseDepayload *
avtpbasedepayload, GstBuffer * buffer)
{
if (!avtpbasedepayload->segment_sent)
gst_avtp_base_depayload_push_segment_event (avtpbasedepayload);
return gst_pad_push (avtpbasedepayload->srcpad, buffer);
}

View File

@ -51,7 +51,9 @@ struct _GstAvtpBaseDepayload
guint64 streamid;
GstClockTime prev_ptime;
GstClockTime last_dts;
gboolean segment_sent;
guint8 seqnum;
gpointer _gst_reserved[GST_PADDING];
@ -73,8 +75,8 @@ GType gst_avtp_base_depayload_get_type (void);
GstClockTime gst_avtp_base_depayload_tstamp_to_ptime (GstAvtpBaseDepayload *
avtpbasedepayload, guint32 tstamp, GstClockTime ref);
gboolean gst_avtp_base_depayload_push_segment_event (GstAvtpBaseDepayload *
avtpbasedepayload, guint32 avtp_tstamp);
GstFlowReturn gst_avtp_base_depayload_push (GstAvtpBaseDepayload *
avtpbasedepayload, GstBuffer * buffer);
G_END_DECLS

View File

@ -301,6 +301,7 @@ static void
gst_avtp_cvf_depay_get_avtp_timestamps (GstAvtpCvfDepay * avtpcvfdepay,
GstMapInfo * map, GstClockTime * pts, GstClockTime * dts)
{
GstAvtpBaseDepayload *base = GST_AVTP_BASE_DEPAYLOAD (avtpcvfdepay);
struct avtp_stream_pdu *pdu;
guint64 avtp_time, h264_time, tv, ptv;
gint res;
@ -317,7 +318,8 @@ gst_avtp_cvf_depay_get_avtp_timestamps (GstAvtpCvfDepay * avtpcvfdepay,
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_TIMESTAMP, &avtp_time);
g_assert (res == 0);
*dts = avtp_time;
*dts = gst_avtp_base_depayload_tstamp_to_ptime (base, avtp_time,
base->last_dts);
}
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_H264_PTV, &ptv);
@ -327,7 +329,8 @@ gst_avtp_cvf_depay_get_avtp_timestamps (GstAvtpCvfDepay * avtpcvfdepay,
res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, &h264_time);
g_assert (res == 0);
*pts = h264_time;
*pts = gst_avtp_base_depayload_tstamp_to_ptime (base, h264_time,
base->last_dts);
}
}

View File

@ -136,6 +136,7 @@ gst_avtp_src_init (GstAvtpSrc * avtpsrc)
gst_base_src_set_live (GST_BASE_SRC (avtpsrc), TRUE);
gst_base_src_set_format (GST_BASE_SRC (avtpsrc), GST_FORMAT_TIME);
gst_base_src_set_blocksize (GST_BASE_SRC (avtpsrc), MAX_AVTPDU_SIZE);
gst_base_src_set_do_timestamp (GST_BASE_SRC (avtpsrc), TRUE);
avtpsrc->ifname = g_strdup (DEFAULT_IFNAME);
avtpsrc->address = g_strdup (DEFAULT_ADDRESS);

View File

@ -84,9 +84,6 @@ gst_avtp_vf_depay_base_push (GstAvtpVfDepayBase * avtpvfdepaybase)
GstFlowReturn ret;
if (G_UNLIKELY (!gst_pad_has_current_caps (avtpbasedepayload->srcpad))) {
guint64 pts_m;
guint32 dts, pts;
if (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= GST_LEVEL_DEBUG) {
GstClock *clock =
gst_element_get_clock (GST_ELEMENT_CAST (avtpvfdepaybase));
@ -108,67 +105,10 @@ gst_avtp_vf_depay_base_push (GstAvtpVfDepayBase * avtpvfdepaybase)
GST_ELEMENT_ERROR (avtpvfdepaybase, CORE, CAPS, (NULL), (NULL));
return GST_FLOW_ERROR;
}
if (!gst_avtp_base_depayload_push_segment_event (avtpbasedepayload,
GST_BUFFER_PTS (avtpvfdepaybase->out_buffer))) {
GST_ELEMENT_ERROR (avtpvfdepaybase, CORE, EVENT,
("Could not send SEGMENT event"), (NULL));
}
/* Now that we sent our segment starting on the first Presentation
* time available, `avtpbasedepayload->prev_ptime` saves that value,
* to be a reference for calculating future buffer timestamps from
* the AVTP timestamps.
*
* However, decode timestamps can be smaller than presentation
* timestamps. So we can't use `avtpbasedepayload->prev_time` as
* reference to calculate them. Instead, here, we calculate the
* first decode timestamp and save it on `avtpvfdepaybase->prev_ptime`.
*
* The method used to calculate the "absolute" decode timestamp (DTS)
* from presentation timestamp is as follows:
*
* DTS = dts > pts ? (PTSm - 1) | dts : PTSm | dts
*
* Where:
* dts: 32 bits gPTP decode timestamp
* pts: 32 bits gPTP presentation timestamp
* PTSm: 32 most signifactive bits of the "absolute" presentation
* timestamp
*
* This allow us to handle cases where the pts ends up being smaller
* than dts due pts falling after an AVTP timestamp wrapping.
*/
pts = GST_BUFFER_PTS (avtpvfdepaybase->out_buffer);
dts = GST_BUFFER_DTS (avtpvfdepaybase->out_buffer);
pts_m = avtpbasedepayload->prev_ptime & 0xFFFFFFFF00000000ULL;
avtpbasedepayload->prev_ptime = dts > pts ? (pts_m -=
(1ULL << 32)) | dts : pts_m | dts;
GST_DEBUG_OBJECT (avtpvfdepaybase, "prev_ptime set to %" GST_TIME_FORMAT,
GST_TIME_ARGS (avtpbasedepayload->prev_ptime));
}
/* At this point, we're sure segment was sent, so we can properly calc
* buffer timestamps */
GST_DEBUG_OBJECT (avtpvfdepaybase, "Converting %" GST_TIME_FORMAT " to PTS",
GST_TIME_ARGS (GST_BUFFER_PTS (avtpvfdepaybase->out_buffer)));
GST_BUFFER_PTS (avtpvfdepaybase->out_buffer) =
gst_avtp_base_depayload_tstamp_to_ptime (avtpbasedepayload, GST_BUFFER_PTS
(avtpvfdepaybase->out_buffer), avtpbasedepayload->prev_ptime);
GST_DEBUG_OBJECT (avtpvfdepaybase, "Converting %" GST_TIME_FORMAT " to DTS",
GST_TIME_ARGS (GST_BUFFER_DTS (avtpvfdepaybase->out_buffer)));
GST_BUFFER_DTS (avtpvfdepaybase->out_buffer) =
gst_avtp_base_depayload_tstamp_to_ptime (avtpbasedepayload, GST_BUFFER_DTS
(avtpvfdepaybase->out_buffer), avtpbasedepayload->prev_ptime);
/* Use DTS as prev_ptime as it is smaller or equal to PTS, so that
* next calculations of PTS/DTS won't wrap too early */
avtpbasedepayload->prev_ptime = GST_BUFFER_DTS (avtpvfdepaybase->out_buffer);
ret = gst_pad_push (GST_AVTP_BASE_DEPAYLOAD (avtpvfdepaybase)->srcpad,
ret =
gst_avtp_base_depayload_push (GST_AVTP_BASE_DEPAYLOAD (avtpvfdepaybase),
avtpvfdepaybase->out_buffer);
avtpvfdepaybase->out_buffer = NULL;

View File

@ -60,6 +60,8 @@ create_input_buffer (GstHarness * h)
memcpy (pdu->avtp_payload, audio_data, sizeof (audio_data));
gst_buffer_unmap (buf, &info);
GST_BUFFER_DTS (buf) = GST_SECOND;
return buf;
}
@ -207,6 +209,7 @@ GST_START_TEST (test_events)
const GstSegment *segment;
h = setup_harness ();
buf = create_input_buffer (h);
gst_harness_push (h, buf);
@ -231,9 +234,9 @@ GST_START_TEST (test_events)
fail_unless (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT);
gst_event_parse_segment (event, &segment);
fail_unless (segment->format == GST_FORMAT_TIME);
fail_unless (segment->base == 3000);
fail_unless (segment->start == 3000);
fail_unless (segment->stop == -1);
fail_unless_equals_uint64 (segment->base, 0);
fail_unless_equals_uint64 (segment->start, 0);
fail_unless_equals_uint64 (segment->stop, GST_CLOCK_TIME_NONE);
gst_event_unref (event);
gst_harness_teardown (h);
@ -257,6 +260,8 @@ GST_START_TEST (test_buffer)
fail_unless (memcmp (info.data, audio_data, info.size) == 0);
gst_buffer_unmap (out, &info);
fail_unless_equals_uint64 (GST_BUFFER_PTS (out), 3000);
gst_buffer_unref (out);
gst_harness_teardown (h);
}

View File

@ -111,6 +111,7 @@ GST_START_TEST (test_depayloader_fragment_and_single)
/* Create the input AVTPDU */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -175,6 +176,7 @@ GST_START_TEST (test_depayloader_fragmented_two_start_eos)
/* Create the input AVTPDU */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -240,6 +242,7 @@ GST_START_TEST (test_depayloader_multiple_lost_eos)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -290,6 +293,7 @@ GST_START_TEST (test_depayloader_fragmented_eos)
/* Create the input AVTPDU */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -346,6 +350,7 @@ GST_START_TEST (test_depayloader_single_eos)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -382,6 +387,7 @@ GST_START_TEST (test_depayloader_invalid_avtpdu)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -526,11 +532,13 @@ GST_START_TEST (test_depayloader_invalid_avtpdu)
/* Invalid buffer size (too small to fit an AVTP header) */
small = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE / 2);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_harness_push (h, small);
fail_unless_equals_uint64 (gst_harness_buffers_received (h), 0);
/* Invalid buffer size (too small to fit a fragment header) */
small = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 1);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (small, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
avtp_cvf_pdu_init (pdu, AVTP_CVF_FORMAT_SUBTYPE_H264);
@ -571,6 +579,7 @@ GST_START_TEST (test_depayloader_lost_fragments)
/* Create the input AVTPDU */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -703,6 +712,7 @@ GST_START_TEST (test_depayloader_lost_packet)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -798,6 +808,7 @@ GST_START_TEST (test_depayloader_single_and_messed_fragments)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -871,6 +882,7 @@ GST_START_TEST (test_depayloader_single_and_messed_fragments_2)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -959,6 +971,7 @@ GST_START_TEST (test_depayloader_single_and_messed_fragments_3)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -1095,6 +1108,7 @@ GST_START_TEST (test_depayloader_single_and_fragmented)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -1182,6 +1196,7 @@ GST_START_TEST (test_depayloader_fragmented)
/* Create the input AVTPDU */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -1265,6 +1280,7 @@ GST_START_TEST (test_depayloader_fragmented_big)
/* Create the input AVTPDU */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + DATA_LEN);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -1350,6 +1366,7 @@ GST_START_TEST (test_depayloader_multiple_single)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;
@ -1438,6 +1455,7 @@ GST_START_TEST (test_depayloader_single)
/* Create the input AVTPDU header */
in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
GST_BUFFER_DTS (in) = GST_SECOND;
gst_buffer_map (in, &map, GST_MAP_READWRITE);
pdu = (struct avtp_stream_pdu *) map.data;