diff --git a/ext/dash/gstdashdemux.c b/ext/dash/gstdashdemux.c index 42e11b54cf..1f562b4959 100644 --- a/ext/dash/gstdashdemux.c +++ b/ext/dash/gstdashdemux.c @@ -253,7 +253,7 @@ gst_dash_demux_stream_get_fragment_waiting_time (GstAdaptiveDemuxStream * static void gst_dash_demux_advance_period (GstAdaptiveDemux * demux); static gboolean gst_dash_demux_has_next_period (GstAdaptiveDemux * demux); static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); + GstAdaptiveDemuxStream * stream, GstBuffer * buffer); static GstFlowReturn gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); @@ -631,6 +631,8 @@ gst_dash_demux_setup_all_streams (GstDashDemux * demux) } gst_isoff_sidx_parser_init (&stream->sidx_parser); + if (gst_mpd_client_has_isoff_ondemand_profile (demux->client)) + stream->sidx_adapter = gst_adapter_new (); } return TRUE; @@ -1301,6 +1303,7 @@ gst_dash_demux_stream_select_bitrate (GstAdaptiveDemuxStream * stream, /* if we switched, we need a new index */ gst_isoff_sidx_parser_clear (&dashstream->sidx_parser); gst_isoff_sidx_parser_init (&dashstream->sidx_parser); + gst_adapter_clear (dashstream->sidx_adapter); } } @@ -1388,6 +1391,8 @@ gst_dash_demux_seek (GstAdaptiveDemux * demux, GstEvent * seek) if (flags & GST_SEEK_FLAG_FLUSH) { gst_isoff_sidx_parser_clear (&dashstream->sidx_parser); gst_isoff_sidx_parser_init (&dashstream->sidx_parser); + if (dashstream->sidx_adapter) + gst_adapter_clear (dashstream->sidx_adapter); } gst_dash_demux_stream_seek (iter->data, rate >= 0, 0, target_pos, NULL); } @@ -1631,24 +1636,25 @@ gst_dash_demux_stream_fragment_finished (GstAdaptiveDemux * demux, static GstFlowReturn gst_dash_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream; GstDashDemux *dashdemux = GST_DASH_DEMUX_CAST (demux); GstFlowReturn ret = GST_FLOW_OK; - GstBuffer *buffer; - gsize available; if (!gst_mpd_client_has_isoff_ondemand_profile (dashdemux->client)) - return GST_ADAPTIVE_DEMUX_CLASS (parent_class)->data_received (demux, - stream); + return gst_adaptive_demux_stream_push_buffer (stream, buffer); + + gst_adapter_push (dash_stream->sidx_adapter, buffer); + buffer = NULL; if (stream->downloading_index) { GstIsoffParserResult res; guint consumed; + gsize available; - available = gst_adapter_available (stream->adapter); - buffer = gst_adapter_take_buffer (stream->adapter, available); + available = gst_adapter_available (dash_stream->sidx_adapter); + buffer = gst_adapter_take_buffer (dash_stream->sidx_adapter, available); if (dash_stream->sidx_parser.status != GST_ISOFF_SIDX_PARSER_FINISHED) { res = @@ -1677,23 +1683,24 @@ gst_dash_demux_data_received (GstAdaptiveDemux * demux, /* we still need to keep some data around for the next parsing round * so just push what was already processed by the parser */ pending = _gst_buffer_split (buffer, consumed, -1); - gst_adapter_push (stream->adapter, pending); + gst_adapter_push (dash_stream->sidx_adapter, pending); } } } ret = gst_adaptive_demux_stream_push_buffer (stream, buffer); } else if (dash_stream->sidx_parser.status == GST_ISOFF_SIDX_PARSER_FINISHED) { + gsize available; while (ret == GST_FLOW_OK - && ((available = gst_adapter_available (stream->adapter)) > 0)) { + && ((available = gst_adapter_available (dash_stream->sidx_adapter)) > 0)) { gboolean advance = FALSE; if (available < dash_stream->sidx_current_remaining) { - buffer = gst_adapter_take_buffer (stream->adapter, available); + buffer = gst_adapter_take_buffer (dash_stream->sidx_adapter, available); dash_stream->sidx_current_remaining -= available; } else { buffer = - gst_adapter_take_buffer (stream->adapter, + gst_adapter_take_buffer (dash_stream->sidx_adapter, dash_stream->sidx_current_remaining); dash_stream->sidx_current_remaining = 0; advance = TRUE; @@ -1714,8 +1721,8 @@ gst_dash_demux_data_received (GstAdaptiveDemux * demux, /* this should be the main header, just push it all */ ret = gst_adaptive_demux_stream_push_buffer (stream, - gst_adapter_take_buffer (stream->adapter, - gst_adapter_available (stream->adapter))); + gst_adapter_take_buffer (dash_stream->sidx_adapter, + gst_adapter_available (dash_stream->sidx_adapter))); } return ret; @@ -1727,6 +1734,8 @@ gst_dash_demux_stream_free (GstAdaptiveDemuxStream * stream) GstDashDemuxStream *dash_stream = (GstDashDemuxStream *) stream; gst_isoff_sidx_parser_clear (&dash_stream->sidx_parser); + if (dash_stream->sidx_adapter) + g_object_unref (dash_stream->sidx_adapter); } static GstDashDemuxClockDrift * diff --git a/ext/dash/gstdashdemux.h b/ext/dash/gstdashdemux.h index 5ed96205b3..326d6f9619 100644 --- a/ext/dash/gstdashdemux.h +++ b/ext/dash/gstdashdemux.h @@ -67,6 +67,7 @@ struct _GstDashDemuxStream GstMediaFragmentInfo current_fragment; /* index parsing */ + GstAdapter *sidx_adapter; GstSidxParser sidx_parser; gsize sidx_current_remaining; gint sidx_index; diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index 43ecdb79bb..fed487ae3e 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -92,7 +92,7 @@ gst_hls_demux_start_fragment (GstAdaptiveDemux * demux, static GstFlowReturn gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); + GstAdaptiveDemuxStream * stream, GstBuffer * buffer); static gboolean gst_hls_demux_stream_has_next_fragment (GstAdaptiveDemuxStream * stream); static GstFlowReturn gst_hls_demux_advance_fragment (GstAdaptiveDemuxStream * @@ -114,6 +114,7 @@ gst_hls_demux_finalize (GObject * obj) GstHLSDemux *demux = GST_HLS_DEMUX (obj); gst_hls_demux_reset (GST_ADAPTIVE_DEMUX_CAST (demux)); + g_object_unref (demux->pending_encrypted_data); gst_m3u8_client_free (demux->client); G_OBJECT_CLASS (parent_class)->finalize (obj); @@ -172,6 +173,7 @@ static void gst_hls_demux_init (GstHLSDemux * demux) { demux->do_typefind = TRUE; + demux->pending_encrypted_data = gst_adapter_new (); } static GstStateChangeReturn @@ -520,6 +522,7 @@ key_failed: } } +/* Handles decrypted buffers only */ static GstFlowReturn gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer, gboolean force) @@ -532,6 +535,10 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, guint buffer_size; GstTypeFindProbability prob = GST_TYPE_FIND_NONE; + if (hlsdemux->pending_typefind_buffer) + buffer = gst_buffer_append (hlsdemux->pending_typefind_buffer, buffer); + hlsdemux->pending_typefind_buffer = NULL; + gst_buffer_map (buffer, &info, GST_MAP_READ); buffer_size = info.size; @@ -553,11 +560,7 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, gst_buffer_unref (buffer); return GST_FLOW_NOT_NEGOTIATED; } else { - if (hlsdemux->pending_buffer) - hlsdemux->pending_buffer = - gst_buffer_append (buffer, hlsdemux->pending_buffer); - else - hlsdemux->pending_buffer = buffer; + hlsdemux->pending_typefind_buffer = buffer; return GST_FLOW_OK; } } @@ -569,6 +572,8 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, hlsdemux->do_typefind = FALSE; } + g_assert (hlsdemux->pending_typefind_buffer == NULL); + if (buffer) return gst_adaptive_demux_stream_push_buffer (stream, buffer); return GST_FLOW_OK; @@ -584,35 +589,30 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, if (hlsdemux->current_key) gst_hls_demux_decrypt_end (hlsdemux); - /* ideally this should be empty, but this eos might have been - * caused by an error on the source element */ - GST_DEBUG_OBJECT (demux, "Data still on the adapter when EOS was received" - ": %" G_GSIZE_FORMAT, gst_adapter_available (stream->adapter)); - gst_adapter_clear (stream->adapter); - if (stream->last_ret == GST_FLOW_OK) { - if (hlsdemux->pending_buffer) { + if (hlsdemux->pending_decrypted_buffer) { if (hlsdemux->current_key) { GstMapInfo info; gssize unpadded_size; /* Handle pkcs7 unpadding here */ - gst_buffer_map (hlsdemux->pending_buffer, &info, GST_MAP_READ); + gst_buffer_map (hlsdemux->pending_decrypted_buffer, &info, + GST_MAP_READ); unpadded_size = info.size - info.data[info.size - 1]; - gst_buffer_unmap (hlsdemux->pending_buffer, &info); + gst_buffer_unmap (hlsdemux->pending_decrypted_buffer, &info); - gst_buffer_resize (hlsdemux->pending_buffer, 0, unpadded_size); + gst_buffer_resize (hlsdemux->pending_decrypted_buffer, 0, + unpadded_size); } ret = - gst_hls_demux_handle_buffer (demux, stream, hlsdemux->pending_buffer, - TRUE); - hlsdemux->pending_buffer = NULL; + gst_hls_demux_handle_buffer (demux, stream, + hlsdemux->pending_decrypted_buffer, TRUE); + hlsdemux->pending_decrypted_buffer = NULL; } } else { - if (hlsdemux->pending_buffer) - gst_buffer_unref (hlsdemux->pending_buffer); - hlsdemux->pending_buffer = NULL; + gst_buffer_replace (&hlsdemux->pending_decrypted_buffer, NULL); + gst_adapter_clear (hlsdemux->pending_encrypted_data); } if (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED) @@ -623,27 +623,27 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, static GstFlowReturn gst_hls_demux_data_received (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); - gsize available; - GstBuffer *buffer = NULL; - - available = gst_adapter_available (stream->adapter); /* Is it encrypted? */ if (hlsdemux->current_key) { GError *err = NULL; + gsize size; GstBuffer *tmp_buffer; - /* must be a multiple of 16 */ - available = available & (~0xF); + gst_adapter_push (hlsdemux->pending_encrypted_data, buffer); + size = gst_adapter_available (hlsdemux->pending_encrypted_data); - if (available == 0) { + /* must be a multiple of 16 */ + size = size & (~0xF); + + if (size == 0) { return GST_FLOW_OK; } - buffer = gst_adapter_take_buffer (stream->adapter, available); + buffer = gst_adapter_take_buffer (hlsdemux->pending_encrypted_data, size); buffer = gst_hls_demux_decrypt_fragment (hlsdemux, buffer, &err); if (buffer == NULL) { GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Failed to decrypt buffer"), @@ -652,15 +652,9 @@ gst_hls_demux_data_received (GstAdaptiveDemux * demux, return GST_FLOW_ERROR; } - tmp_buffer = hlsdemux->pending_buffer; - hlsdemux->pending_buffer = buffer; + tmp_buffer = hlsdemux->pending_decrypted_buffer; + hlsdemux->pending_decrypted_buffer = buffer; buffer = tmp_buffer; - } else { - buffer = gst_adapter_take_buffer (stream->adapter, available); - if (hlsdemux->pending_buffer) { - buffer = gst_buffer_append (hlsdemux->pending_buffer, buffer); - hlsdemux->pending_buffer = NULL; - } } return gst_hls_demux_handle_buffer (demux, stream, buffer, FALSE); @@ -779,9 +773,9 @@ gst_hls_demux_reset (GstAdaptiveDemux * ademux) demux->client = gst_m3u8_client_new ("", NULL); demux->srcpad_counter = 0; - if (demux->pending_buffer) - gst_buffer_unref (demux->pending_buffer); - demux->pending_buffer = NULL; + gst_adapter_clear (demux->pending_encrypted_data); + gst_buffer_replace (&demux->pending_decrypted_buffer, NULL); + gst_buffer_replace (&demux->pending_typefind_buffer, NULL); if (demux->current_key) { g_free (demux->current_key); demux->current_key = NULL; diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index 1154236f6e..da8406918d 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -84,11 +84,10 @@ struct _GstHLSDemux #endif gchar *current_key; guint8 *current_iv; - GstBuffer *pending_buffer; /* decryption scenario: - * the last buffer can only be pushed when - * resized, so need to store and wait for - * EOS to know it is the last */ - + GstAdapter *pending_encrypted_data; /* for chunking data into 16 byte multiples for decryption */ + GstBuffer *pending_decrypted_buffer; /* last decrypted buffer for pkcs7 unpadding. + We only know that it is the last at EOS */ + GstBuffer *pending_typefind_buffer; /* for collecting data until typefind succeeds */ gboolean reset_pts; }; diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c index c0215773f8..4e1f2b9593 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.c +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.c @@ -273,7 +273,7 @@ gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream * stream, GstFlowReturn ret, GError * err); static GstFlowReturn gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream); + GstAdaptiveDemuxStream * stream, GstBuffer * buffer); static GstFlowReturn gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); @@ -1117,7 +1117,6 @@ gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad) gst_segment_init (&stream->segment, GST_FORMAT_TIME); g_cond_init (&stream->fragment_download_cond); g_mutex_init (&stream->fragment_download_lock); - stream->adapter = gst_adapter_new (); demux->next_streams = g_list_append (demux->next_streams, stream); @@ -1221,8 +1220,6 @@ gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream) g_clear_pointer (&stream->pending_tags, gst_tag_list_unref); - g_object_unref (stream->adapter); - g_free (stream); } @@ -1722,7 +1719,6 @@ gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux) stream->download_error_count = 0; stream->need_header = TRUE; - gst_adapter_clear (stream->adapter); } } @@ -2019,12 +2015,8 @@ gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux, */ static GstFlowReturn gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux, - GstAdaptiveDemuxStream * stream) + GstAdaptiveDemuxStream * stream, GstBuffer * buffer) { - GstBuffer *buffer; - - buffer = gst_adapter_take_buffer (stream->adapter, - gst_adapter_available (stream->adapter)); return gst_adaptive_demux_stream_push_buffer (stream, buffer); } @@ -2124,12 +2116,10 @@ _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) stream->download_chunk_start_time; stream->download_total_bytes += gst_buffer_get_size (buffer); - gst_adapter_push (stream->adapter, buffer); - GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT - ". Now %" G_GSIZE_FORMAT " on adapter", gst_buffer_get_size (buffer), - gst_adapter_available (stream->adapter)); + GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT, + gst_buffer_get_size (buffer)); - ret = klass->data_received (demux, stream); + ret = klass->data_received (demux, stream, buffer); if (ret == GST_FLOW_FLUSHING) { /* do not make any changes if the stream is cancelled */ @@ -3302,7 +3292,6 @@ gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux, if (gst_adaptive_demux_stream_select_bitrate (demux, stream, gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) { stream->need_header = TRUE; - gst_adapter_clear (stream->adapter); ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH; } diff --git a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h index 8fcc47ab06..3db54bb44a 100644 --- a/gst-libs/gst/adaptivedemux/gstadaptivedemux.h +++ b/gst-libs/gst/adaptivedemux/gstadaptivedemux.h @@ -120,8 +120,6 @@ struct _GstAdaptiveDemuxStream GstSegment segment; - GstAdapter *adapter; - GstCaps *pending_caps; GstEvent *pending_segment; GstTagList *pending_tags; @@ -385,13 +383,14 @@ struct _GstAdaptiveDemuxClass * data_received: * @demux: #GstAdaptiveDemux * @stream: #GstAdaptiveDemuxStream + * @buffer: #GstBuffer * * Notifies the subclass that a fragment chunk was downloaded. The subclass - * can look at the data at the adapter and modify/push data as desired. + * can look at the data and modify/push data as desired. * * Returns: #GST_FLOW_OK if successful, #GST_FLOW_ERROR in case of error. */ - GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream); + GstFlowReturn (*data_received) (GstAdaptiveDemux * demux, GstAdaptiveDemuxStream * stream, GstBuffer * buffer); /** * get_live_seek_range: