diff --git a/ext/avtp/gstavtp.c b/ext/avtp/gstavtp.c index c94b608c18..fa8810b1d9 100644 --- a/ext/avtp/gstavtp.c +++ b/ext/avtp/gstavtp.c @@ -82,6 +82,19 @@ * should use GstSytemClock with GST_CLOCK_TYPE_REALTIME as the pipeline * clock. * + * ### Clock Reference Format (CRF) + * + * Even though the systems are synchronized by PTP, it is possible that + * different talkers can send media streams which are out of phase or the + * frequencies do not exactly match. This is partcularly important when there + * is a single listener processing data from multiple talkers. The systems in + * this scenario can benefit if a common clock is distributed among the + * systems. + * + * This can be achieved by using the avtpcrfsync element which implements CRF + * as described in Chapter 10 of IEEE 1722-2016. For further details, look at + * the documentation for avtpcrfsync. + * * ### Traffic Control Setup * * FQTSS (Forwarding and Queuing Enhancements for Time-Sensitive Streams) can be @@ -133,11 +146,11 @@ * Each element has its own configuration properties, with some being common * to several elements. Basic properties are: * - * * streamid (avtpaafpay, avtpcvfpay, avtpaafdepay, avtpcvfdepay): Stream ID - * associated with the stream. + * * streamid (avtpaafpay, avtpcvfpay, avtpaafdepay, avtpcvfdepay, + * avtpcrfsync): Stream ID associated with the stream. * - * * ifname (avtpsink, avtpsrc): Network interface used to send/receive - * AVTP packets. + * * ifname (avtpsink, avtpsrc, avtpcrfsync): Network interface + * used to send/receive AVTP packets. * * * dst-macaddr (avtpsink, avtpsrc): Destination MAC address for the stream. * @@ -177,7 +190,7 @@ * * $ gst-launch-1.0 -k ptp videotestsrc is-live=true ! clockoverlay ! \ * x264enc ! avtpcvfpay processing-deadline=20000000 ! \ - * avtpsink ifname=$IFNAME + * avtpcrfsync ifname=$IFNAME ! avtpsink ifname=$IFNAME * * On the AVTP listener host, the following pipeline can be used to get the * AVTP stream, depacketize it and show it on the screen: @@ -223,6 +236,7 @@ #include "gstavtpcvfpay.h" #include "gstavtpsink.h" #include "gstavtpsrc.h" +#include "gstavtpcrfsync.h" static gboolean plugin_init (GstPlugin * plugin) @@ -239,6 +253,8 @@ plugin_init (GstPlugin * plugin) return FALSE; if (!gst_avtp_cvf_depay_plugin_init (plugin)) return FALSE; + if (!gst_avtp_crf_sync_plugin_init (plugin)) + return FALSE; return TRUE; } diff --git a/ext/avtp/gstavtpcrfbase.c b/ext/avtp/gstavtpcrfbase.c new file mode 100644 index 0000000000..26109f7769 --- /dev/null +++ b/ext/avtp/gstavtpcrfbase.c @@ -0,0 +1,583 @@ +/* + * GStreamer AVTP Plugin + * Copyright (C) 2019 Intel Corporation + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Library General Public License as published by + * the Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * See the GNU Library General Public License for more details. You should + * have received a copy of the GNU Library General Public License along with + * this library; if not , write to the Free Software Foundation, Inc., 51 + * Franklin St, Fifth Floor, Boston, MA 02110 - 1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "gstavtpcrfutil.h" +#include "gstavtpcrfbase.h" + +GST_DEBUG_CATEGORY_STATIC (avtpcrfbase_debug); +#define GST_CAT_DEFAULT (avtpcrfbase_debug) + +#define CRF_TIMESTAMP_SIZE 8 +#define MAX_AVTPDU_SIZE 1500 +#define MAX_NUM_PERIODS_STORED 10 +#define RECV_TIMEOUT 1 // in seconds + +#define DEFAULT_STREAMID 0xAABBCCDDEEFF1000 +#define DEFAULT_IFNAME "eth0" +#define DEFAULT_ADDRESS "01:AA:AA:AA:AA:AA" + +enum +{ + PROP_0, + PROP_STREAMID, + PROP_IFNAME, + PROP_ADDRESS, +}; + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-avtp") + ); + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-avtp") + ); + +static void gst_avtp_crf_base_finalize (GObject * gobject); +static void +gst_avtp_crf_base_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void +gst_avtp_crf_base_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static GstStateChangeReturn gst_avtp_crf_base_change_state (GstElement * + element, GstStateChange transition); +static void crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase); + +#define gst_avtp_crf_base_parent_class parent_class +G_DEFINE_TYPE (GstAvtpCrfBase, gst_avtp_crf_base, GST_TYPE_BASE_TRANSFORM); + +static void +gst_avtp_crf_base_class_init (GstAvtpCrfBaseClass * klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + GObjectClass *object_class = G_OBJECT_CLASS (klass); + + object_class->finalize = GST_DEBUG_FUNCPTR (gst_avtp_crf_base_finalize); + object_class->get_property = + GST_DEBUG_FUNCPTR (gst_avtp_crf_base_get_property); + object_class->set_property = + GST_DEBUG_FUNCPTR (gst_avtp_crf_base_set_property); + + g_object_class_install_property (object_class, PROP_STREAMID, + g_param_spec_uint64 ("streamid", "Stream ID", + "Stream ID associated with the CRF AVTPDU", 0, G_MAXUINT64, + DEFAULT_STREAMID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + g_object_class_install_property (object_class, PROP_IFNAME, + g_param_spec_string ("ifname", "Interface Name", + "Network interface utilized to receive CRF AVTPDUs", + DEFAULT_IFNAME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + g_object_class_install_property (object_class, PROP_ADDRESS, + g_param_spec_string ("address", "Destination MAC address", + "Destination MAC address expected on the Ethernet frames", + DEFAULT_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); + + element_class->change_state = + GST_DEBUG_FUNCPTR (gst_avtp_crf_base_change_state); + + gst_element_class_add_static_pad_template (element_class, &sink_template); + gst_element_class_add_static_pad_template (element_class, &src_template); + + GST_DEBUG_CATEGORY_INIT (avtpcrfbase_debug, "avtpcrfbase", 0, "CRF Base"); +} + +static void +gst_avtp_crf_base_init (GstAvtpCrfBase * avtpcrfbase) +{ + avtpcrfbase->streamid = DEFAULT_STREAMID; + avtpcrfbase->ifname = g_strdup (DEFAULT_IFNAME); + avtpcrfbase->address = g_strdup (DEFAULT_ADDRESS); +} + +static GstStateChangeReturn +gst_avtp_crf_base_change_state (GstElement * element, GstStateChange transition) +{ + GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (element); + GstAvtpCrfThreadData *thread_data = &avtpcrfbase->thread_data; + GstStateChangeReturn res; + GError *error = NULL; + + GST_DEBUG_OBJECT (avtpcrfbase, "transition %d", transition); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + thread_data->past_periods = + g_malloc0 (sizeof (guint64) * MAX_NUM_PERIODS_STORED); + thread_data->mr = -1; + thread_data->is_running = TRUE; + thread_data->thread = + g_thread_try_new ("crf-listener", + (GThreadFunc) crf_listener_thread_func, avtpcrfbase, &error); + + if (error) { + GST_ERROR_OBJECT (avtpcrfbase, "failed to start thread, %s", + error->message); + g_error_free (error); + g_free (thread_data->past_periods); + return GST_STATE_CHANGE_FAILURE; + } + break; + default: + break; + } + + res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_NULL: + thread_data->is_running = FALSE; + g_thread_join (thread_data->thread); + g_free (thread_data->past_periods); + break; + default: + break; + } + + return res; +} + +static int +setup_socket (GstAvtpCrfBase * avtpcrfbase) +{ + struct sockaddr_ll sk_addr = { 0 }; + struct packet_mreq mreq = { 0 }; + struct timeval timeout = { 0 }; + guint8 addr[ETH_ALEN]; + int fd, res, ifindex; + + fd = socket (AF_PACKET, SOCK_DGRAM, htons (ETH_P_TSN)); + if (fd < 0) { + GST_ERROR_OBJECT (avtpcrfbase, "Failed to open socket: %s", + strerror (errno)); + return fd; + } + + ifindex = if_nametoindex (avtpcrfbase->ifname); + if (!ifindex) { + res = -1; + GST_ERROR_OBJECT (avtpcrfbase, "Failed to get index for interface: %s", + strerror (errno)); + goto err; + } + + sk_addr.sll_family = AF_PACKET; + sk_addr.sll_protocol = htons (ETH_P_TSN); + sk_addr.sll_ifindex = ifindex; + + res = bind (fd, (struct sockaddr *) &sk_addr, sizeof (sk_addr)); + if (res < 0) { + GST_ERROR_OBJECT (avtpcrfbase, "Failed to bind socket: %s", + strerror (errno)); + goto err; + } + + res = sscanf (avtpcrfbase->address, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx", + &addr[0], &addr[1], &addr[2], &addr[3], &addr[4], &addr[5]); + if (res != 6) { + res = -1; + GST_ERROR_OBJECT (avtpcrfbase, "Destination MAC address format not valid"); + goto err; + } + + mreq.mr_ifindex = ifindex; + mreq.mr_type = PACKET_MR_MULTICAST; + mreq.mr_alen = ETH_ALEN; + memcpy (&mreq.mr_address, addr, ETH_ALEN); + res = setsockopt (fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP, &mreq, + sizeof (struct packet_mreq)); + if (res < 0) { + GST_ERROR_OBJECT (avtpcrfbase, "Failed to set multicast address: %s", + strerror (errno)); + goto err; + } + + timeout.tv_sec = RECV_TIMEOUT; + res = + setsockopt (fd, SOL_SOCKET, SO_RCVTIMEO, (void *) &timeout, + sizeof (struct timeval)); + if (res < 0) { + GST_ERROR_OBJECT (avtpcrfbase, "Failed to set receive timeout: %s", + strerror (errno)); + goto err; + } + + return fd; + +err: + close (fd); + return res; +} + +static gboolean +validate_crf_pdu (GstAvtpCrfBase * avtpcrfbase, struct avtp_crf_pdu *crf_pdu, + int packet_size) +{ + GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data; + guint64 tstamp_interval, base_freq, pull, type; + guint64 streamid_valid, streamid, data_len; + guint32 subtype; + int res; + + if (packet_size < sizeof (struct avtp_crf_pdu)) + return FALSE; + + res = avtp_pdu_get ((struct avtp_common_pdu *) crf_pdu, AVTP_FIELD_SUBTYPE, + &subtype); + g_assert (res == 0); + if (subtype != AVTP_SUBTYPE_CRF) { + GST_DEBUG_OBJECT (avtpcrfbase, "Not a CRF PDU.subtype: %u", subtype); + return FALSE; + } + + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_SV, &streamid_valid); + g_assert (res == 0); + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_STREAM_ID, &streamid); + g_assert (res == 0); + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_CRF_DATA_LEN, &data_len); + g_assert (res == 0); + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_TIMESTAMP_INTERVAL, + (guint64 *) & tstamp_interval); + g_assert (res == 0); + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_BASE_FREQ, &base_freq); + g_assert (res == 0); + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_PULL, &pull); + g_assert (res == 0); + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_TYPE, &type); + g_assert (res == 0); + + if (!streamid_valid || streamid != avtpcrfbase->streamid) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Stream ID doesn't match. Discarding CRF packet"); + return FALSE; + } + + if (G_UNLIKELY (data_len + sizeof (struct avtp_crf_pdu) > packet_size)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Packet size smaller than expected. Discarding CRF packet"); + return FALSE; + } + + if (G_UNLIKELY (!data->timestamp_interval)) { + if (G_UNLIKELY (tstamp_interval == 0)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "timestamp_interval should not be zero. Discarding CRF packet"); + return FALSE; + } + data->timestamp_interval = tstamp_interval; + + if (G_UNLIKELY (base_freq == 0)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Base Frequency cannot be zero, Discarding CRF packet"); + goto error; + } + data->base_freq = base_freq; + + if (G_UNLIKELY (pull > AVTP_CRF_PULL_MULT_BY_1_OVER_8)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Pull value invalid, Discarding CRF packet"); + goto error; + } + data->pull = pull; + + if (G_UNLIKELY (type > AVTP_CRF_TYPE_MACHINE_CYCLE)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "CRF timestamp type invalid, Discarding CRF packet"); + goto error; + } + data->type = type; + + if (G_UNLIKELY (!data_len || data_len % 8 != 0)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Data Length should be a multiple of 8. Discarding CRF packet."); + goto error; + } + data->num_pkt_tstamps = data_len / CRF_TIMESTAMP_SIZE; + } else { + if (G_UNLIKELY (tstamp_interval != data->timestamp_interval)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Timestamp interval doesn't match, discarding CRF packet"); + return FALSE; + } + + if (G_UNLIKELY (base_freq != data->base_freq)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Base Frequency doesn't match, discarding CRF packet"); + return FALSE; + } + + if (G_UNLIKELY (pull != data->pull)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Pull value doesn't match, discarding CRF packet"); + return FALSE; + } + + if (G_UNLIKELY (data->type != type)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "CRF timestamp type doesn't match, Discarding CRF packet"); + return FALSE; + } + + if (G_UNLIKELY (data_len / CRF_TIMESTAMP_SIZE != data->num_pkt_tstamps)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Number of timestamps doesn't match. discarding CRF packet"); + return FALSE; + } + } + + /* Make sure all the timestamps are monotonically increasing. */ + for (int i = 0; i < data->num_pkt_tstamps - 1; i++) { + GstClockTime tstamp, next_tstamp; + + tstamp = be64toh (crf_pdu->crf_data[i]); + next_tstamp = be64toh (crf_pdu->crf_data[i + 1]); + if (G_UNLIKELY (tstamp >= next_tstamp)) { + GST_DEBUG_OBJECT (avtpcrfbase, + "Timestamps are not monotonically increasing. discarding CRF packet"); + return FALSE; + } + } + + return TRUE; + +error: + data->timestamp_interval = 0; + return FALSE; +} + +static gdouble +get_base_freq_multiplier (GstAvtpCrfBase * avtpcrfbase, guint64 pull) +{ + switch (pull) { + case 0: + return 1.0; + case 1: + return 1 / 1.001; + case 2: + return 1.001; + case 3: + return 24.0 / 25; + case 4: + return 25.0 / 24; + case 5: + return 1.0 / 8; + default: + GST_ERROR_OBJECT (avtpcrfbase, "Invalid pull value"); + return -1; + } +} + +static void +calculate_average_period (GstAvtpCrfBase * avtpcrfbase, + struct avtp_crf_pdu *crf_pdu) +{ + GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data; + GstClockTime first_pkt_tstamp, last_pkt_tstamp; + int num_pkt_tstamps, past_periods_iter; + GstClockTime accumulate_period = 0; + + num_pkt_tstamps = data->num_pkt_tstamps; + past_periods_iter = data->past_periods_iter; + first_pkt_tstamp = be64toh (crf_pdu->crf_data[0]); + last_pkt_tstamp = be64toh (crf_pdu->crf_data[num_pkt_tstamps - 1]); + + /* + * If there is only one CRF Timestamp per CRF AVTPU, at least two packets are + * needed to calculate the period. Also, sequence number needs to be checked + * to ensure consecutive packets are being used to calculate the period. + * Otherwise, we will just use the nominal frequency to estimate period. + */ + if (num_pkt_tstamps == 1) { + guint64 seqnum; + int res; + + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_SEQ_NUM, &seqnum); + g_assert (res == 0); + + if (!data->last_received_tstamp || + ((data->last_seqnum + 1) % 255 != seqnum)) { + GstClockTime average_period = data->average_period; + + if (!data->last_received_tstamp) { + gdouble base_freq_mult; + + base_freq_mult = get_base_freq_multiplier (avtpcrfbase, data->pull); + if (base_freq_mult < 0) + return; + + average_period = + gst_util_uint64_scale (1.0, 1000000000, + (data->base_freq * base_freq_mult)); + } + data->last_received_tstamp = first_pkt_tstamp; + data->last_seqnum = seqnum; + data->current_ts = first_pkt_tstamp; + data->average_period = average_period; + return; + } + + data->past_periods[past_periods_iter] = + first_pkt_tstamp - data->last_received_tstamp; + data->last_received_tstamp = first_pkt_tstamp; + data->last_seqnum = seqnum; + } else { + data->past_periods[past_periods_iter] = + (last_pkt_tstamp - first_pkt_tstamp) / + (data->timestamp_interval * (num_pkt_tstamps - 1)); + } + + if (data->periods_stored < MAX_NUM_PERIODS_STORED) + data->periods_stored++; + + data->past_periods_iter = (past_periods_iter + 1) % data->periods_stored; + + for (int i = 0; i < data->periods_stored; i++) + accumulate_period += data->past_periods[i]; + + data->average_period = accumulate_period / data->periods_stored; + data->current_ts = first_pkt_tstamp; +} + +static void +crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase) +{ + GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data; + struct avtp_crf_pdu *crf_pdu = g_alloca (MAX_AVTPDU_SIZE); + guint64 media_clk_reset; + int fd, n, res; + + fd = setup_socket (avtpcrfbase); + if (fd < 0) { + GST_ELEMENT_ERROR (avtpcrfbase, RESOURCE, OPEN_READ, + ("Cannot open socket for CRF Listener"), (NULL)); + return; + } + + while (data->is_running) { + n = recv (fd, crf_pdu, MAX_AVTPDU_SIZE, 0); + + if (n == -1) { + if (errno == EAGAIN || errno == EINTR) + continue; + + GST_ERROR_OBJECT (avtpcrfbase, "Failed to receive packet: %s", + strerror (errno)); + break; + } + + if (!validate_crf_pdu (avtpcrfbase, crf_pdu, n)) + continue; + + GST_DEBUG_OBJECT (avtpcrfbase, "Packet valid. Adding to buffer\n"); + + res = avtp_crf_pdu_get (crf_pdu, AVTP_CRF_FIELD_MR, &media_clk_reset); + g_assert (res == 0); + + if (media_clk_reset != data->mr) { + memset (data->past_periods, 0, sizeof (gint64) * MAX_NUM_PERIODS_STORED); + data->periods_stored = 0; + data->average_period = 0; + data->current_ts = 0; + data->last_received_tstamp = 0; + data->past_periods_iter = 0; + data->mr = media_clk_reset; + } + + calculate_average_period (avtpcrfbase, crf_pdu); + } + + close (fd); +} + +static void +gst_avtp_crf_base_finalize (GObject * object) +{ + GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (object); + + g_free (avtpcrfbase->ifname); + g_free (avtpcrfbase->address); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_avtp_crf_base_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (object); + + GST_DEBUG_OBJECT (avtpcrfbase, "prop_id %u", prop_id); + + switch (prop_id) { + case PROP_STREAMID: + avtpcrfbase->streamid = g_value_get_uint64 (value); + break; + case PROP_IFNAME: + g_free (avtpcrfbase->ifname); + avtpcrfbase->ifname = g_value_dup_string (value); + break; + case PROP_ADDRESS: + g_free (avtpcrfbase->address); + avtpcrfbase->address = g_value_dup_string (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_avtp_crf_base_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (object); + + GST_DEBUG_OBJECT (avtpcrfbase, "prop_id %u", prop_id); + + switch (prop_id) { + case PROP_STREAMID: + g_value_set_uint64 (value, avtpcrfbase->streamid); + break; + case PROP_IFNAME: + g_value_set_string (value, avtpcrfbase->ifname); + break; + case PROP_ADDRESS: + g_value_set_string (value, avtpcrfbase->address); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} diff --git a/ext/avtp/gstavtpcrfbase.h b/ext/avtp/gstavtpcrfbase.h new file mode 100644 index 0000000000..da5197509f --- /dev/null +++ b/ext/avtp/gstavtpcrfbase.h @@ -0,0 +1,86 @@ +/* + * GStreamer AVTP Plugin + * Copyright (C) 2019 Intel Corporation + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_AVTP_CRF_BASE_H__ +#define __GST_AVTP_CRF_BASE_H__ + +#include +#include +#include + +G_BEGIN_DECLS +#define GST_TYPE_AVTP_CRF_BASE (gst_avtp_crf_base_get_type()) +#define GST_AVTP_CRF_BASE(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AVTP_CRF_BASE,GstAvtpCrfBase)) +#define GST_AVTP_CRF_BASE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AVTP_CRF_BASE,GstAvtpCrfBaseClass)) +#define GST_IS_AVTP_CRF_BASE(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AVTP_CRF_BASE)) +#define GST_IS_AVTP_CRF_BASE_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AVTP_CRF_BASE)) +typedef struct _GstAvtpCrfBase GstAvtpCrfBase; +typedef struct _GstAvtpCrfBaseClass GstAvtpCrfBaseClass; +typedef struct _GstAvtpCrfThreadData GstAvtpCrfThreadData; + +struct _GstAvtpCrfThreadData +{ + GThread *thread; + gboolean is_running; + + guint64 num_pkt_tstamps; + GstClockTime timestamp_interval; + guint64 base_freq; + guint64 pull; + guint64 type; + guint64 mr; + + GstClockTime *past_periods; + int past_periods_iter; + int periods_stored; + GstClockTime average_period; + GstClockTime current_ts; + GstClockTime last_received_tstamp; + guint64 last_seqnum; +}; + +struct _GstAvtpCrfBase +{ + GstBaseTransform element; + + guint64 streamid; + gchar *ifname; + gchar *address; + + GstAvtpCrfThreadData thread_data; +}; + +struct _GstAvtpCrfBaseClass +{ + GstBaseTransformClass parent_class; + + GstPadEventFunction sink_event; + + gpointer _gst_reserved[GST_PADDING]; +}; + +GType gst_avtp_crf_base_get_type (void); + +G_END_DECLS +#endif /* __GST_AVTP_CRF_BASE_H__ */ diff --git a/ext/avtp/gstavtpcrfsync.c b/ext/avtp/gstavtpcrfsync.c new file mode 100644 index 0000000000..4a17977bb3 --- /dev/null +++ b/ext/avtp/gstavtpcrfsync.c @@ -0,0 +1,249 @@ +/* + * GStreamer AVTP Plugin + * Copyright (C) 2019 Intel Corporation + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-avtpcrfsync + * @see_also: avtpcrfcheck + * + * Adjust the Presentation Time from AVTPDUs to align with the reference clock + * provided by the CRF stream. For detailed information see chapter 10 in + * https://standards.ieee.org/standard/1722-2016.html. A helpful aid for + * visualizing CRF and it's advantages can be found at + * http://grouper.ieee.org/groups/1722/contributions/2014/1722a-rsilfvast-Diagrams%20for%20Common%20Timing%20Grid%20and%20Presentation%20Time%20(for%20review%20and%20discussion).pdf + * (Look at page 1). + * + * + * Example pipeline + * |[ + * gst-launch-1.0 audiotestsrc ! audioconvert ! avtpaafpay ! avtpcrfsync ! avtpsink + * ]| This example pipeline will adjust the timestamps for rawaudio payload. + * Refer to the avtpcrfcheck example to validate the adjusted timestamp. + * + */ + +#include +#include +#include +#include +#include +#include + +#include "gstavtpcrfbase.h" +#include "gstavtpcrfsync.h" +#include "gstavtpcrfutil.h" + +GST_DEBUG_CATEGORY_STATIC (avtpcrfsync_debug); +#define GST_CAT_DEFAULT (avtpcrfsync_debug) + +#define gst_avtp_crf_sync_parent_class parent_class +G_DEFINE_TYPE (GstAvtpCrfSync, gst_avtp_crf_sync, GST_TYPE_AVTP_CRF_BASE); + +static GstFlowReturn gst_avtp_crf_sync_transform_ip (GstBaseTransform * parent, + GstBuffer * buffer); + +static void +gst_avtp_crf_sync_class_init (GstAvtpCrfSyncClass * klass) +{ + GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + + gst_element_class_set_static_metadata (element_class, + "Clock Reference Format (CRF) Synchronizer", + "Filter/Network/AVTP", + "Synchronize Presentation Time from AVTPDUs so they are phase-locked with clock provided by CRF stream", + "Vedang Patel "); + + GST_BASE_TRANSFORM_CLASS (klass)->transform_ip = + GST_DEBUG_FUNCPTR (gst_avtp_crf_sync_transform_ip); + + GST_DEBUG_CATEGORY_INIT (avtpcrfsync_debug, "avtpcrfsync", 0, + "CRF Synchronizer"); +} + +static void +gst_avtp_crf_sync_init (GstAvtpCrfSync * avtpcrfsync) +{ + /* Nothing to do here. */ +} + +static void +set_avtp_tstamp (GstAvtpCrfSync * avtpcrfsync, struct avtp_stream_pdu *pdu, + GstClockTime tstamp) +{ + int res; + guint32 type; + + res = + avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type); + g_assert (res == 0); + + switch (type) { + case AVTP_SUBTYPE_AAF: + res = avtp_aaf_pdu_set (pdu, AVTP_AAF_FIELD_TIMESTAMP, tstamp); + g_assert (res == 0); + break; + case AVTP_SUBTYPE_CVF: + res = avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, tstamp); + g_assert (res == 0); + break; + default: + GST_ERROR_OBJECT (avtpcrfsync, "type 0x%x not supported.\n", type); + break; + } +} + +static void +set_avtp_mr_bit (GstAvtpCrfSync * avtpcrfsync, struct avtp_stream_pdu *pdu, + guint64 mr) +{ + int res; + guint32 type; + + res = + avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type); + g_assert (res == 0); + + switch (type) { + case AVTP_SUBTYPE_AAF: + res = avtp_aaf_pdu_set (pdu, AVTP_AAF_FIELD_MR, mr); + g_assert (res == 0); + break; + case AVTP_SUBTYPE_CVF: + res = avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_MR, mr); + g_assert (res == 0); + break; + default: + GST_ERROR_OBJECT (avtpcrfsync, "type 0x%x not supported.\n", type); + break; + } +} + +static GstFlowReturn +gst_avtp_crf_sync_transform_ip (GstBaseTransform * parent, GstBuffer * buffer) +{ + GstClockTime tstamp, h264_time = 0, adjusted_tstamp, adjusted_h264_time = 0; + GstAvtpCrfBase *avtpcrfbase = GST_AVTP_CRF_BASE (parent); + GstAvtpCrfSync *avtpcrfsync = GST_AVTP_CRF_SYNC (avtpcrfbase); + GstAvtpCrfThreadData *thread_data = &avtpcrfbase->thread_data; + GstClockTime current_ts = thread_data->current_ts; + gdouble avg_period = thread_data->average_period; + struct avtp_stream_pdu *pdu; + gboolean h264_packet; + GstMapInfo info; + gboolean res; + + if (!avg_period || !current_ts) + return GST_FLOW_OK; + + res = gst_buffer_map (buffer, &info, GST_MAP_READWRITE); + if (!res) { + GST_ELEMENT_ERROR (avtpcrfsync, RESOURCE, OPEN_WRITE, + ("cannot access buffer"), (NULL)); + return GST_FLOW_ERROR; + } + + if (!buffer_size_valid (&info)) { + GST_DEBUG_OBJECT (avtpcrfsync, "Malformed AVTPDU, discarding it"); + goto exit; + } + + pdu = (struct avtp_stream_pdu *) info.data; + + h264_packet = h264_tstamp_valid (pdu); + + if (h264_packet) { + res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, &h264_time); + g_assert (res == 0); + + /* + * Extrapolate H264 tstamp to 64 bit and assume it's greater than CRF + * timestamp. + */ + h264_time |= current_ts & 0xFFFFFFFF00000000; + if (h264_time < current_ts) + h264_time += (1ULL << 32); + + /* + * float typecasted to guint64 truncates the decimal part. So, round() it + * before casting. + */ + adjusted_h264_time = + (GstClockTime) roundl (current_ts + ceill (((gdouble) h264_time - + current_ts) / avg_period) * avg_period); + res = + avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, + adjusted_h264_time); + g_assert (res == 0); + + GST_LOG_OBJECT (avtpcrfsync, + "Adjust H264 timestamp in CVF packet. tstamp: %" G_GUINT64_FORMAT + " adjusted_tstamp: %" G_GUINT64_FORMAT, + h264_time & 0xFFFFFFFF, adjusted_h264_time & 0xFFFFFFFF); + } + + tstamp = get_avtp_tstamp (avtpcrfbase, pdu); + if (tstamp == GST_CLOCK_TIME_NONE) + goto exit; + + /* + * Extrapolate the 32-bit AVTP Timestamp to 64-bit and assume it's greater + * than the 64-bit CRF timestamp. + */ + tstamp |= current_ts & 0xFFFFFFFF00000000; + if (tstamp < current_ts) + tstamp += (1ULL << 32); + + /* + * float typecasted to guint64 truncates the decimal part. So, round() it + * before casting. + */ + adjusted_tstamp = + (GstClockTime) roundl (current_ts + ceill ((tstamp - + current_ts) / avg_period) * avg_period); + set_avtp_tstamp (avtpcrfsync, pdu, adjusted_tstamp); + set_avtp_mr_bit (avtpcrfsync, pdu, thread_data->mr); + GST_LOG_OBJECT (avtpcrfsync, + "Adjust AVTP timestamp. tstamp: %" G_GUINT64_FORMAT + " Adjusted tstamp: %" G_GUINT64_FORMAT, + tstamp & 0xFFFFFFFF, adjusted_tstamp & 0xFFFFFFFF); + + /* + * Since we adjusted the AVTP/H264 presentation times in the AVTPDU, we also + * need to adjust buffer times by the same amount so that the buffer is + * transmitted at the right time. + */ + if (h264_packet) { + if (GST_BUFFER_DTS (buffer) != GST_CLOCK_TIME_NONE) + GST_BUFFER_DTS (buffer) += adjusted_tstamp - tstamp; + GST_BUFFER_PTS (buffer) += adjusted_h264_time - h264_time; + } else { + GST_BUFFER_PTS (buffer) += adjusted_tstamp - tstamp; + } + +exit: + gst_buffer_unmap (buffer, &info); + return GST_FLOW_OK; +} + +gboolean +gst_avtp_crf_sync_plugin_init (GstPlugin * plugin) +{ + return gst_element_register (plugin, "avtpcrfsync", GST_RANK_NONE, + GST_TYPE_AVTP_CRF_SYNC); +} diff --git a/ext/avtp/gstavtpcrfsync.h b/ext/avtp/gstavtpcrfsync.h new file mode 100644 index 0000000000..37ee183adb --- /dev/null +++ b/ext/avtp/gstavtpcrfsync.h @@ -0,0 +1,56 @@ +/* + * GStreamer AVTP Plugin + * Copyright (C) 2019 Intel Corporation + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_AVTP_CRF_SYNC_H__ +#define __GST_AVTP_CRF_SYNC_H__ + +#include + +#include "gstavtpcrfbase.h" + +G_BEGIN_DECLS +#define GST_TYPE_AVTP_CRF_SYNC (gst_avtp_crf_sync_get_type()) +#define GST_AVTP_CRF_SYNC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_AVTP_CRF_SYNC,GstAvtpCrfSync)) +#define GST_AVTP_CRF_SYNC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_AVTP_CRF_SYNC,GstAvtpCrfSyncClass)) +#define GST_IS_AVTP_CRF_SYNC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_AVTP_CRF_SYNC)) +#define GST_IS_AVTP_CRF_SYNC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_AVTP_CRF_SYNC)) +typedef struct _GstAvtpCrfSync GstAvtpCrfSync; +typedef struct _GstAvtpCrfSyncClass GstAvtpCrfSyncClass; + +struct _GstAvtpCrfSync +{ + GstAvtpCrfBase avtpcrfbase; +}; + +struct _GstAvtpCrfSyncClass +{ + GstAvtpCrfBaseClass parent_class; +}; + +GType gst_avtp_crf_sync_get_type (void); + +gboolean gst_avtp_crf_sync_plugin_init (GstPlugin * plugin); + +G_END_DECLS +#endif /* __GST_AVTP_CRF_SYNC_H__ */ diff --git a/ext/avtp/gstavtpcrfutil.c b/ext/avtp/gstavtpcrfutil.c new file mode 100644 index 0000000000..1c60e76550 --- /dev/null +++ b/ext/avtp/gstavtpcrfutil.c @@ -0,0 +1,117 @@ +/* + * GStreamer AVTP Plugin + * Copyright (C) 2019 Intel Corporation + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#include +#include +#include +#include + +#include "gstavtpcrfutil.h" + +#define AVTP_CVF_H264_HEADER_SIZE (sizeof(struct avtp_stream_pdu) + sizeof(guint32)) + +gboolean +buffer_size_valid (GstMapInfo * info) +{ + struct avtp_stream_pdu *pdu; + guint64 subtype; + guint32 type; + int res; + + if (info->size < sizeof (struct avtp_stream_pdu)) + return FALSE; + + pdu = (struct avtp_stream_pdu *) info->data; + + res = + avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type); + g_assert (res == 0); + res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_FORMAT_SUBTYPE, &subtype); + g_assert (res == 0); + + if (type == AVTP_SUBTYPE_CVF && subtype == AVTP_CVF_FORMAT_SUBTYPE_H264 + && info->size < AVTP_CVF_H264_HEADER_SIZE) + return FALSE; + + return TRUE; +} + +GstClockTime +get_avtp_tstamp (GstAvtpCrfBase * avtpcrfbase, struct avtp_stream_pdu * pdu) +{ + guint64 tstamp = GST_CLOCK_TIME_NONE, tstamp_valid; + guint32 type; + int res; + + res = + avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type); + g_assert (res == 0); + + switch (type) { + case AVTP_SUBTYPE_AAF: + res = avtp_aaf_pdu_get (pdu, AVTP_AAF_FIELD_TV, &tstamp_valid); + g_assert (res == 0); + if (!tstamp_valid) + break; + + res = avtp_aaf_pdu_get (pdu, AVTP_AAF_FIELD_TIMESTAMP, &tstamp); + g_assert (res == 0); + break; + case AVTP_SUBTYPE_CVF: + res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_TV, &tstamp_valid); + g_assert (res == 0); + if (!tstamp_valid) + break; + + res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_TIMESTAMP, &tstamp); + g_assert (res == 0); + break; + default: + GST_INFO_OBJECT (avtpcrfbase, "type 0x%x not supported.\n", type); + break; + } + + return (GstClockTime) tstamp; +} + +gboolean +h264_tstamp_valid (struct avtp_stream_pdu * pdu) +{ + guint64 subtype, h264_time_valid; + guint32 type; + int res; + + /* + * Validate H264 timestamp for H264 format. For more details about the + * timestamp look at IEEE 1722-2016 Section 8.5.3.1 + */ + res = + avtp_pdu_get ((struct avtp_common_pdu *) pdu, AVTP_FIELD_SUBTYPE, &type); + g_assert (res == 0); + if (type == AVTP_SUBTYPE_CVF) { + res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_FORMAT_SUBTYPE, &subtype); + g_assert (res == 0); + res = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_H264_PTV, &h264_time_valid); + g_assert (res == 0); + + if (subtype == AVTP_CVF_FORMAT_SUBTYPE_H264 && h264_time_valid) + return TRUE; + } + return FALSE; +} diff --git a/ext/avtp/gstavtpcrfutil.h b/ext/avtp/gstavtpcrfutil.h new file mode 100644 index 0000000000..913b27a74f --- /dev/null +++ b/ext/avtp/gstavtpcrfutil.h @@ -0,0 +1,32 @@ +/* + * GStreamer AVTP Plugin + * Copyright (C) 2019 Intel Corporation + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifndef __GST_AVTP_CRF_UTILS_H__ +#define __GST_AVTP_CRF_UTILS_H__ + +#include + +#include "gstavtpcrfbase.h" + +gboolean buffer_size_valid (GstMapInfo * info); +GstClockTime get_avtp_tstamp (GstAvtpCrfBase * avtpcrfbase, + struct avtp_stream_pdu * pdu); +gboolean h264_tstamp_valid (struct avtp_stream_pdu * pdu); + +#endif /* __GST_AVTP_CRF_UTILS_H__ */ diff --git a/ext/avtp/meson.build b/ext/avtp/meson.build index e0f2f7960f..862b269a4e 100644 --- a/ext/avtp/meson.build +++ b/ext/avtp/meson.build @@ -8,6 +8,9 @@ avtp_sources = [ 'gstavtpbasepayload.c', 'gstavtpsink.c', 'gstavtpsrc.c', + 'gstavtpcrfutil.c', + 'gstavtpcrfbase.c', + 'gstavtpcrfsync.c', ] avtp_dep = dependency('avtp', required: get_option('avtp')) @@ -17,7 +20,7 @@ if avtp_dep.found() and cc.has_type('struct sock_txtime', prefix : '#include