[Pkg-voip-commits] [janus] 161/282: RabbitMQ event handler
Jonas Smedegaard
dr at jones.dk
Wed Dec 20 21:53:37 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to annotated tag debian/0.2.6-1
in repository janus.
commit 7cc0387c04f78c588b29bc835ef93ca89daaaa10
Author: Piter Konstantinov <pit.here at gmail.com>
Date: Mon Oct 30 18:14:48 2017 +0300
RabbitMQ event handler
---
Makefile.am | 10 +
conf/janus.eventhandler.rabbitmqevh.cfg.sample | 23 +
configure.ac | 23 +-
events/janus_rabbitmqevh.c | 724 +++++++++++++++++++++++++
4 files changed, 779 insertions(+), 1 deletion(-)
diff --git a/Makefile.am b/Makefile.am
index 53cc180..51bf21f 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -279,6 +279,16 @@ conf_DATA += conf/janus.eventhandler.sampleevh.cfg.sample
EXTRA_DIST += conf/janus.eventhandler.sampleevh.cfg.sample
endif
+if ENABLE_RABBITMQEVH
+event_LTLIBRARIES += events/libjanus_rabbitmqevh.la
+events_libjanus_rabbitmqevh_la_SOURCES = events/janus_rabbitmqevh.c
+events_libjanus_rabbitmqevh_la_CFLAGS = $(events_cflags)
+events_libjanus_rabbitmqevh_la_LDFLAGS = $(events_ldflags) -lrabbitmq
+events_libjanus_rabbitmqevh_la_LIBADD = $(events_libadd)
+conf_DATA += conf/janus.eventhandler.rabbitmqevh.cfg.sample
+EXTRA_DIST += conf/janus.eventhandler.rabbitmqevh.cfg.sample
+endif
+
##
# Plugins
##
diff --git a/conf/janus.eventhandler.rabbitmqevh.cfg.sample b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
new file mode 100644
index 0000000..40e6771
--- /dev/null
+++ b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
@@ -0,0 +1,23 @@
+; This configures the RabbitMQ event handler.
+
+[general]
+enabled = no ; By default the module is not enabled
+events = all ; Comma separated list of the events mask you're interested
+ ; in. Valid values are none, sessions, handles, jsep, webrtc,
+ ; media, plugins, transports, core and all. By default we
+ ; subscribe to everything (all)
+host = 192.168.99.100 ; The address of the RabbitMQ server
+;port = 5672 ; The port of the RabbitMQ server (5672 by default)
+;username = guest ; Username to use to authenticate, if needed
+;password = guest ; Password to use to authenticate, if needed
+;vhost = / ; Virtual host to specify when logging in, if needed
+exchange = janus-exchange ; Name of the exchange to publish to (optional, declared as fanout)
+route_key = janus-events ; Name of the queue for event messages
+
+;ssl_enable = no ; Whether ssl support must be enabled
+;ssl_verify_peer = yes ; Whether peer verification must be enabled
+;ssl_verify_hostname = yes ; Whether hostname verification must be enabled
+; certificates to use when SSL support is enabled, if needed
+;ssl_cacert = /path/to/cacert.pem
+;ssl_cert = /path/to/cert.pem
+;ssl_key = /path/to/key.pem
diff --git a/configure.ac b/configure.ac
index e29b4dc..c5e3ec6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -128,6 +128,8 @@ AC_ARG_ENABLE([all-handlers],
[
AS_IF([test "x$enable_sample_event_handler" != "xyes"],
[enable_sample_event_handler=no])
+ AS_IF([test "x$enable_rabbitmq_event_handler" != "xyes"],
+ [enable_rabbitmq_event_handler=no])
],
[])
@@ -184,6 +186,13 @@ AC_ARG_ENABLE([sample-event-handler],
[enable_sample_event_handler=no])],
[enable_sample_event_handler=maybe])
+AC_ARG_ENABLE([rabbitmq-event-handler],
+ [AS_HELP_STRING([--disable-rabbitmq-event-handler],
+ [Disable RabbitMQ event handler ])],
+ [AS_IF([test "x$enable_rabbitmq_event_handler" != "xyes"],
+ [enable_rabbitmq_event_handler=no])],
+ [enable_rabbitmq_event_handler=maybe])
+
PKG_CHECK_MODULES([JANUS],
[
glib-2.0 >= $glib_version
@@ -397,10 +406,18 @@ AC_CHECK_LIB([rabbitmq],
AC_DEFINE(HAVE_RABBITMQ)
enable_rabbitmq=yes
])
+ AS_IF([test "x$enable_rabbitmq_event_handler" != "xno"],
+ [
+ AC_DEFINE(HAVE_RABBITMQEVH)
+ enable_rabbitmq_event_handler=yes
+ ])
+
],
[
AS_IF([test "x$enable_rabbitmq" = "xyes"],
[AC_MSG_ERROR([rabbitmq-c not found. See README.md for installation instructions or use --disable-rabbitmq])])
+ AS_IF([test "x$enable_rabbitmq_event_handler" = "xyes"],
+ [AC_MSG_ERROR([rabbitmq-c not found. See README.md for installation instructions or use --disable-rabbitmq-event-handler])])
])
AC_CHECK_LIB([paho-mqtt3a],
[MQTTAsync_create],
@@ -416,6 +433,7 @@ AC_CHECK_LIB([paho-mqtt3a],
[AC_MSG_ERROR([paho c client not found. See README.md for installation instructions or use --disable-mqtt])])
])
AM_CONDITIONAL([ENABLE_RABBITMQ], [test "x$enable_rabbitmq" = "xyes"])
+AM_CONDITIONAL([ENABLE_RABBITMQEVH], [test "x$enable_rabbitmq_event_handler" = "xyes"])
AM_CONDITIONAL([ENABLE_MQTT], [test "x$enable_mqtt" = "xyes"])
AC_TRY_COMPILE([
@@ -784,6 +802,9 @@ echo "Event handlers:"
AM_COND_IF([ENABLE_SAMPLEEVH],
[echo " Sample event handler: yes"],
[echo " Sample event handler: no"])
+AM_COND_IF([ENABLE_RABBITMQEVH],
+ [echo " RabbitMQ event handler:yes"],
+ [echo " RabbitMQ event handler:no"])
AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
echo "JavaScript modules: yes"
echo " Using npm: $NPM"
@@ -804,4 +825,4 @@ AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
echo
echo "If this configuration is ok for you, do a 'make' to start building Janus. A 'make install' will install Janus and its plugins to the specified prefix. Finally, a 'make configs' will install some sample configuration files too (something you'll only want to do the first time, though)."
-echo
+echo
\ No newline at end of file
diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c
new file mode 100644
index 0000000..78c837b
--- /dev/null
+++ b/events/janus_rabbitmqevh.c
@@ -0,0 +1,724 @@
+/*! \file janus_rabbitmqevh.c
+ * \author Lorenzo Miniero <lorenzo at meetecho.com>
+ * \copyright GNU General Public License v3
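+ * \brief Janus RabbitMQEventHandler plugin
+ * \details This is a trivial RabbitMQ event handler plugin for Janus: it
+ * queues the events it is notified about, and a dedicated thread publishes
+ * each of them as a JSON string to the configured RabbitMQ exchange/route key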
+ *
+ * \ingroup eventhandlers
+ * \ref eventhandlers
+ */
+
+#include "eventhandler.h"
+
+#include <math.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+#include <amqp_tcp_socket.h>
+#include <amqp_ssl_socket.h>
+
+#include "../debug.h"
+#include "../config.h"
+#include "../mutex.h"
+#include "../utils.h"
+
+
+/* Plugin information */
+#define JANUS_RABBITMQEVH_VERSION 1
+#define JANUS_RABBITMQEVH_VERSION_STRING "0.0.1"
+#define JANUS_RABBITMQEVH_DESCRIPTION "This is a trivial RabbitMQ event handler plugin for Janus."
+#define JANUS_RABBITMQEVH_NAME "JANUS RabbitMQEventHandler plugin"
+#define JANUS_RABBITMQEVH_AUTHOR "Meetecho s.r.l."
+#define JANUS_RABBITMQEVH_PACKAGE "janus.eventhandler.rabbitmqevh"
+
+/* Plugin methods: create() hands out the janus_eventhandler instance, whose
+ * callbacks are the janus_rabbitmqevh_* functions declared here */
+janus_eventhandler *create(void);
+int janus_rabbitmqevh_init(const char *config_path);
+void janus_rabbitmqevh_destroy(void);
+int janus_rabbitmqevh_get_api_compatibility(void);
+int janus_rabbitmqevh_get_version(void);
+const char *janus_rabbitmqevh_get_version_string(void);
+const char *janus_rabbitmqevh_get_description(void);
+const char *janus_rabbitmqevh_get_name(void);
+const char *janus_rabbitmqevh_get_author(void);
+const char *janus_rabbitmqevh_get_package(void);
+void janus_rabbitmqevh_incoming_event(json_t *event);
+
+/* Event handler setup: binds the callbacks into the instance returned by
+ * create(). The events mask starts out empty (JANUS_EVENT_TYPE_NONE), and is
+ * only filled in by janus_rabbitmqevh_init() according to the 'events'
+ * setting found in the configuration file */
+static janus_eventhandler janus_rabbitmqevh =
+ JANUS_EVENTHANDLER_INIT (
+ .init = janus_rabbitmqevh_init,
+ .destroy = janus_rabbitmqevh_destroy,
+
+ .get_api_compatibility = janus_rabbitmqevh_get_api_compatibility,
+ .get_version = janus_rabbitmqevh_get_version,
+ .get_version_string = janus_rabbitmqevh_get_version_string,
+ .get_description = janus_rabbitmqevh_get_description,
+ .get_name = janus_rabbitmqevh_get_name,
+ .get_author = janus_rabbitmqevh_get_author,
+ .get_package = janus_rabbitmqevh_get_package,
+
+ .incoming_event = janus_rabbitmqevh_incoming_event,
+
+ .events_mask = JANUS_EVENT_TYPE_NONE
+ );
+
+/*! \brief Plugin creator
+ * \details Logs that the handler has been created, and hands out the
+ * statically allocated janus_eventhandler instance of this module
+ * \returns A pointer to the janus_rabbitmqevh instance (never NULL) */
+janus_eventhandler *create(void) {
+	janus_eventhandler *handler = &janus_rabbitmqevh;
+	JANUS_LOG(LOG_VERB, "%s created!\n", JANUS_RABBITMQEVH_NAME);
+	return handler;
+}
+
+
+/* Useful stuff: 'initialized' is raised by init once the RabbitMQ setup is
+ * done, 'stopping' is only raised while destroy is tearing things down. Both
+ * are read and written via g_atomic_int_*, as the handler thread checks them */
+static volatile gint initialized = 0, stopping = 0;
+static GThread *handler_thread;
+static void *janus_rabbitmqevh_handler(void *data);
+
+/* Queue of events to handle: incoming_event pushes to it, the handler thread
+ * pops from it. The address of exit_event is pushed by destroy as a sentinel
+ * to tell the thread to leave: its content is never used as actual JSON */
+static GAsyncQueue *events = NULL;
+static json_t exit_event;
+/* Destroy notify of the events queue: drops the reference held on a queued
+ * event, leaving NULL and the exit sentinel (which isn't real JSON) alone */
+static void janus_rabbitmqevh_event_free(json_t *event) {
+	if(event != NULL && event != &exit_event)
+		json_decref(event);
+}
+
+/* JSON serialization options: flags passed to json_dumps() when publishing,
+ * init overrides them if a 'json' setting is found in the configuration */
+static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
+
+/* FIXME: Should it be configurable? */
+#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
+
+/* RabbitMQ session: the connection, the channel opened on it (0 until init
+ * opens channel 1), and the exchange and route key events are published to */
+static amqp_connection_state_t rmq_conn;
+static amqp_channel_t rmq_channel = 0;
+static amqp_bytes_t rmq_exchange;
+static amqp_bytes_t rmq_route_key;
+
+
+/* Plugin implementation */
+/*! \brief Set up the RabbitMQ event handler
+ * \details Reads <config_path>/janus.eventhandler.rabbitmqevh.cfg and, when the
+ * handler is enabled there, connects to the RabbitMQ server (plain TCP or SSL),
+ * logs in, opens channel 1, declares the optional exchange and the outgoing
+ * queue, and finally spawns the thread that publishes the queued events.
+ * \note The exchange and route key strings are kept alive after a successful
+ * setup, since rmq_exchange and rmq_route_key point to them (destroy releases
+ * them): everything else that is duplicated from the configuration is only
+ * needed while connecting, and is released on every exit path.
+ * \param[in] config_path Folder the configuration file is looked up in
+ * \returns 0 on success, -1 if the handler is still stopping, is disabled or
+ * misconfigured, or if any step of the RabbitMQ setup fails */
+int janus_rabbitmqevh_init(const char *config_path) {
+	if(g_atomic_int_get(&stopping)) {
+		/* Still stopping from before */
+		return -1;
+	}
+	if(config_path == NULL) {
+		/* Invalid arguments */
+		return -1;
+	}
+	/* Read configuration */
+	char filename[255];
+	g_snprintf(filename, sizeof(filename), "%s/%s.cfg", config_path, JANUS_RABBITMQEVH_PACKAGE);
+	JANUS_LOG(LOG_VERB, "Configuration file: %s\n", filename);
+	janus_config *config = janus_config_parse(filename);
+	if(config != NULL)
+		janus_config_print(config);
+
+	/* Everything is declared up here, so that no goto jumps over an initializer */
+	int ret = -1;
+	gboolean conn_created = FALSE;
+	char *rmqhost = NULL, *vhost = NULL, *username = NULL, *password = NULL;
+	char *ssl_cacert_file = NULL, *ssl_cert_file = NULL, *ssl_key_file = NULL;
+	char *route_key = NULL, *exchange = NULL;
+	gboolean ssl_enable = FALSE;
+	gboolean ssl_verify_peer = FALSE;
+	gboolean ssl_verify_hostname = FALSE;
+	int rmqport = AMQP_PROTOCOL_PORT;
+	amqp_socket_t *socket = NULL;
+	int status = AMQP_STATUS_OK;
+	amqp_rpc_reply_t result;
+	GError *error = NULL;
+
+	/* Setup the event handler, if required */
+	janus_config_item *item = janus_config_get_item_drilldown(config, "general", "enabled");
+	if(!item || !item->value || !janus_is_true(item->value)) {
+		JANUS_LOG(LOG_WARN, "RabbitMQ event handler disabled\n");
+		goto error;
+	}
+
+	item = janus_config_get_item_drilldown(config, "general", "json");
+	if(item && item->value) {
+		/* Check how we need to format/serialize the JSON output */
+		if(!strcasecmp(item->value, "indented")) {
+			/* Default: indented, we use three spaces for that */
+			json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
+		} else if(!strcasecmp(item->value, "plain")) {
+			/* Not indented and no new lines, but still readable */
+			json_format = JSON_INDENT(0) | JSON_PRESERVE_ORDER;
+		} else if(!strcasecmp(item->value, "compact")) {
+			/* Compact, so no spaces between separators */
+			json_format = JSON_COMPACT | JSON_PRESERVE_ORDER;
+		} else {
+			JANUS_LOG(LOG_WARN, "Unsupported JSON format option '%s', using default (indented)\n", item->value);
+			json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
+		}
+	}
+
+	/* Which events should we subscribe to? */
+	item = janus_config_get_item_drilldown(config, "general", "events");
+	if(item && item->value) {
+		if(!strcasecmp(item->value, "none")) {
+			/* Don't subscribe to anything at all */
+			janus_flags_reset(&janus_rabbitmqevh.events_mask);
+		} else if(!strcasecmp(item->value, "all")) {
+			/* Subscribe to everything */
+			janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_ALL);
+		} else {
+			/* Check what we need to subscribe to */
+			gchar **subscribe = g_strsplit(item->value, ",", -1);
+			if(subscribe != NULL) {
+				int i = 0;
+				for(i = 0; subscribe[i] != NULL; i++) {
+					gchar *index = subscribe[i];
+					/* Skip leading spaces: the cast keeps isspace() well
+					 * defined for characters with a negative value */
+					while(isspace((unsigned char)*index))
+						index++;
+					if(!strlen(index))
+						continue;
+					if(!strcasecmp(index, "sessions")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_SESSION);
+					} else if(!strcasecmp(index, "handles")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_HANDLE);
+					} else if(!strcasecmp(index, "jsep")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_JSEP);
+					} else if(!strcasecmp(index, "webrtc")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_WEBRTC);
+					} else if(!strcasecmp(index, "media")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_MEDIA);
+					} else if(!strcasecmp(index, "plugins")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_PLUGIN);
+					} else if(!strcasecmp(index, "transports")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_TRANSPORT);
+					} else if(!strcasecmp(index, "core")) {
+						janus_flags_set(&janus_rabbitmqevh.events_mask, JANUS_EVENT_TYPE_CORE);
+					} else {
+						JANUS_LOG(LOG_WARN, "Unknown event type '%s'\n", index);
+					}
+				}
+				g_strfreev(subscribe);
+			}
+		}
+	}
+
+	/* Handle configuration, starting from the server details */
+	item = janus_config_get_item_drilldown(config, "general", "host");
+	rmqhost = g_strdup((item && item->value) ? item->value : "localhost");
+	item = janus_config_get_item_drilldown(config, "general", "port");
+	if(item && item->value)
+		rmqport = atoi(item->value);
+
+	/* Credentials and Virtual Host */
+	item = janus_config_get_item_drilldown(config, "general", "vhost");
+	vhost = g_strdup((item && item->value) ? item->value : "/");
+	item = janus_config_get_item_drilldown(config, "general", "username");
+	username = g_strdup((item && item->value) ? item->value : "guest");
+	item = janus_config_get_item_drilldown(config, "general", "password");
+	password = g_strdup((item && item->value) ? item->value : "guest");
+
+	/* SSL config*/
+	item = janus_config_get_item_drilldown(config, "general", "ssl_enable");
+	if(!item || !item->value || !janus_is_true(item->value)) {
+		JANUS_LOG(LOG_INFO, "RabbitMQ SSL support disabled\n");
+	} else {
+		ssl_enable = TRUE;
+		item = janus_config_get_item_drilldown(config, "general", "ssl_cacert");
+		if(item && item->value)
+			ssl_cacert_file = g_strdup(item->value);
+		item = janus_config_get_item_drilldown(config, "general", "ssl_cert");
+		if(item && item->value)
+			ssl_cert_file = g_strdup(item->value);
+		item = janus_config_get_item_drilldown(config, "general", "ssl_key");
+		if(item && item->value)
+			ssl_key_file = g_strdup(item->value);
+		item = janus_config_get_item_drilldown(config, "general", "ssl_verify_peer");
+		if(item && item->value && janus_is_true(item->value))
+			ssl_verify_peer = TRUE;
+		item = janus_config_get_item_drilldown(config, "general", "ssl_verify_hostname");
+		if(item && item->value && janus_is_true(item->value))
+			ssl_verify_hostname = TRUE;
+	}
+
+	/* Parse configuration */
+	item = janus_config_get_item_drilldown(config, "general", "route_key");
+	if(!item || !item->value) {
+		JANUS_LOG(LOG_FATAL, "Missing name of outgoing route_key for RabbitMQ...\n");
+		goto error;
+	}
+	route_key = g_strdup(item->value);
+	item = janus_config_get_item_drilldown(config, "general", "exchange");
+	if(!item || !item->value) {
+		JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ, using default\n");
+	} else {
+		exchange = g_strdup(item->value);
+	}
+	if(exchange == NULL) {
+		JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s)\n", rmqhost, rmqport, route_key);
+	} else {
+		JANUS_LOG(LOG_INFO, "RabbitMQ event handler enabled, %s:%d (%s) exch: (%s)\n", rmqhost, rmqport, route_key, exchange);
+	}
+
+	/* Connect */
+	rmq_conn = amqp_new_connection();
+	if(rmq_conn == NULL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating connection...\n");
+		goto error;
+	}
+	/* From now on, the error path has a connection to get rid of */
+	conn_created = TRUE;
+	JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
+	if(ssl_enable) {
+		socket = amqp_ssl_socket_new(rmq_conn);
+		if(socket == NULL) {
+			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+			goto error;
+		}
+		amqp_ssl_socket_set_verify_peer(socket, ssl_verify_peer ? 1 : 0);
+		amqp_ssl_socket_set_verify_hostname(socket, ssl_verify_hostname ? 1 : 0);
+		if(ssl_cacert_file) {
+			status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
+			if(status != AMQP_STATUS_OK) {
+				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
+				goto error;
+			}
+		}
+		if(ssl_cert_file && ssl_key_file) {
+			/* Keep the result: otherwise the check below would test a stale status */
+			status = amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file);
+			if(status != AMQP_STATUS_OK) {
+				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
+				goto error;
+			}
+		}
+	} else {
+		socket = amqp_tcp_socket_new(rmq_conn);
+		if(socket == NULL) {
+			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+			goto error;
+		}
+	}
+
+	JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n");
+	status = amqp_socket_open(socket, rmqhost, rmqport);
+	if(status != AMQP_STATUS_OK) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
+		goto error;
+	}
+	JANUS_LOG(LOG_VERB, "Logging in...\n");
+	result = amqp_login(rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
+	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		goto error;
+	}
+	rmq_channel = 1;
+	JANUS_LOG(LOG_VERB, "Opening channel...\n");
+	amqp_channel_open(rmq_conn, rmq_channel);
+	result = amqp_get_rpc_reply(rmq_conn);
+	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		goto error;
+	}
+	rmq_exchange = amqp_empty_bytes;
+	if(exchange != NULL) {
+		JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
+		rmq_exchange = amqp_cstring_bytes(exchange);
+		amqp_exchange_declare(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
+		result = amqp_get_rpc_reply(rmq_conn);
+		if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+			goto error;
+		}
+	}
+	JANUS_LOG(LOG_VERB, "Declaring outgoing queue... (%s)\n", route_key);
+	rmq_route_key = amqp_cstring_bytes(route_key);
+	amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table);
+	result = amqp_get_rpc_reply(rmq_conn);
+	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		goto error;
+	}
+
+	/* Initialize the events queue: the flag is raised before the thread is
+	 * spawned, since the thread checks it as soon as it starts */
+	events = g_async_queue_new_full((GDestroyNotify) janus_rabbitmqevh_event_free);
+	g_atomic_int_set(&initialized, 1);
+
+	handler_thread = g_thread_try_new("janus rabbitmqevh handler", janus_rabbitmqevh_handler, NULL, &error);
+	if(error != NULL) {
+		g_atomic_int_set(&initialized, 0);
+		JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
+		g_error_free(error);
+		/* No thread will ever consume the queue, so don't leave it around */
+		g_async_queue_unref(events);
+		events = NULL;
+		goto error;
+	}
+
+	/* Done */
+	JANUS_LOG(LOG_INFO, "Setup of RabbitMQ event handler completed\n");
+	ret = 0;
+	goto done;
+
+error:
+	/* If we got here, something went wrong: the connection owns its socket,
+	 * so destroying the connection is enough to release both */
+	if(conn_created) {
+		amqp_destroy_connection(rmq_conn);
+		rmq_conn = NULL;
+	}
+	rmq_channel = 0;
+	/* These may point to the strings we're about to free */
+	rmq_exchange = amqp_empty_bytes;
+	rmq_route_key = amqp_empty_bytes;
+	g_free(route_key);
+	g_free(exchange);
+	/* Fall through */
+done:
+	/* None of these is needed anymore once the setup is over, no matter how it went */
+	g_free(rmqhost);
+	g_free(vhost);
+	g_free(username);
+	g_free(password);
+	g_free(ssl_cacert_file);
+	g_free(ssl_cert_file);
+	g_free(ssl_key_file);
+	if(config)
+		janus_config_destroy(config);
+	if(ret == 0) {
+		JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_RABBITMQEVH_NAME);
+	}
+	return ret;
+}
+
+/*! \brief Shut the RabbitMQ event handler down
+ * \details Does nothing if the handler was never initialized. Otherwise it
+ * wakes the handler thread up with the exit sentinel and joins it, gets rid
+ * of the events queue (events still queued are released by the queue's
+ * destroy notify), closes the channel and the connection, and frees the
+ * exchange and route key strings. All the file-level session state is reset
+ * after being released, so that nothing is left pointing to freed memory. */
+void janus_rabbitmqevh_destroy(void) {
+	if(!g_atomic_int_get(&initialized))
+		return;
+	g_atomic_int_set(&stopping, 1);
+
+	/* The sentinel makes the handler thread break out of its loop */
+	g_async_queue_push(events, &exit_event);
+	if(handler_thread != NULL) {
+		g_thread_join(handler_thread);
+		handler_thread = NULL;
+	}
+
+	g_async_queue_unref(events);
+	events = NULL;
+
+	if(rmq_conn && rmq_channel) {
+		amqp_channel_close(rmq_conn, rmq_channel, AMQP_REPLY_SUCCESS);
+		amqp_connection_close(rmq_conn, AMQP_REPLY_SUCCESS);
+		amqp_destroy_connection(rmq_conn);
+		rmq_conn = NULL;
+		rmq_channel = 0;
+	}
+	/* g_free() accepts NULL, which is what an unset exchange looks like */
+	g_free(rmq_exchange.bytes);
+	rmq_exchange = amqp_empty_bytes;
+	g_free(rmq_route_key.bytes);
+	rmq_route_key = amqp_empty_bytes;
+
+	g_atomic_int_set(&initialized, 0);
+	g_atomic_int_set(&stopping, 0);
+	JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_RABBITMQEVH_NAME);
+}
+
+/*! \brief Report the event handler API version this module was built against
+ * \returns JANUS_EVENTHANDLER_API_VERSION, and nothing else: a handler must
+ * never lie about this, or bad things will happen */
+int janus_rabbitmqevh_get_api_compatibility(void) {
+	return JANUS_EVENTHANDLER_API_VERSION;
+}
+
+/*! \brief Numeric version of this handler
+ * \returns JANUS_RABBITMQEVH_VERSION */
+int janus_rabbitmqevh_get_version(void) {
+	return JANUS_RABBITMQEVH_VERSION;
+}
+
+/*! \brief Version of this handler, as a string
+ * \returns JANUS_RABBITMQEVH_VERSION_STRING (a string literal, not to be freed) */
+const char *janus_rabbitmqevh_get_version_string(void) {
+	return JANUS_RABBITMQEVH_VERSION_STRING;
+}
+
+/*! \brief Short description of this handler
+ * \returns JANUS_RABBITMQEVH_DESCRIPTION (a string literal, not to be freed) */
+const char *janus_rabbitmqevh_get_description(void) {
+	return JANUS_RABBITMQEVH_DESCRIPTION;
+}
+
+/*! \brief Human readable name of this handler
+ * \returns JANUS_RABBITMQEVH_NAME (a string literal, not to be freed) */
+const char *janus_rabbitmqevh_get_name(void) {
+	return JANUS_RABBITMQEVH_NAME;
+}
+
+/*! \brief Author of this handler
+ * \returns JANUS_RABBITMQEVH_AUTHOR (a string literal, not to be freed) */
+const char *janus_rabbitmqevh_get_author(void) {
+	return JANUS_RABBITMQEVH_AUTHOR;
+}
+
+/*! \brief Package name of this handler, which is also the base name of
+ * the configuration file that init looks for
+ * \returns JANUS_RABBITMQEVH_PACKAGE (a string literal, not to be freed) */
+const char *janus_rabbitmqevh_get_package(void) {
+	return JANUS_RABBITMQEVH_PACKAGE;
+}
+
+/*! \brief Callback invoked when there's a new event to handle
+ * \details Nothing is processed in here: the event is just referenced and
+ * queued, so that the handler thread can take care of the actual publishing
+ * without slowing down whoever is notifying us. The event carries a monotonic
+ * timestamp of when it happened, so the thread can still tell how late it is.
+ * \param[in] event The event to handle, as a Jansson object */
+void janus_rabbitmqevh_incoming_event(json_t *event) {
+	gboolean active = g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping);
+	if(active) {
+		/* We need a reference of our own, as the event is consumed later on by
+		 * a different thread: never do I/O or network operations right here */
+		json_incref(event);
+		g_async_queue_push(events, event);
+		return;
+	}
+	/* Janus is closing or the plugin is: unref the event as we won't handle it.
+	 * NOTE(review): no reference is taken on this path, so this drops one that
+	 * belongs to the caller; confirm the core expects handlers to do that */
+	json_decref(event);
+}
+
+/* Thread to handle incoming events */
+/*! \brief Thread that publishes the queued events to RabbitMQ
+ * \details Pops events from the queue until either the exit sentinel shows up
+ * or the handler is not initialized/is stopping. Each event is serialized to
+ * JSON (using the configured json_format) and published with an
+ * "application/json" content type to rmq_exchange/rmq_route_key, and then
+ * the reference taken when the event was queued is dropped.
+ * \param[in] data Unused
+ * \returns Always NULL */
+static void *janus_rabbitmqevh_handler(void *data) {
+	JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n");
+	json_t *event = NULL;
+	char *event_text = NULL;
+
+	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
+
+		event = g_async_queue_pop(events);
+		if(event == NULL)
+			continue;
+		if(event == &exit_event)
+			break;
+
+		/* Handle event: just for fun, let's see how long it took for us to take care of this */
+		json_t *created = json_object_get(event, "timestamp");
+		if(created && json_is_integer(created)) {
+			gint64 then = json_integer_value(created);
+			gint64 now = janus_get_monotonic_time();
+			/* The difference is a signed 64-bit integer: print it as such */
+			JANUS_LOG(LOG_DBG, "Handled event after %"G_GINT64_FORMAT" us\n", now-then);
+		}
+
+		/* Let's check what kind of event this is: we don't really do anything
+		 * with it in this plugin, it's just to show how you can handle
+		 * different types of events in an event handler. */
+		int type = json_integer_value(json_object_get(event, "type"));
+		switch(type) {
+			case JANUS_EVENT_TYPE_SESSION:
+				/* This is a session related event. The only info that is
+				 * required is a name for the event itself: a "created"
+				 * event may also contain transport info, in the form of
+				 * the transport module that originated the session
+				 * (e.g., "janus.transport.http") and an internal unique
+				 * ID for the transport instance (which may be associated
+				 * to a connection or anything else within the specifics
+				 * of the transport module itself). Here's an example of
+				 * a new session being created:
+					{
+					   "type": 1,
+					   "timestamp": 3583879627,
+					   "session_id": 2004798115,
+					   "event": {
+					      "name": "created"
+					   },
+					   "transport": {
+					      "transport": "janus.transport.http",
+					      "id": "0x7fcb100008c0"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_HANDLE:
+				/* This is a handle related event. The only info that is provided
+				 * are the name for the event itself and the package name of the
+				 * plugin this handle refers to (e.g., "janus.plugin.echotest").
+				 * Here's an example of a new handle being attached in a session
+				 * to the EchoTest plugin:
+					{
+					   "type": 2,
+					   "timestamp": 3570304977,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "name": "attached",
+					      "plugin": "janus.plugin.echotest"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_JSEP:
+				/* This is a JSEP/SDP related event. It provides information
+				 * about an ongoing WebRTC negotiation, and so tells you
+				 * about the SDP being sent/received, and who's sending it
+				 * ("local" means Janus, "remote" means the user). Here's an
+				 * example, where the user originated an offer towards Janus:
+					{
+					   "type": 8,
+					   "timestamp": 3570400208,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "owner": "remote",
+					      "jsep": {
+					         "type": "offer",
+					         "sdp": "v=0[..]\r\n"
+					      }
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_WEBRTC:
+				/* This is a WebRTC related event, and so the content of
+				 * the event may vary quite a bit. In fact, you may be notified
+				 * about ICE or DTLS states, or when a WebRTC PeerConnection
+				 * goes up or down. Here are some examples, in no particular order:
+					{
+					   "type": 16,
+					   "timestamp": 3570416659,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "ice": "connecting",
+					      "stream_id": 1,
+					      "component_id": 1
+					   }
+					}
+				 *
+					{
+					   "type": 16,
+					   "timestamp": 3570637554,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "selected-pair": "[..]",
+					      "stream_id": 1,
+					      "component_id": 1
+					   }
+					}
+				 *
+					{
+					   "type": 16,
+					   "timestamp": 3570656112,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "dtls": "connected",
+					      "stream_id": 1,
+					      "component_id": 1
+					   }
+					}
+				 *
+					{
+					   "type": 16,
+					   "timestamp": 3570657237,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "connection": "webrtcup"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_MEDIA:
+				/* This is a media related event. This can contain different
+				 * information about the health of a media session, or about
+				 * what's going on in general (e.g., when Janus started/stopped
+				 * receiving media of a certain type, or (TODO) when some media related
+				 * statistics are available). Here's an example of Janus getting
+				 * video from the peer for the first time, or after a second
+				 * of no video at all (which would have triggered a "receiving": false):
+					{
+					   "type": 32,
+					   "timestamp": 3571078797,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "media": "video",
+					      "receiving": "true"
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_PLUGIN:
+				/* This is a plugin related event. Since each plugin may
+				 * provide info in a very custom way, the format of this event
+				 * is in general very dynamic. You'll always find, though,
+				 * an "event" object containing the package name of the
+				 * plugin (e.g., "janus.plugin.echotest") and a "data"
+				 * object that contains whatever the plugin decided to
+				 * notify you about, that will always vary from plugin to
+				 * plugin. Besides, notice that "session_id" and "handle_id"
+				 * may or may not be present: when they are, you'll know
+				 * the event has been triggered within the context of a
+				 * specific handle session with the plugin; when they're
+				 * not, the plugin sent an event out of context of a
+				 * specific session it is handling. Here's an example:
+					{
+					   "type": 64,
+					   "timestamp": 3570336031,
+					   "session_id": 2004798115,
+					   "handle_id": 3708519405,
+					   "event": {
+					      "plugin": "janus.plugin.echotest",
+					      "data": {
+					         "audio_active": "true",
+					         "video_active": "true",
+					         "bitrate": 0
+					      }
+					   }
+					}
+				*/
+				break;
+			case JANUS_EVENT_TYPE_TRANSPORT:
+				/* This is a transport related event (TODO). The syntax of
+				 * the common format (transport specific data aside) is
+				 * exactly the same as that of the plugin related events
+				 * above, with a "transport" property instead of "plugin"
+				 * to contain the transport package name. */
+				break;
+			case JANUS_EVENT_TYPE_CORE:
+				/* This is a core related event. This can contain different
+				 * information about the health of the Janus instance, or
+				 * more generically on some events in the Janus life cycle
+				 * (e.g., when it's just been started or when a shutdown
+				 * has been requested). Considering the heterogeneous nature
+				 * of the information being reported, the content is always
+				 * a JSON object (event). Core events are the only ones
+				 * missing a session_id. Here's an example:
+					{
+					   "type": 256,
+					   "timestamp": 28381185382,
+					   "event": {
+					      "status": "started"
+					   }
+					}
+				*/
+				break;
+			default:
+				JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
+				break;
+		}
+
+		if(!g_atomic_int_get(&stopping)) {
+			/* Since this a simple plugin, it does the same for all events: so just convert to string... */
+			event_text = json_dumps(event, json_format);
+			if(event_text == NULL) {
+				/* Nothing to publish: wrapping a NULL string in amqp_bytes_t
+				 * would make the library compute the length of a NULL pointer */
+				JANUS_LOG(LOG_ERR, "Error serializing event to JSON, dropping it\n");
+			} else {
+				/* ... and publish it: only the content type property is set */
+				amqp_basic_properties_t props;
+				props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
+				props.content_type = amqp_cstring_bytes("application/json");
+				amqp_bytes_t message = amqp_cstring_bytes(event_text);
+				int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message);
+				if(status != AMQP_STATUS_OK) {
+					JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
+				}
+				free(event_text);
+				event_text = NULL;
+			}
+		}
+
+		/* Done, let's unref the event */
+		json_decref(event);
+		event = NULL;
+	}
+	JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n");
+	return NULL;
+}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-voip/janus.git
More information about the Pkg-voip-commits
mailing list