tsdemux: Adapt to new packetizer API

We no longer use GstBufferList and instead copy the incoming data
into newly allocated memory.

This makes tsdemux behaviour 3 to 4 times faster.
This commit is contained in:
Edward Hervey 2012-05-21 17:53:37 +02:00
parent 7fdf1cb0c8
commit 6bf54ff850

View File

@ -54,9 +54,6 @@
#define TABLE_ID_UNSET 0xFF #define TABLE_ID_UNSET 0xFF
/* Size of the pendingbuffers array. */
#define TS_MAX_PENDING_BUFFERS 256
#define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024)) #define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
/* small PCR for wrap detection */ /* small PCR for wrap detection */
#define PCR_SMALL 17775000 #define PCR_SMALL 17775000
@ -122,19 +119,16 @@ struct _TSDemuxStream
/* Output data */ /* Output data */
PendingPacketState state; PendingPacketState state;
/* Pending buffers array. */
/* These buffers are stored in this array until the PES header (if needed) /* Data to push (allocated) */
* is succesfully parsed. */ guint8 *data;
GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS];
guint8 nbpending;
/* Size of data to push (if known) */ /* Size of data to push (if known) */
guint expected_size; guint expected_size;
/* Size of currently queued data */ /* Size of currently queued data */
guint current_size; guint current_size;
guint allocated_size;
/* Current data to be pushed out */
GList *currentlist;
/* Current PTS/DTS for this stream */ /* Current PTS/DTS for this stream */
GstClockTime pts; GstClockTime pts;
@ -1022,23 +1016,16 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
static void static void
gst_ts_demux_stream_flush (TSDemuxStream * stream) gst_ts_demux_stream_flush (TSDemuxStream * stream)
{ {
gint i;
stream->pts = GST_CLOCK_TIME_NONE; stream->pts = GST_CLOCK_TIME_NONE;
for (i = 0; i < stream->nbpending; i++) GST_DEBUG ("flushing stream %p", stream);
gst_buffer_unref (stream->pendingbuffers[i]);
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
stream->nbpending = 0;
if (stream->currentlist) {
g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
g_list_free (stream->currentlist);
stream->currentlist = NULL;
}
if (stream->data)
g_free (stream->data);
stream->data = NULL;
stream->state = PENDING_PACKET_EMPTY; stream->state = PENDING_PACKET_EMPTY;
stream->expected_size = 0; stream->expected_size = 0;
stream->allocated_size = 0;
stream->current_size = 0; stream->current_size = 0;
stream->need_newsegment = TRUE; stream->need_newsegment = TRUE;
stream->pts = GST_CLOCK_TIME_NONE; stream->pts = GST_CLOCK_TIME_NONE;
@ -1231,27 +1218,18 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
} }
} }
static GstFlowReturn static void
gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream) gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
guint8 * data, guint32 length, guint64 bufferoffset)
{ {
MpegTSBase *base = (MpegTSBase *) demux; MpegTSBase *base = (MpegTSBase *) demux;
PESHeader header; PESHeader header;
GstBuffer *buf;
GstFlowReturn res = GST_FLOW_OK;
gint offset = 0; gint offset = 0;
GstMapInfo map;
guint64 bufferoffset;
PESParsingResult parseres; PESParsingResult parseres;
buf = stream->pendingbuffers[0] = GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
gst_buffer_make_writable (stream->pendingbuffers[0]);
gst_buffer_map (buf, &map, GST_MAP_READ);
bufferoffset = GST_BUFFER_OFFSET (buf);
GST_MEMDUMP ("Header buffer", map.data, MIN (map.size, 32)); parseres = mpegts_parse_pes_header (data, length, &header, &offset);
parseres = mpegts_parse_pes_header (map.data, map.size, &header, &offset);
gst_buffer_unmap (buf, &map);
if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE)) if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
goto discont; goto discont;
if (G_UNLIKELY (parseres == PES_PARSING_BAD)) { if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
@ -1271,45 +1249,32 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->pts),
GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS))); GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS)));
{
MpegTSPacketizer2 *packetizer = base->packetizer;
GST_BUFFER_DTS (buf) =
mpegts_packetizer_pts_to_ts (packetizer, stream->dts);
GST_BUFFER_PTS (buf) =
mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
}
GST_DEBUG ("buf %" GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (buf)));
} }
/* Remove PES headers */ /* Remove PES headers */
GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%" GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
G_GSIZE_FORMAT ")", header.header_size, header.packet_length, map.size); header.header_size, header.packet_length, length);
stream->expected_size = header.packet_length; stream->expected_size = header.packet_length;
gst_buffer_resize (buf, header.header_size, map.size - header.header_size); data += header.header_size;
length -= header.header_size;
/* FIXME : responsible for switching to PENDING_PACKET_BUFFER and /* Create the output buffer */
* creating the bufferlist */ if (stream->expected_size)
if (1) { stream->allocated_size = stream->expected_size;
/* Append to the buffer list */ else
if (G_UNLIKELY (stream->currentlist == NULL)) { stream->allocated_size = 8192;
guint8 i; g_assert (stream->data == NULL);
stream->data = g_malloc (stream->allocated_size);
memcpy (stream->data, data, length);
stream->current_size = length;
/* Push pending buffers into the list */
for (i = stream->nbpending; i; i--)
stream->currentlist =
g_list_prepend (stream->currentlist, stream->pendingbuffers[i - 1]);
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
stream->nbpending = 0;
}
stream->state = PENDING_PACKET_BUFFER; stream->state = PENDING_PACKET_BUFFER;
}
return res; return;
discont: discont:
stream->state = PENDING_PACKET_DISCONT; stream->state = PENDING_PACKET_DISCONT;
return res; return;
} }
/* ONLY CALL THIS: /* ONLY CALL THIS:
@ -1320,17 +1285,13 @@ static inline void
gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream, gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
MpegTSPacketizerPacket * packet) MpegTSPacketizerPacket * packet)
{ {
GstBuffer *buf; guint8 *data;
guint size;
GST_DEBUG ("state:%d", stream->state); GST_DEBUG ("state:%d", stream->state);
buf = packet->buffer; size = packet->data_end - packet->payload;
data = packet->payload;
GST_DEBUG ("Resizing buffer to %d (size:%d) (Was %" G_GSIZE_FORMAT
" bytes long)", (int) (packet->payload - packet->bufmap.data),
(int) (packet->data_end - packet->payload), packet->bufmap.size);
gst_buffer_resize (buf, packet->payload - packet->bufmap.data,
packet->data_end - packet->payload);
if (stream->state == PENDING_PACKET_EMPTY) { if (stream->state == PENDING_PACKET_EMPTY) {
if (G_UNLIKELY (!packet->payload_unit_start_indicator)) { if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
@ -1345,26 +1306,31 @@ gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
switch (stream->state) { switch (stream->state) {
case PENDING_PACKET_HEADER: case PENDING_PACKET_HEADER:
{ {
GST_LOG ("HEADER: appending data to array"); GST_LOG ("HEADER: Parsing PES header");
/* Append to the array */
stream->pendingbuffers[stream->nbpending++] = buf;
stream->current_size += packet->bufmap.size;
/* parse the header */ /* parse the header */
gst_ts_demux_parse_pes_header (demux, stream); gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
break; break;
} }
case PENDING_PACKET_BUFFER: case PENDING_PACKET_BUFFER:
{ {
GST_LOG ("BUFFER: appending data to bufferlist"); GST_LOG ("BUFFER: appending data");
stream->currentlist = g_list_prepend (stream->currentlist, buf); if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
stream->current_size += packet->bufmap.size; GST_LOG ("resizing buffer");
stream->allocated_size = stream->allocated_size * 2;
stream->data = g_realloc (stream->data, stream->allocated_size);
}
memcpy (stream->data + stream->current_size, data, size);
stream->current_size += size;
break; break;
} }
case PENDING_PACKET_DISCONT: case PENDING_PACKET_DISCONT:
{ {
GST_LOG ("DISCONT: dropping buffer"); GST_LOG ("DISCONT: not storing/pushing");
gst_buffer_unref (packet->buffer); if (G_UNLIKELY (stream->data)) {
g_free (stream->data);
stream->data = NULL;
}
break; break;
} }
default: default:
@ -1461,18 +1427,16 @@ static GstFlowReturn
gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
{ {
GstFlowReturn res = GST_FLOW_OK; GstFlowReturn res = GST_FLOW_OK;
GList *tmp;
MpegTSBaseStream *bs = (MpegTSBaseStream *) stream; MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
GstBuffer *buffer = NULL;
MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux); MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux);
GstBuffer *buf;
GstClockTime ts;
GST_DEBUG_OBJECT (stream->pad, GST_DEBUG_OBJECT (stream->pad,
"stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid, "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
bs->stream_type, stream->state); bs->stream_type, stream->state);
if (G_UNLIKELY (stream->currentlist == NULL)) { if (G_UNLIKELY (stream->data == NULL)) {
GST_LOG ("stream->current == NULL"); GST_LOG ("stream->data == NULL");
goto beach; goto beach;
} }
@ -1490,44 +1454,29 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
activate_pad_for_stream (demux, stream); activate_pad_for_stream (demux, stream);
if (G_UNLIKELY (stream->pad == NULL)) { if (G_UNLIKELY (stream->pad == NULL)) {
g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL); g_free (stream->data);
g_list_free (stream->currentlist);
stream->currentlist = NULL;
goto beach; goto beach;
} }
if (G_UNLIKELY (stream->need_newsegment)) if (G_UNLIKELY (stream->need_newsegment))
calculate_and_push_newsegment (demux, stream); calculate_and_push_newsegment (demux, stream);
/* We have a confirmed buffer, let's push it out */ buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
GST_LOG_OBJECT (stream->pad, "Putting pending data into GstBufferList");
stream->currentlist = g_list_reverse (stream->currentlist);
buf = gst_buffer_make_writable ((GstBuffer *) stream->currentlist->data);
ts = GST_BUFFER_PTS (buf);
GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT, GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
GST_TIME_ARGS (stream->pts)); GST_TIME_ARGS (stream->pts));
if (GST_CLOCK_TIME_IS_VALID (stream->pts) if (GST_CLOCK_TIME_IS_VALID (stream->pts))
&& !GST_CLOCK_TIME_IS_VALID (ts)) GST_BUFFER_PTS (buffer) =
ts = mpegts_packetizer_pts_to_ts (packetizer, stream->pts); mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
if (GST_CLOCK_TIME_IS_VALID (stream->dts))
GST_BUFFER_DTS (buffer) =
mpegts_packetizer_pts_to_ts (packetizer, stream->dts);
for (tmp = stream->currentlist->next; tmp; tmp = tmp->next) {
gst_buffer_copy_into (buf, (GstBuffer *) tmp->data, GST_BUFFER_COPY_MEMORY,
0, -1);
gst_buffer_unref ((GstBuffer *) tmp->data);
}
g_list_free (stream->currentlist);
stream->currentlist = NULL;
GST_BUFFER_PTS (buf) = ts;
GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
GST_DEBUG_OBJECT (stream->pad, GST_DEBUG_OBJECT (stream->pad,
"Pushing buffer list with timestamp: %" GST_TIME_FORMAT, "Pushing buffer with timestamp: %" GST_TIME_FORMAT,
GST_TIME_ARGS (GST_BUFFER_PTS (buf))); GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
res = gst_pad_push (stream->pad, buf); res = gst_pad_push (stream->pad, buffer);
GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res)); GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
res = tsdemux_combine_flows (demux, stream, res); res = tsdemux_combine_flows (demux, stream, res);
GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res)); GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
@ -1536,15 +1485,9 @@ beach:
/* Reset everything */ /* Reset everything */
GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res)); GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
stream->state = PENDING_PACKET_EMPTY; stream->state = PENDING_PACKET_EMPTY;
memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS); stream->data = NULL;
stream->nbpending = 0;
stream->expected_size = 0; stream->expected_size = 0;
stream->current_size = 0; stream->current_size = 0;
if (stream->currentlist) {
g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
g_list_free (stream->currentlist);
}
stream->currentlist = NULL;
return res; return res;
} }
@ -1555,20 +1498,14 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
{ {
GstFlowReturn res = GST_FLOW_OK; GstFlowReturn res = GST_FLOW_OK;
#if 0 GST_DEBUG ("data:%p", packet->data);
GST_DEBUG ("buffer:%p, data:%p", GST_BUFFER_DATA (packet->buffer), GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid,
packet->data); packet->payload_unit_start_indicator, packet->adaptation_field_control,
#endif
GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p",
packet->pid,
packet->payload_unit_start_indicator,
packet->adaptation_field_control,
packet->continuity_counter, packet->payload); packet->continuity_counter, packet->payload);
if (section) { if (section) {
GST_DEBUG ("section complete:%d, buffer size %" G_GSIZE_FORMAT, GST_DEBUG ("section complete:%d, buffer size %d",
section->complete, gst_buffer_get_size (section->buffer)); section->complete, section->section_length);
gst_buffer_unref (packet->buffer);
return res; return res;
} }
@ -1579,11 +1516,9 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
if (packet->adaptation_field_control & 0x2) { if (packet->adaptation_field_control & 0x2) {
if (packet->afc_flags & MPEGTS_AFC_PCR_FLAG) if (packet->afc_flags & MPEGTS_AFC_PCR_FLAG)
gst_ts_demux_record_pcr (demux, stream, packet->pcr, gst_ts_demux_record_pcr (demux, stream, packet->pcr, packet->offset);
GST_BUFFER_OFFSET (packet->buffer));
if (packet->afc_flags & MPEGTS_AFC_OPCR_FLAG) if (packet->afc_flags & MPEGTS_AFC_OPCR_FLAG)
gst_ts_demux_record_opcr (demux, stream, packet->opcr, gst_ts_demux_record_opcr (demux, stream, packet->opcr, packet->offset);
GST_BUFFER_OFFSET (packet->buffer));
} }
if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED) if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
@ -1594,8 +1529,7 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
/* Finally check if the data we queued completes a packet */ /* Finally check if the data we queued completes a packet */
if (stream->expected_size && stream->current_size == stream->expected_size) if (stream->expected_size && stream->current_size == stream->expected_size)
res = gst_ts_demux_push_pending_data (demux, stream); res = gst_ts_demux_push_pending_data (demux, stream);
} else }
gst_buffer_unref (packet->buffer);
return res; return res;
} }
@ -1621,11 +1555,7 @@ gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
if (stream) { if (stream) {
res = gst_ts_demux_handle_packet (demux, stream, packet, section); res = gst_ts_demux_handle_packet (demux, stream, packet, section);
} else if (packet->buffer) }
gst_buffer_unref (packet->buffer);
} else {
if (packet->buffer)
gst_buffer_unref (packet->buffer);
} }
return res; return res;
} }