avtp: crf: Setup socket during state change to ensure we handle failure

Previously the socket would be created in the thread, which take some
time to start. As the tests were so short they would usually pass as
they don't actually use the socket.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/9364>
This commit is contained in:
Doug Nazar 2025-07-09 02:52:27 -04:00 committed by GStreamer Marge Bot
parent 7d8f9b840f
commit c00d77e6d7
2 changed files with 22 additions and 13 deletions

View File

@ -73,6 +73,7 @@ gst_avtp_crf_base_get_property (GObject * object, guint prop_id,
static GstStateChangeReturn gst_avtp_crf_base_change_state (GstElement *
element, GstStateChange transition);
static void crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase);
static int setup_socket (GstAvtpCrfBase * avtpcrfbase);
#define gst_avtp_crf_base_parent_class parent_class
G_DEFINE_TYPE (GstAvtpCrfBase, gst_avtp_crf_base, GST_TYPE_BASE_TRANSFORM);
@ -122,6 +123,7 @@ gst_avtp_crf_base_init (GstAvtpCrfBase * avtpcrfbase)
avtpcrfbase->streamid = DEFAULT_STREAMID;
avtpcrfbase->ifname = g_strdup (DEFAULT_IFNAME);
avtpcrfbase->address = g_strdup (DEFAULT_ADDRESS);
avtpcrfbase->thread_data.fd = -1;
}
static GstStateChangeReturn
@ -136,6 +138,13 @@ gst_avtp_crf_base_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
thread_data->fd = setup_socket (avtpcrfbase);
if (thread_data->fd < 0) {
GST_ELEMENT_ERROR (avtpcrfbase, RESOURCE, OPEN_READ,
("Cannot open socket for CRF Listener"), (NULL));
return GST_STATE_CHANGE_FAILURE;
}
thread_data->past_periods =
g_malloc0 (sizeof (thread_data->past_periods[0]) *
MAX_NUM_PERIODS_STORED);
@ -149,7 +158,9 @@ gst_avtp_crf_base_change_state (GstElement * element, GstStateChange transition)
GST_ERROR_OBJECT (avtpcrfbase, "failed to start thread, %s",
error->message);
g_error_free (error);
g_free (thread_data->past_periods);
g_clear_pointer (&thread_data->past_periods, g_free);
close (thread_data->fd);
thread_data->fd = -1;
return GST_STATE_CHANGE_FAILURE;
}
break;
@ -162,8 +173,12 @@ gst_avtp_crf_base_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_READY_TO_NULL:
thread_data->is_running = FALSE;
g_thread_join (thread_data->thread);
g_free (thread_data->past_periods);
g_clear_pointer (&thread_data->thread, g_thread_join);
g_clear_pointer (&thread_data->past_periods, g_free);
if (thread_data->fd > -1) {
close (thread_data->fd);
thread_data->fd = -1;
}
break;
default:
break;
@ -480,17 +495,12 @@ crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase)
GstAvtpCrfThreadData *data = &avtpcrfbase->thread_data;
struct avtp_crf_pdu *crf_pdu = g_alloca (MAX_AVTPDU_SIZE);
guint64 media_clk_reset;
int fd, n, res;
int n, res;
fd = setup_socket (avtpcrfbase);
if (fd < 0) {
GST_ELEMENT_ERROR (avtpcrfbase, RESOURCE, OPEN_READ,
("Cannot open socket for CRF Listener"), (NULL));
return;
}
g_assert (data->fd > -1);
while (data->is_running) {
n = recv (fd, crf_pdu, MAX_AVTPDU_SIZE, 0);
n = recv (data->fd, crf_pdu, MAX_AVTPDU_SIZE, 0);
if (n == -1) {
if (errno == EAGAIN || errno == EINTR)
@ -522,8 +532,6 @@ crf_listener_thread_func (GstAvtpCrfBase * avtpcrfbase)
calculate_average_period (avtpcrfbase, crf_pdu);
}
close (fd);
}
static void

View File

@ -43,6 +43,7 @@ struct _GstAvtpCrfThreadData
{
GThread *thread;
gboolean is_running;
gint fd;
guint64 num_pkt_tstamps;
GstClockTime timestamp_interval;