From aa58a70d6676f9bc394780a90a39ff47d538fa68 Mon Sep 17 00:00:00 2001
From: Florin Apostol <florin.apostol@oregan.net>
Date: Tue, 16 Feb 2016 14:44:27 +0000
Subject: [PATCH] adaptivedemux: use realtime_clock for waiting for a condition

There are several places in adaptivedemux where it waits for
time to pass, for example to wait until it should next download
a fragment. The problem with this approach is that it means that
unit tests are forced to execute in realtime.

This commit replaces the use of g_cond_wait_until() with single
shot GstClockID that signals the condition variable. Under normal
usage, this behaves exactly as before. A unit test can replace the
system clock with a GstTestClock, allowing the test to control the
timing in adaptivedemux.

https://bugzilla.gnome.org/show_bug.cgi?id=762147
---
 gst-libs/gst/adaptivedemux/gstadaptivedemux.c | 103 ++++++++++++++----
 1 file changed, 80 insertions(+), 23 deletions(-)

diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
index df5ace396b..9c2bb444a2 100644
--- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
+++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c
@@ -201,6 +201,14 @@ struct _GstAdaptiveDemuxPrivate
   GMutex segment_lock;
 };
 
+typedef struct _GstAdaptiveDemuxTimer
+{
+  GCond *cond;
+  GMutex *mutex;
+  GstClockID clock_id;
+  gboolean fired;
+} GstAdaptiveDemuxTimer;
+
 static GstBinClass *parent_class = NULL;
 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
@@ -271,6 +279,11 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
 static GstFlowReturn
 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
     GstAdaptiveDemuxStream * stream, GstClockTime duration);
+static gboolean
+gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
+    GstClockTime end_time);
+static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
+    GstClockTime time, GstClockID id, gpointer user_data);
 
 
 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
@@ -2701,8 +2714,7 @@ static void
 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
 {
   GstAdaptiveDemux *demux = stream->demux;
-  guint64 next_download =
-      GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
+  GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
   GstFlowReturn ret;
   gboolean live;
 
@@ -2844,9 +2856,8 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
       gint64 wait_time =
           gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
       if (wait_time > 0) {
-        gint64 end_time =
-            GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
-            + wait_time / GST_USECOND;
+        GstClockTime end_time =
+            gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
 
         GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
             GST_TIME_ARGS (wait_time));
@@ -2860,8 +2871,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
           stream->last_ret = GST_FLOW_FLUSHING;
           goto cancelled;
         }
-        g_cond_wait_until (&stream->fragment_download_cond,
-            &stream->fragment_download_lock, end_time);
+        gst_adaptive_demux_wait_until (demux->realtime_clock,
+            &stream->fragment_download_cond, &stream->fragment_download_lock,
+            end_time);
         g_mutex_unlock (&stream->fragment_download_lock);
 
         GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
@@ -2880,8 +2892,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
 
     stream->last_ret = GST_FLOW_OK;
 
-    next_download =
-        GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
+    next_download = gst_adaptive_demux_get_monotonic_time (demux);
     ret = gst_adaptive_demux_stream_download_fragment (stream);
 
     if (ret == GST_FLOW_FLUSHING) {
@@ -2967,9 +2978,7 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
         }
 
         /* Wait half the fragment duration before retrying */
-        next_download +=
-            gst_util_uint64_scale
-            (stream->fragment.duration, G_USEC_PER_SEC, 2 * GST_SECOND);
+        next_download += stream->fragment.duration / 2;
 
         GST_MANIFEST_UNLOCK (demux);
 
@@ -2980,8 +2989,9 @@ gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
           stream->last_ret = GST_FLOW_FLUSHING;
           goto cancelled;
         }
-        g_cond_wait_until (&stream->fragment_download_cond,
-            &stream->fragment_download_lock, next_download);
+        gst_adaptive_demux_wait_until (demux->realtime_clock,
+            &stream->fragment_download_cond, &stream->fragment_download_lock,
+            next_download);
         g_mutex_unlock (&stream->fragment_download_lock);
 
         GST_DEBUG_OBJECT (demux, "Retrying now");
@@ -3062,7 +3072,7 @@ download_error:
 static void
 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
 {
-  gint64 next_update;
+  GstClockTime next_update;
   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
 
   /* Loop for updating of the playlist. This periodically checks if
@@ -3075,8 +3085,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
   GST_MANIFEST_LOCK (demux);
 
   next_update =
-      GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
-      klass->get_manifest_update_interval (demux);
+      gst_adaptive_demux_get_monotonic_time (demux) +
+      klass->get_manifest_update_interval (demux) * GST_USECOND;
 
   /* Updating playlist only needed for live playlists */
   while (gst_adaptive_demux_is_live (demux)) {
@@ -3092,7 +3102,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
       g_mutex_unlock (&demux->priv->updates_timed_lock);
       goto quit;
     }
-    g_cond_wait_until (&demux->priv->updates_timed_cond,
+    gst_adaptive_demux_wait_until (demux->realtime_clock,
+        &demux->priv->updates_timed_cond,
         &demux->priv->updates_timed_lock, next_update);
     g_mutex_unlock (&demux->priv->updates_timed_lock);
 
@@ -3116,9 +3127,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
       demux->priv->update_failed_count++;
       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
         GST_WARNING_OBJECT (demux, "Could not update the playlist");
-        next_update =
-            GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux))
-            + klass->get_manifest_update_interval (demux);
+        next_update = gst_adaptive_demux_get_monotonic_time (demux)
+            + klass->get_manifest_update_interval (demux) * GST_USECOND;
       } else {
         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
             (_("Internal data stream error.")), ("Could not update playlist"));
@@ -3130,8 +3140,8 @@ gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
     } else {
       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
       next_update =
-          GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) +
-          klass->get_manifest_update_interval (demux);
+          gst_adaptive_demux_get_monotonic_time (demux) +
+          klass->get_manifest_update_interval (demux) * GST_USECOND;
 
       /* Wake up download tasks */
       g_mutex_lock (&demux->priv->manifest_update_lock);
@@ -3490,3 +3500,50 @@ gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
   gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
   return g_date_time_new_from_timeval_utc (&gtv);
 }
+
+static gboolean
+gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
+    GstClockTime end_time)
+{
+  GstAdaptiveDemuxTimer timer;
+  GstClockReturn res;
+
+  if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
+    /* for an invalid time, gst_clock_id_wait_async will try to call
+     * gst_adaptive_demux_clock_callback from the current thread.
+     * It still holds the mutex while doing that, so it will deadlock.
+     * g_cond_wait_until would return immediately with false, so we'll do the same.
+     */
+    return FALSE;
+  }
+  timer.fired = FALSE;
+  timer.cond = cond;
+  timer.mutex = mutex;
+  timer.clock_id = gst_clock_new_single_shot_id (clock, end_time);
+  res =
+      gst_clock_id_wait_async (timer.clock_id,
+      gst_adaptive_demux_clock_callback, &timer, NULL);
+  /* clock does not support asynchronously wait. Assert and return */
+  if (res == GST_CLOCK_UNSUPPORTED) {
+    gst_clock_id_unref (timer.clock_id);
+    g_return_val_if_reached (TRUE);
+  }
+  /* the gst_adaptive_demux_clock_callback will signal the
+     cond when the clock's single shot timer fires */
+  g_cond_wait (cond, mutex);
+  gst_clock_id_unref (timer.clock_id);
+  return !timer.fired;
+}
+
+static gboolean
+gst_adaptive_demux_clock_callback (GstClock * clock,
+    GstClockTime time, GstClockID id, gpointer user_data)
+{
+  GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
+  g_return_val_if_fail (timer != NULL, FALSE);
+  g_mutex_lock (timer->mutex);
+  timer->fired = TRUE;
+  g_cond_signal (timer->cond);
+  g_mutex_unlock (timer->mutex);
+  return TRUE;
+}