The properties on the udpsink/udpsrc elements need to be set before there is any state change. If not, in a network without default gateway, udpsink tries to bind an a NULL interface and fails. Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1411>
		
			
				
	
	
		
			1442 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			1442 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /* GStreamer RIST plugin
 | |
|  * Copyright (C) 2019 Net Insight AB
 | |
|  *     Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
 | |
|  *
 | |
|  * This library is free software; you can redistribute it and/or
 | |
|  * modify it under the terms of the GNU Library General Public
 | |
|  * License as published by the Free Software Foundation; either
 | |
|  * version 2 of the License, or (at your option) any later version.
 | |
|  *
 | |
|  * This library is distributed in the hope that it will be useful,
 | |
|  * but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 | |
|  * Library General Public License for more details.
 | |
|  *
 | |
|  * You should have received a copy of the GNU Library General Public
 | |
|  * License along with this library; if not, write to the
 | |
|  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 | |
|  * Boston, MA 02110-1301, USA.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * SECTION:element-ristsink
 | |
|  * @title: ristsink
 | |
|  * @see_also: ristsrc
 | |
|  *
 | |
|  * This element implements RIST TR-06-1 Simple Profile transmitter. It
 | |
|  * currently supports any registered RTP static payload types such as
 | |
|  * MPEG TS. The stream passed to this element must be already RTP
 | |
|  * payloaded.  Even though RTP SSRC collision are rare in
 | |
|  * unidirectional streaming, this element expects the upstream elements
 | |
|  * to obey to collision events and change the SSRC in use. Collisions
 | |
|  * will occur when transmitting and receiving over multicast on the
 | |
|  * same host, and will be properly ignored.
 | |
|  *
 | |
|  * It also implements part of the RIST TR-06-2 Main Profile transmitter. The
 | |
|  * tunneling, multiplexing and encryption parts of the specification are not
 | |
|  * included. This element will include the RIST header extension if either of
 | |
|  * the "sequence-number-extension" or "drop-null-ts-packets" properties are set.
 | |
|  *
 | |
|  * ## Example gst-launch line
 | |
|  * |[
 | |
|  * gst-launch-1.0 udpsrc ! tsparse set-timestamps=1 smoothing-latency=40000 ! \
 | |
|  * rtpmp2tpay ! ristsink address=10.0.0.1 port=5004
 | |
|  * ]|
 | |
|  *
 | |
|  * Additionally, this element supports bonding, which consist of using
 | |
|  * multiple links in order to transmit the streams. The address of
 | |
|  * each link is configured through the "bonding-addresses"
 | |
|  * property. When set, this will replace the value that might have
 | |
|  * been set on the "address" and "port" properties. Each link will be
 | |
|  * mapped to its own RTP session. RTX request are only replied to on the
 | |
|  * link the NACK was received from.
 | |
|  *
 | |
|  * There are currently two bonding methods in place: "broadcast" and "round-robin".
 | |
|  * In "broadcast" mode, all the packets are duplicated over all sessions.
 | |
|  * While in "round-robin" mode, packets are evenly distributed over the links. One
 | |
|  * can also implement its own dispatcher element and configure it using the
 | |
|  * "dispatcher" property. As a reference, "broadcast" mode is implemented with
 | |
|  * the "tee" element, while "round-robin" mode is implemented with the
 | |
|  * "round-robin" element.
 | |
|  *
 | |
|  * ## Example gst-launch line for bonding
 | |
|  * |[
 | |
|  * gst-launch-1.0 udpsrc ! tsparse set-timestamps=1 smoothing-latency=40000 ! \
 | |
|  *  rtpmp2tpay ! ristsink bonding-addresses="10.0.0.1:5004,11.0.0.1:5006"
 | |
|  * ]|
 | |
|  */
 | |
| 
 | |
| /* using GValueArray, which has not replacement */
 | |
| #define GLIB_DISABLE_DEPRECATION_WARNINGS
 | |
| 
 | |
| #ifdef HAVE_CONFIG_H
 | |
| #include "config.h"
 | |
| #endif
 | |
| 
 | |
| #include <gio/gio.h>
 | |
| #include <gst/rtp/rtp.h>
 | |
| 
 | |
| /* for strtol() */
 | |
| #include <stdlib.h>
 | |
| 
 | |
| #include "gstrist.h"
 | |
| 
 | |
| GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug);
 | |
| #define GST_CAT_DEFAULT gst_rist_sink_debug
 | |
| 
 | |
| enum
 | |
| {
 | |
|   PROP_ADDRESS = 1,
 | |
|   PROP_PORT,
 | |
|   PROP_SENDER_BUFFER,
 | |
|   PROP_MIN_RTCP_INTERVAL,
 | |
|   PROP_MAX_RTCP_BANDWIDTH,
 | |
|   PROP_STATS_UPDATE_INTERVAL,
 | |
|   PROP_STATS,
 | |
|   PROP_CNAME,
 | |
|   PROP_MULTICAST_LOOPBACK,
 | |
|   PROP_MULTICAST_IFACE,
 | |
|   PROP_MULTICAST_TTL,
 | |
|   PROP_BONDING_ADDRESSES,
 | |
|   PROP_BONDING_METHOD,
 | |
|   PROP_DISPATCHER,
 | |
|   PROP_DROP_NULL_TS_PACKETS,
 | |
|   PROP_SEQUENCE_NUMBER_EXTENSION
 | |
| };
 | |
| 
 | |
| typedef enum
 | |
| {
 | |
|   GST_RIST_BONDING_METHOD_BROADCAST,
 | |
|   GST_RIST_BONDING_METHOD_ROUND_ROBIN,
 | |
| } GstRistBondingMethod;
 | |
| 
 | |
| static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
 | |
|     GST_PAD_SINK,
 | |
|     GST_PAD_ALWAYS,
 | |
|     GST_STATIC_CAPS ("application/x-rtp"));
 | |
| 
 | |
| typedef struct
 | |
| {
 | |
|   guint session;
 | |
|   gchar *address;
 | |
|   gchar *multicast_iface;
 | |
|   guint port;
 | |
|   GstElement *rtcp_src;
 | |
|   GstElement *rtp_sink;
 | |
|   GstElement *rtcp_sink;
 | |
|   GstElement *rtx_send;
 | |
|   GstElement *rtx_queue;
 | |
|   guint32 rtcp_ssrc;
 | |
| } RistSenderBond;
 | |
| 
 | |
| struct _GstRistSink
 | |
| {
 | |
|   GstBin parent;
 | |
| 
 | |
|   /* Common elements in the pipeline */
 | |
|   GstElement *rtpbin;
 | |
|   GstElement *ssrc_filter;
 | |
|   GstPad *sinkpad;
 | |
|   GstElement *rtxbin;
 | |
|   GstElement *dispatcher;
 | |
|   GstElement *rtpext;
 | |
| 
 | |
|   /* Common properties, protected by bonds_lock */
 | |
|   gint multicast_ttl;
 | |
|   gboolean multicast_loopback;
 | |
|   GstClockTime min_rtcp_interval;
 | |
|   gdouble max_rtcp_bandwidth;
 | |
|   GstRistBondingMethod bonding_method;
 | |
| 
 | |
|   /* Bonds */
 | |
|   GPtrArray *bonds;
 | |
|   /* this is needed as setting sibling properties will try to take the object
 | |
|    * lock. Thus, any properties that affects the bonds will be protected with
 | |
|    * that lock instead of the object lock. */
 | |
|   GMutex bonds_lock;
 | |
| 
 | |
|   /* For stats */
 | |
|   guint stats_interval;
 | |
|   guint32 rtp_ssrc;
 | |
|   GstClockID stats_cid;
 | |
| 
 | |
|   /* This is set whenever there is a pipeline construction failure, and used
 | |
|    * to fail state changes later */
 | |
|   gboolean construct_failed;
 | |
|   const gchar *missing_plugin;
 | |
| };
 | |
| 
 | |
| static GType
 | |
| gst_rist_bonding_method_get_type (void)
 | |
| {
 | |
|   static gsize id = 0;
 | |
|   static const GEnumValue values[] = {
 | |
|     {GST_RIST_BONDING_METHOD_BROADCAST,
 | |
|         "GST_RIST_BONDING_METHOD_BROADCAST", "broadcast"},
 | |
|     {GST_RIST_BONDING_METHOD_ROUND_ROBIN,
 | |
|         "GST_RIST_BONDING_METHOD_ROUND_ROBIN", "round-robin"},
 | |
|     {0, NULL, NULL}
 | |
|   };
 | |
| 
 | |
|   if (g_once_init_enter (&id)) {
 | |
|     GType tmp = g_enum_register_static ("GstRistBondingMethodType", values);
 | |
|     g_once_init_leave (&id, tmp);
 | |
|   }
 | |
| 
 | |
|   return (GType) id;
 | |
| }
 | |
| 
 | |
| G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN,
 | |
|     GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink"));
 | |
| GST_ELEMENT_REGISTER_DEFINE (ristsink, "ristsink", GST_RANK_PRIMARY,
 | |
|     GST_TYPE_RIST_SINK);
 | |
| 
 | |
| GQuark session_id_quark = 0;
 | |
| 
 | |
| static RistSenderBond *
 | |
| gst_rist_sink_add_bond (GstRistSink * sink)
 | |
| {
 | |
|   RistSenderBond *bond = g_slice_new0 (RistSenderBond);
 | |
|   GstPad *pad, *gpad;
 | |
|   gchar name[32];
 | |
| 
 | |
|   bond->session = sink->bonds->len;
 | |
|   bond->address = g_strdup ("localhost");
 | |
| 
 | |
|   g_snprintf (name, 32, "rist_rtp_udpsink%u", bond->session);
 | |
|   bond->rtp_sink = gst_element_factory_make ("udpsink", name);
 | |
|   if (!bond->rtp_sink) {
 | |
|     g_slice_free (RistSenderBond, bond);
 | |
|     sink->missing_plugin = "udp";
 | |
|     return NULL;
 | |
|   }
 | |
| 
 | |
|   /* these are all from UDP plugin, so they cannot fail */
 | |
|   g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
 | |
|   bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
 | |
|   g_snprintf (name, 32, "rist_rtcp_udpsink%u", bond->session);
 | |
|   bond->rtcp_sink = gst_element_factory_make ("udpsink", name);
 | |
|   g_object_set (bond->rtcp_sink, "async", FALSE, NULL);
 | |
| 
 | |
|   gst_bin_add_many (GST_BIN (sink), bond->rtp_sink, bond->rtcp_src,
 | |
|       bond->rtcp_sink, NULL);
 | |
|   gst_element_set_locked_state (bond->rtcp_src, TRUE);
 | |
|   gst_element_set_locked_state (bond->rtcp_sink, TRUE);
 | |
| 
 | |
|   g_snprintf (name, 32, "rist_rtx_queue%u", bond->session);
 | |
|   bond->rtx_queue = gst_element_factory_make ("queue", name);
 | |
|   gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_queue);
 | |
| 
 | |
|   g_snprintf (name, 32, "rist_rtx_send%u", bond->session);
 | |
|   bond->rtx_send = gst_element_factory_make ("ristrtxsend", name);
 | |
|   if (!bond->rtx_send) {
 | |
|     sink->missing_plugin = "rtpmanager";
 | |
|     g_slice_free (RistSenderBond, bond);
 | |
|     return NULL;
 | |
|   }
 | |
|   gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_send);
 | |
| 
 | |
|   gst_element_link (bond->rtx_queue, bond->rtx_send);
 | |
| 
 | |
|   pad = gst_element_get_static_pad (bond->rtx_send, "src");
 | |
|   g_snprintf (name, 32, "src_%u", bond->session);
 | |
|   gpad = gst_ghost_pad_new (name, pad);
 | |
|   gst_object_unref (pad);
 | |
|   gst_element_add_pad (sink->rtxbin, gpad);
 | |
| 
 | |
|   g_object_set (bond->rtx_send, "max-size-packets", 0, NULL);
 | |
| 
 | |
|   g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
 | |
|   if (bond->session == 0) {
 | |
|     gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, name);
 | |
|   } else {
 | |
|     GstPad *pad;
 | |
| 
 | |
|     /* to make a sender, we need to create an unused pad on rtpbin, which will
 | |
|      * require an unused pad on the rtxbin */
 | |
|     g_snprintf (name, 32, "sink_%u", bond->session);
 | |
|     pad = gst_ghost_pad_new_no_target (name, GST_PAD_SINK);
 | |
|     gst_element_add_pad (sink->rtxbin, pad);
 | |
| 
 | |
|     g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
 | |
|     pad = gst_element_request_pad_simple (sink->rtpbin, name);
 | |
|     gst_object_unref (pad);
 | |
|   }
 | |
| 
 | |
|   g_snprintf (name, 32, "send_rtp_src_%u", bond->session);
 | |
|   gst_element_link_pads (sink->rtpbin, name, bond->rtp_sink, "sink");
 | |
| 
 | |
|   g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
 | |
|   gst_element_link_pads (bond->rtcp_src, "src", sink->rtpbin, name);
 | |
| 
 | |
|   g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
 | |
|   gst_element_link_pads (sink->rtpbin, name, bond->rtcp_sink, "sink");
 | |
| 
 | |
|   g_ptr_array_add (sink->bonds, bond);
 | |
|   return bond;
 | |
| }
 | |
| 
 | |
| static GstCaps *
 | |
| gst_rist_sink_request_pt_map (GstRistSrc * sink, guint session_id, guint pt,
 | |
|     GstElement * rtpbin)
 | |
| {
 | |
|   const GstRTPPayloadInfo *pt_info;
 | |
|   GstCaps *ret;
 | |
| 
 | |
|   pt_info = gst_rtp_payload_info_for_pt (pt);
 | |
|   if (!pt_info || !pt_info->clock_rate)
 | |
|     return NULL;
 | |
| 
 | |
|   ret = gst_caps_new_simple ("application/x-rtp",
 | |
|       "media", G_TYPE_STRING, pt_info->media,
 | |
|       "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
 | |
|       "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
 | |
| 
 | |
|   /* FIXME add sprop-parameter-set if any */
 | |
|   g_warn_if_fail (pt_info->encoding_parameters == NULL);
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| static GstElement *
 | |
| gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id,
 | |
|     GstElement * rtpbin)
 | |
| {
 | |
|   return gst_object_ref (sink->rtxbin);
 | |
| }
 | |
| 
 | |
| static void
 | |
| on_receiving_rtcp (GObject * session, GstBuffer * buffer, GstRistSink * sink)
 | |
| {
 | |
|   RistSenderBond *bond = NULL;
 | |
|   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
 | |
| 
 | |
|   if (gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp)) {
 | |
|     GstRTCPPacket packet;
 | |
| 
 | |
|     if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
 | |
|       /* Always skip the first one as it's never a FB or APP packet */
 | |
| 
 | |
|       while (gst_rtcp_packet_move_to_next (&packet)) {
 | |
|         guint32 ssrc;
 | |
| 
 | |
|         switch (gst_rtcp_packet_get_type (&packet)) {
 | |
|           case GST_RTCP_TYPE_APP:
 | |
|             if (memcmp (gst_rtcp_packet_app_get_name (&packet), "RIST", 4) == 0)
 | |
|               ssrc = gst_rtcp_packet_app_get_ssrc (&packet);
 | |
|             else
 | |
|               continue;
 | |
|             break;
 | |
|           case GST_RTCP_TYPE_RTPFB:
 | |
|             if (gst_rtcp_packet_fb_get_type (&packet) ==
 | |
|                 GST_RTCP_RTPFB_TYPE_NACK)
 | |
|               ssrc = gst_rtcp_packet_fb_get_media_ssrc (&packet);
 | |
|             else
 | |
|               continue;
 | |
|             break;
 | |
|           default:
 | |
|             continue;
 | |
|         }
 | |
| 
 | |
|         /* The SSRC could be that of the original data or the
 | |
|          * retransmission. So for the last bit to 0.
 | |
|          */
 | |
|         ssrc &= 0xFFFFFFFE;
 | |
| 
 | |
|         if (bond == NULL) {
 | |
|           guint session_id =
 | |
|               GPOINTER_TO_UINT (g_object_get_qdata (session, session_id_quark));
 | |
| 
 | |
|           bond = g_ptr_array_index (sink->bonds, session_id);
 | |
|           if (bond == NULL) {
 | |
|             g_critical ("Can't find session id %u", session_id);
 | |
|             goto done;
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         gst_rist_rtx_send_clear_extseqnum (GST_RIST_RTX_SEND (bond->rtx_send),
 | |
|             ssrc);
 | |
|       }
 | |
|     }
 | |
|   done:
 | |
|     gst_rtcp_buffer_unmap (&rtcp);
 | |
|   }
 | |
| }
 | |
| 
 | |
| static void
 | |
| on_app_rtcp (GObject * session, guint32 subtype, guint32 ssrc,
 | |
|     const gchar * name, GstBuffer * data, GstRistSink * sink)
 | |
| {
 | |
|   if (g_str_equal (name, "RIST")) {
 | |
|     guint session_id =
 | |
|         GPOINTER_TO_UINT (g_object_get_qdata (session, session_id_quark));
 | |
| 
 | |
|     if (subtype == 0) {
 | |
|       GstEvent *event;
 | |
|       GstPad *send_rtp_sink;
 | |
|       GstMapInfo map;
 | |
|       gint i;
 | |
|       GstElement *gstsession;
 | |
| 
 | |
|       g_signal_emit_by_name (sink->rtpbin, "get-session", session_id,
 | |
|           &gstsession);
 | |
| 
 | |
|       send_rtp_sink = gst_element_get_static_pad (gstsession, "send_rtp_sink");
 | |
|       if (send_rtp_sink) {
 | |
|         gst_buffer_map (data, &map, GST_MAP_READ);
 | |
| 
 | |
|         for (i = 0; i < map.size; i += sizeof (guint32)) {
 | |
|           guint32 dword = GST_READ_UINT32_BE (map.data + i);
 | |
|           guint16 seqnum = dword >> 16;
 | |
|           guint16 num = dword & 0x0000FFFF;
 | |
|           guint16 j;
 | |
| 
 | |
|           GST_DEBUG ("got RIST nack packet, #%u %u", seqnum, num);
 | |
| 
 | |
|           /* num is inclusive, i.e. it can be 0, which means exactly 1 seqnum */
 | |
|           for (j = 0; j <= num; j++) {
 | |
|             event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
 | |
|                 gst_structure_new ("GstRTPRetransmissionRequest",
 | |
|                     "seqnum", G_TYPE_UINT, (guint) seqnum + j,
 | |
|                     "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
 | |
|             gst_pad_push_event (send_rtp_sink, event);
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         gst_buffer_unmap (data, &map);
 | |
|         gst_object_unref (send_rtp_sink);
 | |
|       }
 | |
|     } else if (subtype == 1) {
 | |
|       GstMapInfo map;
 | |
|       RistSenderBond *bond;
 | |
|       guint16 seqnum_ext;
 | |
| 
 | |
|       bond = g_ptr_array_index (sink->bonds, session_id);
 | |
| 
 | |
|       if (gst_buffer_get_size (data) < 4) {
 | |
|         if (bond)
 | |
|           gst_rist_rtx_send_clear_extseqnum (GST_RIST_RTX_SEND (bond->rtx_send),
 | |
|               ssrc);
 | |
| 
 | |
|         GST_WARNING_OBJECT (sink, "RIST APP RTCP packet is too small,"
 | |
|             " it's %zu bytes, less than the expected 4 bytes",
 | |
|             gst_buffer_get_size (data));
 | |
|         return;
 | |
|       }
 | |
| 
 | |
|       gst_buffer_map (data, &map, GST_MAP_READ);
 | |
|       seqnum_ext = GST_READ_UINT16_BE (map.data);
 | |
|       gst_buffer_unmap (data, &map);
 | |
| 
 | |
|       if (bond)
 | |
|         gst_rist_rtx_send_set_extseqnum (GST_RIST_RTX_SEND (bond->rtx_send),
 | |
|             ssrc, seqnum_ext);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_on_new_sender_ssrc (GstRistSink * sink, guint session_id,
 | |
|     guint ssrc, GstElement * rtpbin)
 | |
| {
 | |
|   GObject *gstsession = NULL;
 | |
|   GObject *session = NULL;
 | |
|   GObject *source = NULL;
 | |
| 
 | |
|   if (session_id != 0)
 | |
|     return;
 | |
| 
 | |
|   g_signal_emit_by_name (rtpbin, "get-session", session_id, &gstsession);
 | |
|   g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
 | |
|   g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
 | |
|   g_object_set_qdata (session, session_id_quark, GUINT_TO_POINTER (session_id));
 | |
| 
 | |
|   if (ssrc & 1) {
 | |
|     g_object_set (source, "disable-rtcp", TRUE, NULL);
 | |
|   } else {
 | |
|     g_signal_connect_object (session, "on-app-rtcp",
 | |
|         (GCallback) on_app_rtcp, sink, 0);
 | |
|     g_signal_connect_object (session, "on-receiving-rtcp",
 | |
|         (GCallback) on_receiving_rtcp, sink, 0);
 | |
|   }
 | |
| 
 | |
|   g_object_unref (source);
 | |
|   g_object_unref (session);
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id,
 | |
|     guint ssrc, GstElement * rtpbin)
 | |
| {
 | |
|   RistSenderBond *bond;
 | |
| 
 | |
|   if (session_id != 0)
 | |
|     return;
 | |
| 
 | |
|   GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc);
 | |
|   bond = g_ptr_array_index (sink->bonds, session_id);
 | |
|   bond->rtcp_ssrc = ssrc;
 | |
| }
 | |
| 
 | |
| static GstPadProbeReturn
 | |
| gst_rist_sink_fix_collision (GstPad * pad, GstPadProbeInfo * info,
 | |
|     gpointer user_data)
 | |
| {
 | |
|   GstEvent *event = info->data;
 | |
|   const GstStructure *cs;
 | |
|   GstStructure *s;
 | |
|   guint ssrc;
 | |
| 
 | |
|   /* We simply ignore collisions */
 | |
|   if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM)
 | |
|     return GST_PAD_PROBE_OK;
 | |
| 
 | |
|   cs = gst_event_get_structure (event);
 | |
|   if (!gst_structure_has_name (cs, "GstRTPCollision"))
 | |
|     return GST_PAD_PROBE_OK;
 | |
| 
 | |
|   gst_structure_get_uint (cs, "suggested-ssrc", &ssrc);
 | |
|   if ((ssrc & 1) == 0)
 | |
|     return GST_PAD_PROBE_OK;
 | |
| 
 | |
|   event = info->data = gst_event_make_writable (event);
 | |
|   /* we can drop the const qualifier as we ensured writability */
 | |
|   s = (GstStructure *) gst_event_get_structure (event);
 | |
|   gst_structure_set (s, "suggested-ssrc", G_TYPE_UINT, ssrc - 1, NULL);
 | |
| 
 | |
|   return GST_PAD_PROBE_OK;
 | |
| }
 | |
| 
 | |
| static gboolean
 | |
| gst_rist_sink_set_caps (GstRistSink * sink, GstCaps * caps)
 | |
| {
 | |
|   const GstStructure *s = gst_caps_get_structure (caps, 0);
 | |
| 
 | |
|   if (!gst_structure_get_uint (s, "ssrc", &sink->rtp_ssrc)) {
 | |
|     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, ("No 'ssrc' field in caps."),
 | |
|         (NULL));
 | |
|     return FALSE;
 | |
|   }
 | |
| 
 | |
|   if (sink->rtp_ssrc & 1) {
 | |
|     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION,
 | |
|         ("Invalid RIST SSRC, LSB must be zero."), (NULL));
 | |
|     return FALSE;
 | |
|   }
 | |
| 
 | |
|   return TRUE;
 | |
| }
 | |
| 
 | |
| static gboolean
 | |
| gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 | |
| {
 | |
|   GstRistSink *sink = GST_RIST_SINK (parent);
 | |
|   GstCaps *caps;
 | |
|   gboolean ret = TRUE;
 | |
| 
 | |
|   switch (GST_EVENT_TYPE (event)) {
 | |
|     case GST_EVENT_CAPS:
 | |
|       gst_event_parse_caps (event, &caps);
 | |
|       ret = gst_rist_sink_set_caps (sink, caps);
 | |
|       break;
 | |
|     default:
 | |
|       break;
 | |
|   }
 | |
| 
 | |
|   if (ret)
 | |
|     ret = gst_pad_event_default (pad, parent, event);
 | |
|   else
 | |
|     gst_event_unref (event);
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_init (GstRistSink * sink)
 | |
| {
 | |
|   GstPad *ssrc_filter_sinkpad, *rtxbin_gpad;
 | |
|   GstCaps *ssrc_caps;
 | |
|   GstStructure *sdes = NULL;
 | |
|   RistSenderBond *bond;
 | |
| 
 | |
|   sink->rtpext = gst_element_factory_make ("ristrtpext", "ristrtpext");
 | |
| 
 | |
|   g_mutex_init (&sink->bonds_lock);
 | |
|   sink->bonds = g_ptr_array_new ();
 | |
| 
 | |
|   /* Construct the RIST RTP sender pipeline.
 | |
|    *
 | |
|    * capsfilter*-> [send_rtp_sink_%u]   --------  [send_rtp_src_%u]  -> udpsink
 | |
|    *                                   | rtpbin |
 | |
|    * udpsrc     -> [recv_rtcp_sink_%u]  --------  [send_rtcp_src_%u] -> * udpsink
 | |
|    *
 | |
|    * * To select RIST compatible SSRC
 | |
|    */
 | |
|   sink->rtpbin = gst_element_factory_make ("rtpbin", "rist_send_rtpbin");
 | |
|   if (!sink->rtpbin) {
 | |
|     sink->missing_plugin = "rtpmanager";
 | |
|     goto missing_plugin;
 | |
|   }
 | |
| 
 | |
|   /* RIST specification says the SDES should only contain the CNAME */
 | |
|   g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
 | |
|   gst_structure_remove_field (sdes, "tool");
 | |
| 
 | |
|   gst_bin_add (GST_BIN (sink), sink->rtpbin);
 | |
|   g_object_set (sink->rtpbin, "do-retransmission", TRUE,
 | |
|       "rtp-profile", 3 /* AVFP */ ,
 | |
|       "sdes", sdes, NULL);
 | |
|   gst_structure_free (sdes);
 | |
| 
 | |
|   g_signal_connect_object (sink->rtpbin, "request-pt-map",
 | |
|       G_CALLBACK (gst_rist_sink_request_pt_map), sink, G_CONNECT_SWAPPED);
 | |
|   g_signal_connect_object (sink->rtpbin, "request-aux-sender",
 | |
|       G_CALLBACK (gst_rist_sink_request_aux_sender), sink, G_CONNECT_SWAPPED);
 | |
|   g_signal_connect_object (sink->rtpbin, "on-new-sender-ssrc",
 | |
|       G_CALLBACK (gst_rist_sink_on_new_sender_ssrc), sink, G_CONNECT_SWAPPED);
 | |
|   g_signal_connect_object (sink->rtpbin, "on-new-ssrc",
 | |
|       G_CALLBACK (gst_rist_sink_on_new_receiver_ssrc), sink, G_CONNECT_SWAPPED);
 | |
| 
 | |
|   sink->rtxbin = gst_bin_new ("rist_send_rtxbin");
 | |
|   g_object_ref_sink (sink->rtxbin);
 | |
| 
 | |
|   rtxbin_gpad = gst_ghost_pad_new_no_target ("sink_0", GST_PAD_SINK);
 | |
|   gst_element_add_pad (sink->rtxbin, rtxbin_gpad);
 | |
| 
 | |
|   sink->ssrc_filter = gst_element_factory_make ("capsfilter",
 | |
|       "rist_ssrc_filter");
 | |
|   gst_bin_add (GST_BIN (sink), sink->ssrc_filter);
 | |
| 
 | |
|   /* RIST RTP SSRC should have LSB set to 0 */
 | |
|   sink->rtp_ssrc = g_random_int () & ~1;
 | |
|   ssrc_caps = gst_caps_new_simple ("application/x-rtp",
 | |
|       "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL);
 | |
|   gst_caps_append_structure (ssrc_caps,
 | |
|       gst_structure_new_empty ("application/x-rtp"));
 | |
|   g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL);
 | |
|   gst_caps_unref (ssrc_caps);
 | |
| 
 | |
|   ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink");
 | |
|   sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad,
 | |
|       gst_static_pad_template_get (&sink_templ));
 | |
|   gst_pad_set_event_function (sink->sinkpad, gst_rist_sink_event);
 | |
|   gst_element_add_pad (GST_ELEMENT (sink), sink->sinkpad);
 | |
|   gst_object_unref (ssrc_filter_sinkpad);
 | |
| 
 | |
|   gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
 | |
|       gst_rist_sink_fix_collision, sink, NULL);
 | |
| 
 | |
|   bond = gst_rist_sink_add_bond (sink);
 | |
|   if (!bond)
 | |
|     goto missing_plugin;
 | |
| 
 | |
|   return;
 | |
| 
 | |
| missing_plugin:
 | |
|   {
 | |
|     GST_ERROR_OBJECT (sink, "'%s' plugin is missing.", sink->missing_plugin);
 | |
|     sink->construct_failed = TRUE;
 | |
|     /* Just make our element valid, so we fail cleanly */
 | |
|     gst_element_add_pad (GST_ELEMENT (sink),
 | |
|         gst_pad_new_from_static_template (&sink_templ, "sink"));
 | |
|   }
 | |
| }
 | |
| 
 | |
| static gboolean
 | |
| gst_rist_sink_setup_rtcp_socket (GstRistSink * sink, RistSenderBond * bond)
 | |
| {
 | |
|   GSocket *socket = NULL;
 | |
|   GInetAddress *iaddr = NULL;
 | |
|   gchar *remote_addr = NULL;
 | |
|   guint port = bond->port + 1;
 | |
|   GError *error = NULL;
 | |
| 
 | |
|   iaddr = g_inet_address_new_from_string (bond->address);
 | |
|   if (!iaddr) {
 | |
|     GList *results;
 | |
|     GResolver *resolver = NULL;
 | |
| 
 | |
|     resolver = g_resolver_get_default ();
 | |
|     results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);
 | |
| 
 | |
|     if (!results) {
 | |
|       g_object_unref (resolver);
 | |
|       goto dns_resolve_failed;
 | |
|     }
 | |
| 
 | |
|     iaddr = G_INET_ADDRESS (g_object_ref (results->data));
 | |
| 
 | |
|     g_resolver_free_addresses (results);
 | |
|     g_object_unref (resolver);
 | |
|   }
 | |
|   remote_addr = g_inet_address_to_string (iaddr);
 | |
| 
 | |
|   if (g_inet_address_get_is_multicast (iaddr)) {
 | |
|     g_object_set (bond->rtcp_src, "address", remote_addr, "port", port, NULL);
 | |
|   } else {
 | |
|     const gchar *any_addr;
 | |
| 
 | |
|     if (g_inet_address_get_family (iaddr) == G_SOCKET_FAMILY_IPV6)
 | |
|       any_addr = "::";
 | |
|     else
 | |
|       any_addr = "0.0.0.0";
 | |
| 
 | |
|     g_object_set (bond->rtcp_src, "address", any_addr, "port", 0, NULL);
 | |
|   }
 | |
|   g_free (remote_addr);
 | |
|   g_object_unref (iaddr);
 | |
| 
 | |
|   gst_element_set_locked_state (bond->rtcp_src, FALSE);
 | |
|   gst_element_sync_state_with_parent (bond->rtcp_src);
 | |
| 
 | |
|   /* share the socket created by the sink */
 | |
|   g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
 | |
|   g_object_set (bond->rtcp_sink, "socket", socket, "auto-multicast", FALSE,
 | |
|       "close-socket", FALSE, NULL);
 | |
|   g_object_unref (socket);
 | |
| 
 | |
|   g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
 | |
|   gst_element_set_locked_state (bond->rtcp_sink, FALSE);
 | |
|   gst_element_sync_state_with_parent (bond->rtcp_sink);
 | |
| 
 | |
|   return GST_STATE_CHANGE_SUCCESS;
 | |
| 
 | |
| dns_resolve_failed:
 | |
|   GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
 | |
|       ("Could not resolve hostname '%s'", GST_STR_NULL (bond->address)),
 | |
|       ("DNS resolver reported: %s", error->message));
 | |
|   g_error_free (error);
 | |
|   return GST_STATE_CHANGE_FAILURE;
 | |
| }
 | |
| 
 | |
| static GstStateChangeReturn
 | |
| gst_rist_sink_reuse_socket (GstRistSink * sink)
 | |
| {
 | |
|   gint i;
 | |
| 
 | |
|   for (i = 0; i < sink->bonds->len; i++) {
 | |
|     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
 | |
|     GObject *session = NULL;
 | |
|     GstPad *pad;
 | |
|     gchar name[32];
 | |
| 
 | |
|     g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session);
 | |
|     g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval,
 | |
|         "rtcp-fraction", sink->max_rtcp_bandwidth, NULL);
 | |
|     g_object_unref (session);
 | |
| 
 | |
|     g_snprintf (name, 32, "src_%u", bond->session);
 | |
|     pad = gst_element_request_pad_simple (sink->dispatcher, name);
 | |
|     gst_element_link_pads (sink->dispatcher, name, bond->rtx_queue, "sink");
 | |
|     gst_object_unref (pad);
 | |
| 
 | |
|     if (!gst_rist_sink_setup_rtcp_socket (sink, bond))
 | |
|       return GST_STATE_CHANGE_FAILURE;
 | |
|   }
 | |
| 
 | |
|   return GST_STATE_CHANGE_SUCCESS;
 | |
| }
 | |
| 
 | |
| static GstStateChangeReturn
 | |
| gst_rist_sink_start (GstRistSink * sink)
 | |
| {
 | |
|   GstPad *rtxbin_gpad, *rtpext_sinkpad;
 | |
| 
 | |
|   /* Unless a custom dispatcher was provided, use the specified bonding method
 | |
|    * to create one */
 | |
|   if (!sink->dispatcher) {
 | |
|     switch (sink->bonding_method) {
 | |
|       case GST_RIST_BONDING_METHOD_BROADCAST:
 | |
|         sink->dispatcher = gst_element_factory_make ("tee", "rist_dispatcher");
 | |
|         if (!sink->dispatcher) {
 | |
|           sink->missing_plugin = "coreelements";
 | |
|           sink->construct_failed = TRUE;
 | |
|         }
 | |
|         break;
 | |
|       case GST_RIST_BONDING_METHOD_ROUND_ROBIN:
 | |
|         sink->dispatcher = gst_element_factory_make ("roundrobin",
 | |
|             "rist_dispatcher");
 | |
|         g_assert (sink->dispatcher);
 | |
|         break;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (sink->construct_failed) {
 | |
|     GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
 | |
|         ("Your GStreamer installation is missing plugin '%s'",
 | |
|             sink->missing_plugin), (NULL));
 | |
|     return GST_STATE_CHANGE_FAILURE;
 | |
|   }
 | |
| 
 | |
|   gst_bin_add (GST_BIN (sink), sink->rtpext);
 | |
|   rtxbin_gpad = gst_element_get_static_pad (sink->rtxbin, "sink_0");
 | |
|   rtpext_sinkpad = gst_element_get_static_pad (sink->rtpext, "sink");
 | |
|   gst_ghost_pad_set_target (GST_GHOST_PAD (rtxbin_gpad), rtpext_sinkpad);
 | |
|   gst_object_unref (rtpext_sinkpad);
 | |
| 
 | |
|   gst_bin_add (GST_BIN (sink->rtxbin), sink->dispatcher);
 | |
|   gst_element_link (sink->rtpext, sink->dispatcher);
 | |
| 
 | |
|   return GST_STATE_CHANGE_SUCCESS;
 | |
| }
 | |
| 
 | |
| 
 | |
| static GstStructure *
 | |
| gst_rist_sink_create_stats (GstRistSink * sink)
 | |
| {
 | |
|   RistSenderBond *bond;
 | |
|   GstStructure *ret;
 | |
|   GValueArray *session_stats;
 | |
|   guint64 total_pkt_sent = 0, total_rtx_sent = 0;
 | |
|   gint i;
 | |
| 
 | |
|   ret = gst_structure_new_empty ("rist/x-sender-stats");
 | |
|   session_stats = g_value_array_new (sink->bonds->len);
 | |
| 
 | |
|   for (i = 0; i < sink->bonds->len; i++) {
 | |
|     GObject *session = NULL, *source = NULL;
 | |
|     GstStructure *sstats = NULL, *stats;
 | |
|     guint64 pkt_sent = 0, rtx_sent = 0, rtt;
 | |
|     guint rb_rtt = 0;
 | |
|     GValue value = G_VALUE_INIT;
 | |
| 
 | |
|     g_signal_emit_by_name (sink->rtpbin, "get-internal-session", i, &session);
 | |
|     if (!session)
 | |
|       continue;
 | |
| 
 | |
|     stats = gst_structure_new_empty ("rist/x-sender-session-stats");
 | |
|     bond = g_ptr_array_index (sink->bonds, i);
 | |
| 
 | |
|     g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtp_ssrc,
 | |
|         &source);
 | |
|     if (source) {
 | |
|       g_object_get (source, "stats", &sstats, NULL);
 | |
|       gst_structure_get_uint64 (sstats, "packets-sent", &pkt_sent);
 | |
|       gst_structure_free (sstats);
 | |
|       g_clear_object (&source);
 | |
|     }
 | |
| 
 | |
|     g_signal_emit_by_name (session, "get-source-by-ssrc", bond->rtcp_ssrc,
 | |
|         &source);
 | |
|     if (source) {
 | |
|       g_object_get (source, "stats", &sstats, NULL);
 | |
|       gst_structure_get_uint (sstats, "rb-round-trip", &rb_rtt);
 | |
|       gst_structure_free (sstats);
 | |
|       g_clear_object (&source);
 | |
|     }
 | |
|     g_object_unref (session);
 | |
| 
 | |
|     g_object_get (bond->rtx_send, "num-rtx-packets", &rtx_sent, NULL);
 | |
| 
 | |
|     /* rb_rtt is in Q16 in NTP time */
 | |
|     rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536);
 | |
| 
 | |
|     gst_structure_set (stats, "session-id", G_TYPE_INT, i,
 | |
|         "sent-original-packets", G_TYPE_UINT64, pkt_sent,
 | |
|         "sent-retransmitted-packets", G_TYPE_UINT64, rtx_sent,
 | |
|         "round-trip-time", G_TYPE_UINT64, rtt, NULL);
 | |
| 
 | |
|     g_value_init (&value, GST_TYPE_STRUCTURE);
 | |
|     g_value_take_boxed (&value, stats);
 | |
|     g_value_array_append (session_stats, &value);
 | |
|     g_value_unset (&value);
 | |
| 
 | |
|     total_pkt_sent += pkt_sent;
 | |
|     total_rtx_sent += rtx_sent;
 | |
|   }
 | |
| 
 | |
|   gst_structure_set (ret,
 | |
|       "sent-original-packets", G_TYPE_UINT64, total_pkt_sent,
 | |
|       "sent-retransmitted-packets", G_TYPE_UINT64, total_rtx_sent,
 | |
|       "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL);
 | |
|   g_value_array_free (session_stats);
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| static gboolean
 | |
| gst_rist_sink_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
 | |
|     gpointer user_data)
 | |
| {
 | |
|   GstRistSink *sink = GST_RIST_SINK (user_data);
 | |
|   GstStructure *stats = gst_rist_sink_create_stats (sink);
 | |
| 
 | |
|   gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (sink), stats);
 | |
| 
 | |
|   gst_structure_free (stats);
 | |
|   return TRUE;
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_enable_stats_interval (GstRistSink * sink)
 | |
| {
 | |
|   GstClock *clock;
 | |
|   GstClockTime start, interval;
 | |
| 
 | |
|   if (sink->stats_interval == 0)
 | |
|     return;
 | |
| 
 | |
|   interval = sink->stats_interval * GST_MSECOND;
 | |
|   clock = gst_system_clock_obtain ();
 | |
|   start = gst_clock_get_time (clock) + interval;
 | |
| 
 | |
|   sink->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
 | |
|   gst_clock_id_wait_async (sink->stats_cid, gst_rist_sink_dump_stats,
 | |
|       gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
 | |
| 
 | |
|   gst_object_unref (clock);
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_disable_stats_interval (GstRistSink * sink)
 | |
| {
 | |
|   if (sink->stats_cid) {
 | |
|     gst_clock_id_unschedule (sink->stats_cid);
 | |
|     gst_clock_id_unref (sink->stats_cid);
 | |
|     sink->stats_cid = NULL;
 | |
|   }
 | |
| }
 | |
| 
 | |
| static GstStateChangeReturn
 | |
| gst_rist_sink_change_state (GstElement * element, GstStateChange transition)
 | |
| {
 | |
|   GstRistSink *sink = GST_RIST_SINK (element);
 | |
|   GstStateChangeReturn ret;
 | |
| 
 | |
|   switch (transition) {
 | |
|     case GST_STATE_CHANGE_NULL_TO_READY:
 | |
|       /* Set the properties to the child elements to avoid binding to
 | |
|        * a NULL interface on a network without a default gateway */
 | |
|       if (gst_rist_sink_start (sink) == GST_STATE_CHANGE_FAILURE)
 | |
|         return GST_STATE_CHANGE_FAILURE;
 | |
|     case GST_STATE_CHANGE_PAUSED_TO_READY:
 | |
|       gst_rist_sink_disable_stats_interval (sink);
 | |
|       break;
 | |
|     default:
 | |
|       break;
 | |
|   }
 | |
| 
 | |
|   ret = GST_ELEMENT_CLASS (gst_rist_sink_parent_class)->change_state (element,
 | |
|       transition);
 | |
| 
 | |
|   switch (transition) {
 | |
|     case GST_STATE_CHANGE_NULL_TO_READY:
 | |
|       ret = gst_rist_sink_reuse_socket (sink);
 | |
|       break;
 | |
|     case GST_STATE_CHANGE_READY_TO_PAUSED:
 | |
|       gst_rist_sink_enable_stats_interval (sink);
 | |
|       break;
 | |
|     default:
 | |
|       break;
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| /* called with bonds lock */
 | |
| static void
 | |
| gst_rist_sink_update_bond_address (GstRistSink * sink, RistSenderBond * bond,
 | |
|     const gchar * address, guint port, const gchar * multicast_iface)
 | |
| {
 | |
|   g_free (bond->address);
 | |
|   g_free (bond->multicast_iface);
 | |
|   bond->address = g_strdup (address);
 | |
|   bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
 | |
|   bond->port = port;
 | |
| 
 | |
|   g_object_set (G_OBJECT (bond->rtp_sink), "host", address, "port", port,
 | |
|       "multicast-iface", bond->multicast_iface, NULL);
 | |
|   g_object_set (G_OBJECT (bond->rtcp_sink), "host", address,
 | |
|       "port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
 | |
| 
 | |
|   /* TODO add runtime support
 | |
|    *  - add blocking the pad probe
 | |
|    *  - update RTCP socket
 | |
|    *  - cycle elements through NULL state
 | |
|    */
 | |
| }
 | |
| 
 | |
| /* called with bonds lock */
 | |
| static gchar *
 | |
| gst_rist_sink_get_bonds (GstRistSink * sink)
 | |
| {
 | |
|   GString *bonds = g_string_new ("");
 | |
|   gint i;
 | |
| 
 | |
|   for (i = 0; i < sink->bonds->len; i++) {
 | |
|     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
 | |
|     if (bonds->len > 0)
 | |
|       g_string_append_c (bonds, ':');
 | |
| 
 | |
|     g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
 | |
| 
 | |
|     if (bond->multicast_iface)
 | |
|       g_string_append_printf (bonds, "/%s", bond->multicast_iface);
 | |
|   }
 | |
| 
 | |
|   return g_string_free (bonds, FALSE);
 | |
| }
 | |
| 
 | |
| struct RistAddress
 | |
| {
 | |
|   gchar *address;
 | |
|   char *multicast_iface;
 | |
|   guint port;
 | |
| };
 | |
| 
 | |
| /* called with bonds lock */
 | |
| static void
 | |
| gst_rist_sink_set_bonds (GstRistSink * sink, const gchar * bonds)
 | |
| {
 | |
|   GStrv tokens = NULL;
 | |
|   struct RistAddress *addrs;
 | |
|   gint i;
 | |
| 
 | |
|   if (bonds == NULL)
 | |
|     goto missing_address;
 | |
| 
 | |
|   tokens = g_strsplit (bonds, ",", 0);
 | |
|   if (tokens[0] == NULL)
 | |
|     goto missing_address;
 | |
| 
 | |
|   addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
 | |
| 
 | |
|   /* parse the address list */
 | |
|   for (i = 0; tokens[i]; i++) {
 | |
|     gchar *address = tokens[i];
 | |
|     char *port_ptr, *iface_ptr, *endptr;
 | |
|     guint port;
 | |
| 
 | |
|     port_ptr = g_utf8_strrchr (address, -1, ':');
 | |
|     iface_ptr = g_utf8_strrchr (address, -1, '/');
 | |
| 
 | |
|     if (!port_ptr)
 | |
|       goto bad_parameter;
 | |
|     if (!g_ascii_isdigit (port_ptr[1]))
 | |
|       goto bad_parameter;
 | |
| 
 | |
|     if (iface_ptr) {
 | |
|       if (iface_ptr < port_ptr)
 | |
|         goto bad_parameter;
 | |
|       iface_ptr[0] = '\0';
 | |
|     }
 | |
| 
 | |
|     port = strtol (port_ptr + 1, &endptr, 0);
 | |
|     if (endptr[0] != '\0')
 | |
|       goto bad_parameter;
 | |
| 
 | |
|     /* port must be a multiple of 2 between 2 and 65534 */
 | |
|     if (port < 2 || (port & 1) || port > G_MAXUINT16)
 | |
|       goto invalid_port;
 | |
| 
 | |
|     port_ptr[0] = '\0';
 | |
|     addrs[i].port = port;
 | |
|     addrs[i].address = g_strstrip (address);
 | |
|     if (iface_ptr)
 | |
|       addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
 | |
|   }
 | |
| 
 | |
|   /* configure the bonds */
 | |
|   for (i = 0; tokens[i]; i++) {
 | |
|     RistSenderBond *bond = NULL;
 | |
| 
 | |
|     if (i < sink->bonds->len)
 | |
|       bond = g_ptr_array_index (sink->bonds, i);
 | |
|     else
 | |
|       bond = gst_rist_sink_add_bond (sink);
 | |
| 
 | |
|     gst_rist_sink_update_bond_address (sink, bond, addrs[i].address,
 | |
|         addrs[i].port, addrs[i].multicast_iface);
 | |
|   }
 | |
| 
 | |
|   g_strfreev (tokens);
 | |
|   return;
 | |
| 
 | |
| missing_address:
 | |
|   g_warning ("'bonding-addresses' cannot be empty");
 | |
|   g_strfreev (tokens);
 | |
|   return;
 | |
| 
 | |
| bad_parameter:
 | |
|   g_warning ("Failed to parse address '%s", tokens[i]);
 | |
|   g_strfreev (tokens);
 | |
|   g_free (addrs);
 | |
|   return;
 | |
| 
 | |
| invalid_port:
 | |
|   g_warning ("RIST port must valid UDP port and a multiple of 2.");
 | |
|   g_strfreev (tokens);
 | |
|   g_free (addrs);
 | |
|   return;
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_set_multicast_loopback (GstRistSink * sink, gboolean loop)
 | |
| {
 | |
|   gint i;
 | |
| 
 | |
|   sink->multicast_loopback = loop;
 | |
|   for (i = 0; i < sink->bonds->len; i++) {
 | |
|     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
 | |
|     g_object_set (G_OBJECT (bond->rtp_sink), "loop", loop, NULL);
 | |
|     g_object_set (G_OBJECT (bond->rtcp_sink), "loop", loop, NULL);
 | |
|   }
 | |
| }
 | |
| 
 | |
| /* called with bonds lock */
 | |
| static void
 | |
| gst_rist_sink_set_multicast_ttl (GstRistSink * sink, gint ttl)
 | |
| {
 | |
|   gint i;
 | |
| 
 | |
|   sink->multicast_ttl = ttl;
 | |
|   for (i = 0; i < sink->bonds->len; i++) {
 | |
|     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
 | |
|     g_object_set (G_OBJECT (bond->rtp_sink), "ttl-mc", ttl, NULL);
 | |
|     g_object_set (G_OBJECT (bond->rtcp_sink), "ttl-mc", ttl, NULL);
 | |
|   }
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_get_property (GObject * object, guint prop_id,
 | |
|     GValue * value, GParamSpec * pspec)
 | |
| {
 | |
|   GstRistSink *sink = GST_RIST_SINK (object);
 | |
|   GstStructure *sdes;
 | |
|   RistSenderBond *bond;
 | |
| 
 | |
|   if (sink->construct_failed)
 | |
|     return;
 | |
| 
 | |
|   g_mutex_lock (&sink->bonds_lock);
 | |
| 
 | |
|   bond = g_ptr_array_index (sink->bonds, 0);
 | |
| 
 | |
|   switch (prop_id) {
 | |
|     case PROP_ADDRESS:
 | |
|       g_value_set_string (value, bond->address);
 | |
|       break;
 | |
| 
 | |
|     case PROP_PORT:
 | |
|       g_value_set_uint (value, bond->port);
 | |
|       break;
 | |
| 
 | |
|     case PROP_SENDER_BUFFER:
 | |
|       g_object_get_property (G_OBJECT (bond->rtx_send), "max-size-time", value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MIN_RTCP_INTERVAL:
 | |
|       g_value_set_uint (value, (guint) (sink->min_rtcp_interval / GST_MSECOND));
 | |
|       break;
 | |
| 
 | |
|     case PROP_MAX_RTCP_BANDWIDTH:
 | |
|       g_value_set_double (value, sink->max_rtcp_bandwidth);
 | |
|       break;
 | |
| 
 | |
|     case PROP_STATS_UPDATE_INTERVAL:
 | |
|       g_value_set_uint (value, sink->stats_interval);
 | |
|       break;
 | |
| 
 | |
|     case PROP_STATS:
 | |
|       g_value_take_boxed (value, gst_rist_sink_create_stats (sink));
 | |
|       break;
 | |
| 
 | |
|     case PROP_CNAME:
 | |
|       g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
 | |
|       g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
 | |
|       gst_structure_free (sdes);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MULTICAST_LOOPBACK:
 | |
|       g_value_set_boolean (value, sink->multicast_loopback);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MULTICAST_IFACE:
 | |
|       g_value_set_string (value, bond->multicast_iface);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MULTICAST_TTL:
 | |
|       g_value_set_int (value, sink->multicast_ttl);
 | |
|       break;
 | |
| 
 | |
|     case PROP_BONDING_ADDRESSES:
 | |
|       g_value_take_string (value, gst_rist_sink_get_bonds (sink));
 | |
|       break;
 | |
| 
 | |
|     case PROP_BONDING_METHOD:
 | |
|       g_value_set_enum (value, sink->bonding_method);
 | |
|       break;
 | |
| 
 | |
|     case PROP_DISPATCHER:
 | |
|       g_value_set_object (value, sink->dispatcher);
 | |
|       break;
 | |
| 
 | |
|     case PROP_DROP_NULL_TS_PACKETS:
 | |
|       g_object_get_property (G_OBJECT (sink->rtpext), "drop-null-ts-packets",
 | |
|           value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_SEQUENCE_NUMBER_EXTENSION:
 | |
|       g_object_get_property (G_OBJECT (sink->rtpext),
 | |
|           "sequence-number-extension", value);
 | |
|       break;
 | |
| 
 | |
|     default:
 | |
|       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 | |
|       break;
 | |
|   }
 | |
| 
 | |
|   g_mutex_unlock (&sink->bonds_lock);
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_set_property (GObject * object, guint prop_id,
 | |
|     const GValue * value, GParamSpec * pspec)
 | |
| {
 | |
|   GstRistSink *sink = GST_RIST_SINK (object);
 | |
|   GstStructure *sdes;
 | |
|   RistSenderBond *bond;
 | |
| 
 | |
|   if (sink->construct_failed)
 | |
|     return;
 | |
| 
 | |
|   g_mutex_lock (&sink->bonds_lock);
 | |
| 
 | |
|   bond = g_ptr_array_index (sink->bonds, 0);
 | |
| 
 | |
|   switch (prop_id) {
 | |
|     case PROP_ADDRESS:
 | |
|       g_free (bond->address);
 | |
|       bond->address = g_value_dup_string (value);
 | |
|       g_object_set_property (G_OBJECT (bond->rtp_sink), "host", value);
 | |
|       g_object_set_property (G_OBJECT (bond->rtcp_sink), "host", value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_PORT:{
 | |
|       guint port = g_value_get_uint (value);
 | |
| 
 | |
|       /* According to 5.1.1, RTCP receiver port most be event number and RTCP
 | |
|        * port should be the RTP port + 1 */
 | |
| 
 | |
|       if (port & 0x1) {
 | |
|         g_warning ("Invalid RIST port %u, should be an even number.", port);
 | |
|         return;
 | |
|       }
 | |
| 
 | |
|       bond->port = port;
 | |
|       g_object_set (bond->rtp_sink, "port", port, NULL);
 | |
|       g_object_set (bond->rtcp_sink, "port", port + 1, NULL);
 | |
|       break;
 | |
|     }
 | |
| 
 | |
|     case PROP_SENDER_BUFFER:
 | |
|       g_object_set (bond->rtx_send,
 | |
|           "max-size-time", g_value_get_uint (value), NULL);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MIN_RTCP_INTERVAL:
 | |
|       sink->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
 | |
|       break;
 | |
| 
 | |
|     case PROP_MAX_RTCP_BANDWIDTH:
 | |
|       sink->max_rtcp_bandwidth = g_value_get_double (value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_STATS_UPDATE_INTERVAL:
 | |
|       sink->stats_interval = g_value_get_uint (value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_CNAME:
 | |
|       g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
 | |
|       gst_structure_set_value (sdes, "cname", value);
 | |
|       g_object_set (sink->rtpbin, "sdes", sdes, NULL);
 | |
|       gst_structure_free (sdes);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MULTICAST_LOOPBACK:
 | |
|       gst_rist_sink_set_multicast_loopback (sink, g_value_get_boolean (value));
 | |
|       break;
 | |
| 
 | |
|     case PROP_MULTICAST_IFACE:
 | |
|       g_free (bond->multicast_iface);
 | |
|       bond->multicast_iface = g_value_dup_string (value);
 | |
|       g_object_set_property (G_OBJECT (bond->rtp_sink),
 | |
|           "multicast-iface", value);
 | |
|       g_object_set_property (G_OBJECT (bond->rtcp_sink),
 | |
|           "multicast-iface", value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_MULTICAST_TTL:
 | |
|       gst_rist_sink_set_multicast_ttl (sink, g_value_get_int (value));
 | |
|       break;
 | |
| 
 | |
|     case PROP_BONDING_ADDRESSES:
 | |
|       gst_rist_sink_set_bonds (sink, g_value_get_string (value));
 | |
|       break;
 | |
| 
 | |
|     case PROP_BONDING_METHOD:
 | |
|       sink->bonding_method = g_value_get_enum (value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_DISPATCHER:
 | |
|       if (sink->dispatcher)
 | |
|         g_object_unref (sink->dispatcher);
 | |
|       sink->dispatcher = g_object_ref_sink (g_value_get_object (value));
 | |
|       break;
 | |
| 
 | |
|     case PROP_DROP_NULL_TS_PACKETS:
 | |
|       g_object_set_property (G_OBJECT (sink->rtpext), "drop-null-ts-packets",
 | |
|           value);
 | |
|       break;
 | |
| 
 | |
|     case PROP_SEQUENCE_NUMBER_EXTENSION:
 | |
|       g_object_set_property (G_OBJECT (sink->rtpext),
 | |
|           "sequence-number-extension", value);
 | |
|       break;
 | |
| 
 | |
|     default:
 | |
|       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 | |
|       break;
 | |
|   }
 | |
| 
 | |
|   g_mutex_unlock (&sink->bonds_lock);
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_finalize (GObject * object)
 | |
| {
 | |
|   GstRistSink *sink = GST_RIST_SINK (object);
 | |
|   gint i;
 | |
| 
 | |
|   g_mutex_lock (&sink->bonds_lock);
 | |
| 
 | |
|   for (i = 0; i < sink->bonds->len; i++) {
 | |
|     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
 | |
|     g_free (bond->address);
 | |
|     g_free (bond->multicast_iface);
 | |
|     g_slice_free (RistSenderBond, bond);
 | |
|   }
 | |
|   g_ptr_array_free (sink->bonds, TRUE);
 | |
| 
 | |
|   g_clear_object (&sink->rtxbin);
 | |
| 
 | |
|   g_mutex_unlock (&sink->bonds_lock);
 | |
|   g_mutex_clear (&sink->bonds_lock);
 | |
| 
 | |
|   G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object);
 | |
| }
 | |
| 
 | |
| static void
 | |
| gst_rist_sink_class_init (GstRistSinkClass * klass)
 | |
| {
 | |
|   GstElementClass *element_class = (GstElementClass *) klass;
 | |
|   GObjectClass *object_class = (GObjectClass *) klass;
 | |
| 
 | |
| 
 | |
|   session_id_quark = g_quark_from_static_string ("gst-rist-sink-session-id");
 | |
| 
 | |
|   gst_element_class_set_metadata (element_class,
 | |
|       "RIST Sink", "Source/Network",
 | |
|       "Sink that implements RIST TR-06-1 streaming specification",
 | |
|       "Nicolas Dufresne <nicolas.dufresne@collabora.com");
 | |
|   gst_element_class_add_static_pad_template (element_class, &sink_templ);
 | |
| 
 | |
|   element_class->change_state = gst_rist_sink_change_state;
 | |
| 
 | |
|   object_class->get_property = gst_rist_sink_get_property;
 | |
|   object_class->set_property = gst_rist_sink_set_property;
 | |
|   object_class->finalize = gst_rist_sink_finalize;
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_ADDRESS,
 | |
|       g_param_spec_string ("address", "Address",
 | |
|           "Address to send packets to (can be IPv4 or IPv6).", "0.0.0.0",
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_PORT,
 | |
|       g_param_spec_uint ("port", "Port", "The port RTP packets will be sent, "
 | |
|           "the RTCP port is this value + 1. This port must be an even number.",
 | |
|           2, 65534, 5004,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_SENDER_BUFFER,
 | |
|       g_param_spec_uint ("sender-buffer", "Sender Buffer",
 | |
|           "Size of the retransmission queue (in ms)", 0, G_MAXUINT, 1200,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
 | |
|       g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
 | |
|           "The minimum interval (in ms) between two regular successive RTCP "
 | |
|           "packets.", 0, 100, 100,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
 | |
|       g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
 | |
|           "The maximum bandwidth used for RTCP as a fraction of RTP bandwdith",
 | |
|           0.0, 0.05, 0.05,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
 | |
|       g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
 | |
|           "The interval between 'stats' update notification (in ms) (0 disabled)",
 | |
|           0, G_MAXUINT, 0,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_STATS,
 | |
|       g_param_spec_boxed ("stats", "Statistics",
 | |
|           "Statistic in a GstStructure named 'rist/x-sender-stats'",
 | |
|           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_CNAME,
 | |
|       g_param_spec_string ("cname", "CName",
 | |
|           "Set the CNAME in the SDES block of the sender report.", NULL,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
 | |
|           GST_PARAM_DOC_SHOW_DEFAULT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
 | |
|       g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
 | |
|           "When enabled, the packet will be received locally.", FALSE,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
 | |
|       g_param_spec_string ("multicast-iface", "multicast-iface",
 | |
|           "The multicast interface to use to send packets.", NULL,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
 | |
|       g_param_spec_int ("multicast-ttl", "Multicast TTL",
 | |
|           "The multicast time-to-live parameter.", 0, 255, 1,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
 | |
|       g_param_spec_string ("bonding-addresses", "Bonding Addresses",
 | |
|           "Comma (,) separated list of <address>:<port> to send to. ", NULL,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_BONDING_METHOD,
 | |
|       g_param_spec_enum ("bonding-method", "Bonding Method",
 | |
|           "Defines the bonding method to use.",
 | |
|           gst_rist_bonding_method_get_type (),
 | |
|           GST_RIST_BONDING_METHOD_BROADCAST,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   g_object_class_install_property (object_class, PROP_DISPATCHER,
 | |
|       g_param_spec_object ("dispatcher", "Bonding Dispatcher",
 | |
|           "An element that takes care of multi-plexing bonded links. When set "
 | |
|           "\"bonding-method\" is ignored.",
 | |
|           GST_TYPE_ELEMENT,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
 | |
|           GST_PARAM_MUTABLE_READY));
 | |
|   g_object_class_install_property (object_class, PROP_DROP_NULL_TS_PACKETS,
 | |
|       g_param_spec_boolean ("drop-null-ts-packets", "Drop null TS packets",
 | |
|           "Drop null MPEG-TS packet and replace them with a custom header"
 | |
|           " extension.", FALSE,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
|   g_object_class_install_property (object_class, PROP_SEQUENCE_NUMBER_EXTENSION,
 | |
|       g_param_spec_boolean ("sequence-number-extension",
 | |
|           "Sequence Number Extension",
 | |
|           "Add sequence number extension to packets.", FALSE,
 | |
|           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
 | |
| 
 | |
|   gst_type_mark_as_plugin_api (gst_rist_bonding_method_get_type (), 0);
 | |
| }
 |