rtmp2: Count outgoing bytes and acked bytes

For statistics.
This commit is contained in:
Jan Alexander Steffens (heftig) 2020-02-14 12:34:44 +01:00
parent 0c344a7efb
commit 5d720eb59e
No known key found for this signature in database
GPG Key ID: DE5E0C5F25941CA5

View File

@ -80,7 +80,9 @@ struct _GstRtmpConnection
guint32 out_window_ack_size, out_window_ack_size_pending; guint32 out_window_ack_size, out_window_ack_size_pending;
guint64 in_bytes_total; guint64 in_bytes_total;
guint64 out_bytes_total;
guint64 in_bytes_acked; guint64 in_bytes_acked;
guint64 out_bytes_acked;
}; };
@ -113,6 +115,8 @@ static void gst_rtmp_connection_handle_message (GstRtmpConnection * sc,
GstBuffer * buffer); GstBuffer * buffer);
static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self, static void gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
guint32 in_chunk_size); guint32 in_chunk_size);
static void gst_rtmp_connection_handle_ack (GstRtmpConnection * self,
guint32 bytes);
static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * static void gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection *
self, guint32 in_chunk_size); self, guint32 in_chunk_size);
@ -548,19 +552,25 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
{ {
GOutputStream *os = G_OUTPUT_STREAM (obj); GOutputStream *os = G_OUTPUT_STREAM (obj);
GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data); GstRtmpConnection *self = GST_RTMP_CONNECTION (user_data);
gsize bytes_written = 0;
GError *error = NULL; GError *error = NULL;
gboolean res; gboolean res;
self->writing = FALSE; self->writing = FALSE;
res = gst_rtmp_output_stream_write_all_buffer_finish (os, result, NULL, res = gst_rtmp_output_stream_write_all_buffer_finish (os, result,
&error); &bytes_written, &error);
self->out_bytes_total += bytes_written;
if (!res) { if (!res) {
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
GST_INFO_OBJECT (self, "write cancelled"); GST_INFO_OBJECT (self,
"write cancelled (wrote %" G_GSIZE_FORMAT " bytes)", bytes_written);
} else { } else {
GST_ERROR_OBJECT (self, "write error: %s", error->message); GST_ERROR_OBJECT (self,
"write error: %s (wrote %" G_GSIZE_FORMAT " bytes)",
error->message, bytes_written);
} }
gst_rtmp_connection_emit_error (self); gst_rtmp_connection_emit_error (self);
g_error_free (error); g_error_free (error);
@ -568,7 +578,8 @@ gst_rtmp_connection_write_buffer_done (GObject * obj,
return; return;
} }
GST_LOG_OBJECT (self, "write completed"); GST_LOG_OBJECT (self, "write completed; wrote %" G_GSIZE_FORMAT " bytes",
bytes_written);
gst_rtmp_connection_apply_protocol_control (self); gst_rtmp_connection_apply_protocol_control (self);
gst_rtmp_connection_start_write (self); gst_rtmp_connection_start_write (self);
@ -717,9 +728,9 @@ gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
break; break;
case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT: case GST_RTMP_MESSAGE_TYPE_ACKNOWLEDGEMENT:
/* We don't really send ack requests that we care about, so ignore */
GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT, GST_DEBUG_OBJECT (connection, "acknowledgement %" G_GUINT32_FORMAT,
pc.param); pc.param);
gst_rtmp_connection_handle_ack (connection, pc.param);
break; break;
case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE: case GST_RTMP_MESSAGE_TYPE_WINDOW_ACK_SIZE:
@ -826,6 +837,31 @@ gst_rtmp_connection_handle_set_chunk_size (GstRtmpConnection * self,
self->in_chunk_size = chunk_size; self->in_chunk_size = chunk_size;
} }
static void
gst_rtmp_connection_handle_ack (GstRtmpConnection * self, guint32 bytes)
{
guint64 last_ack, new_ack;
guint32 last_ack_low, last_ack_high;
last_ack = self->out_bytes_acked;
last_ack_low = last_ack & G_MAXUINT32;
last_ack_high = (last_ack >> 32) & G_MAXUINT32;
if (bytes < last_ack_low) {
GST_WARNING_OBJECT (self,
"Acknowledgement bytes regression, assuming rollover: %"
G_GUINT32_FORMAT " < %" G_GUINT32_FORMAT, bytes, last_ack_low);
last_ack_high += 1;
}
new_ack = (((guint64) last_ack_high) << 32) | bytes;
GST_LOG_OBJECT (self, "Peer acknowledged %" G_GUINT64_FORMAT " bytes",
new_ack - last_ack);
self->out_bytes_acked = new_ack;
}
static void static void
gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self, gst_rtmp_connection_handle_window_ack_size (GstRtmpConnection * self,
guint32 window_ack_size) guint32 window_ack_size)