[Pkg-voip-commits] [janus] 168/282: Fixes after review
Jonas Smedegaard
dr at jones.dk
Wed Dec 20 21:53:38 UTC 2017
This is an automated email from the git hooks/post-receive script.
js pushed a commit to annotated tag debian/0.2.6-1
in repository janus.
commit 006e4837264b57d152afeeda41e2a18c23be427e
Author: Piter Konstantinov <pit.here at gmail.com>
Date: Fri Nov 3 12:54:00 2017 +0300
Fixes after review
---
conf/janus.eventhandler.rabbitmqevh.cfg.sample | 3 +
configure.ac | 6 +-
events/janus_rabbitmqevh.c | 481 +++++++++++++------------
3 files changed, 260 insertions(+), 230 deletions(-)
diff --git a/conf/janus.eventhandler.rabbitmqevh.cfg.sample b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
index 40e6771..9adcbe0 100644
--- a/conf/janus.eventhandler.rabbitmqevh.cfg.sample
+++ b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
@@ -6,6 +6,9 @@ events = all ; Comma separated list of the events mask you're interested
; in. Valid values are none, sessions, handles, jsep, webrtc,
; media, plugins, transports, core and all. By default we
; subscribe to everything (all)
+grouping = yes ; Whether events should be sent individually , or if it's ok
+ ; to group them. The default is 'yes' to limit the number of
+ ; messages
host = 192.168.99.100 ; The address of the RabbitMQ server
;port = 5672 ; The port of the RabbitMQ server (5672 by default)
;username = guest ; Username to use to authenticate, if needed
diff --git a/configure.ac b/configure.ac
index c5e3ec6..9b520e6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -803,8 +803,8 @@ AM_COND_IF([ENABLE_SAMPLEEVH],
[echo " Sample event handler: yes"],
[echo " Sample event handler: no"])
AM_COND_IF([ENABLE_RABBITMQEVH],
- [echo " RabbitMQ event handler:yes"],
- [echo " RabbitMQ event handler:no"])
+ [echo " RabbitMQ event handler:yes"],
+ [echo " RabbitMQ event handler:no"])
AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
echo "JavaScript modules: yes"
echo " Using npm: $NPM"
@@ -825,4 +825,4 @@ AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
echo
echo "If this configuration is ok for you, do a 'make' to start building Janus. A 'make install' will install Janus and its plugins to the specified prefix. Finally, a 'make configs' will install some sample configuration files too (something you'll only want to do the first time, though)."
-echo
\ No newline at end of file
+echo
diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c
index 78c837b..76bdecd 100644
--- a/events/janus_rabbitmqevh.c
+++ b/events/janus_rabbitmqevh.c
@@ -1,5 +1,5 @@
/*! \file janus_rabbitmqevh.c
- * \author Lorenzo Miniero <lorenzo at meetecho.com>
+ * \author Piter Konstantinov <pit.here at gmail.com>
* \copyright GNU General Public License v3
* \brief Janus RabbitMQEventHandler plugin
* \details This is a trivial RabbitMQ event handler plugin for Janus
@@ -77,6 +77,7 @@ static void *janus_rabbitmqevh_handler(void *data);
/* Queue of events to handle */
static GAsyncQueue *events = NULL;
+static gboolean group_events = TRUE;
static json_t exit_event;
static void janus_rabbitmqevh_event_free(json_t *event) {
if(!event || event == &exit_event)
@@ -87,7 +88,7 @@ static void janus_rabbitmqevh_event_free(json_t *event) {
/* JSON serialization options */
static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
-/* FIXME: Should it be configable? */
+/* FIXME: Should it be configurable? */
#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
/* RabbitMQ session */
@@ -199,6 +200,11 @@ int janus_rabbitmqevh_init(const char *config_path) {
}
}
+ /* Is grouping of events ok? */
+ item = janus_config_get_item_drilldown(config, "general", "grouping");
+ if(item && item->value)
+ group_events = janus_is_true(item->value);
+
/* Handle configuration, starting from the server details */
item = janus_config_get_item_drilldown(config, "general", "host");
if(item && item->value)
@@ -230,7 +236,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
/* SSL config*/
item = janus_config_get_item_drilldown(config, "general", "ssl_enable");
if(!item || !item->value || !janus_is_true(item->value)) {
- JANUS_LOG(LOG_INFO, "RabbitMQ SSL support disabled\n");
+ JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: RabbitMQ SSL support disabled\n");
} else {
ssl_enable = TRUE;
item = janus_config_get_item_drilldown(config, "general", "ssl_cacert");
@@ -253,13 +259,13 @@ int janus_rabbitmqevh_init(const char *config_path) {
/* Parse configuration */
item = janus_config_get_item_drilldown(config, "general", "route_key");
if(!item || !item->value) {
- JANUS_LOG(LOG_FATAL, "Missing name of outgoing route_key for RabbitMQ...\n");
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Missing name of outgoing route_key for RabbitMQ...\n");
goto error;
}
route_key = g_strdup(item->value);
item = janus_config_get_item_drilldown(config, "general", "exchange");
if(!item || !item->value) {
- JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ, using default\n");
+ JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Missing name of outgoing exchange for RabbitMQ, using default\n");
} else {
exchange = g_strdup(item->value);
}
@@ -273,11 +279,11 @@ int janus_rabbitmqevh_init(const char *config_path) {
rmq_conn = amqp_new_connection();
amqp_socket_t *socket = NULL;
int status;
- JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
+ JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Creating RabbitMQ socket...\n");
if (ssl_enable) {
socket = amqp_ssl_socket_new(rmq_conn);
if(socket == NULL) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
goto error;
}
if(ssl_verify_peer) {
@@ -293,35 +299,35 @@ int janus_rabbitmqevh_init(const char *config_path) {
if(ssl_cacert_file) {
status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
if(status != AMQP_STATUS_OK) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
goto error;
}
}
if(ssl_cert_file && ssl_key_file) {
amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file);
if(status != AMQP_STATUS_OK) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
goto error;
}
}
} else {
socket = amqp_tcp_socket_new(rmq_conn);
if(socket == NULL) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
goto error;
}
}
- JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n");
+ JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Connecting to RabbitMQ server...\n");
status = amqp_socket_open(socket, rmqhost, rmqport);
if(status != AMQP_STATUS_OK) {
JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
goto error;
}
- JANUS_LOG(LOG_VERB, "Logging in...\n");
+ JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Logging in...\n");
amqp_rpc_reply_t result = amqp_login(rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
rmq_channel = 1;
@@ -329,17 +335,17 @@ int janus_rabbitmqevh_init(const char *config_path) {
amqp_channel_open(rmq_conn, rmq_channel);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
rmq_exchange = amqp_empty_bytes;
if(exchange != NULL) {
- JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
+ JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Declaring exchange...\n");
rmq_exchange = amqp_cstring_bytes(exchange);
amqp_exchange_declare(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
}
@@ -348,7 +354,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table);
result = amqp_get_rpc_reply(rmq_conn);
if(result.reply_type != AMQP_RESPONSE_NORMAL) {
- JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+ JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
goto error;
}
@@ -357,10 +363,10 @@ int janus_rabbitmqevh_init(const char *config_path) {
g_atomic_int_set(&initialized, 1);
GError *error = NULL;
- handler_thread = g_thread_try_new("janus sampleevh handler", janus_rabbitmqevh_handler, NULL, &error);
+ handler_thread = g_thread_try_new("janus rabbitmqevh handler", janus_rabbitmqevh_handler, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
- JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the SampleEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
+ JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
goto error;
}
@@ -479,8 +485,9 @@ void janus_rabbitmqevh_incoming_event(json_t *event) {
/* Thread to handle incoming events */
static void *janus_rabbitmqevh_handler(void *data) {
JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n");
- json_t *event = NULL;
+ json_t *event = NULL, *output = NULL;
char *event_text = NULL;
+ int count = 0, max = group_events ? 100 : 1;
while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
@@ -489,218 +496,239 @@ static void *janus_rabbitmqevh_handler(void *data) {
continue;
if(event == &exit_event)
break;
+ count = 0;
+ output = NULL;
+
+ while(TRUE) {
+ /* Handle event: just for fun, let's see how long it took for us to take care of this */
+ json_t *created = json_object_get(event, "timestamp");
+ if(created && json_is_integer(created)) {
+ gint64 then = json_integer_value(created);
+ gint64 now = janus_get_monotonic_time();
+ JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
+ }
- /* Handle event: just for fun, let's see how long it took for us to take care of this */
- json_t *created = json_object_get(event, "timestamp");
- if(created && json_is_integer(created)) {
- gint64 then = json_integer_value(created);
- gint64 now = janus_get_monotonic_time();
- JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
- }
+ /* Let's check what kind of event this is: we don't really do anything
+ * with it in this plugin, it's just to show how you can handle
+ * different types of events in an event handler. */
+ int type = json_integer_value(json_object_get(event, "type"));
+ switch(type) {
+ case JANUS_EVENT_TYPE_SESSION:
+ /* This is a session related event. The only info that is
+ * required is a name for the event itself: a "created"
+ * event may also contain transport info, in the form of
+ * the transport module that originated the session
+ * (e.g., "janus.transport.http") and an internal unique
+ * ID for the transport instance (which may be associated
+ * to a connection or anything else within the specifics
+ * of the transport module itself). Here's an example of
+ * a new session being created:
+ {
+ "type": 1,
+ "timestamp": 3583879627,
+ "session_id": 2004798115,
+ "event": {
+ "name": "created"
+ },
+ "transport": {
+ "transport": "janus.transport.http",
+ "id": "0x7fcb100008c0"
+ }
+ }
+ */
+ break;
+ case JANUS_EVENT_TYPE_HANDLE:
+ /* This is a handle related event. The only info that is provided
+ * are the name for the event itself and the package name of the
+ * plugin this handle refers to (e.g., "janus.plugin.echotest").
+ * Here's an example of a new handled being attached in a session
+ * to the EchoTest plugin:
+ {
+ "type": 2,
+ "timestamp": 3570304977,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "name": "attached",
+ "plugin: "janus.plugin.echotest"
+ }
+ }
+ */
+ break;
+ case JANUS_EVENT_TYPE_JSEP:
+ /* This is a JSEP/SDP related event. It provides information
+ * about an ongoing WebRTC negotiation, and so tells you
+ * about the SDP being sent/received, and who's sending it
+ * ("local" means Janus, "remote" means the user). Here's an
+ * example, where the user originated an offer towards Janus:
+ {
+ "type": 8,
+ "timestamp": 3570400208,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "owner": "remote",
+ "jsep": {
+ "type": "offer",
+ "sdp": "v=0[..]\r\n"
+ }
+ }
+ }
+ */
+ break;
+ case JANUS_EVENT_TYPE_WEBRTC:
+ /* This is a WebRTC related event, and so the content of
+ * the event may vary quite a bit. In fact, you may be notified
+ * about ICE or DTLS states, or when a WebRTC PeerConnection
+ * goes up or down. Here are some examples, in no particular order:
+ {
+ "type": 16,
+ "timestamp": 3570416659,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "ice": "connecting",
+ "stream_id": 1,
+ "component_id": 1
+ }
+ }
+ *
+ {
+ "type": 16,
+ "timestamp": 3570637554,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "selected-pair": "[..]",
+ "stream_id": 1,
+ "component_id": 1
+ }
+ }
+ *
+ {
+ "type": 16,
+ "timestamp": 3570656112,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "dtls": "connected",
+ "stream_id": 1,
+ "component_id": 1
+ }
+ }
+ *
+ {
+ "type": 16,
+ "timestamp": 3570657237,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "connection": "webrtcup"
+ }
+ }
+ */
+ break;
+ case JANUS_EVENT_TYPE_MEDIA:
+ /* This is a media related event. This can contain different
+ * information about the health of a media session, or about
+ * what's going on in general (e.g., when Janus started/stopped
+ * receiving media of a certain type, or (TODO) when some media related
+ * statistics are available). Here's an example of Janus getting
+ * video from the peer for the first time, or after a second
+ * of no video at all (which would have triggered a "receiving": false):
+ {
+ "type": 32,
+ "timestamp": 3571078797,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "media": "video",
+ "receiving": "true"
+ }
+ }
+ */
+ break;
+ case JANUS_EVENT_TYPE_PLUGIN:
+ /* This is a plugin related event. Since each plugin may
+ * provide info in a very custom way, the format of this event
+ * is in general very dynamic. You'll always find, though,
+ * an "event" object containing the package name of the
+ * plugin (e.g., "janus.plugin.echotest") and a "data"
+ * object that contains whatever the plugin decided to
+ * notify you about, that will always vary from plugin to
+ * plugin. Besides, notice that "session_id" and "handle_id"
+ * may or may not be present: when they are, you'll know
+ * the event has been triggered within the context of a
+ * specific handle session with the plugin; when they're
+ * not, the plugin sent an event out of context of a
+ * specific session it is handling. Here's an example:
+ {
+ "type": 64,
+ "timestamp": 3570336031,
+ "session_id": 2004798115,
+ "handle_id": 3708519405,
+ "event": {
+ "plugin": "janus.plugin.echotest",
+ "data": {
+ "audio_active": "true",
+ "video_active": "true",
+ "bitrate": 0
+ }
+ }
+ }
+ */
+ break;
+ case JANUS_EVENT_TYPE_TRANSPORT:
+ /* This is a transport related event (TODO). The syntax of
+ * the common format (transport specific data aside) is
+ * exactly the same as that of the plugin related events
+ * above, with a "transport" property instead of "plugin"
+ * to contain the transport package name. */
+ break;
+ case JANUS_EVENT_TYPE_CORE:
+ /* This is a core related event. This can contain different
+ * information about the health of the Janus instance, or
+ * more generically on some events in the Janus life cycle
+ * (e.g., when it's just been started or when a shutdown
+ * has been requested). Considering the heterogeneous nature
+ * of the information being reported, the content is always
+ * a JSON object (event). Core events are the only ones
+ * missing a session_id. Here's an example:
+ {
+ "type": 256,
+ "timestamp": 28381185382,
+ "event": {
+ "status": "started"
+ }
+ }
+ */
+ break;
+ default:
+ JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
+ break;
+ }
- /* Let's check what kind of event this is: we don't really do anything
- * with it in this plugin, it's just to show how you can handle
- * different types of events in an event handler. */
- int type = json_integer_value(json_object_get(event, "type"));
- switch(type) {
- case JANUS_EVENT_TYPE_SESSION:
- /* This is a session related event. The only info that is
- * required is a name for the event itself: a "created"
- * event may also contain transport info, in the form of
- * the transport module that originated the session
- * (e.g., "janus.transport.http") and an internal unique
- * ID for the transport instance (which may be associated
- * to a connection or anything else within the specifics
- * of the transport module itself). Here's an example of
- * a new session being created:
- {
- "type": 1,
- "timestamp": 3583879627,
- "session_id": 2004798115,
- "event": {
- "name": "created"
- },
- "transport": {
- "transport": "janus.transport.http",
- "id": "0x7fcb100008c0"
- }
- }
- */
+ if(!group_events) {
+ /* We're done here, we just need a single event */
+ output = event;
break;
- case JANUS_EVENT_TYPE_HANDLE:
- /* This is a handle related event. The only info that is provided
- * are the name for the event itself and the package name of the
- * plugin this handle refers to (e.g., "janus.plugin.echotest").
- * Here's an example of a new handled being attached in a session
- * to the EchoTest plugin:
- {
- "type": 2,
- "timestamp": 3570304977,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "name": "attached",
- "plugin: "janus.plugin.echotest"
- }
- }
- */
- break;
- case JANUS_EVENT_TYPE_JSEP:
- /* This is a JSEP/SDP related event. It provides information
- * about an ongoing WebRTC negotiation, and so tells you
- * about the SDP being sent/received, and who's sending it
- * ("local" means Janus, "remote" means the user). Here's an
- * example, where the user originated an offer towards Janus:
- {
- "type": 8,
- "timestamp": 3570400208,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "owner": "remote",
- "jsep": {
- "type": "offer",
- "sdp": "v=0[..]\r\n"
- }
- }
- }
- */
- break;
- case JANUS_EVENT_TYPE_WEBRTC:
- /* This is a WebRTC related event, and so the content of
- * the event may vary quite a bit. In fact, you may be notified
- * about ICE or DTLS states, or when a WebRTC PeerConnection
- * goes up or down. Here are some examples, in no particular order:
- {
- "type": 16,
- "timestamp": 3570416659,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "ice": "connecting",
- "stream_id": 1,
- "component_id": 1
- }
- }
- *
- {
- "type": 16,
- "timestamp": 3570637554,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "selected-pair": "[..]",
- "stream_id": 1,
- "component_id": 1
- }
- }
- *
- {
- "type": 16,
- "timestamp": 3570656112,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "dtls": "connected",
- "stream_id": 1,
- "component_id": 1
- }
- }
- *
- {
- "type": 16,
- "timestamp": 3570657237,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "connection": "webrtcup"
- }
- }
- */
- break;
- case JANUS_EVENT_TYPE_MEDIA:
- /* This is a media related event. This can contain different
- * information about the health of a media session, or about
- * what's going on in general (e.g., when Janus started/stopped
- * receiving media of a certain type, or (TODO) when some media related
- * statistics are available). Here's an example of Janus getting
- * video from the peer for the first time, or after a second
- * of no video at all (which would have triggered a "receiving": false):
- {
- "type": 32,
- "timestamp": 3571078797,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "media": "video",
- "receiving": "true"
- }
- }
- */
- break;
- case JANUS_EVENT_TYPE_PLUGIN:
- /* This is a plugin related event. Since each plugin may
- * provide info in a very custom way, the format of this event
- * is in general very dynamic. You'll always find, though,
- * an "event" object containing the package name of the
- * plugin (e.g., "janus.plugin.echotest") and a "data"
- * object that contains whatever the plugin decided to
- * notify you about, that will always vary from plugin to
- * plugin. Besides, notice that "session_id" and "handle_id"
- * may or may not be present: when they are, you'll know
- * the event has been triggered within the context of a
- * specific handle session with the plugin; when they're
- * not, the plugin sent an event out of context of a
- * specific session it is handling. Here's an example:
- {
- "type": 64,
- "timestamp": 3570336031,
- "session_id": 2004798115,
- "handle_id": 3708519405,
- "event": {
- "plugin": "janus.plugin.echotest",
- "data": {
- "audio_active": "true",
- "video_active": "true",
- "bitrate": 0
- }
- }
- }
- */
- break;
- case JANUS_EVENT_TYPE_TRANSPORT:
- /* This is a transport related event (TODO). The syntax of
- * the common format (transport specific data aside) is
- * exactly the same as that of the plugin related events
- * above, with a "transport" property instead of "plugin"
- * to contain the transport package name. */
- break;
- case JANUS_EVENT_TYPE_CORE:
- /* This is a core related event. This can contain different
- * information about the health of the Janus instance, or
- * more generically on some events in the Janus life cycle
- * (e.g., when it's just been started or when a shutdown
- * has been requested). Considering the heterogeneous nature
- * of the information being reported, the content is always
- * a JSON object (event). Core events are the only ones
- * missing a session_id. Here's an example:
- {
- "type": 256,
- "timestamp": 28381185382,
- "event": {
- "status": "started"
- }
- }
- */
+ }
+ /* If we got here, we're grouping */
+ if(output == NULL)
+ output = json_array();
+ json_array_append_new(output, event);
+ /* Never group more than a maximum number of events, though, or we might stay here forever */
+ count++;
+ if(count == max)
break;
- default:
- JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
+ event = g_async_queue_try_pop(events);
+ if(event == NULL || event == &exit_event)
break;
}
if(!g_atomic_int_get(&stopping)) {
/* Since this a simple plugin, it does the same for all events: so just convert to string... */
- event_text = json_dumps(event, json_format);
+ event_text = json_dumps(output, json_format);
amqp_basic_properties_t props;
props._flags = 0;
props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
@@ -708,16 +736,15 @@ static void *janus_rabbitmqevh_handler(void *data) {
amqp_bytes_t message = amqp_cstring_bytes(event_text);
int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message);
if(status != AMQP_STATUS_OK) {
- JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
+ JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Error publishing... %d, %s\n", status, amqp_error_string2(status));
}
free(event_text);
event_text = NULL;
}
- /* Cleanup */
/* Done, let's unref the event */
- json_decref(event);
- event = NULL;
+ json_decref(output);
+ output = NULL;
}
JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n");
return NULL;
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-voip/janus.git
More information about the Pkg-voip-commits
mailing list