[SCM] libmessage-passing-perl Debian packaging branch, master, updated. debian/0.111-3-14-g44f6e88

Tomas Doran bobtfish at bobtfish.net
Mon May 6 11:57:06 UTC 2013


The following commit has been merged in the master branch:
commit bad8fbdf5c390c22c7ac87f421687ada6539993c
Author: Tomas Doran <bobtfish at bobtfish.net>
Date:   Fri Jun 1 10:01:21 2012 +0100

    Get reconnect working

diff --git a/Changes b/Changes
index 3fd03b8..f3d6406 100644
--- a/Changes
+++ b/Changes
@@ -1,3 +1,6 @@
+   - Get connection timeouts and connection reconnects
+     working in the generic ConnectionManager role.
+
    - Add link to syslog input
 
    - AMQP input/output is on CPAN
diff --git a/README b/README
index 1b91938..fbdc21d 100644
--- a/README
+++ b/README
@@ -64,6 +64,9 @@ COMPONENTS
 
     Message::Passing::Input::STDIN
     Message::Passing::Input::ZeroMQ
+    Message::Passing::Input::STOMP
+    Message::Passing::Input::AMQP
+    Message::Passing::Input::Syslog
     Message::Passing::Input::Test
 
     You can easily write your own input, just use AnyEvent, and consume
@@ -90,8 +93,8 @@ COMPONENTS
     Outputs send data to somewhere, i.e. they consume messages.
 
     Message::Passing::Output::STDOUT
-    Message::Passing::Output::AMQP - COMING SOON
-    (<https://github.com/suretec/Message-Passing-AMQP>)
+    Message::Passing::Output::AMQP
+    Message::Passing::Output::STOMP
     Message::Passing::Output::ZeroMQ
     Message::Passing::Output::WebHooks
     Message::Passing::Output::ElasticSearch - COMING SOON
diff --git a/lib/Message/Passing/Role/ConnectionManager.pm b/lib/Message/Passing/Role/ConnectionManager.pm
index 9ed8224..7a55e88 100644
--- a/lib/Message/Passing/Role/ConnectionManager.pm
+++ b/lib/Message/Passing/Role/ConnectionManager.pm
@@ -11,11 +11,17 @@ sub BUILD {
 }
 
 has timeout => (
-    isa => 'Int',
+    isa => 'Num',
     is => 'ro',
     default => sub { 30 },
 );
 
+has reconnect_after => (
+    isa => 'Num',
+    is => 'ro',
+    default => sub { 2 },
+);
+
 has _timeout_timer => (
     is => 'rw',
 );
@@ -30,19 +36,48 @@ has connected => (
 has connection => (
     is => 'ro',
     lazy => 1,
+    predicate => '_has_connection',
     builder => '_build_connection',
-    clearer => '_clear_connection'
+    clearer => '_clear_connection',
 );
 
 after _build_connection => sub {
     my $self = shift;
     weaken($self);
-    $self->_timeout_timer(AnyEvent->timer(
+    $self->_timeout_timer($self->_build_timeout_timer);
+};
+
+sub _build_timeout_timer {
+    my $self = shift;
+    weaken($self);
+    AnyEvent->timer(
         after => $self->timeout,
         cb => sub {
-            $self->_set_connected(0);
+            $self->_timeout_timer(undef);
+            warn "TIMEOUT";
+            $self->_set_connected(0); # Use public API, causing reconnect timer to be built
+        },
+    );
+}
+
+sub _build_reconnect_timer {
+    my $self = shift;
+    weaken($self);
+    warn "Build reconnect timer";
+    AnyEvent->timer(
+        after => $self->reconnect_after,
+        cb => sub {
+            warn "Am reconnecting";
+            $self->_timeout_timer(undef);
+            $self->connection; # Just rebuild the connection object
         },
-    ));
+    );
+}
+
+before _clear_connection => sub {
+    my $self = shift;
+    return unless $self->_has_connection;
+    $self->_timeout_timer($self->_build_reconnect_timer);
 };
 
 has _connect_subscribers => (
@@ -77,6 +112,7 @@ after _set_connected => sub {
     foreach my $sub (@{$self->_connect_subscribers}) {
         $sub->$method($self->connection) if $sub->can($method);
     }
+    $self->_timeout_timer(undef) if $connected;
     $self->_clear_connection unless $connected;
 };
 
diff --git a/t/role_connectionmanager.t b/t/role_connectionmanager.t
index 699beda..80acd46 100644
--- a/t/role_connectionmanager.t
+++ b/t/role_connectionmanager.t
@@ -34,7 +34,11 @@ use AnyEvent;
     with 'Message::Passing::Role::ConnectionManager';
 
     has '+timeout' => (
-        default => sub { 0 },
+        default => sub { 0.1 },
+    );
+
+    has '+reconnect_after' => (
+        default => sub { 0.1 },
     );
 
     sub _build_connection {
@@ -73,15 +77,37 @@ ok $sub2->{am_connected};
 is_deeply $i->_connect_subscribers, [$sub2];
 ok !$sub;
 
+# Test connectiomn timeout
 $i = My::Connection::Wrapper->new;
 my $cv = AnyEvent->condvar;
 my $t; $t = AnyEvent->timer(
-    after => 0.1,
+    after => 0.11,
     cb => sub { $cv->send },
 );
 ok $i->{connection};
 $cv->recv;
 ok !$i->{connection};
 
+# Test reconnect
+$cv = AnyEvent->condvar;
+$t; $t = AnyEvent->timer(
+    after => 0.11,
+    cb => sub { $cv->send },
+);
+$cv->recv;
+$i->_set_connected(1);
+ok $i->{connection};
+my ($c, $d) = (0,0);
+My::Connection::Wrapper->meta->add_before_method_modifier('_build_timeout_timer', sub { $c++ });
+My::Connection::Wrapper->meta->add_before_method_modifier('_build_reconnect_timer', sub { $d++ });
+$cv = AnyEvent->condvar;
+my $t; $t = AnyEvent->timer(
+    after => 0.5,
+    cb => sub { $cv->send },
+);
+$cv->recv;
+is $c, 0;
+is $d, 0;
+
 done_testing;
 

-- 
libmessage-passing-perl Debian packaging



More information about the Pkg-perl-cvs-commits mailing list