diff --git a/tests/check/elements/rtpaux.c b/tests/check/elements/rtpaux.c index 0fe5585a90..1f410bf22f 100644 --- a/tests/check/elements/rtpaux.c +++ b/tests/check/elements/rtpaux.c @@ -24,7 +24,8 @@ #include #include -static GMainLoop *main_loop; +static gboolean send_pipeline_eos = FALSE; +static gboolean receive_pipeline_eos = FALSE; static void message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) @@ -34,7 +35,15 @@ message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) switch (message->type) { case GST_MESSAGE_EOS: - g_main_loop_quit (main_loop); + if (!strcmp ("pipeline_send", + GST_OBJECT_NAME (GST_MESSAGE_SRC (message)))) + send_pipeline_eos = TRUE; + else if (!strcmp ("pipeline_receive", + GST_OBJECT_NAME (GST_MESSAGE_SRC (message)))) + receive_pipeline_eos = TRUE; + else + fail ("Unknown pipeline: %s", + GST_OBJECT_NAME (GST_MESSAGE_SRC (message))); break; case GST_MESSAGE_WARNING:{ GError *gerror; @@ -54,7 +63,7 @@ message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); g_error_free (gerror); g_free (debug); - g_main_loop_quit (main_loop); + fail ("Error!"); break; } default: @@ -334,7 +343,6 @@ GST_START_TEST (test_simple_rtpbin_aux) recvrtcp_udpsink, "sink", GST_PAD_LINK_CHECK_NOTHING); fail_unless (res == TRUE, NULL); - main_loop = g_main_loop_new (NULL, FALSE); g_signal_connect (bussend, "message::error", (GCallback) message_received, binsend); g_signal_connect (bussend, "message::warning", (GCallback) message_received, @@ -359,8 +367,8 @@ GST_START_TEST (test_simple_rtpbin_aux) g_timeout_add (5000, on_timeout, binreceive); GST_INFO ("enter mainloop"); - g_main_loop_run (main_loop); - g_main_loop_run (main_loop); + while (!send_pipeline_eos && !receive_pipeline_eos) + g_main_context_iteration (NULL, TRUE); GST_INFO ("exit mainloop"); /* check that FB NACK is working */ @@ -381,8 +389,6 @@ GST_START_TEST (test_simple_rtpbin_aux) fail_if (nb_rtx_recv_packets < 1); /* cleanup */ - g_main_loop_unref (main_loop); - gst_bus_remove_signal_watch (bussend); gst_object_unref (bussend); gst_object_unref (binsend); diff --git a/tests/check/elements/rtprtx.c b/tests/check/elements/rtprtx.c index 40e4a9527e..9c6206a1fb 100644 --- a/tests/check/elements/rtprtx.c +++ b/tests/check/elements/rtprtx.c @@ -31,8 +31,6 @@ static GstPad *srcpad, *sinkpad; /* we also have a list of src buffers */ static GList *inbuffers = NULL; -static GMainLoop *main_loop; - #define RTP_CAPS_STRING \ "application/x-rtp, " \ "media = (string)audio, " \ @@ -256,14 +254,14 @@ GST_START_TEST (test_push_forward_seq) GST_END_TEST; static void -message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) +message_received (GstBus * bus, GstMessage * message, gboolean * eos) { GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, GST_MESSAGE_SRC (message), message); switch (message->type) { case GST_MESSAGE_EOS: - g_main_loop_quit (main_loop); + *eos = TRUE; break; case GST_MESSAGE_WARNING:{ GError *gerror; @@ -281,9 +279,9 @@ message_received (GstBus * bus, GstMessage * message, GstPipeline * bin) gst_message_parse_error (message, &gerror, &debug); gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); + fail ("Error: %s / %s", gerror->message, debug); g_error_free (gerror); g_free (debug); - g_main_loop_quit (main_loop); break; } default: @@ -392,7 +390,7 @@ static void start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader, GstElement * rtprtxsend, GstElement * rtprtxreceive, RTXSendData * send_rtxdata, RTXReceiveData * receive_rtxdata, - guint drop_every_n_packets) + guint drop_every_n_packets, gboolean * eos) { GstStateChangeReturn state_res = GST_STATE_CHANGE_FAILURE; guint nbrtxrequests = 0; @@ -439,7 +437,8 @@ start_test_drop_and_check_results (GstElement * bin, GstElement * rtppayloader, ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); GST_INFO ("running main loop"); - g_main_loop_run (main_loop); + while (!*eos) + g_main_context_iteration (NULL, TRUE); /* check results */ @@ -495,6 +494,7 @@ GST_START_TEST (test_drop_one_sender) guint drop_every_n_packets = 0; RTXSendData send_rtxdata; RTXReceiveData receive_rtxdata; + gboolean eos = FALSE; GST_INFO ("preparing test"); @@ -547,19 +547,19 @@ GST_START_TEST (test_drop_one_sender) gst_object_unref (sinkpad); gst_object_unref (srcpad); - main_loop = g_main_loop_new (NULL, FALSE); - g_signal_connect (bus, "message::error", (GCallback) message_received, bin); - g_signal_connect (bus, "message::warning", (GCallback) message_received, bin); - g_signal_connect (bus, "message::eos", (GCallback) message_received, bin); + g_signal_connect (bus, "message::error", (GCallback) message_received, NULL); + g_signal_connect (bus, "message::warning", (GCallback) message_received, + NULL); + g_signal_connect (bus, "message::eos", (GCallback) message_received, &eos); for (drop_every_n_packets = 2; drop_every_n_packets < 10; drop_every_n_packets++) { start_test_drop_and_check_results (bin, rtppayloader, rtprtxsend, - rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets); + rtprtxreceive, &send_rtxdata, &receive_rtxdata, drop_every_n_packets, + &eos); } /* cleanup */ - g_main_loop_unref (main_loop); gst_consistency_checker_free (chk_1); gst_consistency_checker_free (chk_2); gst_consistency_checker_free (chk_3); @@ -570,21 +570,13 @@ GST_START_TEST (test_drop_one_sender) GST_END_TEST; -GMutex lock_eos; -static gint nb_sources; -static gint nb_eos; - static void -message_received_multiple (GstBus * bus, GstMessage * message, - GstPipeline * bin) +message_received_multiple (GstBus * bus, GstMessage * message, gpointer data) { GST_INFO ("bus message from \"%" GST_PTR_FORMAT "\": %" GST_PTR_FORMAT, GST_MESSAGE_SRC (message), message); switch (message->type) { - case GST_MESSAGE_EOS: - g_main_loop_quit (main_loop); - break; case GST_MESSAGE_WARNING:{ GError *gerror; gchar *debug; @@ -601,9 +593,9 @@ message_received_multiple (GstBus * bus, GstMessage * message, gst_message_parse_error (message, &gerror, &debug); gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug); + fail ("Error: %s / %s", gerror->message, debug); g_error_free (gerror); g_free (debug); - g_main_loop_quit (main_loop); break; } default: @@ -617,6 +609,7 @@ typedef struct guint nb_packets; guint drop_every_n_packets; guint payload_type_master; + guint total_packets; } RTXSendMultipleData; /* drop some packets */ @@ -640,7 +633,11 @@ rtprtxsend_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, /* count packets of the main stream */ ++rtxdata->nb_packets; /* drop some packets */ - if (rtxdata->count < rtxdata->drop_every_n_packets) { + /* but make sure we never drop the last one, otherwise there + * will be nothing to trigger a retransmission. + */ + if (rtxdata->count < rtxdata->drop_every_n_packets || + rtxdata->nb_packets == rtxdata->total_packets) { ++rtxdata->count; } else { /* drop a packet every 'rtxdata->count' packets */ @@ -659,33 +656,37 @@ rtprtxsend_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, /* make sure every sources has sent all their buffers */ static GstPadProbeReturn -source_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, +source_srcpad_probe_multiple_drop_eos (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { - GstPadProbeReturn ret = GST_PAD_PROBE_OK; + GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); - if (info->type == - (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_PUSH)) { - GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); - if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { - g_mutex_lock (&lock_eos); - ++nb_eos; - if (nb_eos < nb_sources) - ret = GST_PAD_PROBE_DROP; - g_mutex_unlock (&lock_eos); - } - } - - return ret; + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) + return GST_PAD_PROBE_DROP; + else + return GST_PAD_PROBE_OK; } +typedef struct +{ + GHashTable *ssrc_to_nb_packets_map; + GHashTable *ssrc_to_seqnum_offset_map; + guint seqnum_offset; + + gint to_send; + volatile gint dropped_requests; + volatile gint received; + gboolean request_passed; +} RTXReceiveMultipleData; + /* add one branch videotestsrc ! rtpvrawpay ! rtprtxsend ! queue ! funnel. */ static RTXSendMultipleData * add_sender (GstElement * bin, const gchar * src_name, const gchar * payloader_name, guint payload_type_master, - guint payload_type_aux) + guint payload_type_aux, RTXReceiveMultipleData * rtxdata) { GstElement *src = NULL; + GstCaps *caps; GstElement *rtppayloader = NULL; GstElement *rtprtxsend = NULL; GstElement *queue = NULL; @@ -700,6 +701,8 @@ add_sender (GstElement * bin, const gchar * src_name, send_rtxdata->nb_packets = 0; send_rtxdata->drop_every_n_packets = 0; send_rtxdata->payload_type_master = payload_type_master; + send_rtxdata->total_packets = 25; + rtxdata->to_send += send_rtxdata->total_packets; src = gst_element_factory_make (src_name, NULL); rtppayloader = gst_element_factory_make (payloader_name, NULL); @@ -712,7 +715,8 @@ add_sender (GstElement * bin, const gchar * src_name, pt_master, G_TYPE_UINT, payload_type_aux, NULL); g_free (pt_master); - g_object_set (src, "num-buffers", 25, NULL); + g_object_set (src, "num-buffers", send_rtxdata->total_packets, NULL); + g_object_set (src, "is-live", TRUE, NULL); g_object_set (rtppayloader, "pt", payload_type_master, NULL); g_object_set (rtppayloader, "seqnum-offset", 1, NULL); g_object_set (rtprtxsend, "payload-type-map", pt_map, NULL); @@ -724,7 +728,11 @@ add_sender (GstElement * bin, const gchar * src_name, gst_bin_add_many (GST_BIN (bin), src, rtppayloader, rtprtxsend, queue, NULL); - res = gst_element_link (src, rtppayloader); + /* Make sure we have one buffer per frame, makes it easier to count! */ + caps = + gst_caps_from_string ("video/x-raw, width=20, height=10, framerate=30/1"); + res = gst_element_link_filtered (src, rtppayloader, caps); + gst_caps_unref (caps); fail_unless (res == TRUE, NULL); res = gst_element_link (rtppayloader, rtprtxsend); fail_unless (res == TRUE, NULL); @@ -743,23 +751,36 @@ add_sender (GstElement * bin, const gchar * src_name, gst_object_unref (srcpad); /* to make sure every sources has sent all their buffers */ - srcpad = gst_element_get_static_pad (queue, "src"); - gst_pad_add_probe (srcpad, - (GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_PUSH), - (GstPadProbeCallback) source_srcpad_probe_multiple, NULL, NULL); + srcpad = gst_element_get_static_pad (src, "src"); + gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, + (GstPadProbeCallback) source_srcpad_probe_multiple_drop_eos, NULL, NULL); gst_object_unref (srcpad); - ++nb_sources; - return send_rtxdata; } -typedef struct +static GstPadProbeReturn +rtprtxreceive_sinkpad_probe_check_drop (GstPad * pad, GstPadProbeInfo * info, + gpointer user_data) { - GHashTable *ssrc_to_nb_packets_map; - GHashTable *ssrc_to_seqnum_offset_map; - guint seqnum_offset; -} RTXReceiveMultipleData; + GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); + RTXReceiveMultipleData *rtxdata = (RTXReceiveMultipleData *) user_data; + + if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_UPSTREAM && + gst_event_get_structure (event) != NULL && + gst_structure_has_name (gst_event_get_structure (event), + "GstRTPRetransmissionRequest")) + rtxdata->request_passed = TRUE; + + return GST_PAD_PROBE_OK; +} + +static gboolean +check_finished (RTXReceiveMultipleData * rtxdata) +{ + return (g_atomic_int_get (&rtxdata->received) >= (rtxdata->to_send - + g_atomic_int_get (&rtxdata->dropped_requests))); +} static GstPadProbeReturn rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, @@ -778,6 +799,10 @@ rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, ssrc = gst_rtp_buffer_get_ssrc (&rtp); seqnum = gst_rtp_buffer_get_seq (&rtp); + g_atomic_int_inc (&rtxdata->received); + if (check_finished (rtxdata)) + g_main_context_wakeup (NULL); + if (!g_hash_table_lookup_extended (rtxdata->ssrc_to_seqnum_offset_map, GUINT_TO_POINTER (ssrc), NULL, &seqnum_prev)) { /*In our test we take care to never drop the first buffer */ @@ -789,6 +814,7 @@ rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, return GST_PAD_PROBE_OK; } + /* check if there is a dropped packet * (in our test every packet arrived in increasing order) */ if (seqnum > GPOINTER_TO_UINT (seqnum_prev) + rtxdata->seqnum_offset) { @@ -805,7 +831,13 @@ rtprtxreceive_srcpad_probe_multiple (GstPad * pad, GstPadProbeInfo * info, "payload-type", G_TYPE_UINT, gst_rtp_buffer_get_payload_type (&rtp), NULL)); + rtxdata->request_passed = FALSE; gst_pad_push_event (peerpad, event); + if (!rtxdata->request_passed) { + g_atomic_int_inc (&rtxdata->dropped_requests); + if (check_finished (rtxdata)) + g_main_context_wakeup (NULL); + } } gst_object_unref (peerpad); @@ -879,6 +911,9 @@ start_test_drop_multiple_and_check_results (GstElement * bin, GST_INFO ("starting test"); + g_atomic_int_set (&receive_rtxdata->received, 0); + g_atomic_int_set (&receive_rtxdata->dropped_requests, 0); + g_hash_table_remove_all (receive_rtxdata->ssrc_to_nb_packets_map); g_hash_table_remove_all (receive_rtxdata->ssrc_to_seqnum_offset_map); @@ -893,7 +928,8 @@ start_test_drop_multiple_and_check_results (GstElement * bin, ck_assert_int_ne (state_res, GST_STATE_CHANGE_FAILURE); GST_INFO ("running main loop"); - g_main_loop_run (main_loop); + while (!check_finished (receive_rtxdata)) + g_main_context_iteration (NULL, TRUE); /* check results */ itr_elements = gst_bin_iterate_elements (GST_BIN (bin)); @@ -934,10 +970,8 @@ start_test_drop_multiple_and_check_results (GstElement * bin, g_hash_table_foreach (receive_rtxdata->ssrc_to_nb_packets_map, compute_total_packets_received, (gpointer) & sum_all_packets_received); - /* check that we sent as many packets as received */ - /* when eos is received by sources we cannot ensure that every packets - * will be received by sinks (maybe queue flush ?) - */ + sum_all_packets_received += + g_atomic_int_get (&receive_rtxdata->dropped_requests); fail_if (sum_all_packets_sent < sum_all_packets_received); /* some packet are not received, I still have to figure out why @@ -990,10 +1024,10 @@ GST_START_TEST (test_drop_multiple_sender) GstElement *bin, *funnel, *rtprtxreceive, *sink; GstBus *bus; gboolean res; - GstPad *srcpad; + GstPad *srcpad, *sinkpad; guint drop_every_n_packets = 0; GList *send_rtxdata_list = NULL; - RTXReceiveMultipleData receive_rtxdata; + RTXReceiveMultipleData receive_rtxdata = { NULL }; GstStructure *pt_map; GST_INFO ("preparing test"); @@ -1016,21 +1050,18 @@ GST_START_TEST (test_drop_multiple_sender) g_object_set (sink, "qos", FALSE, NULL); gst_bin_add_many (GST_BIN (bin), funnel, rtprtxreceive, sink, NULL); - nb_sources = 0; - g_mutex_init (&lock_eos); - send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 96, 121)); + "rtpvrawpay", 96, 121, &receive_rtxdata)); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 97, 122)); + "rtpvrawpay", 97, 122, &receive_rtxdata)); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 98, 123)); + "rtpvrawpay", 98, 123, &receive_rtxdata)); send_rtxdata_list = g_list_append (send_rtxdata_list, add_sender (bin, "videotestsrc", - "rtpvrawpay", 99, 124)); + "rtpvrawpay", 99, 124, &receive_rtxdata)); pt_map = gst_structure_new ("application/x-rtp-pt-map", "96", G_TYPE_UINT, 121, "97", G_TYPE_UINT, 122, @@ -1050,23 +1081,25 @@ GST_START_TEST (test_drop_multiple_sender) &receive_rtxdata, NULL); gst_object_unref (srcpad); - main_loop = g_main_loop_new (NULL, FALSE); + sinkpad = gst_element_get_static_pad (rtprtxreceive, "sink"); + gst_pad_add_probe (sinkpad, + GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, + (GstPadProbeCallback) rtprtxreceive_sinkpad_probe_check_drop, + &receive_rtxdata, NULL); + gst_object_unref (sinkpad); + g_signal_connect (bus, "message::error", - (GCallback) message_received_multiple, bin); + (GCallback) message_received_multiple, NULL); g_signal_connect (bus, "message::warning", - (GCallback) message_received_multiple, bin); - g_signal_connect (bus, "message::eos", (GCallback) message_received_multiple, - bin); + (GCallback) message_received_multiple, NULL); for (drop_every_n_packets = 2; drop_every_n_packets < 10; drop_every_n_packets++) { - nb_eos = 0; start_test_drop_multiple_and_check_results (bin, send_rtxdata_list, &receive_rtxdata, drop_every_n_packets); } /* cleanup */ - g_main_loop_unref (main_loop); g_list_free_full (send_rtxdata_list, free_rtx_send_data); g_hash_table_destroy (receive_rtxdata.ssrc_to_nb_packets_map); @@ -1075,8 +1108,6 @@ GST_START_TEST (test_drop_multiple_sender) gst_bus_remove_signal_watch (bus); gst_object_unref (bus); gst_object_unref (bin); - - g_mutex_clear (&lock_eos); } GST_END_TEST; @@ -1269,10 +1300,9 @@ test_rtxsender_packet_retention (gboolean test_with_time) * to actually retransmit something */ if (j >= MAX (i - half_buffers, 1)) { guint64 end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND; - do - res = g_cond_wait_until (&check_cond, &check_mutex, end_time); - while (res == TRUE && last_out_buffer == g_list_last (buffers)); - fail_unless_equals_int (res, TRUE); + + while (last_out_buffer == g_list_last (buffers)) + fail_unless (g_cond_wait_until (&check_cond, &check_mutex, end_time)); } g_mutex_unlock (&check_mutex); }