[SCM] libmessage-passing-perl Debian packaging branch, master, updated. debian/0.111-3-14-g44f6e88
Tomas Doran
bobtfish at bobtfish.net
Mon May 6 11:57:06 UTC 2013
The following commit has been merged in the master branch:
commit bad8fbdf5c390c22c7ac87f421687ada6539993c
Author: Tomas Doran <bobtfish at bobtfish.net>
Date: Fri Jun 1 10:01:21 2012 +0100
Get reconnect working
diff --git a/Changes b/Changes
index 3fd03b8..f3d6406 100644
--- a/Changes
+++ b/Changes
@@ -1,3 +1,6 @@
+ - Get connection timeouts and connection reconnects
+ working in the generic ConnectionManager role.
+
- Add link to syslog input
- AMQP input/output is on CPAN
diff --git a/README b/README
index 1b91938..fbdc21d 100644
--- a/README
+++ b/README
@@ -64,6 +64,9 @@ COMPONENTS
Message::Passing::Input::STDIN
Message::Passing::Input::ZeroMQ
+ Message::Passing::Input::STOMP
+ Message::Passing::Input::AMQP
+ Message::Passing::Input::Syslog
Message::Passing::Input::Test
You can easily write your own input, just use AnyEvent, and consume
@@ -90,8 +93,8 @@ COMPONENTS
Outputs send data to somewhere, i.e. they consume messages.
Message::Passing::Output::STDOUT
- Message::Passing::Output::AMQP - COMING SOON
- (<https://github.com/suretec/Message-Passing-AMQP>)
+ Message::Passing::Output::AMQP
+ Message::Passing::Output::STOMP
Message::Passing::Output::ZeroMQ
Message::Passing::Output::WebHooks
Message::Passing::Output::ElasticSearch - COMING SOON
diff --git a/lib/Message/Passing/Role/ConnectionManager.pm b/lib/Message/Passing/Role/ConnectionManager.pm
index 9ed8224..7a55e88 100644
--- a/lib/Message/Passing/Role/ConnectionManager.pm
+++ b/lib/Message/Passing/Role/ConnectionManager.pm
@@ -11,11 +11,17 @@ sub BUILD {
}
has timeout => (
- isa => 'Int',
+ isa => 'Num',
is => 'ro',
default => sub { 30 },
);
+has reconnect_after => (
+ isa => 'Num',
+ is => 'ro',
+ default => sub { 2 },
+);
+
has _timeout_timer => (
is => 'rw',
);
@@ -30,19 +36,48 @@ has connected => (
has connection => (
is => 'ro',
lazy => 1,
+ predicate => '_has_connection',
builder => '_build_connection',
- clearer => '_clear_connection'
+ clearer => '_clear_connection',
);
after _build_connection => sub {
my $self = shift;
weaken($self);
- $self->_timeout_timer(AnyEvent->timer(
+ $self->_timeout_timer($self->_build_timeout_timer);
+};
+
+sub _build_timeout_timer {
+ my $self = shift;
+ weaken($self);
+ AnyEvent->timer(
after => $self->timeout,
cb => sub {
- $self->_set_connected(0);
+ $self->_timeout_timer(undef);
+ warn "TIMEOUT";
+ $self->_set_connected(0); # Use public API, causing reconnect timer to be built
+ },
+ );
+}
+
+sub _build_reconnect_timer {
+ my $self = shift;
+ weaken($self);
+ warn "Build reconnect timer";
+ AnyEvent->timer(
+ after => $self->reconnect_after,
+ cb => sub {
+ warn "Am reconnecting";
+ $self->_timeout_timer(undef);
+ $self->connection; # Just rebuild the connection object
},
- ));
+ );
+}
+
+before _clear_connection => sub {
+ my $self = shift;
+ return unless $self->_has_connection;
+ $self->_timeout_timer($self->_build_reconnect_timer);
};
has _connect_subscribers => (
@@ -77,6 +112,7 @@ after _set_connected => sub {
foreach my $sub (@{$self->_connect_subscribers}) {
$sub->$method($self->connection) if $sub->can($method);
}
+ $self->_timeout_timer(undef) if $connected;
$self->_clear_connection unless $connected;
};
diff --git a/t/role_connectionmanager.t b/t/role_connectionmanager.t
index 699beda..80acd46 100644
--- a/t/role_connectionmanager.t
+++ b/t/role_connectionmanager.t
@@ -34,7 +34,11 @@ use AnyEvent;
with 'Message::Passing::Role::ConnectionManager';
has '+timeout' => (
- default => sub { 0 },
+ default => sub { 0.1 },
+ );
+
+ has '+reconnect_after' => (
+ default => sub { 0.1 },
);
sub _build_connection {
@@ -73,15 +77,37 @@ ok $sub2->{am_connected};
is_deeply $i->_connect_subscribers, [$sub2];
ok !$sub;
+# Test connectiomn timeout
$i = My::Connection::Wrapper->new;
my $cv = AnyEvent->condvar;
my $t; $t = AnyEvent->timer(
- after => 0.1,
+ after => 0.11,
cb => sub { $cv->send },
);
ok $i->{connection};
$cv->recv;
ok !$i->{connection};
+# Test reconnect
+$cv = AnyEvent->condvar;
+$t; $t = AnyEvent->timer(
+ after => 0.11,
+ cb => sub { $cv->send },
+);
+$cv->recv;
+$i->_set_connected(1);
+ok $i->{connection};
+my ($c, $d) = (0,0);
+My::Connection::Wrapper->meta->add_before_method_modifier('_build_timeout_timer', sub { $c++ });
+My::Connection::Wrapper->meta->add_before_method_modifier('_build_reconnect_timer', sub { $d++ });
+$cv = AnyEvent->condvar;
+my $t; $t = AnyEvent->timer(
+ after => 0.5,
+ cb => sub { $cv->send },
+);
+$cv->recv;
+is $c, 0;
+is $d, 0;
+
done_testing;
--
libmessage-passing-perl Debian packaging
More information about the Pkg-perl-cvs-commits
mailing list