[Pkg-telepathy-commits] [libnice] 85/265: agent: Add GPollableOutputStream support to NiceOutputStream
Simon McVittie
smcv at debian.org
Wed May 14 12:04:55 UTC 2014
This is an automated email from the git hooks/post-receive script.
smcv pushed a commit to branch debian
in repository libnice.
commit cf9d3f18e6b0b0eaee701aaf61ae88b4819f360c
Author: Philip Withnall <philip.withnall at collabora.co.uk>
Date: Tue Dec 17 10:30:19 2013 +0000
agent: Add GPollableOutputStream support to NiceOutputStream
---
agent/agent.h | 2 +
agent/outputstream.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 167 insertions(+), 3 deletions(-)
diff --git a/agent/agent.h b/agent/agent.h
index 8b4c974..649e715 100644
--- a/agent/agent.h
+++ b/agent/agent.h
@@ -1131,6 +1131,8 @@ nice_agent_parse_remote_candidate_sdp (
*
* Build a #GIOStream wrapper around the given stream and component in
* @agent. The I/O stream will be valid for as long as @stream_id is valid.
+ * The I/O stream’s #GInputStream and #GOutputStream implement
+ * #GPollableInputStream and #GPollableOutputStream, respectively.
*
* This function may only be called on reliable #NiceAgents. It is an error to
* try and create an I/O stream wrapper for an unreliable stream.
diff --git a/agent/outputstream.c b/agent/outputstream.c
index 85b29e2..839c36c 100644
--- a/agent/outputstream.c
+++ b/agent/outputstream.c
@@ -65,13 +65,20 @@
# include "config.h"
#endif
+#include <errno.h>
+
#include "outputstream.h"
+#include "agent-priv.h"
+/* Interface initialiser for #GPollableOutputStream. Declared ahead of the
+ * G_DEFINE_TYPE_WITH_CODE() invocation, which refers to it by name. */
+static void nice_output_stream_init_pollable (
+ GPollableOutputStreamInterface *iface);
static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
gpointer user_data);
-G_DEFINE_TYPE (NiceOutputStream,
- nice_output_stream, G_TYPE_OUTPUT_STREAM);
+/* Register NiceOutputStream as a #GOutputStream subclass which additionally
+ * implements the #GPollableOutputStream interface, using
+ * nice_output_stream_init_pollable() as the interface initialiser. */
+G_DEFINE_TYPE_WITH_CODE (NiceOutputStream,
+ nice_output_stream, G_TYPE_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ nice_output_stream_init_pollable));
enum
{
@@ -96,7 +103,12 @@ static void nice_output_stream_set_property (GObject *object, guint prop_id,
static gssize nice_output_stream_write (GOutputStream *stream,
const void *buffer, gsize count, GCancellable *cancellable, GError **error);
-
+/* #GPollableOutputStream interface methods. They are hooked into the
+ * interface vtable by nice_output_stream_init_pollable(). */
+static gboolean nice_output_stream_is_writable (GPollableOutputStream *stream);
+static gssize nice_output_stream_write_nonblocking (
+ GPollableOutputStream *stream, const void *buffer, gsize count,
+ GError **error);
+static GSource *nice_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable);
/* Output Stream */
static void
@@ -248,6 +260,14 @@ nice_output_stream_init (NiceOutputStream *stream)
g_weak_ref_init (&stream->priv->agent_ref, NULL);
}
+/* Interface initialiser for #GPollableOutputStream: points each of the three
+ * vfuncs set here at the NiceOutputStream implementation in this file. */
+static void
+nice_output_stream_init_pollable (GPollableOutputStreamInterface *iface)
+{
+  iface->create_source = nice_output_stream_create_source;
+  iface->write_nonblocking = nice_output_stream_write_nonblocking;
+  iface->is_writable = nice_output_stream_is_writable;
+}
+
/**
* nice_output_stream_new:
* @agent: A #NiceAgent
@@ -401,6 +421,148 @@ done:
return len;
}
+/*
+ * GPollableOutputStream::is_writable implementation.
+ *
+ * Returns %TRUE if the agent is reliable and the component’s pseudo-TCP
+ * socket reports that it can send, or failing that, if any socket in the
+ * component’s socket source list is ready for %G_IO_OUT. Returns %FALSE if the
+ * stream is closed, the agent has been finalised, or the component cannot be
+ * found (a warning is logged in the last case).
+ */
+static gboolean
+nice_output_stream_is_writable (GPollableOutputStream *stream)
+{
+  NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
+  Component *component = NULL;
+  Stream *_stream = NULL;
+  gboolean writable = FALSE;
+  GSList *l;
+  NiceAgent *agent;  /* owned */
+
+  /* A closed stream can never be written to. */
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
+    return FALSE;
+
+  /* Neither can a stream whose agent has gone away. */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL)
+    return FALSE;
+
+  agent_lock ();
+
+  if (!agent_find_component (agent, priv->stream_id, priv->component_id,
+          &_stream, &component)) {
+    g_warning ("Could not find component %u in stream %u", priv->component_id,
+        priv->stream_id);
+  } else if (agent->reliable && component->tcp != NULL &&
+      pseudo_tcp_socket_can_send (component->tcp)) {
+    /* Reliable agent with space left in the pseudo-TCP output buffer. */
+    writable = TRUE;
+  } else {
+    /* Otherwise, stop at the first component socket which accepts output. */
+    for (l = component->socket_sources; l != NULL && !writable; l = l->next) {
+      SocketSource *source = l->data;
+
+      writable =
+          (g_socket_condition_check (source->socket->fileno, G_IO_OUT) != 0);
+    }
+  }
+
+  agent_unlock ();
+
+  g_object_unref (agent);
+
+  return writable;
+}
+
+/*
+ * GPollableOutputStream::write_nonblocking implementation.
+ *
+ * Tries to send @count bytes from @buffer on the stream’s component via
+ * nice_agent_send_full(), after checking that the stream is writable.
+ *
+ * Returns: 0 if @count is zero; otherwise the return value of
+ * nice_agent_send_full(). Returns -1 with @error set to %G_IO_ERROR_CLOSED if
+ * the stream is closed or the agent has been finalised, and -1 with @error set
+ * to %G_IO_ERROR_WOULD_BLOCK if the stream is not currently writable.
+ */
+static gssize
+nice_output_stream_write_nonblocking (GPollableOutputStream *stream,
+    const void *buffer, gsize count, GError **error)
+{
+  NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
+  NiceAgent *agent;  /* owned */
+  gssize len = -1;
+
+  /* Closed streams are not writeable. */
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream))) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed.");
+    return -1;
+  }
+
+  /* Has the agent disappeared? */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+        "Stream is closed due to the NiceAgent being finalised.");
+    return -1;
+  }
+
+  /* From here on a strong reference to the agent is held, so every exit must
+   * pass through the done label to release it. */
+  if (count == 0) {
+    len = 0;
+    goto done;
+  }
+
+  /* This is equivalent to the default GPollableOutputStream implementation. */
+  if (!g_pollable_output_stream_is_writable (stream)) {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+        g_strerror (EAGAIN));
+    goto done;
+  }
+
+  len = nice_agent_send_full (agent, priv->stream_id, priv->component_id,
+      buffer, count, NULL, error);
+
+done:
+  g_object_unref (agent);
+
+  return len;
+}
+
+/*
+ * GPollableOutputStream::create_source implementation.
+ *
+ * Returns a source built by component_source_new() for the stream’s component,
+ * watching for both %G_IO_IN and %G_IO_OUT and passed @cancellable. If the
+ * stream is closed, the agent has been finalised, or the component cannot be
+ * found, a dummy source from g_pollable_source_new() is returned instead;
+ * @cancellable is not passed to the dummy source.
+ */
+static GSource *
+nice_output_stream_create_source (GPollableOutputStream *stream,
+    GCancellable *cancellable)
+{
+  NiceOutputStreamPrivate *priv = NICE_OUTPUT_STREAM (stream)->priv;
+  GSource *source = NULL;
+  Component *component = NULL;
+  Stream *_stream = NULL;
+  NiceAgent *agent;  /* owned */
+
+  /* A closed stream only gets a dummy source. */
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (stream)))
+    return g_pollable_source_new (G_OBJECT (stream));
+
+  /* So does a stream whose agent no longer exists. */
+  agent = g_weak_ref_get (&priv->agent_ref);
+  if (agent == NULL)
+    return g_pollable_source_new (G_OBJECT (stream));
+
+  agent_lock ();
+
+  /* Look up the component which this stream writes to. */
+  if (agent_find_component (agent, priv->stream_id, priv->component_id,
+          &_stream, &component)) {
+    /* G_IO_IN is required as well as G_IO_OUT because of pseudo-TCP: when our
+     * TCP transmit buffer is full but ACKs are pending in the kernel’s receive
+     * buffer, those ACKs have to be read before the head of the transmit
+     * buffer can be sent, which frees space at its tail and makes the stream
+     * writeable again. */
+    source = component_source_new (component, G_OBJECT (stream),
+        G_IO_IN | G_IO_OUT, cancellable);
+  } else {
+    g_warning ("Could not find component %u in stream %u", priv->component_id,
+        priv->stream_id);
+    source = g_pollable_source_new (G_OBJECT (stream));  /* dummy */
+  }
+
+  agent_unlock ();
+
+  g_object_unref (agent);
+
+  return source;
+}
+
static void
streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
{
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-telepathy/libnice.git
More information about the Pkg-telepathy-commits
mailing list