[Debian-ha-svn-commits] [SCM] corosync Debian packaging branch, experimental, updated. debian/1.2.7-1
Guido Günther
agx at sigxcpu.org
Fri Aug 13 14:38:09 UTC 2010
The following commit has been merged in the experimental branch:
commit 1c157d6ac5b0a3cd847154f596d22691b4c17314
Author: Guido Günther <agx at sigxcpu.org>
Date: Fri Aug 13 09:29:48 2010 +0200
New upstream version 1.2.7
diff --git a/CHANGELOG b/CHANGELOG
index 49e72fe..01b307e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,4 +1,103 @@
------------------------------------------------------------------------
+r3008 | sdake | 2010-07-27 14:37:46 -0700 (Tue, 27 Jul 2010) | 6 lines
+
+Merge trunk revision 3007:
+r3007 | sdake | 2010-07-27 14:32:39 -0700 (Tue, 27 Jul 2010) | 2 lines
+
+Change trunk version to 1.2.7.
+
+
+------------------------------------------------------------------------
+r3006 | sdake | 2010-07-27 12:08:29 -0700 (Tue, 27 Jul 2010) | 7 lines
+
+Merge trunk revision 3005:
+r3005 | sdake | 2010-07-27 12:00:37 -0700 (Tue, 27 Jul 2010) | 3 lines
+
+Remove consensus check for two-node cluster cases, which can have smaller
+consensus values. Document the behavior of consensus in the man page.
+
+
+------------------------------------------------------------------------
+r3004 | asalkeld | 2010-07-21 16:16:41 -0700 (Wed, 21 Jul 2010) | 5 lines
+
+Merge trunk revision 3002:
+r3002 | sdake | 2010-07-22 06:48:40 +1000 (Thu, 22 Jul 2010) | 2 lines
+Fix merge error with revision 3001.
+
+
+------------------------------------------------------------------------
+r3003 | asalkeld | 2010-07-21 16:13:42 -0700 (Wed, 21 Jul 2010) | 6 lines
+
+Merge trunk revision 3001:
+r3001 | sdake | 2010-07-22 03:03:36 +1000 (Thu, 22 Jul 2010) | 3 lines
+Fix problem where flow control could lock up ipc under very heavy load in very
+rare circumstances.
+
+
+------------------------------------------------------------------------
+r3000 | sdake | 2010-07-19 13:20:25 -0700 (Mon, 19 Jul 2010) | 12 lines
+
+Merge trunk revision 2995:
+r2995 | asalkeld | 2010-07-16 21:59:40 -0700 (Fri, 16 Jul 2010) | 9 lines
+
+SYNC: always call sync_aborted() in sync_confchg_fn().
+
+1) sync_callbacks.sync_abort can be null.
+2) sync_processing is set to 0 after syncv1 is done.
+ Then syncv2 processing is done. If we get a config change
+ after syncv1 is done, but before syncv2 is done, then it won't
+ get aborted.
+
+
+------------------------------------------------------------------------
+r2999 | sdake | 2010-07-19 13:19:30 -0700 (Mon, 19 Jul 2010) | 6 lines
+
+Merge trunk revision 2991:
+r2991 | asalkeld | 2010-07-15 19:08:54 -0700 (Thu, 15 Jul 2010) | 2 lines
+
+ SYNCV2: reset the my_memb_determine_ring_id in sync_v2_memb_list_abort()
+
+
+------------------------------------------------------------------------
+r2998 | sdake | 2010-07-19 08:29:30 -0700 (Mon, 19 Jul 2010) | 7 lines
+
+Merge trunk revision 2997:
+r2997 | fabbione | 2010-07-18 23:36:48 -0700 (Sun, 18 Jul 2010) | 4 lines
+
+Fix logging_daemon config parser code.
+
+Resolves: rhbz#615203
+
+------------------------------------------------------------------------
+r2989 | sdake | 2010-07-14 11:36:53 -0700 (Wed, 14 Jul 2010) | 8 lines
+
+Merge trunk revision 2988:
+r2988 | sdake | 2010-07-14 11:35:36 -0700 (Wed, 14 Jul 2010) | 4 lines
+
+Remove reset of token timeout on retransmitted token reception. The timer
+should only be reset when a real token is received; otherwise the membership
+protocol could run into problems with certain timing parameters.
+
+
+------------------------------------------------------------------------
+r2987 | sdake | 2010-07-07 14:45:01 -0700 (Wed, 07 Jul 2010) | 6 lines
+
+Merge trunk revision 2986:
+r2986 | sdake | 2010-07-07 14:43:15 -0700 (Wed, 07 Jul 2010) | 2 lines
+
+Speed up IPC connection process.
+
+
+------------------------------------------------------------------------
+r2985 | sdake | 2010-07-03 15:00:16 -0700 (Sat, 03 Jul 2010) | 6 lines
+
+Merge trunk revision 2984:
+r2984 | sdake | 2010-07-03 14:54:22 -0700 (Sat, 03 Jul 2010) | 2 lines
+
+Fix fail list fault that occurs in very rare circumstances.
+
+
+------------------------------------------------------------------------
r2982 | sdake | 2010-06-30 12:47:14 -0700 (Wed, 30 Jun 2010) | 6 lines
Merge trunk revision 2980:
diff --git a/configure b/configure
index 35df3dd..5130e7f 100755
--- a/configure
+++ b/configure
@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.63 for corosync 1.2.6.
+# Generated by GNU Autoconf 2.63 for corosync 1.2.7.
#
# Report bugs to <openais at lists.osdl.org>.
#
@@ -596,8 +596,8 @@ SHELL=${CONFIG_SHELL-/bin/sh}
# Identity of this package.
PACKAGE_NAME='corosync'
PACKAGE_TARNAME='corosync'
-PACKAGE_VERSION='1.2.6'
-PACKAGE_STRING='corosync 1.2.6'
+PACKAGE_VERSION='1.2.7'
+PACKAGE_STRING='corosync 1.2.7'
PACKAGE_BUGREPORT='openais at lists.osdl.org'
ac_unique_file="lib/coroipcc.c"
@@ -1355,7 +1355,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
-\`configure' configures corosync 1.2.6 to adapt to many kinds of systems.
+\`configure' configures corosync 1.2.7 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@@ -1425,7 +1425,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
- short | recursive ) echo "Configuration of corosync 1.2.6:";;
+ short | recursive ) echo "Configuration of corosync 1.2.7:";;
esac
cat <<\_ACEOF
@@ -1536,7 +1536,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
-corosync configure 1.2.6
+corosync configure 1.2.7
generated by GNU Autoconf 2.63
Copyright (C) 1992, 1993, 1994, 1995, 1996, 1998, 1999, 2000, 2001,
@@ -1550,7 +1550,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
-It was created by corosync $as_me 1.2.6, which was
+It was created by corosync $as_me 1.2.7, which was
generated by GNU Autoconf 2.63. Invocation command line was
$ $0 $@
@@ -2399,7 +2399,7 @@ fi
# Define the identity of the package.
PACKAGE='corosync'
- VERSION='1.2.6'
+ VERSION='1.2.7'
cat >>confdefs.h <<_ACEOF
@@ -2538,7 +2538,7 @@ ac_compiler_gnu=$ac_cv_c_compiler_gnu
# Define SVN revision
cat >>confdefs.h <<\_ACEOF
-#define SVN_REVISION "2982"
+#define SVN_REVISION "3008"
_ACEOF
@@ -11278,7 +11278,7 @@ exec 6>&1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
-This file was extended by corosync $as_me 1.2.6, which was
+This file was extended by corosync $as_me 1.2.7, which was
generated by GNU Autoconf 2.63. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@@ -11345,7 +11345,7 @@ Report bugs to <bug-autoconf at gnu.org>."
_ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_version="\\
-corosync config.status 1.2.6
+corosync config.status 1.2.7
configured by $0, generated by GNU Autoconf 2.63,
with options \\"`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`\\"
diff --git a/configure.ac b/configure.ac
index cbe26c2..7079583 100644
--- a/configure.ac
+++ b/configure.ac
@@ -4,7 +4,7 @@
# bootstrap / init
AC_PREREQ([2.61])
-AC_INIT([corosync], [1.2.6], [openais at lists.osdl.org])
+AC_INIT([corosync], [1.2.7], [openais at lists.osdl.org])
AM_INIT_AUTOMAKE([-Wno-portability])
AC_CONFIG_SRCDIR([lib/coroipcc.c])
diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index 923ed87..aca9053 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -95,6 +95,9 @@
#define MSG_SEND_LOCKED 0
#define MSG_SEND_UNLOCKED 1
+#define POLL_STATE_IN 1
+#define POLL_STATE_INOUT 2
+
static struct coroipcs_init_state_v2 *api = NULL;
DECLARE_LIST_INIT (conn_info_list_head);
@@ -141,13 +144,10 @@ struct conn_info {
pthread_attr_t thread_attr;
unsigned int service;
enum conn_state state;
- int notify_flow_control_enabled;
- int flow_control_state;
int refcount;
hdb_handle_t stats_handle;
#if _POSIX_THREAD_PROCESS_SHARED < 1
key_t semkey;
- int semid;
#endif
unsigned int pending_semops;
pthread_mutex_t mutex;
@@ -166,6 +166,7 @@ struct conn_info {
unsigned int setup_bytes_read;
struct list_head zcb_mapped_list_head;
char *sending_allowed_private_data[64];
+ int poll_state;
};
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -221,34 +222,6 @@ static void dummy_stats_increment_value (
{
}
-static void sem_post_exit_thread (struct conn_info *conn_info)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semop:
- res = sem_post (&conn_info->control_buffer->sem0);
- if (res == -1 && errno == EINTR) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semop;
- }
-#else
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semop;
- }
-#endif
-}
-
static int
memory_map (
const char *path,
@@ -383,6 +356,34 @@ circular_memory_unmap (void *buf, size_t bytes)
return (res);
}
+static void flow_control_state_set (
+ struct conn_info *conn_info,
+ int flow_control_state)
+{
+ if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
+ return;
+ }
+ if (flow_control_state == 0) {
+ log_printf (LOGSYS_LEVEL_DEBUG,
+ "Disabling flow control for %d\n",
+ conn_info->client_pid);
+ } else
+ if (flow_control_state == 1) {
+ log_printf (LOGSYS_LEVEL_DEBUG,
+ "Enabling flow control for %d\n",
+ conn_info->client_pid);
+ }
+
+
+ conn_info->control_buffer->flow_control_enabled = flow_control_state;
+ api->stats_update_value (conn_info->stats_handle,
+ "flow_control",
+ &flow_control_state,
+ sizeof(flow_control_state));
+ api->stats_increment_value (conn_info->stats_handle,
+ "flow_control_count");
+}
+
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
{
unsigned int res;
@@ -517,7 +518,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
}
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
- sem_post_exit_thread (conn_info);
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
return (0);
}
@@ -546,11 +547,12 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
pthread_mutex_unlock (&conn_info->mutex);
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_destroy (&conn_info->control_buffer->sem0);
- sem_destroy (&conn_info->control_buffer->sem1);
- sem_destroy (&conn_info->control_buffer->sem2);
+ sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+ sem_destroy (&conn_info->control_buffer->sem_request);
+ sem_destroy (&conn_info->control_buffer->sem_response);
+ sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
- semctl (conn_info->semid, 0, IPC_RMID);
+ semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
* Destroy shared memory segment and semaphore
@@ -653,14 +655,12 @@ static inline void zerocopy_operations_process (
static void *pthread_ipc_consumer (void *conn)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
int res;
coroipc_request_header_t *header;
coroipc_response_header_t coroipc_response_header;
int send_ok;
unsigned int new_message;
+ int sem_value = 0;
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
if (api->sched_policy != 0) {
@@ -670,43 +670,28 @@ static void *pthread_ipc_consumer (void *conn)
#endif
for (;;) {
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semwait:
- res = sem_wait (&conn_info->control_buffer->sem0);
+ ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
if (ipc_thread_active (conn_info) == 0) {
coroipcs_refcount_dec (conn_info);
pthread_exit (0);
}
- if ((res == -1) && (errno == EINTR)) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semwait;
- }
-#else
- sop.sem_num = 0;
- sop.sem_op = -1;
- sop.sem_flg = 0;
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if (ipc_thread_active (conn_info) == 0) {
- coroipcs_refcount_dec (conn_info);
- pthread_exit (0);
- }
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- coroipcs_refcount_dec (conn_info);
- pthread_exit (0);
- }
-#endif
+ outq_flush (conn_info);
+ ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
+ if (sem_value > 0) {
+
+ res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST);
+ } else {
+ continue;
+ }
+
zerocopy_operations_process (conn_info, &header, &new_message);
/*
* There is no new message to process, continue for loop
*/
if (new_message == 0) {
+printf ("continuing\n");
continue;
}
@@ -738,7 +723,6 @@ retry_semop:
/*
* Overload, tell library to retry
*/
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
coroipc_response_header.size = sizeof (coroipc_response_header_t);
coroipc_response_header.id = 0;
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -928,7 +912,7 @@ static void ipc_disconnect (struct conn_info *conn_info)
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
pthread_mutex_unlock (&conn_info->mutex);
- sem_post_exit_thread (conn_info);
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
}
static int conn_info_create (int fd)
@@ -945,6 +929,7 @@ static int conn_info_create (int fd)
conn_info->client_pid = 0;
conn_info->service = SOCKET_SERVICE_INIT;
conn_info->state = CONN_STATE_THREAD_INACTIVE;
+ conn_info->poll_state = POLL_STATE_IN;
list_init (&conn_info->outq_head);
list_init (&conn_info->list);
list_init (&conn_info->zcb_mapped_list_head);
@@ -1103,11 +1088,12 @@ void coroipcs_ipc_exit (void)
ipc_disconnect (conn_info);
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_destroy (&conn_info->control_buffer->sem0);
- sem_destroy (&conn_info->control_buffer->sem1);
- sem_destroy (&conn_info->control_buffer->sem2);
+ sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+ sem_destroy (&conn_info->control_buffer->sem_request);
+ sem_destroy (&conn_info->control_buffer->sem_response);
+ sem_destroy (&conn_info->control_buffer->sem_dispatch);
#else
- semctl (conn_info->semid, 0, IPC_RMID);
+ semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
#endif
/*
@@ -1181,33 +1167,11 @@ void *coroipcs_private_data_get (void *conn)
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
memcpy (conn_info->response_buffer, msg, mlen);
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&conn_info->control_buffer->sem1);
- if (res == -1) {
- return (-1);
- }
-#else
- sop.sem_num = 1;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- return (0);
- }
-#endif
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
+
api->stats_increment_value (conn_info->stats_handle, "responses");
return (0);
}
@@ -1215,10 +1179,6 @@ retry_semop:
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
- int res;
int write_idx = 0;
int i;
@@ -1228,26 +1188,8 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
write_idx += iov[i].iov_len;
}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&conn_info->control_buffer->sem1);
- if (res == -1) {
- return (-1);
- }
-#else
- sop.sem_num = 1;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- return (0);
- }
-#endif
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
+
api->stats_increment_value (conn_info->stats_handle, "responses");
return (0);
}
@@ -1283,86 +1225,31 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l
conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
}
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
- int new_fc = 0;
-
- if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
- event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- new_fc = 1;
- }
-
- if (conn_info->flow_control_state != new_fc) {
- if (new_fc == 1) {
- log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d, event %d\n",
- conn_info->client_pid, event);
- } else {
- log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control for %d, event %d\n",
- conn_info->client_pid, event);
- }
- conn_info->flow_control_state = new_fc;
- api->stats_update_value (conn_info->stats_handle, "flow_control",
- &conn_info->flow_control_state,
- sizeof(conn_info->flow_control_state));
- api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
- }
-
- return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
int locked)
{
struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
int res;
int i;
+ char buf;
for (i = 0; i < iov_len; i++) {
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
}
- if (list_empty (&conn_info->outq_head))
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
- else
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
-
- if (res == -1 && errno == EAGAIN) {
- if (locked == 0) {
- pthread_mutex_lock (&conn_info->mutex);
- }
+ buf = list_empty (&conn_info->outq_head);
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res != 1) {
conn_info->pending_semops += 1;
- if (locked == 0) {
- pthread_mutex_unlock (&conn_info->mutex);
+ if (conn_info->poll_state == POLL_STATE_IN) {
+ conn_info->poll_state = POLL_STATE_INOUT;
+ api->poll_dispatch_modify (conn_info->fd,
+ POLLIN|POLLOUT|POLLNVAL);
}
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLOUT|POLLNVAL);
- } else
- if (res == -1) {
- ipc_disconnect (conn_info);
- }
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&conn_info->control_buffer->sem2);
-#else
- sop.sem_num = 2;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (conn_info->semid, &sop, 1);
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
- goto retry_semop;
- } else
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
- return;
}
-#endif
+
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
+
api->stats_increment_value (conn_info->stats_handle, "dispatched");
}
@@ -1371,11 +1258,10 @@ static void outq_flush (struct conn_info *conn_info) {
struct outq_item *outq_item;
unsigned int bytes_left;
struct iovec iov;
- int res;
pthread_mutex_lock (&conn_info->mutex);
if (list_empty (&conn_info->outq_head)) {
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
+ flow_control_state_set (conn_info, 0);
pthread_mutex_unlock (&conn_info->mutex);
return;
}
@@ -1441,7 +1327,7 @@ retry_recv:
semun.buf = &ipc_set;
for (i = 0; i < 3; i++) {
- res = semctl (conn_info->semid, 0, IPC_SET, semun);
+ res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, semun);
if (res == -1) {
return (-1);
}
@@ -1471,6 +1357,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
bytes_msg += iov[i].iov_len;
}
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+ flow_control_state_set (conn_info, 1);
outq_item = api->malloc (sizeof (struct outq_item));
if (outq_item == NULL) {
ipc_disconnect (conn);
@@ -1491,11 +1378,6 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
outq_item->mlen = bytes_msg;
list_init (&outq_item->list);
pthread_mutex_lock (&conn_info->mutex);
- if (list_empty (&conn_info->outq_head)) {
- conn_info->notify_flow_control_enabled = 1;
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLOUT|POLLNVAL);
- }
list_add_tail (&outq_item->list, &conn_info->outq_head);
pthread_mutex_unlock (&conn_info->mutex);
api->stats_increment_value (conn_info->stats_handle, "queue_size");
@@ -1742,11 +1624,10 @@ int coroipcs_handler_dispatch (
conn_info->service = req_setup->service;
conn_info->refcount = 0;
- conn_info->notify_flow_control_enabled = 0;
conn_info->setup_bytes_read = 0;
#if _POSIX_THREAD_PROCESS_SHARED < 1
- conn_info->semid = semget (conn_info->semkey, 3, 0600);
+ conn_info->control_buffer->semid = semget (conn_info->semkey, 3, 0600);
#endif
conn_info->pending_semops = 0;
@@ -1794,9 +1675,6 @@ int coroipcs_handler_dispatch (
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
if (res == 1) {
switch (buf) {
- case MESSAGE_REQ_OUTQ_FLUSH:
- outq_flush (conn_info);
- break;
case MESSAGE_REQ_CHANGE_EUID:
if (priv_change (conn_info) == -1) {
ipc_disconnect (conn_info);
@@ -1820,37 +1698,24 @@ int coroipcs_handler_dispatch (
coroipcs_refcount_dec (conn_info);
}
- coroipcs_refcount_inc (conn_info);
- pthread_mutex_lock (&conn_info->mutex);
- if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
- if (list_empty (&conn_info->outq_head))
- buf = MESSAGE_RES_OUTQ_EMPTY;
- else
- buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+ if (revent & POLLOUT) {
+ int psop = conn_info->pending_semops;
+ int i;
- for (; conn_info->pending_semops;) {
- res = flow_control_event_send (conn_info, buf);
- if (res == 1) {
- conn_info->pending_semops--;
+ assert (psop != 0);
+ for (i = 0; i < psop; i++) {
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+ if (res != 1) {
+ return (0);
} else {
- break;
- }
- }
- if (conn_info->notify_flow_control_enabled) {
- res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
- if (res == 1) {
- conn_info->notify_flow_control_enabled = 0;
+ conn_info->pending_semops -= 1;
}
}
- if (conn_info->notify_flow_control_enabled == 0 &&
- conn_info->pending_semops == 0) {
-
- api->poll_dispatch_modify (conn_info->fd,
- POLLIN|POLLNVAL);
+ if (conn_info->poll_state == POLL_STATE_INOUT) {
+ conn_info->poll_state = POLL_STATE_IN;
+ api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
}
}
- pthread_mutex_unlock (&conn_info->mutex);
- coroipcs_refcount_dec (conn_info);
return (0);
}
diff --git a/exec/mainconfig.c b/exec/mainconfig.c
index d8483e8..44bd47a 100644
--- a/exec/mainconfig.c
+++ b/exec/mainconfig.c
@@ -528,21 +528,25 @@ static int corosync_main_config_read_logging (
object_logger_subsys_handle,
"name", &value)) {
- if ((strcmp(value, "corosync") == 0) &&
- (!objdb_get_string (objdb,
- object_logger_subsys_handle,
- "subsys", &value))) {
-
- if (corosync_main_config_set (objdb,
- object_logger_subsys_handle,
- value,
- &error_reason) < 0) {
- goto parse_error;
+ if (strcmp(value, "corosync") == 0) {
+ if (!objdb_get_string (objdb,
+ object_logger_subsys_handle,
+ "subsys", &value)) {
+ if (corosync_main_config_set (objdb,
+ object_logger_subsys_handle,
+ value,
+ &error_reason) < 0) {
+ goto parse_error;
+ }
+ }
+ else {
+ if (corosync_main_config_set (objdb,
+ object_logger_subsys_handle,
+ NULL,
+ &error_reason) < 0) {
+ goto parse_error;
+ }
}
- }
- else {
- error_reason = "subsys required for logging_daemon directive";
- goto parse_error;
}
}
else {
diff --git a/exec/sync.c b/exec/sync.c
index ce115a3..c4197a8 100644
--- a/exec/sync.c
+++ b/exec/sync.c
@@ -479,8 +479,8 @@ static void sync_confchg_fn (
memcpy (my_member_list, member_list, member_list_entries * sizeof (unsigned int));
my_member_list_entries = member_list_entries;
+ sync_aborted ();
if (sync_processing && sync_callbacks.sync_abort != NULL) {
- sync_aborted ();
sync_callbacks.sync_abort ();
sync_callbacks.sync_activate = NULL;
}
diff --git a/exec/syncv2.c b/exec/syncv2.c
index 1b6c617..344d7d0 100644
--- a/exec/syncv2.c
+++ b/exec/syncv2.c
@@ -617,4 +617,5 @@ void sync_v2_memb_list_determine (const struct memb_ring_id *ring_id)
void sync_v2_memb_list_abort (void)
{
my_memb_determine_list_entries = 0;
+ memset (&my_memb_determine_ring_id, 0, sizeof (struct memb_ring_id));
}
diff --git a/exec/totemconfig.c b/exec/totemconfig.c
index 9128bdd..381cf4f 100644
--- a/exec/totemconfig.c
+++ b/exec/totemconfig.c
@@ -564,13 +564,6 @@ int totem_config_validate (
goto parse_error;
}
- if (totem_config->consensus_timeout < 1.2 * totem_config->token_timeout) {
- snprintf (local_error_reason, sizeof(local_error_reason),
- "The consensus timeout parameter (%d ms) must be atleast 1.2 * token (%d ms).",
- totem_config->consensus_timeout, (int) ((float)1.2 * totem_config->token_timeout));
- goto parse_error;
- }
-
if (totem_config->merge_timeout == 0) {
totem_config->merge_timeout = MERGE_TIMEOUT;
}
diff --git a/exec/totemsrp.c b/exec/totemsrp.c
index 6a11771..c60d942 100644
--- a/exec/totemsrp.c
+++ b/exec/totemsrp.c
@@ -3448,23 +3448,6 @@ static int message_handler_orf_token (
* Discard retransmitted tokens
*/
if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
- /*
- * If this processor receives a retransmitted token, it is sure
- * the previous processor is still alive. As a result, it can
- * reset its token timeout. If some processor previous to that
- * has failed, it will eventually not execute a reset of the
- * token timeout, and will cause a reconfiguration to occur.
- */
- reset_token_timeout (instance);
-
- if ((forward_token)
- && instance->use_heartbeat) {
- reset_heartbeat_timeout(instance);
- }
- else {
- cancel_heartbeat_timeout(instance);
- }
-
return (0); /* discard token */
}
last_aru = instance->my_last_aru;
@@ -3904,12 +3887,15 @@ static int message_handler_memb_merge_detect (
return (0);
}
-static int memb_join_process (
+static void memb_join_process (
struct totemsrp_instance *instance,
const struct memb_join *memb_join)
{
struct srp_addr *proc_list;
struct srp_addr *failed_list;
+ int gather_entered = 0;
+ int fail_minus_memb_entries = 0;
+ struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
failed_list = proc_list + memb_join->proc_list_entries;
@@ -3919,7 +3905,8 @@ static int memb_join_process (
memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
-*/
+-*/
+
if (memb_set_equal (proc_list,
memb_join->proc_list_entries,
instance->my_proc_list,
@@ -3942,7 +3929,7 @@ static int memb_join_process (
memb_state_commit_token_create (instance);
memb_state_commit_enter (instance);
- return (0);
+ return;
}
if (memb_consensus_agreed (instance) &&
memb_lowest_in_config (instance)) {
@@ -3951,7 +3938,7 @@ static int memb_join_process (
memb_state_commit_enter (instance);
} else {
- return (0);
+ return;
}
} else
if (memb_set_subset (proc_list,
@@ -3964,12 +3951,12 @@ static int memb_join_process (
instance->my_failed_list,
instance->my_failed_list_entries)) {
- return (0);
+ return;
} else
if (memb_set_subset (&memb_join->system_from, 1,
instance->my_failed_list, instance->my_failed_list_entries)) {
- return (0);
+ return;
} else {
memb_set_merge (proc_list,
memb_join->proc_list_entries,
@@ -3983,14 +3970,42 @@ static int memb_join_process (
&memb_join->system_from, 1,
instance->my_failed_list, &instance->my_failed_list_entries);
} else {
- memb_set_merge (failed_list,
- memb_join->failed_list_entries,
- instance->my_failed_list, &instance->my_failed_list_entries);
+ if (memb_set_subset (
+ &memb_join->system_from, 1,
+ instance->my_memb_list,
+ instance->my_memb_entries)) {
+
+ if (memb_set_subset (
+ &memb_join->system_from, 1,
+ instance->my_failed_list,
+ instance->my_failed_list_entries) == 0) {
+
+ memb_set_merge (failed_list,
+ memb_join->failed_list_entries,
+ instance->my_failed_list, &instance->my_failed_list_entries);
+ } else {
+ memb_set_subtract (fail_minus_memb,
+ &fail_minus_memb_entries,
+ failed_list,
+ memb_join->failed_list_entries,
+ instance->my_memb_list,
+ instance->my_memb_entries);
+
+ memb_set_merge (fail_minus_memb,
+ fail_minus_memb_entries,
+ instance->my_failed_list,
+ &instance->my_failed_list_entries);
+ }
+ }
}
memb_state_gather_enter (instance, 11);
- return (1); /* gather entered */
+ gather_entered = 1;
+ }
+ if (gather_entered == 0 &&
+ instance->memb_state == MEMB_STATE_OPERATIONAL) {
+
+ memb_state_gather_enter (instance, 12);
}
- return (0); /* gather not entered */
}
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
@@ -4121,7 +4136,6 @@ static int message_handler_memb_join (
{
const struct memb_join *memb_join;
struct memb_join *memb_join_convert = alloca (msg_len);
- int gather_entered;
if (endian_conversion_needed) {
memb_join = memb_join_convert;
@@ -4144,11 +4158,7 @@ static int message_handler_memb_join (
}
switch (instance->memb_state) {
case MEMB_STATE_OPERATIONAL:
- gather_entered = memb_join_process (instance,
- memb_join);
- if (gather_entered == 0) {
- memb_state_gather_enter (instance, 12);
- }
+ memb_join_process (instance, memb_join);
break;
case MEMB_STATE_GATHER:
diff --git a/include/corosync/coroipc_ipc.h b/include/corosync/coroipc_ipc.h
index 77cf0bc..acf4f02 100644
--- a/include/corosync/coroipc_ipc.h
+++ b/include/corosync/coroipc_ipc.h
@@ -35,6 +35,9 @@
#define COROIPC_IPC_H_DEFINED
#include <unistd.h>
+#include <poll.h>
+#include <time.h>
+#include "corotypes.h"
#include "config.h"
/*
@@ -52,28 +55,40 @@
#if _POSIX_THREAD_PROCESS_SHARED > 0
#include <semaphore.h>
+#else
+#include <sys/sem.h>
#endif
+/*
+ * Define sem_wait timeout (real timeout will be (n-1;n) )
+ */
+#define IPC_SEMWAIT_TIMEOUT 2
+
enum req_init_types {
MESSAGE_REQ_RESPONSE_INIT = 0,
MESSAGE_REQ_DISPATCH_INIT = 1
};
#define MESSAGE_REQ_CHANGE_EUID 1
-#define MESSAGE_REQ_OUTQ_FLUSH 2
-#define MESSAGE_RES_OUTQ_EMPTY 0
-#define MESSAGE_RES_OUTQ_NOT_EMPTY 1
-#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
-#define MESSAGE_RES_OUTQ_FLUSH_NR 3
+enum ipc_semaphore_identifiers {
+ SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT = 0,
+ SEMAPHORE_REQUEST = 1,
+ SEMAPHORE_RESPONSE = 2,
+ SEMAPHORE_DISPATCH = 3
+};
struct control_buffer {
unsigned int read;
unsigned int write;
+ int flow_control_enabled;
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_t sem0;
- sem_t sem1;
- sem_t sem2;
+ sem_t sem_request_or_flush_or_exit;
+ sem_t sem_response;
+ sem_t sem_dispatch;
+ sem_t sem_request;
+#else
+ int semid;
#endif
};
@@ -145,4 +160,173 @@ struct coroipcs_zc_header {
#define ZC_FREE_HEADER 0xFFFFFFFE
#define ZC_EXECUTE_HEADER 0xFFFFFFFD
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+/*
+ * Map a semaphore identifier to the POSIX semaphore stored in the control
+ * buffer. Returns NULL for an identifier outside the enum, so callers can
+ * fail cleanly instead of handing NULL to sem_*().
+ *
+ * NOTE(review): this table intentionally preserves the original mapping,
+ * which is NOT name-for-name (SEMAPHORE_RESPONSE is backed by sem_request,
+ * SEMAPHORE_DISPATCH by sem_response, SEMAPHORE_REQUEST by sem_dispatch).
+ * All four fields are initialised identically, so the permutation only
+ * matters if some code touches the sem_* fields directly instead of via the
+ * ipc_sem_*() helpers — verify that before "fixing" the names.
+ */
+static inline sem_t *
+ipc_sem_lookup (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		return (&control_buffer->sem_request_or_flush_or_exit);
+	case SEMAPHORE_RESPONSE:
+		return (&control_buffer->sem_request);
+	case SEMAPHORE_DISPATCH:
+		return (&control_buffer->sem_response);
+	case SEMAPHORE_REQUEST:
+		return (&control_buffer->sem_dispatch);
+	default:
+		return (NULL);
+	}
+}
+#endif
+
+/*
+ * Wait on (decrement) the semaphore identified by sem_id.
+ *
+ * POSIX semaphores: waits at most IPC_SEMWAIT_TIMEOUT seconds; an EINTR
+ * restarts the wait against the same absolute deadline.
+ * SysV fallback: blocks in semop(); EINTR and EACCES are reported as
+ * CS_ERR_TRY_AGAIN (coroipcc.c reacts by calling priv_change_send() and
+ * retrying).
+ *
+ * Returns CS_OK on success, CS_ERR_TRY_AGAIN as described above, and
+ * CS_ERR_LIBRARY on timeout, an unknown sem_id, or any other failure.
+ */
+static inline cs_error_t
+ipc_sem_wait (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	struct timespec timeout;
+	sem_t *sem;
+	int res;
+
+	sem = ipc_sem_lookup (control_buffer, sem_id);
+	if (sem == NULL) {
+		return (CS_ERR_LIBRARY);
+	}
+
+	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
+	timeout.tv_nsec = 0;
+
+	do {
+		res = sem_timedwait (sem, &timeout);
+	} while (res == -1 && errno == EINTR);
+	if (res == -1) {
+		/* ETIMEDOUT or any other error */
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	struct sembuf sop;
+	int res;
+
+	sop.sem_num = sem_id;
+	sop.sem_op = -1;
+	sop.sem_flg = 0;
+
+	res = semop (control_buffer->semid, &sop, 1);
+	if (res == -1 && (errno == EINTR || errno == EACCES)) {
+		return (CS_ERR_TRY_AGAIN);
+	}
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#endif
+	return (CS_OK);
+}
+
+/*
+ * Post (increment) the semaphore identified by sem_id. EINTR is retried on
+ * the SysV fallback.
+ *
+ * Returns CS_OK on success and CS_ERR_LIBRARY on an unknown sem_id or any
+ * failure of the underlying call.
+ */
+static inline cs_error_t
+ipc_sem_post (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	sem_t *sem;
+
+	sem = ipc_sem_lookup (control_buffer, sem_id);
+	if (sem == NULL || sem_post (sem) == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	struct sembuf sop;
+	int res;
+
+	sop.sem_num = sem_id;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+	do {
+		res = semop (control_buffer->semid, &sop, 1);
+	} while (res == -1 && errno == EINTR);
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#endif
+	return (CS_OK);
+}
+
+/*
+ * Store the current value of the semaphore identified by sem_id in
+ * *sem_value. EINTR is retried on the SysV fallback.
+ *
+ * Returns CS_OK on success and CS_ERR_LIBRARY on an unknown sem_id or any
+ * failure of the underlying call (*sem_value is then left untouched on SysV).
+ */
+static inline cs_error_t
+ipc_sem_getvalue (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id,
+	int *sem_value)
+{
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	sem_t *sem;
+
+	sem = ipc_sem_lookup (control_buffer, sem_id);
+	if (sem == NULL || sem_getvalue (sem, sem_value) == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	int sem_value_hold;
+
+	do {
+		sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
+	} while (sem_value_hold == -1 && errno == EINTR);
+	if (sem_value_hold == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+	*sem_value = sem_value_hold;
+#endif
+	return (CS_OK);
+}
+
#endif /* COROIPC_IPC_H_DEFINED */
diff --git a/lib/coroipcc.c b/lib/coroipcc.c
index 1942cd1..4ece2dc 100644
--- a/lib/coroipcc.c
+++ b/lib/coroipcc.c
@@ -72,17 +72,8 @@
#include "util.h"
-/*
- * Define sem_wait timeout (real timeout will be (n-1;n) )
- */
-#define IPC_SEMWAIT_TIMEOUT 2
-
struct ipc_instance {
int fd;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- int semid;
-#endif
- int flow_control_state;
struct control_buffer *control_buffer;
char *request_buffer;
char *response_buffer;
@@ -117,6 +108,23 @@ static void socket_nosigpipe(int s)
#define MSG_NOSIGNAL 0
#endif
+/*
+ * Free space, in bytes, of the circular dispatch buffer, computed from the
+ * shared read/write offsets. Equal offsets are treated as an empty buffer,
+ * so the full dispatch_size is reported as free in that case.
+ */
+static inline int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+	const struct control_buffer *cb = context->control_buffer;
+	unsigned int rd = cb->read;
+	unsigned int wr = cb->write;
+	unsigned int free_bytes;
+
+	free_bytes = (rd > wr) ? (rd - wr) : (context->dispatch_size - wr + rd);
+	return (free_bytes);
+}
+
static cs_error_t
socket_send (
int s,
@@ -238,10 +246,10 @@ res_exit:
return (res);
}
-#if _POSIX_THREAD_PROCESS_SHARED < 1
static int
priv_change_send (struct ipc_instance *ipc_instance)
{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
char buf_req;
mar_req_priv_change req_priv_change;
unsigned int res;
@@ -268,19 +276,12 @@ priv_change_send (struct ipc_instance *ipc_instance)
}
ipc_instance->euid = req_priv_change.euid;
+#else
+ ipc_instance = NULL;
+#endif
return (0);
}
-#if defined(_SEM_SEMUN_UNDEFINED)
-union semun {
- int val;
- struct semid_ds *buf;
- unsigned short int *array;
- struct seminfo *__buf;
-};
-#endif
-#endif
-
static int
circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
{
@@ -288,10 +289,11 @@ circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
void *addr_orig;
void *addr;
int32_t res;
- char buffer[128];
int32_t i;
int32_t written;
-
+ char *buffer;
+ long page_size;
+
snprintf (path, PATH_MAX, "/dev/shm/%s", file);
fd = mkstemp (path);
@@ -307,17 +309,25 @@ circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
if (res == -1) {
goto error_close_unlink;
}
- memset (buffer, 0, sizeof (buffer));
- for (i = 0; i < (bytes / 64); i++) {
+
+ page_size = sysconf(_SC_PAGESIZE);
+ buffer = malloc (page_size);
+ if (buffer == NULL) {
+ goto error_close_unlink;
+ }
+ memset (buffer, 0, page_size);
+ for (i = 0; i < (bytes / page_size); i++) {
retry_write:
- written = write (fd, buffer, 64);
+ written = write (fd, buffer, page_size);
if (written == -1 && errno == EINTR) {
goto retry_write;
}
- if (written != 64) {
+ if (written != page_size) {
+ free (buffer);
goto error_close_unlink;
}
}
+ free (buffer);
addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
@@ -386,9 +396,10 @@ memory_map (char *path, const char *file, void **buf, size_t bytes)
void *addr_orig;
void *addr;
int32_t res;
- char buffer[128];
+ char *buffer;
int32_t i;
int32_t written;
+ long page_size;
snprintf (path, PATH_MAX, "/dev/shm/%s", file);
@@ -405,17 +416,24 @@ memory_map (char *path, const char *file, void **buf, size_t bytes)
if (res == -1) {
goto error_close_unlink;
}
- memset (buffer, 0, sizeof (buffer));
- for (i = 0; i < (bytes / 64); i++) {
+ page_size = sysconf(_SC_PAGESIZE);
+ buffer = malloc (page_size);
+ if (buffer == NULL) {
+ goto error_close_unlink;
+ }
+ memset (buffer, 0, page_size);
+ for (i = 0; i < (bytes / page_size); i++) {
retry_write:
- written = write (fd, buffer, 64);
+ written = write (fd, buffer, page_size);
if (written == -1 && errno == EINTR) {
goto retry_write;
}
- if (written != 64) {
+ if (written != page_size) {
+ free (buffer);
goto error_close_unlink;
}
}
+ free (buffer);
addr_orig = mmap (NULL, bytes, PROT_NONE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
@@ -454,10 +472,6 @@ msg_send (
const struct iovec *iov,
unsigned int iov_len)
{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#endif
-
int i;
int res;
int req_buffer_idx = 0;
@@ -473,117 +487,18 @@ msg_send (
req_buffer_idx += iov[i].iov_len;
}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- res = sem_post (&ipc_instance->control_buffer->sem0);
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
-#else
/*
- * Signal semaphore #0 indicting a new message from client
+ * Post SEMAPHORE_REQUEST and SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT, indicating a new message from client
* to server request queue
*/
- sop.sem_num = 0;
- sop.sem_op = 1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (ipc_instance->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_instance);
- goto retry_semop;
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
-#endif
- return (CS_OK);
-}
-
-inline static cs_error_t
-ipc_sem_wait (
- struct ipc_instance *ipc_instance,
- int sem_num)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
- struct sembuf sop;
-#else
- struct timespec timeout;
- struct pollfd pfd;
- sem_t *sem = NULL;
-#endif
- int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
- switch (sem_num) {
- case 0:
- sem = &ipc_instance->control_buffer->sem0;
- break;
- case 1:
- sem = &ipc_instance->control_buffer->sem1;
- break;
- case 2:
- sem = &ipc_instance->control_buffer->sem2;
- break;
- }
-
-retry_semwait:
- timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
- timeout.tv_nsec = 0;
-
- res = sem_timedwait (sem, &timeout);
- if (res == -1 && errno == ETIMEDOUT) {
- pfd.fd = ipc_instance->fd;
- pfd.events = 0;
-
- res = poll (&pfd, 1, 0);
-
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1) {
- return (CS_ERR_LIBRARY);
- }
-
- if (res == 1) {
- if (pfd.revents == POLLERR || pfd.revents == POLLHUP || pfd.revents == POLLNVAL) {
- return (CS_ERR_LIBRARY);
- }
- }
-
- goto retry_semwait;
- } else
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1) {
+ res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
+ if (res != CS_OK) {
return (CS_ERR_LIBRARY);
}
-#else
- /*
- * Wait for semaphore indicating a new message from server
- * to client in queue
- */
- sop.sem_num = sem_num;
- sop.sem_op = -1;
- sop.sem_flg = 0;
-
-retry_semop:
- res = semop (ipc_instance->semid, &sop, 1);
- if (res == -1 && errno == EINTR) {
- return (CS_ERR_TRY_AGAIN);
- } else
- if (res == -1 && errno == EACCES) {
- priv_change_send (ipc_instance);
- goto retry_semop;
- } else
- if (res == -1) {
+ res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+ if (res != CS_OK) {
return (CS_ERR_LIBRARY);
}
-#endif
return (CS_OK);
}
@@ -594,10 +509,17 @@ reply_receive (
size_t res_len)
{
coroipc_response_header_t *response_header;
- cs_error_t err;
+ cs_error_t res;
- if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
- return (err);
+retry_ipc_sem_wait:
+ res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+ if (res != CS_OK) {
+ if (res == CS_ERR_TRY_AGAIN) {
+ priv_change_send (ipc_instance);
+ goto retry_ipc_sem_wait;
+ } else {
+ return (res);
+ }
}
response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
@@ -614,10 +536,17 @@ reply_receive_in_buf (
struct ipc_instance *ipc_instance,
void **res_msg)
{
- cs_error_t err;
+ cs_error_t res;
- if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
- return (err);
+retry_ipc_sem_wait:
+ res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+ if (res != CS_OK) {
+ if (res == CS_ERR_TRY_AGAIN) {
+ priv_change_send (ipc_instance);
+ goto retry_ipc_sem_wait;
+ } else {
+ return (res);
+ }
}
*res_msg = (char *)ipc_instance->response_buffer;
@@ -737,18 +666,22 @@ coroipcc_service_connect (
}
#if _POSIX_THREAD_PROCESS_SHARED > 0
- sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
- sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
- sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
+ sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
#else
+{
+ int i;
+
/*
* Allocate a semaphore segment
*/
while (1) {
semkey = random();
ipc_instance->euid = geteuid ();
- if ((ipc_instance->semid
- = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+ if ((ipc_instance->control_buffer->semid
+ = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
break;
}
/*
@@ -764,18 +697,15 @@ coroipcc_service_connect (
}
}
- semun.val = 0;
- sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
- if (sys_res != 0) {
- res = CS_ERR_LIBRARY;
- goto error_exit;
- }
-
- sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
- if (sys_res != 0) {
- res = CS_ERR_LIBRARY;
- goto error_exit;
+ for (i = 0; i < 4; i++) {
+ semun.val = 0;
+ sys_res = semctl (ipc_instance->control_buffer->semid, i, SETVAL, semun);
+ if (sys_res != 0) {
+ res = CS_ERR_LIBRARY;
+ goto error_exit;
+ }
}
+}
#endif
/*
@@ -805,7 +735,6 @@ coroipcc_service_connect (
}
ipc_instance->fd = request_fd;
- ipc_instance->flow_control_state = 0;
if (res_setup.error == CS_ERR_TRY_AGAIN) {
res = res_setup.error;
@@ -825,8 +754,8 @@ coroipcc_service_connect (
error_exit:
#if _POSIX_THREAD_PROCESS_SHARED < 1
- if (ipc_instance->semid > 0)
- semctl (ipc_instance->semid, 0, IPC_RMID);
+ if (ipc_instance->control_buffer->semid > 0)
+ semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
#endif
memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
error_dispatch_buffer:
@@ -876,7 +805,7 @@ coroipcc_dispatch_flow_control_get (
return (res);
}
- *flow_control_state = ipc_instance->flow_control_state;
+ *flow_control_state = ipc_instance->control_buffer->flow_control_enabled;
hdb_handle_put (&ipc_hdb, handle);
return (res);
@@ -911,10 +840,9 @@ coroipcc_dispatch_get (
int poll_events;
char buf;
struct ipc_instance *ipc_instance;
- int res;
- char buf_two = 1;
char *data_addr;
cs_error_t error = CS_OK;
+ int res;
error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
if (error != CS_OK) {
@@ -945,46 +873,21 @@ coroipcc_dispatch_get (
goto error_put;
}
- res = recv (ipc_instance->fd, &buf, 1, 0);
- if (res == -1 && errno == EINTR) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- } else
- if (res == -1) {
- error = CS_ERR_LIBRARY;
- goto error_put;
- } else
- if (res == 0) {
- /* Means that the peer closed cleanly the socket. However, it should
- * happen only on BSD and Darwing systems since poll() returns a
- * POLLHUP event on other systems.
+ error = socket_recv (ipc_instance->fd, &buf, 1);
+ assert (error == CS_OK);
+
+ if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+ /*
+ * Notify coroipcs to flush any pending dispatch messages
*/
- error = CS_ERR_LIBRARY;
- goto error_put;
- }
- ipc_instance->flow_control_state = 0;
- if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- ipc_instance->flow_control_state = 1;
- }
- /*
- * Notify executive to flush any pending dispatch messages
- */
- if (ipc_instance->flow_control_state) {
- buf_two = MESSAGE_REQ_OUTQ_FLUSH;
- res = socket_send (ipc_instance->fd, &buf_two, 1);
- assert (res == CS_OK); /* TODO */
- }
- /*
- * This is just a notification of flow control starting at the addition
- * of a new pending message, not a message to dispatch
- */
- if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
- }
- if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
- error = CS_ERR_TRY_AGAIN;
- goto error_put;
+
+ res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+ if (res != CS_OK) {
+ error = CS_ERR_LIBRARY;
+ goto error_put;
+ }
+
+
}
data_addr = ipc_instance->dispatch_buffer;
@@ -1013,8 +916,15 @@ coroipcc_dispatch_put (hdb_handle_t handle)
return (res);
}
- if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
- goto error_exit;
+retry_ipc_sem_wait:
+ res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
+ if (res != CS_OK) {
+ if (res == CS_ERR_TRY_AGAIN) {
+ priv_change_send (ipc_instance);
+ goto retry_ipc_sem_wait;
+ } else {
+ goto error_exit;
+ }
}
addr = ipc_instance->dispatch_buffer;
@@ -1061,8 +971,8 @@ coroipcc_msg_send_reply_receive (
res = reply_receive (ipc_instance, res_msg, res_len);
error_exit:
- hdb_handle_put (&ipc_hdb, handle);
pthread_mutex_unlock (&ipc_instance->mutex);
+ hdb_handle_put (&ipc_hdb, handle);
return (res);
}
diff --git a/man/corosync.conf.5 b/man/corosync.conf.5
index 15cff18..5d3f7b5 100644
--- a/man/corosync.conf.5
+++ b/man/corosync.conf.5
@@ -333,6 +333,12 @@ achieved before starting a new round of membership configuration. The minimum
value for consensus must be 1.2 * token. This value will be automatically
calculated at 1.2 * token if the user doesn't specify a consensus value.
+For two node clusters, a consensus larger than the join timeout but less than
+token is safe (an exception to the 1.2 * token minimum above). For clusters of
+three or more nodes, consensus should be larger than token; if it is less than
+token, the risk of odd membership changes (which still guarantee virtual
+synchrony) increases as the node count grows.
+
The default is 1200 milliseconds.
.TP
--
corosync Debian packaging
More information about the Debian-ha-svn-commits
mailing list