[libmessage-passing-zeromq-perl] 50/78: More updates - stress test and HWM
Jonas Smedegaard
js at alioth.debian.org
Mon Sep 30 09:28:26 UTC 2013
This is an automated email from the git hooks/post-receive script.
js pushed a commit to branch master
in repository libmessage-passing-zeromq-perl.
commit 3c06284e76ab14e0116b971730a4ae4d761be644
Author: Tomas Doran <bobtfish at bobtfish.net>
Date: Sun Jun 10 13:09:10 2012 +0100
More updates - stress test and HWM
---
Changes | 6 ++
lib/Message/Passing/Input/ZeroMQ.pm | 3 +-
lib/Message/Passing/Output/ZeroMQ.pm | 5 +-
lib/Message/Passing/ZeroMQ/Role/HasASocket.pm | 34 ++++++------
t/stress.t | 73 +++++++++++++++++++++++++
5 files changed, 100 insertions(+), 21 deletions(-)
diff --git a/Changes b/Changes
index 10cd476..4f2b799 100644
--- a/Changes
+++ b/Changes
@@ -1,5 +1,11 @@
- Improve documentation.
+ - Fix incorrect docs to do with linger option
+
+ - Add ability to override default high water marks on sockets.
+
+ - Increase default high water mark for the output
+
0.004
- Changes to match up with Message::Passing 0.006
diff --git a/lib/Message/Passing/Input/ZeroMQ.pm b/lib/Message/Passing/Input/ZeroMQ.pm
index bc96287..df01350 100644
--- a/lib/Message/Passing/Input/ZeroMQ.pm
+++ b/lib/Message/Passing/Input/ZeroMQ.pm
@@ -19,10 +19,11 @@ has '+_socket' => (
sub _socket_type { 'SUB' }
+sub _build_socket_hwm { 100000 }
+
after setsockopt => sub {
my ($self, $socket) = @_;
$socket->setsockopt(ZMQ_SUBSCRIBE, '');
- $socket->setsockopt(ZMQ_HWM, 100000); # Buffer up to 100k messages.
};
sub _try_rx {
diff --git a/lib/Message/Passing/Output/ZeroMQ.pm b/lib/Message/Passing/Output/ZeroMQ.pm
index 5e22eb7..d5d1a1b 100644
--- a/lib/Message/Passing/Output/ZeroMQ.pm
+++ b/lib/Message/Passing/Output/ZeroMQ.pm
@@ -13,10 +13,7 @@ has '+_socket' => (
sub _socket_type { 'PUB' }
-after setsockopt => sub {
- my ($self, $socket) = @_;
- $socket->setsockopt(ZMQ_HWM, 1000); # Buffer up to 100k messages.
-};
+sub _build_socket_hwm { 10000 }
sub consume {
my $self = shift;
diff --git a/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm b/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm
index 222d07f..63cd1d4 100644
--- a/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm
+++ b/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm
@@ -49,7 +49,16 @@ sub _build_socket {
$socket;
}
-sub setsockopt {}
+has socket_hwm => (
+ is => 'ro',
+ isa => 'Int',
+ builder => '_build_socket_hwm',
+);
+
+sub setsockopt {
+ my ($self, $socket) = @_;
+ $socket->setsockopt(ZMQ_HWM, $self->socket_hwm);
+};
has socket_bind => (
is => 'ro',
@@ -122,24 +131,17 @@ The pair of PUSH, receives a proportion of messages distributed.
Bool indicating the value of the ZMQ_LINGER options.
-Defaults to 0 meaning sockets are lossy, but will not block.
-
-=head3 linger off (default)
-
-Sending messages will be buffered on the client side up to the set
-buffer for this connection. Further messages will be dropped until
-the buffer starts to empty.
+Defaults to 0 meaning sockets will not block on shutdown if a server
+is unavailable (i.e. queued messages will be discarded).
-Receiving messages will be buffered by ZeroMQ for you until you're
-ready to receive them, after which they will be discarded.
+=head3 socket_hwm
-=head3 linger off
+Set the High Water Mark for the socket. Depending on the socket type,
+messages are likely to be discarded once this high water mark is exceeded
+(i.e. there are more than this many messages buffered).
-Sending messages will be be buffered on the client side up to the set
-buffer for this connection. If this buffer fills, then ZeroMQ will block
-the program which was trying to send the message. If the client quits
-before all messages were sent, ZeroMQ will block exit until they have been
-sent.
+A value of 0 disables the high water mark, meaning that messages will be
+buffered until RAM runs out.
=head1 METHODS
diff --git a/t/stress.t b/t/stress.t
new file mode 100644
index 0000000..12a8da5
--- /dev/null
+++ b/t/stress.t
@@ -0,0 +1,73 @@
+use strict;
+use warnings;
+use Test::More;
+use AnyEvent;
+use JSON qw/ encode_json /;
+use Message::Passing::Input::ZeroMQ;
+use Message::Passing::Output::Test;
+use Message::Passing::Output::ZeroMQ;
+use Message::Passing::Filter::Decoder::JSON;
+use Time::HiRes qw( gettimeofday tv_interval );
+
+my $parent = $$;
+
+our $ITRS = 100000;
+
+sub _receiver_child {
+ my $i = 0;
+ my $cv = AnyEvent->condvar;
+ my $input = Message::Passing::Input::ZeroMQ->new(
+ socket_bind => 'tcp://*:5558',
+ socket_hwm => 0,
+ output_to => Message::Passing::Filter::Decoder::JSON->new(output_to => Message::Passing::Output::Test->new(
+ cb => sub {
+ $i++;
+ $cv->send if $i > $::ITRS;
+ },
+ )),
+ );
+ $cv->recv;
+ exit 0;
+}
+
+sub _sender_child {
+ my $output = Message::Passing::Output::ZeroMQ->new(
+ connect => 'tcp://127.0.0.1:5558',
+ linger => 1,
+ socket_hwm => 0,
+ );
+
+ my $run = sub {
+ $output->consume(encode_json {foo => 'bar'}) for 0..$::ITRS;
+ exit 0;
+ };
+ local $SIG{USR1} = sub { goto $run };
+ while (1) { sleep 1 }
+};
+
+my $receiver_pid = fork;
+if ($receiver_pid) { # Parent
+}
+else { # Child
+ _receiver_child();
+}
+
+my $sender_pid = fork;
+if ($sender_pid) { #Parent
+}
+else { # Child
+ _sender_child();
+}
+sleep 2; # Wait for children to spin up.
+my $t0 = [gettimeofday];
+#print "the code took:",timestr($td),"\n";
+kill('USR1', $sender_pid);
+
+is waitpid($sender_pid, 0), $sender_pid;
+is waitpid($receiver_pid, 0), $receiver_pid;
+my $elapsed = tv_interval ( $t0, [gettimeofday]);
+diag "Took " . $elapsed . "s for " . $ITRS . " iterations";
+diag $ITRS/$elapsed . " messages per second";
+
+done_testing;
+
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libmessage-passing-zeromq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list