diff --git a/subprojects/gstreamer/libs/gst/net/gstptpclock.c b/subprojects/gstreamer/libs/gst/net/gstptpclock.c index 018e81017b..5b2dd18631 100644 --- a/subprojects/gstreamer/libs/gst/net/gstptpclock.c +++ b/subprojects/gstreamer/libs/gst/net/gstptpclock.c @@ -54,27 +54,22 @@ #endif #include "gstptpclock.h" - #include "gstptp_private.h" -#ifdef HAVE_SYS_WAIT_H -#include -#endif +#include + +#include + #ifdef G_OS_WIN32 #define WIN32_LEAN_AND_MEAN #include #include /* GetCurrentProcessId */ #endif -#include #ifdef HAVE_UNISTD_H #include -#elif defined(G_OS_WIN32) -#include #endif -#include - GST_DEBUG_CATEGORY_STATIC (ptp_debug); #define GST_CAT_DEFAULT (ptp_debug) @@ -241,11 +236,14 @@ static gboolean supported = TRUE; #else static gboolean supported = FALSE; #endif -static GPid ptp_helper_pid; +static GSubprocess *ptp_helper_process; +static GInputStream *stdout_pipe; +static GOutputStream *stdin_pipe; +static StdIOHeader stdio_header; /* buffer for reading */ +static gchar stdout_buffer[8192]; /* buffer for reading */ static GThread *ptp_helper_thread; static GMainContext *main_context; static GMainLoop *main_loop; -static GIOChannel *stdin_channel, *stdout_channel; static GRand *delay_req_rand; static GstClock *observation_system_clock; static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 }; @@ -942,7 +940,6 @@ send_delay_req_timeout (PtpPendingSync * sync) StdIOHeader header = { 0, }; guint8 delay_req[44]; GstByteWriter writer; - GIOStatus status; gsize written; GError *err = NULL; @@ -969,23 +966,20 @@ send_delay_req_timeout (PtpPendingSync * sync) gst_byte_writer_put_uint64_be_unchecked (&writer, 0); gst_byte_writer_put_uint16_be_unchecked (&writer, 0); - status = - g_io_channel_write_chars (stdout_channel, (gchar *) & header, - sizeof (header), &written, &err); - if (status == G_IO_STATUS_ERROR) { - g_warning ("Failed to write to stdout: %s", err->message); - g_clear_error (&err); - return G_SOURCE_REMOVE; - } else if (status == G_IO_STATUS_EOF) { + if (!g_output_stream_write_all (stdin_pipe, &header, sizeof (StdIOHeader), + &written, NULL, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) + || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { + GST_ERROR ("Got EOF on stdout"); + } else { + GST_ERROR ("Failed to write header to stdin: %s", err->message); + } + g_message ("EOF on stdout"); g_main_loop_quit (main_loop); return G_SOURCE_REMOVE; - } else if (status != G_IO_STATUS_NORMAL) { - g_warning ("Unexpected stdout write status: %d", status); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (written != sizeof (header)) { - g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written); + } else if (written != sizeof (StdIOHeader)) { + GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written); g_main_loop_quit (main_loop); return G_SOURCE_REMOVE; } @@ -993,24 +987,20 @@ send_delay_req_timeout (PtpPendingSync * sync) sync->delay_req_send_time_local = gst_clock_get_time (observation_system_clock); - status = - g_io_channel_write_chars (stdout_channel, - (const gchar *) delay_req, 44, &written, &err); - if (status == G_IO_STATUS_ERROR) { - g_warning ("Failed to write to stdout: %s", err->message); - g_clear_error (&err); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (status == G_IO_STATUS_EOF) { + if (!g_output_stream_write_all (stdin_pipe, delay_req, 44, &written, NULL, + &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) + || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { + GST_ERROR ("Got EOF on stdout"); + } else { + GST_ERROR ("Failed to write delay-req to stdin: %s", err->message); + } + g_message ("EOF on stdout"); g_main_loop_quit (main_loop); return G_SOURCE_REMOVE; - } else if (status != G_IO_STATUS_NORMAL) { - g_warning ("Unexpected stdout write status: %d", status); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; } else if (written != 44) { - g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written); + GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written); g_main_loop_quit (main_loop); return G_SOURCE_REMOVE; } @@ -1792,75 +1782,45 @@ handle_ptp_message (PtpMessage * msg, GstClockTime receive_time) } } -static gboolean -have_stdin_data_cb (GIOChannel * channel, GIOCondition condition, +static void have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res, + gpointer user_data); + +static void +have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res, gpointer user_data) { - GIOStatus status; - StdIOHeader header; - gchar buffer[8192]; GError *err = NULL; gsize read; - if ((condition & G_IO_STATUS_EOF)) { - GST_ERROR ("Got EOF on stdin"); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } - - status = - g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header), - &read, &err); - if (status == G_IO_STATUS_ERROR) { - GST_ERROR ("Failed to read from stdin: %s", err->message); + /* Finish reading the body */ + if (!g_input_stream_read_all_finish (stdout_pipe, res, &read, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) || + g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { + GST_ERROR ("Got EOF on stdout"); + } else { + GST_ERROR ("Failed to read header from stdout: %s", err->message); + } g_clear_error (&err); g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (status == G_IO_STATUS_EOF) { + return; + } else if (read == 0) { GST_ERROR ("Got EOF on stdin"); g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (status != G_IO_STATUS_NORMAL) { - GST_ERROR ("Unexpected stdin read status: %d", status); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (read != sizeof (header)) { + return; + } else if (read != stdio_header.size) { GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (header.size > 8192) { - GST_ERROR ("Unexpected size: %u", header.size); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; + return; } - status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err); - if (status == G_IO_STATUS_ERROR) { - GST_ERROR ("Failed to read from stdin: %s", err->message); - g_clear_error (&err); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (status == G_IO_STATUS_EOF) { - GST_ERROR ("EOF on stdin"); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (status != G_IO_STATUS_NORMAL) { - GST_ERROR ("Unexpected stdin read status: %d", status); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } else if (read != header.size) { - GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); - g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; - } - - switch (header.type) { + switch (stdio_header.type) { case TYPE_EVENT: case TYPE_GENERAL:{ GstClockTime receive_time = gst_clock_get_time (observation_system_clock); PtpMessage msg; - if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) { + if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer, + stdio_header.size)) { dump_ptp_message (&msg); handle_ptp_message (&msg, receive_time); } @@ -1868,13 +1828,13 @@ have_stdin_data_cb (GIOChannel * channel, GIOCondition condition, } default: case TYPE_CLOCK_ID:{ - if (header.size != 8) { - GST_ERROR ("Unexpected clock id size (%u != 8)", header.size); + if (stdio_header.size != 8) { + GST_ERROR ("Unexpected clock id size (%u != 8)", stdio_header.size); g_main_loop_quit (main_loop); - return G_SOURCE_REMOVE; + return; } g_mutex_lock (&ptp_lock); - ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer); + ptp_clock_id.clock_identity = GST_READ_UINT64_BE (stdout_buffer); #ifdef G_OS_WIN32 ptp_clock_id.port_number = (guint16) GetCurrentProcessId (); #else @@ -1888,7 +1848,48 @@ have_stdin_data_cb (GIOChannel * channel, GIOCondition condition, } } - return G_SOURCE_CONTINUE; + /* And read the next header */ + memset (&stdio_header, 0, sizeof (StdIOHeader)); + g_input_stream_read_all_async (stdout_pipe, &stdio_header, + sizeof (StdIOHeader), G_PRIORITY_DEFAULT, NULL, + (GAsyncReadyCallback) have_stdout_header, NULL); +} + +static void +have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res, + gpointer user_data) +{ + GError *err = NULL; + gsize read; + + /* Finish reading the header */ + if (!g_input_stream_read_all_finish (stdout_pipe, res, &read, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) || + g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) { + GST_ERROR ("Got EOF on stdout"); + } else { + GST_ERROR ("Failed to read header from stdout: %s", err->message); + } + g_clear_error (&err); + g_main_loop_quit (main_loop); + return; + } else if (read == 0) { + GST_ERROR ("Got EOF on stdin"); + g_main_loop_quit (main_loop); + return; + } else if (read != sizeof (StdIOHeader)) { + GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); + g_main_loop_quit (main_loop); + return; + } else if (stdio_header.size > 8192) { + GST_ERROR ("Unexpected size: %u", stdio_header.size); + g_main_loop_quit (main_loop); + return; + } + + /* And now read the body */ + g_input_stream_read_all_async (stdout_pipe, stdout_buffer, stdio_header.size, + G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) have_stdout_body, NULL); } /* Cleanup all announce messages and announce message senders @@ -1988,6 +1989,13 @@ ptp_helper_main (gpointer data) GST_DEBUG ("Starting PTP helper loop"); + g_main_context_push_thread_default (main_context); + + memset (&stdio_header, 0, sizeof (StdIOHeader)); + g_input_stream_read_all_async (stdout_pipe, &stdio_header, + sizeof (StdIOHeader), G_PRIORITY_DEFAULT, NULL, + (GAsyncReadyCallback) have_stdout_header, NULL); + /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */ cleanup_source = g_timeout_source_new_seconds (5); g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT); @@ -1998,6 +2006,8 @@ ptp_helper_main (gpointer data) g_main_loop_run (main_loop); GST_DEBUG ("Stopped PTP helper loop"); + g_main_context_pop_thread_default (main_context); + g_mutex_lock (&ptp_lock); ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE; ptp_clock_id.port_number = 0; @@ -2066,9 +2076,7 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) const gchar *env; gchar **argv = NULL; gint argc, argc_c; - gint fd_r, fd_w; GError *err = NULL; - GSource *stdin_source; GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock"); @@ -2085,7 +2093,7 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) goto done; } - if (ptp_helper_pid) { + if (ptp_helper_process) { GST_DEBUG ("PTP currently initializing"); goto wait; } @@ -2108,15 +2116,17 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) if (env == NULL) env = g_getenv ("GST_PTP_HELPER"); if (env != NULL && *env != '\0') { - GST_LOG ("Trying GST_PTP_HELPER env var: %s", env); argv[argc_c++] = g_strdup (env); } else { argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED); } + GST_LOG ("Using PTP helper process: %s", argv[argc_c - 1]); + if (clock_id != GST_PTP_CLOCK_ID_NONE) { argv[argc_c++] = g_strdup ("-c"); argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id); + GST_LOG ("Using clock ID: %s", argv[argc_c - 1]); } if (interfaces != NULL) { @@ -2125,10 +2135,41 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) while (*ptr) { argv[argc_c++] = g_strdup ("-i"); argv[argc_c++] = g_strdup (*ptr); + GST_LOG ("Using interface: %s", argv[argc_c - 1]); ptr++; } } + ptp_helper_process = + g_subprocess_newv ((const gchar * const *) argv, + G_SUBPROCESS_FLAGS_STDIN_PIPE | G_SUBPROCESS_FLAGS_STDOUT_PIPE, &err); + if (!ptp_helper_process) { + GST_ERROR ("Failed to start ptp helper process: %s", err->message); + g_clear_error (&err); + ret = FALSE; + supported = FALSE; + goto done; + } + + stdin_pipe = g_subprocess_get_stdin_pipe (ptp_helper_process); + if (stdin_pipe) + g_object_ref (stdin_pipe); + stdout_pipe = g_subprocess_get_stdout_pipe (ptp_helper_process); + if (stdout_pipe) + g_object_ref (stdout_pipe); + if (!stdin_pipe || !stdout_pipe) { + GST_ERROR ("Failed to get ptp helper process pipes"); + ret = FALSE; + supported = FALSE; + goto done; + } + + delay_req_rand = g_rand_new (); + observation_system_clock = + g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock", + NULL); + gst_object_ref_sink (observation_system_clock); + main_context = g_main_context_new (); main_loop = g_main_loop_new (main_context, FALSE); @@ -2141,39 +2182,6 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces) goto done; } - if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL, - &ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) { - GST_ERROR ("Failed to start ptp helper process: %s", err->message); - g_clear_error (&err); - ret = FALSE; - supported = FALSE; - goto done; - } - - stdin_channel = g_io_channel_unix_new (fd_r); - g_io_channel_set_encoding (stdin_channel, NULL, NULL); - g_io_channel_set_buffered (stdin_channel, FALSE); - g_io_channel_set_close_on_unref (stdin_channel, TRUE); - stdin_source = - g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP); - g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT); - g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL, - NULL); - g_source_attach (stdin_source, main_context); - g_source_unref (stdin_source); - - /* Create stdout channel */ - stdout_channel = g_io_channel_unix_new (fd_w); - g_io_channel_set_encoding (stdout_channel, NULL, NULL); - g_io_channel_set_close_on_unref (stdout_channel, TRUE); - g_io_channel_set_buffered (stdout_channel, FALSE); - - delay_req_rand = g_rand_new (); - observation_system_clock = - g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock", - NULL); - gst_object_ref_sink (observation_system_clock); - initted = TRUE; wait: @@ -2195,24 +2203,12 @@ done: g_strfreev (argv); if (!ret) { - if (ptp_helper_pid) { -#ifndef G_OS_WIN32 - kill (ptp_helper_pid, SIGKILL); - waitpid (ptp_helper_pid, NULL, 0); -#else - TerminateProcess (ptp_helper_pid, 1); - WaitForSingleObject (ptp_helper_pid, INFINITE); -#endif - g_spawn_close_pid (ptp_helper_pid); + if (ptp_helper_process) { + g_clear_object (&stdin_pipe); + g_clear_object (&stdout_pipe); + g_subprocess_force_exit (ptp_helper_process); + g_clear_object (&ptp_helper_process); } - ptp_helper_pid = 0; - - if (stdin_channel) - g_io_channel_unref (stdin_channel); - stdin_channel = NULL; - if (stdout_channel) - g_io_channel_unref (stdout_channel); - stdout_channel = NULL; if (main_loop && ptp_helper_thread) { g_main_loop_quit (main_loop); @@ -2256,24 +2252,12 @@ gst_ptp_deinit (void) g_mutex_lock (&ptp_lock); - if (ptp_helper_pid) { -#ifndef G_OS_WIN32 - kill (ptp_helper_pid, SIGKILL); - waitpid (ptp_helper_pid, NULL, 0); -#else - TerminateProcess (ptp_helper_pid, 1); - WaitForSingleObject (ptp_helper_pid, INFINITE); -#endif - g_spawn_close_pid (ptp_helper_pid); + if (ptp_helper_process) { + g_clear_object (&stdin_pipe); + g_clear_object (&stdout_pipe); + g_subprocess_force_exit (ptp_helper_process); + g_clear_object (&ptp_helper_process); } - ptp_helper_pid = 0; - - if (stdin_channel) - g_io_channel_unref (stdin_channel); - stdin_channel = NULL; - if (stdout_channel) - g_io_channel_unref (stdout_channel); - stdout_channel = NULL; if (main_loop && ptp_helper_thread) { GThread *tmp = ptp_helper_thread;