net: ptp: Use GSubprocess instead of lower-level GLib APIs that don't work on Windows

libgstnet depends on GIO already anyway so we can as well make use of it
instead of a half-baked Windows implementation that doesn't actually
work.

As a next step, the helper process also needs to be made usable on
Windows.

See https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/1259

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/3887>
This commit is contained in:
Sebastian Dröge 2023-02-03 14:29:10 +02:00 committed by GStreamer Marge Bot
parent 85476eab08
commit 716aaa562b

View File

@ -54,27 +54,22 @@
#endif #endif
#include "gstptpclock.h" #include "gstptpclock.h"
#include "gstptp_private.h" #include "gstptp_private.h"
#ifdef HAVE_SYS_WAIT_H #include <gio/gio.h>
#include <sys/wait.h>
#endif #include <gst/base/base.h>
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
#define WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN
#include <windows.h> #include <windows.h>
#include <processthreadsapi.h> /* GetCurrentProcessId */ #include <processthreadsapi.h> /* GetCurrentProcessId */
#endif #endif
#include <sys/types.h>
#ifdef HAVE_UNISTD_H #ifdef HAVE_UNISTD_H
#include <unistd.h> #include <unistd.h>
#elif defined(G_OS_WIN32)
#include <io.h>
#endif #endif
#include <gst/base/base.h>
GST_DEBUG_CATEGORY_STATIC (ptp_debug); GST_DEBUG_CATEGORY_STATIC (ptp_debug);
#define GST_CAT_DEFAULT (ptp_debug) #define GST_CAT_DEFAULT (ptp_debug)
@ -241,11 +236,14 @@ static gboolean supported = TRUE;
#else #else
static gboolean supported = FALSE; static gboolean supported = FALSE;
#endif #endif
static GPid ptp_helper_pid; static GSubprocess *ptp_helper_process;
static GInputStream *stdout_pipe;
static GOutputStream *stdin_pipe;
static StdIOHeader stdio_header; /* buffer for reading */
static gchar stdout_buffer[8192]; /* buffer for reading */
static GThread *ptp_helper_thread; static GThread *ptp_helper_thread;
static GMainContext *main_context; static GMainContext *main_context;
static GMainLoop *main_loop; static GMainLoop *main_loop;
static GIOChannel *stdin_channel, *stdout_channel;
static GRand *delay_req_rand; static GRand *delay_req_rand;
static GstClock *observation_system_clock; static GstClock *observation_system_clock;
static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 }; static PtpClockIdentity ptp_clock_id = { GST_PTP_CLOCK_ID_NONE, 0 };
@ -942,7 +940,6 @@ send_delay_req_timeout (PtpPendingSync * sync)
StdIOHeader header = { 0, }; StdIOHeader header = { 0, };
guint8 delay_req[44]; guint8 delay_req[44];
GstByteWriter writer; GstByteWriter writer;
GIOStatus status;
gsize written; gsize written;
GError *err = NULL; GError *err = NULL;
@ -969,23 +966,20 @@ send_delay_req_timeout (PtpPendingSync * sync)
gst_byte_writer_put_uint64_be_unchecked (&writer, 0); gst_byte_writer_put_uint64_be_unchecked (&writer, 0);
gst_byte_writer_put_uint16_be_unchecked (&writer, 0); gst_byte_writer_put_uint16_be_unchecked (&writer, 0);
status = if (!g_output_stream_write_all (stdin_pipe, &header, sizeof (StdIOHeader),
g_io_channel_write_chars (stdout_channel, (gchar *) & header, &written, NULL, &err)) {
sizeof (header), &written, &err); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)
if (status == G_IO_STATUS_ERROR) { || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
g_warning ("Failed to write to stdout: %s", err->message); GST_ERROR ("Got EOF on stdout");
g_clear_error (&err); } else {
return G_SOURCE_REMOVE; GST_ERROR ("Failed to write header to stdin: %s", err->message);
} else if (status == G_IO_STATUS_EOF) { }
g_message ("EOF on stdout"); g_message ("EOF on stdout");
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
} else if (status != G_IO_STATUS_NORMAL) { } else if (written != sizeof (StdIOHeader)) {
g_warning ("Unexpected stdout write status: %d", status); GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (written != sizeof (header)) {
g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
} }
@ -993,24 +987,20 @@ send_delay_req_timeout (PtpPendingSync * sync)
sync->delay_req_send_time_local = sync->delay_req_send_time_local =
gst_clock_get_time (observation_system_clock); gst_clock_get_time (observation_system_clock);
status = if (!g_output_stream_write_all (stdin_pipe, delay_req, 44, &written, NULL,
g_io_channel_write_chars (stdout_channel, &err)) {
(const gchar *) delay_req, 44, &written, &err); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)
if (status == G_IO_STATUS_ERROR) { || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
g_warning ("Failed to write to stdout: %s", err->message); GST_ERROR ("Got EOF on stdout");
g_clear_error (&err); } else {
g_main_loop_quit (main_loop); GST_ERROR ("Failed to write delay-req to stdin: %s", err->message);
return G_SOURCE_REMOVE; }
} else if (status == G_IO_STATUS_EOF) {
g_message ("EOF on stdout"); g_message ("EOF on stdout");
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
} else if (status != G_IO_STATUS_NORMAL) {
g_warning ("Unexpected stdout write status: %d", status);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (written != 44) { } else if (written != 44) {
g_warning ("Unexpected write size: %" G_GSIZE_FORMAT, written); GST_ERROR ("Unexpected write size: %" G_GSIZE_FORMAT, written);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
} }
@ -1792,75 +1782,45 @@ handle_ptp_message (PtpMessage * msg, GstClockTime receive_time)
} }
} }
static gboolean static void have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res,
have_stdin_data_cb (GIOChannel * channel, GIOCondition condition, gpointer user_data);
static void
have_stdout_body (GInputStream * stdout_pipe, GAsyncResult * res,
gpointer user_data) gpointer user_data)
{ {
GIOStatus status;
StdIOHeader header;
gchar buffer[8192];
GError *err = NULL; GError *err = NULL;
gsize read; gsize read;
if ((condition & G_IO_STATUS_EOF)) { /* Finish reading the body */
GST_ERROR ("Got EOF on stdin"); if (!g_input_stream_read_all_finish (stdout_pipe, res, &read, &err)) {
g_main_loop_quit (main_loop); if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) ||
return G_SOURCE_REMOVE; g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
} GST_ERROR ("Got EOF on stdout");
} else {
status = GST_ERROR ("Failed to read header from stdout: %s", err->message);
g_io_channel_read_chars (channel, (gchar *) & header, sizeof (header), }
&read, &err);
if (status == G_IO_STATUS_ERROR) {
GST_ERROR ("Failed to read from stdin: %s", err->message);
g_clear_error (&err); g_clear_error (&err);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return;
} else if (status == G_IO_STATUS_EOF) { } else if (read == 0) {
GST_ERROR ("Got EOF on stdin"); GST_ERROR ("Got EOF on stdin");
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return;
} else if (status != G_IO_STATUS_NORMAL) { } else if (read != stdio_header.size) {
GST_ERROR ("Unexpected stdin read status: %d", status);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (read != sizeof (header)) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read); GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return;
} else if (header.size > 8192) {
GST_ERROR ("Unexpected size: %u", header.size);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} }
status = g_io_channel_read_chars (channel, buffer, header.size, &read, &err); switch (stdio_header.type) {
if (status == G_IO_STATUS_ERROR) {
GST_ERROR ("Failed to read from stdin: %s", err->message);
g_clear_error (&err);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (status == G_IO_STATUS_EOF) {
GST_ERROR ("EOF on stdin");
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (status != G_IO_STATUS_NORMAL) {
GST_ERROR ("Unexpected stdin read status: %d", status);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
} else if (read != header.size) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE;
}
switch (header.type) {
case TYPE_EVENT: case TYPE_EVENT:
case TYPE_GENERAL:{ case TYPE_GENERAL:{
GstClockTime receive_time = gst_clock_get_time (observation_system_clock); GstClockTime receive_time = gst_clock_get_time (observation_system_clock);
PtpMessage msg; PtpMessage msg;
if (parse_ptp_message (&msg, (const guint8 *) buffer, header.size)) { if (parse_ptp_message (&msg, (const guint8 *) stdout_buffer,
stdio_header.size)) {
dump_ptp_message (&msg); dump_ptp_message (&msg);
handle_ptp_message (&msg, receive_time); handle_ptp_message (&msg, receive_time);
} }
@ -1868,13 +1828,13 @@ have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
} }
default: default:
case TYPE_CLOCK_ID:{ case TYPE_CLOCK_ID:{
if (header.size != 8) { if (stdio_header.size != 8) {
GST_ERROR ("Unexpected clock id size (%u != 8)", header.size); GST_ERROR ("Unexpected clock id size (%u != 8)", stdio_header.size);
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
return G_SOURCE_REMOVE; return;
} }
g_mutex_lock (&ptp_lock); g_mutex_lock (&ptp_lock);
ptp_clock_id.clock_identity = GST_READ_UINT64_BE (buffer); ptp_clock_id.clock_identity = GST_READ_UINT64_BE (stdout_buffer);
#ifdef G_OS_WIN32 #ifdef G_OS_WIN32
ptp_clock_id.port_number = (guint16) GetCurrentProcessId (); ptp_clock_id.port_number = (guint16) GetCurrentProcessId ();
#else #else
@ -1888,7 +1848,48 @@ have_stdin_data_cb (GIOChannel * channel, GIOCondition condition,
} }
} }
return G_SOURCE_CONTINUE; /* And read the next header */
memset (&stdio_header, 0, sizeof (StdIOHeader));
g_input_stream_read_all_async (stdout_pipe, &stdio_header,
sizeof (StdIOHeader), G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_header, NULL);
}
static void
have_stdout_header (GInputStream * stdout_pipe, GAsyncResult * res,
gpointer user_data)
{
GError *err = NULL;
gsize read;
/* Finish reading the header */
if (!g_input_stream_read_all_finish (stdout_pipe, res, &read, &err)) {
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED) ||
g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
GST_ERROR ("Got EOF on stdout");
} else {
GST_ERROR ("Failed to read header from stdout: %s", err->message);
}
g_clear_error (&err);
g_main_loop_quit (main_loop);
return;
} else if (read == 0) {
GST_ERROR ("Got EOF on stdin");
g_main_loop_quit (main_loop);
return;
} else if (read != sizeof (StdIOHeader)) {
GST_ERROR ("Unexpected read size: %" G_GSIZE_FORMAT, read);
g_main_loop_quit (main_loop);
return;
} else if (stdio_header.size > 8192) {
GST_ERROR ("Unexpected size: %u", stdio_header.size);
g_main_loop_quit (main_loop);
return;
}
/* And now read the body */
g_input_stream_read_all_async (stdout_pipe, stdout_buffer, stdio_header.size,
G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback) have_stdout_body, NULL);
} }
/* Cleanup all announce messages and announce message senders /* Cleanup all announce messages and announce message senders
@ -1988,6 +1989,13 @@ ptp_helper_main (gpointer data)
GST_DEBUG ("Starting PTP helper loop"); GST_DEBUG ("Starting PTP helper loop");
g_main_context_push_thread_default (main_context);
memset (&stdio_header, 0, sizeof (StdIOHeader));
g_input_stream_read_all_async (stdout_pipe, &stdio_header,
sizeof (StdIOHeader), G_PRIORITY_DEFAULT, NULL,
(GAsyncReadyCallback) have_stdout_header, NULL);
/* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */ /* Check all 5 seconds, if we have to cleanup ANNOUNCE or pending syncs message */
cleanup_source = g_timeout_source_new_seconds (5); cleanup_source = g_timeout_source_new_seconds (5);
g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT); g_source_set_priority (cleanup_source, G_PRIORITY_DEFAULT);
@ -1998,6 +2006,8 @@ ptp_helper_main (gpointer data)
g_main_loop_run (main_loop); g_main_loop_run (main_loop);
GST_DEBUG ("Stopped PTP helper loop"); GST_DEBUG ("Stopped PTP helper loop");
g_main_context_pop_thread_default (main_context);
g_mutex_lock (&ptp_lock); g_mutex_lock (&ptp_lock);
ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE; ptp_clock_id.clock_identity = GST_PTP_CLOCK_ID_NONE;
ptp_clock_id.port_number = 0; ptp_clock_id.port_number = 0;
@ -2066,9 +2076,7 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
const gchar *env; const gchar *env;
gchar **argv = NULL; gchar **argv = NULL;
gint argc, argc_c; gint argc, argc_c;
gint fd_r, fd_w;
GError *err = NULL; GError *err = NULL;
GSource *stdin_source;
GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock"); GST_DEBUG_CATEGORY_INIT (ptp_debug, "ptp", 0, "PTP clock");
@ -2085,7 +2093,7 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
goto done; goto done;
} }
if (ptp_helper_pid) { if (ptp_helper_process) {
GST_DEBUG ("PTP currently initializing"); GST_DEBUG ("PTP currently initializing");
goto wait; goto wait;
} }
@ -2108,15 +2116,17 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
if (env == NULL) if (env == NULL)
env = g_getenv ("GST_PTP_HELPER"); env = g_getenv ("GST_PTP_HELPER");
if (env != NULL && *env != '\0') { if (env != NULL && *env != '\0') {
GST_LOG ("Trying GST_PTP_HELPER env var: %s", env);
argv[argc_c++] = g_strdup (env); argv[argc_c++] = g_strdup (env);
} else { } else {
argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED); argv[argc_c++] = g_strdup (GST_PTP_HELPER_INSTALLED);
} }
GST_LOG ("Using PTP helper process: %s", argv[argc_c - 1]);
if (clock_id != GST_PTP_CLOCK_ID_NONE) { if (clock_id != GST_PTP_CLOCK_ID_NONE) {
argv[argc_c++] = g_strdup ("-c"); argv[argc_c++] = g_strdup ("-c");
argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id); argv[argc_c++] = g_strdup_printf ("0x%016" G_GINT64_MODIFIER "x", clock_id);
GST_LOG ("Using clock ID: %s", argv[argc_c - 1]);
} }
if (interfaces != NULL) { if (interfaces != NULL) {
@ -2125,10 +2135,41 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
while (*ptr) { while (*ptr) {
argv[argc_c++] = g_strdup ("-i"); argv[argc_c++] = g_strdup ("-i");
argv[argc_c++] = g_strdup (*ptr); argv[argc_c++] = g_strdup (*ptr);
GST_LOG ("Using interface: %s", argv[argc_c - 1]);
ptr++; ptr++;
} }
} }
ptp_helper_process =
g_subprocess_newv ((const gchar * const *) argv,
G_SUBPROCESS_FLAGS_STDIN_PIPE | G_SUBPROCESS_FLAGS_STDOUT_PIPE, &err);
if (!ptp_helper_process) {
GST_ERROR ("Failed to start ptp helper process: %s", err->message);
g_clear_error (&err);
ret = FALSE;
supported = FALSE;
goto done;
}
stdin_pipe = g_subprocess_get_stdin_pipe (ptp_helper_process);
if (stdin_pipe)
g_object_ref (stdin_pipe);
stdout_pipe = g_subprocess_get_stdout_pipe (ptp_helper_process);
if (stdout_pipe)
g_object_ref (stdout_pipe);
if (!stdin_pipe || !stdout_pipe) {
GST_ERROR ("Failed to get ptp helper process pipes");
ret = FALSE;
supported = FALSE;
goto done;
}
delay_req_rand = g_rand_new ();
observation_system_clock =
g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
NULL);
gst_object_ref_sink (observation_system_clock);
main_context = g_main_context_new (); main_context = g_main_context_new ();
main_loop = g_main_loop_new (main_context, FALSE); main_loop = g_main_loop_new (main_context, FALSE);
@ -2141,39 +2182,6 @@ gst_ptp_init (guint64 clock_id, gchar ** interfaces)
goto done; goto done;
} }
if (!g_spawn_async_with_pipes (NULL, argv, NULL, 0, NULL, NULL,
&ptp_helper_pid, &fd_w, &fd_r, NULL, &err)) {
GST_ERROR ("Failed to start ptp helper process: %s", err->message);
g_clear_error (&err);
ret = FALSE;
supported = FALSE;
goto done;
}
stdin_channel = g_io_channel_unix_new (fd_r);
g_io_channel_set_encoding (stdin_channel, NULL, NULL);
g_io_channel_set_buffered (stdin_channel, FALSE);
g_io_channel_set_close_on_unref (stdin_channel, TRUE);
stdin_source =
g_io_create_watch (stdin_channel, G_IO_IN | G_IO_PRI | G_IO_HUP);
g_source_set_priority (stdin_source, G_PRIORITY_DEFAULT);
g_source_set_callback (stdin_source, (GSourceFunc) have_stdin_data_cb, NULL,
NULL);
g_source_attach (stdin_source, main_context);
g_source_unref (stdin_source);
/* Create stdout channel */
stdout_channel = g_io_channel_unix_new (fd_w);
g_io_channel_set_encoding (stdout_channel, NULL, NULL);
g_io_channel_set_close_on_unref (stdout_channel, TRUE);
g_io_channel_set_buffered (stdout_channel, FALSE);
delay_req_rand = g_rand_new ();
observation_system_clock =
g_object_new (GST_TYPE_SYSTEM_CLOCK, "name", "ptp-observation-clock",
NULL);
gst_object_ref_sink (observation_system_clock);
initted = TRUE; initted = TRUE;
wait: wait:
@ -2195,24 +2203,12 @@ done:
g_strfreev (argv); g_strfreev (argv);
if (!ret) { if (!ret) {
if (ptp_helper_pid) { if (ptp_helper_process) {
#ifndef G_OS_WIN32 g_clear_object (&stdin_pipe);
kill (ptp_helper_pid, SIGKILL); g_clear_object (&stdout_pipe);
waitpid (ptp_helper_pid, NULL, 0); g_subprocess_force_exit (ptp_helper_process);
#else g_clear_object (&ptp_helper_process);
TerminateProcess (ptp_helper_pid, 1);
WaitForSingleObject (ptp_helper_pid, INFINITE);
#endif
g_spawn_close_pid (ptp_helper_pid);
} }
ptp_helper_pid = 0;
if (stdin_channel)
g_io_channel_unref (stdin_channel);
stdin_channel = NULL;
if (stdout_channel)
g_io_channel_unref (stdout_channel);
stdout_channel = NULL;
if (main_loop && ptp_helper_thread) { if (main_loop && ptp_helper_thread) {
g_main_loop_quit (main_loop); g_main_loop_quit (main_loop);
@ -2256,24 +2252,12 @@ gst_ptp_deinit (void)
g_mutex_lock (&ptp_lock); g_mutex_lock (&ptp_lock);
if (ptp_helper_pid) { if (ptp_helper_process) {
#ifndef G_OS_WIN32 g_clear_object (&stdin_pipe);
kill (ptp_helper_pid, SIGKILL); g_clear_object (&stdout_pipe);
waitpid (ptp_helper_pid, NULL, 0); g_subprocess_force_exit (ptp_helper_process);
#else g_clear_object (&ptp_helper_process);
TerminateProcess (ptp_helper_pid, 1);
WaitForSingleObject (ptp_helper_pid, INFINITE);
#endif
g_spawn_close_pid (ptp_helper_pid);
} }
ptp_helper_pid = 0;
if (stdin_channel)
g_io_channel_unref (stdin_channel);
stdin_channel = NULL;
if (stdout_channel)
g_io_channel_unref (stdout_channel);
stdout_channel = NULL;
if (main_loop && ptp_helper_thread) { if (main_loop && ptp_helper_thread) {
GThread *tmp = ptp_helper_thread; GThread *tmp = ptp_helper_thread;