[libanyevent-rabbitmq-perl] 120/151: AMQP 0.9 support, ->confirm, more care that all on_close callbacks will be called correctly, and potential memory leak fixes
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:09 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit 7e7ee5e594e69225b41b6563295b18efaffecf3c
Author: Chip Salzenberg <chip at topsy.com>
Date: Wed Sep 12 15:14:44 2012 -0700
AMQP 0.9 support, ->confirm, more care that all on_close callbacks will be
called correctly, and potential memory leak fixes
---
lib/AnyEvent/RabbitMQ.pm | 59 ++++++----
lib/AnyEvent/RabbitMQ/Channel.pm | 229 ++++++++++++++++++++++++++++++---------
xt/04_anyevent.t | 10 +-
xt/05_multi_channel.t | 3 +-
4 files changed, 219 insertions(+), 82 deletions(-)
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 48b8dd5..99b7065 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -5,9 +5,9 @@ use warnings;
use Carp qw(confess croak);
use List::MoreUtils qw(none);
use Devel::GlobalDestruction;
-use namespace::clean;
use File::ShareDir;
use Readonly;
+use namespace::clean;
require Data::Dumper;
sub Dumper {
@@ -32,7 +32,7 @@ use AnyEvent::RabbitMQ::LocalQueue;
our $VERSION = '1.08';
Readonly my $DEFAULT_AMQP_SPEC
- => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml';
+ => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';
sub new {
my $class = shift;
@@ -296,7 +296,6 @@ sub _open {
'Connection::Open',
{
virtual_host => $args{vhost},
- capabilities => '',
insist => 1,
},
'Connection::OpenOk',
@@ -320,36 +319,40 @@ sub close {
return $self;
}
- my $close_cb = sub {
- $self->_close(
- sub {
- $self->_disconnect();
- $args{on_success}->(@_);
- },
- sub {
- $self->_disconnect();
- $args{on_failure}->(@_);
- }
- );
- return $self;
- };
-
- if (0 == scalar keys %{$self->{_channels}}) {
- return $close_cb->();
- }
+ my $channels_cv = AnyEvent->condvar;
+ $channels_cv->begin(
+ sub {
+ $self->_close(
+ sub {
+ $self->_disconnect();
+ $args{on_success}->(@_);
+ },
+ sub {
+ $self->_disconnect();
+ $args{on_failure}->(@_);
+ }
+ );
+ }
+ );
for my $id (keys %{$self->{_channels}}) {
my $channel = $self->{_channels}->{$id}
or next; # Could have already gone away on global destruction..
+
+ $channels_cv->begin;
$channel->close(
- on_success => $close_cb,
+ on_success => sub {
+ $channels_cv->end;
+ },
on_failure => sub {
- $close_cb->();
+ $channels_cv->end;
$args{on_failure}->(@_);
},
);
}
+ $channels_cv->end;
+
return $self;
}
@@ -357,7 +360,15 @@ sub _close {
my $self = shift;
my ($cb, $failure_cb,) = @_;
- return $self if !$self->{_is_open} || 0 < scalar keys %{$self->{_channels}};
+ if (!$self->{_is_open}) {
+ $cb->("Already closed");
+ return $self;
+ }
+
+ if (my @ch = keys %{$self->{_channels}}) {
+ $failure_cb->("Can't disconnect with channel(s) open: @ch");
+ return $self;
+ }
$self->_push_write_and_read(
'Connection::Close', {}, 'Connection::CloseOk',
@@ -592,7 +603,7 @@ You can use AnyEvent::RabbitMQ to -
* Publish, consume, get, ack, recover and reject messages
* Select, commit and rollback transactions
-AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and version 0-8 of the AMQP specification.
+AnyEvent::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and versions 0-8 and 0-9-1 of the AMQP specification.
=head1 AUTHOR
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 5cd2bb1..f3390e2 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -5,21 +5,35 @@ use warnings;
use Scalar::Util qw(weaken);
use AnyEvent::RabbitMQ::LocalQueue;
+BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
our $VERSION = '1.08';
sub new {
my $class = shift;
+
my $self = bless {
- @_, # id, connection, on_close
- _is_open => 0,
- _is_active => 0,
+ @_, # id, connection, on_return, on_close
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
- _consumer_cbs => {},
- _return_cbs => {},
}, $class;
weaken($self->{connection});
+ return $self->_reset;
+}
+
+sub _reset {
+ my $self = shift;
+
+ my %a = (
+ _is_open => 0,
+ _is_active => 0,
+ _is_confirm => 0,
+ _publish_tag => 0,
+ _publish_cbs => {},
+ _consumer_cbs => {},
+ );
+ @$self{keys %a} = values %a;
+
return $self;
}
@@ -65,45 +79,33 @@ sub close {
# open, but we've closed it - a more elegant fix would be to mark that
# the channel is opening, and wait for it to open before closing it
if (!$self->{_is_open}) {
- $self->{connection}->delete_channel($self->{id});
+ $connection->delete_channel($self->{id});
$args{on_success}->($self);
return $self;
}
- return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}};
-
- for my $consumer_tag (keys %{$self->{_consumer_cbs}}) {
- $self->cancel(
- consumer_tag => $consumer_tag,
- on_success => sub {
- $self->_close(%args);
- },
- on_failure => sub {
- $self->_close(%args);
- $args{on_failure}->(@_);
- }
- );
- }
+ # spell it out, so the callbacks always can call ->method_frame
+ my $close_frame = Net::AMQP::Frame::Method->new(
+ method_frame => Net::AMQP::Protocol::Channel::Close->new,
+ );
- return $self;
-}
+ $connection->_push_write(
+ $close_frame,
+ $self->{id},
+ );
-sub _close {
- my $self = shift;
- my %args = @_;
+ weaken(my $wself = $self);
- $self->{connection}->_push_write_and_read(
- 'Channel::Close', {}, 'Channel::CloseOk',
+ $connection->_push_read_and_valid(
+ 'Channel::CloseOk',
sub {
- $self->{_is_open} = 0;
- $self->{_is_active} = 0;
- $self->{connection}->delete_channel($self->{id});
+ my $me = $wself or return;
+ $me->_close($close_frame, 0);
$args{on_success}->();
},
sub {
- $self->{_is_open} = 0;
- $self->{_is_active} = 0;
- $self->{connection}->delete_channel($self->{id});
+ my $me = $wself or return;
+ $me->_close($close_frame, 0);
$args{on_failure}->();
},
$self->{id},
@@ -112,6 +114,29 @@ sub _close {
return $self;
}
+sub _close {
+ my $self = shift;
+ my ($frame, $forced) = @_;
+
+ my $connection = $self->{connection};
+ my $on_close = $self->{on_close};
+
+ $self->{_is_open} = 0;
+ $self->{_queue}->_flush($frame);
+ $self->{_content_queue}->_flush($frame);
+ $self->_reset;
+
+ $connection->delete_channel($self->{id}) if $connection;
+
+ if (defined $on_close) {
+ local $@;
+ $on_close->($frame);
+ warn "Error in callback, ignored:\n $@ " if $@;
+ }
+
+ return $self;
+}
+
sub declare_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
@@ -283,9 +308,13 @@ sub publish {
return $self if !$self->{_is_active};
- my $header_args = delete $args{header} || {};
- my $body = delete $args{body} || '';
- my $return_cb = delete $args{on_return} || sub {};
+ my $header_args = delete $args{header};
+ my $body = delete $args{body};
+ my $ack_cb = delete $args{on_ack};
+
+ defined($header_args) or $header_args = {};
+ defined($body) or $body = '';
+ defined($ack_cb) or $ack_cb = sub {};
$self->_publish(
%args,
@@ -295,12 +324,14 @@ sub publish {
$body,
);
- return $self if !$args{mandatory} && !$args{immediate};
-
- if ($args{mandatory} || $args{immediate}) {
- $self->{_return_cbs}->{
- ($args{exchange} || '') . '_' . $args{routing_key}
- } = $return_cb;
+ if ($self->{_is_confirm}) {
+ # yeah, they're sequential. see Java client
+ my $tag = ++$self->{_publish_tag};
+ $self->{_publish_cbs}{$tag} = $ack_cb;
+ }
+ else {
+ # we do not expect an ack, so assume success
+ $ack_cb->();
}
return $self;
@@ -504,9 +535,37 @@ sub qos {
return $self;
}
+sub confirm {
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+
+ return $self if !$self->_check_open($failure_cb);
+ return $self if !$self->_check_version(0, 9, $failure_cb);
+
+ weaken(my $wself = $self);
+
+ $self->{connection}->_push_write_and_read(
+ 'Confirm::Select',
+ {
+ %args,
+ nowait => 0, # FIXME
+ },
+ 'Confirm::SelectOk',
+ sub {
+ my $me = $wself or return;
+ $me->{_is_confirm} = 1;
+ $cb->();
+ },
+ $failure_cb,
+ $self->{id},
+ );
+
+ return $self;
+}
+
sub recover {
my $self = shift;
- my %args = @_;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open(sub {});
@@ -518,6 +577,18 @@ sub recover {
$self->{id},
);
+ if (!$args{nowait} && $self->_check_version(0, 9)) {
+ $self->{connection}->_push_read_and_valid(
+ 'Basic::RecoverOk',
+ $cb,
+ $failure_cb,
+ $self->{id},
+ );
+ }
+ else {
+ $cb->();
+ }
+
return $self;
}
@@ -598,12 +669,7 @@ sub push_queue_or_consume {
Net::AMQP::Protocol::Channel::CloseOk->new(),
$self->{id},
);
- $self->{_is_open} = 0;
- $self->{_is_active} = 0;
- $self->{_queue}->_flush($frame);
- $self->{_content_queue}->_flush($frame);
- $self->{connection}->delete_channel($self->{id});
- $self->{on_close}->($frame);
+ $self->_close($frame, 0);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
my $cb = $self->{_consumer_cbs}->{
@@ -612,11 +678,20 @@ sub push_queue_or_consume {
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
- my $cb = $self->{_return_cbs}->{
- $method_frame->exchange . '_' . $method_frame->routing_key
- } || sub {};
+ my $cb = $self->{on_return} || $failure_cb;
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack')) {
+ my $pub_cb;
+ if (!$self->{_is_confirm}) {
+ $failure_cb->("Received Ack when not in confirm mode");
+ }
+ elsif (not $pub_cb = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
+ $failure_cb->("Received Ack of unknown delivery tag $method_frame->{delivery_tag}");
+ }
+ else {
+ $pub_cb->();
+ }
} elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
$self->{_is_active} = $method_frame->active;
$self->{connection}->_push_write(
@@ -657,11 +732,14 @@ sub _push_read_header_and_body {
$body_size = $frame->body_size;
});
+ weaken(my $wcontq = $self->{_content_queue});
my $body_payload = "";
my $w_next_frame;
my $next_frame = sub {
my $frame = shift;
+ my $contq = $wcontq or return;
+
return $failure_cb->('Received data is not body frame')
if !$frame->isa('Net::AMQP::Frame::Body');
@@ -669,7 +747,7 @@ sub _push_read_header_and_body {
if (length($body_payload) < $body_size) {
# More to come
- $self->{_content_queue}->get($w_next_frame);
+ $contq->get($w_next_frame);
}
else {
$frame->payload($body_payload);
@@ -705,9 +783,22 @@ sub _check_open {
return 0;
}
+sub _check_version {
+ my $self = shift;
+ my ($major, $minor, $failure_cb) = @_;
+
+ my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
+ my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
+
+ return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
+
+ $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
+ return 0;
+}
+
sub DESTROY {
my $self = shift;
- $self->close() if defined $self;
+ $self->close() if $self->{_is_open};
return;
}
@@ -727,6 +818,23 @@ AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.
=head1 DESCRIPTION
+=head1 ARGUMENTS FOR C<open_channel>
+
+=over
+
+=item on_close
+
+Callback invoked when the channel closes. Callback will be passed the
+incoming message that caused the close, if any.
+
+=item on_return
+
+Callback invoked when a mandatory or immediate message publish fails.
+Callback will be passed the incoming message, with accessors
+C<method_frame>, C<header_frame>, and C<body_frame>.
+
+=back
+
=head1 METHODS
=head2 declare_exchange (%args)
@@ -815,6 +923,10 @@ Arguments:
The text body of the message to send.
+=item header
+
+Customer headers for the message (if any).
+
=item exchange
The name of the exchange to send the message to.
@@ -823,6 +935,10 @@ The name of the exchange to send the message to.
The routing key with which to publish the message.
+=item on_ack
+
+Callback (if any) for confirming acknowledgment when in confirm mode.
+
=back
=head2 consume
@@ -906,6 +1022,11 @@ This callback will be called if an error is signalled on this channel.
=head2 qos
+=head2 confirm
+
+Put channel into confirm mode. In confirm mode, publishes are confirmed by
+the server, so the on_ack callback of publish works.
+
=head2 recover
=head2 select_tx
diff --git a/xt/04_anyevent.t b/xt/04_anyevent.t
index ca3de56..205129a 100644
--- a/xt/04_anyevent.t
+++ b/xt/04_anyevent.t
@@ -1,5 +1,6 @@
use Test::More;
use Test::Exception;
+use Data::Dumper;
use FindBin;
use version;
@@ -15,6 +16,7 @@ my %conf = (
user => 'guest',
pass => 'guest',
vhost => '/',
+ verbose => 1,
);
eval {
@@ -36,7 +38,7 @@ plan tests => 31;
use AnyEvent::RabbitMQ;
-my $ar = AnyEvent::RabbitMQ->new();
+my $ar = AnyEvent::RabbitMQ->new(verbose => $conf{verbose});
lives_ok sub {
$ar->load_xml_spec()
@@ -56,7 +58,8 @@ $ar->connect(
on_failure => failure_cb($done),
on_close => sub {
my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
+ die $method_frame->reply_code, $method_frame->reply_text
+ if $method_frame->reply_code;
},
);
$done->recv;
@@ -72,7 +75,8 @@ $ar->open_channel(
on_failure => failure_cb($done),
on_close => sub {
my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
+ die $method_frame->reply_code, $method_frame->reply_text
+ if $method_frame->reply_code;
},
);
$done->recv;
diff --git a/xt/05_multi_channel.t b/xt/05_multi_channel.t
index bdcaaa6..774129b 100644
--- a/xt/05_multi_channel.t
+++ b/xt/05_multi_channel.t
@@ -163,6 +163,7 @@ sub publish {
sub handle_close {
my $method_frame = shift->method_frame;
- die $method_frame->reply_code, $method_frame->reply_text;
+ die $method_frame->reply_code, $method_frame->reply_text
+ if $method_frame->reply_code;
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list