[Pkg-telepathy-commits] [libnice] 55/265: pseudotcp: Implement window scaling for PseudoTCP.

Simon McVittie smcv at debian.org
Wed May 14 12:04:52 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit fe5dc58b239ac5ab252cd95b9a0650a208e5df9c
Author: Olivier Crête <olivier.crete at collabora.com>
Date:   Fri Jan 3 18:08:39 2014 -0500

    pseudotcp: Implement window scaling for PseudoTCP.
    
    Syncs with libjingle SVN rev 77
---
 agent/pseudotcp.c | 575 +++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 458 insertions(+), 117 deletions(-)

diff --git a/agent/pseudotcp.c b/agent/pseudotcp.c
index 54a714c..2e8ecc4 100644
--- a/agent/pseudotcp.c
+++ b/agent/pseudotcp.c
@@ -156,6 +156,15 @@ const guint16 PACKET_MAXIMUMS[] = {
 #define DEFAULT_ACK_DELAY    100 /* 100 milliseconds */
 #define DEFAULT_NO_DELAY     FALSE
 
+#define DEFAULT_RCV_BUF_SIZE (60 * 1024)
+#define DEFAULT_SND_BUF_SIZE (90 * 1024)
+
+#define TCP_OPT_EOL       0  // End of list.
+#define TCP_OPT_NOOP      1  // No-op.
+#define TCP_OPT_MSS       2  // Maximum segment size.
+#define TCP_OPT_WND_SCALE 3  // Window scale factor.
+
+
 /*
 #define FLAG_FIN 0x01
 #define FLAG_SYN 0x02
@@ -229,6 +238,150 @@ time_diff(guint32 later, guint32 earlier)
   }
 }
 
+////////////////////////////////////////////////////////
+// PseudoTcpFifo works exactly like FifoBuffer in libjingle
+////////////////////////////////////////////////////////
+
+
+typedef struct {  /* Byte FIFO kept in a circular buffer. */
+  guint8 *buffer;  /* backing storage of buffer_length bytes */
+  gsize buffer_length;  /* capacity of buffer, in bytes */
+  gsize data_length;  /* number of bytes currently stored */
+  gsize read_position;  /* index in buffer of the oldest stored byte */
+} PseudoTcpFifo;
+
+
+static void
+pseudo_tcp_fifo_init (PseudoTcpFifo *b, gsize size)
+{  /* Allocates a size-byte backing buffer for b and records its capacity. */
+  b->buffer = g_slice_alloc (size);
+  b->buffer_length = size;  /* data_length and read_position are not set here */
+}  /* NOTE(review): presumably *b arrives zero-filled from the caller -- confirm */
+
+static void
+pseudo_tcp_fifo_clear (PseudoTcpFifo *b)
+{  /* Releases b's backing buffer, if any, and resets its capacity to 0. */
+  if (b->buffer)
+    g_slice_free1 (b->buffer_length, b->buffer);
+  b->buffer = NULL;
+  b->buffer_length = 0;  /* data_length and read_position are left as they were */
+}
+
+static gsize
+pseudo_tcp_fifo_get_buffered (PseudoTcpFifo *b)
+{  /* Returns the number of bytes currently stored in b. */
+  return b->data_length;
+}
+
+static gboolean
+pseudo_tcp_fifo_set_capacity (PseudoTcpFifo *b, gsize size)
+{  /* Resizes b to hold size bytes; FALSE if the stored data would not fit. */
+  if (b->data_length > size)
+    return FALSE;
+
+  if (size != b->buffer_length) {  /* reallocate only when the capacity changes */
+    guint8 *buffer = g_slice_alloc (size);
+    gsize copy = b->data_length;
+    gsize tail_copy = min (copy, b->buffer_length - b->read_position);
+
+    memcpy (buffer, &b->buffer[b->read_position], tail_copy);  /* up to old end */
+    memcpy (buffer + tail_copy, &b->buffer[0], copy - tail_copy);  /* wrapped part */
+    g_slice_free1 (b->buffer_length, b->buffer);
+    b->buffer = buffer;
+    b->buffer_length = size;
+    b->read_position = 0;  /* stored data now starts at the front of the new buffer */
+  }
+
+  return TRUE;
+}
+
+static void
+pseudo_tcp_fifo_consume_read_data (PseudoTcpFifo *b, gsize size)
+{  /* Discards the oldest size bytes stored in b without copying them out. */
+  g_assert (size <= b->data_length);
+
+  b->read_position = (b->read_position + size) % b->buffer_length;  /* wrap around */
+  b->data_length -= size;
+}
+
+static void
+pseudo_tcp_fifo_consume_write_buffer (PseudoTcpFifo *b, gsize size)
+{  /* Counts size bytes of b's free region as stored data; copies nothing. */
+  g_assert (size <= b->buffer_length - b->data_length);  /* must fit in free space */
+
+  b->data_length += size;
+}
+
+static gsize
+pseudo_tcp_fifo_get_write_remaining (PseudoTcpFifo *b)
+{  /* Returns how many more bytes b can store: capacity minus stored bytes. */
+  return b->buffer_length - b->data_length;
+}
+
+static gsize
+pseudo_tcp_fifo_read_offset (PseudoTcpFifo *b, guint8 *buffer, gsize bytes,
+    gsize offset)
+{  /* Copies up to bytes stored bytes, skipping the first offset, into buffer. */
+  gsize available = b->data_length - offset;  /* wraps when offset > data_length */
+  gsize read_position = (b->read_position + offset) % b->buffer_length;
+  gsize copy = min (bytes, available);
+  gsize tail_copy = min(copy, b->buffer_length - read_position);
+
+  /* Nothing is stored at or beyond offset; the values above are then unused. */
+  if (offset >= b->data_length)
+    return 0;
+
+  memcpy(buffer, &b->buffer[read_position], tail_copy);  /* up to the buffer end */
+  memcpy(buffer + tail_copy, &b->buffer[0], copy - tail_copy);  /* wrapped part */
+
+  return copy;  /* bytes copied; b itself is not modified */
+}
+
+static gsize
+pseudo_tcp_fifo_write_offset (PseudoTcpFifo *b, const guint8 *buffer,
+    gsize bytes, gsize offset)
+{  /* Copies buffer into b's free space, offset bytes past the stored data. */
+  gsize available = b->buffer_length - b->data_length - offset;
+  gsize write_position = (b->read_position + b->data_length + offset)
+      % b->buffer_length;
+  gsize copy = min (bytes, available);
+  gsize tail_copy = min(copy, b->buffer_length - write_position);
+
+  if (b->data_length + offset >= b->buffer_length) {  /* no free space at offset */
+    return 0;
+  }
+
+  memcpy(&b->buffer[write_position], buffer, tail_copy);  /* up to the buffer end */
+  memcpy(&b->buffer[0], buffer + tail_copy, copy - tail_copy);  /* wrapped part */
+
+  return copy;  /* bytes copied; data_length is left unchanged */
+}
+
+static gsize
+pseudo_tcp_fifo_read (PseudoTcpFifo *b, guint8 *buffer, gsize bytes)
+{  /* Copies up to bytes of the oldest stored data into buffer and removes it. */
+  gsize copy;
+
+  copy = pseudo_tcp_fifo_read_offset (b, buffer, bytes, 0);
+
+  b->read_position = (b->read_position + copy) % b->buffer_length;  /* wrap around */
+  b->data_length -= copy;
+
+  return copy;  /* number of bytes actually copied */
+}
+
+static gsize
+pseudo_tcp_fifo_write (PseudoTcpFifo *b, const guint8 *buffer, gsize bytes)
+{  /* Appends up to bytes from buffer after the data already stored in b. */
+  gsize copy;
+
+  copy = pseudo_tcp_fifo_write_offset (b, buffer, bytes, 0);
+  b->data_length += copy;  /* the copied bytes now count as stored */
+
+  return copy;  /* number of bytes actually stored */
+}
+
+
 //////////////////////////////////////////////////////////////////////
 // PseudoTcp
 //////////////////////////////////////////////////////////////////////
@@ -245,14 +398,6 @@ typedef enum {
   sfImmediateAck
 } SendFlags;
 
-enum {
-  // Note: can't go as high as 1024 * 64, because of uint16 precision
-  kRcvBufSize = 1024 * 60,
-  // Note: send buffer should be larger to make sure we can always fill the
-  // receiver window
-  kSndBufSize = 1024 * 90
-};
-
 typedef struct {
   guint32 conv, seq, ack;
   guint8 flags;
@@ -287,13 +432,16 @@ struct _PseudoTcpSocketPrivate {
 
   // Incoming data
   GList *rlist;
-  gchar rbuf[kRcvBufSize];
-  guint32 rcv_nxt, rcv_wnd, rpos, rlen, lastrecv;
+  guint32 rbuf_len, rcv_nxt, rcv_wnd, lastrecv;
+  guint8 rwnd_scale; // Window scale factor
+  PseudoTcpFifo rbuf;
 
   // Outgoing data
   GList *slist;
-  gchar sbuf[kSndBufSize];
-  guint32 snd_nxt, snd_wnd, slen, lastsend, snd_una;
+  guint32 sbuf_len, snd_nxt, snd_wnd, lastsend, snd_una;
+  guint8 swnd_scale; // Window scale factor
+  PseudoTcpFifo sbuf;
+
   // Maximum segment size, estimated protocol level, largest segment sent
   guint32 mss, msslevel, largest, mtu_advise;
   // Retransmit timer
@@ -313,6 +461,10 @@ struct _PseudoTcpSocketPrivate {
 
   gboolean use_nagling;
   guint32 ack_delay;
+
+  // This is used by unit tests to test backward compatibility of
+  // PseudoTcp implementations that don't support window scaling.
+  gboolean support_wnd_scale;
 };
 
 
@@ -324,6 +476,8 @@ enum
   PROP_STATE,
   PROP_ACK_DELAY,
   PROP_NO_DELAY,
+  PROP_RCV_BUF,
+  PROP_SND_BUF,
   LAST_PROPERTY
 };
 
@@ -335,10 +489,11 @@ static void pseudo_tcp_socket_set_property (GObject *object, guint property_id,
 static void pseudo_tcp_socket_finalize (GObject *object);
 
 
+static void queue_connect_message (PseudoTcpSocket *self);
 static guint32 queue(PseudoTcpSocket *self, const gchar * data,
     guint32 len, gboolean bCtrl);
 static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
-    guint8 flags, const gchar * data, guint32 len);
+    guint8 flags, guint32 offset, guint32 len);
 static gboolean parse(PseudoTcpSocket *self,
     const guint8 * buffer, guint32 size);
 static gboolean process(PseudoTcpSocket *self, Segment *seg);
@@ -346,6 +501,10 @@ static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now);
 static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
 static void closedown(PseudoTcpSocket *self, guint32 err);
 static void adjustMTU(PseudoTcpSocket *self);
+static void parse_options (PseudoTcpSocket *self, const guint8 *data,
+    guint32 len);
+static void resize_send_buffer (PseudoTcpSocket *self, guint32 new_size);
+static void resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size);
 
 
 // The following logging is for detailed (packet-level) pseudotcp analysis only.
@@ -409,6 +568,18 @@ pseudo_tcp_socket_class_init (PseudoTcpSocketClass *cls)
           "Disable the Nagle algorithm (like the TCP_NODELAY option)",
           DEFAULT_NO_DELAY,
           G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (object_class, PROP_RCV_BUF,
+      g_param_spec_uint ("rcv-buf", "Receive Buffer",
+          "Receive Buffer size",
+          1, G_MAXUINT, DEFAULT_RCV_BUF_SIZE,
+          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (object_class, PROP_SND_BUF,
+      g_param_spec_uint ("snd-buf", "Send Buffer",
+          "Send Buffer size",
+          1, G_MAXUINT, DEFAULT_SND_BUF_SIZE,
+          G_PARAM_READWRITE| G_PARAM_STATIC_STRINGS));
 }
 
 
@@ -436,6 +607,12 @@ pseudo_tcp_socket_get_property (GObject *object,
     case PROP_NO_DELAY:
       g_value_set_boolean (value, !self->priv->use_nagling);
       break;
+    case PROP_RCV_BUF:
+      g_value_set_uint (value, self->priv->rbuf_len);
+      break;
+    case PROP_SND_BUF:
+      g_value_set_uint (value, self->priv->sbuf_len);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
       break;
@@ -466,6 +643,14 @@ pseudo_tcp_socket_set_property (GObject *object,
     case PROP_NO_DELAY:
       self->priv->use_nagling = !g_value_get_boolean (value);
       break;
+    case PROP_RCV_BUF:
+      g_return_if_fail (self->priv->state == TCP_LISTEN);
+      resize_receive_buffer (self, g_value_get_uint (value));
+      break;
+    case PROP_SND_BUF:
+      g_return_if_fail (self->priv->state == TCP_LISTEN);
+      resize_send_buffer (self, g_value_get_uint (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
       break;
@@ -495,6 +680,9 @@ pseudo_tcp_socket_finalize (GObject *object)
   g_list_free (priv->rlist);
   priv->rlist = NULL;
 
+  pseudo_tcp_fifo_clear (&priv->rbuf);
+  pseudo_tcp_fifo_clear (&priv->sbuf);
+
   g_free (priv);
   self->priv = NULL;
 
@@ -517,12 +705,18 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj)
   priv->shutdown = SD_NONE;
   priv->error = 0;
 
+  priv->rbuf_len = DEFAULT_RCV_BUF_SIZE;
+  pseudo_tcp_fifo_init (&priv->rbuf, priv->rbuf_len);
+  priv->sbuf_len = DEFAULT_SND_BUF_SIZE;
+  pseudo_tcp_fifo_init (&priv->sbuf, priv->sbuf_len);
+
   priv->state = TCP_LISTEN;
   priv->conv = 0;
-  priv->rcv_wnd = sizeof(priv->rbuf);
-  priv->snd_nxt = priv->slen = 0;
+  priv->rcv_wnd = priv->rbuf_len;
+  priv->rwnd_scale = priv->swnd_scale = 0;
+  priv->snd_nxt = 0;
   priv->snd_wnd = 1;
-  priv->snd_una = priv->rcv_nxt = priv->rlen = priv->rpos = 0;
+  priv->snd_una = priv->rcv_nxt = 0;
   priv->bReadEnable = TRUE;
   priv->bWriteEnable = FALSE;
   priv->t_ack = 0;
@@ -535,7 +729,7 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj)
   priv->rto_base = 0;
 
   priv->cwnd = 2 * priv->mss;
-  priv->ssthresh = sizeof(priv->rbuf);
+  priv->ssthresh = priv->rbuf_len;
   priv->lastrecv = priv->lastsend = priv->last_traffic = now;
   priv->bOutgoing = FALSE;
 
@@ -549,6 +743,8 @@ pseudo_tcp_socket_init (PseudoTcpSocket *obj)
 
   priv->ack_delay = DEFAULT_ACK_DELAY;
   priv->use_nagling = !DEFAULT_NO_DELAY;
+
+  priv->support_wnd_scale = TRUE;
 }
 
 PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
@@ -561,11 +757,30 @@ PseudoTcpSocket *pseudo_tcp_socket_new (guint32 conversation,
       NULL);
 }
 
+static void
+queue_connect_message (PseudoTcpSocket *self)
+{  /* Queues a CTL_CONNECT control message, plus a window scale option if enabled. */
+  PseudoTcpSocketPrivate *priv = self->priv;
+  guint8 buf[4];
+  gsize size = 1;
+
+  buf[0] = CTL_CONNECT;
+  if (priv->support_wnd_scale) {
+    buf[1] = TCP_OPT_WND_SCALE;  /* option kind */
+    buf[2] = 1;  /* option length: one byte of content follows */
+    buf[3] = priv->rwnd_scale;  /* content: our receive window scale factor */
+    size = 4;
+  }
+
+  priv->snd_wnd = size;  /* send window is set to exactly this message's size */
+
+  queue(self, (char*) buf, size, TRUE);  /* TRUE is the bCtrl argument */
+}
+
 gboolean
 pseudo_tcp_socket_connect(PseudoTcpSocket *self)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
-  gchar buffer[1];
 
   if (priv->state != TCP_LISTEN) {
     priv->error = EINVAL;
@@ -575,8 +790,7 @@ pseudo_tcp_socket_connect(PseudoTcpSocket *self)
   priv->state = TCP_SYN_SENT;
   DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_SYN_SENT");
 
-  buffer[0] = CTL_CONNECT;
-  queue(self, buffer, 1, TRUE);
+  queue_connect_message (self);
   attempt_send(self, sfNone);
 
   return TRUE;
@@ -672,13 +886,15 @@ pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
   guint32 now = get_current_time ();
+  gsize snd_buffered;
 
   if (priv->shutdown == SD_FORCEFUL)
     return FALSE;
 
+  snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
   if ((priv->shutdown == SD_GRACEFUL)
       && ((priv->state != TCP_ESTABLISHED)
-          || ((priv->slen == 0) && (priv->t_ack == 0)))) {
+          || ((snd_buffered == 0) && (priv->t_ack == 0)))) {
     return FALSE;
   }
 
@@ -702,62 +918,39 @@ pseudo_tcp_socket_get_next_clock(PseudoTcpSocket *self, long *timeout)
   return TRUE;
 }
 
-static guint32
-get_receive_buffer_space (PseudoTcpSocket *self)
-{
-  PseudoTcpSocketPrivate *priv = self->priv;
-
-  return sizeof(priv->rbuf) - priv->rlen + priv->rpos;
-}
-
-static guint32
-get_receive_buffer_consecutive_space (PseudoTcpSocket *self)
-{
-  PseudoTcpSocketPrivate *priv = self->priv;
-
-  return sizeof(priv->rbuf) - priv->rlen;
-}
-
-static void
-consolidate_receiver_buffer_space (PseudoTcpSocket *self)
-{
-  PseudoTcpSocketPrivate *priv = self->priv;
-
-  memmove(priv->rbuf, priv->rbuf + priv->rpos, sizeof(priv->rbuf) - priv->rpos);
-  priv->rlen -= priv->rpos;
-  priv->rpos = 0;
-}
 
 gint
 pseudo_tcp_socket_recv(PseudoTcpSocket *self, char * buffer, size_t len)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
-  guint32 read;
+  gsize read;
+  gsize available_space;
 
   if (priv->state != TCP_ESTABLISHED) {
     priv->error = ENOTCONN;
     return -1;
   }
 
-  // Make sure read position is correct.
-  g_assert (priv->rpos <= priv->rlen);
-  if (priv->rlen == priv->rpos) {
+  if (len == 0)
+    return 0;
+
+  read = pseudo_tcp_fifo_read (&priv->rbuf, (guint8 *) buffer, len);
+
+ // If there's no data in |m_rbuf|.
+  if (read == 0) {
     priv->bReadEnable = TRUE;
     priv->error = EWOULDBLOCK;
     return -1;
   }
 
-  read = min((guint32) len, priv->rlen - priv->rpos);
-  memcpy(buffer, priv->rbuf + priv->rpos, read);
-  priv->rpos += read;
-
+  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
 
-  if (get_receive_buffer_space (self) - priv->rcv_wnd >=
-      min(sizeof(priv->rbuf) / 2, priv->mss)) {
+  if (available_space - priv->rcv_wnd >=
+      min (priv->rbuf_len / 2, priv->mss)) {
     // !?! Not sure about this was closed business
     gboolean bWasClosed = (priv->rcv_wnd == 0);
 
-    priv->rcv_wnd = get_receive_buffer_space (self);
+    priv->rcv_wnd = available_space;
 
     if (bWasClosed) {
       attempt_send(self, sfImmediateAck);
@@ -772,13 +965,16 @@ pseudo_tcp_socket_send(PseudoTcpSocket *self, const char * buffer, guint32 len)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
   gint written;
+  gsize available_space;
 
   if (priv->state != TCP_ESTABLISHED) {
     priv->error = ENOTCONN;
     return -1;
   }
 
-  if (priv->slen == sizeof(priv->sbuf)) {
+  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
+
+  if (!available_space) {
     priv->bWriteEnable = TRUE;
     priv->error = EWOULDBLOCK;
     return -1;
@@ -798,7 +994,8 @@ void
 pseudo_tcp_socket_close(PseudoTcpSocket *self, gboolean force)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
-  //nice_agent ("Closing socket %p : %d", sock, force?"true":"false");
+  DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "Closing socket %p : %s", self,
+      force ? "forcefully" : "gracefully");
   priv->shutdown = force ? SD_FORCEFUL : SD_GRACEFUL;
 }
 
@@ -817,10 +1014,12 @@ static guint32
 queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
+  gsize available_space;
 
-  if (len > sizeof(priv->sbuf) - priv->slen) {
+  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
+  if (len > available_space) {
     g_assert(!bCtrl);
-    len = sizeof(priv->sbuf) - priv->slen;
+    len = available_space;
   }
 
   // We can concatenate data if the last segment is the same type
@@ -831,21 +1030,30 @@ queue(PseudoTcpSocket *self, const gchar * data, guint32 len, gboolean bCtrl)
     ((SSegment *)g_list_last (priv->slist)->data)->len += len;
   } else {
     SSegment *sseg = g_slice_new0 (SSegment);
-    sseg->seq = priv->snd_una + priv->slen;
+    gsize snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
+
+    sseg->seq = priv->snd_una + snd_buffered;
     sseg->len = len;
     sseg->bCtrl = bCtrl;
     priv->slist = g_list_append (priv->slist, sseg);
   }
 
-  memcpy(priv->sbuf + priv->slen, data, len);
-  priv->slen += len;
   //LOG(LS_INFO) << "PseudoTcp::queue - priv->slen = " << priv->slen;
-  return len;
+  return pseudo_tcp_fifo_write (&priv->sbuf, (guint8*) data, len);;
 }
 
+// Creates a packet and submits it to the network. This method can either
+// send payload or just an ACK packet.
+//
+// |seq| is the sequence number of this packet.
+// |flags| is the flags for sending this packet.
+// |offset| is the offset to read from the send FIFO (priv->sbuf).
+// |len| is the number of bytes to read from the send FIFO as payload. If this
+// value is 0 then this is an ACK packet, otherwise this packet has payload.
+
 static PseudoTcpWriteResult
 packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
-    const gchar * data, guint32 len)
+    guint32 offset, guint32 len)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
   guint32 now = get_current_time();
@@ -863,15 +1071,20 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
   *(buffer.u32 + 2) = htonl(priv->rcv_nxt);
   buffer.u8[12] = 0;
   buffer.u8[13] = flags;
-  *(buffer.u16 + 7) = htons((guint16)priv->rcv_wnd);
+  *(buffer.u16 + 7) = htons((guint16)(priv->rcv_wnd >> priv->rwnd_scale));
 
   // Timestamp computations
   *(buffer.u32 + 4) = htonl(now);
   *(buffer.u32 + 5) = htonl(priv->ts_recent);
   priv->ts_lastack = priv->rcv_nxt;
 
-  if (data != NULL)
-    memcpy(buffer.u8 + HEADER_SIZE, data, len);
+  if (len) {
+    gsize bytes_read;
+
+    bytes_read = pseudo_tcp_fifo_read_offset (&priv->sbuf, buffer.u8 + HEADER_SIZE,
+        len, offset);
+    g_assert (bytes_read == len);
+  }
 
   DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "<-- <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
       "<WND=%d><TS=%d><TSR=%d><LEN=%d>",
@@ -880,11 +1093,11 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
 
   wres = priv->callbacks.WritePacket(self, (gchar *) buffer.u8, len + HEADER_SIZE,
                                      priv->callbacks.user_data);
-  /* Note: When data is NULL, this is an ACK packet.  We don't read the
+  /* Note: When len is 0, this is an ACK packet.  We don't read the
      return value for those, and thus we won't retry.  So go ahead and treat
      the packet as a success (basically simulate as if it were dropped),
      which will prevent our timers from being messed up. */
-  if ((wres != WR_SUCCESS) && (NULL != data))
+  if ((wres != WR_SUCCESS) && (0 != len))
     return wres;
 
   priv->t_ack = 0;
@@ -943,6 +1156,9 @@ process(PseudoTcpSocket *self, Segment *seg)
   gboolean bIgnoreData;
   gboolean bNewData;
   gboolean bConnect = FALSE;
+  gsize snd_buffered;
+  gsize available_space;
+  guint32 kIdealRefillSize;
 
   /* If this is the wrong conversation, send a reset!?!
      (with the correct conversation?) */
@@ -978,11 +1194,12 @@ process(PseudoTcpSocket *self, Segment *seg)
       return FALSE;
     } else if (seg->data[0] == CTL_CONNECT) {
       bConnect = TRUE;
+
+      parse_options (self, (guint8 *) &seg->data[1], seg->len - 1);
+
       if (priv->state == TCP_LISTEN) {
-        char buffer[1];
         priv->state = TCP_SYN_RECEIVED;
-        buffer[0] = CTL_CONNECT;
-        queue(self, buffer, 1, TRUE);
+        queue_connect_message (self);
       } else if (priv->state == TCP_SYN_SENT) {
         priv->state = TCP_ESTABLISHED;
         DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
@@ -1007,7 +1224,6 @@ process(PseudoTcpSocket *self, Segment *seg)
   if ((seg->ack > priv->snd_una) && (seg->ack <= priv->snd_nxt)) {
     guint32 nAcked;
     guint32 nFree;
-    guint32 kIdealRefillSize;
 
     // Calculate round-trip time
     if (seg->tsecr) {
@@ -1031,16 +1247,14 @@ process(PseudoTcpSocket *self, Segment *seg)
       }
     }
 
-    priv->snd_wnd = seg->wnd;
+    priv->snd_wnd = seg->wnd << priv->swnd_scale;
 
     nAcked = seg->ack - priv->snd_una;
     priv->snd_una = seg->ack;
 
     priv->rto_base = (priv->snd_una == priv->snd_nxt) ? 0 : now;
 
-    priv->slen -= nAcked;
-    memmove(priv->sbuf, priv->sbuf + nAcked, priv->slen);
-    //LOG(LS_INFO) << "PseudoTcp::process - priv->slen = " << priv->slen;
+    pseudo_tcp_fifo_consume_read_data (&priv->sbuf, nAcked);
 
     for (nFree = nAcked; nFree > 0; ) {
       SSegment *data;
@@ -1085,29 +1299,10 @@ process(PseudoTcpSocket *self, Segment *seg)
         priv->cwnd += max(1LU, priv->mss * priv->mss / priv->cwnd);
       }
     }
-
-    // !?! A bit hacky
-    if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
-      priv->state = TCP_ESTABLISHED;
-      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
-      adjustMTU(self);
-      if (priv->callbacks.PseudoTcpOpened)
-        priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
-    }
-
-    // If we make room in the send queue, notify the user
-    // The goal it to make sure we always have at least enough data to fill the
-    // window.  We'd like to notify the app when we are halfway to that point.
-    kIdealRefillSize = (sizeof(priv->sbuf) + sizeof(priv->rbuf)) / 2;
-    if (priv->bWriteEnable && (priv->slen < kIdealRefillSize)) {
-      priv->bWriteEnable = FALSE;
-      if (priv->callbacks.PseudoTcpWritable)
-        priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
-    }
   } else if (seg->ack == priv->snd_una) {
     /* !?! Note, tcp says don't do this... but otherwise how does a
        closed window become open? */
-    priv->snd_wnd = seg->wnd;
+    priv->snd_wnd = seg->wnd << priv->swnd_scale;
 
     // Check duplicate acks
     if (seg->len > 0) {
@@ -1136,6 +1331,27 @@ process(PseudoTcpSocket *self, Segment *seg)
     }
   }
 
+  // !?! A bit hacky
+  if ((priv->state == TCP_SYN_RECEIVED) && !bConnect) {
+    priv->state = TCP_ESTABLISHED;
+    DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_ESTABLISHED");
+    adjustMTU(self);
+    if (priv->callbacks.PseudoTcpOpened)
+      priv->callbacks.PseudoTcpOpened(self, priv->callbacks.user_data);
+  }
+
+  // If we make room in the send queue, notify the user
+  // The goal it to make sure we always have at least enough data to fill the
+  // window.  We'd like to notify the app when we are halfway to that point.
+  kIdealRefillSize = (priv->sbuf_len + priv->rbuf_len) / 2;
+
+  snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
+  if (priv->bWriteEnable && snd_buffered < kIdealRefillSize) {
+    priv->bWriteEnable = FALSE;
+    if (priv->callbacks.PseudoTcpWritable)
+      priv->callbacks.PseudoTcpWritable(self, priv->callbacks.user_data);
+  }
+
   /* Conditions where acks must be sent:
    * 1) Segment is too old (they missed an ACK) (immediately)
    * 2) Segment is too new (we missed a segment) (immediately)
@@ -1172,9 +1388,11 @@ process(PseudoTcpSocket *self, Segment *seg)
       seg->len = 0;
     }
   }
-  if ((seg->seq + seg->len - priv->rcv_nxt) > get_receive_buffer_space (self)) {
-    guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt -
-        get_receive_buffer_space (self);
+
+  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
+
+  if ((seg->seq + seg->len - priv->rcv_nxt) > available_space) {
+    guint32 nAdjust = seg->seq + seg->len - priv->rcv_nxt - available_space;
     if (nAdjust < seg->len) {
       seg->len -= nAdjust;
     } else {
@@ -1192,18 +1410,16 @@ process(PseudoTcpSocket *self, Segment *seg)
       }
     } else {
       guint32 nOffset = seg->seq - priv->rcv_nxt;
+      gsize res;
 
-      if (get_receive_buffer_consecutive_space (self) < seg->len + nOffset) {
-        consolidate_receiver_buffer_space (self);
-        g_assert (get_receive_buffer_consecutive_space (self) >=
-            seg->len + nOffset);
-      }
+      res = pseudo_tcp_fifo_write_offset (&priv->rbuf, (guint8 *) seg->data,
+          seg->len, nOffset);
+      g_assert (res == seg->len);
 
-      memcpy(priv->rbuf + priv->rlen + nOffset, seg->data, seg->len);
       if (seg->seq == priv->rcv_nxt) {
         GList *iter = NULL;
 
-        priv->rlen += seg->len;
+        pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, seg->len);
         priv->rcv_nxt += seg->len;
         priv->rcv_wnd -= seg->len;
         bNewData = TRUE;
@@ -1216,7 +1432,7 @@ process(PseudoTcpSocket *self, Segment *seg)
             sflags = sfImmediateAck; // (Fast Recovery)
             DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Recovered %d bytes (%d -> %d)",
                 nAdjust, priv->rcv_nxt, priv->rcv_nxt + nAdjust);
-            priv->rlen += nAdjust;
+            pseudo_tcp_fifo_consume_write_buffer (&priv->rbuf, nAdjust);
             priv->rcv_nxt += nAdjust;
             priv->rcv_wnd -= nAdjust;
           }
@@ -1268,8 +1484,8 @@ transmit(PseudoTcpSocket *self, const GList *seg, guint32 now)
   while (TRUE) {
     guint32 seq = segment->seq;
     guint8 flags = (segment->bCtrl ? FLAG_CTL : 0);
-    const gchar * buffer = priv->sbuf + (segment->seq - priv->snd_una);
-    PseudoTcpWriteResult wres = packet(self, seq, flags, buffer, nTransmit);
+    PseudoTcpWriteResult wres = packet(self, seq, flags,
+        segment->seq - priv->snd_una, nTransmit);
 
     if (wres == WR_SUCCESS)
       break;
@@ -1344,6 +1560,7 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
     guint32 nInFlight;
     guint32 nUseable;
     guint32 nAvailable;
+    gsize snd_buffered;
     GList *iter;
 
     cwnd = priv->cwnd;
@@ -1353,7 +1570,8 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
     nWindow = min(priv->snd_wnd, cwnd);
     nInFlight = priv->snd_nxt - priv->snd_una;
     nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0;
-    nAvailable = min(priv->slen - nInFlight, priv->mss);
+    snd_buffered = pseudo_tcp_fifo_get_buffered (&priv->sbuf);
+    nAvailable = min(snd_buffered - nInFlight, priv->mss);
 
     if (nAvailable > nUseable) {
       if (nUseable * 4 < nWindow) {
@@ -1365,12 +1583,13 @@ attempt_send(PseudoTcpSocket *self, SendFlags sflags)
     }
 
     if (bFirst) {
+      gsize available_space = pseudo_tcp_fifo_get_write_remaining (&priv->sbuf);
       bFirst = FALSE;
       DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "[cwnd: %d  nWindow: %d  nInFlight: %d "
-          "nAvailable: %d nQueued: %d  nEmpty: %" G_GSIZE_FORMAT
+          "nAvailable: %d nQueued: %" G_GSIZE_FORMAT " nEmpty: %" G_GSIZE_FORMAT
           "  ssthresh: %d]",
-          priv->cwnd, nWindow, nInFlight, nAvailable, priv->slen - nInFlight,
-          sizeof(priv->sbuf) - priv->slen, priv->ssthresh);
+          priv->cwnd, nWindow, nInFlight, nAvailable, snd_buffered,
+          available_space, priv->ssthresh);
     }
 
     if (nAvailable == 0) {
@@ -1427,7 +1646,6 @@ static void
 closedown(PseudoTcpSocket *self, guint32 err)
 {
   PseudoTcpSocketPrivate *priv = self->priv;
-  priv->slen = 0;
 
   DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "State: TCP_CLOSED");
   priv->state = TCP_CLOSED;
@@ -1455,3 +1673,126 @@ adjustMTU(PseudoTcpSocket *self)
   priv->ssthresh = max(priv->ssthresh, 2 * priv->mss);
   priv->cwnd = max(priv->cwnd, priv->mss);
 }
+
+static void
+apply_window_scale_option (PseudoTcpSocket *self, guint8 scale_factor)
+{  /* Stores scale_factor as the send window scale (swnd_scale). */
+   PseudoTcpSocketPrivate *priv = self->priv;
+
+   priv->swnd_scale = scale_factor;  /* NOTE(review): no range check; confirm large values are safe as a shift count */
+}
+
+static void
+apply_option(PseudoTcpSocket *self, char kind, const guint8* data, guint32 len)
+{  /* Applies one parsed option of type kind with len bytes of content at data. */
+  if (kind == TCP_OPT_MSS) {
+    DEBUG (PSEUDO_TCP_DEBUG_NORMAL,
+        "Peer specified MSS option which is not supported.");
+    // TODO: Implement.
+  } else if (kind == TCP_OPT_WND_SCALE) {
+    // Window scale factor.
+    // http://www.ietf.org/rfc/rfc1323.txt
+    if (len != 1) {  /* the content must be exactly one byte */
+      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid window scale option received.");
+      return;
+    }
+    apply_window_scale_option(self, data[0]);
+  }  /* any other kind is ignored */
+}
+
+
+static void
+parse_options (PseudoTcpSocket *self, const guint8 *data, guint32 len)
+{  /* Parses the len option bytes at data and applies each option found. */
+  PseudoTcpSocketPrivate *priv = self->priv;
+  gboolean has_window_scaling_option = FALSE;
+  guint32 pos = 0;
+
+  // See http://www.freesoft.org/CIE/Course/Section4/8.htm for
+  // parsing the options list.
+  while (pos < len) {
+    guint8 kind = TCP_OPT_EOL;
+    guint8 opt_len;
+
+    kind = data[pos];
+    pos++;
+
+    if (kind == TCP_OPT_EOL) {
+      // End of option list.
+      break;
+    } else if (kind == TCP_OPT_NOOP) {
+      // No op.
+      continue;
+    }
+
+    // Length byte and content of this option.  Both come from the peer, so
+    // check that the length byte itself lies inside the list before reading
+    // it, then that the content it announces fits in the remaining bytes.
+    if (pos < len && data[pos] <= len - pos - 1) {
+      opt_len = data[pos];
+      pos++;
+
+      apply_option (self, kind, data + pos, opt_len);
+      pos += opt_len;
+    } else {
+      DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Invalid option length received.");
+      return;
+    }
+
+    if (kind == TCP_OPT_WND_SCALE)
+      has_window_scaling_option = TRUE;
+  }
+
+  if (!has_window_scaling_option) {
+    DEBUG (PSEUDO_TCP_DEBUG_NORMAL, "Peer doesn't support window scaling");
+    if (priv->rwnd_scale > 0) {
+      // Peer doesn't support TCP options and window scaling.
+      // Revert receive buffer size to default value.
+      resize_receive_buffer (self, DEFAULT_RCV_BUF_SIZE);
+      priv->swnd_scale = 0;
+    }
+  }
+}
+
+static void
+resize_send_buffer (PseudoTcpSocket *self, guint32 new_size)
+{  /* Asks the send FIFO to change its capacity to new_size bytes. */
+  PseudoTcpSocketPrivate *priv = self->priv;
+
+  if (pseudo_tcp_fifo_set_capacity (&priv->sbuf, new_size))
+    priv->sbuf_len = new_size;  /* record the size only if the FIFO accepted it */
+}
+
+
+static void
+resize_receive_buffer (PseudoTcpSocket *self, guint32 new_size)
+{  /* Resizes the receive FIFO and derives the receive window scale factor. */
+  PseudoTcpSocketPrivate *priv = self->priv;
+  guint8 scale_factor = 0;
+  gboolean result;
+  gsize available_space;
+
+  // Determine the scale factor such that the scaled window size can fit
+  // in a 16-bit unsigned integer.
+  while (new_size > 0xFFFF) {
+    ++scale_factor;
+    new_size >>= 1;
+  }
+
+  // Determine the proper size of the buffer.
+  new_size <<= scale_factor;  /* bits shifted out by the loop above come back as 0 */
+  result = pseudo_tcp_fifo_set_capacity (&priv->rbuf, new_size);
+
+  // Make sure the new buffer is large enough to contain data in the old
+  // buffer. This should always be true because this method is called either
+  // before connection is established or when peers are exchanging connect
+  // messages.
+  g_assert(result);
+  priv->rbuf_len = new_size;
+  priv->rwnd_scale = scale_factor;
+  priv->ssthresh = new_size;
+
+  available_space = pseudo_tcp_fifo_get_write_remaining (&priv->rbuf);
+  priv->rcv_wnd = available_space;  /* receive window = free space in the FIFO */
+}
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list