From bb6bf7d23bb7b62f20880f5ee7e7df99bcacbd00 Mon Sep 17 00:00:00 2001
From: Thibault Saunier <tsaunier@igalia.com>
Date: Fri, 28 Apr 2023 23:58:31 +0200
Subject: [PATCH] ges: pipeline: Implement context sharing logic

That is the logic from `playbin2`

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer/-/issues/4005

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/7883>
---
 .../gst-editing-services/ges/ges-pipeline.c   | 226 +++++++++++++++++-
 1 file changed, 220 insertions(+), 6 deletions(-)

diff --git a/subprojects/gst-editing-services/ges/ges-pipeline.c b/subprojects/gst-editing-services/ges/ges-pipeline.c
index 690408e7bc..e6c0cdd7d2 100644
--- a/subprojects/gst-editing-services/ges/ges-pipeline.c
+++ b/subprojects/gst-editing-services/ges/ges-pipeline.c
@@ -75,6 +75,8 @@ struct _GESPipelinePrivate
 {
   GESTimeline *timeline;
   GstElement *playsink;
+  GstElement *video_sink;
+  GstElement *audio_sink;
   GstElement *encodebin;
   /* Note : urisink is only created when a URI has been provided */
   GstElement *urisink;
@@ -88,6 +90,9 @@ struct _GESPipelinePrivate
   GstEncodingProfile *profile;
 
   GThread *valid_thread;
+
+  GList *contexts;
+
 };
 
 enum
@@ -210,12 +215,10 @@ ges_pipeline_set_property (GObject * object, guint property_id,
 
   switch (property_id) {
     case PROP_AUDIO_SINK:
-      g_object_set_property (G_OBJECT (self->priv->playsink), "audio-sink",
-          value);
+      ges_pipeline_preview_set_audio_sink (self, g_value_get_object (value));
       break;
     case PROP_VIDEO_SINK:
-      g_object_set_property (G_OBJECT (self->priv->playsink), "video-sink",
-          value);
+      ges_pipeline_preview_set_video_sink (self, g_value_get_object (value));
       break;
     case PROP_TIMELINE:
       ges_pipeline_set_timeline (GES_PIPELINE (object),
@@ -265,6 +268,88 @@ ges_pipeline_constructed (GObject * object)
   ((GObjectClass *) ges_pipeline_parent_class)->constructed (object);
 }
 
+static void
+ges_pipeline_update_context (GESPipeline * self, GstContext * context)
+{
+  GList *l;
+  const gchar *context_type;
+
+  GST_OBJECT_LOCK (self);
+  context_type = gst_context_get_context_type (context);
+  for (l = self->priv->contexts; l; l = l->next) {
+    GstContext *tmp = l->data;
+    const gchar *tmp_type = gst_context_get_context_type (tmp);
+
+    /* Always store newest context but never replace
+     * a persistent one by a non-persistent one */
+    if (strcmp (context_type, tmp_type) == 0 &&
+        (gst_context_is_persistent (context) ||
+            !gst_context_is_persistent (tmp))) {
+      gst_context_replace ((GstContext **) & l->data, context);
+      break;
+    }
+  }
+  /* Not found? Add */
+  if (l == NULL) {
+    self->priv->contexts =
+        g_list_prepend (self->priv->contexts, gst_context_ref (context));
+  }
+  GST_OBJECT_UNLOCK (self);
+}
+
+
+static void
+ges_pipeline_handle_message (GstBin * bin, GstMessage * msg)
+{
+  GESPipeline *self = GES_PIPELINE (bin);
+
+  if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_NEED_CONTEXT) {
+    const gchar *context_type;
+    GList *iter;
+
+    gst_message_parse_context_type (msg, &context_type);
+    GST_OBJECT_LOCK (self);
+    for (iter = self->priv->contexts; iter; iter = g_list_next (iter)) {
+      GstContext *tmp = iter->data;
+      const gchar *tmp_type = gst_context_get_context_type (tmp);
+
+      if (strcmp (context_type, tmp_type) == 0) {
+        GST_DEBUG_OBJECT (msg->src, "Setting context %s %" GST_PTR_FORMAT,
+            tmp_type, tmp);
+        gst_element_set_context (GST_ELEMENT (GST_MESSAGE_SRC (msg)), tmp);
+        break;
+      }
+    }
+    GST_OBJECT_UNLOCK (self);
+
+    /* don't need to post upward this if it's answered by us */
+    if (iter) {
+      gst_message_unref (msg);
+      msg = NULL;
+    }
+  } else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_HAVE_CONTEXT) {
+    GstContext *context;
+
+    gst_message_parse_have_context (msg, &context);
+    ges_pipeline_update_context (self, context);
+    gst_context_unref (context);
+  }
+
+  if (msg) {
+    GST_BIN_CLASS (ges_pipeline_parent_class)->handle_message (bin, msg);
+  }
+}
+
+static void
+ges_pipeline_finalize (GObject * object)
+{
+  GESPipeline *self = GES_PIPELINE (object);
+
+  g_list_free_full (self->priv->contexts, (GDestroyNotify) gst_context_unref);
+
+  G_OBJECT_CLASS (ges_pipeline_parent_class)->finalize (object);
+}
+
 static void
 ges_pipeline_dispose (GObject * object)
 {
@@ -277,6 +362,8 @@ ges_pipeline_dispose (GObject * object)
       gst_object_unref (self->priv->playsink);
     self->priv->playsink = NULL;
   }
+  gst_clear_object ((GstObject **) & self->priv->video_sink);
+  gst_clear_object ((GstObject **) & self->priv->audio_sink);
 
   if (self->priv->encodebin) {
     if (self->priv->mode & (GES_PIPELINE_MODE_RENDER |
@@ -308,6 +395,7 @@ ges_pipeline_class_init (GESPipelineClass * klass)
 {
   GObjectClass *object_class = G_OBJECT_CLASS (klass);
   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
+  GstBinClass *bin_klass = (GstBinClass *) klass;
 
   GST_DEBUG_CATEGORY_INIT (ges_pipeline_debug, "gespipeline",
       GST_DEBUG_FG_YELLOW, "ges pipeline");
@@ -315,6 +403,7 @@ ges_pipeline_class_init (GESPipelineClass * klass)
 
   object_class->constructed = ges_pipeline_constructed;
   object_class->dispose = ges_pipeline_dispose;
+  object_class->finalize = ges_pipeline_finalize;
   object_class->get_property = ges_pipeline_get_property;
   object_class->set_property = ges_pipeline_set_property;
 
@@ -395,6 +484,7 @@ ges_pipeline_class_init (GESPipelineClass * klass)
   g_object_class_install_properties (object_class, PROP_LAST, properties);
 
   element_class->change_state = GST_DEBUG_FUNCPTR (ges_pipeline_change_state);
+  bin_klass->handle_message = GST_DEBUG_FUNCPTR (ges_pipeline_handle_message);
 
   /* TODO : Add state_change handlers
    * Don't change state if we don't have a timeline */
@@ -605,6 +695,22 @@ ges_pipeline_change_state (GstElement * element, GstStateChange transition)
   self = GES_PIPELINE (element);
 
   switch (transition) {
+    case GST_STATE_CHANGE_NULL_TO_READY:
+    {
+      if (self->priv->video_sink == NULL) {
+        GstElement *tmpsink =
+            gst_element_factory_make ("autovideosink", "internal-sink");
+        ges_pipeline_preview_set_video_sink (self, g_object_ref_sink (tmpsink));
+        gst_object_unref (tmpsink);
+      }
+      if (self->priv->audio_sink == NULL) {
+        GstElement *tmpsink =
+            gst_element_factory_make ("autoaudiosink", "internal-sink");
+        ges_pipeline_preview_set_audio_sink (self, g_object_ref_sink (tmpsink));
+        gst_object_unref (tmpsink);
+      }
+      break;
+    }
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       if (G_UNLIKELY (self->priv->timeline == NULL)) {
         GST_ERROR_OBJECT (element,
@@ -1474,6 +1580,57 @@ ges_pipeline_preview_get_video_sink (GESPipeline * self)
   return sink;
 };
 
+static GstBusSyncReply
+activate_sink_bus_handler (GstBus * bus, GstMessage * msg, GESPipeline * self)
+{
+  if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR) {
+    /* Only proxy errors from a fixed sink. If that fails we can just error out
+     * early as stuff will fail later anyway */
+    if (self->priv->video_sink
+        && gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
+            GST_OBJECT_CAST (self->priv->video_sink)))
+      gst_element_post_message (GST_ELEMENT_CAST (self), msg);
+    else
+      gst_message_unref (msg);
+  } else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_NEED_CONTEXT) {
+    const gchar *context_type;
+    GList *l;
+
+    gst_message_parse_context_type (msg, &context_type);
+    GST_OBJECT_LOCK (self);
+    for (l = self->priv->contexts; l; l = l->next) {
+      GstContext *tmp = l->data;
+      const gchar *tmp_type = gst_context_get_context_type (tmp);
+
+      if (strcmp (context_type, tmp_type) == 0) {
+        gst_element_set_context (GST_ELEMENT (GST_MESSAGE_SRC (msg)), l->data);
+        break;
+      }
+    }
+    GST_OBJECT_UNLOCK (self);
+
+    /* Forward if we couldn't answer the message */
+    if (l == NULL) {
+      gst_element_post_message (GST_ELEMENT_CAST (self), msg);
+    } else {
+      gst_message_unref (msg);
+    }
+  } else if (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_HAVE_CONTEXT) {
+    GstContext *context;
+
+    gst_message_parse_have_context (msg, &context);
+    ges_pipeline_update_context (self, context);
+    gst_context_unref (context);
+
+    gst_element_post_message (GST_ELEMENT_CAST (self), msg);
+  } else {
+    gst_element_post_message (GST_ELEMENT_CAST (self), msg);
+  }
+
+  /* Doesn't really matter, nothing is using this bus */
+  return GST_BUS_DROP;
+}
+
 /**
  * ges_pipeline_preview_set_video_sink:
  * @self: A #GESPipeline in #GST_STATE_NULL
@@ -1484,10 +1641,39 @@ ges_pipeline_preview_get_video_sink (GESPipeline * self)
 void
 ges_pipeline_preview_set_video_sink (GESPipeline * self, GstElement * sink)
 {
+  GstStateChangeReturn sret;
+
   g_return_if_fail (GES_IS_PIPELINE (self));
   CHECK_THREAD (self);
 
-  g_object_set (self->priv->playsink, "video-sink", sink, NULL);
+  gst_object_replace ((GstObject **) & self->priv->video_sink,
+      (GstObject *) sink);
+
+  g_object_set (self->priv->playsink, "video-sink", self->priv->video_sink,
+      NULL);
+
+  if (!sink)
+    return;
+
+  GstBus *bus = gst_bus_new ();
+  if (!GST_OBJECT_PARENT (sink)) {
+    gst_bus_set_sync_handler (bus,
+        (GstBusSyncHandler) activate_sink_bus_handler, self, NULL);
+    gst_element_set_bus (sink, bus);
+  }
+
+  sret = gst_element_set_state (sink, GST_STATE_READY);
+  if (sret == GST_STATE_CHANGE_FAILURE) {
+    GST_ERROR_OBJECT (self, "Could not activate sink");
+
+    goto done;
+  }
+
+done:
+  if (bus) {
+    gst_element_set_bus (sink, NULL);
+    gst_object_unref (bus);
+  }
 };
 
 /**
@@ -1521,8 +1707,36 @@ ges_pipeline_preview_get_audio_sink (GESPipeline * self)
 void
 ges_pipeline_preview_set_audio_sink (GESPipeline * self, GstElement * sink)
 {
+  GstStateChangeReturn sret;
+
   g_return_if_fail (GES_IS_PIPELINE (self));
   CHECK_THREAD (self);
 
-  g_object_set (self->priv->playsink, "audio-sink", sink, NULL);
+  gst_object_replace ((GstObject **) & self->priv->audio_sink,
+      (GstObject *) sink);
+  g_object_set (self->priv->playsink, "audio-sink", self->priv->audio_sink,
+      NULL);
+
+  if (!sink)
+    return;
+
+  GstBus *bus = gst_bus_new ();
+  if (!GST_OBJECT_PARENT (sink)) {
+    gst_bus_set_sync_handler (bus,
+        (GstBusSyncHandler) activate_sink_bus_handler, self, NULL);
+    gst_element_set_bus (sink, bus);
+  }
+
+  sret = gst_element_set_state (sink, GST_STATE_READY);
+  if (sret == GST_STATE_CHANGE_FAILURE) {
+    GST_ERROR_OBJECT (self, "Could not activate sink");
+
+    goto done;
+  }
+
+done:
+  if (bus) {
+    gst_element_set_bus (sink, NULL);
+    gst_object_unref (bus);
+  }
 };