[Pkg-telepathy-commits] [libnice] 77/265: agent: Move GSource handling into Component

Simon McVittie smcv at debian.org
Wed May 14 12:04:54 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit c56727025dd1ffa2e0513bf6bfc5218b58e2b483
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date:   Thu Jan 2 15:01:40 2014 +0000

    agent: Move GSource handling into Component
    
    Rather than handle GSource creation, attachment and removal in
    NiceAgent, handle it inside Component. This brings it closer to the
    networking code, and improves encapsulation of the state of each
    Component.
---
 agent/agent-priv.h |  11 ++--
 agent/agent.c      | 146 ++++++++++++++++++++++++++++-------------------------
 agent/component.c  | 124 +++++++++++++++++++++++++++++++--------------
 agent/component.h  |  25 +++++----
 agent/discovery.c  |   5 +-
 5 files changed, 185 insertions(+), 126 deletions(-)

diff --git a/agent/agent-priv.h b/agent/agent-priv.h
index cf212f8..350d1a3 100644
--- a/agent/agent-priv.h
+++ b/agent/agent-priv.h
@@ -166,15 +166,16 @@ guint64 agent_candidate_pair_priority (NiceAgent *agent, NiceCandidate *local, N
 
 GSource *agent_timeout_add_with_context (NiceAgent *agent, guint interval, GSourceFunc function, gpointer data);
 
-void agent_attach_stream_component_socket (NiceAgent *agent,
-    Stream *stream,
-    Component *component,
-    NiceSocket *socket);
-
 StunUsageIceCompatibility agent_to_ice_compatibility (NiceAgent *agent);
 StunUsageTurnCompatibility agent_to_turn_compatibility (NiceAgent *agent);
 NiceTurnSocketCompatibility agent_to_turn_socket_compatibility (NiceAgent *agent);
 
 void _priv_set_socket_tos (NiceAgent *agent, NiceSocket *sock, gint tos);
 
+gboolean
+component_io_cb (
+  GSocket *gsocket,
+  GIOCondition condition,
+  gpointer data);
+
 #endif /*_NICE_AGENT_PRIV_H */
diff --git a/agent/agent.c b/agent/agent.c
index 261fd56..647b543 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -1003,7 +1003,7 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
   if (component->tcp) {
     agent_signal_component_state_change (agent, stream->id,
         component->id, NICE_COMPONENT_STATE_FAILED);
-    component_detach_socket_sources (component);
+    component_detach_all_sockets (component);
   }
   priv_destroy_component_tcp (component);
 }
@@ -1420,8 +1420,7 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent,
       new_socket = nice_udp_bsd_socket_new (&addr);
       if (new_socket) {
         _priv_set_socket_tos (agent, new_socket, stream->tos);
-        agent_attach_stream_component_socket (agent, stream,
-            component, new_socket);
+        component_attach_socket (component, new_socket);
         socket = new_socket;
       }
     }
@@ -1469,8 +1468,7 @@ priv_add_new_candidate_discovery_turn (NiceAgent *agent,
     cdisco->nicesock = nice_tcp_turn_socket_new (socket,
         agent_to_turn_socket_compatibility (agent));
 
-    agent_attach_stream_component_socket (agent, stream,
-        component, cdisco->nicesock);
+    component_attach_socket (component, cdisco->nicesock);
   }
 
   cdisco->turn = turn;
@@ -2666,79 +2664,95 @@ io_ctx_free (IOCtx *ctx)
   g_slice_free (IOCtx, ctx);
 }
 
-static gboolean
-nice_agent_g_source_cb (
-  GSocket *gsocket,
-  GIOCondition condition,
-  gpointer data)
+gboolean
+component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
 {
-  IOCtx *ctx = data;
-  NiceAgent *agent = ctx->agent;
-  Stream *stream = ctx->stream;
-  Component *component = ctx->component;
-  guint8 buf[MAX_BUFFER_SIZE];
+  SocketSource *socket_source = user_data;
+  Component *component;
+  NiceAgent *agent;
+  Stream *stream;
+  guint8 local_buf[MAX_BUFFER_SIZE];
   gssize len;
+  guint8 *recv_buf;
+  gsize recv_buf_len;
+  gboolean retval = FALSE;
+  NiceAgentRecvFunc io_callback;
 
   agent_lock ();
 
+  component = socket_source->component;
+  agent = component->agent;
+  stream = component->stream;
+
   if (g_source_is_destroyed (g_main_current_source ())) {
-    agent_unlock ();
-    return FALSE;
+    /* Silently return FALSE. */
+    nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ());
+    goto done;
   }
 
-  /* Actually read the data. This will return 0 if the data has already been
-   * handled. */
-  len = _nice_agent_recv_locked (agent, stream, component, ctx->socket,
-                                 buf, MAX_BUFFER_SIZE);
+  /* FIXME: Compartmentalise this in component.c */
+  g_mutex_lock (&component->io_mutex);
+  io_callback = component->io_callback;
+  g_mutex_unlock (&component->io_mutex);
 
-  if (len < 0) {
-    /* Error. Detach the source but don’t close the socket. We don’t close the
-     * socket because it would be way too complicated to take care of every path
-     * where it might still be used. */
-    nice_debug ("Agent %p: unable to recv from socket %p. Detaching",
-        ctx->agent, ctx->socket);
-    component_detach_socket_source (component, ctx->socket);
-  } else if (len > 0) {
-    component_emit_io_callback (component, buf, len);
+  /* Choose which receive buffer to use. If we’re reading for
+   * nice_agent_attach_recv(), use a local static buffer. If we’re reading for
+   * nice_agent_recv(), use the buffer provided by the client. */
+  g_assert (io_callback == NULL || component->recv_buf == NULL);
+
+  if (io_callback != NULL) {
+    recv_buf = local_buf;
+    recv_buf_len = sizeof (local_buf);
+  } else if (component->recv_buf != NULL) {
+    recv_buf = component->recv_buf + component->recv_buf_valid_len;
+    recv_buf_len = component->recv_buf_len - component->recv_buf_valid_len;
+  } else {
+    /* I/O is paused. Try again later. */
+    retval = TRUE;
+    goto done;
   }
 
-  agent_unlock ();
+  nice_debug ("Receiving on source %p (socket %p, FD %d).",
+      socket_source->source, socket_source->socket,
+      g_socket_get_fd (socket_source->socket->fileno));
 
-  return TRUE;
-}
+  /* Actually read the data. This will return 0 if the data has already been
+   * handled (e.g. for STUN control packets). */
+  len = agent_recv_locked (agent, stream, component, socket_source->socket,
+      recv_buf, recv_buf_len);
 
-/*
- * Attaches one socket handle to the main loop event context.
- *
- * Takes ownership of the socket.
- */
+  nice_debug ("\tReceived %" G_GSSIZE_FORMAT " bytes.", len);
 
-void
-agent_attach_stream_component_socket (NiceAgent *agent,
-    Stream *stream,
-    Component *component,
-    NiceSocket *socket)
-{
-  GSource *source;
-  IOCtx *ctx;
+  if (len == 0) {
+    /* No data was available, probably due to being a reliable connection and
+     * hence the data is stored in the pseudotcp buffer. */
+    retval = TRUE;
+    goto done;
+  } else if (len < 0) {
+    /* Error. Detach the source but don’t close the socket. We don’t close the
+     * socket because it would be way too complicated to take care of every path
+     * where it might still be used. */
+    g_set_error (component->recv_buf_error, G_IO_ERROR, G_IO_ERROR_FAILED,
+        "Unable to receive from socket %p. Detaching.", socket);
+    nice_debug ("%s: error receiving from socket %p", G_STRFUNC, socket);
+    goto done;
+  }
 
-  if (!component->ctx) {
-    component_add_detached_socket (component, socket);
-    return;
+  /* Actual data to notify the client about. */
+  if (io_callback != NULL) {
+    component_emit_io_callback (component, recv_buf, len);
+  } else {
+    /* Data has been stored in the component’s receive buffer to be picked up
+     * later by nice_agent_recv(). */
+    component->recv_buf_valid_len += len;
   }
 
-  /* note: without G_IO_ERR the glib mainloop goes into
-   *       busyloop if errors are encountered */
-  source = g_socket_create_source (socket->fileno, G_IO_IN | G_IO_ERR, NULL);
+  retval = TRUE;
 
-  ctx = io_ctx_new (agent, stream, component, socket, source);
-  g_source_set_callback (source, (GSourceFunc) nice_agent_g_source_cb,
-      ctx, (GDestroyNotify) io_ctx_free);
-  nice_debug ("Agent %p : Attach source %p (stream %u).", agent, source,
-      stream->id);
+done:
+  agent_unlock ();
 
-  /* Add the pair to the component. */
-  component_add_socket_source (component, socket, source);
+  return retval;
 }
 
 NICEAPI_EXPORT gboolean
@@ -2768,20 +2782,12 @@ nice_agent_attach_recv (
     goto done;
   }
 
-  /* Set the component’s I/O callback. */
-  component_set_io_callback (component, func, data, ctx);
+  /* Set the component’s I/O context. */
+  component_set_io_context (component, ctx);
+  component_set_io_callback (component, func, data);
   ret = TRUE;
 
   if (func) {
-    GSList *i;
-
-    /* Attach any detached sockets to the new main context. */
-    for (i = component->socket_sources; i != NULL; i = i->next) {
-      SocketSource *socket_source = i->data;
-      agent_attach_stream_component_socket (agent, stream, component,
-          socket_source->socket);
-    }
-
     /* If we got detached, maybe our readable callback didn't finish reading
      * all available data in the pseudotcp, so we need to make sure we free
      * our recv window, so the readable callback can be triggered again on the
diff --git a/agent/component.c b/agent/component.c
index a70e9e2..62ecd4f 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -63,8 +63,35 @@ component_deschedule_io_callback (Component *component);
 /* Must *not* take the agent lock, since it’s called from within
  * component_set_io_callback(), which holds the Component’s I/O lock. */
 static void
+socket_source_attach (SocketSource *socket_source, GMainContext *context)
+{
+  GSource *source;
+
+  /* Create a source. */
+  source = g_socket_create_source (socket_source->socket->fileno,
+      G_IO_IN, NULL);
+  g_source_set_callback (source, (GSourceFunc) component_io_cb,
+      socket_source, NULL);
+
+  /* Add the source. */
+  nice_debug ("Attaching source %p (socket %p, FD %d) to context %p", source,
+      socket_source->socket, g_socket_get_fd (socket_source->socket->fileno),
+      context);
+
+  g_assert (socket_source->source == NULL);
+  socket_source->source = source;
+  g_source_attach (source, context);
+}
+
+static void
 socket_source_detach (SocketSource *source)
 {
+  nice_debug ("Detaching source %p (socket %p, FD %d) from context %p",
+      source->source, source->socket,
+      (source->socket->fileno != NULL) ?
+          g_socket_get_fd (source->socket->fileno) : 0,
+      (source->source != NULL) ? g_source_get_context (source->source) : 0);
+
   if (source->source != NULL) {
     g_source_destroy (source->source);
     g_source_unref (source->source);
@@ -136,7 +163,7 @@ component_free (Component *cmp)
 
   g_slist_free (cmp->local_candidates);
   g_slist_free (cmp->remote_candidates);
-  g_slist_free_full (cmp->socket_sources, (GDestroyNotify) socket_source_free);
+  component_free_socket_sources (cmp);
   g_slist_free (cmp->incoming_checks);
 
   for (item = cmp->turn_servers; item; item = g_list_next (item)) {
@@ -375,20 +402,20 @@ _find_socket_source (gconstpointer a, gconstpointer b)
   return (source_a->socket == socket_b) ? 0 : 1;
 }
 
-/* This takes ownership of socket and source.
- * It attaches the source to the component’s context. */
+/* This takes ownership of the socket.
+ * It creates and attaches a source to the component’s context. */
 void
-component_add_socket_source (Component *component, NiceSocket *socket,
-    GSource *source)
+component_attach_socket (Component *component, NiceSocket *socket)
 {
   GSList *l;
   SocketSource *socket_source;
 
   g_assert (component != NULL);
   g_assert (socket != NULL);
-  g_assert (source != NULL);
 
-  /* Find an existing SocketSource in the component which contains socket, or
+  g_assert (component->ctx != NULL);
+
+  /* Find an existing SocketSource in the component which contains @socket, or
    * create a new one. */
   l = g_slist_find_custom (component->socket_sources, socket,
           _find_socket_source);
@@ -397,31 +424,36 @@ component_add_socket_source (Component *component, NiceSocket *socket,
   } else {
     socket_source = g_slice_new0 (SocketSource);
     socket_source->socket = socket;
+    socket_source->component = component;
     component->socket_sources =
         g_slist_prepend (component->socket_sources, socket_source);
   }
 
-  /* Add the source. */
-  g_assert (socket_source->source == NULL);
-  g_assert (component->ctx != NULL);
-  socket_source->source = source;
-  g_source_attach (source, component->ctx);
+  /* Create and attach a source */
+  nice_debug ("Component %p (agent %p): Attach source (stream %u).",
+      component, component->agent, component->stream->id);
+  socket_source_attach (socket_source, component->ctx);
 }
 
-void
-component_add_detached_socket (Component *component, NiceSocket *socket)
+/* Reattaches socket handles of @component to the main context.
+ *
+ * Must *not* take the agent lock, since it’s called from within
+ * component_set_io_callback(), which holds the Component’s I/O lock. */
+static void
+component_reattach_all_sockets (Component *component)
 {
-  SocketSource *socket_source;
+  GSList *i;
 
-  socket_source = g_slice_new0 (SocketSource);
-  socket_source->socket = socket;
-  socket_source->source = NULL;
-  component->socket_sources =
-      g_slist_prepend (component->socket_sources, socket_source);
+  for (i = component->socket_sources; i != NULL; i = i->next) {
+    SocketSource *socket_source = i->data;
+    nice_debug ("Reattach source %p.", socket_source->source);
+    socket_source_detach (socket_source);
+    socket_source_attach (socket_source, component->ctx);
+  }
 }
 
 /**
- * component_detach_socket_source:
+ * component_detach_socket:
  * @component: a #Component
  * @socket: the socket to detach the source for
  *
@@ -431,11 +463,13 @@ component_add_detached_socket (Component *component, NiceSocket *socket)
  * If the @socket doesn’t exist in this @component, do nothing.
  */
 void
-component_detach_socket_source (Component *component, NiceSocket *socket)
+component_detach_socket (Component *component, NiceSocket *socket)
 {
   GSList *l;
   SocketSource *socket_source;
 
+  nice_debug ("Detach socket %p.", socket);
+
   /* Find the SocketSource for the socket. */
   l = g_slist_find_custom (component->socket_sources, socket,
           _find_socket_source);
@@ -455,13 +489,14 @@ component_detach_socket_source (Component *component, NiceSocket *socket)
  * component_set_io_callback(), which holds the Component’s I/O lock.
  */
 void
-component_detach_socket_sources (Component *component)
+component_detach_all_sockets (Component *component)
 {
   GSList *i;
 
   for (i = component->socket_sources; i != NULL; i = i->next) {
     SocketSource *socket_source = i->data;
-    nice_debug ("Detach source %p.", socket_source->source);
+    nice_debug ("Detach source %p, socket %p.", socket_source->source,
+        socket_source->socket);
     socket_source_detach (socket_source);
   }
 }
@@ -469,38 +504,51 @@ component_detach_socket_sources (Component *component)
 void
 component_free_socket_sources (Component *component)
 {
+  nice_debug ("Free socket sources for component %p.", component);
+
   g_slist_free_full (component->socket_sources,
       (GDestroyNotify) socket_source_free);
   component->socket_sources = NULL;
 }
 
+/* If @context is %NULL, a fresh context is used, so component->ctx is always
+ * guaranteed to be non-%NULL. */
 void
-component_set_io_callback (Component *component, NiceAgentRecvFunc func,
-    gpointer user_data, GMainContext *context)
+component_set_io_context (Component *component, GMainContext *context)
 {
   g_mutex_lock (&component->io_mutex);
 
-  /* Reference the context early so we don’t accidentally free it below. */
-  if (context != NULL && func != NULL)
-    g_main_context_ref (context);
-
-  if (component->io_callback != NULL)
-    component_detach_socket_sources (component);
+  if (component->ctx != context || component->ctx == NULL) {
+    if (context == NULL)
+      context = g_main_context_new ();
+    else
+      g_main_context_ref (context);
+
+    component_detach_all_sockets (component);
+    if (component->ctx != NULL)
+      g_main_context_unref (component->ctx);
+    component->ctx = context;
+    component_reattach_all_sockets (component);
+  }
 
-  component->io_callback = NULL;
-  component->io_user_data = NULL;
+  g_mutex_unlock (&component->io_mutex);
+}
 
-  if (component->ctx != NULL)
-    g_main_context_unref (component->ctx);
-  component->ctx = NULL;
+void
+component_set_io_callback (Component *component,
+    NiceAgentRecvFunc func, gpointer user_data)
+{
+  g_mutex_lock (&component->io_mutex);
 
   if (func != NULL) {
     component->io_callback = func;
     component->io_user_data = user_data;
-    component->ctx = context;  /* referenced above */
 
     component_schedule_io_callback (component);
   } else {
+    component->io_callback = NULL;
+    component->io_user_data = NULL;
+
     component_deschedule_io_callback (component);
   }
 
diff --git a/agent/component.h b/agent/component.h
index d59ad4b..5186bc1 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -99,10 +99,14 @@ struct _IncomingCheck
  * GSources in a Component must be attached to the same main context:
  * component->ctx.
  *
- * Socket must be non-NULL, but source may be NULL if it has been detached. */
+ * Socket must be non-NULL, but source may be NULL if it has been detached.
+ *
+ * The Component is stored so this may be used as the user data for a GSource
+ * callback. */
 typedef struct {
   NiceSocket *socket;
   GSource *source;
+  Component *component;
 } SocketSource;
 
 
@@ -144,6 +148,9 @@ struct _Component
 				    see ICE 11.1. "Sending Media" (ID-19) */
   NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */
 
+  /* I/O handling. The main context must always be non-NULL, and is used for all
+   * socket recv() operations. All io_callback emissions are invoked in this
+   * context too. */
   GMutex io_mutex;                  /**< protects io_callback, io_user_data,
                                          pending_io_messages and io_callback_id.
                                          immutable: can be accessed without
@@ -197,21 +204,19 @@ component_set_selected_remote_candidate (NiceAgent *agent, Component *component,
     NiceCandidate *candidate);
 
 void
-component_add_socket_source (Component *component, NiceSocket *socket,
-    GSource *source);
-void
-component_add_detached_socket (Component *component, NiceSocket *socket);
-
+component_attach_socket (Component *component, NiceSocket *socket);
 void
-component_detach_socket_source (Component *component, NiceSocket *socket);
+component_detach_socket (Component *component, NiceSocket *socket);
 void
-component_detach_socket_sources (Component *component);
+component_detach_all_sockets (Component *component);
 void
 component_free_socket_sources (Component *component);
 
 void
-component_set_io_callback (Component *component, NiceAgentRecvFunc func,
-    gpointer user_data, GMainContext *context);
+component_set_io_context (Component *component, GMainContext *context);
+void
+component_set_io_callback (Component *component,
+    NiceAgentRecvFunc func, gpointer user_data);
 void
 component_emit_io_callback (Component *component,
     const guint8 *buf, gsize buf_len);
diff --git a/agent/discovery.c b/agent/discovery.c
index 418d687..012a290 100644
--- a/agent/discovery.c
+++ b/agent/discovery.c
@@ -489,8 +489,7 @@ NiceCandidate *discovery_add_local_host_candidate (
     goto errors;
 
   _priv_set_socket_tos (agent, udp_socket, stream->tos);
-  agent_attach_stream_component_socket (agent, stream,
-      component, udp_socket);
+  component_attach_socket (component, udp_socket);
 
   return candidate;
 
@@ -621,7 +620,7 @@ discovery_add_relay_candidate (
   if (!priv_add_local_candidate_pruned (agent, stream_id, component, candidate))
     goto errors;
 
-  component_add_detached_socket (component, relay_socket);
+  component_attach_socket (component, relay_socket);
   agent_signal_new_candidate (agent, candidate);
 
   return candidate;

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list