[Pkg-voip-commits] [janus] 160/282: Add optional exchange for RabbitMQ transport

Jonas Smedegaard dr at jones.dk
Wed Dec 20 21:53:37 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to annotated tag debian/0.2.6-1
in repository janus.

commit b9d3ca04283fe1d739978eab5f65e1fae18c08b8
Author: Piter Konstantinov <pit.here at gmail.com>
Date:   Mon Oct 30 18:03:38 2017 +0300

    Add optional exchange for RabbitMQ transport
---
 conf/janus.transport.rabbitmq.cfg.sample |  1 +
 transports/janus_rabbitmq.c              | 34 ++++++++++++++++++++++++++++++--
 2 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/conf/janus.transport.rabbitmq.cfg.sample b/conf/janus.transport.rabbitmq.cfg.sample
index 2d1bf79..21ca5aa 100644
--- a/conf/janus.transport.rabbitmq.cfg.sample
+++ b/conf/janus.transport.rabbitmq.cfg.sample
@@ -25,6 +25,7 @@ host = localhost			; The address of the RabbitMQ server
 ;vhost = /					; Virtual host to specify when logging in, if needed
 to_janus = to-janus			; Name of the queue for incoming messages
 from_janus = from-janus		; Name of the queue for outgoing messages
+;janus_exchange = janus-exchange ; Exchange for outgoing messages, using default if not provided
 ;ssl_enable = no			; Whether ssl support must be enabled
 ;ssl_verify_peer = yes		; Whether peer verification must be enabled
 ;ssl_verify_hostname = yes	; Whether hostname verification must be enabled
diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c
index ecac6fe..35deb0e 100644
--- a/transports/janus_rabbitmq.c
+++ b/transports/janus_rabbitmq.c
@@ -107,6 +107,9 @@ static gboolean rmq_janus_api_enabled = FALSE;
 static gboolean rmq_admin_api_enabled = FALSE;
 static gboolean notify_events = TRUE;
 
+/* FIXME: Should it be configable? */
+#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
+
 /* JSON serialization options */
 static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
 
@@ -116,6 +119,7 @@ typedef struct janus_rabbitmq_client {
 	amqp_connection_state_t rmq_conn;		/* AMQP connection state */
 	amqp_channel_t rmq_channel;				/* AMQP channel */
 	gboolean janus_api_enabled;				/* Whether the Janus API via RabbitMQ is enabled */
+	amqp_bytes_t janus_exchange;			/* AMQP exchange for outgoing messages */
 	amqp_bytes_t to_janus_queue;			/* AMQP outgoing messages queue (Janus API) */
 	amqp_bytes_t from_janus_queue;			/* AMQP incoming messages queue (Janus API) */
 	gboolean admin_api_enabled;				/* Whether the Janus API via RabbitMQ is enabled */
@@ -255,6 +259,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
 	/* Now check if the Janus API must be supported */
 	const char *to_janus = NULL, *from_janus = NULL;
 	const char *to_janus_admin = NULL, *from_janus_admin = NULL;
+	const char *janus_exchange = NULL;
 	item = janus_config_get_item_drilldown(config, "general", "enable");
 	if(!item || !item->value || !janus_is_true(item->value)) {
 		JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n");
@@ -272,7 +277,17 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
 			goto error;
 		}
 		from_janus = g_strdup(item->value);
-		JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
+		item = janus_config_get_item_drilldown(config, "general", "janus_exchange");
+		if(!item || !item->value) {
+			JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n");
+		} else {
+			janus_exchange = g_strdup(item->value);
+		}
+		if (janus_exchange == NULL) {
+			JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
+		} else {
+			JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exch: (%s)\n", rmqhost, rmqport, to_janus, from_janus, janus_exchange);
+		}
 		rmq_janus_api_enabled = TRUE;
 	}
 	/* Do the same for the admin API */
@@ -368,6 +383,17 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
 			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
 			goto error;
 		}
+		rmq_client->janus_exchange = amqp_empty_bytes;
+		if(janus_exchange != NULL) {
+			JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
+			rmq_client->janus_exchange = amqp_cstring_bytes(janus_exchange);
+			amqp_exchange_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
+			result = amqp_get_rpc_reply(rmq_client->rmq_conn);
+			if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+				goto error;
+			}
+		}
 		rmq_client->janus_api_enabled = FALSE;
 		if(rmq_janus_api_enabled) {
 			rmq_client->janus_api_enabled = TRUE;
@@ -470,6 +496,8 @@ error:
 		g_free((char *)username);
 	if(password)
 		g_free((char *)password);
+	if(janus_exchange)
+		g_free((char *)janus_exchange);
 	if(to_janus)
 		g_free((char *)to_janus);
 	if(from_janus)
@@ -514,6 +542,8 @@ void janus_rabbitmq_destroy(void) {
 			g_free((char *)rmq_client->to_janus_admin_queue.bytes);
 		if(rmq_client->from_janus_admin_queue.bytes)
 			g_free((char *)rmq_client->from_janus_admin_queue.bytes);
+		if(rmq_client->janus_exchange.bytes)
+			g_free((char *)rmq_client->janus_exchange.bytes);
 	}
 	g_free(rmq_client);
 
@@ -713,7 +743,7 @@ void *janus_rmq_out_thread(void *data) {
 			props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
 			props.content_type = amqp_cstring_bytes("application/json");
 			amqp_bytes_t message = amqp_cstring_bytes(payload_text);
-			int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_empty_bytes,
+			int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange,
 				response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue,
 				0, 0, &props, message);
 			if(status != AMQP_STATUS_OK) {

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-voip/janus.git



More information about the Pkg-voip-commits mailing list