[libanyevent-rabbitmq-perl] 120/151: AMQP 0.9 support, ->confirm, more care that all on_close callbacks will be called correctly, and potential memory leak fixes

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:09 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit 7e7ee5e594e69225b41b6563295b18efaffecf3c
Author: Chip Salzenberg <chip at topsy.com>
Date:   Wed Sep 12 15:14:44 2012 -0700

    AMQP 0.9 support, ->confirm, more care that all on_close callbacks will be
      called correctly, and potential memory leak fixes
---
 lib/AnyEvent/RabbitMQ.pm         |  59 ++++++----
 lib/AnyEvent/RabbitMQ/Channel.pm | 229 ++++++++++++++++++++++++++++++---------
 xt/04_anyevent.t                 |  10 +-
 xt/05_multi_channel.t            |   3 +-
 4 files changed, 219 insertions(+), 82 deletions(-)

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 48b8dd5..99b7065 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -5,9 +5,9 @@ use warnings;
 use Carp qw(confess croak);
 use List::MoreUtils qw(none);
 use Devel::GlobalDestruction;
-use namespace::clean;
 use File::ShareDir;
 use Readonly;
+use namespace::clean;
 
 require Data::Dumper;
 sub Dumper {
@@ -32,7 +32,7 @@ use AnyEvent::RabbitMQ::LocalQueue;
 our $VERSION = '1.08';
 
 Readonly my $DEFAULT_AMQP_SPEC
-    => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-8.xml';
+    => File::ShareDir::dist_dir("AnyEvent-RabbitMQ") . '/fixed_amqp0-9-1.xml';
 
 sub new {
     my $class = shift;
@@ -296,7 +296,6 @@ sub _open {
         'Connection::Open',
         {
             virtual_host => $args{vhost},
-            capabilities => '',
             insist       => 1,
         },
         'Connection::OpenOk', 
@@ -320,36 +319,40 @@ sub close {
         return $self;
     }
 
-    my $close_cb = sub {
-        $self->_close(
-            sub {
-                $self->_disconnect();
-                $args{on_success}->(@_);
-            },
-            sub {
-                $self->_disconnect();
-                $args{on_failure}->(@_);
-            }
-        );
-        return $self;
-    };
-
-    if (0 == scalar keys %{$self->{_channels}}) {
-        return $close_cb->();
-    }
+    my $channels_cv = AnyEvent->condvar;
+    $channels_cv->begin(
+        sub {
+            $self->_close(
+                sub {
+                    $self->_disconnect();
+                    $args{on_success}->(@_);
+                },
+                sub {
+                    $self->_disconnect();
+                    $args{on_failure}->(@_);
+                }
+            );
+        }
+    );
 
     for my $id (keys %{$self->{_channels}}) {
          my $channel = $self->{_channels}->{$id}
             or next; # Could have already gone away on global destruction..
+
+         $channels_cv->begin;
          $channel->close(
-            on_success => $close_cb,
+            on_success => sub {
+                $channels_cv->end;
+            },
             on_failure => sub {
-                $close_cb->();
+                $channels_cv->end;
                 $args{on_failure}->(@_);
             },
         );
     }
 
+    $channels_cv->end;
+
     return $self;
 }
 
@@ -357,7 +360,15 @@ sub _close {
     my $self = shift;
     my ($cb, $failure_cb,) = @_;
 
-    return $self if !$self->{_is_open} || 0 < scalar keys %{$self->{_channels}};
+    if (!$self->{_is_open}) {
+        $cb->("Already closed");
+        return $self;
+    }
+
+    if (my @ch = keys %{$self->{_channels}}) {
+        $failure_cb->("Can't disconnect with channel(s) open: @ch");
+        return $self;
+    }
 
     $self->_push_write_and_read(
         'Connection::Close', {}, 'Connection::CloseOk',
@@ -592,7 +603,7 @@ You can use AnyEvent::RabbitMQ to -
   * Publish, consume, get, ack, recover and reject messages
   * Select, commit and rollback transactions
 
-AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and version 0-8 of the AMQP specification.
+AnyEvent::RabbitMQ is known to work with RabbitMQ version 2.5.1 and with versions 0-8 and 0-9-1 of the AMQP specification.
 
 =head1 AUTHOR
 
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 5cd2bb1..f3390e2 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -5,21 +5,35 @@ use warnings;
 
 use Scalar::Util qw(weaken);
 use AnyEvent::RabbitMQ::LocalQueue;
+BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
 
 our $VERSION = '1.08';
 
 sub new {
     my $class = shift;
+
     my $self = bless {
-        @_, # id, connection, on_close
-        _is_open       => 0,
-        _is_active     => 0,
+        @_,    # id, connection, on_return, on_close
         _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
         _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
-        _consumer_cbs  => {},
-        _return_cbs    => {},
     }, $class;
     weaken($self->{connection});
+    return $self->_reset;
+}
+
+sub _reset {
+    my $self = shift;
+
+    my %a = (
+        _is_open       => 0,
+        _is_active     => 0,
+        _is_confirm    => 0,
+        _publish_tag   => 0,
+        _publish_cbs   => {},
+        _consumer_cbs  => {},
+    );
+    @$self{keys %a} = values %a;
+
     return $self;
 }
 
@@ -65,45 +79,33 @@ sub close {
     # open, but we've closed it - a more elegant fix would be to mark that
     # the channel is opening, and wait for it to open before closing it
     if (!$self->{_is_open}) {
-        $self->{connection}->delete_channel($self->{id});
+        $connection->delete_channel($self->{id});
         $args{on_success}->($self);
         return $self;
     }
 
-    return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}};
-
-    for my $consumer_tag (keys %{$self->{_consumer_cbs}}) {
-        $self->cancel(
-            consumer_tag => $consumer_tag,
-            on_success   => sub {
-                $self->_close(%args);
-            },
-            on_failure   => sub {
-                $self->_close(%args);
-                $args{on_failure}->(@_);
-            }
-        );
-    }
+    # spell it out, so the callbacks always can call ->method_frame
+    my $close_frame = Net::AMQP::Frame::Method->new(
+        method_frame => Net::AMQP::Protocol::Channel::Close->new,
+    );
 
-    return $self;
-}
+    $connection->_push_write(
+        $close_frame,
+        $self->{id},
+    );
 
-sub _close {
-    my $self = shift;
-    my %args = @_;
+    weaken(my $wself = $self);
 
-    $self->{connection}->_push_write_and_read(
-        'Channel::Close', {}, 'Channel::CloseOk',
+    $connection->_push_read_and_valid(
+        'Channel::CloseOk',
         sub {
-            $self->{_is_open} = 0;
-            $self->{_is_active} = 0;
-            $self->{connection}->delete_channel($self->{id});
+            my $me = $wself or return;
+            $me->_close($close_frame, 0);
             $args{on_success}->();
         },
         sub {
-            $self->{_is_open} = 0;
-            $self->{_is_active} = 0;
-            $self->{connection}->delete_channel($self->{id});
+            my $me = $wself or return;
+            $me->_close($close_frame, 0);
             $args{on_failure}->();
         },
         $self->{id},
@@ -112,6 +114,29 @@ sub _close {
     return $self;
 }
 
+sub _close {
+    my $self = shift;
+    my ($frame, $forced) = @_;
+
+    my $connection = $self->{connection};
+    my $on_close = $self->{on_close};
+
+    $self->{_is_open} = 0;
+    $self->{_queue}->_flush($frame);
+    $self->{_content_queue}->_flush($frame);
+    $self->_reset;
+
+    $connection->delete_channel($self->{id}) if $connection;
+
+    if (defined $on_close) {
+        local $@;
+        $on_close->($frame);
+        warn "Error in callback, ignored:\n  $@  " if $@;
+    }
+
+    return $self;
+}
+
 sub declare_exchange {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
@@ -283,9 +308,13 @@ sub publish {
 
     return $self if !$self->{_is_active};
 
-    my $header_args = delete $args{header}    || {};
-    my $body        = delete $args{body}      || '';
-    my $return_cb   = delete $args{on_return} || sub {};
+    my $header_args = delete $args{header};
+    my $body        = delete $args{body};
+    my $ack_cb      = delete $args{on_ack};
+
+    defined($header_args) or $header_args = {};
+    defined($body) or $body = '';
+    defined($ack_cb) or $ack_cb = sub {};
 
     $self->_publish(
         %args,
@@ -295,12 +324,14 @@ sub publish {
         $body,
     );
 
-    return $self if !$args{mandatory} && !$args{immediate};
-
-    if ($args{mandatory} || $args{immediate}) {
-        $self->{_return_cbs}->{
-            ($args{exchange} || '') . '_' . $args{routing_key}
-        } = $return_cb;
+    if ($self->{_is_confirm}) {
+        # yeah, they're sequential.  see Java client
+        my $tag = ++$self->{_publish_tag};
+        $self->{_publish_cbs}{$tag} = $ack_cb;
+    }
+    else {
+        # we do not expect an ack, so assume success
+        $ack_cb->();
     }
 
     return $self;
@@ -504,9 +535,37 @@ sub qos {
     return $self;
 }
 
+sub confirm {
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
+
+    return $self if !$self->_check_open($failure_cb);
+    return $self if !$self->_check_version(0, 9, $failure_cb);
+
+    weaken(my $wself = $self);
+
+    $self->{connection}->_push_write_and_read(
+	'Confirm::Select',
+	{
+	    %args,
+	    nowait       => 0, # FIXME
+	},
+	'Confirm::SelectOk',
+        sub {
+            my $me = $wself or return;
+            $me->{_is_confirm} = 1;
+            $cb->();
+        },
+	$failure_cb,
+	$self->{id},
+    );
+
+    return $self;
+}
+
 sub recover {
     my $self = shift;
-    my %args = @_;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     return $self if !$self->_check_open(sub {});
 
@@ -518,6 +577,18 @@ sub recover {
         $self->{id},
     );
 
+     if (!$args{nowait} && $self->_check_version(0, 9)) {
+        $self->{connection}->_push_read_and_valid(
+            'Basic::RecoverOk',
+            $cb,
+            $failure_cb,
+            $self->{id},
+        );
+    }
+    else {
+        $cb->();
+    }
+
     return $self;
 }
 
@@ -598,12 +669,7 @@ sub push_queue_or_consume {
                 Net::AMQP::Protocol::Channel::CloseOk->new(),
                 $self->{id},
             );
-            $self->{_is_open} = 0;
-            $self->{_is_active} = 0;
-            $self->{_queue}->_flush($frame);
-            $self->{_content_queue}->_flush($frame);
-            $self->{connection}->delete_channel($self->{id});
-            $self->{on_close}->($frame);
+            $self->_close($frame, 0);
             return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
             my $cb = $self->{_consumer_cbs}->{
@@ -612,11 +678,20 @@ sub push_queue_or_consume {
             $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
             return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
-            my $cb = $self->{_return_cbs}->{
-                $method_frame->exchange . '_' . $method_frame->routing_key
-            } || sub {};
+            my $cb = $self->{on_return} || $failure_cb;
             $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
             return $self;
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack')) {
+            my $pub_cb;
+            if (!$self->{_is_confirm}) {
+                $failure_cb->("Received Ack when not in confirm mode");
+            }
+            elsif (not $pub_cb = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
+                $failure_cb->("Received Ack of unknown delivery tag $method_frame->{delivery_tag}");
+            }
+            else {
+                $pub_cb->();
+            }
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
             $self->{_is_active} = $method_frame->active;
             $self->{connection}->_push_write(
@@ -657,11 +732,14 @@ sub _push_read_header_and_body {
         $body_size = $frame->body_size;
     });
 
+    weaken(my $wcontq = $self->{_content_queue});
     my $body_payload = "";
     my $w_next_frame;
     my $next_frame = sub {
         my $frame = shift;
 
+        my $contq = $wcontq or return;
+
         return $failure_cb->('Received data is not body frame')
             if !$frame->isa('Net::AMQP::Frame::Body');
 
@@ -669,7 +747,7 @@ sub _push_read_header_and_body {
 
         if (length($body_payload) < $body_size) {
             # More to come
-            $self->{_content_queue}->get($w_next_frame);
+            $contq->get($w_next_frame);
         }
         else {
             $frame->payload($body_payload);
@@ -705,9 +783,22 @@ sub _check_open {
     return 0;
 }
 
+sub _check_version {
+    my $self = shift;
+    my ($major, $minor, $failure_cb) = @_;
+
+    my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
+    my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
+
+    return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
+
+    $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
+    return 0;
+}
+
 sub DESTROY {
     my $self = shift;
-    $self->close() if defined $self;
+    $self->close() if $self->{_is_open};
     return;
 }
 
@@ -727,6 +818,23 @@ AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.
 
 =head1 DESCRIPTION
 
+=head1 ARGUMENTS FOR C<open_channel>
+
+=over
+
+=item on_close
+
+Callback invoked when the channel closes.  Callback will be passed the
+incoming message that caused the close, if any.
+
+=item on_return
+
+Callback invoked when a mandatory or immediate message publish fails.
+Callback will be passed the incoming message, with accessors
+C<method_frame>, C<header_frame>, and C<body_frame>.
+
+=back
+
 =head1 METHODS
 
 =head2 declare_exchange (%args)
@@ -815,6 +923,10 @@ Arguments:
 
 The text body of the message to send.
 
+=item header
+
+Custom headers for the message (if any).
+
 =item exchange
 
 The name of the exchange to send the message to.
@@ -823,6 +935,10 @@ The name of the exchange to send the message to.
 
 The routing key with which to publish the message.
 
+=item on_ack
+
+Callback (if any) for confirming acknowledgment when in confirm mode.
+
 =back
 
 =head2 consume
@@ -906,6 +1022,11 @@ This callback will be called if an error is signalled on this channel.
 
 =head2 qos
 
+=head2 confirm
+
+Put channel into confirm mode.  In confirm mode, publishes are confirmed by
+the server, so the on_ack callback of publish works.
+
 =head2 recover
 
 =head2 select_tx
diff --git a/xt/04_anyevent.t b/xt/04_anyevent.t
index ca3de56..205129a 100644
--- a/xt/04_anyevent.t
+++ b/xt/04_anyevent.t
@@ -1,5 +1,6 @@
 use Test::More;
 use Test::Exception;
+use Data::Dumper;
 
 use FindBin;
 use version;
@@ -15,6 +16,7 @@ my %conf = (
     user  => 'guest',
     pass  => 'guest',
     vhost => '/',
+    verbose => 1,
 );
 
 eval {
@@ -36,7 +38,7 @@ plan tests => 31;
 
 use AnyEvent::RabbitMQ;
 
-my $ar = AnyEvent::RabbitMQ->new();
+my $ar = AnyEvent::RabbitMQ->new(verbose => $conf{verbose});
 
 lives_ok sub {
     $ar->load_xml_spec()
@@ -56,7 +58,8 @@ $ar->connect(
     on_failure => failure_cb($done),
     on_close   => sub {
         my $method_frame = shift->method_frame;
-        die $method_frame->reply_code, $method_frame->reply_text;
+        die $method_frame->reply_code, $method_frame->reply_text
+          if $method_frame->reply_code;
     },
 );
 $done->recv;
@@ -72,7 +75,8 @@ $ar->open_channel(
     on_failure => failure_cb($done),
     on_close   => sub {
         my $method_frame = shift->method_frame;
-        die $method_frame->reply_code, $method_frame->reply_text;
+        die $method_frame->reply_code, $method_frame->reply_text
+          if $method_frame->reply_code;
     },
 );
 $done->recv;
diff --git a/xt/05_multi_channel.t b/xt/05_multi_channel.t
index bdcaaa6..774129b 100644
--- a/xt/05_multi_channel.t
+++ b/xt/05_multi_channel.t
@@ -163,6 +163,7 @@ sub publish {
 
 sub handle_close {
     my $method_frame = shift->method_frame;
-    die $method_frame->reply_code, $method_frame->reply_text;
+    die $method_frame->reply_code, $method_frame->reply_text
+      if $method_frame->reply_code;
 }
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list