[Pkg-telepathy-commits] [libnice] 96/265: agent: Queue incoming pseudo-TCP messages until ACKs can be sent
Simon McVittie
smcv at debian.org
Wed May 14 12:04:56 UTC 2014
This is an automated email from the git hooks/post-receive script.
smcv pushed a commit to branch debian
in repository libnice.
commit 18e2e3a2cd3b71b5bafac4eb5a4e4d3ccad883e3
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date: Tue Jan 21 09:55:57 2014 +0000
agent: Queue incoming pseudo-TCP messages until ACKs can be sent
If pseudo-TCP messages are received before a socket has been selected
from all the STUN candidates, they would previously be immediately
passed to the pseudo-TCP state machine, which would attempt to send ACKs
for them. This would fail (due to a lack of an outbound UDP socket), and
would incur a retransmit timeout in the TCP state machine. This slowed
down the tests enormously if one agent in a test completed candidate
selection before the other (which is an entirely reasonable scenario).
This never occurred before because the existing tests artificially run
both agents in lock-step, and never send data packets from one to the
other until both have completed candidate selection. This is basically
cheating.
Fix the problem by queuing incoming pseudo-TCP messages until an
outbound UDP socket is available to send the ACKs or SYNACKs on.
---
agent/agent.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
agent/component.c | 8 ++++++
agent/component.h | 5 ++++
3 files changed, 90 insertions(+), 1 deletion(-)
diff --git a/agent/agent.c b/agent/agent.c
index f31816e..4d09223 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -1140,7 +1140,10 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
NiceSocket *sock;
NiceAddress *addr;
+ sock = component->selected_pair.local->sockptr;
+
#ifndef NDEBUG
+{
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
@@ -1149,13 +1152,16 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
component->agent, component->stream->id, component->id, len,
sock->fileno, g_socket_get_fd (sock->fileno), tmpbuf,
nice_address_get_port (&component->selected_pair.remote->addr));
+}
#endif
- sock = component->selected_pair.local->sockptr;
addr = &component->selected_pair.remote->addr;
if (nice_socket_send (sock, addr, len, buffer)) {
return WR_SUCCESS;
}
+ } else {
+ nice_debug ("%s: WARNING: Failed to send pseudo-TCP packet from agent %p "
+ "as no pair has been selected yet.", G_STRFUNC, component->agent);
}
return WR_FAIL;
@@ -1284,6 +1290,52 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, Stream *st
}
}
+/* If the Component now has a selected_pair, and has pending TCP packets which
+ * it couldn’t receive before due to not being able to send out ACKs (or
+ * SYNACKs, for the initial SYN packet), handle them now.
+ *
+ * Must be called with the agent lock held. */
+static void
+process_queued_tcp_packets (NiceAgent *agent, Stream *stream,
+ Component *component)
+{
+ GOutputVector *vec;
+
+ if (component->selected_pair.local == NULL || component->tcp == NULL)
+ return;
+
+ nice_debug ("%s: Sending outstanding packets for agent %p.", G_STRFUNC,
+ agent);
+
+ while ((vec = g_queue_peek_head (&component->queued_tcp_packets)) != NULL) {
+ gboolean retval;
+
+ g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+
+ nice_debug ("%s: Sending %" G_GSIZE_FORMAT " bytes.", G_STRFUNC, vec->size);
+ retval =
+ pseudo_tcp_socket_notify_packet (component->tcp, vec->buffer,
+ vec->size);
+
+ if (agent != NULL) {
+ adjust_tcp_clock (agent, stream, component);
+ g_object_remove_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+ } else {
+ nice_debug ("%s: Agent %p was destroyed in "
+ "pseudo_tcp_socket_notify_packet().", G_STRFUNC, agent);
+ }
+
+ if (!retval) {
+ /* Failed to send; try again later. */
+ break;
+ }
+
+ g_queue_pop_head (&component->queued_tcp_packets);
+ g_free ((gpointer) vec->buffer);
+ g_slice_free (GOutputVector, vec);
+ }
+}
+
void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint component_id, const gchar *local_foundation, const gchar *remote_foundation)
{
Component *component;
@@ -1301,6 +1353,8 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co
}
if (component->tcp) {
+ process_queued_tcp_packets (agent, stream, component);
+
pseudo_tcp_socket_connect (component->tcp);
pseudo_tcp_socket_notify_mtu (component->tcp, MAX_TCP_MTU);
adjust_tcp_clock (agent, stream, component);
@@ -1382,6 +1436,8 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui
component->state = state;
+ process_queued_tcp_packets (agent, stream, component);
+
g_signal_emit (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED], 0,
stream_id, component_id, state);
}
@@ -2428,6 +2484,26 @@ agent_recv_locked (
handle_tcp:
/* Unhandled STUN; try handling TCP data, then pass to the client. */
if (len > 0 && component->tcp) {
+ /* If we don’t yet have an underlying selected socket, queue up the incoming
+ * data to handle later. This is because we can’t send ACKs (or, more
+ * importantly for the first few packets, SYNACKs) without an underlying
+ * socket. We’d rather wait a little longer for a pair to be selected, then
+ * process the incoming packets and send out ACKs, than try to process them
+ * now, fail to send the ACKs, and incur a timeout in our pseudo-TCP state
+ * machine. */
+ if (component->selected_pair.local == NULL) {
+ GOutputVector *vec = g_slice_new (GOutputVector);
+ vec->buffer = g_memdup (local_buf, len);
+ vec->size = len;
+ g_queue_push_tail (&component->queued_tcp_packets, vec);
+ nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
+ G_STRFUNC, len, agent);
+
+ return 0;
+ } else {
+ process_queued_tcp_packets (agent, stream, component);
+ }
+
/* Received data on a reliable connection. */
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
diff --git a/agent/component.c b/agent/component.c
index 590ebcf..63a029e 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -131,6 +131,8 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
component_set_io_context (component, NULL);
component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
+ g_queue_init (&component->queued_tcp_packets);
+
return component;
}
@@ -140,6 +142,7 @@ component_free (Component *cmp)
GSList *i;
GList *item;
IOCallbackData *data;
+ GOutputVector *vec;
for (i = cmp->local_candidates; i; i = i->next) {
NiceCandidate *candidate = i->data;
@@ -201,6 +204,11 @@ component_free (Component *cmp)
cmp->ctx = NULL;
}
+ while ((vec = g_queue_pop_head (&cmp->queued_tcp_packets)) != NULL) {
+ g_free ((gpointer) vec->buffer);
+ g_slice_free (GOutputVector, vec);
+ }
+
g_mutex_clear (&cmp->io_mutex);
g_slice_free (Component, cmp);
diff --git a/agent/component.h b/agent/component.h
index 64d1d34..e66aa0a 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -188,6 +188,11 @@ struct _Component
guint min_port;
guint max_port;
+
+ /* Queue of messages received before a selected socket was available to send
+ * ACKs on. The messages are dequeued to the pseudo-TCP socket once a selected
+ * UDP socket is available. This is only used for reliable Components. */
+ GQueue queued_tcp_packets;
};
Component *
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git
More information about the Pkg-telepathy-commits
mailing list