[Pkg-voip-commits] [janus] 168/282: Fixes after review

Jonas Smedegaard dr at jones.dk
Wed Dec 20 21:53:38 UTC 2017


This is an automated email from the git hooks/post-receive script.

js pushed a commit to annotated tag debian/0.2.6-1
in repository janus.

commit 006e4837264b57d152afeeda41e2a18c23be427e
Author: Piter Konstantinov <pit.here at gmail.com>
Date:   Fri Nov 3 12:54:00 2017 +0300

    Fixes after review
---
 conf/janus.eventhandler.rabbitmqevh.cfg.sample |   3 +
 configure.ac                                   |   6 +-
 events/janus_rabbitmqevh.c                     | 481 +++++++++++++------------
 3 files changed, 260 insertions(+), 230 deletions(-)

diff --git a/conf/janus.eventhandler.rabbitmqevh.cfg.sample b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
index 40e6771..9adcbe0 100644
--- a/conf/janus.eventhandler.rabbitmqevh.cfg.sample
+++ b/conf/janus.eventhandler.rabbitmqevh.cfg.sample
@@ -6,6 +6,9 @@ events = all				; Comma separated list of the events mask you're interested
 							; in. Valid values are none, sessions, handles, jsep, webrtc,
 							; media, plugins, transports, core and all. By default we
 							; subscribe to everything (all)
+grouping = yes				; Whether events should be sent individually, or if it's ok
+							; to group them. The default is 'yes' to limit the number of
+							; messages
 host = 192.168.99.100		; The address of the RabbitMQ server
 ;port = 5672				; The port of the RabbitMQ server (5672 by default)
 ;username = guest			; Username to use to authenticate, if needed
diff --git a/configure.ac b/configure.ac
index c5e3ec6..9b520e6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -803,8 +803,8 @@ AM_COND_IF([ENABLE_SAMPLEEVH],
 	[echo "    Sample event handler:  yes"],
 	[echo "    Sample event handler:  no"])
 AM_COND_IF([ENABLE_RABBITMQEVH],
-  [echo "    RabbitMQ event handler:yes"],
-  [echo "    RabbitMQ event handler:no"])
+	[echo "    RabbitMQ event handler:yes"],
+	[echo "    RabbitMQ event handler:no"])
 AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
 	echo "JavaScript modules:        yes"
 	echo "    Using npm:             $NPM"
@@ -825,4 +825,4 @@ AM_COND_IF([ENABLE_JAVASCRIPT_MODULES], [
 
 echo
 echo "If this configuration is ok for you, do a 'make' to start building Janus. A 'make install' will install Janus and its plugins to the specified prefix. Finally, a 'make configs' will install some sample configuration files too (something you'll only want to do the first time, though)."
-echo
\ No newline at end of file
+echo
diff --git a/events/janus_rabbitmqevh.c b/events/janus_rabbitmqevh.c
index 78c837b..76bdecd 100644
--- a/events/janus_rabbitmqevh.c
+++ b/events/janus_rabbitmqevh.c
@@ -1,5 +1,5 @@
 /*! \file   janus_rabbitmqevh.c
- * \author Lorenzo Miniero <lorenzo at meetecho.com>
+ * \author Piter Konstantinov <pit.here at gmail.com>
  * \copyright GNU General Public License v3
  * \brief  Janus RabbitMQEventHandler plugin
  * \details  This is a trivial RabbitMQ event handler plugin for Janus
@@ -77,6 +77,7 @@ static void *janus_rabbitmqevh_handler(void *data);
 
 /* Queue of events to handle */
 static GAsyncQueue *events = NULL;
+static gboolean group_events = TRUE;
 static json_t exit_event;
 static void janus_rabbitmqevh_event_free(json_t *event) {
 	if(!event || event == &exit_event)
@@ -87,7 +88,7 @@ static void janus_rabbitmqevh_event_free(json_t *event) {
 /* JSON serialization options */
 static size_t json_format = JSON_INDENT(3) | JSON_PRESERVE_ORDER;
 
-/* FIXME: Should it be configable? */
+/* FIXME: Should it be configurable? */
 #define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"
 
 /* RabbitMQ session */
@@ -199,6 +200,11 @@ int janus_rabbitmqevh_init(const char *config_path) {
 		}
 	}
 
+	/* Is grouping of events ok? */
+	item = janus_config_get_item_drilldown(config, "general", "grouping");
+	if(item && item->value)
+		group_events = janus_is_true(item->value);
+
 	/* Handle configuration, starting from the server details */
 	item = janus_config_get_item_drilldown(config, "general", "host");
 	if(item && item->value)
@@ -230,7 +236,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
 	/* SSL config*/
 	item = janus_config_get_item_drilldown(config, "general", "ssl_enable");
 	if(!item || !item->value || !janus_is_true(item->value)) {
-		JANUS_LOG(LOG_INFO, "RabbitMQ SSL support disabled\n");
+		JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: RabbitMQ SSL support disabled\n");
 	} else {
 		ssl_enable = TRUE;
 		item = janus_config_get_item_drilldown(config, "general", "ssl_cacert");
@@ -253,13 +259,13 @@ int janus_rabbitmqevh_init(const char *config_path) {
 	/* Parse configuration */
 	item = janus_config_get_item_drilldown(config, "general", "route_key");
 	if(!item || !item->value) {
-		JANUS_LOG(LOG_FATAL, "Missing name of outgoing route_key for RabbitMQ...\n");
+		JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Missing name of outgoing route_key for RabbitMQ...\n");
 		goto error;
 	}
 	route_key = g_strdup(item->value);
 	item = janus_config_get_item_drilldown(config, "general", "exchange");
 	if(!item || !item->value) {
-		JANUS_LOG(LOG_INFO, "Missing name of outgoing exchange for RabbitMQ, using default\n");
+		JANUS_LOG(LOG_INFO, "RabbitMQEventHandler: Missing name of outgoing exchange for RabbitMQ, using default\n");
 	} else {
 		exchange = g_strdup(item->value);
 	}
@@ -273,11 +279,11 @@ int janus_rabbitmqevh_init(const char *config_path) {
 	rmq_conn = amqp_new_connection();
 	amqp_socket_t *socket = NULL;
 	int status;
-	JANUS_LOG(LOG_VERB, "Creating RabbitMQ socket...\n");
+	JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Creating RabbitMQ socket...\n");
 	if (ssl_enable) {
 		socket = amqp_ssl_socket_new(rmq_conn);
 		if(socket == NULL) {
-			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+			JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
 			goto error;
 		}
 		if(ssl_verify_peer) {
@@ -293,35 +299,35 @@ int janus_rabbitmqevh_init(const char *config_path) {
 		if(ssl_cacert_file) {
 			status = amqp_ssl_socket_set_cacert(socket, ssl_cacert_file);
 			if(status != AMQP_STATUS_OK) {
-				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
+				JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error setting CA certificate... (%s)\n", amqp_error_string2(status));
 				goto error;
 			}
 		}
 		if(ssl_cert_file && ssl_key_file) {
 			amqp_ssl_socket_set_key(socket, ssl_cert_file, ssl_key_file);
 			if(status != AMQP_STATUS_OK) {
-				JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
+				JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error setting key... (%s)\n", amqp_error_string2(status));
 				goto error;
 			}
 		}
 	} else {
 		socket = amqp_tcp_socket_new(rmq_conn);
 		if(socket == NULL) {
-			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error creating socket...\n");
+			JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error creating socket...\n");
 			goto error;
 		}
 	}
 
-	JANUS_LOG(LOG_VERB, "Connecting to RabbitMQ server...\n");
+	JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Connecting to RabbitMQ server...\n");
 	status = amqp_socket_open(socket, rmqhost, rmqport);
 	if(status != AMQP_STATUS_OK) {
 		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening socket... (%s)\n", amqp_error_string2(status));
 		goto error;
 	}
-	JANUS_LOG(LOG_VERB, "Logging in...\n");
+	JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Logging in...\n");
 	amqp_rpc_reply_t result = amqp_login(rmq_conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password);
 	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
-		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error logging in... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
 		goto error;
 	}
 	rmq_channel = 1;
@@ -329,17 +335,17 @@ int janus_rabbitmqevh_init(const char *config_path) {
 	amqp_channel_open(rmq_conn, rmq_channel);
 	result = amqp_get_rpc_reply(rmq_conn);
 	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
-		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error opening channel... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
 		goto error;
 	}
 	rmq_exchange = amqp_empty_bytes;
 	if(exchange != NULL) {
-		JANUS_LOG(LOG_VERB, "Declaring exchange...\n");
+		JANUS_LOG(LOG_VERB, "RabbitMQEventHandler: Declaring exchange...\n");
 		rmq_exchange = amqp_cstring_bytes(exchange);
 		amqp_exchange_declare(rmq_conn, rmq_channel, rmq_exchange, amqp_cstring_bytes(JANUS_RABBITMQ_EXCHANGE_TYPE), 0, 0, 0, 0, amqp_empty_table);
 		result = amqp_get_rpc_reply(rmq_conn);
 		if(result.reply_type != AMQP_RESPONSE_NORMAL) {
-			JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+			JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error diclaring exchange... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
 			goto error;
 		}
 	}
@@ -348,7 +354,7 @@ int janus_rabbitmqevh_init(const char *config_path) {
 	amqp_queue_declare(rmq_conn, rmq_channel, rmq_route_key, 0, 0, 0, 0, amqp_empty_table);
 	result = amqp_get_rpc_reply(rmq_conn);
 	if(result.reply_type != AMQP_RESPONSE_NORMAL) {
-		JANUS_LOG(LOG_FATAL, "Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
+		JANUS_LOG(LOG_FATAL, "RabbitMQEventHandler: Can't connect to RabbitMQ server: error declaring queue... %s, %s\n", amqp_error_string2(result.library_error), amqp_method_name(result.reply.id));
 		goto error;
 	}
 
@@ -357,10 +363,10 @@ int janus_rabbitmqevh_init(const char *config_path) {
 	g_atomic_int_set(&initialized, 1);
 
 	GError *error = NULL;
-	handler_thread = g_thread_try_new("janus sampleevh handler", janus_rabbitmqevh_handler, NULL, &error);
+	handler_thread = g_thread_try_new("janus rabbitmqevh handler", janus_rabbitmqevh_handler, NULL, &error);
 	if(error != NULL) {
 		g_atomic_int_set(&initialized, 0);
-		JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the SampleEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
+		JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQEventHandler handler thread...\n", error->code, error->message ? error->message : "??");
 		goto error;
 	}
 
@@ -479,8 +485,9 @@ void janus_rabbitmqevh_incoming_event(json_t *event) {
 /* Thread to handle incoming events */
 static void *janus_rabbitmqevh_handler(void *data) {
 	JANUS_LOG(LOG_VERB, "Joining RabbitMQEventHandler handler thread\n");
-	json_t *event = NULL;
+	json_t *event = NULL, *output = NULL;
 	char *event_text = NULL;
+	int count = 0, max = group_events ? 100 : 1;
 
 	while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
 
@@ -489,218 +496,239 @@ static void *janus_rabbitmqevh_handler(void *data) {
 			continue;
 		if(event == &exit_event)
 			break;
+		count = 0;
+		output = NULL;
+
+		while(TRUE) {
+			/* Handle event: just for fun, let's see how long it took for us to take care of this */
+			json_t *created = json_object_get(event, "timestamp");
+			if(created && json_is_integer(created)) {
+				gint64 then = json_integer_value(created);
+				gint64 now = janus_get_monotonic_time();
+				JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
+			}
 
-		/* Handle event: just for fun, let's see how long it took for us to take care of this */
-		json_t *created = json_object_get(event, "timestamp");
-		if(created && json_is_integer(created)) {
-			gint64 then = json_integer_value(created);
-			gint64 now = janus_get_monotonic_time();
-			JANUS_LOG(LOG_DBG, "Handled event after %"SCNu64" us\n", now-then);
-		}
+			/* Let's check what kind of event this is: we don't really do anything
+			 * with it in this plugin, it's just to show how you can handle
+			 * different types of events in an event handler. */
+			int type = json_integer_value(json_object_get(event, "type"));
+			switch(type) {
+				case JANUS_EVENT_TYPE_SESSION:
+					/* This is a session related event. The only info that is
+					 * required is a name for the event itself: a "created"
+					 * event may also contain transport info, in the form of
+					 * the transport module that originated the session
+					 * (e.g., "janus.transport.http") and an internal unique
+					 * ID for the transport instance (which may be associated
+					 * to a connection or anything else within the specifics
+					 * of the transport module itself). Here's an example of
+					 * a new session being created:
+						{
+						   "type": 1,
+						   "timestamp": 3583879627,
+						   "session_id": 2004798115,
+						   "event": {
+							  "name": "created"
+						   },
+						   "transport": {
+						      "transport": "janus.transport.http",
+						      "id": "0x7fcb100008c0"
+						   }
+						}
+					*/
+					break;
+				case JANUS_EVENT_TYPE_HANDLE:
+					/* This is a handle related event. The only info that is provided
+					 * are the name for the event itself and the package name of the
+					 * plugin this handle refers to (e.g., "janus.plugin.echotest").
+					 * Here's an example of a new handle being attached in a session
+					 * to the EchoTest plugin:
+						{
+						   "type": 2,
+						   "timestamp": 3570304977,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "name": "attached",
+							  "plugin": "janus.plugin.echotest"
+						   }
+						}
+					*/
+					break;
+				case JANUS_EVENT_TYPE_JSEP:
+					/* This is a JSEP/SDP related event. It provides information
+					 * about an ongoing WebRTC negotiation, and so tells you
+					 * about the SDP being sent/received, and who's sending it
+					 * ("local" means Janus, "remote" means the user). Here's an
+					 * example, where the user originated an offer towards Janus:
+						{
+						   "type": 8,
+						   "timestamp": 3570400208,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "owner": "remote",
+							  "jsep": {
+								 "type": "offer",
+								 "sdp": "v=0[..]\r\n"
+							  }
+						   }
+						}
+					*/
+					break;
+				case JANUS_EVENT_TYPE_WEBRTC:
+					/* This is a WebRTC related event, and so the content of
+					 * the event may vary quite a bit. In fact, you may be notified
+					 * about ICE or DTLS states, or when a WebRTC PeerConnection
+					 * goes up or down. Here are some examples, in no particular order:
+						{
+						   "type": 16,
+						   "timestamp": 3570416659,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "ice": "connecting",
+							  "stream_id": 1,
+							  "component_id": 1
+						   }
+						}
+					 *
+						{
+						   "type": 16,
+						   "timestamp": 3570637554,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "selected-pair": "[..]",
+							  "stream_id": 1,
+							  "component_id": 1
+						   }
+						}
+					 *
+						{
+						   "type": 16,
+						   "timestamp": 3570656112,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "dtls": "connected",
+							  "stream_id": 1,
+							  "component_id": 1
+						   }
+						}
+					 *
+						{
+						   "type": 16,
+						   "timestamp": 3570657237,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "connection": "webrtcup"
+						   }
+						}
+					*/
+					break;
+				case JANUS_EVENT_TYPE_MEDIA:
+					/* This is a media related event. This can contain different
+					 * information about the health of a media session, or about
+					 * what's going on in general (e.g., when Janus started/stopped
+					 * receiving media of a certain type, or (TODO) when some media related
+					 * statistics are available). Here's an example of Janus getting
+					 * video from the peer for the first time, or after a second
+					 * of no video at all (which would have triggered a "receiving": false):
+						{
+						   "type": 32,
+						   "timestamp": 3571078797,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "media": "video",
+							  "receiving": "true"
+						   }
+						}
+					*/
+					break;
+				case JANUS_EVENT_TYPE_PLUGIN:
+					/* This is a plugin related event. Since each plugin may
+					 * provide info in a very custom way, the format of this event
+					 * is in general very dynamic. You'll always find, though,
+					 * an "event" object containing the package name of the
+					 * plugin (e.g., "janus.plugin.echotest") and a "data"
+					 * object that contains whatever the plugin decided to
+					 * notify you about, that will always vary from plugin to
+					 * plugin. Besides, notice that "session_id" and "handle_id"
+					 * may or may not be present: when they are, you'll know
+					 * the event has been triggered within the context of a
+					 * specific handle session with the plugin; when they're
+					 * not, the plugin sent an event out of context of a
+					 * specific session it is handling. Here's an example:
+						{
+						   "type": 64,
+						   "timestamp": 3570336031,
+						   "session_id": 2004798115,
+						   "handle_id": 3708519405,
+						   "event": {
+							  "plugin": "janus.plugin.echotest",
+							  "data": {
+								 "audio_active": "true",
+								 "video_active": "true",
+								 "bitrate": 0
+							  }
+						   }
+						}
+					*/
+					break;
+				case JANUS_EVENT_TYPE_TRANSPORT:
+					/* This is a transport related event (TODO). The syntax of
+					 * the common format (transport specific data aside) is
+					 * exactly the same as that of the plugin related events
+					 * above, with a "transport" property instead of "plugin"
+					 * to contain the transport package name. */
+					break;
+				case JANUS_EVENT_TYPE_CORE:
+					/* This is a core related event. This can contain different
+					 * information about the health of the Janus instance, or
+					 * more generically on some events in the Janus life cycle
+					 * (e.g., when it's just been started or when a shutdown
+					 * has been requested). Considering the heterogeneous nature
+					 * of the information being reported, the content is always
+					 * a JSON object (event). Core events are the only ones
+					 * missing a session_id. Here's an example:
+						{
+						   "type": 256,
+						   "timestamp": 28381185382,
+						   "event": {
+							  "status": "started"
+						   }
+						}
+					*/
+					break;
+				default:
+					JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
+					break;
+			}
 
-		/* Let's check what kind of event this is: we don't really do anything
-		 * with it in this plugin, it's just to show how you can handle
-		 * different types of events in an event handler. */
-		int type = json_integer_value(json_object_get(event, "type"));
-		switch(type) {
-			case JANUS_EVENT_TYPE_SESSION:
-				/* This is a session related event. The only info that is
-				 * required is a name for the event itself: a "created"
-				 * event may also contain transport info, in the form of
-				 * the transport module that originated the session
-				 * (e.g., "janus.transport.http") and an internal unique
-				 * ID for the transport instance (which may be associated
-				 * to a connection or anything else within the specifics
-				 * of the transport module itself). Here's an example of
-				 * a new session being created:
-					{
-					   "type": 1,
-					   "timestamp": 3583879627,
-					   "session_id": 2004798115,
-					   "event": {
-						  "name": "created"
-					   },
-					   "transport": {
-					      "transport": "janus.transport.http",
-					      "id": "0x7fcb100008c0"
-					   }
-					}
-				*/
+			if(!group_events) {
+				/* We're done here, we just need a single event */
+				output = event;
 				break;
-			case JANUS_EVENT_TYPE_HANDLE:
-				/* This is a handle related event. The only info that is provided
-				 * are the name for the event itself and the package name of the
-				 * plugin this handle refers to (e.g., "janus.plugin.echotest").
-				 * Here's an example of a new handled being attached in a session
-				 * to the EchoTest plugin:
-					{
-					   "type": 2,
-					   "timestamp": 3570304977,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "name": "attached",
-						  "plugin: "janus.plugin.echotest"
-					   }
-					}
-				*/
-				break;
-			case JANUS_EVENT_TYPE_JSEP:
-				/* This is a JSEP/SDP related event. It provides information
-				 * about an ongoing WebRTC negotiation, and so tells you
-				 * about the SDP being sent/received, and who's sending it
-				 * ("local" means Janus, "remote" means the user). Here's an
-				 * example, where the user originated an offer towards Janus:
-					{
-					   "type": 8,
-					   "timestamp": 3570400208,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "owner": "remote",
-						  "jsep": {
-							 "type": "offer",
-							 "sdp": "v=0[..]\r\n"
-						  }
-					   }
-					}
-				*/
-				break;
-			case JANUS_EVENT_TYPE_WEBRTC:
-				/* This is a WebRTC related event, and so the content of
-				 * the event may vary quite a bit. In fact, you may be notified
-				 * about ICE or DTLS states, or when a WebRTC PeerConnection
-				 * goes up or down. Here are some examples, in no particular order:
-					{
-					   "type": 16,
-					   "timestamp": 3570416659,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "ice": "connecting",
-						  "stream_id": 1,
-						  "component_id": 1
-					   }
-					}
-				 *
-					{
-					   "type": 16,
-					   "timestamp": 3570637554,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "selected-pair": "[..]",
-						  "stream_id": 1,
-						  "component_id": 1
-					   }
-					}
-				 *
-					{
-					   "type": 16,
-					   "timestamp": 3570656112,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "dtls": "connected",
-						  "stream_id": 1,
-						  "component_id": 1
-					   }
-					}
-				 *
-					{
-					   "type": 16,
-					   "timestamp": 3570657237,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "connection": "webrtcup"
-					   }
-					}
-				*/
-				break;
-			case JANUS_EVENT_TYPE_MEDIA:
-				/* This is a media related event. This can contain different
-				 * information about the health of a media session, or about
-				 * what's going on in general (e.g., when Janus started/stopped
-				 * receiving media of a certain type, or (TODO) when some media related
-				 * statistics are available). Here's an example of Janus getting
-				 * video from the peer for the first time, or after a second
-				 * of no video at all (which would have triggered a "receiving": false):
-					{
-					   "type": 32,
-					   "timestamp": 3571078797,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "media": "video",
-						  "receiving": "true"
-					   }
-					}
-				*/
-				break;
-			case JANUS_EVENT_TYPE_PLUGIN:
-				/* This is a plugin related event. Since each plugin may
-				 * provide info in a very custom way, the format of this event
-				 * is in general very dynamic. You'll always find, though,
-				 * an "event" object containing the package name of the
-				 * plugin (e.g., "janus.plugin.echotest") and a "data"
-				 * object that contains whatever the plugin decided to
-				 * notify you about, that will always vary from plugin to
-				 * plugin. Besides, notice that "session_id" and "handle_id"
-				 * may or may not be present: when they are, you'll know
-				 * the event has been triggered within the context of a
-				 * specific handle session with the plugin; when they're
-				 * not, the plugin sent an event out of context of a
-				 * specific session it is handling. Here's an example:
-					{
-					   "type": 64,
-					   "timestamp": 3570336031,
-					   "session_id": 2004798115,
-					   "handle_id": 3708519405,
-					   "event": {
-						  "plugin": "janus.plugin.echotest",
-						  "data": {
-							 "audio_active": "true",
-							 "video_active": "true",
-							 "bitrate": 0
-						  }
-					   }
-					}
-				*/
-				break;
-			case JANUS_EVENT_TYPE_TRANSPORT:
-				/* This is a transport related event (TODO). The syntax of
-				 * the common format (transport specific data aside) is
-				 * exactly the same as that of the plugin related events
-				 * above, with a "transport" property instead of "plugin"
-				 * to contain the transport package name. */
-				break;
-			case JANUS_EVENT_TYPE_CORE:
-				/* This is a core related event. This can contain different
-				 * information about the health of the Janus instance, or
-				 * more generically on some events in the Janus life cycle
-				 * (e.g., when it's just been started or when a shutdown
-				 * has been requested). Considering the heterogeneous nature
-				 * of the information being reported, the content is always
-				 * a JSON object (event). Core events are the only ones
-				 * missing a session_id. Here's an example:
-					{
-					   "type": 256,
-					   "timestamp": 28381185382,
-					   "event": {
-						  "status": "started"
-					   }
-					}
-				*/
+			}
+			/* If we got here, we're grouping */
+			if(output == NULL)
+				output = json_array();
+			json_array_append_new(output, event);
+			/* Never group more than a maximum number of events, though, or we might stay here forever */
+			count++;
+			if(count == max)
 				break;
-			default:
-				JANUS_LOG(LOG_WARN, "Unknown type of event '%d'\n", type);
+			event = g_async_queue_try_pop(events);
+			if(event == NULL || event == &exit_event)
 				break;
 		}
 
 		if(!g_atomic_int_get(&stopping)) {
 			/* Since this a simple plugin, it does the same for all events: so just convert to string... */
-			event_text = json_dumps(event, json_format);
+			event_text = json_dumps(output, json_format);
 			amqp_basic_properties_t props;
 			props._flags = 0;
 			props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
@@ -708,16 +736,15 @@ static void *janus_rabbitmqevh_handler(void *data) {
 			amqp_bytes_t message = amqp_cstring_bytes(event_text);
 			int status = amqp_basic_publish(rmq_conn, rmq_channel, rmq_exchange, rmq_route_key, 0, 0, &props, message);
 			if(status != AMQP_STATUS_OK) {
-				JANUS_LOG(LOG_ERR, "Error publishing... %d, %s\n", status, amqp_error_string2(status));
+				JANUS_LOG(LOG_ERR, "RabbitMQEventHandler: Error publishing... %d, %s\n", status, amqp_error_string2(status));
 			}
 			free(event_text);
 			event_text = NULL;
 		}
 
-		/* Cleanup */
 		/* Done, let's unref the event */
-		json_decref(event);
-		event = NULL;
+		json_decref(output);
+		output = NULL;
 	}
 	JANUS_LOG(LOG_VERB, "Leaving RabbitMQEventHandler handler thread\n");
 	return NULL;

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-voip/janus.git



More information about the Pkg-voip-commits mailing list