[libanyevent-rabbitmq-perl] 36/151: Supported Channel.Flow method.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:02 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit e3205d1a86a717d262e9d4fb68d84f9034cca121
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Sun Mar 7 15:30:38 2010 +0900

    Supported Channel.Flow method.
---
 Changes                          |  4 ++
 lib/AnyEvent/RabbitMQ.pm         | 38 ++++++++++++++----
 lib/AnyEvent/RabbitMQ/Channel.pm | 83 ++++++++++++++++++++++++++++++++++++----
 xt/01_podspell.t                 |  1 +
 4 files changed, 111 insertions(+), 15 deletions(-)

diff --git a/Changes b/Changes
index 74ce10b..53bef29 100644
--- a/Changes
+++ b/Changes
@@ -1,5 +1,9 @@
 Revision history for Perl extension RabbitFoot
 
+1.01    Sun Mar  7 15:28:46 2010
+        - fix bugs.
+        - support channel.flow.
+
 1.00    Fri Mar  5 11:30:00 2010
         - fix module name.
 
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index ede6a25..53c205b 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -15,7 +15,7 @@ use Net::AMQP::Common qw(:all);
 use AnyEvent::RabbitMQ::Channel;
 use AnyEvent::RabbitMQ::LocalQueue;
 
-our $VERSION = '1.00';
+our $VERSION = '1.01';
 
 sub new {
     my $class = shift;
@@ -49,6 +49,11 @@ sub connect {
     my $self = shift;
     my %args = $self->_set_cbs(@_);
 
+    if ($self->{_is_open}) {
+        $args{on_failure}->('Connection has already been opened');
+        return $self;
+    }
+
     $args{on_close}        ||= sub {};
     $args{on_read_failure} ||= sub {die @_};
     $args{timeout}         ||= 0;
@@ -183,7 +188,7 @@ sub _start {
                         platform    => 'Perl',
                         product     => __PACKAGE__,
                         information => 'http://d.hatena.ne.jp/cooldaemon/',
-                        version     => '0.01',
+                        version     => '1.01',
                     },
                     mechanism => 'AMQPLAIN',
                     response => {
@@ -253,6 +258,8 @@ sub close {
     my $self = shift;
     my %args = $self->_set_cbs(@_);
 
+    return $self if !$self->{_is_open};
+
     my $close_cb = sub {
         $self->_close(
             sub {
@@ -312,19 +319,26 @@ sub open_channel {
     my $self = shift;
     my %args = $self->_set_cbs(@_);
 
+    return $self if !$self->_check_open($args{on_failure});
+
     $args{on_close} ||= sub {};
 
     my $id = $args{id};
-    return $args{on_failure}->("Channel id $id is already in use")
-        if $id && $self->{_channels}->{$id};
+    if ($id && $self->{_channels}->{$id}) {
+        $args{on_failure}->("Channel id $id is already in use");
+        return $self;
+    }
 
     if (!$id) {
-        for my $candidate_id (1 .. (2**16 - 1)) { # FIXME
+        for my $candidate_id (1 .. (2**16 - 1)) {
             next if defined $self->{_channels}->{$candidate_id};
             $id = $candidate_id;
             last;
         }
-        return $args{on_failure}->('Ran out of channel ids') if !$id;
+        if (!$id) {
+            $args{on_failure}->('Ran out of channel ids');
+            return $self;
+        }
     }
 
     my $channel = AnyEvent::RabbitMQ::Channel->new(
@@ -423,6 +437,16 @@ sub _set_cbs {
     return %args;
 }
 
+sub _check_open {
+    my $self = shift;
+    my ($failure_cb) = @_;
+
+    return 1 if $self->{_is_open};
+
+    $failure_cb->('Connection has already been closed');
+    return 0;
+}
+
 sub DESTROY {
     my $self = shift;
     $self->close();
@@ -482,7 +506,7 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
 
 =head1 DESCRIPTION
 
-AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a Asynchronous fashion.
+AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
 
 You can use AnyEvent::RabbitMQ to -
 
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index fbcb21d..a37cbc7 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -6,13 +6,14 @@ use warnings;
 use Scalar::Util qw(weaken);
 use AnyEvent::RabbitMQ::LocalQueue;
 
-our $VERSION = '1.00';
+our $VERSION = '1.01';
 
 sub new {
     my $class = shift;
     my $self = bless {
         @_, # id, connection, on_close
         _is_open       => 0,
+        _is_active     => 0,
         _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
         _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
         _consumer_cbs  => {},
@@ -31,13 +32,19 @@ sub open {
     my $self = shift;
     my %args = @_;
 
+    if ($self->{_is_open}) {
+        $args{on_failure}->('Channel has already been opened');
+        return $self;
+    }
+
     $self->{connection}->_push_write_and_read(
         'Channel::Open', {}, 'Channel::OpenOk',
         sub {
-            $self->{_is_open} = 1;
+            $self->{_is_open}   = 1;
+            $self->{_is_active} = 1;
             $args{on_success}->();
         },
-        $args{on_failur},
+        $args{on_failure},
         $self->{id},
     );
 
@@ -76,13 +83,15 @@ sub _close {
         'Channel::Close', {}, 'Channel::CloseOk',
         sub {
             $self->{_is_open} = 0;
+            $self->{_is_active} = 0;
             $self->{connection}->delete_channel($self->{id});
             $args{on_success}->();
         },
         sub {
             $self->{_is_open} = 0;
+            $self->{_is_active} = 0;
             $self->{connection}->delete_channel($self->{id});
-            $args{on_failur}->();
+            $args{on_failure}->();
         },
         $self->{id},
     );
@@ -94,6 +103,8 @@ sub declare_exchange {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Exchange::Declare',
         {
@@ -119,6 +130,8 @@ sub delete_exchange {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Exchange::Delete',
         {
@@ -140,6 +153,8 @@ sub declare_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Queue::Declare',
         {
@@ -164,6 +179,8 @@ sub bind_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Queue::Bind',
         {
@@ -184,6 +201,8 @@ sub unbind_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Queue::Unbind',
         {
@@ -203,6 +222,8 @@ sub purge_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Queue::Purge',
         {
@@ -223,6 +244,8 @@ sub delete_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Queue::Delete',
         {
@@ -245,6 +268,8 @@ sub publish {
     my $self = shift;
     my %args = @_;
 
+    return $self if !$self->{_is_active};
+
     my $header_args = delete $args{header}    || {};
     my $body        = delete $args{body}      || '';
     my $return_cb   = delete $args{on_return} || sub {};
@@ -330,6 +355,8 @@ sub consume {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     my $consumer_cb = delete $args{on_consume} || sub {};
     
     $self->{connection}->_push_write_and_read(
@@ -362,11 +389,17 @@ sub cancel {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    return $failure_cb->('consumer_tag is not set')
-        if !defined $args{consumer_tag};
+    return $self if !$self->_check_open($failure_cb);
 
-    return $failure_cb->('Unknown consumer_tag')
-        if !$self->{_consumer_cbs}->{$args{consumer_tag}};
+    if (!defined $args{consumer_tag}) {
+        $failure_cb->('consumer_tag is not set');
+        return $self;
+    }
+
+    if (!$self->{_consumer_cbs}->{$args{consumer_tag}}) {
+        $failure_cb->('Unknown consumer_tag');
+        return $self;
+    }
 
     $self->{connection}->_push_write_and_read(
         'Basic::Cancel',
@@ -391,6 +424,8 @@ sub get {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Basic::Get',
         {
@@ -416,6 +451,8 @@ sub ack {
     my $self = shift;
     my %args = @_;
 
+    return $self if !$self->_check_open(sub {});
+
     $self->{connection}->_push_write(
         Net::AMQP::Protocol::Basic::Ack->new(
             delivery_tag => 0,
@@ -434,6 +471,8 @@ sub qos {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Basic::Qos',
         {
@@ -455,6 +494,8 @@ sub recover {
     my $self = shift;
     my %args = @_;
 
+    return $self if !$self->_check_open(sub {});
+
     $self->{connection}->_push_write(
         Net::AMQP::Protocol::Basic::Recover->new(
             requeue => 0,
@@ -470,6 +511,8 @@ sub select_tx {
     my $self = shift;
     my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Tx::Select', {}, 'Tx::SelectOk',
         $cb,
@@ -484,6 +527,8 @@ sub commit_tx {
     my $self = shift;
     my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Tx::Commit', {}, 'Tx::CommitOk',
         $cb,
@@ -498,6 +543,8 @@ sub rollback_tx {
     my $self = shift;
     my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
+    return $self if !$self->_check_open($failure_cb);
+
     $self->{connection}->_push_write_and_read(
         'Tx::Rollback', {}, 'Tx::RollbackOk',
         $cb,
@@ -520,6 +567,7 @@ sub push_queue_or_consume {
                 $self->{id},
             );
             $self->{_is_open} = 0;
+            $self->{_is_active} = 0;
             $self->{connection}->delete_channel($self->{id});
             $self->{on_close}->($frame);
             return $self;
@@ -535,6 +583,15 @@ sub push_queue_or_consume {
             } || sub {};
             $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
             return $self;
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
+            $self->{_is_active} = $method_frame->active;
+            $self->{connection}->_push_write(
+                Net::AMQP::Protocol::Channel::FlowOk->new(
+                    active => $method_frame->active,
+                ),
+                $self->{id},
+            );
+            return $self;
         }
         $self->{_queue}->push($frame);
     } else {
@@ -587,6 +644,16 @@ sub _delete_cbs {
     return $cb, $failure_cb, %args;
 }
 
+sub _check_open {
+    my $self = shift;
+    my ($failure_cb) = @_;
+
+    return 1 if $self->{_is_open};
+
+    $failure_cb->('Channel has already been closed');
+    return 0;
+}
+
 sub DESTROY {
     my $self = shift;
     $self->close();
diff --git a/xt/01_podspell.t b/xt/01_podspell.t
index babf41f..d4104da 100644
--- a/xt/01_podspell.t
+++ b/xt/01_podspell.t
@@ -11,3 +11,4 @@ cooldaemon at gmail.com
 RabbitFoot
 AMQP
 RabbitMQ
+multi

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list