diff --git a/gst/rist/gstristsrc.c b/gst/rist/gstristsrc.c index c94d0059d0..6395d37175 100644 --- a/gst/rist/gstristsrc.c +++ b/gst/rist/gstristsrc.c @@ -34,6 +34,17 @@ * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2depay ! udpsink * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700" * ]| + * + * Additionally, this element supports bonding, which means it can receive the + * same stream from multiple addresses. Each address will be mapped to it's + * own RTP session. In order to enable bonding support, one need to configure + * the list of addresses through "bonding-addresses" properties. + * + * ## Example launch line for bonding + * |[ + * gst-launch-1.0 ristsrc bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" ! rtpmp2depay ! udpsink + * gst-play-1.0 "rist://0.0.0.0:5004?bonding-addresses=10.0.0.1:5004,11.0.0.1:5006" + * ]| */ @@ -45,6 +56,9 @@ #include #include +/* for strtol() */ +#include + /* for setsockopt() */ #ifndef G_OS_WIN32 #include @@ -70,7 +84,8 @@ enum PROP_CNAME, PROP_MULTICAST_LOOPBACK, PROP_MULTICAST_IFACE, - PROP_MULTICAST_TTL + PROP_MULTICAST_TTL, + PROP_BONDING_ADDRESSES }; static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", @@ -78,30 +93,50 @@ static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-rtp")); + +typedef struct +{ + guint session; + gchar *address; + gchar *multicast_iface; + guint port; + + GstElement *rtcp_src; + GstElement *rtp_src; + GstElement *rtcp_sink; + GstElement *rtx_receive; + gulong rtcp_recv_probe; + gulong rtcp_send_probe; + GSocketAddress *rtcp_send_addr; + +} RistReceiverBond; + struct _GstRistSrc { GstBin parent; GstUri *uri; - /* Elements contained in the pipeline, the rtp/rtcp_src are 'udpsrc' */ + /* Common elements in the pipeline */ GstElement *rtpbin; - GstElement *rtp_src; - GstElement *rtcp_src; - GstElement *rtcp_sink; - gulong rtcp_recv_probe; - gulong rtcp_send_probe; - GSocketAddress *rtcp_send_addr; GstPad *srcpad; - gint multicast_ttl; - - /* RTX Elements */ GstElement *rtxbin; - GstElement *rtx_receive; + GstElement *rtx_funnel; - /* For property handling */ + /* Common properties, protected by bonds_lock */ guint reorder_section; guint max_rtx_retries; + GstClockTime min_rtcp_interval; + gdouble max_rtcp_bandwidth; + gint multicast_loopback; + gint multicast_ttl; + + /* Bonds */ + GPtrArray *bonds; + /* this is needed as setting sibling properties will try to take the object + * lock. Thus, any properties that affects the bonds will be protected with + * that lock instead of the object lock. */ + GMutex bonds_lock; /* For stats */ guint stats_interval; @@ -121,6 +156,60 @@ G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN, G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init); GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source")); +/* called with bonds lock */ +static RistReceiverBond * +gst_rist_src_add_bond (GstRistSrc * src) +{ + RistReceiverBond *bond = g_slice_new0 (RistReceiverBond); + GstPad *pad, *gpad; + gchar name[32]; + + bond->session = src->bonds->len; + bond->address = g_strdup ("0.0.0.0"); + + g_snprintf (name, 32, "rist_rtx_receive%u", bond->session); + bond->rtx_receive = gst_element_factory_make ("ristrtxreceive", name); + gst_bin_add (GST_BIN (src->rtxbin), bond->rtx_receive); + + g_snprintf (name, 32, "sink_%u", bond->session); + gst_element_link_pads (bond->rtx_receive, "src", src->rtx_funnel, name); + + g_snprintf (name, 32, "sink_%u", bond->session); + pad = gst_element_get_static_pad (bond->rtx_receive, "sink"); + gpad = gst_ghost_pad_new (name, pad); + gst_object_unref (pad); + gst_element_add_pad (src->rtxbin, gpad); + + g_snprintf (name, 32, "rist_rtp_udpsrc%u", bond->session); + bond->rtp_src = gst_element_factory_make ("udpsrc", name); + g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session); + bond->rtcp_src = gst_element_factory_make ("udpsrc", name); + g_snprintf (name, 32, "rist_rtcp_dynudpsink%u", bond->session); + bond->rtcp_sink = gst_element_factory_make ("dynudpsink", name); + if (!bond->rtp_src || !bond->rtcp_src || !bond->rtcp_sink) { + g_clear_object (&bond->rtp_src); + g_clear_object (&bond->rtcp_src); + g_clear_object (&bond->rtcp_sink); + g_slice_free (RistReceiverBond, bond); + src->missing_plugin = "udp"; + return NULL; + } + gst_bin_add_many (GST_BIN (src), bond->rtp_src, bond->rtcp_src, + bond->rtcp_sink, NULL); + g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL); + gst_element_set_locked_state (bond->rtcp_sink, TRUE); + + g_snprintf (name, 32, "recv_rtp_sink_%u", bond->session); + gst_element_link_pads (bond->rtp_src, "src", src->rtpbin, name); + g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session); + gst_element_link_pads (bond->rtcp_src, "src", src->rtpbin, name); + g_snprintf (name, 32, "send_rtcp_src_%u", bond->session); + gst_element_link_pads (src->rtpbin, name, bond->rtcp_sink, "sink"); + + g_ptr_array_add (src->bonds, bond); + return bond; +} + static void gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin) { @@ -158,9 +247,6 @@ static GstElement * gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id, GstElement * rtpbin) { - if (session_id != 0) - return NULL; - return gst_object_ref (src->rtxbin); } @@ -262,17 +348,17 @@ gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc, GObject *session = NULL; GObject *source = NULL; - if (session_id != 0) - return; - g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session); g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source); - if (ssrc & 1) + if (ssrc & 1) { + GST_DEBUG ("Disabling RTCP and probation on RTX stream " + "(SSRC %u on session %u)", ssrc, session_id); g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL); - else + } else { g_signal_connect (session, "on-sending-nacks", (GCallback) gst_rist_src_on_sending_nacks, NULL); + } g_object_unref (source); g_object_unref (session); @@ -282,19 +368,22 @@ static void gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer, guint session, guint ssrc, GstElement * rtpbin) { - GST_OBJECT_LOCK (src); + if (session != 0) { + GST_WARNING_OBJECT (rtpbin, "Unexpected jitterbuffer created."); + return; + } + g_object_set (jitterbuffer, "rtx-delay", src->reorder_section, "rtx-max-retries", src->max_rtx_retries, NULL); if ((ssrc & 1) == 0) { GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u", session, ssrc); + g_clear_object (&src->jitterbuffer); src->jitterbuffer = gst_object_ref (jitterbuffer); src->rtp_ssrc = ssrc; } - - GST_OBJECT_UNLOCK (src); } static void @@ -302,6 +391,10 @@ gst_rist_src_init (GstRistSrc * src) { GstPad *pad, *gpad; GstStructure *sdes = NULL; + RistReceiverBond *bond; + + g_mutex_init (&src->bonds_lock); + src->bonds = g_ptr_array_new (); /* Construct the RIST RTP receiver pipeline. * @@ -330,7 +423,6 @@ gst_rist_src_init (GstRistSrc * src) g_object_set (src->rtpbin, "do-retransmission", TRUE, "rtp-profile", 3 /* AVPF */ , "sdes", sdes, NULL); - gst_structure_free (sdes); g_signal_connect_swapped (src->rtpbin, "request-pt-map", @@ -340,43 +432,15 @@ gst_rist_src_init (GstRistSrc * src) src->rtxbin = gst_bin_new ("rist_recv_rtxbin"); g_object_ref_sink (src->rtxbin); - src->rtx_receive = gst_element_factory_make ("ristrtxreceive", - "rist_rtx_receive"); - gst_bin_add (GST_BIN (src->rtxbin), src->rtx_receive); - pad = gst_element_get_static_pad (src->rtx_receive, "sink"); - gpad = gst_ghost_pad_new ("sink_0", pad); - gst_object_unref (pad); - gst_element_add_pad (src->rtxbin, gpad); + src->rtx_funnel = gst_element_factory_make ("funnel", "rist_rtx_funnel"); + gst_bin_add (GST_BIN (src->rtxbin), src->rtx_funnel); - pad = gst_element_get_static_pad (src->rtx_receive, "src"); + pad = gst_element_get_static_pad (src->rtx_funnel, "src"); gpad = gst_ghost_pad_new ("src_0", pad); gst_object_unref (pad); gst_element_add_pad (src->rtxbin, gpad); - src->rtp_src = gst_element_factory_make ("udpsrc", "rist_rtp_udpsrc"); - src->rtcp_src = gst_element_factory_make ("udpsrc", "rist_rtcp_udpsrc"); - src->rtcp_sink = - gst_element_factory_make ("dynudpsink", "rist_rtcp_dynudpsink"); - if (!src->rtp_src || !src->rtcp_src || !src->rtcp_sink) { - g_clear_object (&src->rtp_src); - g_clear_object (&src->rtcp_src); - g_clear_object (&src->rtcp_sink); - src->missing_plugin = "udp"; - goto missing_plugin; - } - gst_bin_add_many (GST_BIN (src), src->rtp_src, src->rtcp_src, - src->rtcp_sink, NULL); - g_object_set (src->rtcp_sink, "sync", FALSE, "async", FALSE, NULL); - /* delay udpsink startup, we will give it the socket from the RTCP udpsrc, - * but socket can only be set in NULL state */ - gst_element_set_locked_state (src->rtcp_sink, TRUE); - - gst_element_link_pads (src->rtp_src, "src", src->rtpbin, "recv_rtp_sink_0"); - gst_element_link_pads (src->rtcp_src, "src", src->rtpbin, "recv_rtcp_sink_0"); - gst_element_link_pads (src->rtpbin, "send_rtcp_src_0", - src->rtcp_sink, "sink"); - g_signal_connect_swapped (src->rtpbin, "pad-added", G_CALLBACK (gst_rist_src_pad_added), src); g_signal_connect_swapped (src->rtpbin, "on-new-ssrc", @@ -384,6 +448,10 @@ gst_rist_src_init (GstRistSrc * src) g_signal_connect_swapped (src->rtpbin, "new-jitterbuffer", G_CALLBACK (gst_rist_src_new_jitterbuffer), src); + bond = gst_rist_src_add_bond (src); + if (!bond) + goto missing_plugin; + return; missing_plugin: @@ -400,6 +468,28 @@ gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info, GstRistSrc *src = GST_RIST_SRC (user_data); GstBuffer *buffer; GstNetAddressMeta *meta; + GstElement *rtcp_src; + RistReceiverBond *bond = NULL; + gint i; + + rtcp_src = GST_ELEMENT (gst_pad_get_parent (pad)); + + g_mutex_lock (&src->bonds_lock); + + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *b = g_ptr_array_index (src->bonds, i); + if (b->rtcp_src == rtcp_src) { + bond = b; + break; + } + } + gst_object_unref (rtcp_src); + + if (!bond) { + GST_WARNING_OBJECT (src, "Unexpected RTCP source."); + g_mutex_unlock (&src->bonds_lock); + return GST_PAD_PROBE_OK; + } if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { GstBufferList *buffer_list = info->data; @@ -410,21 +500,21 @@ gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info, meta = gst_buffer_get_net_address_meta (buffer); - GST_OBJECT_LOCK (src); - g_clear_object (&src->rtcp_send_addr); - src->rtcp_send_addr = g_object_ref (meta->addr); - GST_OBJECT_UNLOCK (src); + g_clear_object (&bond->rtcp_send_addr); + bond->rtcp_send_addr = g_object_ref (meta->addr); + + g_mutex_unlock (&src->bonds_lock); return GST_PAD_PROBE_OK; } +/* called with bonds lock */ static inline void -gst_rist_src_attach_net_address_meta (GstRistSrc * src, GstBuffer * buffer) +gst_rist_src_attach_net_address_meta (RistReceiverBond * bond, + GstBuffer * buffer) { - GST_OBJECT_LOCK (src); - if (src->rtcp_send_addr) - gst_buffer_add_net_address_meta (buffer, src->rtcp_send_addr); - GST_OBJECT_UNLOCK (src); + if (bond->rtcp_send_addr) + gst_buffer_add_net_address_meta (buffer, bond->rtcp_send_addr); } static GstPadProbeReturn @@ -432,6 +522,28 @@ gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstRistSrc *src = GST_RIST_SRC (user_data); + GstElement *rtcp_sink; + RistReceiverBond *bond = NULL; + gint i; + + rtcp_sink = GST_ELEMENT (gst_pad_get_parent (pad)); + + g_mutex_lock (&src->bonds_lock); + + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *b = g_ptr_array_index (src->bonds, i); + if (b->rtcp_sink == rtcp_sink) { + bond = b; + break; + } + } + gst_object_unref (rtcp_sink); + + if (!bond) { + GST_WARNING_OBJECT (src, "Unexpected RTCP sink."); + g_mutex_unlock (&src->bonds_lock); + return GST_PAD_PROBE_OK; + } if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) { GstBufferList *buffer_list = info->data; @@ -441,25 +553,68 @@ gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info, info->data = buffer_list = gst_buffer_list_make_writable (buffer_list); for (i = 0; i < gst_buffer_list_length (buffer_list); i++) { buffer = gst_buffer_list_get (buffer_list, i); - gst_rist_src_attach_net_address_meta (src, buffer); + gst_rist_src_attach_net_address_meta (bond, buffer); } } else { GstBuffer *buffer = info->data; info->data = buffer = gst_buffer_make_writable (buffer); - gst_rist_src_attach_net_address_meta (src, buffer); + gst_rist_src_attach_net_address_meta (bond, buffer); } + g_mutex_unlock (&src->bonds_lock); + return GST_PAD_PROBE_OK; } +static gboolean +gst_rist_src_setup_rtcp_socket (GstRistSrc * src, RistReceiverBond * bond) +{ + GstPad *pad; + GSocket *socket = NULL; + GInetAddress *iaddr = NULL; + guint port = bond->port + 1; + + g_object_get (bond->rtcp_src, "used-socket", &socket, NULL); + if (!socket) + return GST_STATE_CHANGE_FAILURE; + + iaddr = g_inet_address_new_from_string (bond->address); + + if (g_inet_address_get_is_multicast (iaddr)) { + /* mc-ttl is not supported by dynudpsink */ + g_socket_set_multicast_ttl (socket, src->multicast_ttl); + /* In multicast, send RTCP to the multicast group */ + bond->rtcp_send_addr = g_inet_socket_address_new (iaddr, port); + } else { + /* In unicast, send RTCP to the detected sender address */ + pad = gst_element_get_static_pad (bond->rtcp_src, "src"); + bond->rtcp_recv_probe = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, + gst_rist_src_on_recv_rtcp, src, NULL); + gst_object_unref (pad); + } + g_object_unref (iaddr); + + pad = gst_element_get_static_pad (bond->rtcp_sink, "sink"); + bond->rtcp_send_probe = gst_pad_add_probe (pad, + GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, + gst_rist_src_on_send_rtcp, src, NULL); + gst_object_unref (pad); + + /* share the socket created by the source */ + g_object_set (bond->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL); + g_object_unref (socket); + + gst_element_set_locked_state (bond->rtcp_sink, FALSE); + gst_element_sync_state_with_parent (bond->rtcp_sink); + + return GST_STATE_CHANGE_SUCCESS; +} + static GstStateChangeReturn gst_rist_src_start (GstRistSrc * src) { - GstPad *pad; - GSocket *socket = NULL; - gchar *address; - guint rtcp_port; - GInetAddress *iaddr; + gint i; if (src->construct_failed) { GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN, @@ -468,39 +623,18 @@ gst_rist_src_start (GstRistSrc * src) return GST_STATE_CHANGE_FAILURE; } - g_object_get (src->rtcp_src, "used-socket", &socket, - "address", &address, "port", &rtcp_port, NULL); + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); + GObject *session = NULL; - iaddr = g_inet_address_new_from_string (address); - g_free (address); + g_signal_emit_by_name (src->rtpbin, "get-session", i, &session); + g_object_set (session, "rtcp-min-interval", src->min_rtcp_interval, + "rtcp-fraction", src->max_rtcp_bandwidth, NULL); + g_object_unref (session); - if (g_inet_address_get_is_multicast (iaddr)) { - /* mc-ttl is not supported by dynudpsink */ - g_socket_set_multicast_ttl (socket, src->multicast_ttl); - /* In multicast, send RTCP to the multicast group */ - src->rtcp_send_addr = g_inet_socket_address_new (iaddr, rtcp_port); - } else { - /* In unicast, send RTCP to the detected sender address */ - pad = gst_element_get_static_pad (src->rtcp_src, "src"); - src->rtcp_recv_probe = gst_pad_add_probe (pad, - GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, - gst_rist_src_on_recv_rtcp, src, NULL); - gst_object_unref (pad); + if (!gst_rist_src_setup_rtcp_socket (src, bond)) + return GST_STATE_CHANGE_FAILURE; } - g_object_unref (iaddr); - - pad = gst_element_get_static_pad (src->rtcp_sink, "sink"); - src->rtcp_send_probe = gst_pad_add_probe (pad, - GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, - gst_rist_src_on_send_rtcp, src, NULL); - gst_object_unref (pad); - - /* share the socket created by the source */ - g_object_set (src->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL); - g_object_unref (socket); - - gst_element_set_locked_state (src->rtcp_sink, FALSE); - gst_element_sync_state_with_parent (src->rtcp_sink); return GST_STATE_CHANGE_SUCCESS; } @@ -599,18 +733,25 @@ static void gst_rist_src_stop (GstRistSrc * src) { GstPad *pad; + gint i; - if (src->rtcp_recv_probe) { - pad = gst_element_get_static_pad (src->rtcp_src, "src"); - gst_pad_remove_probe (pad, src->rtcp_recv_probe); - src->rtcp_recv_probe = 0; - gst_object_unref (pad); + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); + + if (bond->rtcp_recv_probe) { + pad = gst_element_get_static_pad (bond->rtcp_src, "src"); + gst_pad_remove_probe (pad, bond->rtcp_recv_probe); + bond->rtcp_recv_probe = 0; + gst_object_unref (pad); + } + + if (bond->rtcp_send_probe) { + pad = gst_element_get_static_pad (bond->rtcp_sink, "sink"); + gst_pad_remove_probe (pad, bond->rtcp_send_probe); + bond->rtcp_send_probe = 0; + gst_object_unref (pad); + } } - - pad = gst_element_get_static_pad (src->rtcp_sink, "sink"); - gst_pad_remove_probe (pad, src->rtcp_send_probe); - src->rtcp_send_probe = 0; - gst_object_unref (pad); } static GstStateChangeReturn @@ -647,25 +788,178 @@ gst_rist_src_change_state (GstElement * element, GstStateChange transition) return ret; } +/* called with bonds lock */ +static void +gst_rist_src_update_bond_address (GstRistSrc * src, RistReceiverBond * bond, + const gchar * address, guint port, const gchar * multicast_iface) +{ + g_free (bond->address); + g_free (bond->multicast_iface); + bond->address = g_strdup (address); + bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL; + bond->port = port; + + g_object_set (G_OBJECT (bond->rtp_src), "address", address, "port", port, + "multicast-iface", bond->multicast_iface, NULL); + g_object_set (G_OBJECT (bond->rtcp_src), "address", address, + "port", port + 1, "multicast-iface", bond->multicast_iface, NULL); + + /* TODO add runtime support + * - add blocking the pad probe + * - update RTCP socket + * - cycle elements through NULL state + */ +} + +/* called with bonds lock */ +static gchar * +gst_rist_src_get_bonds (GstRistSrc * src) +{ + GString *bonds = g_string_new (""); + gint i; + + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); + if (bonds->len > 0) + g_string_append_c (bonds, ':'); + + g_string_append_printf (bonds, "%s:%u", bond->address, bond->port); + + if (bond->multicast_iface) + g_string_append_printf (bonds, "/%s", bond->multicast_iface); + } + + return g_string_free (bonds, FALSE); +} + +struct RistAddress +{ + gchar *address; + char *multicast_iface; + guint port; +}; + +/* called with bonds lock */ +static void +gst_rist_src_set_bonds (GstRistSrc * src, const gchar * bonds) +{ + GStrv tokens = NULL; + struct RistAddress *addrs; + gint i; + + if (bonds == NULL) + goto missing_address; + + tokens = g_strsplit (bonds, ",", 0); + if (tokens[0] == NULL) + goto missing_address; + + addrs = g_new0 (struct RistAddress, g_strv_length (tokens)); + + /* parse the address list */ + for (i = 0; tokens[i]; i++) { + gchar *address = tokens[i]; + char *port_ptr, *iface_ptr, *endptr; + guint port; + + port_ptr = g_utf8_strrchr (address, -1, ':'); + iface_ptr = g_utf8_strrchr (address, -1, '/'); + + if (!port_ptr) + goto bad_parameter; + if (!g_ascii_isdigit (port_ptr[1])) + goto bad_parameter; + + if (iface_ptr) { + if (iface_ptr < port_ptr) + goto bad_parameter; + iface_ptr[0] = '\0'; + } + + port = strtol (port_ptr + 1, &endptr, 0); + if (endptr[0] != '\0') + goto bad_parameter; + + /* port must be a multiple of 2 between 2 and 65534 */ + if (port < 2 || (port & 1) || port > G_MAXUINT16) + goto invalid_port; + + port_ptr[0] = '\0'; + addrs[i].port = port; + addrs[i].address = g_strstrip (address); + if (iface_ptr) + addrs[i].multicast_iface = g_strstrip (iface_ptr + 1); + } + + /* configure the bonds */ + for (i = 0; tokens[i]; i++) { + RistReceiverBond *bond = NULL; + + if (i < src->bonds->len) + bond = g_ptr_array_index (src->bonds, i); + else + bond = gst_rist_src_add_bond (src); + + gst_rist_src_update_bond_address (src, bond, addrs[i].address, + addrs[i].port, addrs[i].multicast_iface); + } + + g_strfreev (tokens); + return; + +missing_address: + g_warning ("'bonding-addresses' cannot be empty"); + g_strfreev (tokens); + return; + +bad_parameter: + g_warning ("Failed to parse address '%s", tokens[i]); + g_strfreev (tokens); + g_free (addrs); + return; + +invalid_port: + g_warning ("RIST port must valid UDP port and a multiple of 2."); + g_strfreev (tokens); + g_free (addrs); + return; +} + +static void +gst_rist_src_set_multicast_loopback (GstRistSrc * src, gboolean loop) +{ + gint i; + + src->multicast_loopback = loop; + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); + g_object_set (G_OBJECT (bond->rtp_src), "loop", loop, NULL); + g_object_set (G_OBJECT (bond->rtcp_src), "loop", loop, NULL); + } +} + static void gst_rist_src_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstRistSrc *src = GST_RIST_SRC (object); - GstElement *session = NULL; - GstClockTime interval; GstStructure *sdes; + RistReceiverBond *bond; if (src->construct_failed) return; + g_mutex_lock (&src->bonds_lock); + + bond = g_ptr_array_index (src->bonds, 0); + switch (prop_id) { case PROP_ADDRESS: - g_object_get_property (G_OBJECT (src->rtp_src), "address", value); + g_value_set_string (value, bond->address); break; case PROP_PORT: - g_object_get_property (G_OBJECT (src->rtp_src), "port", value); + g_value_set_uint (value, bond->port); break; case PROP_RECEIVER_BUFFER: @@ -673,28 +967,19 @@ gst_rist_src_get_property (GObject * object, guint prop_id, break; case PROP_REORDER_SECTION: - GST_OBJECT_LOCK (src); g_value_set_uint (value, src->reorder_section); - GST_OBJECT_UNLOCK (src); break; case PROP_MAX_RTX_RETRIES: - GST_OBJECT_LOCK (src); g_value_set_uint (value, src->max_rtx_retries); - GST_OBJECT_UNLOCK (src); break; case PROP_MIN_RTCP_INTERVAL: - g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); - g_object_get (session, "rtcp-min-interval", &interval, NULL); - g_value_set_uint (value, (guint) (interval / GST_MSECOND)); - g_object_unref (session); + g_value_set_uint (value, (guint) (src->min_rtcp_interval / GST_MSECOND)); break; case PROP_MAX_RTCP_BANDWIDTH: - g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); - g_object_get_property (G_OBJECT (session), "rtcp-fraction", value); - g_object_unref (session); + g_value_set_double (value, src->max_rtcp_bandwidth); break; case PROP_STATS_UPDATE_INTERVAL: @@ -712,21 +997,27 @@ gst_rist_src_get_property (GObject * object, guint prop_id, break; case PROP_MULTICAST_LOOPBACK: - g_object_get_property (G_OBJECT (src->rtp_src), "loop", value); + g_value_set_boolean (value, src->multicast_loopback); break; case PROP_MULTICAST_IFACE: - g_object_get_property (G_OBJECT (src->rtp_src), "multicast-iface", value); + g_value_set_string (value, bond->multicast_iface); break; case PROP_MULTICAST_TTL: g_value_set_int (value, src->multicast_ttl); break; + case PROP_BONDING_ADDRESSES: + g_value_take_string (value, gst_rist_src_get_bonds (src)); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (&src->bonds_lock); } static void @@ -734,16 +1025,22 @@ gst_rist_src_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstRistSrc *src = GST_RIST_SRC (object); - GstElement *session = NULL; GstStructure *sdes; + RistReceiverBond *bond; if (src->construct_failed) return; + g_mutex_lock (&src->bonds_lock); + + bond = g_ptr_array_index (src->bonds, 0); + switch (prop_id) { case PROP_ADDRESS: - g_object_set_property (G_OBJECT (src->rtp_src), "address", value); - g_object_set_property (G_OBJECT (src->rtcp_src), "address", value); + g_free (bond->address); + bond->address = g_value_dup_string (value); + g_object_set_property (G_OBJECT (bond->rtp_src), "address", value); + g_object_set_property (G_OBJECT (bond->rtcp_src), "address", value); break; case PROP_PORT:{ @@ -757,8 +1054,9 @@ gst_rist_src_set_property (GObject * object, guint prop_id, return; } - g_object_set (src->rtp_src, "port", port, NULL); - g_object_set (src->rtcp_src, "port", port + 1, NULL); + bond->port = port; + g_object_set (bond->rtp_src, "port", port, NULL); + g_object_set (bond->rtcp_src, "port", port + 1, NULL); break; } @@ -767,28 +1065,19 @@ gst_rist_src_set_property (GObject * object, guint prop_id, break; case PROP_REORDER_SECTION: - GST_OBJECT_LOCK (src); src->reorder_section = g_value_get_uint (value); - GST_OBJECT_UNLOCK (src); break; case PROP_MAX_RTX_RETRIES: - GST_OBJECT_LOCK (src); src->max_rtx_retries = g_value_get_uint (value); - GST_OBJECT_UNLOCK (src); break; case PROP_MIN_RTCP_INTERVAL: - g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); - g_object_set (session, "rtcp-min-interval", - g_value_get_uint (value) * GST_MSECOND, NULL); - g_object_unref (session); + src->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND; break; case PROP_MAX_RTCP_BANDWIDTH: - g_signal_emit_by_name (src->rtpbin, "get-session", 0, &session); - g_object_set (session, "rtcp-fraction", g_value_get_double (value), NULL); - g_object_unref (session); + src->max_rtcp_bandwidth = g_value_get_double (value); break; case PROP_STATS_UPDATE_INTERVAL: @@ -803,13 +1092,15 @@ gst_rist_src_set_property (GObject * object, guint prop_id, break; case PROP_MULTICAST_LOOPBACK: - g_object_set_property (G_OBJECT (src->rtp_src), "loop", value); - g_object_set_property (G_OBJECT (src->rtcp_src), "loop", value); + gst_rist_src_set_multicast_loopback (src, g_value_get_boolean (value)); break; case PROP_MULTICAST_IFACE: - g_object_set_property (G_OBJECT (src->rtp_src), "multicast-iface", value); - g_object_set_property (G_OBJECT (src->rtcp_src), + g_free (bond->multicast_iface); + bond->multicast_iface = g_value_dup_string (value); + g_object_set_property (G_OBJECT (bond->rtp_src), + "multicast-iface", value); + g_object_set_property (G_OBJECT (bond->rtcp_src), "multicast-iface", value); break; @@ -817,22 +1108,41 @@ gst_rist_src_set_property (GObject * object, guint prop_id, src->multicast_ttl = g_value_get_int (value); break; + case PROP_BONDING_ADDRESSES: + gst_rist_src_set_bonds (src, g_value_get_string (value)); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } + + g_mutex_unlock (&src->bonds_lock); } static void gst_rist_src_finalize (GObject * object) { GstRistSrc *src = GST_RIST_SRC (object); + gint i; + + g_mutex_lock (&src->bonds_lock); + + for (i = 0; i < src->bonds->len; i++) { + RistReceiverBond *bond = g_ptr_array_index (src->bonds, i); + g_free (bond->address); + g_free (bond->multicast_iface); + g_slice_free (RistReceiverBond, bond); + } + g_ptr_array_free (src->bonds, TRUE); if (src->jitterbuffer) gst_object_unref (src->jitterbuffer); - gst_object_unref (src->rtxbin); + g_mutex_unlock (&src->bonds_lock); + g_mutex_clear (&src->bonds_lock); + G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object); } @@ -925,6 +1235,12 @@ gst_rist_src_class_init (GstRistSrcClass * klass) g_param_spec_int ("multicast-ttl", "Multicast TTL", "The multicast time-to-live parameter.", 0, 255, 1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT)); + + g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES, + g_param_spec_string ("bonding-addresses", "Bonding Addresses", + "Comma (,) seperated list of
: to receive from. " + "Only used if 'enale-bonding' is set.", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static GstURIType