[Pkg-telepathy-commits] [libnice] 84/265: agent: Add GPollableInputStream support to NiceInputStream

Simon McVittie smcv at debian.org
Wed May 14 12:04:55 UTC 2014


This is an automated email from the git hooks/post-receive script.

smcv pushed a commit to branch debian
in repository libnice.

commit 0ac9f3f90483acc9cc590f2cefea2e823011971d
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date:   Mon Dec 16 14:42:13 2013 +0000

    agent: Add GPollableInputStream support to NiceInputStream
---
 agent/inputstream.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 153 insertions(+), 3 deletions(-)

diff --git a/agent/inputstream.c b/agent/inputstream.c
index de65f66..0e9f29b 100644
--- a/agent/inputstream.c
+++ b/agent/inputstream.c
@@ -60,13 +60,20 @@
 # include "config.h"
 #endif
 
+#include <errno.h>
+
 #include "inputstream.h"
 #include "agent-priv.h"
 
 static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
     gpointer user_data);
+static void nice_input_stream_init_pollable (
+    GPollableInputStreamInterface *iface);
 
-G_DEFINE_TYPE (NiceInputStream, nice_input_stream, G_TYPE_INPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (NiceInputStream,
+                         nice_input_stream, G_TYPE_INPUT_STREAM,
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                             nice_input_stream_init_pollable));
 
 enum
 {
@@ -89,6 +96,11 @@ static void nice_input_stream_set_property (GObject *object, guint prop_id,
     const GValue *value, GParamSpec *pspec);
 static gssize nice_input_stream_read (GInputStream *stream, void *buffer,
     gsize count, GCancellable *cancellable, GError **error);
+static gboolean nice_input_stream_is_readable (GPollableInputStream *stream);
+static gssize nice_input_stream_read_nonblocking (GPollableInputStream *stream,
+    void *buffer, gsize count, GError **error);
+static GSource *nice_input_stream_create_source (GPollableInputStream *stream,
+    GCancellable *cancellable);
 
 static void
 nice_input_stream_class_init (NiceInputStreamClass *klass)
@@ -239,6 +251,14 @@ nice_input_stream_init (NiceInputStream *stream)
   g_weak_ref_init (&stream->priv->agent_ref, NULL);
 }
 
+static void
+nice_input_stream_init_pollable (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable = nice_input_stream_is_readable;
+  iface->read_nonblocking = nice_input_stream_read_nonblocking;
+  iface->create_source = nice_input_stream_create_source;
+}
+
 /**
  * nice_input_stream_new:
  * @agent: A #NiceAgent
@@ -279,8 +299,11 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
   gssize len;
 
   /* Closed streams are not readable. */
-  if (g_input_stream_is_closed (stream))
-    return 0;
+  if (g_input_stream_is_closed (stream)) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed.");
+    return -1;
+  }
 
   /* Has the agent disappeared? */
   agent = g_weak_ref_get (&priv->agent_ref);
@@ -298,6 +321,133 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
   return len;
 }
 
+static gboolean
+nice_input_stream_is_readable (GPollableInputStream *stream)
+{
+  NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
+  Component *component = NULL;
+  Stream *_stream = NULL;
+  gboolean retval = FALSE;
+  GSList *i;
+  NiceAgent *agent;  /* owned */
+
+  /* Closed streams are not readable. */
+  if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
+    return FALSE;
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL)
+    return FALSE;
+
+  agent_lock ();
+
+  if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+          &_stream, &component)) {
+    g_warning ("Could not find component %u in stream %u", priv->component_id,
+        priv->stream_id);
+    goto done;
+  }
+
+  /* If it’s a reliable agent, see if there’s any pending data in the pseudo-TCP
+   * buffer. */
+  if (agent->reliable && component->tcp != NULL &&
+      pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
+    retval = TRUE;
+    goto done;
+  }
+
+  /* Check whether any of the component’s FDs are pollable. */
+  for (i = component->socket_sources; i != NULL; i = i->next) {
+    SocketSource *socket_source = i->data;
+    NiceSocket *socket = socket_source->socket;
+
+    if (g_socket_condition_check (socket->fileno, G_IO_IN) != 0) {
+      retval = TRUE;
+      break;
+    }
+  }
+
+done:
+  agent_unlock ();
+
+  g_object_unref (agent);
+
+  return retval;
+}
+
+static gssize
+nice_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer,
+    gsize count, GError **error)
+{
+  NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
+  NiceAgent *agent;  /* owned */
+  gssize len;
+
+  /* Closed streams are not readable. */
+  if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed.");
+    return -1;
+  }
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed due to the NiceAgent being finalised.");
+    return -1;
+  }
+
+  len = nice_agent_recv_nonblocking (agent, priv->stream_id,
+      priv->component_id, (guint8 *) buffer, count, NULL, error);
+
+  g_object_unref (agent);
+
+  return len;
+}
+
+static GSource *
+nice_input_stream_create_source (GPollableInputStream *stream,
+    GCancellable *cancellable)
+{
+  NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
+  GSource *component_source = NULL;
+  Component *component = NULL;
+  Stream *_stream = NULL;
+  NiceAgent *agent;  /* owned */
+
+  /* Closed streams cannot have sources. */
+  if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
+    return g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL)
+    return g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+
+  agent_lock ();
+
+  /* Grab the socket for this component. */
+  if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+          &_stream, &component)) {
+    g_warning ("Could not find component %u in stream %u", priv->component_id,
+        priv->stream_id);
+    component_source = g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+    goto done;
+  }
+
+  component_source = component_source_new (component, G_OBJECT (stream),
+      G_IO_IN, cancellable);
+
+done:
+  agent_unlock ();
+
+  g_object_unref (agent);
+
+  return component_source;
+}
+
 static void
 streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
 {

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git



More information about the Pkg-telepathy-commits mailing list