[Debian-ha-svn-commits] [SCM] corosync Debian packaging branch, experimental, updated. debian/1.2.7-1
    Guido Günther 
    agx at sigxcpu.org
       
    Fri Aug 13 14:38:09 UTC 2010
    
    
  
The following commit has been merged in the experimental branch:
commit 1c157d6ac5b0a3cd847154f596d22691b4c17314
Author: Guido Günther <agx at sigxcpu.org>
Date:   Fri Aug 13 09:29:48 2010 +0200
    New upstream version 1.2.7
diff --git a/CHANGELOG b/CHANGELOG
index 49e72fe..01b307e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,4 +1,103 @@
 ------------------------------------------------------------------------
+r3008 | sdake | 2010-07-27 14:37:46 -0700 (Tue, 27 Jul 2010) | 6 lines
+
+Merge trunk revision 3007:
+r3007 | sdake | 2010-07-27 14:32:39 -0700 (Tue, 27 Jul 2010) | 2 lines
+
+Change trunk version to 1.2.7.
+
+
+------------------------------------------------------------------------
+r3006 | sdake | 2010-07-27 12:08:29 -0700 (Tue, 27 Jul 2010) | 7 lines
+
+Merge trunk revision 3005:
+r3005 | sdake | 2010-07-27 12:00:37 -0700 (Tue, 27 Jul 2010) | 3 lines
+
+Remove consensus check for two node cluster cases which can have smaller
+consensus values.  Document in man page the behavior of consensus.
+
+
+------------------------------------------------------------------------
+r3004 | asalkeld | 2010-07-21 16:16:41 -0700 (Wed, 21 Jul 2010) | 5 lines
+
+Merge trunk revision 3002:
+r3002 | sdake | 2010-07-22 06:48:40 +1000 (Thu, 22 Jul 2010) | 2 lines
+Fix merge error with revision 3001.
+
+
+------------------------------------------------------------------------
+r3003 | asalkeld | 2010-07-21 16:13:42 -0700 (Wed, 21 Jul 2010) | 6 lines
+
+Merge trunk revision 3001:
+r3001 | sdake | 2010-07-22 03:03:36 +1000 (Thu, 22 Jul 2010) | 3 lines
+Fix problem where flow control could lock up ipc under very heavy load in very
+rare circumstances.
+
+
+------------------------------------------------------------------------
+r3000 | sdake | 2010-07-19 13:20:25 -0700 (Mon, 19 Jul 2010) | 12 lines
+
+Merge trunk revision 2995:
+r2995 | asalkeld | 2010-07-16 21:59:40 -0700 (Fri, 16 Jul 2010) | 9 lines
+
+SYNC: always call sync_aborted() in sync_confchg_fn().
+
+1) sync_callbacks.sync_abort can be null.
+2) sync_processing is set to 0 after syncv1 is done.
+   Then syncv2 processing is done. If we get a config change
+   after syncv1 is done, but before syncv2 is done then it won't
+   get aborted.
+
+
+------------------------------------------------------------------------
+r2999 | sdake | 2010-07-19 13:19:30 -0700 (Mon, 19 Jul 2010) | 6 lines
+
+Merge trunk revision 2991:
+r2991 | asalkeld | 2010-07-15 19:08:54 -0700 (Thu, 15 Jul 2010) | 2 lines
+
+ SYNCV2: reset the my_memb_determine_ring_id in sync_v2_memb_list_abort()
+
+
+------------------------------------------------------------------------
+r2998 | sdake | 2010-07-19 08:29:30 -0700 (Mon, 19 Jul 2010) | 7 lines
+
+Merge trunk revision 2997:
+r2997 | fabbione | 2010-07-18 23:36:48 -0700 (Sun, 18 Jul 2010) | 4 lines
+
+Fix logging_daemon config parser code.
+
+Resolves: rhbz#615203
+
+------------------------------------------------------------------------
+r2989 | sdake | 2010-07-14 11:36:53 -0700 (Wed, 14 Jul 2010) | 8 lines
+
+Merge trunk revision 2988:
+r2988 | sdake | 2010-07-14 11:35:36 -0700 (Wed, 14 Jul 2010) | 4 lines
+
+Remove reset of token timeout on retransmitted token reception.  The timer
+should only be reset when a real token is received or membership protocol
+could run into problems with certain timing parameters.
+
+
+------------------------------------------------------------------------
+r2987 | sdake | 2010-07-07 14:45:01 -0700 (Wed, 07 Jul 2010) | 6 lines
+
+Merge trunk revision 2986:
+r2986 | sdake | 2010-07-07 14:43:15 -0700 (Wed, 07 Jul 2010) | 2 lines
+
+Speed up IPC connection process.
+
+
+------------------------------------------------------------------------
+r2985 | sdake | 2010-07-03 15:00:16 -0700 (Sat, 03 Jul 2010) | 6 lines
+
+Merge trunk revision 2984:
+r2984 | sdake | 2010-07-03 14:54:22 -0700 (Sat, 03 Jul 2010) | 2 lines
+
+Fix fail list fault that occurs in very rare circumstances.
+
+
+------------------------------------------------------------------------
 r2982 | sdake | 2010-06-30 12:47:14 -0700 (Wed, 30 Jun 2010) | 6 lines
 
 Merge trunk revision 2980:
diff --git a/configure b/configure
index 35df3dd..5130e7f 100755
--- a/configure
+++ b/configure
@@ -1,6 +1,6 @@
 #! /bin/sh
 # Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.63 for corosync 1.2.6.
+# Generated by GNU Autoconf 2.63 for corosync 1.2.7.
 #
 # Report bugs to <openais at lists.osdl.org>.
 #
@@ -596,8 +596,8 @@ SHELL=${CONFIG_SHELL-/bin/sh}
 # Identity of this package.
 PACKAGE_NAME='corosync'
 PACKAGE_TARNAME='corosync'
-PACKAGE_VERSION='1.2.6'
-PACKAGE_STRING='corosync 1.2.6'
+PACKAGE_VERSION='1.2.7'
+PACKAGE_STRING='corosync 1.2.7'
 PACKAGE_BUGREPORT='openais at lists.osdl.org'
 
 ac_unique_file="lib/coroipcc.c"
@@ -1355,7 +1355,7 @@ if test "$ac_init_help" = "long"; then
   # Omit some internal or obsolete options to make the list less imposing.
   # This message is too long to be a string in the A/UX 3.1 sh.
   cat <<_ACEOF
-\`configure' configures corosync 1.2.6 to adapt to many kinds of systems.
+\`configure' configures corosync 1.2.7 to adapt to many kinds of systems.
 
 Usage: $0 [OPTION]... [VAR=VALUE]...
 
@@ -1425,7 +1425,7 @@ fi
 
 if test -n "$ac_init_help"; then
   case $ac_init_help in
-     short | recursive ) echo "Configuration of corosync 1.2.6:";;
+     short | recursive ) echo "Configuration of corosync 1.2.7:";;
    esac
   cat <<\_ACEOF
 
@@ -1536,7 +1536,7 @@ fi
 test -n "$ac_init_help" && exit $ac_status
 if $ac_init_version; then
   cat <<\_ACEOF
-corosync configure 1.2.6
+corosync configure 1.2.7
 generated by GNU Autoconf 2.63
 
 Copyright (C) 1992, 1993, 1994, 1995, 1996, 1998, 1999, 2000, 2001,
@@ -1550,7 +1550,7 @@ cat >config.log <<_ACEOF
 This file contains any messages produced by compilers while
 running configure, to aid debugging if configure makes a mistake.
 
-It was created by corosync $as_me 1.2.6, which was
+It was created by corosync $as_me 1.2.7, which was
 generated by GNU Autoconf 2.63.  Invocation command line was
 
   $ $0 $@
@@ -2399,7 +2399,7 @@ fi
 
 # Define the identity of the package.
  PACKAGE='corosync'
- VERSION='1.2.6'
+ VERSION='1.2.7'
 
 
 cat >>confdefs.h <<_ACEOF
@@ -2538,7 +2538,7 @@ ac_compiler_gnu=$ac_cv_c_compiler_gnu
 # Define SVN revision
 
 cat >>confdefs.h <<\_ACEOF
-#define SVN_REVISION "2982"
+#define SVN_REVISION "3008"
 _ACEOF
 
 
@@ -11278,7 +11278,7 @@ exec 6>&1
 # report actual input values of CONFIG_FILES etc. instead of their
 # values after options handling.
 ac_log="
-This file was extended by corosync $as_me 1.2.6, which was
+This file was extended by corosync $as_me 1.2.7, which was
 generated by GNU Autoconf 2.63.  Invocation command line was
 
   CONFIG_FILES    = $CONFIG_FILES
@@ -11345,7 +11345,7 @@ Report bugs to <bug-autoconf at gnu.org>."
 _ACEOF
 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
 ac_cs_version="\\
-corosync config.status 1.2.6
+corosync config.status 1.2.7
 configured by $0, generated by GNU Autoconf 2.63,
   with options \\"`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`\\"
 
diff --git a/configure.ac b/configure.ac
index cbe26c2..7079583 100644
--- a/configure.ac
+++ b/configure.ac
@@ -4,7 +4,7 @@
 # bootstrap / init
 AC_PREREQ([2.61])
 
-AC_INIT([corosync], [1.2.6], [openais at lists.osdl.org])
+AC_INIT([corosync], [1.2.7], [openais at lists.osdl.org])
 AM_INIT_AUTOMAKE([-Wno-portability])
 
 AC_CONFIG_SRCDIR([lib/coroipcc.c])
diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index 923ed87..aca9053 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -95,6 +95,9 @@
 #define MSG_SEND_LOCKED		0
 #define MSG_SEND_UNLOCKED	1
 
+#define POLL_STATE_IN		1
+#define POLL_STATE_INOUT	2
+
 static struct coroipcs_init_state_v2 *api = NULL;
 
 DECLARE_LIST_INIT (conn_info_list_head);
@@ -141,13 +144,10 @@ struct conn_info {
 	pthread_attr_t thread_attr;
 	unsigned int service;
 	enum conn_state state;
-	int notify_flow_control_enabled;
-	int flow_control_state;
 	int refcount;
 	hdb_handle_t stats_handle;
 #if _POSIX_THREAD_PROCESS_SHARED < 1
 	key_t semkey;
-	int semid;
 #endif
 	unsigned int pending_semops;
 	pthread_mutex_t mutex;
@@ -166,6 +166,7 @@ struct conn_info {
 	unsigned int setup_bytes_read;
 	struct list_head zcb_mapped_list_head;
 	char *sending_allowed_private_data[64];
+	int poll_state;
 };
 
 static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -221,34 +222,6 @@ static void dummy_stats_increment_value (
 {
 }
 
-static void sem_post_exit_thread (struct conn_info *conn_info)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-	int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semop:
-	res = sem_post (&conn_info->control_buffer->sem0);
-	if (res == -1 && errno == EINTR) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	}
-#else
-	sop.sem_num = 0;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	}
-#endif
-}
-
 static int
 memory_map (
 	const char *path,
@@ -383,6 +356,34 @@ circular_memory_unmap (void *buf, size_t bytes)
 	return (res);
 }
 
+static void flow_control_state_set (
+	struct conn_info *conn_info,
+	int flow_control_state)
+{
+	if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
+		return;
+	}
+	if (flow_control_state == 0) {
+		log_printf (LOGSYS_LEVEL_DEBUG,
+			"Disabling flow control for %d\n",
+			conn_info->client_pid);
+	} else
+	if (flow_control_state == 1) {
+		log_printf (LOGSYS_LEVEL_DEBUG,
+			"Enabling flow control for %d\n",
+			conn_info->client_pid);
+	}
+
+
+	conn_info->control_buffer->flow_control_enabled = flow_control_state;
+	api->stats_update_value (conn_info->stats_handle,
+		"flow_control",
+		&flow_control_state,
+		sizeof(flow_control_state));
+	api->stats_increment_value (conn_info->stats_handle,
+		"flow_control_count");
+}
+
 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
 {
 	unsigned int res;
@@ -517,7 +518,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	}
 
 	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
-		sem_post_exit_thread (conn_info);
+		ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 		return (0);
 	}
 
@@ -546,11 +547,12 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	pthread_mutex_unlock (&conn_info->mutex);
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-	sem_destroy (&conn_info->control_buffer->sem0);
-	sem_destroy (&conn_info->control_buffer->sem1);
-	sem_destroy (&conn_info->control_buffer->sem2);
+	sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+	sem_destroy (&conn_info->control_buffer->sem_request);
+	sem_destroy (&conn_info->control_buffer->sem_response);
+	sem_destroy (&conn_info->control_buffer->sem_dispatch);
 #else
-	semctl (conn_info->semid, 0, IPC_RMID);
+	semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
 #endif
 	/*
 	 * Destroy shared memory segment and semaphore
@@ -653,14 +655,12 @@ static inline void zerocopy_operations_process (
 static void *pthread_ipc_consumer (void *conn)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
 	int res;
 	coroipc_request_header_t *header;
 	coroipc_response_header_t coroipc_response_header;
 	int send_ok;
 	unsigned int new_message;
+	int sem_value = 0;
 
 #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
 	if (api->sched_policy != 0) {
@@ -670,43 +670,28 @@ static void *pthread_ipc_consumer (void *conn)
 #endif
 
 	for (;;) {
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semwait:
-		res = sem_wait (&conn_info->control_buffer->sem0);
+		ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 		if (ipc_thread_active (conn_info) == 0) {
 			coroipcs_refcount_dec (conn_info);
 			pthread_exit (0);
 		}
-		if ((res == -1) && (errno == EINTR)) {
-			api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-			goto retry_semwait;
-		}
-#else
 
-		sop.sem_num = 0;
-		sop.sem_op = -1;
-		sop.sem_flg = 0;
-retry_semop:
-		res = semop (conn_info->semid, &sop, 1);
-		if (ipc_thread_active (conn_info) == 0) {
-			coroipcs_refcount_dec (conn_info);
-			pthread_exit (0);
-		}
-		if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-			api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-			goto retry_semop;
-		} else
-		if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-			coroipcs_refcount_dec (conn_info);
-			pthread_exit (0);
-		}
-#endif
+		outq_flush (conn_info);
 
+		ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
+		if (sem_value > 0) {
+		
+			res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST);
+		} else {
+			continue;
+		}
+	
 		zerocopy_operations_process (conn_info, &header, &new_message);
 		/*
 		 * There is no new message to process, continue for loop
 		 */
 		if (new_message == 0) {
+printf ("continuing\n");
 			continue;
 		}
 
@@ -738,7 +723,6 @@ retry_semop:
 			/*
 			 * Overload, tell library to retry
 			 */
-			api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
 			coroipc_response_header.size = sizeof (coroipc_response_header_t);
 			coroipc_response_header.id = 0;
 			coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -928,7 +912,7 @@ static void ipc_disconnect (struct conn_info *conn_info)
 	conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 	pthread_mutex_unlock (&conn_info->mutex);
 
-	sem_post_exit_thread (conn_info);
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 }
 
 static int conn_info_create (int fd)
@@ -945,6 +929,7 @@ static int conn_info_create (int fd)
 	conn_info->client_pid = 0;
 	conn_info->service = SOCKET_SERVICE_INIT;
 	conn_info->state = CONN_STATE_THREAD_INACTIVE;
+	conn_info->poll_state = POLL_STATE_IN;
 	list_init (&conn_info->outq_head);
 	list_init (&conn_info->list);
 	list_init (&conn_info->zcb_mapped_list_head);
@@ -1103,11 +1088,12 @@ void coroipcs_ipc_exit (void)
 		ipc_disconnect (conn_info);
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-		sem_destroy (&conn_info->control_buffer->sem0);
-		sem_destroy (&conn_info->control_buffer->sem1);
-		sem_destroy (&conn_info->control_buffer->sem2);
+		sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+		sem_destroy (&conn_info->control_buffer->sem_request);
+		sem_destroy (&conn_info->control_buffer->sem_response);
+		sem_destroy (&conn_info->control_buffer->sem_dispatch);
 #else
-		semctl (conn_info->semid, 0, IPC_RMID);
+		semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
 #endif
 
 		/*
@@ -1181,33 +1167,11 @@ void *coroipcs_private_data_get (void *conn)
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-	int res;
 
 	memcpy (conn_info->response_buffer, msg, mlen);
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&conn_info->control_buffer->sem1);
-	if (res == -1) {
-		return (-1);
-	}
-#else
-	sop.sem_num = 1;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	} else
-	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-		return (0);
-	}
-#endif
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
+
 	api->stats_increment_value (conn_info->stats_handle, "responses");
 	return (0);
 }
@@ -1215,10 +1179,6 @@ retry_semop:
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-	int res;
 	int write_idx = 0;
 	int i;
 
@@ -1228,26 +1188,8 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
 		write_idx += iov[i].iov_len;
 	}
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&conn_info->control_buffer->sem1);
-	if (res == -1) {
-		return (-1);
-	}
-#else
-	sop.sem_num = 1;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	} else
-	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-		return (0);
-	}
-#endif
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
+
 	api->stats_increment_value (conn_info->stats_handle, "responses");
 	return (0);
 }
@@ -1283,86 +1225,31 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l
 	conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 }
 
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
-	int new_fc = 0;
-
-	if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
-		event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-		new_fc = 1;
-	}
-
-	if (conn_info->flow_control_state != new_fc) {
-		if (new_fc == 1) {
-			log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d, event %d\n",
-				conn_info->client_pid, event);
-		} else {
-			log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control for %d, event %d\n",
-				conn_info->client_pid, event);
-		}
-		conn_info->flow_control_state = new_fc;
-		api->stats_update_value (conn_info->stats_handle, "flow_control",
-			&conn_info->flow_control_state,
-			sizeof(conn_info->flow_control_state));
-		api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
-	}
-
-	return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
 	int res;
 	int i;
+	char buf;
 
 	for (i = 0; i < iov_len; i++) {
 		memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
 	}
 
-	if (list_empty (&conn_info->outq_head))
-		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
-	else
-		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
-
-	if (res == -1 && errno == EAGAIN) {
-		if (locked == 0) {
-			pthread_mutex_lock (&conn_info->mutex);
-		}
+	buf = list_empty (&conn_info->outq_head);
+	res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+	if (res != 1) {
 		conn_info->pending_semops += 1;
-		if (locked == 0) {
-			pthread_mutex_unlock (&conn_info->mutex);
+		if (conn_info->poll_state == POLL_STATE_IN) {
+			conn_info->poll_state = POLL_STATE_INOUT;
+			api->poll_dispatch_modify (conn_info->fd,
+				POLLIN|POLLOUT|POLLNVAL);
 		}
-		api->poll_dispatch_modify (conn_info->fd,
-			POLLIN|POLLOUT|POLLNVAL);
-	} else
-	if (res == -1) {
-		ipc_disconnect (conn_info);
-	}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&conn_info->control_buffer->sem2);
-#else
-	sop.sem_num = 2;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	} else
-	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-		return;
 	}
-#endif
+
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
+
 	api->stats_increment_value (conn_info->stats_handle, "dispatched");
 }
 
@@ -1371,11 +1258,10 @@ static void outq_flush (struct conn_info *conn_info) {
 	struct outq_item *outq_item;
 	unsigned int bytes_left;
 	struct iovec iov;
-	int res;
 
 	pthread_mutex_lock (&conn_info->mutex);
 	if (list_empty (&conn_info->outq_head)) {
-		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
+		flow_control_state_set (conn_info, 0);
 		pthread_mutex_unlock (&conn_info->mutex);
 		return;
 	}
@@ -1441,7 +1327,7 @@ retry_recv:
 	semun.buf = &ipc_set;
 
 	for (i = 0; i < 3; i++) {
-		res = semctl (conn_info->semid, 0, IPC_SET, semun);
+		res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, semun);
 		if (res == -1) {
 			return (-1);
 		}
@@ -1471,6 +1357,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 		bytes_msg += iov[i].iov_len;
 	}
 	if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+		flow_control_state_set (conn_info, 1);
 		outq_item = api->malloc (sizeof (struct outq_item));
 		if (outq_item == NULL) {
 			ipc_disconnect (conn);
@@ -1491,11 +1378,6 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 		outq_item->mlen = bytes_msg;
 		list_init (&outq_item->list);
 		pthread_mutex_lock (&conn_info->mutex);
-		if (list_empty (&conn_info->outq_head)) {
-			conn_info->notify_flow_control_enabled = 1;
-			api->poll_dispatch_modify (conn_info->fd,
-				POLLIN|POLLOUT|POLLNVAL);
-		}
 		list_add_tail (&outq_item->list, &conn_info->outq_head);
 		pthread_mutex_unlock (&conn_info->mutex);
 		api->stats_increment_value (conn_info->stats_handle, "queue_size");
@@ -1742,11 +1624,10 @@ int coroipcs_handler_dispatch (
 
 		conn_info->service = req_setup->service;
 		conn_info->refcount = 0;
-		conn_info->notify_flow_control_enabled = 0;
 		conn_info->setup_bytes_read = 0;
 
 #if _POSIX_THREAD_PROCESS_SHARED < 1
-		conn_info->semid = semget (conn_info->semkey, 3, 0600);
+		conn_info->control_buffer->semid = semget (conn_info->semkey, 3, 0600);
 #endif
 		conn_info->pending_semops = 0;
 
@@ -1794,9 +1675,6 @@ int coroipcs_handler_dispatch (
 		res = recv (fd, &buf, 1, MSG_NOSIGNAL);
 		if (res == 1) {
 			switch (buf) {
-			case MESSAGE_REQ_OUTQ_FLUSH:
-				outq_flush (conn_info);
-				break;
 			case MESSAGE_REQ_CHANGE_EUID:
 				if (priv_change (conn_info) == -1) {
 					ipc_disconnect (conn_info);
@@ -1820,37 +1698,24 @@ int coroipcs_handler_dispatch (
 		coroipcs_refcount_dec (conn_info);
 	}
 
-	coroipcs_refcount_inc (conn_info);
-	pthread_mutex_lock (&conn_info->mutex);
-	if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
-		if (list_empty (&conn_info->outq_head))
-			buf = MESSAGE_RES_OUTQ_EMPTY;
-		else
-			buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+	if (revent & POLLOUT) {
+		int psop = conn_info->pending_semops;
+		int i;
 
-		for (; conn_info->pending_semops;) {
-			res = flow_control_event_send (conn_info, buf);
-			if (res == 1) {
-				conn_info->pending_semops--;
+		assert (psop != 0);
+		for (i = 0; i < psop; i++) {
+			res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+			if (res != 1) {
+				return (0);
 			} else {
-				break;
-			}
-		}
-		if (conn_info->notify_flow_control_enabled) {
-			res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
-			if (res == 1) {
-				conn_info->notify_flow_control_enabled = 0;
+				conn_info->pending_semops -= 1;
 			}
 		}
-		if (conn_info->notify_flow_control_enabled == 0 &&
-			conn_info->pending_semops == 0) {
-
-			api->poll_dispatch_modify (conn_info->fd,
-				POLLIN|POLLNVAL);
+		if (conn_info->poll_state == POLL_STATE_INOUT) {
+			conn_info->poll_state = POLL_STATE_IN;
+			api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
 		}
 	}
-	pthread_mutex_unlock (&conn_info->mutex);
-	coroipcs_refcount_dec (conn_info);
 
 	return (0);
 }
diff --git a/exec/mainconfig.c b/exec/mainconfig.c
index d8483e8..44bd47a 100644
--- a/exec/mainconfig.c
+++ b/exec/mainconfig.c
@@ -528,21 +528,25 @@ static int corosync_main_config_read_logging (
 				object_logger_subsys_handle,
 				"name", &value)) {
 
-				if ((strcmp(value, "corosync") == 0) &&
-				   (!objdb_get_string (objdb,
-					object_logger_subsys_handle,
-					"subsys", &value))) {
-
-					if (corosync_main_config_set (objdb,
-							object_logger_subsys_handle,
-							value,
-							&error_reason) < 0) {
-						goto parse_error;
+				if (strcmp(value, "corosync") == 0) {
+					if (!objdb_get_string (objdb,
+						object_logger_subsys_handle,
+						"subsys", &value)) {
+						if (corosync_main_config_set (objdb,
+								object_logger_subsys_handle,
+								value,
+								&error_reason) < 0) {
+							goto parse_error;
+						}
+					}
+					else {
+						if (corosync_main_config_set (objdb,
+								object_logger_subsys_handle,
+								NULL,
+								&error_reason) < 0) {
+							goto parse_error;
+						}
 					}
-				}
-				else {
-					error_reason = "subsys required for logging_daemon directive";
-					goto parse_error;
 				}
 			}
 			else {
diff --git a/exec/sync.c b/exec/sync.c
index ce115a3..c4197a8 100644
--- a/exec/sync.c
+++ b/exec/sync.c
@@ -479,8 +479,8 @@ static void sync_confchg_fn (
 	memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int));
 	my_member_list_entries = member_list_entries;
 
+	sync_aborted ();
 	if (sync_processing && sync_callbacks.sync_abort != NULL) {
-		sync_aborted ();
 		sync_callbacks.sync_abort ();
 		sync_callbacks.sync_activate = NULL;
 	}
diff --git a/exec/syncv2.c b/exec/syncv2.c
index 1b6c617..344d7d0 100644
--- a/exec/syncv2.c
+++ b/exec/syncv2.c
@@ -617,4 +617,5 @@ void sync_v2_memb_list_determine (const struct memb_ring_id *ring_id)
 void sync_v2_memb_list_abort (void)
 {
 	my_memb_determine_list_entries = 0;
+	memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id));
 }
diff --git a/exec/totemconfig.c b/exec/totemconfig.c
index 9128bdd..381cf4f 100644
--- a/exec/totemconfig.c
+++ b/exec/totemconfig.c
@@ -564,13 +564,6 @@ int totem_config_validate (
 		goto parse_error;
 	}
 
-	if (totem_config->consensus_timeout < 1.2 * totem_config->token_timeout) {
-		snprintf (local_error_reason, sizeof(local_error_reason),
-			"The consensus timeout parameter (%d ms) must be atleast 1.2 * token (%d ms).",
-			totem_config->consensus_timeout, (int) ((float)1.2 * totem_config->token_timeout));
-		goto parse_error;
-	}
-
 	if (totem_config->merge_timeout == 0) {
 		totem_config->merge_timeout = MERGE_TIMEOUT;
 	}
diff --git a/exec/totemsrp.c b/exec/totemsrp.c
index 6a11771..c60d942 100644
--- a/exec/totemsrp.c
+++ b/exec/totemsrp.c
@@ -3448,23 +3448,6 @@ static int message_handler_orf_token (
 		 * Discard retransmitted tokens
 		 */
 		if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
-			/*
-			 * If this processor receives a retransmitted token, it is sure
-		 	 * the previous processor is still alive.  As a result, it can
-			 * reset its token timeout.  If some processor previous to that
-			 * has failed, it will eventually not execute a reset of the
-			 * token timeout, and will cause a reconfiguration to occur.
-			 */
-			reset_token_timeout (instance);
-
-			if ((forward_token)
-				&& instance->use_heartbeat) {
-				reset_heartbeat_timeout(instance);
-			}
-			else {
-				cancel_heartbeat_timeout(instance);
-			}
-
 			return (0); /* discard token */
 		}
 		last_aru = instance->my_last_aru;
@@ -3904,12 +3887,15 @@ static int message_handler_memb_merge_detect (
 	return (0);
 }
 
-static int memb_join_process (
+static void memb_join_process (
 	struct totemsrp_instance *instance,
 	const struct memb_join *memb_join)
 {
 	struct srp_addr *proc_list;
 	struct srp_addr *failed_list;
+	int gather_entered = 0;
+	int fail_minus_memb_entries = 0;
+	struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
 
 	proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
 	failed_list = proc_list + memb_join->proc_list_entries;
@@ -3919,7 +3905,8 @@ static int memb_join_process (
 	memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
 	memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
 	memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
-*/
+-*/
+
 	if (memb_set_equal (proc_list,
 		memb_join->proc_list_entries,
 		instance->my_proc_list,
@@ -3942,7 +3929,7 @@ static int memb_join_process (
 				memb_state_commit_token_create (instance);
 
 				memb_state_commit_enter (instance);
-				return (0);
+				return;
 		}
 		if (memb_consensus_agreed (instance) &&
 			memb_lowest_in_config (instance)) {
@@ -3951,7 +3938,7 @@ static int memb_join_process (
 
 			memb_state_commit_enter (instance);
 		} else {
-			return (0);
+			return;
 		}
 	} else
 	if (memb_set_subset (proc_list,
@@ -3964,12 +3951,12 @@ static int memb_join_process (
 		instance->my_failed_list,
 		instance->my_failed_list_entries)) {
 
-		return (0);
+		return;
 	} else
 	if (memb_set_subset (&memb_join->system_from, 1,
 		instance->my_failed_list, instance->my_failed_list_entries)) {
 
-		return (0);
+		return;
 	} else {
 		memb_set_merge (proc_list,
 			memb_join->proc_list_entries,
@@ -3983,14 +3970,42 @@ static int memb_join_process (
 				&memb_join->system_from, 1,
 				instance->my_failed_list, &instance->my_failed_list_entries);
 		} else {
-			memb_set_merge (failed_list,
-				memb_join->failed_list_entries,
-				instance->my_failed_list, &instance->my_failed_list_entries);
+			if (memb_set_subset (
+				&memb_join->system_from, 1,
+				instance->my_memb_list,
+				instance->my_memb_entries)) {
+
+				if (memb_set_subset (
+					&memb_join->system_from, 1,
+					instance->my_failed_list,
+					instance->my_failed_list_entries) == 0) {
+
+					memb_set_merge (failed_list,
+						memb_join->failed_list_entries,
+						instance->my_failed_list, &instance->my_failed_list_entries);
+				} else {
+					memb_set_subtract (fail_minus_memb,
+						&fail_minus_memb_entries,
+						failed_list,
+						memb_join->failed_list_entries,
+						instance->my_memb_list,
+						instance->my_memb_entries);
+
+					memb_set_merge (fail_minus_memb,
+						fail_minus_memb_entries,
+						instance->my_failed_list,
+						&instance->my_failed_list_entries);
+				}
+			}
 		}
 		memb_state_gather_enter (instance, 11);
-		return (1); /* gather entered */
+		gather_entered = 1;
+	}
+	if (gather_entered == 0 &&
+		instance->memb_state == MEMB_STATE_OPERATIONAL) {
+
+		memb_state_gather_enter (instance, 12);
 	}
-	return (0); /* gather not entered */
 }
 
 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
@@ -4121,7 +4136,6 @@ static int message_handler_memb_join (
 {
 	const struct memb_join *memb_join;
 	struct memb_join *memb_join_convert = alloca (msg_len);
-	int gather_entered;
 
 	if (endian_conversion_needed) {
 		memb_join = memb_join_convert;
@@ -4144,11 +4158,7 @@ static int message_handler_memb_join (
 	}
 	switch (instance->memb_state) {
 		case MEMB_STATE_OPERATIONAL:
-			gather_entered = memb_join_process (instance,
-				memb_join);
-			if (gather_entered == 0) {
-				memb_state_gather_enter (instance, 12);
-			}
+			memb_join_process (instance, memb_join);
 			break;
 
 		case MEMB_STATE_GATHER:
diff --git a/include/corosync/coroipc_ipc.h b/include/corosync/coroipc_ipc.h
index 77cf0bc..acf4f02 100644
--- a/include/corosync/coroipc_ipc.h
+++ b/include/corosync/coroipc_ipc.h
@@ -35,6 +35,9 @@
 #define COROIPC_IPC_H_DEFINED
 
 #include <unistd.h>
+#include <poll.h>
+#include <time.h>
+#include "corotypes.h"
 #include "config.h"
 
 /*
@@ -52,28 +55,40 @@
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
 #include <semaphore.h>
+#else
+#include <sys/sem.h>
 #endif
 
+/*
+ * Define sem_wait timeout in seconds (the real timeout lies in the interval (n-1, n))
+ */
+#define IPC_SEMWAIT_TIMEOUT 2
+
 enum req_init_types {
 	MESSAGE_REQ_RESPONSE_INIT = 0,
 	MESSAGE_REQ_DISPATCH_INIT = 1
 };
 
 #define MESSAGE_REQ_CHANGE_EUID		1
-#define MESSAGE_REQ_OUTQ_FLUSH		2
 
-#define MESSAGE_RES_OUTQ_EMPTY         0
-#define MESSAGE_RES_OUTQ_NOT_EMPTY     1
-#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
-#define MESSAGE_RES_OUTQ_FLUSH_NR      3
+enum ipc_semaphore_identifiers {
+	SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT 	= 0,
+	SEMAPHORE_REQUEST			= 1,
+	SEMAPHORE_RESPONSE			= 2,
+	SEMAPHORE_DISPATCH			= 3
+};
 
 struct control_buffer {
 	unsigned int read;
 	unsigned int write;
+	int flow_control_enabled;
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-	sem_t sem0;
-	sem_t sem1;
-	sem_t sem2;
+	sem_t sem_request_or_flush_or_exit;
+	sem_t sem_response;
+	sem_t sem_dispatch;
+	sem_t sem_request;
+#else
+	int semid;
 #endif
 };
 
@@ -145,4 +160,173 @@ struct coroipcs_zc_header {
 #define ZC_FREE_HEADER		0xFFFFFFFE
 #define ZC_EXECUTE_HEADER	0xFFFFFFFD
 
+static inline cs_error_t
+ipc_sem_wait (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+#else
+	struct timespec timeout;
+	sem_t *sem = NULL;
+#endif
+	int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		sem = &control_buffer->sem_request_or_flush_or_exit;
+		break;
+	case SEMAPHORE_RESPONSE:
+		sem = &control_buffer->sem_request;
+		break;
+	case SEMAPHORE_DISPATCH:
+		sem = &control_buffer->sem_response;
+		break;
+	case SEMAPHORE_REQUEST:
+		sem = &control_buffer->sem_dispatch;
+		break;
+	}
+
+	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
+	timeout.tv_nsec = 0;
+
+retry_sem_timedwait:
+	res = sem_timedwait (sem, &timeout);
+	if (res == -1 && errno == ETIMEDOUT) {
+		return (CS_ERR_LIBRARY);
+	} else
+	if (res == -1 && errno == EINTR) {
+		goto retry_sem_timedwait;
+	} else
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	/*
+	 * Wait for semaphore indicating a new message from server
+	 * to client in queue
+	 */
+	sop.sem_num = sem_id;
+	sop.sem_op = -1;
+	sop.sem_flg = 0;
+
+retry_semop:
+	res = semop (control_buffer->semid, &sop, 1);
+	if (res == -1 && errno == EINTR) {
+		return (CS_ERR_TRY_AGAIN);
+		goto retry_semop;
+	} else
+	if (res == -1 && errno == EACCES) {
+		return (CS_ERR_TRY_AGAIN);
+	} else
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#endif
+	return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_post (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+#else
+	sem_t *sem = NULL;
+#endif
+	int res;
+	
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		sem = &control_buffer->sem_request_or_flush_or_exit;
+		break;
+	case SEMAPHORE_RESPONSE:
+		sem = &control_buffer->sem_request;
+		break;
+	case SEMAPHORE_DISPATCH:
+		sem = &control_buffer->sem_response;
+		break;
+	case SEMAPHORE_REQUEST:
+		sem = &control_buffer->sem_dispatch;
+		break;
+	}
+
+	res = sem_post (sem);
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	sop.sem_num = sem_id;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+retry_semop:
+	res = semop (control_buffer->semid, &sop, 1);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semop;
+	} else
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#endif
+	return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_getvalue (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id,
+	int *sem_value)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+	int sem_value_hold;
+#else
+	sem_t *sem = NULL;
+	int res;
+#endif
+	
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		sem = &control_buffer->sem_request_or_flush_or_exit;
+		break;
+	case SEMAPHORE_RESPONSE:
+		sem = &control_buffer->sem_request;
+		break;
+	case SEMAPHORE_DISPATCH:
+		sem = &control_buffer->sem_response;
+		break;
+	case SEMAPHORE_REQUEST:
+		sem = &control_buffer->sem_dispatch;
+		break;
+	}
+
+	res = sem_getvalue (sem, sem_value);
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	sop.sem_num = sem_id;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+retry_semctl:
+	sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
+	if (sem_value_hold == -1 && errno == EINTR) {
+		goto retry_semctl;
+	} else
+	if (sem_value_hold == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+	*sem_value = sem_value_hold;
+#endif
+	return (CS_OK);
+}
+
 #endif /* COROIPC_IPC_H_DEFINED */
diff --git a/lib/coroipcc.c b/lib/coroipcc.c
index 1942cd1..4ece2dc 100644
--- a/lib/coroipcc.c
+++ b/lib/coroipcc.c
@@ -72,17 +72,8 @@
 
 #include "util.h"
 
-/*
- * Define sem_wait timeout (real timeout will be (n-1;n) )
- */
-#define IPC_SEMWAIT_TIMEOUT 2
-
 struct ipc_instance {
 	int fd;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	int semid;
-#endif
-	int flow_control_state;
 	struct control_buffer *control_buffer;
 	char *request_buffer;
 	char *response_buffer;
@@ -117,6 +108,23 @@ static void socket_nosigpipe(int s)
 #define MSG_NOSIGNAL 0
 #endif
 
+static inline int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+	unsigned int n_read;
+	unsigned int n_write;
+	unsigned int bytes_left;
+
+	n_read = context->control_buffer->read;
+	n_write = context->control_buffer->write;
+
+	if (n_read <= n_write) {
+		bytes_left = context->dispatch_size - n_write + n_read;
+	} else {
+		bytes_left = n_read - n_write;
+	}
+	return (bytes_left);
+}
+
 static cs_error_t
 socket_send (
 	int s,
@@ -238,10 +246,10 @@ res_exit:
 	return (res);
 }
 
-#if _POSIX_THREAD_PROCESS_SHARED < 1
 static int
 priv_change_send (struct ipc_instance *ipc_instance)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	char buf_req;
 	mar_req_priv_change req_priv_change;
 	unsigned int res;
@@ -268,19 +276,12 @@ priv_change_send (struct ipc_instance *ipc_instance)
 	}
 
 	ipc_instance->euid = req_priv_change.euid;
+#else
+	ipc_instance = NULL;
+#endif
 	return (0);
 }
 
-#if defined(_SEM_SEMUN_UNDEFINED)
-union semun {
-        int val;
-        struct semid_ds *buf;
-        unsigned short int *array;
-        struct seminfo *__buf;
-};
-#endif
-#endif
-
 static int
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 {
@@ -288,10 +289,11 @@ circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 	void *addr_orig;
 	void *addr;
 	int32_t res;
-	char buffer[128];
 	int32_t i;
 	int32_t written;
-
+	char *buffer;
+	long page_size;
+	
 	snprintf (path, PATH_MAX, "/dev/shm/%s", file);
 
 	fd = mkstemp (path);
@@ -307,17 +309,25 @@ circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 	if (res == -1) {
 		goto error_close_unlink;
 	}
-	memset (buffer, 0, sizeof (buffer));
-	for (i = 0; i < (bytes / 64); i++) {
+
+	page_size = sysconf(_SC_PAGESIZE);
+	buffer = malloc (page_size);
+	if (buffer == NULL) {
+		goto error_close_unlink;
+	}
+	memset (buffer, 0, page_size);
+	for (i = 0; i < (bytes / page_size); i++) {
 retry_write:
-		written = write (fd, buffer, 64);
+		written = write (fd, buffer, page_size);
 		if (written == -1 && errno == EINTR) {
 			goto retry_write;
 		}
-		if (written != 64) {
+		if (written != page_size) {
+			free (buffer);
 			goto error_close_unlink;
 		}
 	}
+	free (buffer);
 
 	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
 		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
@@ -386,9 +396,10 @@ memory_map (char *path, const char *file, void **buf, size_t bytes)
 	void *addr_orig;
 	void *addr;
 	int32_t res;
-	char buffer[128];
+	char *buffer;
 	int32_t i;
 	int32_t written;
+	long page_size; 
 
 	snprintf (path, PATH_MAX, "/dev/shm/%s", file);
 
@@ -405,17 +416,24 @@ memory_map (char *path, const char *file, void **buf, size_t bytes)
 	if (res == -1) {
 		goto error_close_unlink;
 	}
-	memset (buffer, 0, sizeof (buffer));
-	for (i = 0; i < (bytes / 64); i++) {
+	page_size = sysconf(_SC_PAGESIZE);
+	buffer = malloc (page_size);
+	if (buffer == NULL) {
+		goto error_close_unlink;
+	}
+	memset (buffer, 0, page_size);
+	for (i = 0; i < (bytes / page_size); i++) {
 retry_write:
-		written = write (fd, buffer, 64);
+		written = write (fd, buffer, page_size);
 		if (written == -1 && errno == EINTR) {
 			goto retry_write;
 		}
-		if (written != 64) {
+		if (written != page_size) {
+			free (buffer);
 			goto error_close_unlink;
 		}
 	}
+	free (buffer);
 
 	addr_orig = mmap (NULL, bytes, PROT_NONE,
 		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
@@ -454,10 +472,6 @@ msg_send (
 	const struct iovec *iov,
 	unsigned int iov_len)
 {
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-
 	int i;
 	int res;
 	int req_buffer_idx = 0;
@@ -473,117 +487,18 @@ msg_send (
 		req_buffer_idx += iov[i].iov_len;
 	}
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&ipc_instance->control_buffer->sem0);
-	if (res == -1) {
-		return (CS_ERR_LIBRARY);
-	}
-#else 
 	/*
-	 * Signal semaphore #0 indicting a new message from client
+	 * Signal SEMAPHORE_REQUEST and SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT, indicating a new message from client
 	 * to server request queue
 	 */
-	sop.sem_num = 0;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (ipc_instance->semid, &sop, 1);
-	if (res == -1 && errno == EINTR) {
-		return (CS_ERR_TRY_AGAIN);
-	} else
-	if (res == -1 && errno == EACCES) {
-		priv_change_send (ipc_instance);
-		goto retry_semop;
-	} else
-	if (res == -1) {
-		return (CS_ERR_LIBRARY);
-	}
-#endif
-	return (CS_OK);
-}
-
-inline static cs_error_t
-ipc_sem_wait (
-	struct ipc_instance *ipc_instance,
-	int sem_num)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#else
-	struct timespec timeout;
-	struct pollfd pfd;
-	sem_t *sem = NULL;
-#endif
-	int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	switch (sem_num) {
-	case 0:
-		sem = &ipc_instance->control_buffer->sem0;
-		break;
-	case 1:
-		sem = &ipc_instance->control_buffer->sem1;
-		break;
-	case 2:
-		sem = &ipc_instance->control_buffer->sem2;
-		break;
-	}
-
-retry_semwait:
-	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
-	timeout.tv_nsec = 0;
-
-	res = sem_timedwait (sem, &timeout);
-	if (res == -1 && errno == ETIMEDOUT) {
-		pfd.fd = ipc_instance->fd;
-		pfd.events = 0;
-
-		res = poll (&pfd, 1, 0);
-
-		if (res == -1 && errno == EINTR) {
-			return (CS_ERR_TRY_AGAIN);
-		} else
-		if (res == -1) {
-			return (CS_ERR_LIBRARY);
-		}
-
-		if (res == 1) {
-			if (pfd.revents == POLLERR || pfd.revents == POLLHUP || pfd.revents == POLLNVAL) {
-				return (CS_ERR_LIBRARY);
-			}
-		}
-
-		goto retry_semwait;
-	} else
-	if (res == -1 && errno == EINTR) {
-		return (CS_ERR_TRY_AGAIN);
-	} else
-	if (res == -1) {
+	res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
+	if (res != CS_OK) {
 		return (CS_ERR_LIBRARY);
 	}
-#else
-	/*
-	 * Wait for semaphore indicating a new message from server
-	 * to client in queue
-	 */
-	sop.sem_num = sem_num;
-	sop.sem_op = -1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (ipc_instance->semid, &sop, 1);
-	if (res == -1 && errno == EINTR) {
-		return (CS_ERR_TRY_AGAIN);
-	} else
-	if (res == -1 && errno == EACCES) {
-		priv_change_send (ipc_instance);
-		goto retry_semop;
-	} else
-	if (res == -1) {
+	res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+	if (res != CS_OK) {
 		return (CS_ERR_LIBRARY);
 	}
-#endif
 	return (CS_OK);
 }
 
@@ -594,10 +509,17 @@ reply_receive (
 	size_t res_len)
 {
 	coroipc_response_header_t *response_header;
-	cs_error_t err;
+	cs_error_t res;
 
-	if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
-		return (err);
+retry_ipc_sem_wait:
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+	if (res != CS_OK) {
+		if (res == CS_ERR_TRY_AGAIN) {
+			priv_change_send (ipc_instance);
+			goto retry_ipc_sem_wait;
+		} else {
+			return (res);
+		}
 	}
 
 	response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
@@ -614,10 +536,17 @@ reply_receive_in_buf (
 	struct ipc_instance *ipc_instance,
 	void **res_msg)
 {
-	cs_error_t err;
+	cs_error_t res;
 
-	if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
-		return (err);
+retry_ipc_sem_wait:
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+	if (res != CS_OK) {
+		if (res == CS_ERR_TRY_AGAIN) {
+			priv_change_send (ipc_instance);
+			goto retry_ipc_sem_wait;
+		} else {
+			return (res);
+		}
 	}
 
 	*res_msg = (char *)ipc_instance->response_buffer;
@@ -737,18 +666,22 @@ coroipcc_service_connect (
 	}
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-	sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
-	sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
-	sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
 #else
+{
+	int i;
+
 	/*
 	 * Allocate a semaphore segment
 	 */
 	while (1) {
 		semkey = random();
 		ipc_instance->euid = geteuid ();
-		if ((ipc_instance->semid
-		     = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+		if ((ipc_instance->control_buffer->semid
+		     = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
 		      break;
 		}
 		/*
@@ -764,18 +697,15 @@ coroipcc_service_connect (
 		}
 	}
 
-	semun.val = 0;
-	sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
-	if (sys_res != 0) {
-		res = CS_ERR_LIBRARY;
-		goto error_exit;
-	}
-
-	sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
-	if (sys_res != 0) {
-		res = CS_ERR_LIBRARY;
-		goto error_exit;
+	for (i = 0; i < 4; i++) {
+		semun.val = 0;
+		sys_res = semctl (ipc_instance->control_buffer->semid, i, SETVAL, semun);
+		if (sys_res != 0) {
+			res = CS_ERR_LIBRARY;
+			goto error_exit;
+		}
 	}
+}
 #endif
 
 	/*
@@ -805,7 +735,6 @@ coroipcc_service_connect (
 	}
 
 	ipc_instance->fd = request_fd;
-	ipc_instance->flow_control_state = 0;
 
 	if (res_setup.error == CS_ERR_TRY_AGAIN) {
 		res = res_setup.error;
@@ -825,8 +754,8 @@ coroipcc_service_connect (
 
 error_exit:
 #if _POSIX_THREAD_PROCESS_SHARED < 1
-	if (ipc_instance->semid > 0)
-		semctl (ipc_instance->semid, 0, IPC_RMID);
+	if (ipc_instance->control_buffer->semid > 0)
+		semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
 #endif
 	memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
 error_dispatch_buffer:
@@ -876,7 +805,7 @@ coroipcc_dispatch_flow_control_get (
 		return (res);
 	}
 
-	*flow_control_state = ipc_instance->flow_control_state;
+	*flow_control_state = ipc_instance->control_buffer->flow_control_enabled;
 
 	hdb_handle_put (&ipc_hdb, handle);
 	return (res);
@@ -911,10 +840,9 @@ coroipcc_dispatch_get (
 	int poll_events;
 	char buf;
 	struct ipc_instance *ipc_instance;
-	int res;
-	char buf_two = 1;
 	char *data_addr;
 	cs_error_t error = CS_OK;
+	int res;
 
 	error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
 	if (error != CS_OK) {
@@ -945,46 +873,21 @@ coroipcc_dispatch_get (
 		goto error_put;
 	}
 
-	res = recv (ipc_instance->fd, &buf, 1, 0);
-	if (res == -1 && errno == EINTR) {
-		error = CS_ERR_TRY_AGAIN;
-		goto error_put;
-	} else
-	if (res == -1) {
-		error = CS_ERR_LIBRARY;
-		goto error_put;
-	} else
-	if (res == 0) {
-		/* Means that the peer closed cleanly the socket. However, it should
-		 * happen only on BSD and Darwing systems since poll() returns a
-		 * POLLHUP event on other systems.
+	error = socket_recv (ipc_instance->fd, &buf, 1);
+	assert (error == CS_OK);
+
+	if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+		/*
+		 * Notify coroipcs to flush any pending dispatch messages
 		 */
-		error = CS_ERR_LIBRARY;
-		goto error_put;
-	}
-	ipc_instance->flow_control_state = 0;
-	if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-		ipc_instance->flow_control_state = 1;
-	}
-	/*
-	 * Notify executive to flush any pending dispatch messages
-	 */
-	if (ipc_instance->flow_control_state) {
-		buf_two = MESSAGE_REQ_OUTQ_FLUSH;
-		res = socket_send (ipc_instance->fd, &buf_two, 1);
-		assert (res == CS_OK); /* TODO */
-	}
-	/*
-	 * This is just a notification of flow control starting at the addition
-	 * of a new pending message, not a message to dispatch
-	 */
-	if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-		error = CS_ERR_TRY_AGAIN;
-		goto error_put;
-	}
-	if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
-		error = CS_ERR_TRY_AGAIN;
-		goto error_put;
+		
+		res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+		if (res != CS_OK) {
+			error = CS_ERR_LIBRARY;
+			goto error_put;
+		}
+
+
 	}
 
 	data_addr = ipc_instance->dispatch_buffer;
@@ -1013,8 +916,15 @@ coroipcc_dispatch_put (hdb_handle_t handle)
 		return (res);
 	}
 
-	if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
-		goto error_exit;
+retry_ipc_sem_wait:
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
+	if (res != CS_OK) {
+		if (res == CS_ERR_TRY_AGAIN) {
+			priv_change_send (ipc_instance);
+			goto retry_ipc_sem_wait;
+		} else {
+			goto error_exit;
+		}
 	}
 
 	addr = ipc_instance->dispatch_buffer;
@@ -1061,8 +971,8 @@ coroipcc_msg_send_reply_receive (
 	res = reply_receive (ipc_instance, res_msg, res_len);
 
 error_exit:
-	hdb_handle_put (&ipc_hdb, handle);
 	pthread_mutex_unlock (&ipc_instance->mutex);
+	hdb_handle_put (&ipc_hdb, handle);
 
 	return (res);
 }
diff --git a/man/corosync.conf.5 b/man/corosync.conf.5
index 15cff18..5d3f7b5 100644
--- a/man/corosync.conf.5
+++ b/man/corosync.conf.5
@@ -333,6 +333,12 @@ achieved before starting a new round of membership configuration.  The minimum
 value for consensus must be 1.2 * token.  This value will be automatically
 calculated at 1.2 * token if the user doesn't specify a consensus value.
 
+For two node clusters, a consensus larger than the join timeout but less than
+token is safe.  For three node or larger clusters, consensus should be larger
+than token.  There is an increasing risk of odd membership changes, which still
+guarantee virtual synchrony, as node count grows if consensus is less than
+token.
+
 The default is 1200 milliseconds.
 
 .TP
-- 
corosync Debian packaging
    
    
More information about the Debian-ha-svn-commits
mailing list