[libanyevent-rabbitmq-perl] 36/151: Supported Channel.Flow method.
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:02 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit e3205d1a86a717d262e9d4fb68d84f9034cca121
Author: cooldaemon <cooldaemon at gmail.com>
Date: Sun Mar 7 15:30:38 2010 +0900
Supported Channel.Flow method.
---
Changes | 4 ++
lib/AnyEvent/RabbitMQ.pm | 38 ++++++++++++++----
lib/AnyEvent/RabbitMQ/Channel.pm | 83 ++++++++++++++++++++++++++++++++++++----
xt/01_podspell.t | 1 +
4 files changed, 111 insertions(+), 15 deletions(-)
diff --git a/Changes b/Changes
index 74ce10b..53bef29 100644
--- a/Changes
+++ b/Changes
@@ -1,5 +1,9 @@
Revision history for Perl extension RabbitFoot
+1.01 Sun Mar 7 15:28:46 2010
+ - fix bugs.
+ - support channel.flow.
+
1.00 Fri Mar 5 11:30:00 2010
- fix module name.
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index ede6a25..53c205b 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -15,7 +15,7 @@ use Net::AMQP::Common qw(:all);
use AnyEvent::RabbitMQ::Channel;
use AnyEvent::RabbitMQ::LocalQueue;
-our $VERSION = '1.00';
+our $VERSION = '1.01';
sub new {
my $class = shift;
@@ -49,6 +49,11 @@ sub connect {
my $self = shift;
my %args = $self->_set_cbs(@_);
+ if ($self->{_is_open}) {
+ $args{on_failure}->('Connection has already been opened');
+ return $self;
+ }
+
$args{on_close} ||= sub {};
$args{on_read_failure} ||= sub {die @_};
$args{timeout} ||= 0;
@@ -183,7 +188,7 @@ sub _start {
platform => 'Perl',
product => __PACKAGE__,
information => 'http://d.hatena.ne.jp/cooldaemon/',
- version => '0.01',
+ version => '1.01',
},
mechanism => 'AMQPLAIN',
response => {
@@ -253,6 +258,8 @@ sub close {
my $self = shift;
my %args = $self->_set_cbs(@_);
+ return $self if !$self->{_is_open};
+
my $close_cb = sub {
$self->_close(
sub {
@@ -312,19 +319,26 @@ sub open_channel {
my $self = shift;
my %args = $self->_set_cbs(@_);
+ return $self if !$self->_check_open($args{on_failure});
+
$args{on_close} ||= sub {};
my $id = $args{id};
- return $args{on_failure}->("Channel id $id is already in use")
- if $id && $self->{_channels}->{$id};
+ if ($id && $self->{_channels}->{$id}) {
+ $args{on_failure}->("Channel id $id is already in use");
+ return $self;
+ }
if (!$id) {
- for my $candidate_id (1 .. (2**16 - 1)) { # FIXME
+ for my $candidate_id (1 .. (2**16 - 1)) {
next if defined $self->{_channels}->{$candidate_id};
$id = $candidate_id;
last;
}
- return $args{on_failure}->('Ran out of channel ids') if !$id;
+ if (!$id) {
+ $args{on_failure}->('Ran out of channel ids');
+ return $self;
+ }
}
my $channel = AnyEvent::RabbitMQ::Channel->new(
@@ -423,6 +437,16 @@ sub _set_cbs {
return %args;
}
+sub _check_open {
+ my $self = shift;
+ my ($failure_cb) = @_;
+
+ return 1 if $self->{_is_open};
+
+ $failure_cb->('Connection has already been closed');
+ return 0;
+}
+
sub DESTROY {
my $self = shift;
$self->close();
@@ -482,7 +506,7 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
=head1 DESCRIPTION
-AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a Asynchronous fashion.
+AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
You can use AnyEvent::RabbitMQ to -
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index fbcb21d..a37cbc7 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -6,13 +6,14 @@ use warnings;
use Scalar::Util qw(weaken);
use AnyEvent::RabbitMQ::LocalQueue;
-our $VERSION = '1.00';
+our $VERSION = '1.01';
sub new {
my $class = shift;
my $self = bless {
@_, # id, connection, on_close
_is_open => 0,
+ _is_active => 0,
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_consumer_cbs => {},
@@ -31,13 +32,19 @@ sub open {
my $self = shift;
my %args = @_;
+ if ($self->{_is_open}) {
+ $args{on_failure}->('Channel has already been opened');
+ return $self;
+ }
+
$self->{connection}->_push_write_and_read(
'Channel::Open', {}, 'Channel::OpenOk',
sub {
- $self->{_is_open} = 1;
+ $self->{_is_open} = 1;
+ $self->{_is_active} = 1;
$args{on_success}->();
},
- $args{on_failur},
+ $args{on_failure},
$self->{id},
);
@@ -76,13 +83,15 @@ sub _close {
'Channel::Close', {}, 'Channel::CloseOk',
sub {
$self->{_is_open} = 0;
+ $self->{_is_active} = 0;
$self->{connection}->delete_channel($self->{id});
$args{on_success}->();
},
sub {
$self->{_is_open} = 0;
+ $self->{_is_active} = 0;
$self->{connection}->delete_channel($self->{id});
- $args{on_failur}->();
+ $args{on_failure}->();
},
$self->{id},
);
@@ -94,6 +103,8 @@ sub declare_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Exchange::Declare',
{
@@ -119,6 +130,8 @@ sub delete_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Exchange::Delete',
{
@@ -140,6 +153,8 @@ sub declare_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Queue::Declare',
{
@@ -164,6 +179,8 @@ sub bind_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Queue::Bind',
{
@@ -184,6 +201,8 @@ sub unbind_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Queue::Unbind',
{
@@ -203,6 +222,8 @@ sub purge_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Queue::Purge',
{
@@ -223,6 +244,8 @@ sub delete_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Queue::Delete',
{
@@ -245,6 +268,8 @@ sub publish {
my $self = shift;
my %args = @_;
+ return $self if !$self->{_is_active};
+
my $header_args = delete $args{header} || {};
my $body = delete $args{body} || '';
my $return_cb = delete $args{on_return} || sub {};
@@ -330,6 +355,8 @@ sub consume {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
my $consumer_cb = delete $args{on_consume} || sub {};
$self->{connection}->_push_write_and_read(
@@ -362,11 +389,17 @@ sub cancel {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- return $failure_cb->('consumer_tag is not set')
- if !defined $args{consumer_tag};
+ return $self if !$self->_check_open($failure_cb);
- return $failure_cb->('Unknown consumer_tag')
- if !$self->{_consumer_cbs}->{$args{consumer_tag}};
+ if (!defined $args{consumer_tag}) {
+ $failure_cb->('consumer_tag is not set');
+ return $self;
+ }
+
+ if (!$self->{_consumer_cbs}->{$args{consumer_tag}}) {
+ $failure_cb->('Unknown consumer_tag');
+ return $self;
+ }
$self->{connection}->_push_write_and_read(
'Basic::Cancel',
@@ -391,6 +424,8 @@ sub get {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Basic::Get',
{
@@ -416,6 +451,8 @@ sub ack {
my $self = shift;
my %args = @_;
+ return $self if !$self->_check_open(sub {});
+
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Ack->new(
delivery_tag => 0,
@@ -434,6 +471,8 @@ sub qos {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Basic::Qos',
{
@@ -455,6 +494,8 @@ sub recover {
my $self = shift;
my %args = @_;
+ return $self if !$self->_check_open(sub {});
+
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Recover->new(
requeue => 0,
@@ -470,6 +511,8 @@ sub select_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Tx::Select', {}, 'Tx::SelectOk',
$cb,
@@ -484,6 +527,8 @@ sub commit_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Tx::Commit', {}, 'Tx::CommitOk',
$cb,
@@ -498,6 +543,8 @@ sub rollback_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
+ return $self if !$self->_check_open($failure_cb);
+
$self->{connection}->_push_write_and_read(
'Tx::Rollback', {}, 'Tx::RollbackOk',
$cb,
@@ -520,6 +567,7 @@ sub push_queue_or_consume {
$self->{id},
);
$self->{_is_open} = 0;
+ $self->{_is_active} = 0;
$self->{connection}->delete_channel($self->{id});
$self->{on_close}->($frame);
return $self;
@@ -535,6 +583,15 @@ sub push_queue_or_consume {
} || sub {};
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
+ $self->{_is_active} = $method_frame->active;
+ $self->{connection}->_push_write(
+ Net::AMQP::Protocol::Channel::FlowOk->new(
+ active => $method_frame->active,
+ ),
+ $self->{id},
+ );
+ return $self;
}
$self->{_queue}->push($frame);
} else {
@@ -587,6 +644,16 @@ sub _delete_cbs {
return $cb, $failure_cb, %args;
}
+sub _check_open {
+ my $self = shift;
+ my ($failure_cb) = @_;
+
+ return 1 if $self->{_is_open};
+
+ $failure_cb->('Channel has already been closed');
+ return 0;
+}
+
sub DESTROY {
my $self = shift;
$self->close();
diff --git a/xt/01_podspell.t b/xt/01_podspell.t
index babf41f..d4104da 100644
--- a/xt/01_podspell.t
+++ b/xt/01_podspell.t
@@ -11,3 +11,4 @@ cooldaemon at gmail.com
RabbitFoot
AMQP
RabbitMQ
+multi
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list