[Pkg-voip-commits] [janus] 161/282: RabbitMQ event handler

Jonas Smedegaard dr at jones.dk
Wed Dec 20 21:53:37 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to annotated tag debian/0.2.6-1
in repository janus.

commit 7cc0387c04f78c588b29bc835ef93ca89daaaa10
Author: Piter Konstantinov <pit.here at gmail.com>
Date:   Mon Oct 30 18:14:48 2017 +0300

    RabbitMQ event handler
---
 Makefile.am                                    |  10 +
 conf/janus.eventhandler.rabbitmqevh.cfg.sample |  23 +
 configure.ac                                   |  23 +-
 events/janus_rabbitmqevh.c                     | 724 +++++++++++++++++++++++++
 4 files changed, 779 insertions(+), 1 deletion(-)

diff --git a/Makefile.am b/Makefile.am
index 53cc180..51bf21f 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -279,6 +279,16 @@ conf_DATA += conf/janus.eventhandler.sampleevh.cfg.sample
 EXTRA_DIST += conf/janus.eventhandler.sampleevh.cfg.sample
 endif
 
+if ENABLE_RABBITMQEVH
+event_LTLIBRARIES += events/libjanus_rabbitmqevh.la
+events_libjanus_rabbitmqevh_la_SOURCES = events/janus_rabbitmqevh.c
+events_libjanus_rabbitmqevh_la_CFLAGS = $(events_cflags)
+events_libjanus_rabbitmqevh_la_LDFLAGS = $(events_ldflags) -lrabbitmq
+events_libjanus_rabbitmqevh_la_LIBADD = $(events_libadd)
+conf_DATA += conf/janus.eventhandler.rabbitmqevh.cfg.sample
+EXTRA_DIST += conf/janus.eventhandler.rabbitmqevh.cfg.sample
+endif
+
 ##
 # Plugins
 ##
diff --git a/conf/janus.eventhandler.rabbitmqevh.cfg.sample b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
new file mode 100644
index 0000000..40e6771
--- /dev/null
+++ b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
@@ -0,0 +1,23 @@
+; This configures the RabbitMQ event handler.
+
+[general]
+enabled = no				; By default the module is not enabled
+events = all				; Comma separated list of the events mask you're interested
+							; in. Valid values are none, sessions, handles, jsep, webrtc,
+							; media, plugins, transports, core and all. By default we
+							; subscribe to everything (all)
+host = 192.168.99.100		; The address of the RabbitMQ server
+;port = 5672				; The port of the RabbitMQ server (5672 by default)
+;username = guest			; Username to use to authenticate, if needed
+;password = guest			; Password to use to authenticate, if needed
+;vhost = /					; Virtual host to specify when logging in, if needed
+exchange = janus-exchange
+route_key = janus-events	; Name of the queue for event messages
+
+;ssl_enable = no			; Whether ssl support must be enabled
+;ssl_verify_peer = yes		; Whether peer verification must be enabled
+;ssl_verify_hostname = yes	; Whether hostname verification must be enabled
+; certificates to use when SSL support is enabled, if needed
+;ssl_cacert = /path/to/cacert.pem
+;ssl_cert = /path/to/cert.pem
+;ssl_key = /path/to/key.pem
diff --git a/configure.ac b/configure.ac
index e29b4dc..c5e3ec6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -128,6 +128,8 @@ AC_ARG_ENABLE([all-handlers],
               [
                AS_IF([test "x$enable_sample_event_handler" != "xyes"],
                      [enable_sample_event_handler=no])
+               AS_IF([test "x$enable_rabbitmq_event_handler" != "xyes"],
+                     [enable_rabbitmq_event_handler=no])
               ],
               [])
 
@@ -184,6 +186,13 @@ AC_ARG_ENABLE([sample-event-handler],
                      [enable_sample_event_handler=no])],
               [enable_sample_event_handler=maybe])
 
+AC_ARG_ENABLE([rabbitmq-event-handler],
+              [AS_HELP_STRING([--disable-rabbitmq-event-handler],
+                              [Disable RabbitMQ event handler ])],
+              [AS_IF([test "x$enable_rabbitmq_event_handler" != "xyes"],
+                     [enable_rabbitmq_event_handler=no])],
+              [enable_rabbitmq_event_handler=maybe])
+
 PKG_CHECK_MODULES([JANUS],
                   [
                     glib-2.0 >= $glib_version
@@ -397,10 +406,18 @@ AC_CHECK_LIB([rabbitmq],
                   AC_DEFINE(HAVE_RABBITMQ)
                   enable_rabbitmq=yes
                ])
+               AS_IF([test "x$enable_rabbitmq_event_handler" != "xno"],
+               [
+                 AC_DEFINE(HAVE_RABBITMQEVH)
+                 enable_rabbitmq_event_handler=yes
+               ])
+
              ],
              [
                AS_IF([test "x$enable_rabbitmq" = "xyes"],
                      [AC_MSG_ERROR([rabbitmq-c not found. See README.md for installation instructions or use --disable-rabbitmq])])
+               AS_IF([test "x$enable_rabbitmq_event_handler" = "xyes"],
+                     [AC_MSG_ERROR([rabbitmq-c not found. See README.md for installation instructions or use --disable-rabbitmq-event-handler])])
              ])
 AC_CHECK_LIB([paho-mqtt3a],
              [MQTTAsync_create],
@@ -416,6 +433,7 @@ AC_CHECK_LIB([paho-mqtt3a],
                      [AC_MSG_ERROR([paho c client not found. See README.md for installation instructions or use --disable-mqtt])])
              ])
 AM_CONDITIONAL([ENABLE_RABBITMQ], [test "x$enable_rabbitmq" = "xyes"])
+AM_CONDITIONAL([ENABLE_RABBITMQEVH], [test "x$enable_rabbitmq_event_handler" = "xyes"])
 AM_CONDITIONAL([ENABLE_MQTT], [test "x$enable_mqtt" = "xyes"])
 
 AC_TRY_COMPILE([
@@ -784,6 +802,9 @@ echo "Event handlers:"
 AM_COND_IF([ENABLE_SAMPLEEVH],
 	[echo "    Sample event handler:  yes"],
 	[echo "    Sample event handler:  no"])
+AM_COND_IF([ENABLE_RABBITMQEVH],
+  [echo "    RabbitMQ event handler:yes"],
+  [echo "    RabbitMQ event handler:no"])
 AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
 	echo "JavaScript modules:        yes"
 	echo "    Using npm:             $NPM"
@@ -804,4 +825,4 @@ AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
 
 echo
 echo "If this configuration is ok for you, do a 'make' to start building Janus. A 'make install' will install Janus and its plugins to the specified prefix. Finally, a 'make configs' will install some sample configuration files too (something you'll only want to do the first time, though)."
-echo
+echo
\ No newline at end of file
diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c
new file mode 100644
index 0000000..78c837b
--- /dev/null
+++ b/events/janus_rabbitmqevh.c
@@ -0,0 +1,724 @@
+/*! \file   janus_rabbitmqevh.c
+ * \author Lorenzo Miniero <lorenzo at meetecho.com>
+ * \copyright GNU General Public License v3
+ * \brief  Janus RabbitMQEventHandler plugin
+ * \details  This is a trivial RabbitMQ event handler plugin for Janus
+ *
+ * \ingroup eventhandlers
+ * \ref eventhandlers
+ */
+
+#include "eventhandler.h"
+
+#include <math.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+#include <amqp_tcp_socket.h>
+#include <amqp_ssl_socket.h>
+
+#include "../debug.h"
+#include "../config.h"
+#include "../mutex.h"
+#include "../utils.h"
+
+
+/* Plugin information */
+#define JANUS_RABBITMQEVH_VERSION			1
+#define JANUS_RABBITMQEVH_VERSION_STRING	"0.0.1"
+#define JANUS_RABBITMQEVH_DESCRIPTION		"This is a trivial RabbitMQ event handler plugin for Janus."
+#define JANUS_RABBITMQEVH_NAME				"JANUS RabbitMQEventHandler plugin"
+#define JANUS_RABBITMQEVH_AUTHOR			"Meetecho s.r.l."
+#define JANUS_RABBITMQEVH_PACKAGE			"janus.eventhandler.rabbitmqevh"
+
+/* Plugin methods */
+janus_eventhandler *create(void);
+int janus_rabbitmqevh_init(const char *config_path);
+void janus_rabbitmqevh_destroy(void);
+int janus_rabbitmqevh_get_api_compatibility(void);
+int janus_rabbitmqevh_get_version(void);
+const char *janus_rabbitmqevh_get_version_string(void);
+const char *janus_rabbitmqevh_get_description(void);
+const char *janus_rabbitmqevh_get_name(void);
+const char *janus_rabbitmqevh_get_author(void);
+const char *janus_rabbitmqevh_get_package(void);
+void janus_rabbitmqevh_incoming_event(json_t *event);
+
+/* Event handler setup */
+static janus_eventhandler janus_rabbitmqevh =
+	JANUS_EVENTHANDLER_INIT (
+		.init = janus_rabbitmqevh_init,
+		.destroy = janus_rabbitmqevh_destroy,
+
+		.get_api_compatibility = janus_rabbitmqevh_get_api_compatibility,
+		.get_version = janus_rabbitmqevh_get_version,
+		.get_version_string = janus_rabbitmqevh_get_version_string,
+		.get_description = janus_rabbitmqevh_get_description,
+		.get_name = janus_rabbitmqevh_get_name,
+		.get_author = janus_rabbitmqevh_get_author,
+		.get_package = janus_rabbitmqevh_get_package,
+
+		.incoming_event = janus_rabbitmqevh_incoming_event,
+
+		.events_mask = JANUS_EVENT_TYPE_NONE
+	);
+
+/* Plugin creator */
+janus_eventhandler *create(void) {
+	JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_RABBITMQEVH_NAME);
+	return &janus_rabbitmqevh;
+}
+
+
+/* Useful stuff */
+static volatile gint initialized = 0, stopping = 0;
+static GThread *handler_thread;
+static void *janus_rabbitmqevh_handler(void *data);
+
+/* Queue of events to handle */
+static GAsyncQueue *events = NULL;
+static json_t exit_event;
+static void janus_rabbitmqevh_event_free(json_t *event) {
+	if(!event || event == &exit_event)
+		return;
+	json_decref(event);
+}
+
+/* JSON serialization options */
+static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
+
+/* FIXME: Should it be configable? */
+#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
+
+/* RabbitMQ session */
+static amqp_connection_state_t rmq_conn;
+static amqp_channel_t rmq_channel = 0;
+static amqp_bytes_t rmq_exchange;
+static amqp_bytes_t rmq_route_key;
+
+
+/* Plugin implementation */
+int janus_rabbitmqevh_init(const char *config_path) {
+	if(g_atomic_int_get(&stopping)) {
+		/* Still stopping from before */
+		return -1;
+	}
+	if(config_path == NULL) {
+		/* Invalid arguments */
+		return -1;
+	}
+	/* Read configuration */
+	char filename[255];
+	g_snprintf(filename, 255, "%s/%s.cfg", config_path, JANUS_RABBITMQEVH_PACKAGE);
+	JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
+	janus_config *config = janus_config_parse(filename);
+	if(config != NULL)
+		janus_config_print(config);
+
+	char *rmqhost = NULL;
+	const char *vhost = NULL, *username = NULL, *password = NULL;
+	const char *ssl_cacert_file = NULL;
+	const char *ssl_cert_file = NULL;
+	const char *ssl_key_file = NULL;
+	gboolean ssl_enable = FALSE;
+	gboolean ssl_verify_peer = FALSE;
+	gboolean ssl_verify_hostname = FALSE;
+	const char *route_key = NULL, *exchange = NULL;
+
+	/* Setup the event handler, if required */
+	janus_config_item *item = janus_config_get_item_drilldown(config, "general", "enabled");
+	if(!item || !item->value || !janus_is_true(item->value)) {
+		JANUS_LOG(LOG_WARN, "RabbitMQ event handler disabled\n");
+		goto error;
+	}
+
+	item = janus_config_get_item_drilldown(config, "general", "json");
+	if(item && item->value) {
+		/* Check how we need to format/serialize the JSON output */
+		if(!strcasecmp(item->value, "indented")) {
+			/* Default: indented, we use three spaces for that */
+			json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
+		} else if(!strcasecmp(item->value, "plain")) {
+			/* Not indented and no new lines, but still readable */
+			json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
+		} else if(!strcasecmp(item->value, "compact")) {
+			/* Compact, so no spaces between separators */
+			json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
+		} else {
+			JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
+			json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
+		}
+	}
+
+	/* Which events should we subscribe to? */
+	item = janus_config_get_item_drilldown(config, "general", "events");
+	if(item && item->value) {
+		if(!strcasecmp(item->value, "none")) {
+			/* Don't subscribe to anything at all */
+			janus_flags_reset(&janus_rabbitmqevh.events_mask);
+		} else if(!strcasecmp(item->value, "all")) {
+			/* Subscribe to everything */
+			janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_ALL);
+		} else {
+			/* Check what we need to subscribe to */
+			gchar **subscribe = g_strsplit(item->value, ",", -1);
+			if(subscribe != NULL) {
+				gchar *index = subscribe[0];
+				if(index != NULL) {
+					int i=0;
+					while(index != NULL) {
+						while(isspace(*index))
+							index++;
+						if(strlen(index)) {
+							if(!strcasecmp(index, "sessions")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_SESSION);
+							} else if(!strcasecmp(index, "handles")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_HANDLE);
+							} else if(!strcasecmp(index, "jsep")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_JSEP);
+							} else if(!strcasecmp(index, "webrtc")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_WEBRTC);
+							} else if(!strcasecmp(index, "media")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_MEDIA);
+							} else if(!strcasecmp(index, "plugins")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_PLUGIN);
+							} else if(!strcasecmp(index, "transports")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_TRANSPORT);
+							} else if(!strcasecmp(index, "core")) {
+								janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_CORE);
+							} else {
+								JANUS_LOG(LOG_WARN, "Unknown event type '%s'\n", index);
+							}
+						}
+						i++;
+						index = subscribe[i];
+					}
+				}
+				g_strfreev(subscribe);
+			}
+		}
+	}
+
+	/* Handle configuration, starting from the server details */
+	item = janus_config_get_item_drilldown(config, "general", "host");
+	if(item && item->value)
+		rmqhost = g_strdup(item->value);
+	else
+		rmqhost = g_strdup("localhost");
+	int rmqport = AMQP_PROTOCOL_PORT;
+	item = janus_config_get_item_drilldown(config, "general", "port");
+	if(item && item->value)
+		rmqport = atoi(item->value);
+
+	/* Credentials and Virtual Host */
+	item = janus_config_get_item_drilldown(config, "general", "vhost");
+	if(item && item->value)
+		vhost = g_strdup(item->value);
+	else
+	vhost = g_strdup("/");
+	item = janus_config_get_item_drilldown(config, "general", "username");
+	if(item && item->value)
+		username = g_strdup(item->value);
+	else
+		username = g_strdup("guest");
+	item = janus_config_get_item_drilldown(config, "general", "password");
+	if(item && item->value)
+		password = g_strdup(item->value);
+	else
+		password = g_strdup("guest");
+
+	/* SSL config*/
+	item = janus_config_get_item_drilldown(config, "general", "ssl_enable");
+	if(!item || !item->value || !janus_is_true(item->value)) {
+		JANUS_LOG(LOG_INFO, "RabbitMQ SSL support disabled\n");
+	} else {
+		ssl_enable = TRUE;
+		item = janus_config_get_item_drilldown(config, "general", "ssl_cacert");
+		if(item && item->value)
+			ssl_cacert_file = g_strdup(item->value);
+		item = janus_config_get_item_drilldown(config, "general", "ssl_cert");
+		if(item && item->value)
+			ssl_cert_file = g_strdup(item->value);
+		item = janus_config_get_item_drilldown(config, "general", "ssl_key");
+		if(item && item->value)
+			ssl_key_file = g_strdup(item->value);
+		item = janus_config_get_item_drilldown(config, "general", "ssl_verify_peer");
+		if(item && item->value && janus_is_true(item->value))
+			ssl_verify_peer = TRUE;
+		item = janus_config_get_item_drilldown(config, "general", "ssl_verify_hostname");
+		if(item && item->value && janus_is_true(item->value))
+			ssl_verify_hostname = TRUE;
+	}
+
+	/* Parse configuration */
+	item = janus_config_get_item_drilldown(config, "general", "route_key");
+	if(!item || !item->value) {
+		JANUS_LOG(LOG_FATAL, "Missing name of outgoing route_key for RabbitMQ...\n");
+		goto error;
+	}
+	route_key = g_strdup(item->value);
+	item = janus_config_get_item_drilldown(config, "general", "exchange");
+	if(!item || !item->value) {
+		JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ, using default\n");
+	} else {
+		exchange = g_strdup(item->value);
+	}
+	if (exchange == NULL) {
+		JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s)\n", rmqhost, rmqport, route_key);
+	} else {
+		JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s) exch: (%s)\n", rmqhost, rmqport, route_key, exchange);
+	}
+
+	/* Connect */
+	rmq_conn = amqp_new_connection();
+	amqp_socket_t *socket = NULL;
+	int status;
+	JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
+	if (ssl_enable) {
+		socket = amqp_ssl_socket_new(rmq_conn);
+		if(socket == NULL) {
+			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+			goto error;
+		}
+		if(ssl_verify_peer) {
+			amqp_ssl_socket_set_verify_peer(socket, 1);
+		} else {
+			amqp_ssl_socket_set_verify_peer(socket, 0);
+		}
+		if(ssl_verify_hostname) {
+			amqp_ssl_socket_set_verify_hostname(socket, 1);
+		} else {
+			amqp_ssl_socket_set_verify_hostname(socket, 0);
+		}
+		if(ssl_cacert_file) {
+			status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
+			if(status != AMQP_STATUS_OK) {
+				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
+				goto error;
+			}
+		}
+		if(ssl_cert_file && ssl_key_file) {
+			amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file);
+			if(status != AMQP_STATUS_OK) {
+				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
+				goto error;
+			}
+		}
+	} else {
+		socket = amqp_tcp_socket_new(rmq_conn);
+		if(socket == NULL) {
+			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+			goto error;
+		}
+	}
+
+	JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n");
+	status = amqp_socket_open(socket, rmqhost, rmqport);
+	if(status != AMQP_STATUS_OK) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
+		goto error;
+	}
+	JANUS_LOG(LOG_VERB, "Logging in...\n");
+	amqp_rpc_reply_t result = amqp_login(rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
+	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		goto error;
+	}
+	rmq_channel = 1;
+	JANUS_LOG(LOG_VERB, "Opening channel...\n");
+	amqp_channel_open(rmq_conn, rmq_channel);
+	result = amqp_get_rpc_reply(rmq_conn);
+	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		goto error;
+	}
+	rmq_exchange = amqp_empty_bytes;
+	if(exchange != NULL) {
+		JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
+		rmq_exchange = amqp_cstring_bytes(exchange);
+		amqp_exchange_declare(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
+		result = amqp_get_rpc_reply(rmq_conn);
+		if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+			goto error;
+		}
+	}
+	JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
+	rmq_route_key = amqp_cstring_bytes(route_key);
+	amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table);
+	result = amqp_get_rpc_reply(rmq_conn);
+	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		goto error;
+	}
+
+	/* Initialize the events queue */
+	events = g_async_queue_new_full((GDestroyNotify) janus_rabbitmqevh_event_free);
+	g_atomic_int_set(&initialized, 1);
+
+	GError *error = NULL;
+	handler_thread = g_thread_try_new("janus sampleevh handler", janus_rabbitmqevh_handler, NULL, &error);
+	if(error != NULL) {
+		g_atomic_int_set(&initialized, 0);
+		JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the SampleEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
+		goto error;
+	}
+
+	/* Done */
+	JANUS_LOG(LOG_INFO, "Setup of RabbitMQ event handler completed\n");
+
+	if(rmqhost)
+		g_free((char *)rmqhost);
+	if(config)
+		janus_config_destroy(config);
+
+	JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RABBITMQEVH_NAME);
+	return 0;
+
+error:
+	/* If we got here, something went wrong */
+	if(rmqhost)
+		g_free((char *)rmqhost);
+	if(vhost)
+		g_free((char *)vhost);
+	if(username)
+		g_free((char *)username);
+	if(password)
+		g_free((char *)password);
+	if(route_key)
+		g_free((char *)route_key);
+	if(exchange)
+		g_free((char *)exchange);
+	if(ssl_cacert_file)
+		g_free((char *)ssl_cacert_file);
+	if(ssl_cert_file)
+		g_free((char *)ssl_cert_file);
+	if(ssl_key_file)
+		g_free((char *)ssl_key_file);
+	if(config)
+		janus_config_destroy(config);
+	return -1;
+}
+
+void janus_rabbitmqevh_destroy(void) {
+	if(!g_atomic_int_get(&initialized))
+		return;
+	g_atomic_int_set(&stopping, 1);
+
+	g_async_queue_push(events, &exit_event);
+	if(handler_thread != NULL) {
+		g_thread_join(handler_thread);
+		handler_thread = NULL;
+	}
+
+	g_async_queue_unref(events);
+	events = NULL;
+
+	if(rmq_conn && rmq_channel) {
+		amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS);
+		amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS);
+		amqp_destroy_connection(rmq_conn);
+	}
+	if(rmq_exchange.bytes)
+		g_free((char *)rmq_exchange.bytes);
+	if(rmq_route_key.bytes)
+		g_free((char *)rmq_route_key.bytes);
+
+	g_atomic_int_set(&initialized, 0);
+	g_atomic_int_set(&stopping, 0);
+	JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_RABBITMQEVH_NAME);
+}
+
+int janus_rabbitmqevh_get_api_compatibility(void) {
+	/* Important! This is what your plugin MUST always return: don't lie here or bad things will happen */
+	return JANUS_EVENTHANDLER_API_VERSION;
+}
+
+int janus_rabbitmqevh_get_version(void) {
+	return JANUS_RABBITMQEVH_VERSION;
+}
+
+const char *janus_rabbitmqevh_get_version_string(void) {
+	return JANUS_RABBITMQEVH_VERSION_STRING;
+}
+
+const char *janus_rabbitmqevh_get_description(void) {
+	return JANUS_RABBITMQEVH_DESCRIPTION;
+}
+
+const char *janus_rabbitmqevh_get_name(void) {
+	return JANUS_RABBITMQEVH_NAME;
+}
+
+const char *janus_rabbitmqevh_get_author(void) {
+	return JANUS_RABBITMQEVH_AUTHOR;
+}
+
+const char *janus_rabbitmqevh_get_package(void) {
+	return JANUS_RABBITMQEVH_PACKAGE;
+}
+
+void janus_rabbitmqevh_incoming_event(json_t *event) {
+	if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
+		/* Janus is closing or the plugin is: unref the event as we won't handle it */
+		json_decref(event);
+		return;
+	}
+
+	/* Do NOT handle the event here in this callback! Since Janus notifies you right
+	 * away when something happens, these events are triggered from working threads and
+	 * not some sort of message bus. As such, performing I/O or network operations in
+	 * here could dangerously slow Janus down. Let's just reference and enqueue the event,
+	 * and handle it in our own thread: the event contains a monotonic time indicator of
+	 * when the event actually happened on this machine, so that, if relevant, we can compute
+	 * any delay in the actual event processing ourselves. */
+	json_incref(event);
+	g_async_queue_push(events, event);
+}
+
+/* Thread to handle incoming events */
+static void *janus_rabbitmqevh_handler(void *data) {
+	JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n");
+	json_t *event = NULL;
+	char *event_text = NULL;
+
+	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
+
+		event = g_async_queue_pop(events);
+		if(event == NULL)
+			continue;
+		if(event == &exit_event)
+			break;
+
+		/* Handle event: just for fun, let's see how long it took for us to take care of this */
+		json_t *created = json_object_get(event, "timestamp");
+		if(created && json_is_integer(created)) {
+			gint64 then = json_integer_value(created);
+			gint64 now = janus_get_monotonic_time();
+			JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
+		}
+
+		/* Let's check what kind of event this is: we don't really do anything
+		 * with it in this plugin, it's just to show how you can handle
+		 * different types of events in an event handler. */
+		int type = json_integer_value(json_object_get(event, "type"));
+		switch(type) {
+			case JANUS_EVENT_TYPE_SESSION:
+				/* This is a session related event. The only info that is
+				 * required is a name for the event itself: a "created"
+				 * event may also contain transport info, in the form of
+				 * the transport module that originated the session
+				 * (e.g., "janus.transport.http") and an internal unique
+				 * ID for the transport instance (which may be associated
+				 * to a connection or anything else within the specifics
+				 * of the transport module itself). Here's an example of
+				 * a new session being created:
+					{
+					   "type": 1,
+					   "timestamp": 3583879627,
+					   "session_id": 2004798115,
+					   "event": {
+						  "name": "created"
+					   },
+					   "transport": {
+					      "transport": "janus.transport.http",
+					      "id": "0x7fcb100008c0"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_HANDLE:
+				/* This is a handle related event. The only info that is provided
+				 * are the name for the event itself and the package name of the
+				 * plugin this handle refers to (e.g., "janus.plugin.echotest").
+				 * Here's an example of a new handled being attached in a session
+				 * to the EchoTest plugin:
+					{
+					   "type": 2,
+					   "timestamp": 3570304977,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "name": "attached",
+						  "plugin: "janus.plugin.echotest"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_JSEP:
+				/* This is a JSEP/SDP related event. It provides information
+				 * about an ongoing WebRTC negotiation, and so tells you
+				 * about the SDP being sent/received, and who's sending it
+				 * ("local" means Janus, "remote" means the user). Here's an
+				 * example, where the user originated an offer towards Janus:
+					{
+					   "type": 8,
+					   "timestamp": 3570400208,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "owner": "remote",
+						  "jsep": {
+							 "type": "offer",
+							 "sdp": "v=0[..]\r\n"
+						  }
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_WEBRTC:
+				/* This is a WebRTC related event, and so the content of
+				 * the event may vary quite a bit. In fact, you may be notified
+				 * about ICE or DTLS states, or when a WebRTC PeerConnection
+				 * goes up or down. Here are some examples, in no particular order:
+					{
+					   "type": 16,
+					   "timestamp": 3570416659,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "ice": "connecting",
+						  "stream_id": 1,
+						  "component_id": 1
+					   }
+					}
+				 *
+					{
+					   "type": 16,
+					   "timestamp": 3570637554,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "selected-pair": "[..]",
+						  "stream_id": 1,
+						  "component_id": 1
+					   }
+					}
+				 *
+					{
+					   "type": 16,
+					   "timestamp": 3570656112,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "dtls": "connected",
+						  "stream_id": 1,
+						  "component_id": 1
+					   }
+					}
+				 *
+					{
+					   "type": 16,
+					   "timestamp": 3570657237,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "connection": "webrtcup"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_MEDIA:
+				/* This is a media related event. This can contain different
+				 * information about the health of a media session, or about
+				 * what's going on in general (e.g., when Janus started/stopped
+				 * receiving media of a certain type, or (TODO) when some media related
+				 * statistics are available). Here's an example of Janus getting
+				 * video from the peer for the first time, or after a second
+				 * of no video at all (which would have triggered a "receiving": false):
+					{
+					   "type": 32,
+					   "timestamp": 3571078797,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "media": "video",
+						  "receiving": "true"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_PLUGIN:
+				/* This is a plugin related event. Since each plugin may
+				 * provide info in a very custom way, the format of this event
+				 * is in general very dynamic. You'll always find, though,
+				 * an "event" object containing the package name of the
+				 * plugin (e.g., "janus.plugin.echotest") and a "data"
+				 * object that contains whatever the plugin decided to
+				 * notify you about, that will always vary from plugin to
+				 * plugin. Besides, notice that "session_id" and "handle_id"
+				 * may or may not be present: when they are, you'll know
+				 * the event has been triggered within the context of a
+				 * specific handle session with the plugin; when they're
+				 * not, the plugin sent an event out of context of a
+				 * specific session it is handling. Here's an example:
+					{
+					   "type": 64,
+					   "timestamp": 3570336031,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+						  "plugin": "janus.plugin.echotest",
+						  "data": {
+							 "audio_active": "true",
+							 "video_active": "true",
+							 "bitrate": 0
+						  }
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_TRANSPORT:
+				/* This is a transport related event (TODO). The syntax of
+				 * the common format (transport specific data aside) is
+				 * exactly the same as that of the plugin related events
+				 * above, with a "transport" property instead of "plugin"
+				 * to contain the transport package name. */
+				break;
+			case JANUS_EVENT_TYPE_CORE:
+				/* This is a core related event. This can contain different
+				 * information about the health of the Janus instance, or
+				 * more generically on some events in the Janus life cycle
+				 * (e.g., when it's just been started or when a shutdown
+				 * has been requested). Considering the heterogeneous nature
+				 * of the information being reported, the content is always
+				 * a JSON object (event). Core events are the only ones
+				 * missing a session_id. Here's an example:
+					{
+					   "type": 256,
+					   "timestamp": 28381185382,
+					   "event": {
+						  "status": "started"
+					   }
+					}
+				*/
+				break;
+			default:
+				JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
+				break;
+		}
+
+		if(!g_atomic_int_get(&stopping)) {
+			/* Since this a simple plugin, it does the same for all events: so just convert to string... */
+			event_text = json_dumps(event, json_format);
+			amqp_basic_properties_t props;
+			props._flags = 0;
+			props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
+			props.content_type = amqp_cstring_bytes("application/json");
+			amqp_bytes_t message = amqp_cstring_bytes(event_text);
+			int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message);
+			if(status != AMQP_STATUS_OK) {
+				JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
+			}
+			free(event_text);
+			event_text = NULL;
+		}
+
+		/* Cleanup */
+		/* Done, let's unref the event */
+		json_decref(event);
+		event = NULL;
+	}
+	JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n");
+	return NULL;
+}

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-voip/janus.git



More information about the Pkg-voip-commits mailing list