[Pkg-telepathy-commits] [libnice] 80/265: agent: Add a ComponentSource to Component
Simon McVittie
smcv at debian.org
Wed May 14 12:04:55 UTC 2014
This is an automated email from the git hooks/post-receive script.
smcv pushed a commit to branch debian
in repository libnice.
commit 3724af1a0258ba9e9a455cfb3eec65e41ab907fd
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date: Mon Jan 6 18:03:20 2014 +0000
agent: Add a ComponentSource to Component
This is a type of GSource which proxies all poll events from the sockets
in a Component. It’s necessary for the implementation of
GPollableInputStream and GPollableOutputStream.
This adds no new external API, but does add ComponentSource and
component_source_new() as new internal API.
---
agent/component.c | 208 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
agent/component.h | 7 +-
2 files changed, 213 insertions(+), 2 deletions(-)
diff --git a/agent/component.c b/agent/component.c
index 1bf6c33..7e5f79b 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -416,7 +416,18 @@ component_attach_socket (Component *component, NiceSocket *socket)
g_assert (component->ctx != NULL);
/* Find an existing SocketSource in the component which contains @socket, or
- * create a new one. */
+ * create a new one.
+ *
+ * In order for socket_sources_age to work properly, socket_sources must only
+ * grow monotonically, or be entirely cleared. i.e. New SocketSources must be
+ * prepended to socket_sources, and all other existing SocketSource must be
+ * left untouched; *or* the whole of socket_sources must be cleared. If
+ * socket_sources is cleared, age is reset to 0 and *must not* be incremented
+ * again or the new sockets will not be picked up by ComponentSocket. This is
+ * guaranteed by the fact that socket_sources is only cleared on disconnection
+ * or discovery failure, which are both unrecoverable states.
+ *
+ * An empty socket_sources corresponds to age 0. */
l = g_slist_find_custom (component->socket_sources, socket,
_find_socket_source);
if (l != NULL) {
@@ -427,6 +438,7 @@ component_attach_socket (Component *component, NiceSocket *socket)
socket_source->component = component;
component->socket_sources =
g_slist_prepend (component->socket_sources, socket_source);
+ component->socket_sources_age++;
}
/* Create and attach a source */
@@ -509,6 +521,7 @@ component_free_socket_sources (Component *component)
g_slist_free_full (component->socket_sources,
(GDestroyNotify) socket_source_free);
component->socket_sources = NULL;
+ component->socket_sources_age = 0;
}
GMainContext *
@@ -775,3 +788,196 @@ component_deschedule_io_callback (Component *component)
g_source_remove (component->io_callback_id);
component->io_callback_id = 0;
}
+
+
+/**
+ * ComponentSource:
+ *
+ * This is a GSource which wraps a single Component and is dispatched whenever
+ * any of its NiceSockets are dispatched, i.e. it proxies all poll() events for
+ * every socket in the Component. It is designed for use by GPollableInputStream
+ * and GPollableOutputStream, so that a Component can be incorporated into a
+ * custom main context iteration.
+ *
+ * The callbacks dispatched by a ComponentSource have type GPollableSourceFunc.
+ *
+ * ComponentSource supports adding a GCancellable child source which will
+ * additionally dispatch if a provided GCancellable is cancelled.
+ *
+ * Internally, ComponentSource adds a new GSocketSource for each socket in the
+ * Component. Changes to the Component’s list of sockets are detected on each
+ * call to component_source_prepare(), which compares a stored age with the
+ * current age of the Component’s socket list — if the socket list has changed,
+ * the age will have increased (indicating added sockets) or will have been
+ * reset to 0 (indicating all sockets have been closed).
+ */
+typedef struct {
+ GSource parent;
+
+ GObject *pollable_stream; /* owned */
+ GIOCondition condition;
+
+ Component *component; /* unowned */
+ guint component_socket_sources_age;
+} ComponentSource;
+
+static gboolean
+component_source_prepare (GSource *source, gint *timeout_)
+{
+ ComponentSource *component_source = (ComponentSource *) source;
+ gint age_diff;
+
+ /* Needed due to accessing the Component. */
+ agent_lock ();
+
+ age_diff =
+ component_source->component->socket_sources_age -
+ component_source->component_socket_sources_age;
+
+ /* If the age has changed, either:
+ * • a new socket has been *prepended* to component->socket_sources (and
+ * age_diff > 0); or
+ * • component->socket_sources has been emptied (and age_diff < 0).
+ * We can’t remove any child sources without destroying them, so must
+ * monotonically add new ones, or remove everything.
+ *
+ * Removing everything only happens on shutdown or failure, in which case
+ * the ComponentSource itself can be destroyed, automatically destroying all
+ * the child sources. */
+ if (age_diff < 0) {
+ g_source_destroy (source);
+ } else if (age_diff > 0) {
+ /* Add the new child sources. The difference between the two ages gives
+ * the number of new child sources. */
+ guint i;
+ GSList *l;
+
+ for (i = 0, l = component_source->component->socket_sources;
+ i < (guint) age_diff && l != NULL;
+ i++, l = l->next) {
+ GSource *child_source;
+ SocketSource *socket_source;
+
+ socket_source = l->data;
+
+ child_source = g_socket_create_source (socket_source->socket->fileno,
+ component_source->condition, NULL);
+ g_source_set_dummy_callback (child_source);
+ g_source_add_child_source (source, child_source);
+ g_source_unref (child_source);
+ }
+ }
+
+ /* Update the age. */
+ component_source->component_socket_sources_age =
+ component_source->component->socket_sources_age;
+
+ agent_unlock ();
+
+ /* We can’t be sure if the ComponentSource itself needs to be dispatched until
+ * poll() is called on all the child sources. */
+ return FALSE;
+}
+
+static gboolean
+component_source_dispatch (GSource *source, GSourceFunc callback,
+ gpointer user_data)
+{
+ ComponentSource *component_source = (ComponentSource *) source;
+ GPollableSourceFunc func = (GPollableSourceFunc) callback;
+
+ return func (component_source->pollable_stream, user_data);
+}
+
+static void
+component_source_finalize (GSource *source)
+{
+ ComponentSource *component_source = (ComponentSource *) source;
+
+ g_object_unref (component_source->pollable_stream);
+ component_source->pollable_stream = NULL;
+}
+
+static gboolean
+component_source_closure_callback (GObject *pollable_stream, gpointer user_data)
+{
+ GClosure *closure = user_data;
+ GValue param_value = G_VALUE_INIT;
+ GValue result_value = G_VALUE_INIT;
+ gboolean retval;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+ g_value_init (¶m_value, G_TYPE_OBJECT);
+ g_value_set_object (¶m_value, pollable_stream);
+
+ g_closure_invoke (closure, &result_value, 1, ¶m_value, NULL);
+ retval = g_value_get_boolean (&result_value);
+
+ g_value_unset (¶m_value);
+ g_value_unset (&result_value);
+
+ return retval;
+}
+
+static GSourceFuncs component_source_funcs = {
+ component_source_prepare,
+ NULL, /* check */
+ component_source_dispatch,
+ component_source_finalize,
+ (GSourceFunc) component_source_closure_callback,
+};
+
+/**
+ * component_source_new:
+ * @component: a #Component
+ * @pollable_stream: a #GPollableInputStream or #GPollableOutputStream to pass
+ * to dispatched callbacks
+ * @condition: the I/O condition to poll for (e.g. %G_IO_IN or %G_IO_OUT)
+ * @cancellable: (allow-none): a #GCancellable, or %NULL
+ *
+ * Create a new #ComponentSource, a type of #GSource which proxies poll events
+ * from all sockets in the given @component.
+ *
+ * A callback function of type #GPollableSourceFunc must be connected to the
+ * returned #GSource using g_source_set_callback(). @pollable_stream is passed
+ * to all callbacks dispatched from the #GSource, and a reference is held on it
+ * by the #GSource.
+ *
+ * The #GSource will automatically update to poll sockets as they’re added to
+ * the @component (e.g. during peer discovery).
+ *
+ * Returns: (transfer full): a new #ComponentSource; unref with g_source_unref()
+ */
+GSource *
+component_source_new (Component *component, GObject *pollable_stream,
+ GIOCondition condition, GCancellable *cancellable)
+{
+ ComponentSource *component_source;
+
+ g_assert (component != NULL);
+ g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_stream) ||
+ G_IS_POLLABLE_OUTPUT_STREAM (pollable_stream));
+
+ component_source =
+ (ComponentSource *)
+ g_source_new (&component_source_funcs, sizeof (ComponentSource));
+ g_source_set_name ((GSource *) component_source, "ComponentSource");
+
+ component_source->component = component;
+ component_source->component_socket_sources_age = 0;
+ component_source->pollable_stream = g_object_ref (pollable_stream);
+ component_source->condition = condition;
+
+ /* Add a cancellable source. */
+ if (cancellable != NULL) {
+ GSource *cancellable_source;
+
+ cancellable_source = g_cancellable_source_new (cancellable);
+ g_source_set_dummy_callback (cancellable_source);
+ g_source_add_child_source ((GSource *) component_source,
+ cancellable_source);
+ g_source_unref (cancellable_source);
+ }
+
+ return (GSource *) component_source;
+}
diff --git a/agent/component.h b/agent/component.h
index 9879b46..64d1d34 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -141,7 +141,8 @@ struct _Component
NiceComponentState state;
GSList *local_candidates; /**< list of Candidate objs */
GSList *remote_candidates; /**< list of Candidate objs */
- GSList *socket_sources; /**< list of SocketSource objs */
+ GSList *socket_sources; /**< list of SocketSource objs; must only grow monotonically */
+ guint socket_sources_age; /**< incremented when socket_sources changes */
GSList *incoming_checks; /**< list of IncomingCheck objs */
GList *turn_servers; /**< List of TURN servers */
CandidatePair selected_pair; /**< independent from checklists,
@@ -220,6 +221,10 @@ component_detach_all_sockets (Component *component);
void
component_free_socket_sources (Component *component);
+GSource *
+component_source_new (Component *component, GObject *pollable_stream,
+ GIOCondition condition, GCancellable *cancellable);
+
GMainContext *
component_dup_io_context (Component *component);
void
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git
More information about the Pkg-telepathy-commits
mailing list