diff --git a/ChangeLog b/ChangeLog index f4e9127cca..3209e28675 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2008-05-23 Wim Taymans + + * gst-libs/gst/rtp/gstbasertpdepayload.c: + (gst_base_rtp_depayload_chain), + (gst_base_rtp_depayload_handle_sink_event), + (gst_base_rtp_depayload_push_full), + (gst_base_rtp_depayload_change_state): + Check sequence numbers, mark input buffers with a discont flag for the + subclass when we detected a gap, drop duplicate buffers. We do this + because one can use the element without a jitterbuffer in front and we + don't want to feed the subclasses invalid or reordered data. + Do an error when the subclass did not provide a process function instead + of crashing. + Some other small cleanups. + 2008-05-22 Tim-Philipp Müller * gst/videotestsrc/videotestsrc.c: (paint_hline_NV12_NV21): diff --git a/gst-libs/gst/rtp/gstbasertpdepayload.c b/gst-libs/gst/rtp/gstbasertpdepayload.c index 519265f457..21dbca236d 100644 --- a/gst-libs/gst/rtp/gstbasertpdepayload.c +++ b/gst-libs/gst/rtp/gstbasertpdepayload.c @@ -56,6 +56,8 @@ struct _GstBaseRTPDepayloadPrivate gboolean discont; GstClockTime timestamp; GstClockTime duration; + + guint32 next_seqnum; }; /* Filter signals and args */ @@ -255,25 +257,79 @@ gst_base_rtp_depayload_chain (GstPad * pad, GstBuffer * in) GstFlowReturn ret = GST_FLOW_OK; GstBuffer *out_buf; GstClockTime timestamp; + guint16 seqnum; + gboolean reset_seq, discont; + gint gap; filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad)); + /* we must validate, it's possible that this element is plugged right after a + * network receiver and we don't want to operate on invalid data */ if (!gst_rtp_buffer_validate (in)) goto invalid_buffer; priv = filter->priv; priv->discont = GST_BUFFER_IS_DISCONT (in); + timestamp = GST_BUFFER_TIMESTAMP (in); /* convert to running_time and save the timestamp, this is the timestamp * we put on outgoing buffers. */ - timestamp = GST_BUFFER_TIMESTAMP (in); timestamp = gst_segment_to_running_time (&filter->segment, GST_FORMAT_TIME, timestamp); priv->timestamp = timestamp; priv->duration = GST_BUFFER_DURATION (in); + seqnum = gst_rtp_buffer_get_seq (in); + reset_seq = TRUE; + discont = FALSE; + + GST_LOG_OBJECT (filter, "discont %d, seqnum %u, timestamp %" + GST_TIME_FORMAT, priv->discont, seqnum, GST_TIME_ARGS (timestamp)); + + /* Check seqnum. This is a very simple check that makes sure that the seqnums + * are striclty increasing, dropping anything that is out of the ordinary. We + * can only do this when the next_seqnum is known. */ + if (priv->next_seqnum != -1) { + gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum); + + /* if we have no gap, all is fine */ + if (G_UNLIKELY (gap != 0)) { + GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum, + priv->next_seqnum, gap); + if (gap < 0) { + /* seqnum > next_seqnum, we are missing some packets, this is always a + * DISCONT. */ + GST_LOG_OBJECT (filter, "%d missing packets", gap); + discont = TRUE; + } else { + /* seqnum < next_seqnum, we have seen this packet before or the sender + * could be restarted. If the packet is not too old, we throw it away as + * a duplicate, otherwise we mark discont and continue. 100 misordered + * packets is a good threshold. See also RFC 4737. */ + if (gap < 100) + goto dropping; + + GST_LOG_OBJECT (filter, + "%d > 100, packet too old, sender likely restarted", gap); + discont = TRUE; + } + } + } + priv->next_seqnum = (seqnum + 1) & 0xffff; + + if (discont && !priv->discont) { + GST_LOG_OBJECT (filter, "mark DISCONT on input buffer"); + /* we detected a seqnum discont but the buffer was not flagged with a discont, + * set the discont flag so that the subclass can throw away old data. */ + priv->discont = TRUE; + GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT); + } + bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter); + if (G_UNLIKELY (bclass->process == NULL)) + goto no_process; + /* let's send it out to processing */ out_buf = bclass->process (filter, in); if (out_buf) { @@ -298,6 +354,20 @@ invalid_buffer: gst_buffer_unref (in); return GST_FLOW_OK; } +dropping: + { + GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap); + gst_buffer_unref (in); + return GST_FLOW_OK; + } +no_process: + { + /* this is not fatal but should be filtered earlier */ + GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL), + ("The subclass does not have a process method")); + gst_buffer_unref (in); + return GST_FLOW_ERROR; + } } static gboolean @@ -314,6 +384,7 @@ gst_base_rtp_depayload_handle_sink_event (GstPad * pad, GstEvent * event) gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED); filter->need_newsegment = TRUE; + filter->priv->next_seqnum = -1; break; case GST_EVENT_NEWSEGMENT: { @@ -377,7 +448,7 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter, /* set the caps if any */ srccaps = GST_PAD_CAPS (filter->srcpad); - if (srccaps) + if (G_LIKELY (srccaps)) gst_buffer_set_caps (out_buf, srccaps); bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter); @@ -386,7 +457,8 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter, if (bclass->set_gst_timestamp && do_ts) bclass->set_gst_timestamp (filter, rtptime, out_buf); - if (priv->discont) { + if (G_UNLIKELY (priv->discont)) { + GST_LOG_OBJECT (filter, "Marking DISCONT on output buffer"); GST_BUFFER_FLAG_SET (out_buf, GST_BUFFER_FLAG_DISCONT); priv->discont = FALSE; } @@ -395,8 +467,8 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter, GST_LOG_OBJECT (filter, "Pushing buffer size %d, timestamp %" GST_TIME_FORMAT, GST_BUFFER_SIZE (out_buf), GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (out_buf))); + ret = gst_pad_push (filter->srcpad, out_buf); - GST_LOG_OBJECT (filter, "Pushed buffer: %s", gst_flow_get_name (ret)); return ret; } @@ -548,6 +620,7 @@ gst_base_rtp_depayload_change_state (GstElement * element, priv->npt_stop = -1; priv->play_speed = 1.0; priv->play_scale = 1.0; + filter->priv->next_seqnum = -1; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: break;