mpegtsmux: avoid memcpy due to temporary packet buffer

... by writing directly into the output buffer instead.
This commit is contained in:
Mark Nauwelaerts 2012-06-11 14:03:26 +02:00
parent 8800946aa3
commit 05dcabfe16
3 changed files with 128 additions and 32 deletions

View File

@ -151,7 +151,8 @@ static void gst_mpegtsmux_get_property (GObject * object, guint prop_id,
static void mpegtsmux_reset (MpegTsMux * mux, gboolean alloc); static void mpegtsmux_reset (MpegTsMux * mux, gboolean alloc);
static void mpegtsmux_dispose (GObject * object); static void mpegtsmux_dispose (GObject * object);
static gboolean new_packet_cb (guint8 * data, guint len, void *user_data, static void alloc_packet_cb (GstBuffer ** _buf, void *user_data);
static gboolean new_packet_cb (GstBuffer * buf, void *user_data,
gint64 new_pcr); gint64 new_pcr);
static void release_buffer_cb (guint8 * data, void *user_data); static void release_buffer_cb (guint8 * data, void *user_data);
static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux, static GstFlowReturn mpegtsmux_collect_packet (MpegTsMux * mux,
@ -342,6 +343,7 @@ mpegtsmux_reset (MpegTsMux * mux, gboolean alloc)
if (alloc) { if (alloc) {
mux->tsmux = tsmux_new (); mux->tsmux = tsmux_new ();
tsmux_set_write_func (mux->tsmux, new_packet_cb, mux); tsmux_set_write_func (mux->tsmux, new_packet_cb, mux);
tsmux_set_alloc_func (mux->tsmux, alloc_packet_cb, mux);
} }
} }
@ -1368,20 +1370,20 @@ exit:
/* Called when the TsMux has prepared a packet for output. Return FALSE /* Called when the TsMux has prepared a packet for output. Return FALSE
* on error */ * on error */
static gboolean static gboolean
new_packet_cb (guint8 * data, guint len, void *user_data, gint64 new_pcr) new_packet_cb (GstBuffer * buf, void *user_data, gint64 new_pcr)
{ {
MpegTsMux *mux = (MpegTsMux *) user_data; MpegTsMux *mux = (MpegTsMux *) user_data;
gint offset = 0; gint offset = 0;
GstBuffer *buf;
if (mux->m2ts_mode == TRUE) offset = GST_BUFFER_DATA (buf) - GST_BUFFER_MALLOCDATA (buf);
offset = 4;
buf = gst_buffer_new_and_alloc (NORMAL_TS_PACKET_LENGTH + offset);
GST_BUFFER_TIMESTAMP (buf) = mux->last_ts; GST_BUFFER_TIMESTAMP (buf) = mux->last_ts;
memcpy (GST_BUFFER_DATA (buf) + offset, data, len);
/* do common init (flags and streamheaders) */ /* do common init (flags and streamheaders) */
new_packet_common_init (mux, buf, data, len); new_packet_common_init (mux, buf,
GST_BUFFER_DATA (buf), GST_BUFFER_SIZE (buf));
/* all is meant for downstream, including any prefix */
GST_BUFFER_DATA (buf) = GST_BUFFER_MALLOCDATA (buf);
GST_BUFFER_SIZE (buf) += offset;
if (offset) if (offset)
return new_packet_m2ts (mux, buf, new_pcr); return new_packet_m2ts (mux, buf, new_pcr);
@ -1391,6 +1393,26 @@ new_packet_cb (guint8 * data, guint len, void *user_data, gint64 new_pcr)
return TRUE; return TRUE;
} }
/* called when TsMux needs new packet to write into */
static void
alloc_packet_cb (GstBuffer ** _buf, void *user_data)
{
MpegTsMux *mux = (MpegTsMux *) user_data;
GstBuffer *buf;
gint offset = 0;
if (mux->m2ts_mode == TRUE)
offset = 4;
/* TODO might be even more efficient to avoid later memcpy
* if these are subbuffer from a larger buffer or so */
buf = gst_buffer_new_and_alloc (NORMAL_TS_PACKET_LENGTH + offset);
GST_BUFFER_DATA (buf) += offset;
GST_BUFFER_SIZE (buf) -= offset;
*_buf = buf;
}
static void static void
mpegtsdemux_set_header_on_caps (MpegTsMux * mux) mpegtsdemux_set_header_on_caps (MpegTsMux * mux)
{ {

View File

@ -160,6 +160,25 @@ tsmux_set_write_func (TsMux * mux, TsMuxWriteFunc func, void *user_data)
mux->write_func_data = user_data; mux->write_func_data = user_data;
} }
/**
* tsmux_set_alloc_func:
* @mux: a #TsMux
* @func: a user callback function
* @user_data: user data passed to @func
*
* Set the callback function and user data to be called when @mux needs
* a new buffer to write a packet into.
* @user_data will be passed as user data in @func.
*/
void
tsmux_set_alloc_func (TsMux * mux, TsMuxAllocFunc func, void *user_data)
{
g_return_if_fail (mux != NULL);
mux->alloc_func = func;
mux->alloc_func_data = user_data;
}
/** /**
* tsmux_set_pat_interval: * tsmux_set_pat_interval:
* @mux: a #TsMux * @mux: a #TsMux
@ -435,13 +454,32 @@ tsmux_find_stream (TsMux * mux, guint16 pid)
} }
static gboolean static gboolean
tsmux_packet_out (TsMux * mux, gint64 pcr) tsmux_get_buffer (TsMux * mux, GstBuffer ** buf)
{ {
if (G_UNLIKELY (mux->write_func == NULL)) g_return_val_if_fail (buf, FALSE);
return TRUE;
return mux->write_func (mux->packet_buf, TSMUX_PACKET_LENGTH, if (G_UNLIKELY (!mux->alloc_func))
mux->write_func_data, pcr); return FALSE;
mux->alloc_func (buf, mux->alloc_func_data);
if (!*buf)
return FALSE;
g_assert (GST_BUFFER_SIZE (*buf) == TSMUX_PACKET_LENGTH);
return TRUE;
}
static gboolean
tsmux_packet_out (TsMux * mux, GstBuffer * buf, gint64 pcr)
{
if (G_UNLIKELY (mux->write_func == NULL)) {
if (buf)
gst_buffer_unref (buf);
return TRUE;
}
return mux->write_func (buf, mux->write_func_data, pcr);
} }
/* /*
@ -707,6 +745,8 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
TsMuxPacketInfo *pi = &stream->pi; TsMuxPacketInfo *pi = &stream->pi;
gboolean res; gboolean res;
gint64 cur_pcr = -1; gint64 cur_pcr = -1;
GstBuffer *buf = NULL;
guint8 *data;
g_return_val_if_fail (mux != NULL, FALSE); g_return_val_if_fail (mux != NULL, FALSE);
g_return_val_if_fail (stream != NULL, FALSE); g_return_val_if_fail (stream != NULL, FALSE);
@ -786,19 +826,32 @@ tsmux_write_stream_packet (TsMux * mux, TsMuxStream * stream)
} }
pi->stream_avail = tsmux_stream_bytes_avail (stream); pi->stream_avail = tsmux_stream_bytes_avail (stream);
if (!tsmux_write_ts_header (mux->packet_buf, pi, &payload_len, &payload_offs)) /* obtain buffer */
if (!tsmux_get_buffer (mux, &buf))
return FALSE; return FALSE;
if (!tsmux_stream_get_data (stream, mux->packet_buf + payload_offs, data = GST_BUFFER_DATA (buf);
payload_len))
return FALSE;
res = tsmux_packet_out (mux, cur_pcr); if (!tsmux_write_ts_header (data, pi, &payload_len, &payload_offs))
goto fail;
if (!tsmux_stream_get_data (stream, data + payload_offs, payload_len))
goto fail;
res = tsmux_packet_out (mux, buf, cur_pcr);
/* Reset all dynamic flags */ /* Reset all dynamic flags */
stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER; stream->pi.flags &= TSMUX_PACKET_FLAG_PES_FULL_HEADER;
return res; return res;
/* ERRORS */
fail:
{
if (buf)
gst_buffer_unref (buf);
return FALSE;
}
} }
/** /**
@ -824,6 +877,7 @@ tsmux_write_section (TsMux * mux, TsMuxSection * section)
guint payload_remain; guint payload_remain;
guint payload_len, payload_offs; guint payload_len, payload_offs;
TsMuxPacketInfo *pi; TsMuxPacketInfo *pi;
GstBuffer *buf = NULL;
pi = &section->pi; pi = &section->pi;
@ -833,44 +887,61 @@ tsmux_write_section (TsMux * mux, TsMuxSection * section)
payload_remain = pi->stream_avail; payload_remain = pi->stream_avail;
while (payload_remain > 0) { while (payload_remain > 0) {
guint8 *data;
/* obtain buffer */
if (!tsmux_get_buffer (mux, &buf))
goto fail;
data = GST_BUFFER_DATA (buf);
if (pi->packet_start_unit_indicator) { if (pi->packet_start_unit_indicator) {
/* Need to write an extra single byte start pointer */ /* Need to write an extra single byte start pointer */
pi->stream_avail++; pi->stream_avail++;
if (!tsmux_write_ts_header (mux->packet_buf, pi, if (!tsmux_write_ts_header (data, pi, &payload_len, &payload_offs)) {
&payload_len, &payload_offs)) {
pi->stream_avail--; pi->stream_avail--;
return FALSE; goto fail;
} }
pi->stream_avail--; pi->stream_avail--;
/* Write the pointer byte */ /* Write the pointer byte */
mux->packet_buf[payload_offs] = 0x00; data[payload_offs] = 0x00;
payload_offs++; payload_offs++;
payload_len--; payload_len--;
pi->packet_start_unit_indicator = FALSE; pi->packet_start_unit_indicator = FALSE;
} else { } else {
if (!tsmux_write_ts_header (mux->packet_buf, pi, if (!tsmux_write_ts_header (data, pi, &payload_len, &payload_offs))
&payload_len, &payload_offs)) goto fail;
return FALSE;
} }
TS_DEBUG ("Outputting %d bytes to section. %d remaining after", TS_DEBUG ("Outputting %d bytes to section. %d remaining after",
payload_len, payload_remain - payload_len); payload_len, payload_remain - payload_len);
memcpy (mux->packet_buf + payload_offs, cur_in, payload_len); memcpy (data + payload_offs, cur_in, payload_len);
cur_in += payload_len; cur_in += payload_len;
payload_remain -= payload_len; payload_remain -= payload_len;
/* we do not write PCR in section */ /* we do not write PCR in section */
if (G_UNLIKELY (!tsmux_packet_out (mux, -1))) { if (G_UNLIKELY (!tsmux_packet_out (mux, buf, -1))) {
return FALSE; /* buffer given away */
buf = NULL;
goto fail;
} }
buf = NULL;
} }
return TRUE; return TRUE;
/* ERRORS */
fail:
{
if (buf)
gst_buffer_unref (buf);
return FALSE;
}
} }
static void static void

View File

@ -99,7 +99,8 @@ G_BEGIN_DECLS
typedef struct TsMuxSection TsMuxSection; typedef struct TsMuxSection TsMuxSection;
typedef struct TsMux TsMux; typedef struct TsMux TsMux;
typedef gboolean (*TsMuxWriteFunc) (guint8 *data, guint len, void *user_data, gint64 new_pcr); typedef gboolean (*TsMuxWriteFunc) (GstBuffer * buf, void *user_data, gint64 new_pcr);
typedef void (*TsMuxAllocFunc) (GstBuffer ** buf, void *user_data);
struct TsMuxSection { struct TsMuxSection {
TsMuxPacketInfo pi; TsMuxPacketInfo pi;
@ -159,11 +160,12 @@ struct TsMux {
/* last time PAT written in MPEG PTS clock time */ /* last time PAT written in MPEG PTS clock time */
gint64 last_pat_ts; gint64 last_pat_ts;
/* temp packet buffer */
guint8 packet_buf[TSMUX_PACKET_LENGTH];
/* callback to write finished packet */ /* callback to write finished packet */
TsMuxWriteFunc write_func; TsMuxWriteFunc write_func;
void *write_func_data; void *write_func_data;
/* callback to alloc new packet buffer */
TsMuxAllocFunc alloc_func;
void *alloc_func_data;
/* scratch space for writing ES_info descriptors */ /* scratch space for writing ES_info descriptors */
guint8 es_info_buf[TSMUX_MAX_ES_INFO_LENGTH]; guint8 es_info_buf[TSMUX_MAX_ES_INFO_LENGTH];
@ -175,6 +177,7 @@ void tsmux_free (TsMux *mux);
/* Setting muxing session properties */ /* Setting muxing session properties */
void tsmux_set_write_func (TsMux *mux, TsMuxWriteFunc func, void *user_data); void tsmux_set_write_func (TsMux *mux, TsMuxWriteFunc func, void *user_data);
void tsmux_set_alloc_func (TsMux *mux, TsMuxAllocFunc func, void *user_data);
void tsmux_set_pat_interval (TsMux *mux, guint interval); void tsmux_set_pat_interval (TsMux *mux, guint interval);
guint tsmux_get_pat_interval (TsMux *mux); guint tsmux_get_pat_interval (TsMux *mux);
guint16 tsmux_get_new_pid (TsMux *mux); guint16 tsmux_get_new_pid (TsMux *mux);