From 96a0b1b9b90a81b6dfecda47f88aabdd3663965b Mon Sep 17 00:00:00 2001 From: Philippe Kalaf Date: Fri, 12 Aug 2005 13:34:56 +0000 Subject: [PATCH] gst-libs/gst/rtp/gstbasertpdepayload.*: Made a thread to release the queue. Original commit message from CVS: 2005-08-12 Philippe Khalaf * gst-libs/gst/rtp/gstbasertpdepayload.c: * gst-libs/gst/rtp/gstbasertpdepayload.h: Made a thread to release the queue. Removed timestamp conversion for now. --- ChangeLog | 6 + gst-libs/gst/rtp/gstbasertpdepayload.c | 159 +++++++++++++++++++++---- gst-libs/gst/rtp/gstbasertpdepayload.h | 12 ++ 3 files changed, 151 insertions(+), 26 deletions(-) diff --git a/ChangeLog b/ChangeLog index 18e47e6333..2d7081f8c3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,9 @@ +2005-08-12 Philippe Khalaf + * gst-libs/gst/rtp/gstbasertpdepayload.c: + * gst-libs/gst/rtp/gstbasertpdepayload.h: + Made a thread to release the queue. + Removed timestamp conversion for now. + 2005-08-10 Philippe Khalaf * gst-libs/gst/rtp/gstbasertpdepayload.c: * gst-libs/gst/rtp/gstbasertpdepayload.h: diff --git a/gst-libs/gst/rtp/gstbasertpdepayload.c b/gst-libs/gst/rtp/gstbasertpdepayload.c index 504683684f..a3d61b8b12 100644 --- a/gst-libs/gst/rtp/gstbasertpdepayload.c +++ b/gst-libs/gst/rtp/gstbasertpdepayload.c @@ -88,8 +88,10 @@ static gboolean gst_base_rtp_depayload_setcaps (GstPad * pad, GstCaps * caps); static GstFlowReturn gst_base_rtp_depayload_chain (GstPad * pad, GstBuffer * in); -static GstFlowReturn gst_base_rtp_depayload_add_to_queue - (GstBaseRTPDepayload * filter, GstRTPBuffer * in); +static GstElementStateReturn gst_base_rtp_depayload_change_state (GstElement * + element); +static GstFlowReturn gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload * + filter, GstRTPBuffer * in); static void gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter, guint32 timestamp, GstBuffer * buf); @@ -108,8 +110,10 @@ static void gst_base_rtp_depayload_class_init (GstBaseRTPDepayloadClass * klass) { GObjectClass *gobject_class; + GstElementClass *gstelement_class; gobject_class = G_OBJECT_CLASS (klass); + gstelement_class = (GstElementClass *) klass; gobject_class->set_property = gst_base_rtp_depayload_set_property; gobject_class->get_property = gst_base_rtp_depayload_get_property; @@ -124,6 +128,8 @@ gst_base_rtp_depayload_class_init (GstBaseRTPDepayloadClass * klass) gobject_class->finalize = gst_base_rtp_depayload_finalize; + gstelement_class->change_state = gst_base_rtp_depayload_change_state; + klass->add_to_queue = gst_base_rtp_depayload_add_to_queue; klass->set_gst_timestamp = gst_base_rtp_depayload_set_gst_timestamp; @@ -158,6 +164,9 @@ gst_base_rtp_depayload_init (GstBaseRTPDepayload * filter, gpointer g_class) filter->queue_delay = RTP_QUEUEDELAY; + // init queue mutex + QUEUE_LOCK_INIT (filter); + // this one needs to be overwritten by child filter->clock_rate = 0; } @@ -234,6 +243,7 @@ gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload * filter, GQueue *queue = filter->queue; // our first packet, just push it + QUEUE_LOCK (filter); if (g_queue_is_empty (queue)) { g_queue_push_tail (queue, in); } else @@ -255,28 +265,8 @@ gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload * filter, GST_DEBUG ("Packet added to queue %d at pos %d timestamp %u sn %d", g_queue_get_length (queue), i, in->timestamp, in->seqnum); - // if our queue is getting to big (more than RTP_QUEUEDELAY ms of data) - // release heading buffers - //GST_DEBUG("clockrate %d, queu_delay %d", filter->clock_rate, filter->queue_delay); - gfloat q_size_secs = (gfloat) filter->queue_delay / 1000; - guint maxtsunits = (gfloat) filter->clock_rate * q_size_secs; - - GST_DEBUG ("maxtsunit is %u", maxtsunits); - GST_DEBUG ("ts %d %d %d %d", in->timestamp, in->seqnum, - GST_RTPBUFFER (g_queue_peek_tail (queue))->timestamp, - GST_RTPBUFFER (g_queue_peek_tail (queue))->seqnum); - while (in->timestamp - - GST_RTPBUFFER (g_queue_peek_tail (queue))->timestamp > maxtsunits) { - GST_DEBUG ("Poping packet from queue"); - GstBaseRTPDepayloadClass *bclass = - GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter); - if (bclass->process) { - GstRTPBuffer *in = g_queue_pop_tail (queue); - - gst_base_rtp_depayload_push (filter, GST_RTPBUFFER (in)); - } - } } + QUEUE_UNLOCK (filter); return GST_FLOW_OK; } @@ -298,7 +288,7 @@ gst_base_rtp_depayload_push (GstBaseRTPDepayload * filter, // is the same as the timestamp wanted on the collector // maybe i should add a way to override this timestamp from the // depayloader child class - bclass->set_gst_timestamp (filter, rtp_buf->timestamp, out_buf); + //bclass->set_gst_timestamp (filter, rtp_buf->timestamp, out_buf); // push it GST_DEBUG ("Pushing buffer size %d, timestamp %u", GST_BUFFER_SIZE (out_buf), GST_BUFFER_TIMESTAMP (out_buf)); @@ -315,8 +305,7 @@ gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter, // rtp timestamps are based on the clock_rate // gst timesamps are in nanoseconds - GST_DEBUG ("calculating ts : timestamp : %u, clockrate : %u", timestamp, - filter->clock_rate); + //GST_DEBUG("calculating ts : timestamp : %u, clockrate : %u", timestamp, filter->clock_rate); guint64 ts = ((timestamp * GST_SECOND) / filter->clock_rate); GST_BUFFER_TIMESTAMP (buf) = ts; @@ -337,6 +326,124 @@ gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter, } } +static void +gst_base_rtp_depayload_queue_release (GstBaseRTPDepayload * filter) +{ + GQueue *queue = filter->queue; + + if (g_queue_is_empty (queue)) + return; + + // if our queue is getting to big (more than RTP_QUEUEDELAY ms of data) + // release heading buffers + //GST_DEBUG("clockrate %d, queu_delay %d", filter->clock_rate, filter->queue_delay); + gfloat q_size_secs = (gfloat) filter->queue_delay / 1000; + guint maxtsunits = (gfloat) filter->clock_rate * q_size_secs; + + //GST_DEBUG("maxtsunit is %u", maxtsunits); + //GST_DEBUG("ts %d %d %d", GST_RTPBUFFER(g_queue_peek_head (queue))->timestamp, GST_RTPBUFFER(g_queue_peek_tail (queue))->timestamp); + QUEUE_LOCK (filter); + while (GST_RTPBUFFER (g_queue_peek_head (queue))->timestamp - + GST_RTPBUFFER (g_queue_peek_tail (queue))->timestamp > maxtsunits) { + //GST_DEBUG("Poping packet from queue"); + GstBaseRTPDepayloadClass *bclass = + GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter); + if (bclass->process) { + GstRTPBuffer *in = g_queue_pop_tail (queue); + + gst_base_rtp_depayload_push (filter, GST_RTPBUFFER (in)); + } + } + QUEUE_UNLOCK (filter); +} + + +static gpointer +gst_base_rtp_depayload_thread (GstBaseRTPDepayload * filter) +{ + while (filter->thread_running) { + gst_base_rtp_depayload_queue_release (filter); + // i want to run this thread clock_rate times per second + g_usleep (1000000 / filter->clock_rate); + //g_usleep (1000000); + } + return NULL; +} + +static gboolean +gst_base_rtp_depayload_start_thread (GstBaseRTPDepayload * filter) +{ + GST_DEBUG ("Starting queue release thread"); + filter->thread_running = TRUE; + filter->thread = g_thread_create ((GThreadFunc) gst_base_rtp_depayload_thread, + filter, TRUE, NULL); + GST_DEBUG ("Started queue release thread"); + return TRUE; +} + +static gboolean +gst_base_rtp_depayload_stop_thread (GstBaseRTPDepayload * filter) +{ + filter->thread_running = FALSE; + + if (filter->thread) { + g_thread_join (filter->thread); + filter->thread = NULL; + } + QUEUE_LOCK_FREE (filter); + return TRUE; +} + +static GstElementStateReturn +gst_base_rtp_depayload_change_state (GstElement * element) +{ + GstBaseRTPDepayload *filter; + gint transition; + +// GstElementStateReturn ret; + + g_return_val_if_fail (GST_IS_BASE_RTP_DEPAYLOAD (element), GST_STATE_FAILURE); + filter = GST_BASE_RTP_DEPAYLOAD (element); + + /* we disallow changing the state from the thread */ + if (g_thread_self () == filter->thread) + return GST_STATE_FAILURE; + + transition = GST_STATE_TRANSITION (element); + + switch (transition) { + case GST_STATE_NULL_TO_READY: + if (!gst_base_rtp_depayload_start_thread (filter)) + goto start_failed; + break; + case GST_STATE_READY_TO_PAUSED: + break; + case GST_STATE_PAUSED_TO_PLAYING: + break; + default: + break; + } + +// ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); + + switch (transition) { + case GST_STATE_PLAYING_TO_PAUSED: + break; + case GST_STATE_PAUSED_TO_READY: + break; + case GST_STATE_READY_TO_NULL: + gst_base_rtp_depayload_stop_thread (filter); + break; + } + return GST_STATE_SUCCESS; + + /* ERRORS */ +start_failed: + { + return GST_STATE_FAILURE; + } +} + static void gst_base_rtp_depayload_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) diff --git a/gst-libs/gst/rtp/gstbasertpdepayload.h b/gst-libs/gst/rtp/gstbasertpdepayload.h index d3989f9ed2..5190973523 100644 --- a/gst-libs/gst/rtp/gstbasertpdepayload.h +++ b/gst-libs/gst/rtp/gstbasertpdepayload.h @@ -41,6 +41,11 @@ G_BEGIN_DECLS // in milliseconds #define RTP_QUEUEDELAY 100; +#define QUEUE_LOCK_INIT(base) (g_static_rec_mutex_init(&base->queuelock)) +#define QUEUE_LOCK_FREE(base) (g_static_rec_mutex_free(&base->queuelock)) +#define QUEUE_LOCK(base) (g_static_rec_mutex_lock(&base->queuelock)) +#define QUEUE_UNLOCK(base) (g_static_rec_mutex_unlock(&base->queuelock)) + typedef struct _GstBaseRTPDepayload GstBaseRTPDepayload; typedef struct _GstBaseRTPDepayloadClass GstBaseRTPDepayloadClass; @@ -50,6 +55,13 @@ struct _GstBaseRTPDepayload GstPad *sinkpad, *srcpad; + /* lock to protect the queue */ + GStaticRecMutex queuelock; + + gboolean thread_running; + /* the releaser thread */ + GThread *thread; + // this attribute must be set by the child guint clock_rate;