From fa0a233fa7237ce6cd26f6408791fbc22f67a2e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 30 Jan 2020 16:06:09 +0200 Subject: [PATCH] sctp: Make receive/packetout callbacks thread-safe --- ext/sctp/gstsctpdec.c | 7 ++-- ext/sctp/gstsctpenc.c | 6 +-- ext/sctp/sctpassociation.c | 84 +++++++++++++++++++------------------- ext/sctp/sctpassociation.h | 8 ++-- 4 files changed, 54 insertions(+), 51 deletions(-) diff --git a/ext/sctp/gstsctpdec.c b/ext/sctp/gstsctpdec.c index 121f738e5a..08551ad25a 100644 --- a/ext/sctp/gstsctpdec.c +++ b/ext/sctp/gstsctpdec.c @@ -474,7 +474,7 @@ configure_association (GstSctpDec * self) "local-port", G_BINDING_SYNC_CREATE); gst_sctp_association_set_on_packet_received (self->sctp_association, - on_receive, self); + on_receive, gst_object_ref (self), gst_object_unref); return TRUE; error: @@ -688,9 +688,8 @@ static void sctpdec_cleanup (GstSctpDec * self) { if (self->sctp_association) { - /* FIXME: make this threadsafe */ - /* gst_sctp_association_set_on_packet_received (self->sctp_association, NULL, - NULL); */ + gst_sctp_association_set_on_packet_received (self->sctp_association, NULL, + NULL, NULL); g_signal_handler_disconnect (self->sctp_association, self->signal_handler_stream_reset); stop_all_srcpad_tasks (self); diff --git a/ext/sctp/gstsctpenc.c b/ext/sctp/gstsctpenc.c index 818153dcb7..9c5bf0b2a5 100644 --- a/ext/sctp/gstsctpenc.c +++ b/ext/sctp/gstsctpenc.c @@ -798,7 +798,7 @@ configure_association (GstSctpEnc * self) "use-sock-stream", G_BINDING_SYNC_CREATE); gst_sctp_association_set_on_packet_out (self->sctp_association, - on_sctp_packet_out, self); + on_sctp_packet_out, gst_object_ref (self), gst_object_unref); return TRUE; error: @@ -905,8 +905,8 @@ sctpenc_cleanup (GstSctpEnc * self) { GstIterator *it; - /* FIXME: make this threadsafe */ - /* gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); */ + gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL, + NULL); g_signal_handler_disconnect (self->sctp_association, self->signal_handler_state_changed); diff --git a/ext/sctp/sctpassociation.c b/ext/sctp/sctpassociation.c index 2ae4781217..fb32063808 100644 --- a/ext/sctp/sctpassociation.c +++ b/ext/sctp/sctpassociation.c @@ -190,7 +190,7 @@ gst_sctp_association_init (GstSctpAssociation * self) self->sctp_ass_sock = NULL; self->connection_thread = NULL; - g_mutex_init (&self->association_mutex); + g_rec_mutex_init (&self->association_mutex); self->state = GST_SCTP_ASSOCIATION_STATE_NEW; @@ -227,7 +227,7 @@ gst_sctp_association_set_property (GObject * object, guint prop_id, { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object); - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) { switch (prop_id) { case PROP_LOCAL_PORT: @@ -258,14 +258,14 @@ gst_sctp_association_set_property (GObject * object, guint prop_id, break; } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT) maybe_set_state_to_ready (self); return; error: - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); } static void @@ -273,7 +273,7 @@ maybe_set_state_to_ready (GstSctpAssociation * self) { gboolean signal_ready_state = FALSE; - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) && (self->local_port != 0 && self->remote_port != 0) && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) { @@ -281,7 +281,7 @@ maybe_set_state_to_ready (GstSctpAssociation * self) gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY, FALSE); } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); /* The reason the state is changed twice is that we do not want to change state with * notification while the association_mutex is locked. If someone listens @@ -353,7 +353,7 @@ gst_sctp_association_start (GstSctpAssociation * self) { gchar *thread_name; - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) { g_warning ("SCTP association is in wrong state and cannot be started"); goto configure_required; @@ -364,7 +364,7 @@ gst_sctp_association_start (GstSctpAssociation * self) gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE); - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); /* The reason the state is changed twice is that we do not want to change state with * notification while the association_mutex is locked. If someone listens @@ -379,48 +379,46 @@ gst_sctp_association_start (GstSctpAssociation * self) return TRUE; error: - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR, TRUE); configure_required: - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); return FALSE; } void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, - GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data) + GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data, + GDestroyNotify destroy_notify) { g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); - g_mutex_lock (&self->association_mutex); - if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) { - self->packet_out_cb = packet_out_cb; - self->packet_out_user_data = user_data; - } else { - /* This is to be thread safe. The Association might try to write to the closure already */ - g_warning ("It is not possible to change packet callback in this state"); - } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); + if (self->packet_out_destroy_notify) + self->packet_out_destroy_notify (self->packet_out_user_data); + self->packet_out_cb = packet_out_cb; + self->packet_out_user_data = user_data; + self->packet_out_destroy_notify = destroy_notify; + g_rec_mutex_unlock (&self->association_mutex); maybe_set_state_to_ready (self); } void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, - GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data) + GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data, + GDestroyNotify destroy_notify) { g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self)); - g_mutex_lock (&self->association_mutex); - if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) { - self->packet_received_cb = packet_received_cb; - self->packet_received_user_data = user_data; - } else { - /* This is to be thread safe. The Association might try to write to the closure already */ - g_warning ("It is not possible to change receive callback in this state"); - } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); + if (self->packet_received_destroy_notify) + self->packet_received_destroy_notify (self->packet_received_user_data); + self->packet_received_cb = packet_received_cb; + self->packet_received_user_data = user_data; + self->packet_received_destroy_notify = destroy_notify; + g_rec_mutex_unlock (&self->association_mutex); maybe_set_state_to_ready (self); } @@ -442,7 +440,7 @@ gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf, gboolean result = FALSE; struct sockaddr_conn remote_addr; - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) goto end; @@ -483,7 +481,7 @@ gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf, result = TRUE; end: - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); return result; } @@ -500,10 +498,10 @@ gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id) srs->srs_number_streams = 1; srs->srs_stream_list[0] = stream_id; - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, length); - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); g_free (srs); } @@ -511,13 +509,13 @@ gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id) void gst_sctp_association_force_close (GstSctpAssociation * self) { - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); if (self->sctp_ass_sock) { usrsctp_close (self->sctp_ass_sock); self->sctp_ass_sock = NULL; } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); } static struct socket * @@ -634,7 +632,7 @@ client_role_connect (GstSctpAssociation * self) struct sockaddr_conn addr; gint ret; - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); addr = get_sctp_socket_address (self, self->local_port); ret = usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr, @@ -652,10 +650,10 @@ client_role_connect (GstSctpAssociation * self) g_warning ("usrsctp_connect() error: (%u) %s", errno, strerror (errno)); goto error; } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); return TRUE; error: - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); return FALSE; } @@ -665,9 +663,11 @@ sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos, { GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr); + g_rec_mutex_lock (&self->association_mutex); if (self->packet_out_cb) { self->packet_out_cb (self, buffer, length, self->packet_out_user_data); } + g_rec_mutex_unlock (&self->association_mutex); return 0; } @@ -769,7 +769,7 @@ handle_association_changed (GstSctpAssociation * self, switch (sac->sac_state) { case SCTP_COMM_UP: g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP_COMM_UP()"); - g_mutex_lock (&self->association_mutex); + g_rec_mutex_lock (&self->association_mutex); if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) { change_state = TRUE; new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED; @@ -779,7 +779,7 @@ handle_association_changed (GstSctpAssociation * self, } else { g_warning ("SCTP association in unexpected state"); } - g_mutex_unlock (&self->association_mutex); + g_rec_mutex_unlock (&self->association_mutex); break; case SCTP_COMM_LOST: g_warning ("SCTP event SCTP_COMM_LOST received"); @@ -824,10 +824,12 @@ static void handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen, guint16 stream_id, guint32 ppid) { + g_rec_mutex_lock (&self->association_mutex); if (self->packet_received_cb) { self->packet_received_cb (self, data, datalen, stream_id, ppid, self->packet_received_user_data); } + g_rec_mutex_unlock (&self->association_mutex); } static void diff --git a/ext/sctp/sctpassociation.h b/ext/sctp/sctpassociation.h index 4510ba70f0..fc53878600 100644 --- a/ext/sctp/sctpassociation.h +++ b/ext/sctp/sctpassociation.h @@ -78,7 +78,7 @@ struct _GstSctpAssociation gboolean use_sock_stream; struct socket *sctp_ass_sock; - GMutex association_mutex; + GRecMutex association_mutex; GstSctpAssociationState state; @@ -86,9 +86,11 @@ struct _GstSctpAssociation GstSctpAssociationPacketReceivedCb packet_received_cb; gpointer packet_received_user_data; + GDestroyNotify packet_received_destroy_notify; GstSctpAssociationPacketOutCb packet_out_cb; gpointer packet_out_user_data; + GDestroyNotify packet_out_destroy_notify; }; struct _GstSctpAssociationClass @@ -105,9 +107,9 @@ GstSctpAssociation *gst_sctp_association_get (guint32 association_id); gboolean gst_sctp_association_start (GstSctpAssociation * self); void gst_sctp_association_set_on_packet_out (GstSctpAssociation * self, - GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data); + GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data, GDestroyNotify destroy_notify); void gst_sctp_association_set_on_packet_received (GstSctpAssociation * self, - GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data); + GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data, GDestroyNotify destroy_notify); void gst_sctp_association_incoming_packet (GstSctpAssociation * self, guint8 * buf, guint32 length); gboolean gst_sctp_association_send_data (GstSctpAssociation * self,