[Pkg-telepathy-commits] [libnice] 128/265: agent: Restore the ability of nice_agent_send() to send partial buffers

Simon McVittie smcv at debian.org
Wed May 14 12:05:00 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit 919b2c46b5a42661986758bc4cd662e5d571ccd6
Author: Olivier Crête <olivier.crete at collabora.com>
Date:   Thu Jan 30 22:06:32 2014 -0500

    agent: Restore the ability of nice_agent_send() to send partial buffers
    
    This is very important for reliable mode.
    
    Also use it in the GOutputStream so as to not get into the case where
    there is still some space in the TCP buffer, but not enough for one message.
    Also warn against this problem.
---
 agent/agent.c        | 157 ++++++++++++++++++++++++++++++++-------------------
 agent/agent.h        |   7 +++
 agent/outputstream.c |  71 +++++++++--------------
 3 files changed, 134 insertions(+), 101 deletions(-)

diff --git a/agent/agent.c b/agent/agent.c
index 7e6af0d..39223f3 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -1035,23 +1035,29 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
  *
  * Returns the number of messages successfully sent on success (which may be
  * zero if sending the first buffer of the message would have blocked), or
- * a negative number on error. */
+ * a negative number on error. If "allow_partial" is TRUE, then it instead
+ * returns the number of bytes sent.
+ */
 static gint
 pseudo_tcp_socket_send_messages (PseudoTcpSocket *self,
-    const NiceOutputMessage *messages, guint n_messages, GError **error)
+    const NiceOutputMessage *messages, guint n_messages, gboolean allow_partial,
+    GError **error)
 {
   guint i;
+  gint bytes_sent = 0;
 
   for (i = 0; i < n_messages; i++) {
     const NiceOutputMessage *message = &messages[i];
     guint j;
 
-    /* If there’s not enough space for the entire message, bail now before
-     * queuing anything. This doesn’t gel with the fact this function is only
-     * used in reliable mode, and there is no concept of a ‘message’, but is
-     * necessary because the calling API has no way of returning to the client
+    /* If allow_partial is FALSE and there’s not enough space for the
+     * entire message, bail now before queuing anything. This doesn’t
+     * gel with the fact this function is only used in reliable mode,
+     * and there is no concept of a ‘message’, but is necessary
+     * because the calling API has no way of returning to the client
      * and indicating that a message was partially sent. */
-    if (output_message_get_size (message) >
+    if (!allow_partial &&
+        output_message_get_size (message) >
         pseudo_tcp_socket_get_available_send_space (self)) {
       return i;
     }
@@ -1068,22 +1074,26 @@ pseudo_tcp_socket_send_messages (PseudoTcpSocket *self,
 
       /* In case of -1, the error is either EWOULDBLOCK or ENOTCONN, which both
        * need the user to wait for the reliable-transport-writable signal */
-      if (ret < 0 && pseudo_tcp_socket_get_error (self) == EWOULDBLOCK) {
-        ret = 0;
-        return i;
-      } else if (ret < 0 && pseudo_tcp_socket_get_error (self) == ENOTCONN) {
-        g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-            "TCP connection is not yet established.");
-        return ret;
-      } else if (ret < 0) {
-        g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+      if (ret < 0) {
+        if (pseudo_tcp_socket_get_error (self) == EWOULDBLOCK)
+          goto out;
+
+        if (pseudo_tcp_socket_get_error (self) == ENOTCONN)
+          g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+              "TCP connection is not yet established.");
+        else
+          g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
             "Error writing data to pseudo-TCP socket.");
-        return ret;
+        return -1;
+      } else {
+        bytes_sent += ret;
       }
     }
   }
 
-  return i;
+ out:
+
+  return allow_partial ? bytes_sent : (gint) i;
 }
 
 /* Will fill up @messages from the first free byte onwards (as determined using
@@ -2881,7 +2891,7 @@ output_message_get_size (const NiceOutputMessage *message)
   return message_len;
 }
 
-/**
+/*
  * nice_input_message_iter_reset:
  * @iter: a #NiceInputMessageIter
  *
@@ -2898,7 +2908,7 @@ nice_input_message_iter_reset (NiceInputMessageIter *iter)
   iter->offset = 0;
 }
 
-/**
+/*
  * nice_input_message_iter_is_at_end:
  * @iter: a #NiceInputMessageIter
  * @messages: (array length=n_messages): an array of #NiceInputMessages
@@ -2920,7 +2930,7 @@ nice_input_message_iter_is_at_end (NiceInputMessageIter *iter,
       iter->buffer == 0 && iter->offset == 0);
 }
 
-/**
+/*
  * nice_input_message_iter_get_n_valid_messages:
  * @iter: a #NiceInputMessageIter
  *
@@ -3227,31 +3237,29 @@ nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
   return local_messages.length;
 }
 
-NICEAPI_EXPORT gint
-nice_agent_send_messages_nonblocking (
+/* nice_agent_send_messages_nonblocking_internal:
+ *
+ * Returns: number of bytes sent if allow_partial is %TRUE, the number
+ * of messages otherwise.
+ */
+
+static gint
+nice_agent_send_messages_nonblocking_internal (
   NiceAgent *agent,
   guint stream_id,
   guint component_id,
   const NiceOutputMessage *messages,
   guint n_messages,
-  GCancellable *cancellable,
+  gboolean allow_partial,
   GError **error)
 {
   Stream *stream;
   Component *component;
-  gint n_sent_messages = -1;
+  gint n_sent = -1; /* is in bytes if allow_partial is TRUE,
+                       otherwise in messages */
   GError *child_error = NULL;
 
-  g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
-  g_return_val_if_fail (stream_id >= 1, -1);
-  g_return_val_if_fail (component_id >= 1, -1);
-  g_return_val_if_fail (n_messages == 0 || messages != NULL, -1);
-  g_return_val_if_fail (
-      cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
-  g_return_val_if_fail (error == NULL || *error == NULL, -1);
-
-  if (g_cancellable_set_error_if_cancelled (cancellable, error))
-    return -1;
+  g_assert (n_messages == 1 || !allow_partial);
 
   agent_lock ();
 
@@ -3267,15 +3275,15 @@ nice_agent_send_messages_nonblocking (
 
   if (component->tcp != NULL) {
     /* Send on the pseudo-TCP socket. */
-    n_sent_messages = pseudo_tcp_socket_send_messages (component->tcp, messages,
-        n_messages, &child_error);
+    n_sent = pseudo_tcp_socket_send_messages (component->tcp, messages,
+        n_messages, allow_partial, &child_error);
     adjust_tcp_clock (agent, stream, component);
 
     if (!pseudo_tcp_socket_can_send (component->tcp))
       g_cancellable_reset (component->tcp_writable_cancellable);
-
-    if (n_sent_messages < 0) {
-      /* Signal error */
+    if (n_sent < 0 && !g_error_matches (child_error, G_IO_ERROR,
+            G_IO_ERROR_WOULD_BLOCK)) {
+      /* Signal errors */
       priv_pseudo_tcp_error (agent, stream, component);
     }
   } else if (agent->reliable) {
@@ -3297,39 +3305,69 @@ nice_agent_send_messages_nonblocking (
     sock = component->selected_pair.local->sockptr;
     addr = &component->selected_pair.remote->addr;
 
-    n_sent_messages = nice_socket_send_messages (sock, addr, messages,
-        n_messages);
+    n_sent = nice_socket_send_messages (sock, addr, messages, n_messages);
 
-    if (n_sent_messages < 0) {
+    if (n_sent < 0) {
       g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
           "Error writing data to socket.");
+    } else if (allow_partial) {
+      g_assert (n_messages == 1);
+      n_sent = output_message_get_size (messages);
     }
   } else {
     /* Socket isn’t properly open yet. */
-    n_sent_messages = 0;  /* EWOULDBLOCK */
+    n_sent = 0;  /* EWOULDBLOCK */
   }
 
   /* Handle errors and cancellations. */
-  if (n_sent_messages == 0) {
+  if (n_sent == 0) {
     g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
         g_strerror (EAGAIN));
-    n_sent_messages = -1;
+    n_sent = -1;
   }
 
-  nice_debug ("%s: n_sent_messages: %d, n_messages: %u", G_STRFUNC,
-      n_sent_messages, n_messages);
+  nice_debug ("%s: n_sent: %d, n_messages: %u", G_STRFUNC,
+      n_sent, n_messages);
 
 done:
-  g_assert ((child_error != NULL) == (n_sent_messages == -1));
-  g_assert (n_sent_messages != 0);
-  g_assert (n_sent_messages < 0 || (guint) n_sent_messages <= n_messages);
+  g_assert ((child_error != NULL) == (n_sent == -1));
+  g_assert (n_sent != 0);
+  g_assert (n_sent < 0 ||
+      (!allow_partial && (guint) n_sent <= n_messages) ||
+      (allow_partial && n_messages == 1 &&
+          (gsize) n_sent <= output_message_get_size (&messages[0])));
 
   if (child_error != NULL)
     g_propagate_error (error, child_error);
 
   agent_unlock ();
 
-  return n_sent_messages;
+  return n_sent;
+}
+
+NICEAPI_EXPORT gint
+nice_agent_send_messages_nonblocking (
+  NiceAgent *agent,
+  guint stream_id,
+  guint component_id,
+  const NiceOutputMessage *messages,
+  guint n_messages,
+  GCancellable *cancellable,
+  GError **error)
+{
+  g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
+  g_return_val_if_fail (stream_id >= 1, -1);
+  g_return_val_if_fail (component_id >= 1, -1);
+  g_return_val_if_fail (n_messages == 0 || messages != NULL, -1);
+  g_return_val_if_fail (
+      cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
+  g_return_val_if_fail (error == NULL || *error == NULL, -1);
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  return nice_agent_send_messages_nonblocking_internal (agent, stream_id,
+      component_id, messages, n_messages, FALSE, error);
 }
 
 NICEAPI_EXPORT gint
@@ -3341,15 +3379,18 @@ nice_agent_send (
   const gchar *buf)
 {
   GOutputVector local_buf = { buf, len };
-  gint n_sent_messages;
   NiceOutputMessage local_message = { &local_buf, 1 };
+  gint n_sent_bytes;
+
+  g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
+  g_return_val_if_fail (stream_id >= 1, -1);
+  g_return_val_if_fail (component_id >= 1, -1);
+  g_return_val_if_fail (buf != NULL, -1);
 
-  n_sent_messages = nice_agent_send_messages_nonblocking (agent, stream_id,
-      component_id, &local_message, 1, NULL, NULL);
+  n_sent_bytes = nice_agent_send_messages_nonblocking_internal (agent,
+      stream_id, component_id, &local_message, 1, TRUE, NULL);
 
-  if (n_sent_messages == 1)
-    return len;
-  return n_sent_messages;
+  return n_sent_bytes;
 }
 
 NICEAPI_EXPORT GSList *
diff --git a/agent/agent.h b/agent/agent.h
index f0acd14..70fb7c6 100644
--- a/agent/agent.h
+++ b/agent/agent.h
@@ -678,6 +678,13 @@ nice_agent_send (
  * part-way through. Zero will be returned if @n_messages is zero, or if
  * transmission would have blocked on the first message.
  *
+ * In reliable mode, it is instead recommended to use
+ * nice_agent_send().  The return value can be less than @n_messages
+ * or 0 even if it is still possible to send a partial message. In
+ * this case, the "reliable-transport-writable" signal will never be
+ * triggered, so the application would have to use nice_agent_send() to
+ * fill the buffer or retry sending at a later point.
+ *
  * On failure, -1 will be returned and @error will be set. If the #NiceAgent is
  * reliable and the socket is not yet connected, %G_IO_ERROR_BROKEN_PIPE will be
  * returned; if the write buffer is full, %G_IO_ERROR_WOULD_BLOCK will be
diff --git a/agent/outputstream.c b/agent/outputstream.c
index 3ffe469..2ba4b48 100644
--- a/agent/outputstream.c
+++ b/agent/outputstream.c
@@ -304,9 +304,9 @@ typedef struct {
 
   GCond cond;
   GMutex mutex;
-  GError *error;
 
   gboolean writable;
+  gboolean cancelled;
 } WriteData;
 
 static void
@@ -315,7 +315,6 @@ write_data_unref (WriteData *write_data)
   if (g_atomic_int_dec_and_test (&write_data->ref_count)) {
     g_cond_clear (&write_data->cond);
     g_mutex_clear (&write_data->mutex);
-    g_clear_error (&write_data->error);
     g_slice_free (WriteData, write_data);
   }
 }
@@ -326,8 +325,8 @@ write_cancelled_cb (GCancellable *cancellable, gpointer user_data)
   WriteData *write_data = user_data;
 
   g_mutex_lock (&write_data->mutex);
-  g_cancellable_set_error_if_cancelled (cancellable, &write_data->error);
   g_cond_broadcast (&write_data->cond);
+  write_data->cancelled = TRUE;
   g_mutex_unlock (&write_data->mutex);
 }
 
@@ -349,8 +348,7 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
 {
   NiceOutputStream *self = NICE_OUTPUT_STREAM (stream);
   gssize len = -1;
-  gint n_sent_messages;
-  GError *child_error = NULL;
+  gint n_sent;
   NiceAgent *agent = NULL;  /* owned */
   gulong cancel_id = 0, writeable_id;
   WriteData *write_data;
@@ -375,14 +373,13 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
     return 0;
   }
 
-  /* FIXME: nice_agent_send_full() is non-blocking, which is a bit unexpected
+  /* FIXME: nice_agent_send() is non-blocking, which is a bit unexpected
    * since nice_agent_recv() is blocking. Currently this uses a fairly dodgy
    * GCond solution; would be much better for nice_agent_send() to block
    * properly in the main loop. */
   len = 0;
   write_data = g_slice_new0 (WriteData);
   g_atomic_int_set (&write_data->ref_count, 3);
-  write_data->error = NULL;
 
   g_mutex_init (&write_data->mutex);
   g_cond_init (&write_data->cond);
@@ -402,59 +399,43 @@ nice_output_stream_write (GOutputStream *stream, const void *buffer, gsize count
 
 
   do {
-    GOutputVector local_buf = { (const guint8 *) buffer + len, count - len };
-    NiceOutputMessage local_message = {&local_buf, 1};
-
     /* Have to unlock while calling into the agent because
      * it will take the agent lock which will cause a deadlock if one of
      * the callbacks is called.
      */
+    if (g_cancellable_is_cancelled (cancellable))
+      break;
+
     write_data->writable = FALSE;
     g_mutex_unlock (&write_data->mutex);
 
-    n_sent_messages = nice_agent_send_messages_nonblocking (agent,
-        self->priv->stream_id, self->priv->component_id, &local_message, 1,
-        cancellable, &child_error);
+    n_sent = nice_agent_send (agent, self->priv->stream_id,
+        self->priv->component_id, count - len, buffer + len);
 
     g_mutex_lock (&write_data->mutex);
 
-    if (n_sent_messages == -1 &&
-        g_error_matches (child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-      /* EWOULDBLOCK. */
-      g_clear_error (&child_error);
-      if (!write_data->writable && !write_data->error)
+    if (n_sent <= 0) {
+      if (!write_data->writable && !write_data->cancelled)
         g_cond_wait (&write_data->cond, &write_data->mutex);
-    } else if (n_sent_messages > 0) {
+    } else if (n_sent > 0) {
       /* Success. */
-      len = count;
-    } else {
-      /* Other error. */
-      len = n_sent_messages;
-      break;
+      len += n_sent;
     }
   } while ((gsize) len < count);
 
   g_signal_handler_disconnect (G_OBJECT (agent), writeable_id);
   g_mutex_unlock (&write_data->mutex);
 
-  if (cancellable != NULL) {
+  if (cancel_id)
     g_cancellable_disconnect (cancellable, cancel_id);
-    /* If we were cancelled, but we have no other errors can couldn't write
-     * anything, return the cancellation error. If we could write
-     * something partial, there is no error.
-     */
-    if (write_data->error && !child_error && len == 0) {
-      g_propagate_error (&child_error, write_data->error);
-      len = -1;
-    }
+
+  if (len == 0) {
+    g_cancellable_set_error_if_cancelled (cancellable, error);
+    len = -1;
   }
 
   write_data_unref (write_data);
 
-  g_assert ((child_error != NULL) == (len == -1));
-  if (child_error)
-    g_propagate_error (error, child_error);
-
   g_object_unref (agent);
   g_assert (len != 0);
 
@@ -522,9 +503,7 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
 {
   NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
   NiceAgent *agent;  /* owned */
-  GOutputVector local_buf = { buffer, count };
-  NiceOutputMessage local_message = { &local_buf, 1 };
-  gint n_sent_messages;
+  gint n_sent;
 
   /* Closed streams are not writeable. */
   if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
@@ -551,12 +530,18 @@ nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
     return -1;
   }
 
-  n_sent_messages = nice_agent_send_messages_nonblocking (agent,
-      priv->stream_id, priv->component_id, &local_message, 1, NULL, error);
+  n_sent = nice_agent_send (agent, priv->stream_id, priv->component_id,
+      count, buffer);
+
+  if (n_sent == -1)
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+        g_strerror (EAGAIN));
+
+
 
   g_object_unref (agent);
 
-  return (n_sent_messages == 1) ? (gssize) count : n_sent_messages;
+  return n_sent;
 }
 
 static GSource *

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list