[Pkg-telepathy-commits] [libnice] 78/265: agent: Add nice_agent_recv() allowing blocking receives on sockets

Simon McVittie smcv at debian.org
Wed May 14 12:04:55 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit 243c47ecc9d694ecfe230880081634936770a959
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date:   Mon Dec 16 14:02:28 2013 +0000

    agent: Add nice_agent_recv() allowing blocking receives on sockets
    
    This is a blocking receive function, designed to be called from a worker
    thread. It cannot be used in conjunction with the existing
    nice_agent_attach_recv() API, as the blocking receive and the GSource
    would compete over access to the single copy of the data in the kernel’s
    receive buffer.
---
 agent/agent-priv.h                          |   3 +
 agent/agent.c                               | 363 ++++++++++++++++++++--------
 agent/agent.h                               |   9 +
 agent/component.c                           |  32 ++-
 agent/component.h                           |  20 +-
 docs/reference/libnice/libnice-sections.txt |   1 +
 nice/libnice.sym                            |   1 +
 7 files changed, 324 insertions(+), 105 deletions(-)

diff --git a/agent/agent-priv.h b/agent/agent-priv.h
index 350d1a3..b2d5c54 100644
--- a/agent/agent-priv.h
+++ b/agent/agent-priv.h
@@ -178,4 +178,7 @@ component_io_cb (
   GIOCondition condition,
   gpointer data);
 
+gssize agent_recv_locked (NiceAgent *agent, Stream *stream,
+    Component *component, NiceSocket *socket, guint8 *buf, gsize buf_len);
+
 #endif /*_NICE_AGENT_PRIV_H */
diff --git a/agent/agent.c b/agent/agent.c
index 647b543..bd21e78 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -1,7 +1,7 @@
 /*
  * This file is part of the Nice GLib ICE library.
  *
- * (C) 2006-2010 Collabora Ltd.
+ * (C) 2006-2010, 2013 Collabora Ltd.
  *  Contact: Youness Alaoui
  * (C) 2006-2010 Nokia Corporation. All rights reserved.
  *  Contact: Kai Vehmanen
@@ -25,6 +25,7 @@
  *   Dafydd Harries, Collabora Ltd.
  *   Youness Alaoui, Collabora Ltd.
  *   Kai Vehmanen, Nokia
+ *   Philip Withnall, Collabora Ltd.
  *
  * Alternatively, the contents of this file may be used under the terms of the
  * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
@@ -982,21 +983,6 @@ nice_agent_set_property (
 
 }
 
-
-static void priv_destroy_component_tcp (Component *component)
-{
-    if (component->tcp_clock) {
-      g_source_destroy (component->tcp_clock);
-      g_source_unref (component->tcp_clock);
-      component->tcp_clock = NULL;
-    }
-    if (component->tcp) {
-      pseudo_tcp_socket_close (component->tcp, TRUE);
-      g_object_unref (component->tcp);
-      component->tcp = NULL;
-    }
-}
-
 static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
     Component *component)
 {
@@ -1004,8 +990,16 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
     agent_signal_component_state_change (agent, stream->id,
         component->id, NICE_COMPONENT_STATE_FAILED);
     component_detach_all_sockets (component);
+    pseudo_tcp_socket_close (component->tcp, TRUE);
+    g_object_unref (component->tcp);
+    component->tcp = NULL;
+  }
+
+  if (component->tcp_clock) {
+    g_source_destroy (component->tcp_clock);
+    g_source_unref (component->tcp_clock);
+    component->tcp_clock = NULL;
   }
-  priv_destroy_component_tcp (component);
 }
 
 static void
@@ -1033,6 +1027,7 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
   Stream *stream = component->stream;
   guint8 buf[MAX_BUFFER_SIZE];
   gssize len;
+  gboolean has_io_callback;
 
   nice_debug ("Agent %p: s%d:%d pseudo Tcp socket readable", agent,
       stream->id, component->id);
@@ -1041,30 +1036,58 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
 
   g_object_add_weak_pointer (G_OBJECT (sock), (gpointer *)&sock);
   g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent);
+  has_io_callback = component_has_io_callback (component);
 
   do {
-    gboolean has_io_callback = component_has_io_callback (component);
-
-    if (has_io_callback)
+    /* Only dequeue pseudo-TCP data if we can reliably inform the client. */
+    if (has_io_callback) {
       len = pseudo_tcp_socket_recv (sock, (gchar *) buf, sizeof(buf));
-    else
+    } else if (component->recv_buf != NULL) {
+      len = pseudo_tcp_socket_recv (sock,
+          (gchar *) component->recv_buf + component->recv_buf_valid_len,
+          component->recv_buf_len - component->recv_buf_valid_len);
+    } else {
       len = 0;
+    }
 
-    if (len > 0) {
+    nice_debug ("%s: len %" G_GSSIZE_FORMAT, G_STRFUNC, len);
+
+    if (len > 0 && has_io_callback) {
       component_emit_io_callback (component, buf, len);
       if (sock == NULL) {
         nice_debug ("PseudoTCP socket got destroyed in readable callback!");
         break;
       }
+    } else if (len > 0 && component->recv_buf != NULL) {
+      /* No callback to call. The data has been copied directly into the
+       * client’s receive buffer. */
+      component->recv_buf_valid_len += len;
     } else if (len < 0 &&
         pseudo_tcp_socket_get_error (sock) != EWOULDBLOCK) {
       /* Signal error */
+      nice_debug ("%s: calling priv_pseudo_tcp_error()", G_STRFUNC);
       priv_pseudo_tcp_error (agent, stream, component);
+
+      if (component->recv_buf != NULL) {
+        GIOErrorEnum error_code;
+
+        if (pseudo_tcp_socket_get_error (sock) == ENOTCONN)
+          error_code = G_IO_ERROR_BROKEN_PIPE;
+        else
+          error_code = G_IO_ERROR_FAILED;
+
+        g_set_error (component->recv_buf_error, G_IO_ERROR, error_code,
+            "Error reading data from pseudo-TCP socket.");
+      }
     } else if (len < 0 &&
         pseudo_tcp_socket_get_error (sock) == EWOULDBLOCK){
       component->tcp_readable = FALSE;
     }
-  } while (len > 0);
+
+    has_io_callback = component_has_io_callback (component);
+  } while (len > 0 &&
+           (has_io_callback ||
+            component->recv_buf_valid_len < component->recv_buf_len));
 
   if (agent) {
     adjust_tcp_clock (agent, stream, component);
@@ -1097,8 +1120,8 @@ pseudo_tcp_socket_closed (PseudoTcpSocket *sock, guint32 err,
   NiceAgent *agent = component->agent;
   Stream *stream = component->stream;
 
-  nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed",  agent,
-      stream->id, component->id);
+  nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed. "
+      "Calling priv_pseudo_tcp_error().",  agent, stream->id, component->id);
   priv_pseudo_tcp_error (agent, stream, component);
 }
 
@@ -1117,8 +1140,10 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
     gchar tmpbuf[INET6_ADDRSTRLEN];
     nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
 
-    nice_debug ("Agent %p : s%d:%d: sending %d bytes to [%s]:%d",
-        component->agent, component->stream->id, component->id, len, tmpbuf,
+    nice_debug (
+        "Agent %p : s%d:%d: sending %d bytes on socket %p (FD %d) to [%s]:%d",
+        component->agent, component->stream->id, component->id, len,
+        sock->fileno, g_socket_get_fd (sock->fileno), tmpbuf,
         nice_address_get_port (&component->selected_pair.remote->addr));
 #endif
 
@@ -1176,7 +1201,8 @@ adjust_tcp_clock (NiceAgent *agent, Stream *stream, Component *component)
       component->tcp_clock = agent_timeout_add_with_context (agent,
           timeout, notify_pseudo_tcp_socket_clock, component);
     } else {
-      nice_debug ("Agent %p: component %d pseudo tcp socket should be destroyed",
+      nice_debug ("Agent %p: component %d pseudo-TCP socket should be "
+          "destroyed. Calling priv_pseudo_tcp_error().",
           agent, component->id);
       priv_pseudo_tcp_error (agent, stream, component);
     }
@@ -2302,7 +2328,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
 }
 
 /*
- * _nice_agent_recv_locked:
+ * agent_recv_locked:
  * @agent: a #NiceAgent
  * @stream: the stream to receive from
  * @component: the component to receive from
@@ -2311,15 +2337,17 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
  * @buf_len: the length of @buf
  *
  * Receive up to @buf_len bytes of data from the given
- * @stream/@component/@socket, in a non-blocking fashion.
+ * @stream/@component/@socket, in a non-blocking fashion. If the socket is a
+ * datagram socket and @buf_len is not big enough to hold an entire packet, the
+ * remaining bytes of the packet will be silently dropped.
  *
  * NOTE: Must be called with the agent’s lock held.
  *
  * Returns: number of bytes stored in @buf, 0 if no data is available, or -1 on
  * error
  */
-static gssize
-_nice_agent_recv_locked (
+gssize
+agent_recv_locked (
   NiceAgent *agent,
   Stream *stream,
   Component *component,
@@ -2330,9 +2358,15 @@ _nice_agent_recv_locked (
   NiceAddress from;
   gssize len;
   GList *item;
+  guint8 local_buf[MAX_BUFFER_SIZE];
+  gsize local_buf_len = MAX_BUFFER_SIZE;
 
-  /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success. */
-  len = nice_socket_recv (socket, &from, buf_len, (gchar *) buf);
+  /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success.
+   *
+   * FIXME: We have to receive into a local buffer then copy out because
+   * otherwise, if @buf is too small, we could lose data, even when in
+   * reliable mode (because reliable streams are packetised). */
+  len = nice_socket_recv (socket, &from, local_buf_len, (gchar *) local_buf);
 
   if (len == 0) {
     return 0;
@@ -2352,14 +2386,6 @@ _nice_agent_recv_locked (
   }
 #endif
 
-
-  if ((gsize) len > buf_len)
-    {
-      /* buffer is not big enough to accept this packet */
-      /* XXX: test this case */
-      return 0;
-    }
-
   for (item = component->turn_servers; item; item = g_list_next (item)) {
     TurnServer *turn = item->data;
     if (nice_address_equal (&from, &turn->server)) {
@@ -2374,7 +2400,7 @@ _nice_agent_recv_locked (
             cand->stream_id == stream->id &&
             cand->component_id == component->id) {
           len = nice_turn_socket_parse_recv (cand->sockptr, &socket,
-              &from, len, (gchar *) buf, &from, (gchar *) buf, len);
+              &from, len, (gchar *) local_buf, &from, (gchar *) local_buf, len);
         }
       }
       break;
@@ -2383,15 +2409,15 @@ _nice_agent_recv_locked (
 
   agent->media_after_tick = TRUE;
 
-  if (stun_message_validate_buffer_length ((uint8_t *) buf, (size_t) len,
+  if (stun_message_validate_buffer_length ((uint8_t *) local_buf, (size_t) len,
       (agent->compatibility != NICE_COMPATIBILITY_OC2007 &&
        agent->compatibility != NICE_COMPATIBILITY_OC2007R2)) != len)
-    /* If the retval is no 0, its not a valid stun packet, probably data */
+    /* If the retval is not 0, it’s not a valid STUN packet; probably data. */
     goto handle_tcp;
 
 
   if (conn_check_handle_inbound_stun (agent, stream, component, socket,
-          &from, (gchar *) buf, len))
+          &from, (gchar *) local_buf, len))
     /* handled STUN message*/
     return 0;
 
@@ -2400,7 +2426,10 @@ handle_tcp:
   if (len > 0 && component->tcp) {
     /* Received data on a reliable connection. */
     g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
-    pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) buf, len);
+
+    nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSSIZE_FORMAT,
+        G_STRFUNC, len);
+    pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) local_buf, len);
 
     if (agent) {
       adjust_tcp_clock (agent, stream, component);
@@ -2418,9 +2447,194 @@ handle_tcp:
     return 0;
   }
 
+  /* Yay for poor performance! */
+  if (len >= 0) {
+    len = MIN (buf_len, (gsize) len);
+    memcpy (buf, local_buf, len);
+  }
+
   return len;
 }
 
+static gboolean
+nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data)
+{
+  GError **error = user_data;
+  return !g_cancellable_set_error_if_cancelled (cancellable, error);
+}
+
+/**
+ * nice_agent_recv:
+ * @agent: a #NiceAgent
+ * @stream_id: the ID of the stream to receive on
+ * @component_id: the ID of the component to receive on
+ * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
+ * to write the received data into, of length at least @buf_len
+ * @buf_len: length of @buf
+ * @cancellable: (allow-none): a #GCancellable to allow the operation to be
+ * cancelled from another thread, or %NULL
+ * @error: (allow-none): return location for a #GError, or %NULL
+ *
+ * Block on receiving data from the given stream/component combination on
+ * @agent, returning only once at least 1 byte has been received and written
+ * into @buf, the stream is closed by the other end or by calling
+ * nice_agent_remove_stream(), or @cancellable is cancelled.
+ *
+ * In the non-error case, in reliable mode, this will block until exactly
+ * @buf_len bytes have been received. In non-reliable mode, it will block until
+ * a single message has been received. In this case, @buf must be big enough to
+ * contain an entire message (65536 bytes), or any excess data may be silently
+ * dropped.
+ *
+ * This must not be used in combination with nice_agent_attach_recv() on the
+ * same stream/component pair.
+ *
+ * Internally, this may iterate the current thread’s default main context.
+ *
+ * If the stream/component pair doesn’t exist, or if a suitable candidate socket
+ * hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
+ * returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
+ * cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
+ *
+ * Returns: the number of bytes written to @buf on success (guaranteed to be
+ * greater than 0 unless @buf_len is 0), or -1 on error
+ *
+ * Since: 0.1.5
+ */
+NICEAPI_EXPORT gssize
+nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
+  guint8 *buf, gsize buf_len, GCancellable *cancellable, GError **error)
+{
+  GMainContext *context;
+  Stream *stream;
+  Component *component;
+  gssize len = -1;
+  GSource *cancellable_source = NULL;
+  gboolean received_enough = FALSE, error_reported = FALSE;
+  GError *child_error = NULL;
+
+  g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
+  g_return_val_if_fail (stream_id >= 1, -1);
+  g_return_val_if_fail (component_id >= 1, -1);
+  g_return_val_if_fail (buf != NULL, -1);
+  g_return_val_if_fail (
+      cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
+  g_return_val_if_fail (error == NULL || *error == NULL, -1);
+
+  if (buf_len == 0)
+    return 0;
+
+  agent_lock ();
+
+  /* We’re not going to do the
+   * implement-a-ring-buffer-to-cater-for-tiny-input-buffers game, so just warn
+   * if the buffer size is too small, and silently drop any overspilling
+   * bytes. */
+  g_warn_if_fail (agent->reliable || buf_len >= MAX_BUFFER_SIZE);
+
+  if (!agent_find_component (agent, stream_id, component_id,
+          &stream, &component)) {
+    g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
+                 "Invalid stream/component.");
+    goto done;
+  }
+
+  /* Set the component’s receive buffer. */
+  context = component_dup_io_context (component);
+  component_set_io_callback (component, NULL, NULL, buf, buf_len, &child_error);
+
+  /* Add the cancellable as a source. */
+  if (cancellable != NULL) {
+    cancellable_source = g_cancellable_source_new (cancellable);
+    g_source_set_callback (cancellable_source,
+        (GSourceFunc) nice_agent_recv_cancelled_cb, &child_error, NULL);
+    g_source_attach (cancellable_source, context);
+  }
+
+  /* Is there already pending data left over from having an I/O callback
+   * attached and switching to using nice_agent_recv()? This is a horrifically
+   * specific use case which I hope nobody ever tries. And yet, it still must be
+   * supported. */
+  g_mutex_lock (&component->io_mutex);
+
+  while (!received_enough &&
+         !g_queue_is_empty (&component->pending_io_messages)) {
+    IOCallbackData *data;
+    gsize copied_len;
+
+    g_assert (component->io_callback_id == 0);
+
+    data = g_queue_peek_head (&component->pending_io_messages);
+    copied_len = MIN (data->buf_len - data->offset,
+        component->recv_buf_len - component->recv_buf_valid_len);
+
+    memcpy (component->recv_buf + component->recv_buf_valid_len,
+        data->buf + data->offset, copied_len);
+    component->recv_buf_valid_len += copied_len;
+
+    /* If we only managed to grab part of the buffer, leave the buffer in the
+     * queue and have another go at it later. */
+    if (copied_len < data->buf_len - data->offset) {
+      data->offset += copied_len;
+    } else {
+      g_queue_pop_head (&component->pending_io_messages);
+      io_callback_data_free (data);
+    }
+
+    received_enough =
+        ((agent->reliable && component->recv_buf_valid_len >= buf_len) ||
+         (!agent->reliable && component->recv_buf_valid_len > 0));
+  }
+
+  g_mutex_unlock (&component->io_mutex);
+
+  /* Each iteration of the main context will either receive some data, a
+   * cancellation error or a socket error.
+   *
+   * In reliable mode, iterate the loop enough to receive exactly @buf_len
+   * bytes. In non-reliable mode, iterate the loop to receive a single message.
+   */
+  while (!received_enough && !error_reported) {
+    agent_unlock ();
+    g_main_context_iteration (context, TRUE);
+    agent_lock ();
+
+    received_enough =
+        ((agent->reliable && component->recv_buf_valid_len >= buf_len) ||
+         (!agent->reliable && component->recv_buf_valid_len > 0));
+    error_reported = (child_error != NULL);
+  }
+
+  len = component->recv_buf_valid_len;
+  nice_debug ("%s: len: %" G_GSSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT,
+      G_STRFUNC, len, buf_len);
+
+  /* Tidy up. */
+  if (cancellable_source != NULL) {
+    g_source_destroy (cancellable_source);
+    g_source_unref (cancellable_source);
+  }
+
+  component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
+  g_main_context_unref (context);
+
+  /* Handle errors and cancellations. */
+  if (!received_enough) {
+    g_assert (error_reported);
+    len = -1;
+  }
+
+done:
+  g_assert ((child_error != NULL) == (len == -1));
+  g_assert (len != 0);
+
+  if (child_error != NULL)
+    g_propagate_error (error, child_error);
+
+  agent_unlock ();
+
+  return len;
+}
 
 NICEAPI_EXPORT gint
 nice_agent_send (
@@ -2624,46 +2838,6 @@ nice_agent_dispose (GObject *object)
 
 }
 
-
-typedef struct _IOCtx IOCtx;
-
-struct _IOCtx
-{
-  GSource *source;
-  NiceAgent *agent;
-  Stream *stream;
-  Component *component;
-  NiceSocket *socket;
-};
-
-
-static IOCtx *
-io_ctx_new (
-  NiceAgent *agent,
-  Stream *stream,
-  Component *component,
-  NiceSocket *socket,
-  GSource *source)
-{
-  IOCtx *ctx;
-
-  ctx = g_slice_new0 (IOCtx);
-  ctx->agent = agent;
-  ctx->stream = stream;
-  ctx->component = component;
-  ctx->socket = socket;
-  ctx->source = source;
-
-  return ctx;
-}
-
-
-static void
-io_ctx_free (IOCtx *ctx)
-{
-  g_slice_free (IOCtx, ctx);
-}
-
 gboolean
 component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
 {
@@ -2676,7 +2850,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
   guint8 *recv_buf;
   gsize recv_buf_len;
   gboolean retval = FALSE;
-  NiceAgentRecvFunc io_callback;
+  gboolean has_io_callback;
 
   agent_lock ();
 
@@ -2690,17 +2864,14 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
     goto done;
   }
 
-  /* FIXME: Compartmentalise this in component.c */
-  g_mutex_lock (&component->io_mutex);
-  io_callback = component->io_callback;
-  g_mutex_unlock (&component->io_mutex);
+  has_io_callback = component_has_io_callback (component);
 
   /* Choose which receive buffer to use. If we’re reading for
    * nice_agent_attach_recv(), use a local static buffer. If we’re reading for
    * nice_agent_recv(), use the buffer provided by the client. */
-  g_assert (io_callback == NULL || component->recv_buf == NULL);
+  g_assert (!has_io_callback || component->recv_buf == NULL);
 
-  if (io_callback != NULL) {
+  if (has_io_callback) {
     recv_buf = local_buf;
     recv_buf_len = sizeof (local_buf);
   } else if (component->recv_buf != NULL) {
@@ -2739,7 +2910,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
   }
 
   /* Actual data to notify the client about. */
-  if (io_callback != NULL) {
+  if (has_io_callback) {
     component_emit_io_callback (component, recv_buf, len);
   } else {
     /* Data has been stored in the component’s receive buffer to be picked up
@@ -2784,7 +2955,7 @@ nice_agent_attach_recv (
 
   /* Set the component’s I/O context. */
   component_set_io_context (component, ctx);
-  component_set_io_callback (component, func, data);
+  component_set_io_callback (component, func, data, NULL, 0, NULL);
   ret = TRUE;
 
   if (func) {
diff --git a/agent/agent.h b/agent/agent.h
index c2a39d1..64df422 100644
--- a/agent/agent.h
+++ b/agent/agent.h
@@ -676,6 +676,15 @@ nice_agent_attach_recv (
   NiceAgentRecvFunc func,
   gpointer data);
 
+gssize
+nice_agent_recv (
+    NiceAgent *agent,
+    guint stream_id,
+    guint component_id,
+    guint8 *buf,
+    gsize buf_len,
+    GCancellable *cancellable,
+    GError **error);
 
 /**
  * nice_agent_set_selected_pair:
diff --git a/agent/component.c b/agent/component.c
index 62ecd4f..1bf6c33 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -61,7 +61,7 @@ component_deschedule_io_callback (Component *component);
 
 
 /* Must *not* take the agent lock, since it’s called from within
- * component_set_io_callback(), which holds the Component’s I/O lock. */
+ * component_set_io_context(), which holds the Component’s I/O lock. */
 static void
 socket_source_attach (SocketSource *socket_source, GMainContext *context)
 {
@@ -129,7 +129,7 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
    * will be updated when nice_agent_attach_recv() or nice_agent_recv() are
    * called. */
   component_set_io_context (component, NULL);
-  component_set_io_callback (component, NULL, NULL);
+  component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
 
   return component;
 }
@@ -438,7 +438,7 @@ component_attach_socket (Component *component, NiceSocket *socket)
 /* Reattaches socket handles of @component to the main context.
  *
  * Must *not* take the agent lock, since it’s called from within
- * component_set_io_callback(), which holds the Component’s I/O lock. */
+ * component_set_io_context(), which holds the Component’s I/O lock. */
 static void
 component_reattach_all_sockets (Component *component)
 {
@@ -486,7 +486,7 @@ component_detach_socket (Component *component, NiceSocket *socket)
  * sockets themselves untouched.
  *
  * Must *not* take the agent lock, since it’s called from within
- * component_set_io_callback(), which holds the Component’s I/O lock.
+ * component_set_io_context(), which holds the Component’s I/O lock.
  */
 void
 component_detach_all_sockets (Component *component)
@@ -511,6 +511,12 @@ component_free_socket_sources (Component *component)
   component->socket_sources = NULL;
 }
 
+GMainContext *
+component_dup_io_context (Component *component)
+{
+  return g_main_context_ref (component->ctx);
+}
+
 /* If @context is %NULL, a fresh context is used, so component->ctx is always
  * guaranteed to be non-%NULL. */
 void
@@ -534,24 +540,40 @@ component_set_io_context (Component *component, GMainContext *context)
   g_mutex_unlock (&component->io_mutex);
 }
 
+/* (func, user_data) and (recv_buf, recv_buf_len) are mutually exclusive.
+ * At most one of the two may be specified; if both are NULL, the Component
+ * will not receive any data (i.e. reception is paused). */
 void
 component_set_io_callback (Component *component,
-    NiceAgentRecvFunc func, gpointer user_data)
+    NiceAgentRecvFunc func, gpointer user_data,
+    guint8 *recv_buf, gsize recv_buf_len,
+    GError **error)
 {
+  g_assert (func == NULL || recv_buf == NULL);
+  g_assert (recv_buf != NULL || recv_buf_len == 0);
+  g_assert (error == NULL || *error == NULL);
+
   g_mutex_lock (&component->io_mutex);
 
   if (func != NULL) {
     component->io_callback = func;
     component->io_user_data = user_data;
+    component->recv_buf = NULL;
+    component->recv_buf_len = 0;
 
     component_schedule_io_callback (component);
   } else {
     component->io_callback = NULL;
     component->io_user_data = NULL;
+    component->recv_buf = recv_buf;
+    component->recv_buf_len = recv_buf_len;
 
     component_deschedule_io_callback (component);
   }
 
+  component->recv_buf_valid_len = 0;
+  component->recv_buf_error = error;
+
   g_mutex_unlock (&component->io_mutex);
 }
 
diff --git a/agent/component.h b/agent/component.h
index 5186bc1..9879b46 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -150,7 +150,10 @@ struct _Component
 
   /* I/O handling. The main context must always be non-NULL, and is used for all
    * socket recv() operations. All io_callback emissions are invoked in this
-   * context too. */
+   * context too.
+   *
+   * recv_buf and io_callback are mutually exclusive, but it is allowed for both
+   * to be NULL if the Component is not currently ready to receive data. */
   GMutex io_mutex;                  /**< protects io_callback, io_user_data,
                                          pending_io_messages and io_callback_id.
                                          immutable: can be accessed without
@@ -159,8 +162,6 @@ struct _Component
                                          taken before this one */
   NiceAgentRecvFunc io_callback;    /**< function called on io cb */
   gpointer io_user_data;            /**< data passed to the io function */
-  GMainContext *ctx;                /**< context for GSources for this
-                                       component */
   GQueue pending_io_messages;       /**< queue of packets which have been
                                          received but not passed to the client
                                          in an I/O callback or recv() call yet.
@@ -168,6 +169,13 @@ struct _Component
                                          IOCallbackData */
   guint io_callback_id;             /* GSource ID of the I/O callback */
 
+  GMainContext *ctx;                /**< context for GSources for this
+                                       component */
+  guint8 *recv_buf;                 /**< unowned buffer for receiving into */
+  gsize recv_buf_len;               /**< allocated size of recv_buf in bytes */
+  gsize recv_buf_valid_len;         /**< length of valid data in recv_buf */
+  GError **recv_buf_error;          /**< error information about failed reads */
+
   NiceAgent *agent;  /* unowned, immutable: can be accessed without holding the
                       * agent lock */
   Stream *stream;  /* unowned, immutable: can be accessed without holding the
@@ -212,11 +220,15 @@ component_detach_all_sockets (Component *component);
 void
 component_free_socket_sources (Component *component);
 
+GMainContext *
+component_dup_io_context (Component *component);
 void
 component_set_io_context (Component *component, GMainContext *context);
 void
 component_set_io_callback (Component *component,
-    NiceAgentRecvFunc func, gpointer user_data);
+    NiceAgentRecvFunc func, gpointer user_data,
+    guint8 *recv_buf, gsize recv_buf_len,
+    GError **error);
 void
 component_emit_io_callback (Component *component,
     const guint8 *buf, gsize buf_len);
diff --git a/docs/reference/libnice/libnice-sections.txt b/docs/reference/libnice/libnice-sections.txt
index bc276ab..7a1c53a 100644
--- a/docs/reference/libnice/libnice-sections.txt
+++ b/docs/reference/libnice/libnice-sections.txt
@@ -23,6 +23,7 @@ nice_agent_get_remote_candidates
 nice_agent_get_local_candidates
 nice_agent_get_selected_pair
 nice_agent_send
+nice_agent_recv
 nice_agent_attach_recv
 nice_agent_set_selected_pair
 nice_agent_set_selected_remote_candidate
diff --git a/nice/libnice.sym b/nice/libnice.sym
index 195092d..0f7208d 100644
--- a/nice/libnice.sym
+++ b/nice/libnice.sym
@@ -16,6 +16,7 @@ nice_address_set_port
 nice_address_to_string
 nice_agent_add_local_address
 nice_agent_add_stream
+nice_agent_recv
 nice_agent_attach_recv
 nice_agent_gather_candidates
 nice_agent_generate_local_candidate_sdp

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list