rtsp: Add initial buffer support.

The initial buffer contains data for a connection which should be used
before starting to actually read anything from the socket.
This commit is contained in:
Peter Kjellerstedt 2009-06-09 15:27:17 +02:00
parent 2c08c76383
commit 814eaa728a

View File

@ -114,8 +114,8 @@ typedef struct
guint coutl; guint coutl;
} DecodeCtx; } DecodeCtx;
static GstRTSPResult read_line (gint fd, guint8 * buffer, guint * idx, static GstRTSPResult read_line (GstRTSPConnection * conn, guint8 * buffer,
guint size, DecodeCtx * ctxp); guint * idx, guint size);
static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key, static GstRTSPResult parse_key_value (guint8 * buffer, gchar * key,
guint keysize, gchar ** value); guint keysize, gchar ** value);
static void parse_string (gchar * dest, gint size, gchar ** src); static void parse_string (gchar * dest, gint size, gchar ** src);
@ -186,6 +186,9 @@ struct _GstRTSPConnection
GstPoll *fdset; GstPoll *fdset;
gchar *ip; gchar *ip;
gchar *initial_buffer;
gsize initial_buffer_offset;
/* Session state */ /* Session state */
gint cseq; /* sequence number */ gint cseq; /* sequence number */
gchar session_id[512]; /* session id */ gchar session_id[512]; /* session id */
@ -638,7 +641,7 @@ setup_tunneling (GstRTSPConnection * conn, GTimeVal * timeout)
idx = 0; idx = 0;
while (TRUE) { while (TRUE) {
res = read_line (conn->fd0.fd, buffer, &idx, sizeof (buffer), NULL); res = read_line (conn, buffer, &idx, sizeof (buffer));
if (res == GST_RTSP_EEOF) if (res == GST_RTSP_EEOF)
goto eof; goto eof;
if (res == GST_RTSP_OK) if (res == GST_RTSP_OK)
@ -1057,10 +1060,44 @@ write_bytes (gint fd, const guint8 * buffer, guint * idx, guint size)
} }
static gint static gint
fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx) fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
{ {
gint out = 0; gint out = 0;
if (G_UNLIKELY (conn->initial_buffer != NULL)) {
gsize left = strlen (&conn->initial_buffer[conn->initial_buffer_offset]);
out = MIN (left, size);
memcpy (buffer, &conn->initial_buffer[conn->initial_buffer_offset], out);
if (left == (gsize) out) {
g_free (conn->initial_buffer);
conn->initial_buffer = NULL;
conn->initial_buffer_offset = 0;
} else
conn->initial_buffer_offset += out;
}
if (G_LIKELY (size > (guint) out)) {
gint r;
r = READ_SOCKET (conn->readfd->fd, &buffer[out], size - out);
if (r <= 0) {
if (out == 0)
out = r;
} else
out += r;
}
return out;
}
static gint
fill_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size)
{
DecodeCtx *ctx = conn->ctxp;
gint out = 0;
if (ctx) { if (ctx) {
while (size > 0) { while (size > 0) {
guint8 in[sizeof (ctx->out) * 4 / 3]; guint8 in[sizeof (ctx->out) * 4 / 3];
@ -1078,7 +1115,7 @@ fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx)
break; break;
/* try to read more bytes */ /* try to read more bytes */
r = READ_SOCKET (fd, in, sizeof (in)); r = fill_raw_bytes (conn, in, sizeof (in));
if (r <= 0) { if (r <= 0) {
if (out == 0) if (out == 0)
out = r; out = r;
@ -1091,14 +1128,14 @@ fill_bytes (gint fd, guint8 * buffer, guint size, DecodeCtx * ctx)
&ctx->save); &ctx->save);
} }
} else { } else {
out = READ_SOCKET (fd, buffer, size); out = fill_raw_bytes (conn, buffer, size);
} }
return out; return out;
} }
static GstRTSPResult static GstRTSPResult
read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) read_bytes (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
{ {
guint left; guint left;
@ -1110,7 +1147,7 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
while (left) { while (left) {
gint r; gint r;
r = fill_bytes (fd, &buffer[*idx], left, ctx); r = fill_bytes (conn, &buffer[*idx], left);
if (G_UNLIKELY (r == 0)) { if (G_UNLIKELY (r == 0)) {
return GST_RTSP_EEOF; return GST_RTSP_EEOF;
} else if (G_UNLIKELY (r < 0)) { } else if (G_UNLIKELY (r < 0)) {
@ -1127,13 +1164,13 @@ read_bytes (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx)
} }
static GstRTSPResult static GstRTSPResult
read_line (gint fd, guint8 * buffer, guint * idx, guint size, DecodeCtx * ctx) read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size)
{ {
while (TRUE) { while (TRUE) {
guint8 c; guint8 c;
gint r; gint r;
r = fill_bytes (fd, &c, 1, ctx); r = fill_bytes (conn, &c, 1);
if (G_UNLIKELY (r == 0)) { if (G_UNLIKELY (r == 0)) {
return GST_RTSP_EEOF; return GST_RTSP_EEOF;
} else if (G_UNLIKELY (r < 0)) { } else if (G_UNLIKELY (r < 0)) {
@ -1601,8 +1638,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
case STATE_START: case STATE_START:
builder->offset = 0; builder->offset = 0;
res = res =
read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 1);
&builder->offset, 1, conn->ctxp);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1619,8 +1655,7 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
case STATE_DATA_HEADER: case STATE_DATA_HEADER:
{ {
res = res =
read_bytes (conn->readfd->fd, (guint8 *) builder->buffer, read_bytes (conn, (guint8 *) builder->buffer, &builder->offset, 4);
&builder->offset, 4, conn->ctxp);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1636,8 +1671,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
case STATE_DATA_BODY: case STATE_DATA_BODY:
{ {
res = res =
read_bytes (conn->readfd->fd, builder->body_data, &builder->offset, read_bytes (conn, builder->body_data, &builder->offset,
builder->body_len, conn->ctxp); builder->body_len);
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1653,8 +1688,8 @@ build_next (GstRTSPBuilder * builder, GstRTSPMessage * message,
} }
case STATE_READ_LINES: case STATE_READ_LINES:
{ {
res = read_line (conn->readfd->fd, builder->buffer, &builder->offset, res = read_line (conn, builder->buffer, &builder->offset,
sizeof (builder->buffer), conn->ctxp); sizeof (builder->buffer));
if (res != GST_RTSP_OK) if (res != GST_RTSP_OK)
goto done; goto done;
@ -1803,7 +1838,7 @@ gst_rtsp_connection_read (GstRTSPConnection * conn, guint8 * data, guint size,
gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE); gst_poll_fd_ctl_read (conn->fdset, conn->readfd, TRUE);
while (TRUE) { while (TRUE) {
res = read_bytes (conn->readfd->fd, data, &offset, size, conn->ctxp); res = read_bytes (conn, data, &offset, size);
if (G_UNLIKELY (res == GST_RTSP_EEOF)) if (G_UNLIKELY (res == GST_RTSP_EEOF))
goto eof; goto eof;
if (G_LIKELY (res == GST_RTSP_OK)) if (G_LIKELY (res == GST_RTSP_OK))
@ -2012,6 +2047,10 @@ gst_rtsp_connection_close (GstRTSPConnection * conn)
g_free (conn->ip); g_free (conn->ip);
conn->ip = NULL; conn->ip = NULL;
g_free (conn->initial_buffer);
conn->initial_buffer = NULL;
conn->initial_buffer_offset = 0;
REMOVE_POLLFD (conn->fdset, &conn->fd0); REMOVE_POLLFD (conn->fdset, &conn->fd0);
REMOVE_POLLFD (conn->fdset, &conn->fd1); REMOVE_POLLFD (conn->fdset, &conn->fd1);
conn->writefd = NULL; conn->writefd = NULL;
@ -2701,6 +2740,9 @@ gst_rtsp_source_prepare (GSource * source, gint * timeout)
{ {
GstRTSPWatch *watch = (GstRTSPWatch *) source; GstRTSPWatch *watch = (GstRTSPWatch *) source;
if (watch->conn->initial_buffer != NULL)
return TRUE;
*timeout = (watch->conn->timeout * 1000); *timeout = (watch->conn->timeout * 1000);
return FALSE; return FALSE;
@ -2728,7 +2770,7 @@ gst_rtsp_source_dispatch (GSource * source, GSourceFunc callback G_GNUC_UNUSED,
GstRTSPResult res; GstRTSPResult res;
/* first read as much as we can */ /* first read as much as we can */
if (watch->readfd.revents & READ_COND) { if (watch->readfd.revents & READ_COND || watch->conn->initial_buffer != NULL) {
do { do {
res = build_next (&watch->builder, &watch->message, watch->conn); res = build_next (&watch->builder, &watch->message, watch->conn);
if (res == GST_RTSP_EINTR) if (res == GST_RTSP_EINTR)