[Pkg-voip-commits] [janus] 160/282: Add optional exchange for RabbitMQ transport
Jonas Smedegaard
dr at jones.dk
Wed Dec 20 21:53:37 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to annotated tag debian/0.2.6-1
in repository janus.
commit b9d3ca04283fe1d739978eab5f65e1fae18c08b8
Author: Piter Konstantinov <pit.here at gmail.com>
Date: Mon Oct 30 18:03:38 2017 +0300
Add optional exchange for RabbitMQ transport
---
conf/janus.transport.rabbitmq.cfg.sample | 1 +
transports/janus_rabbitmq.c | 34 ++++++++++++++++++++++++++++++--
2 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/conf/janus.transport.rabbitmq.cfg.sample b/conf/janus.transport.rabbitmq.cfg.sample
index 2d1bf79..21ca5aa 100644
--- a/conf/janus.transport.rabbitmq.cfg.sample
+++ b/conf/janus.transport.rabbitmq.cfg.sample
@@ -25,6 +25,7 @@ host = localhost ; The address of the RabbitMQ server
;vhost = / ; Virtual host to specify when logging in, if needed
to_janus = to-janus ; Name of the queue for incoming messages
from_janus = from-janus ; Name of the queue for outgoing messages
+;janus_exchange = janus-exchange ; Exchange for outgoing messages, using default if not provided
;ssl_enable = no ; Whether ssl support must be enabled
;ssl_verify_peer = yes ; Whether peer verification must be enabled
;ssl_verify_hostname = yes ; Whether hostname verification must be enabled
diff --git a/transports/janus_rabbitmq.c b/transports/janus_rabbitmq.c
index ecac6fe..35deb0e 100644
--- a/transports/janus_rabbitmq.c
+++ b/transports/janus_rabbitmq.c
@@ -107,6 +107,9 @@ static gboolean rmq_janus_api_enabled = FALSE;
static gboolean rmq_admin_api_enabled = FALSE;
static gboolean notify_events = TRUE;
+/* FIXME: Should it be configable? */
+#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
+
/* JSON serialization options */
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
@@ -116,6 +119,7 @@ typedef struct janus_rabbitmq_client {
amqp_connection_state_t rmq_conn; /* AMQP connection state */
amqp_channel_t rmq_channel; /* AMQP channel */
gboolean janus_api_enabled; /* Whether the Janus API via RabbitMQ is enabled */
+ amqp_bytes_t janus_exchange; /* AMQP exchange for outgoing messages */
amqp_bytes_t to_janus_queue; /* AMQP outgoing messages queue (Janus API) */
amqp_bytes_t from_janus_queue; /* AMQP incoming messages queue (Janus API) */
gboolean admin_api_enabled; /* Whether the Janus API via RabbitMQ is enabled */
@@ -255,6 +259,7 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
/* Now check if the Janus API must be supported */
const char *to_janus = NULL, *from_janus = NULL;
const char *to_janus_admin = NULL, *from_janus_admin = NULL;
+ const char *janus_exchange = NULL;
item = janus_config_get_item_drilldown(config, "general", "enable");
if(!item || !item->value || !janus_is_true(item->value)) {
JANUS_LOG(LOG_WARN, "RabbitMQ support disabled (Janus API)\n");
@@ -272,7 +277,17 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
goto error;
}
from_janus = g_strdup(item->value);
- JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
+ item = janus_config_get_item_drilldown(config, "general", "janus_exchange");
+ if(!item || !item->value) {
+ JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ integration, using default\n");
+ } else {
+ janus_exchange = g_strdup(item->value);
+ }
+ if (janus_exchange == NULL) {
+ JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s)\n", rmqhost, rmqport, to_janus, from_janus);
+ } else {
+ JANUS_LOG(LOG_INFO, "RabbitMQ support for Janus API enabled, %s:%d (%s/%s) exch: (%s)\n", rmqhost, rmqport, to_janus, from_janus, janus_exchange);
+ }
rmq_janus_api_enabled = TRUE;
}
/* Do the same for the admin API */
@@ -368,6 +383,17 @@ int janus_rabbitmq_init(janus_transport_callbacks *callback, const char *config_
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
+ rmq_client->janus_exchange = amqp_empty_bytes;
+ if(janus_exchange != NULL) {
+ JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
+ rmq_client->janus_exchange = amqp_cstring_bytes(janus_exchange);
+ amqp_exchange_declare(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
+ result = amqp_get_rpc_reply(rmq_client->rmq_conn);
+ if(result.reply_type != AMQP_RESPONSE_NORMAL) {
+ JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+ goto error;
+ }
+ }
rmq_client->janus_api_enabled = FALSE;
if(rmq_janus_api_enabled) {
rmq_client->janus_api_enabled = TRUE;
@@ -470,6 +496,8 @@ error:
g_free((char *)username);
if(password)
g_free((char *)password);
+ if(janus_exchange)
+ g_free((char *)janus_exchange);
if(to_janus)
g_free((char *)to_janus);
if(from_janus)
@@ -514,6 +542,8 @@ void janus_rabbitmq_destroy(void) {
g_free((char *)rmq_client->to_janus_admin_queue.bytes);
if(rmq_client->from_janus_admin_queue.bytes)
g_free((char *)rmq_client->from_janus_admin_queue.bytes);
+ if(rmq_client->janus_exchange.bytes)
+ g_free((char *)rmq_client->janus_exchange.bytes);
}
g_free(rmq_client);
@@ -713,7 +743,7 @@ void *janus_rmq_out_thread(void *data) {
props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
amqp_bytes_t message = amqp_cstring_bytes(payload_text);
- int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, amqp_empty_bytes,
+ int status = amqp_basic_publish(rmq_client->rmq_conn, rmq_client->rmq_channel, rmq_client->janus_exchange,
response->admin ? rmq_client->from_janus_admin_queue : rmq_client->from_janus_queue,
0, 0, &props, message);
if(status != AMQP_STATUS_OK) {
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-voip/janus.git
More information about the Pkg-voip-commits
mailing list