From d086491909d715dbd18a730f67d97726822aa182 Mon Sep 17 00:00:00 2001
From: Jochen Henneberg <jh@henneberg-systemdesign.com>
Date: Thu, 6 Jul 2023 10:13:06 +0200
Subject: [PATCH] rtpbasedepayload: Fixed HdrExt handling for aggregated output
 buffer

If a depayloader aggregates multiple RTP buffers into one buffer only
the last RTP buffer was checked for header extensions. Now the
depayloader remembers all RTP packets pushed before a output buffer is
pushed and checks all RTP buffers for header extensions.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4979>
---
 girs/GstRtp-1.0.gir                           | 145 +++++++++-
 .../gst-libs/gst/rtp/gstrtpbasedepayload.c    | 266 +++++++++++++++++-
 .../gst-libs/gst/rtp/gstrtpbasedepayload.h    |  16 ++
 .../tests/check/libs/rtpbasedepayload.c       | 212 +++++++++++++-
 .../tests/check/libs/rtpdummyhdrextimpl.c     |   3 +-
 5 files changed, 634 insertions(+), 8 deletions(-)

diff --git a/girs/GstRtp-1.0.gir b/girs/GstRtp-1.0.gir
index 4dad967d07..6ae0dfdb77 100644
--- a/girs/GstRtp-1.0.gir
+++ b/girs/GstRtp-1.0.gir
@@ -2332,7 +2332,39 @@ audio codec</doc>
       <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbaseaudiopayload.h"/>
     </record>
     <class name="RTPBaseDepayload" c:symbol-prefix="rtp_base_depayload" c:type="GstRTPBaseDepayload" parent="Gst.Element" abstract="1" glib:type-name="GstRTPBaseDepayload" glib:get-type="gst_rtp_base_depayload_get_type" glib:type-struct="RTPBaseDepayloadClass">
-      <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Provides a base class for RTP depayloaders</doc>
+      <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Provides a base class for RTP depayloaders
+
+In order to handle RTP header extensions correctly if the
+depayloader aggregates multiple RTP packet payloads into one output
+buffer this class provides the function
+gst_rtp_base_depayload_set_aggregate_hdrext_enabled(). If the
+aggregation is enabled the virtual functions
+@GstRTPBaseDepayload.process or
+@GstRTPBaseDepayload.process_rtp_packet must tell the base class
+what happens to the current RTP packet. By default the base class
+assumes that the packet payload is used with the next output
+buffer.
+
+If the RTP packet will not be used with an output buffer
+gst_rtp_base_depayload_dropped() must be called. A typical
+situation would be if we are waiting for a keyframe.
+
+If the RTP packet will be used but not with the current output
+buffer but with the next one gst_rtp_base_depayload_delayed() must
+be called. This may happen if the current RTP packet signals the
+start of a new output buffer and the currently processed output
+buffer will be pushed first. The undelay happens implicitly once
+the current buffer has been pushed or
+gst_rtp_base_depayload_flush() has been called.
+
+If gst_rtp_base_depayload_flush() is called all RTP packets that
+have not been dropped since the last output buffer are dropped,
+e.g. if an output buffer is discarded due to malformed data. This
+may or may not include the current RTP packet depending on the 2nd
+parameter @keep_current.
+
+Be aware that in case gst_rtp_base_depayload_push_list() is used
+each buffer will see the same list of RTP header extensions.</doc>
       <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
       <virtual-method name="handle_event">
         <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
@@ -2404,6 +2436,100 @@ audio codec</doc>
           </parameter>
         </parameters>
       </virtual-method>
+      <method name="delayed" c:identifier="gst_rtp_base_depayload_delayed" version="1.24">
+        <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Called from @GstRTPBaseDepayload.process or
+@GstRTPBaseDepayload.process_rtp_packet when the depayloader needs
+to keep the current input RTP header for use with the next output
+buffer.
+
+The delayed buffer will remain until the end of processing the
+current output buffer and then enqueued for processing with the
+next output buffer.
+
+A typical use-case is when the depayloader implementation will
+start a new output buffer for the current input RTP buffer but push
+the current output buffer first.
+
+Must be called with the stream lock held.</doc>
+        <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
+        <return-value transfer-ownership="none">
+          <type name="none" c:type="void"/>
+        </return-value>
+        <parameters>
+          <instance-parameter name="depayload" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
+            <type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
+          </instance-parameter>
+        </parameters>
+      </method>
+      <method name="dropped" c:identifier="gst_rtp_base_depayload_dropped" version="1.24">
+        <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Called from @GstRTPBaseDepayload.process or
+@GstRTPBaseDepayload.process_rtp_packet if the depayloader does not
+use the current buffer for the output buffer. This will either drop
+the delayed buffer or the last buffer from the header extension
+cache.
+
+A typical use-case is when the depayloader implementation is
+dropping an input RTP buffer while waiting for the first keyframe.
+
+Must be called with the stream lock held.</doc>
+        <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
+        <return-value transfer-ownership="none">
+          <type name="none" c:type="void"/>
+        </return-value>
+        <parameters>
+          <instance-parameter name="depayload" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
+            <type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
+          </instance-parameter>
+        </parameters>
+      </method>
+      <method name="flush" c:identifier="gst_rtp_base_depayload_flush" version="1.24">
+        <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">If @GstRTPBaseDepayload.process or
+@GstRTPBaseDepayload.process_rtp_packet drop an output buffer this
+function tells the base class to flush header extension cache as
+well.
+
+This will not drop an input RTP header marked as delayed from
+gst_rtp_base_depayload_delayed().
+
+If @keep_current is %TRUE the current input RTP header will be kept
+and enqueued after flushing the previous input RTP headers.
+
+A typical use-case for @keep_current is when the depayloader
+implementation invalidates the current output buffer and starts a
+new one with the current RTP input buffer.
+
+Must be called with the stream lock held.</doc>
+        <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
+        <return-value transfer-ownership="none">
+          <type name="none" c:type="void"/>
+        </return-value>
+        <parameters>
+          <instance-parameter name="depayload" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
+            <type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
+          </instance-parameter>
+          <parameter name="keep_current" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">if the current RTP buffer shall be kept</doc>
+            <type name="gboolean" c:type="gboolean"/>
+          </parameter>
+        </parameters>
+      </method>
+      <method name="is_aggregate_hdrext_enabled" c:identifier="gst_rtp_base_depayload_is_aggregate_hdrext_enabled" version="1.24">
+        <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Queries whether header extensions will be aggregated per depayloaded buffers.</doc>
+        <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
+        <return-value transfer-ownership="none">
+          <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">%TRUE if aggregate-header-extension is enabled.</doc>
+          <type name="gboolean" c:type="gboolean"/>
+        </return-value>
+        <parameters>
+          <instance-parameter name="depayload" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
+            <type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
+          </instance-parameter>
+        </parameters>
+      </method>
       <method name="is_source_info_enabled" c:identifier="gst_rtp_base_depayload_is_source_info_enabled" version="1.16">
         <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Queries whether #GstRTPSourceMeta will be added to depayloaded buffers.</doc>
         <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
@@ -2459,6 +2585,23 @@ the outgoing buffer when it didn't have a timestamp already.</doc>
           </parameter>
         </parameters>
       </method>
+      <method name="set_aggregate_hdrext_enabled" c:identifier="gst_rtp_base_depayload_set_aggregate_hdrext_enabled" version="1.24">
+        <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Enable or disable aggregating header extensions.</doc>
+        <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
+        <return-value transfer-ownership="none">
+          <type name="none" c:type="void"/>
+        </return-value>
+        <parameters>
+          <instance-parameter name="depayload" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">a #GstRTPBaseDepayload</doc>
+            <type name="RTPBaseDepayload" c:type="GstRTPBaseDepayload*"/>
+          </instance-parameter>
+          <parameter name="enable" transfer-ownership="none">
+            <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">whether to aggregate header extensions per output buffer</doc>
+            <type name="gboolean" c:type="gboolean"/>
+          </parameter>
+        </parameters>
+      </method>
       <method name="set_source_info_enabled" c:identifier="gst_rtp_base_depayload_set_source_info_enabled" version="1.16">
         <doc xml:space="preserve" filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c">Enable or disable adding #GstRTPSourceMeta to depayloaded buffers.</doc>
         <source-position filename="../subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h"/>
diff --git a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c
index 0fd2d827a5..c788898ef2 100644
--- a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c
+++ b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.c
@@ -24,6 +24,38 @@
  * @short_description: Base class for RTP depayloader
  *
  * Provides a base class for RTP depayloaders
+ *
+ * In order to handle RTP header extensions correctly if the
+ * depayloader aggregates multiple RTP packet payloads into one output
+ * buffer this class provides the function
+ * gst_rtp_base_depayload_set_aggregate_hdrext_enabled(). If the
+ * aggregation is enabled the virtual functions
+ * @GstRTPBaseDepayload.process or
+ * @GstRTPBaseDepayload.process_rtp_packet must tell the base class
+ * what happens to the current RTP packet. By default the base class
+ * assumes that the packet payload is used with the next output
+ * buffer.
+ *
+ * If the RTP packet will not be used with an output buffer
+ * gst_rtp_base_depayload_dropped() must be called. A typical
+ * situation would be if we are waiting for a keyframe.
+ *
+ * If the RTP packet will be used but not with the current output
+ * buffer but with the next one gst_rtp_base_depayload_delayed() must
+ * be called. This may happen if the current RTP packet signals the
+ * start of a new output buffer and the currently processed output
+ * buffer will be pushed first. The undelay happens implicitly once
+ * the current buffer has been pushed or
+ * gst_rtp_base_depayload_flush() has been called.
+ *
+ * If gst_rtp_base_depayload_flush() is called all RTP packets that
+ * have not been dropped since the last output buffer are dropped,
+ * e.g. if an output buffer is discarded due to malformed data. This
+ * may or may not include the current RTP packet depending on the 2nd
+ * parameter @keep_current.
+ *
+ * Be aware that in case gst_rtp_base_depayload_push_list() is used
+ * each buffer will see the same list of RTP header extensions.
  */
 #ifdef HAVE_CONFIG_H
 #include "config.h"
@@ -75,6 +107,13 @@ struct _GstRTPBaseDepayloadPrivate
 
   /* array of GstRTPHeaderExtension's * */
   GPtrArray *header_exts;
+
+  /* maintain buffer list for header extensions read() */
+  gboolean hdrext_aggregate;
+  GstBufferList *hdrext_buffers;
+  GstBuffer *hdrext_delayed;
+  GstBuffer *hdrext_outbuf;
+  gboolean hdrext_read_result;
 };
 
 /* Filter signals and args */
@@ -139,6 +178,11 @@ static void gst_rtp_base_depayload_add_extension (GstRTPBaseDepayload *
 static void gst_rtp_base_depayload_clear_extensions (GstRTPBaseDepayload *
     rtpbasepayload);
 
+static gboolean gst_rtp_base_depayload_operate_hdrext_buffer (GstBuffer **
+    buffer, guint idx, gpointer depayloader);
+static void gst_rtp_base_depayload_reset_hdrext_buffers (GstRTPBaseDepayload *
+    rtpbasepayload);
+
 GType
 gst_rtp_base_depayload_get_type (void)
 {
@@ -404,11 +448,13 @@ gst_rtp_base_depayload_init (GstRTPBaseDepayload * filter,
   priv->source_info = DEFAULT_SOURCE_INFO;
   priv->max_reorder = DEFAULT_MAX_REORDER;
   priv->auto_hdr_ext = DEFAULT_AUTO_HEADER_EXTENSION;
+  priv->hdrext_aggregate = FALSE;
 
   gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
 
   priv->header_exts =
       g_ptr_array_new_with_free_func ((GDestroyNotify) gst_object_unref);
+  priv->hdrext_buffers = gst_buffer_list_new ();
 }
 
 static void
@@ -417,7 +463,7 @@ gst_rtp_base_depayload_finalize (GObject * object)
   GstRTPBaseDepayload *rtpbasedepayload = GST_RTP_BASE_DEPAYLOAD (object);
 
   g_ptr_array_unref (rtpbasedepayload->priv->header_exts);
-  rtpbasedepayload->priv->header_exts = NULL;
+  gst_clear_buffer_list (&rtpbasedepayload->priv->hdrext_buffers);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -808,6 +854,22 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
 
   priv->input_buffer = in;
 
+  if (discont) {
+    gst_rtp_base_depayload_reset_hdrext_buffers (filter);
+    g_assert_null (priv->hdrext_delayed);
+  }
+
+  /* update RTP buffer cache for header extensions */
+  if (priv->hdrext_aggregate) {
+    GstBuffer *b = gst_buffer_new ();
+    /* make a copy of the buffer that only contains the RTP header
+       with the extensions to not waste too much memory */
+    guint s = gst_rtp_buffer_get_header_len (&rtp);
+    gst_buffer_copy_into (b, in,
+        GST_BUFFER_COPY_MEMORY | GST_BUFFER_COPY_DEEP, 0, s);
+    gst_buffer_list_add (priv->hdrext_buffers, b);
+  }
+
   if (process_rtp_packet_func != NULL) {
     out_buf = process_rtp_packet_func (filter, &rtp);
     gst_rtp_buffer_unmap (&rtp);
@@ -820,10 +882,22 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
 
   /* let's send it out to processing */
   if (out_buf) {
-    if (priv->process_flow_ret == GST_FLOW_OK)
+    if (priv->process_flow_ret == GST_FLOW_OK) {
       priv->process_flow_ret = gst_rtp_base_depayload_push (filter, out_buf);
-    else
+    } else {
       gst_buffer_unref (out_buf);
+      gst_rtp_base_depayload_reset_hdrext_buffers (filter);
+    }
+  }
+
+  /* if the current buffer is delayed the depayloader should either
+     have called gst_rtp_base_depayload_push() internally or returned
+     a buffer that's pushed, either way the buffer cache should be
+     empty here and we append the delayed buffer */
+  if (priv->hdrext_delayed) {
+    g_assert_true (gst_buffer_list_length (priv->hdrext_buffers) == 0);
+    gst_buffer_list_add (priv->hdrext_buffers, priv->hdrext_delayed);
+    priv->hdrext_delayed = NULL;
   }
 
   gst_buffer_unref (in);
@@ -1283,12 +1357,34 @@ out:
   return needs_src_caps_update;
 }
 
+static gboolean
+gst_rtp_base_depayload_operate_hdrext_buffer (GstBuffer ** buffer,
+    guint idx, gpointer depayloader)
+{
+  GstRTPBaseDepayload *depayload = depayloader;
+
+  depayload->priv->hdrext_read_result |=
+      read_rtp_header_extensions (depayload, *buffer,
+      depayload->priv->hdrext_outbuf);
+  return TRUE;
+}
+
+static void
+gst_rtp_base_depayload_reset_hdrext_buffers (GstRTPBaseDepayload * depayload)
+{
+  GstRTPBaseDepayloadPrivate *priv = depayload->priv;
+
+  gst_buffer_list_unref (priv->hdrext_buffers);
+  priv->hdrext_buffers = gst_buffer_list_new ();
+}
+
 static gboolean
 gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload,
     GstBuffer * buffer)
 {
   GstRTPBaseDepayloadPrivate *priv = depayload->priv;
   GstClockTime pts, dts, duration;
+  gboolean ret = FALSE;
 
   pts = GST_BUFFER_PTS (buffer);
   dts = GST_BUFFER_DTS (buffer);
@@ -1318,10 +1414,25 @@ gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload,
     if (priv->source_info)
       add_rtp_source_meta (buffer, priv->input_buffer);
 
-    return read_rtp_header_extensions (depayload, priv->input_buffer, buffer);
+    if (priv->hdrext_aggregate) {
+      priv->hdrext_read_result = FALSE;
+      priv->hdrext_outbuf = buffer;
+      /* if we have an empty list but a delayed RTP buffer let's use it */
+      if (!gst_buffer_list_length (priv->hdrext_buffers) &&
+          priv->hdrext_delayed) {
+        gst_buffer_list_add (priv->hdrext_buffers, priv->hdrext_delayed);
+        priv->hdrext_delayed = NULL;
+      }
+      gst_buffer_list_foreach (priv->hdrext_buffers,
+          gst_rtp_base_depayload_operate_hdrext_buffer, depayload);
+      ret = priv->hdrext_read_result;
+      priv->hdrext_outbuf = NULL;
+    } else {
+      ret = read_rtp_header_extensions (depayload, priv->input_buffer, buffer);
+    }
   }
 
-  return FALSE;
+  return ret;
 }
 
 static GstFlowReturn
@@ -1447,6 +1558,8 @@ gst_rtp_base_depayload_do_push (GstRTPBaseDepayload * filter, gboolean is_list,
     gst_clear_buffer (&buf);
   }
 
+  gst_rtp_base_depayload_reset_hdrext_buffers (filter);
+
   return res;
 }
 
@@ -1718,3 +1831,146 @@ gst_rtp_base_depayload_is_source_info_enabled (GstRTPBaseDepayload * depayload)
 {
   return depayload->priv->source_info;
 }
+
+/**
+ * gst_rtp_base_depayload_set_aggregate_hdrext_enabled:
+ * @depayload: a #GstRTPBaseDepayload
+ * @enable: whether to aggregate header extensions per output buffer
+ *
+ * Enable or disable aggregating header extensions.
+ *
+ * Since: 1.24
+ **/
+void
+gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GstRTPBaseDepayload *
+    depayload, gboolean enable)
+{
+  depayload->priv->hdrext_aggregate = enable;
+  if (!enable)
+    gst_rtp_base_depayload_reset_hdrext_buffers (depayload);
+}
+
+/**
+ * gst_rtp_base_depayload_is_aggregate_hdrext_enabled:
+ * @depayload: a #GstRTPBaseDepayload
+ *
+ * Queries whether header extensions will be aggregated per depayloaded buffers.
+ *
+ * Returns: %TRUE if aggregate-header-extension is enabled.
+ *
+ * Since: 1.24
+ **/
+gboolean
+gst_rtp_base_depayload_is_aggregate_hdrext_enabled (GstRTPBaseDepayload *
+    depayload)
+{
+  return depayload->priv->hdrext_aggregate;
+}
+
+/**
+ * gst_rtp_base_depayload_dropped:
+ * @depayload: a #GstRTPBaseDepayload
+ *
+ * Called from @GstRTPBaseDepayload.process or
+ * @GstRTPBaseDepayload.process_rtp_packet if the depayloader does not
+ * use the current buffer for the output buffer. This will either drop
+ * the delayed buffer or the last buffer from the header extension
+ * cache.
+ *
+ * A typical use-case is when the depayloader implementation is
+ * dropping an input RTP buffer while waiting for the first keyframe.
+ *
+ * Must be called with the stream lock held.
+ *
+ * Since: 1.24
+ **/
+void
+gst_rtp_base_depayload_dropped (GstRTPBaseDepayload * depayload)
+{
+  GstRTPBaseDepayloadPrivate *priv = depayload->priv;
+  guint l = gst_buffer_list_length (priv->hdrext_buffers);
+
+  if (priv->hdrext_delayed) {
+    gst_clear_buffer (&priv->hdrext_delayed);
+  } else if (l) {
+    gst_buffer_list_remove (priv->hdrext_buffers, l - 1, 1);
+  }
+}
+
+/**
+ * gst_rtp_base_depayload_delayed:
+ * @depayload: a #GstRTPBaseDepayload
+ *
+ * Called from @GstRTPBaseDepayload.process or
+ * @GstRTPBaseDepayload.process_rtp_packet when the depayloader needs
+ * to keep the current input RTP header for use with the next output
+ * buffer.
+ *
+ * The delayed buffer will remain until the end of processing the
+ * current output buffer and then enqueued for processing with the
+ * next output buffer.
+ *
+ * A typical use-case is when the depayloader implementation will
+ * start a new output buffer for the current input RTP buffer but push
+ * the current output buffer first.
+ *
+ * Must be called with the stream lock held.
+ *
+ * Since: 1.24
+ **/
+void
+gst_rtp_base_depayload_delayed (GstRTPBaseDepayload * depayload)
+{
+  GstRTPBaseDepayloadPrivate *priv = depayload->priv;
+  guint l = gst_buffer_list_length (priv->hdrext_buffers);
+
+  if (l) {
+    priv->hdrext_delayed = gst_buffer_list_get (priv->hdrext_buffers, l - 1);
+    gst_buffer_ref (priv->hdrext_delayed);
+    gst_buffer_list_remove (priv->hdrext_buffers, l - 1, 1);
+  }
+}
+
+/**
+ * gst_rtp_base_depayload_flush:
+ * @depayload: a #GstRTPBaseDepayload
+ * @keep_current: if the current RTP buffer shall be kept
+ *
+ * If @GstRTPBaseDepayload.process or
+ * @GstRTPBaseDepayload.process_rtp_packet drop an output buffer this
+ * function tells the base class to flush header extension cache as
+ * well.
+ *
+ * This will not drop an input RTP header marked as delayed from
+ * gst_rtp_base_depayload_delayed().
+ *
+ * If @keep_current is %TRUE the current input RTP header will be kept
+ * and enqueued after flushing the previous input RTP headers.
+ *
+ * A typical use-case for @keep_current is when the depayloader
+ * implementation invalidates the current output buffer and starts a
+ * new one with the current RTP input buffer.
+ *
+ * Must be called with the stream lock held.
+ *
+ * Since: 1.24
+ **/
+void
+gst_rtp_base_depayload_flush (GstRTPBaseDepayload * depayload,
+    gboolean keep_current)
+{
+  GstRTPBaseDepayloadPrivate *priv = depayload->priv;
+  guint l = gst_buffer_list_length (priv->hdrext_buffers);
+
+  /* if the current buffer shall not be kept or has already been
+     removed from the cache clear the cache */
+  if (!keep_current || priv->hdrext_delayed) {
+    gst_rtp_base_depayload_reset_hdrext_buffers (depayload);
+  } else if (l) {
+    /* clear all cached buffers (if any) except the delayed */
+    GstBuffer *b = gst_buffer_list_get (priv->hdrext_buffers, l - 1);
+    gst_buffer_ref (b);
+    gst_rtp_base_depayload_reset_hdrext_buffers (depayload);
+    gst_buffer_list_add (priv->hdrext_buffers, b);
+  }
+}
diff --git a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h
index 341a61551c..1b778fc43b 100644
--- a/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h
+++ b/subprojects/gst-plugins-base/gst-libs/gst/rtp/gstrtpbasedepayload.h
@@ -127,6 +127,22 @@ GST_RTP_API
 void            gst_rtp_base_depayload_set_source_info_enabled (GstRTPBaseDepayload * depayload,
                                                                 gboolean enable);
 
+GST_RTP_API
+void            gst_rtp_base_depayload_dropped (GstRTPBaseDepayload * depayload);
+
+GST_RTP_API
+void            gst_rtp_base_depayload_delayed (GstRTPBaseDepayload * depayload);
+
+GST_RTP_API
+void            gst_rtp_base_depayload_flush   (GstRTPBaseDepayload * depayload,
+                                                gboolean keep_current);
+
+GST_RTP_API
+gboolean        gst_rtp_base_depayload_is_aggregate_hdrext_enabled  (GstRTPBaseDepayload * depayload);
+
+GST_RTP_API
+void            gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GstRTPBaseDepayload * depayload,
+                                                                     gboolean enable);
 
 G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstRTPBaseDepayload, gst_object_unref)
 
diff --git a/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c b/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c
index 464b95fca2..29e9098663 100644
--- a/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c
+++ b/subprojects/gst-plugins-base/tests/check/libs/rtpbasedepayload.c
@@ -48,8 +48,17 @@ typedef enum
   GST_RTP_DUMMY_RETURN_TO_PUSH,
   GST_RTP_DUMMY_USE_PUSH_FUNC,
   GST_RTP_DUMMY_USE_PUSH_LIST_FUNC,
+  GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC,
 } GstRtpDummyPushMethod;
 
+typedef enum
+{
+  GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT,
+  GST_RTP_DUMMY_PUSH_AGGREGATE_DROP,
+  GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED,
+  GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH,
+} GstRtpDummyPushAggregateMethod;
+
 typedef struct _GstRtpDummyDepay GstRtpDummyDepay;
 typedef struct _GstRtpDummyDepayClass GstRtpDummyDepayClass;
 
@@ -60,6 +69,10 @@ struct _GstRtpDummyDepay
 
   GstRtpDummyPushMethod push_method;
   guint num_buffers_in_blist;
+
+  GstRtpDummyPushAggregateMethod aggregate_method;
+  guint num_buffers_to_aggregate;
+  guint num_buffers_aggregated;
 };
 
 struct _GstRtpDummyDepayClass
@@ -112,6 +125,8 @@ gst_rtp_dummy_depay_init (GstRtpDummyDepay * depay)
 {
   depay->rtptime = 0;
   depay->num_buffers_in_blist = 1;
+  depay->num_buffers_to_aggregate = 1;
+  depay->num_buffers_aggregated = 0;
 }
 
 static GstRtpDummyDepay *
@@ -180,6 +195,34 @@ gst_rtp_dummy_depay_process (GstRTPBaseDepayload * depayload, GstBuffer * buf)
       gst_rtp_base_depayload_push_list (depayload, blist);
       break;
     }
+    case GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC:
+      ++self->num_buffers_aggregated;
+      if (self->num_buffers_aggregated != self->num_buffers_to_aggregate) {
+        switch (self->aggregate_method) {
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP:
+            gst_rtp_base_depayload_dropped (depayload);
+            break;
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT:
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED:
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH:
+            break;
+        }
+        gst_clear_buffer (&outbuf);
+      } else {
+        switch (self->aggregate_method) {
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED:
+            gst_rtp_base_depayload_delayed (depayload);
+            break;
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH:
+            gst_rtp_base_depayload_flush (depayload, TRUE);
+            break;
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP:
+          case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT:
+            break;
+        }
+        self->num_buffers_aggregated = 0;
+      }
+      break;
     case GST_RTP_DUMMY_RETURN_TO_PUSH:
       break;
   }
@@ -1877,6 +1920,170 @@ GST_START_TEST (rtp_base_depayload_hdr_ext_caps_change)
 
 GST_END_TEST;
 
+static GstFlowReturn
+hdr_ext_aggregate_chain_func (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer)
+{
+  GstFlowReturn res;
+  GstCaps *caps;
+  guint val;
+  GstPad *srcpad;
+  GstElement *depay;
+  static gboolean first = TRUE;
+  static guint expected_caps_val = 0;
+
+  res = gst_check_chain_func (pad, parent, buffer);
+  if (res != GST_FLOW_OK) {
+    return res;
+  }
+
+  caps = gst_pad_get_current_caps (pad);
+
+  fail_unless (gst_structure_get_uint (gst_caps_get_structure (caps, 0),
+          "dummy-hdrext-val", &val));
+
+  srcpad = gst_pad_get_peer (pad);
+  depay = gst_pad_get_parent_element (srcpad);
+
+  switch (GST_RTP_DUMMY_DEPAY (depay)->aggregate_method) {
+    case GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT:
+      /* Every fifth buffer increments "dummy-hdrext-val", but we
+         aggregate 5 buffers per output buffer so we increment for every
+         output buffer. */
+      expected_caps_val++;
+      break;
+    case GST_RTP_DUMMY_PUSH_AGGREGATE_DROP:
+      /* We aggregate 5 buffers per output buffer but drop 4 of them
+         from the buffer cache. */
+      if (g_list_length (buffers) % 5 == 1) {
+        expected_caps_val++;
+      }
+      break;
+    case GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED:
+      /* We aggregate 6 buffers per output buffer but delay the 6th one
+         which will then account to the 2nd output buffer. Thus the 1st
+         output buffer will process 5 header extensions (val increments
+         by one) whereas the 2nd buffer will process 6 (val increments
+         by two)! */
+      if (first) {
+        first = FALSE;
+        expected_caps_val++;
+      } else {
+        expected_caps_val += 2;
+      }
+      break;
+    case GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH:
+      /* We aggregate 5 buffers per output buffer but flush 4 of them
+         from the hdr ext buffer cache. */
+      if (g_list_length (buffers) % 5 == 1) {
+        expected_caps_val++;
+      }
+      break;
+  }
+
+  gst_object_unref (depay);
+  gst_object_unref (srcpad);
+
+  fail_unless_equals_int (expected_caps_val, val);
+
+  gst_caps_unref (caps);
+
+  return res;
+}
+
+static void
+hdr_ext_aggregate_test (gint n_buffers, gint n_aggregate,
+    GstRtpDummyPushAggregateMethod method)
+{
+  GstRTPHeaderExtension *ext;
+  State *state;
+  guint i;
+
+  state = create_depayloader ("application/x-rtp", NULL);
+  gst_rtp_base_depayload_set_aggregate_hdrext_enabled (GST_RTP_BASE_DEPAYLOAD
+      (state->element), TRUE);
+  gst_pad_set_chain_function (state->sinkpad, hdr_ext_aggregate_chain_func);
+  ext = rtp_dummy_hdr_ext_new ();
+  gst_rtp_header_extension_set_id (ext, 1);
+
+  GST_RTP_DUMMY_DEPAY (state->element)->push_method =
+      GST_RTP_DUMMY_USE_PUSH_AGGREGATE_FUNC;
+  GST_RTP_DUMMY_DEPAY (state->element)->num_buffers_to_aggregate = n_aggregate;
+  GST_RTP_DUMMY_DEPAY (state->element)->aggregate_method = method;
+
+  g_signal_emit_by_name (state->element, "add-extension", ext);
+  set_state (state, GST_STATE_PLAYING);
+
+  for (i = 0; i < n_buffers; ++i) {
+    push_rtp_buffer (state, "pts", 0 * GST_SECOND,
+        "rtptime", G_GUINT64_CONSTANT (0x1234), "seq", 0x4242 + i, "hdrext-1",
+        ext, NULL);
+  }
+
+  set_state (state, GST_STATE_NULL);
+  validate_buffers_received (n_buffers / n_aggregate);
+  gst_object_unref (ext);
+  destroy_depayloader (state);
+}
+
+GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate)
+{
+  const gint num_buffers = 30;
+  const gint num_buffers_aggregate = 5; /* must match the modulo from
+                                           hdrext */
+
+  fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
+
+  hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
+      GST_RTP_DUMMY_PUSH_AGGREGATE_DEFAULT);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_drop)
+{
+  const gint num_buffers = 30;
+  const gint num_buffers_aggregate = 5; /* must match the modulo from
+                                           hdrext */
+
+  fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
+
+  hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
+      GST_RTP_DUMMY_PUSH_AGGREGATE_DROP);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_delayed)
+{
+  const gint num_buffers = 12;  /* must be two times
+                                   num_buffers_aggregate */
+  const gint num_buffers_aggregate = 6; /* must match the modulo from
+                                           hdrext + 1 */
+
+  fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
+  fail_unless_equals_int (num_buffers / num_buffers_aggregate, 2);
+
+  hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
+      GST_RTP_DUMMY_PUSH_AGGREGATE_DELAYED);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (rtp_base_depayload_hdr_ext_aggregate_flush)
+{
+  const gint num_buffers = 30;
+  const gint num_buffers_aggregate = 5; /* must match the modulo from
+                                           hdrext */
+
+  fail_unless_equals_int (num_buffers % num_buffers_aggregate, 0);
+
+  hdr_ext_aggregate_test (num_buffers, num_buffers_aggregate,
+      GST_RTP_DUMMY_PUSH_AGGREGATE_FLUSH);
+}
+
+GST_END_TEST;
+
 static Suite *
 rtp_basepayloading_suite (void)
 {
@@ -1921,7 +2128,10 @@ rtp_basepayloading_suite (void)
   tcase_add_test (tc_chain, rtp_base_depayload_multiple_exts);
   tcase_add_test (tc_chain, rtp_base_depayload_caps_request_ignored);
   tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_caps_change);
-
+  tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate);
+  tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_drop);
+  tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_delayed);
+  tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_aggregate_flush);
   return s;
 }
 
diff --git a/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c b/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c
index ca961328d2..9910d9d61d 100644
--- a/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c
+++ b/subprojects/gst-plugins-base/tests/check/libs/rtpdummyhdrextimpl.c
@@ -199,6 +199,7 @@ gst_rtp_dummy_hdr_ext_read (GstRTPHeaderExtension * ext,
 
   if (dummy->read_count % 5 == 1) {
     /* Every fifth buffer triggers caps change. */
+    ++dummy->caps_field_value;
     gst_rtp_header_extension_set_wants_update_non_rtp_src_caps (ext, TRUE);
   }
 
@@ -236,7 +237,7 @@ gst_rtp_dummy_hdr_ext_update_non_rtp_src_caps (GstRTPHeaderExtension * ext,
   GstRTPDummyHdrExt *dummy = GST_RTP_DUMMY_HDR_EXT (ext);
 
   gst_caps_set_simple (caps, "dummy-hdrext-val", G_TYPE_UINT,
-      ++dummy->caps_field_value, NULL);
+      dummy->caps_field_value, NULL);
 
   return TRUE;
 }