rtpjitterbuffer: push the lost event from the timer thread

Instead of pushing the lost event from the chain function, schedule a timeout
that will push the lost event from the timer thread. This avoid blocking the
upstream thread while we push and sync the event.
This commit is contained in:
Wim Taymans 2013-09-18 14:57:09 +02:00
parent 5c1f2f0045
commit f2efdf28f5

View File

@ -239,6 +239,7 @@ typedef struct
{ {
guint idx; guint idx;
guint16 seqnum; guint16 seqnum;
guint num;
TimerType type; TimerType type;
GstClockTime timeout; GstClockTime timeout;
GstClockTime duration; GstClockTime duration;
@ -1486,7 +1487,7 @@ recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
static TimerData * static TimerData *
add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
guint16 seqnum, GstClockTime timeout, GstClockTime duration) guint16 seqnum, guint num, GstClockTime timeout, GstClockTime duration)
{ {
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
TimerData *timer; TimerData *timer;
@ -1502,6 +1503,7 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
timer->idx = len; timer->idx = len;
timer->type = type; timer->type = type;
timer->seqnum = seqnum; timer->seqnum = seqnum;
timer->num = num;
timer->timeout = timeout; timer->timeout = timeout;
timer->duration = duration; timer->duration = duration;
if (type == TIMER_TYPE_EXPECTED) { if (type == TIMER_TYPE_EXPECTED) {
@ -1562,7 +1564,7 @@ set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
/* find the seqnum timer */ /* find the seqnum timer */
timer = find_timer (jitterbuffer, type, seqnum); timer = find_timer (jitterbuffer, type, seqnum);
if (timer == NULL) { if (timer == NULL) {
timer = add_timer (jitterbuffer, type, seqnum, timeout, -1); timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, -1);
} else { } else {
reschedule_timer (jitterbuffer, timer, seqnum, timeout); reschedule_timer (jitterbuffer, timer, seqnum, timeout);
} }
@ -1644,7 +1646,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
if (timer) if (timer)
reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected); reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected);
else else
add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
expected, priv->packet_spacing); expected, priv->packet_spacing);
} else if (timer && timer->type != TIMER_TYPE_DEADLINE) { } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
/* if we had a timer, remove it, we don't know when to expect the next /* if we had a timer, remove it, we don't know when to expect the next
@ -1675,7 +1677,7 @@ calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
} }
} }
static GstFlowReturn static void
send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum, send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
guint lost_packets, GstClockTime timestamp, GstClockTime duration, guint lost_packets, GstClockTime timestamp, GstClockTime duration,
gboolean late) gboolean late)
@ -1710,25 +1712,15 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
"late", G_TYPE_BOOLEAN, late, NULL)); "late", G_TYPE_BOOLEAN, late, NULL));
JBUF_UNLOCK (priv); JBUF_UNLOCK (priv);
gst_pad_push_event (priv->srcpad, event); gst_pad_push_event (priv->srcpad, event);
JBUF_LOCK_CHECK (priv, flushing); JBUF_LOCK (priv);
}
return GST_FLOW_OK;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
return GST_FLOW_FLUSHING;
} }
} }
static void
static GstFlowReturn
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime dts, gint gap) guint16 seqnum, GstClockTime dts, gint gap)
{ {
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn ret = GST_FLOW_OK;
GstClockTime total_duration, duration, expected_dts; GstClockTime total_duration, duration, expected_dts;
TimerType type; TimerType type;
@ -1770,9 +1762,10 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns), GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
lost_packets); lost_packets);
ret = /* this timer will fire immediately and the lost event will be pushed from
send_lost_event (jitterbuffer, expected, lost_packets, * the timer thread */
priv->last_in_dts + duration, gap_time, TRUE); add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
priv->last_in_dts + duration, gap_time);
expected += lost_packets; expected += lost_packets;
priv->last_in_dts += gap_time; priv->last_in_dts += gap_time;
@ -1790,11 +1783,10 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
} }
while (expected < seqnum) { while (expected < seqnum) {
add_timer (jitterbuffer, type, expected, expected_dts, duration); add_timer (jitterbuffer, type, expected, 0, expected_dts, duration);
expected_dts += duration; expected_dts += duration;
expected++; expected++;
} }
return ret;
} }
static GstFlowReturn static GstFlowReturn
@ -1918,9 +1910,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
} else { } else {
GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap); GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
/* fill in the gap with EXPECTED timers */ /* fill in the gap with EXPECTED timers */
ret = calculate_expected (jitterbuffer, expected, seqnum, dts, gap); calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
if (ret != GST_FLOW_OK)
goto out_flushing;
do_next_seqnum = TRUE; do_next_seqnum = TRUE;
} }
@ -2350,19 +2340,22 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
{ {
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstClockTime duration, timestamp; GstClockTime duration, timestamp;
guint seqnum; guint seqnum, num;
gboolean late;
seqnum = timer->seqnum; seqnum = timer->seqnum;
timestamp = apply_offset (jitterbuffer, timer->timeout); timestamp = apply_offset (jitterbuffer, timer->timeout);
duration = timer->duration; duration = timer->duration;
if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0) if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
duration = priv->packet_spacing; duration = priv->packet_spacing;
num = MAX (timer->num, 1);
late = timer->num > 0;
/* remove timer now */ /* remove timer now */
remove_timer (jitterbuffer, timer); remove_timer (jitterbuffer, timer);
JBUF_SIGNAL_EVENT (priv); JBUF_SIGNAL_EVENT (priv);
send_lost_event (jitterbuffer, seqnum, 1, timestamp, duration, FALSE); send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
return TRUE; return TRUE;
} }