[libanyevent-rabbitmq-perl] 23/151: Fixed a function for close connection.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:01 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit 7c3e1eba3b0dcda78ae9f8ca85c56b29e6236b68
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Mon Feb 22 16:23:18 2010 +0900

    Fixed a function for close connection.
---
 README                             |  2 +-
 lib/AnyEvent/RabbitMQ.pm           | 88 +++++++++++++++++++++++---------------
 lib/AnyEvent/RabbitMQ/Channel.pm   |  6 +++
 lib/RabbitFoot.pm                  |  2 +-
 lib/RabbitFoot/Cmd/Role/Command.pm | 31 ++++++++++++--
 xt/04_anyevent.t                   |  8 ++++
 6 files changed, 96 insertions(+), 41 deletions(-)

diff --git a/README b/README
index 09c2bb8..05f2035 100644
--- a/README
+++ b/README
@@ -10,7 +10,7 @@ You can use RabbitFoot to -
   * Publish, consume, get, ack and recover messages
   * Select, commit and rollback transactions
 
-RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+RabbitFoot is known to work with RabbitMQ versions 1.7.2 and version 0-8 of the AMQP specification.
 
 INSTALLATION
 
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 46ee654..96a08ff 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -75,7 +75,9 @@ sub connect {
     my $self = shift;
     my %args = $self->_set_cbs(@_);
 
-    $args{timeout} ||= 0;
+    $args{on_close}        ||= sub {};
+    $args{on_read_failure} ||= sub {die @_};
+    $args{timeout}         ||= 0;
 
     if ($self->verbose) {
         print STDERR 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
@@ -95,7 +97,7 @@ sub connect {
                     $args{on_failure}->($message);
                 }
             );
-            $self->_read_loop($args{on_failure});
+            $self->_read_loop($args{on_close}, $args{on_read_failure});
             $self->_start(%args,);
         },
         sub {
@@ -107,7 +109,7 @@ sub connect {
 }
 
 sub _read_loop {
-    my ($self, $failure_cb,) = @_;
+    my ($self, $close_cb, $failure_cb,) = @_;
 
     return if !$self->_handle;
 
@@ -115,8 +117,8 @@ sub _read_loop {
         my $data = $_[1];
         my $stack = $_[1];
 
-        if (length($data) <= 0) {
-            $failure_cb->('Disconnect');
+        if (length($data) <= 7) {
+            $failure_cb->('Broken data was received');
             @_ = ($self, $failure_cb,);
             goto &_read_loop;
         }
@@ -137,15 +139,25 @@ sub _read_loop {
                 print STDERR '-----------', "\n";
             }
 
-            return if !$self->_check_close_and_clean($frame, $failure_cb,);
-
             my $id = $frame->channel;
+            return if !$self->_check_close_and_clean(
+                $frame, $close_cb, $failure_cb, $id,
+            );
+
             if (0 == $id) {
+                return if !$self->_check_close_and_clean($frame, $close_cb, $id,);
                 $self->_queue->push($frame);
-            } elsif ($self->get_channel($id)) {
-                $self->get_channel($id)->_push_queue_or_consume($frame, $failure_cb);
             } else {
-                $failure_cb->('Unknown channel id: ' . $frame->channel);
+                my $channel = $self->get_channel($id);
+                if ($channel) {
+                    if (
+                        $self->_check_channel_close_and_clean($frame, $id, $channel)
+                    ) {
+                        $channel->_push_queue_or_consume($frame, $failure_cb);
+                    }
+                } else {
+                    $failure_cb->('Unknown channel id: ' . $frame->channel);
+                }
             }
 
             @_ = ($self, $failure_cb,);
@@ -157,39 +169,33 @@ sub _read_loop {
 }
 
 sub _check_close_and_clean {
-    my ($self, $frame, $failure_cb, $id,) = @_;
+    my ($self, $frame, $close_cb, $id,) = @_;
 
     return 1 if !$frame->isa('Net::AMQP::Frame::Method');
 
     my $method_frame = $frame->method_frame;
+    return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
 
-    if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
-        $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
-        $self->{_is_open} = 0;
-        $self->_disconnect();
-        $failure_cb->(
-            $method_frame->reply_code . ' ' . $method_frame->reply_text
-        );
-        return;
-    } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
-        $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
+    $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
+    $self->{_is_open} = 0;
+    $self->_disconnect();
+    $close_cb->($frame);
+    return;
+}
 
-        my $id = $frame->channel;
-        my $message = $method_frame->reply_code . ' ' . $method_frame->reply_text;
+sub _check_channel_close_and_clean {
+    my ($self, $frame, $id, $channel,) = @_;
 
-        my $channel = $self->get_channel($id);
-        if (!$channel) {
-            $failure_cb->("Unknown channel id: ${id}\n${message}");
-            return;
-        }
+    return 1 if !$frame->isa('Net::AMQP::Frame::Method');
 
-        $channel->{_is_open} = 0;
-        $self->delete_channel($id);
-        $failure_cb->($message);
-        return;
-    }
+    my $method_frame = $frame->method_frame;
+    return 1 if !$method_frame->isa('Net::AMQP::Protocol::Channel::Close');
 
-    return 1;
+    $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
+    $channel->{_is_open} = 0;
+    $channel->on_close->($frame);
+    $self->delete_channel($id);
+    return;
 }
 
 sub _start {
@@ -352,6 +358,8 @@ sub open_channel {
     my $self = shift;
     my %args = $self->_set_cbs(@_);
 
+    $args{on_close} ||= sub {};
+
     my $id = $args{id};
     return $args{on_failure}->("Channel id $id is already in use")
         if $id && $self->get_channel($id);
@@ -368,6 +376,7 @@ sub open_channel {
     my $channel = AnyEvent::RabbitMQ::Channel->new(
         id         => $id,
         connection => $self,
+        on_close   => $args{on_close},
     );
 
     $self->set_channel($id => $channel);
@@ -518,9 +527,18 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
                   );
               },
               on_failure => $cv,
+              on_close   => sub {
+                  my $method_frame = shift->method_frame;
+                  die $method_frame->reply_code, $method_frame->reply_text;
+              }
           );
       },
       on_failure => $cv,
+      on_read_failure => sub {die @_},
+      on_close   => sub {
+          my $method_frame = shift->method_frame;
+          die $method_frame->reply_code, $method_frame->reply_text;
+      },
   );
 
   print $cv->recv, "\n";
@@ -537,7 +555,7 @@ You can use AnyEvent::RabbitMQ to -
   * Publish, consume, get, ack and recover messages
   * Select, commit and rollback transactions
 
-AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 1.7.2 and version 0-8 of the AMQP specification.
 
 =head1 AUTHOR
 
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index fe34511..79c4188 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -19,6 +19,12 @@ has connection => (
     weak_ref => 1,
 );
 
+has on_close => (
+#   isa      => 'CodeRef',
+    is       => 'rw',
+    required => 1,
+);
+
 has _is_open => (
     isa     => 'Bool',
     is      => 'rw',
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index 35a160c..ee86a9c 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -101,7 +101,7 @@ You can use RabbitFoot to -
   * Publish, consume, get, ack and recover messages
   * Select, commit and rollback transactions
 
-RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+RabbitFoot is known to work with RabbitMQ versions 1.7.2 and version 0-8 of the AMQP specification.
 
 =head1 AUTHOR
 
diff --git a/lib/RabbitFoot/Cmd/Role/Command.pm b/lib/RabbitFoot/Cmd/Role/Command.pm
index c6ca24a..e6b1eb7 100644
--- a/lib/RabbitFoot/Cmd/Role/Command.pm
+++ b/lib/RabbitFoot/Cmd/Role/Command.pm
@@ -1,6 +1,7 @@
 package RabbitFoot::Cmd::Role::Command;
 
 use FindBin;
+use Coro;
 use RabbitFoot;
 
 use Moose::Role;
@@ -114,16 +115,38 @@ sub execute {
     my $self = shift;
     my ($opt, $args,) = @_;
 
-    my $ch = RabbitFoot->new(
+    my $rf = RabbitFoot->new(
         verbose => $self->verbose,
     )->load_xml_spec(
         $self->spec,
     )->connect(
-        timeout => 5,
-        (map {$_ => $self->$_} qw(host port user pass vhost))
-    )->open_channel();
+        (map {$_ => $self->$_} qw(host port user pass vhost)),
+        timeout  => 5,
+        on_close => unblock_sub {
+            $self->_close(shift);
+            exit; # FIXME
+        },
+    );
+
+    my $ch = $rf->open_channel(
+        on_close => unblock_sub {
+            $self->_close(shift);
+            $rf->close;
+            exit;
+        },
+    );
 
     $self->_run($ch, @_,);
+
+    $ch->close;
+    $rf->close;
+    return;
+}
+
+sub _close {
+    my $self = shift;
+    my $method_frame = shift->method_frame;
+    print $method_frame->reply_code, ' ', $method_frame->reply_text, "\n";
     return;
 }
 
diff --git a/xt/04_anyevent.t b/xt/04_anyevent.t
index c8c6370..5cb0749 100644
--- a/xt/04_anyevent.t
+++ b/xt/04_anyevent.t
@@ -41,6 +41,10 @@ $ar->connect(
         $done->send;
     },
     on_failure => failure_cb($done),
+    on_close   => sub {
+        my $method_frame = shift->method_frame;
+        die $method_frame->reply_code, $method_frame->reply_text;
+    },
 );
 $done->recv;
 
@@ -53,6 +57,10 @@ $ar->open_channel(
         $done->send;
     },
     on_failure => failure_cb($done),
+    on_close   => sub {
+        my $method_frame = shift->method_frame;
+        die $method_frame->reply_code, $method_frame->reply_text;
+    },
 );
 $done->recv;
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list