[Pkg-telepathy-commits] [libnice] 80/265: agent: Add a ComponentSource to Component

Simon McVittie smcv at debian.org
Wed May 14 12:04:55 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit 3724af1a0258ba9e9a455cfb3eec65e41ab907fd
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date:   Mon Jan 6 18:03:20 2014 +0000

    agent: Add a ComponentSource to Component
    
    This is a type of GSource which proxies all poll events from the sockets
    in a Component. It’s necessary for the implementation of
    GPollableInputStream and GPollableOutputStream.
    
    This adds no new external API, but does add ComponentSource and
    component_source_new() as new internal API.
---
 agent/component.c | 208 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 agent/component.h |   7 +-
 2 files changed, 213 insertions(+), 2 deletions(-)

diff --git a/agent/component.c b/agent/component.c
index 1bf6c33..7e5f79b 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -416,7 +416,18 @@ component_attach_socket (Component *component, NiceSocket *socket)
   g_assert (component->ctx != NULL);
 
   /* Find an existing SocketSource in the component which contains @socket, or
-   * create a new one. */
+   * create a new one.
+   *
+   * In order for socket_sources_age to work properly, socket_sources must only
+   * grow monotonically, or be entirely cleared. i.e. New SocketSources must be
+   * prepended to socket_sources, and all other existing SocketSource must be
+   * left untouched; *or* the whole of socket_sources must be cleared. If
+   * socket_sources is cleared, age is reset to 0 and *must not* be incremented
+   * again or the new sockets will not be picked up by ComponentSocket. This is
+   * guaranteed by the fact that socket_sources is only cleared on disconnection
+   * or discovery failure, which are both unrecoverable states.
+   *
+   * An empty socket_sources corresponds to age 0. */
   l = g_slist_find_custom (component->socket_sources, socket,
           _find_socket_source);
   if (l != NULL) {
@@ -427,6 +438,7 @@ component_attach_socket (Component *component, NiceSocket *socket)
     socket_source->component = component;
     component->socket_sources =
         g_slist_prepend (component->socket_sources, socket_source);
+    component->socket_sources_age++;
   }
 
   /* Create and attach a source */
@@ -509,6 +521,7 @@ component_free_socket_sources (Component *component)
   g_slist_free_full (component->socket_sources,
       (GDestroyNotify) socket_source_free);
   component->socket_sources = NULL;
+  component->socket_sources_age = 0;
 }
 
 GMainContext *
@@ -775,3 +788,196 @@ component_deschedule_io_callback (Component *component)
   g_source_remove (component->io_callback_id);
   component->io_callback_id = 0;
 }
+
+
+/**
+ * ComponentSource:
+ *
+ * This is a GSource which wraps a single Component and is dispatched whenever
+ * any of its NiceSockets are dispatched, i.e. it proxies all poll() events for
+ * every socket in the Component. It is designed for use by GPollableInputStream
+ * and GPollableOutputStream, so that a Component can be incorporated into a
+ * custom main context iteration.
+ *
+ * The callbacks dispatched by a ComponentSource have type GPollableSourceFunc.
+ *
+ * ComponentSource supports adding a GCancellable child source which will
+ * additionally dispatch if a provided GCancellable is cancelled.
+ *
+ * Internally, ComponentSource adds a new GSocketSource for each socket in the
+ * Component. Changes to the Component’s list of sockets are detected on each
+ * call to component_source_prepare(), which compares a stored age with the
+ * current age of the Component’s socket list — if the socket list has changed,
+ * the age will have increased (indicating added sockets) or will have been
+ * reset to 0 (indicating all sockets have been closed).
+ */
+typedef struct {
+  GSource parent;
+
+  GObject *pollable_stream;  /* owned */
+  GIOCondition condition;
+
+  Component *component;  /* unowned */
+  guint component_socket_sources_age;
+} ComponentSource;
+
+static gboolean
+component_source_prepare (GSource *source, gint *timeout_)
+{
+  ComponentSource *component_source = (ComponentSource *) source;
+  gint age_diff;
+
+  /* Needed due to accessing the Component. */
+  agent_lock ();
+
+  age_diff =
+      component_source->component->socket_sources_age -
+      component_source->component_socket_sources_age;
+
+  /* If the age has changed, either:
+   *  • a new socket has been *prepended* to component->socket_sources (and
+   *    age_diff > 0); or
+   *  • component->socket_sources has been emptied (and age_diff < 0).
+   * We can’t remove any child sources without destroying them, so must
+   * monotonically add new ones, or remove everything.
+   *
+   * Removing everything only happens on shutdown or failure, in which case
+   * the ComponentSource itself can be destroyed, automatically destroying all
+   * the child sources. */
+  if (age_diff < 0) {
+    g_source_destroy (source);
+  } else if (age_diff > 0) {
+    /* Add the new child sources. The difference between the two ages gives
+     * the number of new child sources. */
+    guint i;
+    GSList *l;
+
+    for (i = 0, l = component_source->component->socket_sources;
+         i < (guint) age_diff && l != NULL;
+         i++, l = l->next) {
+      GSource *child_source;
+      SocketSource *socket_source;
+
+      socket_source = l->data;
+
+      child_source = g_socket_create_source (socket_source->socket->fileno,
+          component_source->condition, NULL);
+      g_source_set_dummy_callback (child_source);
+      g_source_add_child_source (source, child_source);
+      g_source_unref (child_source);
+    }
+  }
+
+  /* Update the age. */
+  component_source->component_socket_sources_age =
+      component_source->component->socket_sources_age;
+
+  agent_unlock ();
+
+  /* We can’t be sure if the ComponentSource itself needs to be dispatched until
+   * poll() is called on all the child sources. */
+  return FALSE;
+}
+
+static gboolean
+component_source_dispatch (GSource *source, GSourceFunc callback,
+    gpointer user_data)
+{
+  ComponentSource *component_source = (ComponentSource *) source;
+  GPollableSourceFunc func = (GPollableSourceFunc) callback;
+
+  return func (component_source->pollable_stream, user_data);
+}
+
+static void
+component_source_finalize (GSource *source)
+{
+  ComponentSource *component_source = (ComponentSource *) source;
+
+  g_object_unref (component_source->pollable_stream);
+  component_source->pollable_stream = NULL;
+}
+
+static gboolean
+component_source_closure_callback (GObject *pollable_stream, gpointer user_data)
+{
+  GClosure *closure = user_data;
+  GValue param_value = G_VALUE_INIT;
+  GValue result_value = G_VALUE_INIT;
+  gboolean retval;
+
+  g_value_init (&result_value, G_TYPE_BOOLEAN);
+  g_value_init (&param_value, G_TYPE_OBJECT);
+  g_value_set_object (&param_value, pollable_stream);
+
+  g_closure_invoke (closure, &result_value, 1, &param_value, NULL);
+  retval = g_value_get_boolean (&result_value);
+
+  g_value_unset (&param_value);
+  g_value_unset (&result_value);
+
+  return retval;
+}
+
+static GSourceFuncs component_source_funcs = {
+  component_source_prepare,
+  NULL,  /* check */
+  component_source_dispatch,
+  component_source_finalize,
+  (GSourceFunc) component_source_closure_callback,
+};
+
+/**
+ * component_source_new:
+ * @component: a #Component
+ * @pollable_stream: a #GPollableInputStream or #GPollableOutputStream to pass
+ * to dispatched callbacks
+ * @condition: the I/O condition to poll for (e.g. %G_IO_IN or %G_IO_OUT)
+ * @cancellable: (allow-none): a #GCancellable, or %NULL
+ *
+ * Create a new #ComponentSource, a type of #GSource which proxies poll events
+ * from all sockets in the given @component.
+ *
+ * A callback function of type #GPollableSourceFunc must be connected to the
+ * returned #GSource using g_source_set_callback(). @pollable_stream is passed
+ * to all callbacks dispatched from the #GSource, and a reference is held on it
+ * by the #GSource.
+ *
+ * The #GSource will automatically update to poll sockets as they’re added to
+ * the @component (e.g. during peer discovery).
+ *
+ * Returns: (transfer full): a new #ComponentSource; unref with g_source_unref()
+ */
+GSource *
+component_source_new (Component *component, GObject *pollable_stream,
+    GIOCondition condition, GCancellable *cancellable)
+{
+  ComponentSource *component_source;
+
+  g_assert (component != NULL);
+  g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_stream) ||
+            G_IS_POLLABLE_OUTPUT_STREAM (pollable_stream));
+
+  component_source =
+      (ComponentSource *)
+          g_source_new (&component_source_funcs, sizeof (ComponentSource));
+  g_source_set_name ((GSource *) component_source, "ComponentSource");
+
+  component_source->component = component;
+  component_source->component_socket_sources_age = 0;
+  component_source->pollable_stream = g_object_ref (pollable_stream);
+  component_source->condition = condition;
+
+  /* Add a cancellable source. */
+  if (cancellable != NULL) {
+    GSource *cancellable_source;
+
+    cancellable_source = g_cancellable_source_new (cancellable);
+    g_source_set_dummy_callback (cancellable_source);
+    g_source_add_child_source ((GSource *) component_source,
+        cancellable_source);
+    g_source_unref (cancellable_source);
+  }
+
+  return (GSource *) component_source;
+}
diff --git a/agent/component.h b/agent/component.h
index 9879b46..64d1d34 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -141,7 +141,8 @@ struct _Component
   NiceComponentState state;
   GSList *local_candidates;    /**< list of Candidate objs */
   GSList *remote_candidates;   /**< list of Candidate objs */
-  GSList *socket_sources;      /**< list of SocketSource objs */
+  GSList *socket_sources;      /**< list of SocketSource objs; must only grow monotonically */
+  guint socket_sources_age;    /**< incremented when socket_sources changes */
   GSList *incoming_checks;     /**< list of IncomingCheck objs */
   GList *turn_servers;             /**< List of TURN servers */
   CandidatePair selected_pair; /**< independent from checklists, 
@@ -220,6 +221,10 @@ component_detach_all_sockets (Component *component);
 void
 component_free_socket_sources (Component *component);
 
+GSource *
+component_source_new (Component *component, GObject *pollable_stream,
+    GIOCondition condition, GCancellable *cancellable);
+
 GMainContext *
 component_dup_io_context (Component *component);
 void

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list