[Pkg-telepathy-commits] [libnice] 96/265: agent: Queue incoming pseudo-TCP messages until ACKs can be sent

Simon McVittie smcv at debian.org
Wed May 14 12:04:56 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit 18e2e3a2cd3b71b5bafac4eb5a4e4d3ccad883e3
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date:   Tue Jan 21 09:55:57 2014 +0000

    agent: Queue incoming pseudo-TCP messages until ACKs can be sent
    
    Previously, if pseudo-TCP messages were received before a socket had
    been selected from all the STUN candidates, they were passed immediately
    to the pseudo-TCP state machine, which would attempt to send ACKs
    for them. This would fail (due to a lack of an outbound UDP socket), and
    would incur a retransmit timeout in the TCP state machine. This slowed
    down the tests enormously if one agent in a test completed candidate
    selection before the other (which is an entirely reasonable scenario).
    
    The problem was never noticed before because the existing tests
    artificially run both agents in lock-step, and never send data packets
    from one to the other until both have completed candidate selection.
    This is basically cheating.
    
    Fix the problem by queuing incoming pseudo-TCP messages until an
    outbound UDP socket is available to send the ACKs or SYNACKs on.
---
 agent/agent.c     | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 agent/component.c |  8 ++++++
 agent/component.h |  5 ++++
 3 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/agent/agent.c b/agent/agent.c
index f31816e..4d09223 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -1140,7 +1140,10 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
     NiceSocket *sock;
     NiceAddress *addr;
 
+    sock = component->selected_pair.local->sockptr;
+
 #ifndef NDEBUG
+{
     gchar tmpbuf[INET6_ADDRSTRLEN];
     nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
 
@@ -1149,13 +1152,16 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
         component->agent, component->stream->id, component->id, len,
         sock->fileno, g_socket_get_fd (sock->fileno), tmpbuf,
         nice_address_get_port (&component->selected_pair.remote->addr));
+}
 #endif
 
-    sock = component->selected_pair.local->sockptr;
     addr = &component->selected_pair.remote->addr;
     if (nice_socket_send (sock, addr, len, buffer)) {
       return WR_SUCCESS;
     }
+  } else {
+    nice_debug ("%s: WARNING: Failed to send pseudo-TCP packet from agent %p "
+        "as no pair has been selected yet.", G_STRFUNC, component->agent);
   }
 
   return WR_FAIL;
@@ -1284,6 +1290,52 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, Stream *st
   }
 }
 
+/* If the Component now has a selected_pair, and has pending TCP packets which
+ * it couldn’t receive before due to not being able to send out ACKs (or
+ * SYNACKs, for the initial SYN packet), handle them now.
+ *
+ * Must be called with the agent lock held. */
+static void
+process_queued_tcp_packets (NiceAgent *agent, Stream *stream,
+    Component *component)
+{
+  GOutputVector *vec;
+
+  if (component->selected_pair.local == NULL || component->tcp == NULL)
+    return;
+
+  nice_debug ("%s: Sending outstanding packets for agent %p.", G_STRFUNC,
+      agent);
+
+  while ((vec = g_queue_peek_head (&component->queued_tcp_packets)) != NULL) {
+    gboolean retval;
+
+    g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+
+    nice_debug ("%s: Sending %" G_GSIZE_FORMAT " bytes.", G_STRFUNC, vec->size);
+    retval =
+        pseudo_tcp_socket_notify_packet (component->tcp, vec->buffer,
+            vec->size);
+
+    if (agent != NULL) {
+      adjust_tcp_clock (agent, stream, component);
+      g_object_remove_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
+    } else {
+      nice_debug ("%s: Agent %p was destroyed in "
+          "pseudo_tcp_socket_notify_packet().", G_STRFUNC, agent);
+    }
+
+    if (!retval) {
+      /* Failed to send; try again later. */
+      break;
+    }
+
+    g_queue_pop_head (&component->queued_tcp_packets);
+    g_free ((gpointer) vec->buffer);
+    g_slice_free (GOutputVector, vec);
+  }
+}
+
 void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint component_id, const gchar *local_foundation, const gchar *remote_foundation)
 {
   Component *component;
@@ -1301,6 +1353,8 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co
   }
 
   if (component->tcp) {
+    process_queued_tcp_packets (agent, stream, component);
+
     pseudo_tcp_socket_connect (component->tcp);
     pseudo_tcp_socket_notify_mtu (component->tcp, MAX_TCP_MTU);
     adjust_tcp_clock (agent, stream, component);
@@ -1382,6 +1436,8 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui
 
     component->state = state;
 
+    process_queued_tcp_packets (agent, stream, component);
+
     g_signal_emit (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED], 0,
 		   stream_id, component_id, state);
   }
@@ -2428,6 +2484,26 @@ agent_recv_locked (
 handle_tcp:
   /* Unhandled STUN; try handling TCP data, then pass to the client. */
   if (len > 0 && component->tcp) {
+    /* If we don’t yet have an underlying selected socket, queue up the incoming
+     * data to handle later. This is because we can’t send ACKs (or, more
+     * importantly for the first few packets, SYNACKs) without an underlying
+     * socket. We’d rather wait a little longer for a pair to be selected, then
+     * process the incoming packets and send out ACKs, than try to process them
+     * now, fail to send the ACKs, and incur a timeout in our pseudo-TCP state
+     * machine. */
+    if (component->selected_pair.local == NULL) {
+      GOutputVector *vec = g_slice_new (GOutputVector);
+      vec->buffer = g_memdup (local_buf, len);
+      vec->size = len;
+      g_queue_push_tail (&component->queued_tcp_packets, vec);
+      nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
+          G_STRFUNC, len, agent);
+
+      return 0;
+    } else {
+      process_queued_tcp_packets (agent, stream, component);
+    }
+
     /* Received data on a reliable connection. */
     g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
 
diff --git a/agent/component.c b/agent/component.c
index 590ebcf..63a029e 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -131,6 +131,8 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
   component_set_io_context (component, NULL);
   component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
 
+  g_queue_init (&component->queued_tcp_packets);
+
   return component;
 }
 
@@ -140,6 +142,7 @@ component_free (Component *cmp)
   GSList *i;
   GList *item;
   IOCallbackData *data;
+  GOutputVector *vec;
 
   for (i = cmp->local_candidates; i; i = i->next) {
     NiceCandidate *candidate = i->data;
@@ -201,6 +204,11 @@ component_free (Component *cmp)
     cmp->ctx = NULL;
   }
 
+  while ((vec = g_queue_pop_head (&cmp->queued_tcp_packets)) != NULL) {
+    g_free ((gpointer) vec->buffer);
+    g_slice_free (GOutputVector, vec);
+  }
+
   g_mutex_clear (&cmp->io_mutex);
 
   g_slice_free (Component, cmp);
diff --git a/agent/component.h b/agent/component.h
index 64d1d34..e66aa0a 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -188,6 +188,11 @@ struct _Component
 
   guint min_port;
   guint max_port;
+
+  /* Queue of messages received before a selected socket was available to send
+   * ACKs on. The messages are dequeued to the pseudo-TCP socket once a selected
+   * UDP socket is available. This is only used for reliable Components. */
+  GQueue queued_tcp_packets;
 };
 
 Component *

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list