diff --git a/gst/mpegtsdemux/tsdemux.c b/gst/mpegtsdemux/tsdemux.c index 39106a3846..aab4e04973 100644 --- a/gst/mpegtsdemux/tsdemux.c +++ b/gst/mpegtsdemux/tsdemux.c @@ -54,9 +54,6 @@ #define TABLE_ID_UNSET 0xFF -/* Size of the pendingbuffers array. */ -#define TS_MAX_PENDING_BUFFERS 256 - #define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024)) /* small PCR for wrap detection */ #define PCR_SMALL 17775000 @@ -122,19 +119,16 @@ struct _TSDemuxStream /* Output data */ PendingPacketState state; - /* Pending buffers array. */ - /* These buffers are stored in this array until the PES header (if needed) - * is succesfully parsed. */ - GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS]; - guint8 nbpending; + + /* Data to push (allocated) */ + guint8 *data; /* Size of data to push (if known) */ guint expected_size; + /* Size of currently queued data */ guint current_size; - - /* Current data to be pushed out */ - GList *currentlist; + guint allocated_size; /* Current PTS/DTS for this stream */ GstClockTime pts; @@ -1022,23 +1016,16 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream) static void gst_ts_demux_stream_flush (TSDemuxStream * stream) { - gint i; - stream->pts = GST_CLOCK_TIME_NONE; - for (i = 0; i < stream->nbpending; i++) - gst_buffer_unref (stream->pendingbuffers[i]); - memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); - stream->nbpending = 0; - - if (stream->currentlist) { - g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL); - g_list_free (stream->currentlist); - stream->currentlist = NULL; - } + GST_DEBUG ("flushing stream %p", stream); + if (stream->data) + g_free (stream->data); + stream->data = NULL; stream->state = PENDING_PACKET_EMPTY; stream->expected_size = 0; + stream->allocated_size = 0; stream->current_size = 0; stream->need_newsegment = TRUE; stream->pts = GST_CLOCK_TIME_NONE; @@ -1231,27 +1218,18 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream, } } -static GstFlowReturn -gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream) +static void +gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream, + guint8 * data, guint32 length, guint64 bufferoffset) { MpegTSBase *base = (MpegTSBase *) demux; PESHeader header; - GstBuffer *buf; - GstFlowReturn res = GST_FLOW_OK; gint offset = 0; - GstMapInfo map; - guint64 bufferoffset; PESParsingResult parseres; - buf = stream->pendingbuffers[0] = - gst_buffer_make_writable (stream->pendingbuffers[0]); - gst_buffer_map (buf, &map, GST_MAP_READ); - bufferoffset = GST_BUFFER_OFFSET (buf); + GST_MEMDUMP ("Header buffer", data, MIN (length, 32)); - GST_MEMDUMP ("Header buffer", map.data, MIN (map.size, 32)); - - parseres = mpegts_parse_pes_header (map.data, map.size, &header, &offset); - gst_buffer_unmap (buf, &map); + parseres = mpegts_parse_pes_header (data, length, &header, &offset); if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE)) goto discont; if (G_UNLIKELY (parseres == PES_PARSING_BAD)) { @@ -1271,45 +1249,32 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream) GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS))); - { - MpegTSPacketizer2 *packetizer = base->packetizer; - - GST_BUFFER_DTS (buf) = - mpegts_packetizer_pts_to_ts (packetizer, stream->dts); - GST_BUFFER_PTS (buf) = - mpegts_packetizer_pts_to_ts (packetizer, stream->pts); - } - GST_DEBUG ("buf %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf))); } /* Remove PES headers */ - GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%" - G_GSIZE_FORMAT ")", header.header_size, header.packet_length, map.size); + GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)", + header.header_size, header.packet_length, length); stream->expected_size = header.packet_length; - gst_buffer_resize (buf, header.header_size, map.size - header.header_size); + data += header.header_size; + length -= header.header_size; - /* FIXME : responsible for switching to PENDING_PACKET_BUFFER and - * creating the bufferlist */ - if (1) { - /* Append to the buffer list */ - if (G_UNLIKELY (stream->currentlist == NULL)) { - guint8 i; + /* Create the output buffer */ + if (stream->expected_size) + stream->allocated_size = stream->expected_size; + else + stream->allocated_size = 8192; + g_assert (stream->data == NULL); + stream->data = g_malloc (stream->allocated_size); + memcpy (stream->data, data, length); + stream->current_size = length; - /* Push pending buffers into the list */ - for (i = stream->nbpending; i; i--) - stream->currentlist = - g_list_prepend (stream->currentlist, stream->pendingbuffers[i - 1]); - memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); - stream->nbpending = 0; - } - stream->state = PENDING_PACKET_BUFFER; - } + stream->state = PENDING_PACKET_BUFFER; - return res; + return; discont: stream->state = PENDING_PACKET_DISCONT; - return res; + return; } /* ONLY CALL THIS: @@ -1320,17 +1285,13 @@ static inline void gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream, MpegTSPacketizerPacket * packet) { - GstBuffer *buf; + guint8 *data; + guint size; GST_DEBUG ("state:%d", stream->state); - buf = packet->buffer; - - GST_DEBUG ("Resizing buffer to %d (size:%d) (Was %" G_GSIZE_FORMAT - " bytes long)", (int) (packet->payload - packet->bufmap.data), - (int) (packet->data_end - packet->payload), packet->bufmap.size); - gst_buffer_resize (buf, packet->payload - packet->bufmap.data, - packet->data_end - packet->payload); + size = packet->data_end - packet->payload; + data = packet->payload; if (stream->state == PENDING_PACKET_EMPTY) { if (G_UNLIKELY (!packet->payload_unit_start_indicator)) { @@ -1345,26 +1306,31 @@ gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream, switch (stream->state) { case PENDING_PACKET_HEADER: { - GST_LOG ("HEADER: appending data to array"); - /* Append to the array */ - stream->pendingbuffers[stream->nbpending++] = buf; - stream->current_size += packet->bufmap.size; + GST_LOG ("HEADER: Parsing PES header"); /* parse the header */ - gst_ts_demux_parse_pes_header (demux, stream); + gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset); break; } case PENDING_PACKET_BUFFER: { - GST_LOG ("BUFFER: appending data to bufferlist"); - stream->currentlist = g_list_prepend (stream->currentlist, buf); - stream->current_size += packet->bufmap.size; + GST_LOG ("BUFFER: appending data"); + if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) { + GST_LOG ("resizing buffer"); + stream->allocated_size = stream->allocated_size * 2; + stream->data = g_realloc (stream->data, stream->allocated_size); + } + memcpy (stream->data + stream->current_size, data, size); + stream->current_size += size; break; } case PENDING_PACKET_DISCONT: { - GST_LOG ("DISCONT: dropping buffer"); - gst_buffer_unref (packet->buffer); + GST_LOG ("DISCONT: not storing/pushing"); + if (G_UNLIKELY (stream->data)) { + g_free (stream->data); + stream->data = NULL; + } break; } default: @@ -1461,18 +1427,16 @@ static GstFlowReturn gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) { GstFlowReturn res = GST_FLOW_OK; - GList *tmp; MpegTSBaseStream *bs = (MpegTSBaseStream *) stream; + GstBuffer *buffer = NULL; MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux); - GstBuffer *buf; - GstClockTime ts; GST_DEBUG_OBJECT (stream->pad, "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid, bs->stream_type, stream->state); - if (G_UNLIKELY (stream->currentlist == NULL)) { - GST_LOG ("stream->current == NULL"); + if (G_UNLIKELY (stream->data == NULL)) { + GST_LOG ("stream->data == NULL"); goto beach; } @@ -1490,44 +1454,29 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) activate_pad_for_stream (demux, stream); if (G_UNLIKELY (stream->pad == NULL)) { - g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL); - g_list_free (stream->currentlist); - stream->currentlist = NULL; + g_free (stream->data); goto beach; } if (G_UNLIKELY (stream->need_newsegment)) calculate_and_push_newsegment (demux, stream); - /* We have a confirmed buffer, let's push it out */ - GST_LOG_OBJECT (stream->pad, "Putting pending data into GstBufferList"); - stream->currentlist = g_list_reverse (stream->currentlist); - buf = gst_buffer_make_writable ((GstBuffer *) stream->currentlist->data); - - ts = GST_BUFFER_PTS (buf); + buffer = gst_buffer_new_wrapped (stream->data, stream->current_size); GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT, GST_TIME_ARGS (stream->pts)); - if (GST_CLOCK_TIME_IS_VALID (stream->pts) - && !GST_CLOCK_TIME_IS_VALID (ts)) - ts = mpegts_packetizer_pts_to_ts (packetizer, stream->pts); + if (GST_CLOCK_TIME_IS_VALID (stream->pts)) + GST_BUFFER_PTS (buffer) = + mpegts_packetizer_pts_to_ts (packetizer, stream->pts); + if (GST_CLOCK_TIME_IS_VALID (stream->dts)) + GST_BUFFER_DTS (buffer) = + mpegts_packetizer_pts_to_ts (packetizer, stream->dts); - for (tmp = stream->currentlist->next; tmp; tmp = tmp->next) { - gst_buffer_copy_into (buf, (GstBuffer *) tmp->data, GST_BUFFER_COPY_MEMORY, - 0, -1); - gst_buffer_unref ((GstBuffer *) tmp->data); - } - - g_list_free (stream->currentlist); - stream->currentlist = NULL; - - GST_BUFFER_PTS (buf) = ts; - GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT); GST_DEBUG_OBJECT (stream->pad, - "Pushing buffer list with timestamp: %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_PTS (buf))); + "Pushing buffer with timestamp: %" GST_TIME_FORMAT, + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); - res = gst_pad_push (stream->pad, buf); + res = gst_pad_push (stream->pad, buffer); GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res)); res = tsdemux_combine_flows (demux, stream, res); GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res)); @@ -1536,15 +1485,9 @@ beach: /* Reset everything */ GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res)); stream->state = PENDING_PACKET_EMPTY; - memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); - stream->nbpending = 0; + stream->data = NULL; stream->expected_size = 0; stream->current_size = 0; - if (stream->currentlist) { - g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL); - g_list_free (stream->currentlist); - } - stream->currentlist = NULL; return res; } @@ -1555,20 +1498,14 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream, { GstFlowReturn res = GST_FLOW_OK; -#if 0 - GST_DEBUG ("buffer:%p, data:%p", GST_BUFFER_DATA (packet->buffer), - packet->data); -#endif - GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", - packet->pid, - packet->payload_unit_start_indicator, - packet->adaptation_field_control, + GST_DEBUG ("data:%p", packet->data); + GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid, + packet->payload_unit_start_indicator, packet->adaptation_field_control, packet->continuity_counter, packet->payload); if (section) { - GST_DEBUG ("section complete:%d, buffer size %" G_GSIZE_FORMAT, - section->complete, gst_buffer_get_size (section->buffer)); - gst_buffer_unref (packet->buffer); + GST_DEBUG ("section complete:%d, buffer size %d", + section->complete, section->section_length); return res; } @@ -1579,11 +1516,9 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream, if (packet->adaptation_field_control & 0x2) { if (packet->afc_flags & MPEGTS_AFC_PCR_FLAG) - gst_ts_demux_record_pcr (demux, stream, packet->pcr, - GST_BUFFER_OFFSET (packet->buffer)); + gst_ts_demux_record_pcr (demux, stream, packet->pcr, packet->offset); if (packet->afc_flags & MPEGTS_AFC_OPCR_FLAG) - gst_ts_demux_record_opcr (demux, stream, packet->opcr, - GST_BUFFER_OFFSET (packet->buffer)); + gst_ts_demux_record_opcr (demux, stream, packet->opcr, packet->offset); } if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED) @@ -1594,8 +1529,7 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream, /* Finally check if the data we queued completes a packet */ if (stream->expected_size && stream->current_size == stream->expected_size) res = gst_ts_demux_push_pending_data (demux, stream); - } else - gst_buffer_unref (packet->buffer); + } return res; } @@ -1621,11 +1555,7 @@ gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet, if (stream) { res = gst_ts_demux_handle_packet (demux, stream, packet, section); - } else if (packet->buffer) - gst_buffer_unref (packet->buffer); - } else { - if (packet->buffer) - gst_buffer_unref (packet->buffer); + } } return res; }