[SCM] libanyevent-rabbitmq-perl Debian packaging branch, master, updated. debian/1.12-1-6-g25ebabd

Chip Salzenberg chip at topsy.com
Mon May 6 14:52:56 UTC 2013


The following commit has been merged in the master branch:
commit ff0a88970b42f86951009b77832c776ea71d0bfd
Author: Chip Salzenberg <chip at topsy.com>
Date:   Thu Apr 25 02:32:32 2013 -0700

    Require Net::AMQP 0.06 to:
      get consume cancel notifications (e.g. queue deletion);
      properly encode user-provided header strings that look like numbers.
    Fix race between server-sent and client-sent cancellation.
    Expect server to send heartbeats as promised.  If it doesn't, go President
      Madagasgar on its ass and SHUT DOWN EVERYTHING.
    Rearrangeme many things and weaken many references to eliminate bad circular
      references.  Some circular refs are actually good, though; leave those.
    Allow customized client_properties on connection.
    Make test output clearer.

diff --git a/Changes b/Changes
index 8fd6184..ac19941 100644
--- a/Changes
+++ b/Changes
@@ -1,17 +1,33 @@
 Revision history for Perl extension AnyEvent::RabbitMQ
 
+<tbd>
+        - Require Net::AMQP 0.06 to:
+           + Get consume cancel notifications (e.g. queue deletion)
+           + Properly encode user-provided header strings that look like numbers
+        - Fix race between server-sent and client-sent cancellation.
+
+        - Expect server to send heartbeats as promised.  If it doesn't, go President
+            Madagasgar on its ass and SHUT DOWN EVERYTHING.
+
+        - Rearrange many things and weaken many references to eliminate bad circular
+            references.  Some circular refs are actually good, though; leave those.
+
+        - Allow customized client_properties on connection.
+
+        - Make test output clearer.
+
 1.12  Thu Apr 11 20:45:00 2013
-  - Allow AMQP client to adjust tuning, e.g. heartbeat
-    (Chip Salzenberg)
+        - Allow AMQP client to adjust tuning, e.g. heartbeat
+          (Chip Salzenberg)
 
-  - Fix RT#84222, continue reading AMQP packets after a heartbeat.
+        - Fix RT#84222, continue reading AMQP packets after a heartbeat.
 
-  - Spontaneously emit hearts as per amqp 0.9.1 spec.
+        - Spontaneously emit hearts as per amqp 0.9.1 spec.
 
-    The AMQP spec says, "The client should start sending heartbeats after
-    receiving a Connection.Tune method, and start monitoring heartbeats after
-    receiving Connection.Open."  There is no mention of merely responding to
-    heartbeat packets emitted by the server. (Dave Lambley)
+          The AMQP spec says, "The client should start sending heartbeats after
+          receiving a Connection.Tune method, and start monitoring heartbeats after
+          receiving Connection.Open."  There is no mention of merely responding to
+          heartbeat packets emitted by the server. (Dave Lambley)
 
 1.11  Tue Mar  5 22:22:00 2013
         - Fix on_success callback for the Channel->close method (davel).
diff --git a/Makefile.PL b/Makefile.PL
index f731120..c60be3f 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -4,7 +4,7 @@ name 'AnyEvent-RabbitMQ';
 all_from 'lib/AnyEvent/RabbitMQ.pm';
 
 requires 'List::MoreUtils';
-requires 'Net::AMQP';
+requires 'Net::AMQP' => '0.06';
 requires 'AnyEvent';
 requires 'Devel::GlobalDestruction';
 requires 'namespace::clean';
@@ -15,6 +15,7 @@ tests 't/*.t';
 author_tests 'xt';
 install_share;
 
+perl_version '5.006';
 build_requires 'Test::More';
 build_requires 'Test::Exception';
 build_requires 'version';
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 6d62b8a..83609dd 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -3,12 +3,12 @@ package AnyEvent::RabbitMQ;
 use strict;
 use warnings;
 use Carp qw(confess croak);
+use Scalar::Util qw(refaddr);
 use List::MoreUtils qw(none);
 use Devel::GlobalDestruction;
 use File::ShareDir;
 use Readonly;
 use Scalar::Util qw/ weaken /;
-use namespace::clean;
 
 require Data::Dumper;
 sub Dumper {
@@ -24,12 +24,14 @@ sub Dumper {
 use AnyEvent::Handle;
 use AnyEvent::Socket;
 
-use Net::AMQP;
+use Net::AMQP 0.06;
 use Net::AMQP::Common qw(:all);
 
 use AnyEvent::RabbitMQ::Channel;
 use AnyEvent::RabbitMQ::LocalQueue;
 
+use namespace::clean;
+
 our $VERSION = '1.12';
 
 Readonly my $DEFAULT_AMQP_SPEC
@@ -63,10 +65,15 @@ sub channels {
     return $self->{_channels};
 }
 
-sub delete_channel {
+sub _delete_channel {
     my $self = shift;
-    my ($id) = @_;
-    return defined delete $self->{_channels}->{$id};
+    my ($channel,) = @_;
+    my $c = $self->{_channels}->{$channel->id};
+    if (defined($c) && refaddr($c) == refaddr($channel)) {
+        delete $self->{_channels}->{$channel->id};
+        return 1;
+    }
+    return 0;
 }
 
 sub login_user {
@@ -109,37 +116,40 @@ sub connect {
         warn 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
     }
 
+    weaken(my $weak_self = $self);
     $self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
         $args{host},
         $args{port},
         sub {
+            my $self = $weak_self or return;
             my $fh = shift or return $args{on_failure}->(
                 sprintf('Error connecting to AMQP Server %s:%s: %s', $args{host}, $args{port}, $!)
             );
 
-            my %handle_args = (
+            my $close_cb = $args{on_close};
+            my $failure_cb = $args{on_failure};
+            $self->{_handle} = AnyEvent::Handle->new(
                 fh       => $fh,
                 on_error => sub {
                     my ($handle, $fatal, $message) = @_;
+                    my $self = $weak_self or return;
 
-                    $self->{_channels} = {};
-                    if (!$self->{_is_open}) {
-                        $args{on_failure}->(@_);
+                    if ($self->{_is_open}) {
+                        $self->_force_close($close_cb, $message);
+                    }
+                    else {
+                        $failure_cb->(@_);
                     }
-                    $self->{_is_open} = 0;
-                    $self->_disconnect();
-                    $args{on_close}->($message);
                 },
                 on_drain => sub {
                     my ($handle) = @_;
+                    my $self = $weak_self or return;
+
                     $self->{drain_condvar}->send
                         if exists $self->{drain_condvar};
                 },
+                $args{tls} ? (tls => 'connect') : (),
             );
-            if ($args{tls}) {
-                $handle_args{tls} = 'connect';
-            }
-            $self->{_handle} = AnyEvent::Handle->new(%handle_args);
             $self->_read_loop($args{on_close}, $args{on_read_failure});
             $self->_start(%args,);
         },
@@ -160,7 +170,9 @@ sub _read_loop {
 
     return if !defined $self->{_handle}; # called on_error
 
+    weaken(my $weak_self = $self);
     $self->{_handle}->push_read(chunk => 8, sub {
+        my $self = $weak_self or return;
         my $data = $_[1];
         my $stack = $_[1];
 
@@ -178,17 +190,17 @@ sub _read_loop {
         }
 
         $self->{_handle}->push_read(chunk => $length, sub {
+            my $self = $weak_self or return;
             $stack .= $_[1];
             my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
 
+            $self->{_heartbeat_recv} = time if $self->{_heartbeat};
+
             if ($self->{verbose}) {
                 warn '[C] <-- [S] ', Dumper($frame),
                      '-----------', "\n";
             }
 
-            # TODO - check that a packet has been received within two times the
-            # heartbeat period.
-
             my $id = $frame->channel;
             if (0 == $id) {
                 if ($frame->type_id == 8) {
@@ -224,12 +236,24 @@ sub _check_close_and_clean {
     my $method_frame = $frame->method_frame;
     return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
 
-    delete $self->{_heartbeat};
+    delete $self->{_heartbeat_timer};
     $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
+    $self->_force_close($close_cb, $frame);
+    return;
+}
+
+sub _force_close {
+    my $self = shift;
+    my ($close_cb, $why,) = @_;
+
+    for my $channel (values %{ $self->{_channels} }) {
+        $channel->_closed(ref($why) ? $why : $channel->_close_frame($why));
+    }
     $self->{_channels} = {};
     $self->{_is_open} = 0;
-    $self->_disconnect();
-    $close_cb->($frame);
+    $self->{_handle}->push_shutdown;
+
+    $close_cb->($why);
     return;
 }
 
@@ -261,10 +285,15 @@ sub _start {
             $self->_push_write(
                 Net::AMQP::Protocol::Connection::StartOk->new(
                     client_properties => {
-                        platform    => 'Perl',
-                        product     => __PACKAGE__,
-                        information => 'http://d.hatena.ne.jp/cooldaemon/',
-                        version     => __PACKAGE__->VERSION,
+                        platform     => 'Perl',
+                        product      => __PACKAGE__,
+                        information  => 'http://d.hatena.ne.jp/cooldaemon/',
+                        version      => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
+                        capabilities => {
+                            consumer_cancel_notify     => Net::AMQP::Value::true,
+                            exchange_exchange_bindings => Net::AMQP::Value::true,
+                        },
+                        %{ $args{client_properties} || {} },
                     },
                     mechanism => 'AMQPLAIN',
                     response => {
@@ -287,9 +316,11 @@ sub _tune {
     my $self = shift;
     my %args = @_;
 
+    weaken(my $weak_self = $self);
     $self->_push_read_and_valid(
         'Connection::Tune',
         sub {
+            my $self = $weak_self or return;
             my $frame = shift;
 
             my %tune = map { my $t = $args{tune}{$_};
@@ -303,10 +334,26 @@ sub _tune {
             $self->_open(%args,);
 
             if ($tune{heartbeat} > 0) {
-                $self->{_heartbeat} = AnyEvent->timer(
+                my $close_cb   = $args{on_close};
+                my $failure_cb = $args{on_read_failure};
+                my $last_recv = 0;
+                my $idle_cycles = 0;
+                $self->{_heartbeat_recv} = time;
+                $self->{_heartbeat_timer} = AnyEvent->timer(
                     after    => $tune{heartbeat},
                     interval => $tune{heartbeat},
                     cb => sub {
+                        my $self = $weak_self or return;
+                        if ($self->{_heartbeat_recv} != $last_recv) {
+                            $last_recv = $self->{_heartbeat_recv};
+                            $idle_cycles = 0;
+                        }
+                        elsif (++$idle_cycles > 1) {
+                            delete $self->{_heartbeat_timer};
+                            $failure_cb->("Heartbeat lost");
+                            $self->_force_close($close_cb, "Heartbeat lost");
+                            return;
+                        }
                         $self->_push_write(Net::AMQP::Frame::Heartbeat->new());
                     },
                 );
@@ -343,8 +390,6 @@ sub _open {
 
 sub close {
     my $self = shift;
-    my $weak_self = $self;
-    weaken($weak_self);
     my %args = $self->_set_cbs(@_);
 
     if (!$self->{_is_open}) {
@@ -352,74 +397,59 @@ sub close {
         return $self;
     }
 
-    my $channels_to_close = 0;
-    my $all_closed_cb = sub {
-        return unless 0 == $channels_to_close;
-        $weak_self->_close(
-            sub {
-                $weak_self->_disconnect();
-                $args{on_success}->(@_);
-            },
-            sub {
-                $weak_self->_disconnect();
-                $args{on_failure}->(@_);
-            }
-        );
+    my $cv = AE::cv {
+        $self->_finish_close(%args);
     };
 
-    if (scalar(keys %{ $self->{_channels} })==0) {
-        $args{on_success}->(@_);
-    }
+    $cv->begin;
 
     for my $id (keys %{$self->{_channels}}) {
          my $channel = $self->{_channels}->{$id}
             or next; # Could have already gone away on global destruction..
 
-        $channels_to_close++;
-
+         $cv->begin;
          $channel->close(
-            on_success => sub {
-                $channels_to_close--;
-                $all_closed_cb->();
-            },
-            on_failure => sub {
-                $channels_to_close--;
-                $all_closed_cb->();
-                $args{on_failure}->(@_);
-            },
-        );
+             on_success => sub { $cv->end },
+             on_failure => sub { $cv->end },
+         );
     }
 
+    $cv->end;
+
     return $self;
 }
 
-sub _close {
+sub _finish_close {
     my $self = shift;
-    my ($cb, $failure_cb,) = @_;
+    my %args = @_;
 
     if (!$self->{_is_open}) {
-        $cb->("Already closed");
-        return $self;
+        $args{on_failure}->("Already closed");
+        return;
     }
 
     if (my @ch = keys %{$self->{_channels}}) {
-        $failure_cb->("Can't disconnect with channel(s) open: @ch");
-        return $self;
+        $args{on_failure}->("Can't close connection with channel(s) open: @ch");
+        return;
     }
 
     $self->_push_write_and_read(
         'Connection::Close', {}, 'Connection::CloseOk',
-        $cb, $failure_cb,
+        sub {
+            # circular ref ok
+            $self->{_handle}->push_shutdown;
+            $args{on_success}->(@_);
+        },
+        sub {
+            # circular ref ok
+            $self->{_handle}->push_shutdown;
+            $args{on_failure}->(@_);
+        },
     );
-    $self->{_is_open} = 0;
 
-    return $self;
-}
+    $self->{_is_open} = 0;
 
-sub _disconnect {
-    my $self = shift;
-    $self->{_handle}->push_shutdown;
-    return $self;
+    return;
 }
 
 sub open_channel {
@@ -461,7 +491,7 @@ sub open_channel {
             $args{on_success}->($channel);
         },
         on_failure => sub {
-            $self->delete_channel($id);
+            $self->_delete_channel($channel);
             $args{on_failure}->(@_);
         },
     );
@@ -512,8 +542,9 @@ sub _push_read_and_valid {
         }
 
         $failure_cb->(
-              'Method is not ' . join(',', @$exp) . "\n"
-            . 'Method was ' . ref $method_frame
+            $method_frame->isa('Net::AMQP::Protocol::Channel::Close')
+              ? 'Channel closed'
+              : 'Expected ' . join(',', @$exp) . ' but got ' . ref($method_frame)
         );
     });
 }
@@ -621,14 +652,20 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
           );
       },
       on_failure => $cv,
-      on_read_failure => sub {die @_},
+      on_read_failure => sub { die @_ },
       on_return  => sub {
           my $frame = shift;
           die "Unable to deliver ", Dumper($frame);
       }
       on_close   => sub {
-          my $method_frame = shift->method_frame;
-          die $method_frame->reply_code, $method_frame->reply_text;
+          my $why = shift;
+          if (ref($why)) {
+              my $method_frame = $why->method_frame;
+              die $method_frame->reply_code, ": ", $method_frame->reply_text;
+          }
+          else {
+              die $why;
+          }
       },
   );
 
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 14d9ebb..244c427 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -4,10 +4,12 @@ use strict;
 use warnings;
 
 use AnyEvent::RabbitMQ::LocalQueue;
-use Scalar::Util qw(weaken);
+use Scalar::Util qw( looks_like_number weaken );
 use Carp qw(croak);
 BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
 
+use namespace::clean;
+
 our $VERSION = '1.12';
 
 sub new {
@@ -31,14 +33,18 @@ sub _reset {
         _is_confirm    => 0,
         _publish_tag   => 0,
         _publish_cbs   => {},  # values: [on_ack, on_nack, on_return]
-        _consumer_cbs  => {},
-        _consumer_cans => {},
+        _consumer_cbs  => {},  # values: [on_consume, on_cancel...]
     );
     @$self{keys %a} = values %a;
 
     return $self;
 }
 
+sub id {
+    my $self = shift;
+    return $self->{id};
+}
+
 sub is_open {
     my $self = shift;
     return $self->{_is_open};
@@ -96,33 +102,26 @@ sub close {
     # open, but we've closed it - a more elegant fix would be to mark that
     # the channel is opening, and wait for it to open before closing it
     if (!$self->{_is_open}) {
-        $connection->delete_channel($self->{id});
+        $connection->_delete_channel($self);
         $args{on_success}->($self);
         return $self;
     }
 
-    # spell it out, so the callbacks always can call ->method_frame
-    my $close_frame = Net::AMQP::Frame::Method->new(
-        method_frame => Net::AMQP::Protocol::Channel::Close->new,
-    );
-
     $connection->_push_write(
-        $close_frame,
+        $self->_close_frame,
         $self->{id},
     );
 
-    weaken(my $wself = $self);
-
     $connection->_push_read_and_valid(
         'Channel::CloseOk',
         sub {
-            my $me = $wself or return;
-            $me->_close($close_frame, 0);
+            # circular ref ok
+            $self->_closed();
             $args{on_success}->();
         },
         sub {
-            my $me = $wself or return;
-            $me->_close($close_frame, 0);
+            # circular ref ok
+            $self->_closed();
             $args{on_failure}->();
         },
         $self->{id},
@@ -131,31 +130,49 @@ sub close {
     return $self;
 }
 
-sub _close {
+sub _closed {
     my $self = shift;
-    my ($frame, $forced) = @_;
-
-    my $connection = $self->{connection};
-    my $on_close = $self->{on_close};
+    my ($frame,) = @_;
+    $frame ||= $self->_close_frame;
 
+    $self->{_is_open} or return;
     $self->{_is_open} = 0;
-    if ($frame) {
-        $self->{_queue}->_flush($frame);
-        $self->{_content_queue}->_flush($frame);
-    }
+
+    # Perform callbacks for all outstanding commands
+    $self->{_queue}->_flush($frame);
+    $self->{_content_queue}->_flush($frame);
+
+    # Report cancelation of all outstanding consumes
+    my @tags = keys %{ $self->{_consumer_cbs} };
+    $self->_canceled($_, $frame) for @tags;
+
+    # Reset state (redundant?)
     $self->_reset;
 
-    $connection->delete_channel($self->{id}) if $connection;
+    if (my $connection = $self->{connection}) {
+        $connection->_delete_channel($self);
+    }
 
-    if (defined $on_close) {
+    if (my $cb = $self->{on_close}) {
         local $@;
-        $on_close->($frame);
+        eval { $cb->($frame) };
         warn "Error in channel on_close callback, ignored:\n  $@  " if $@;
     }
 
     return $self;
 }
 
+sub _close_frame {
+    my $self = shift;
+    my ($text,) = @_;
+
+    Net::AMQP::Frame::Method->new(
+        method_frame => Net::AMQP::Protocol::Channel::Close->new(
+            reply_text => $text,
+        ),
+    );
+}
+
 sub declare_exchange {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
@@ -352,9 +369,9 @@ sub publish {
         $tag = ++$self->{_publish_tag};
         if ($return_cb) {
             $header_args = { %$header_args };
-            $header_args->{headers}{_ar_return} = $tag;  # just reuse the same value, why not
+            $header_args->{headers}->{_ar_return} = $tag;  # just reuse the same value, why not
         }
-        $self->{_publish_cbs}{$tag} = [$ack_cb, $nack_cb, $return_cb];
+        $self->{_publish_cbs}->{$tag} = [$ack_cb, $nack_cb, $return_cb];
     }
 
     $self->_publish(
@@ -391,6 +408,16 @@ sub _header {
 
     my $weight = delete $args->{weight} || 0;
 
+    # user-provided message headers must be strings.  protect values that look like numbers.
+    my $headers = $args->{headers} || {};
+    my @prot = grep { my $v = $headers->{$_}; !ref($v) && looks_like_number($v) } keys %$headers;
+    if (@prot) {
+        $headers = {
+            %$headers,
+            map { $_ => Net::AMQP::Value::String->new($headers->{$_}) } @prot
+        };
+    }
+
     $self->{connection}->_push_write(
         Net::AMQP::Frame::Header->new(
             weight       => $weight,
@@ -398,7 +425,6 @@ sub _header {
             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                 content_type     => 'application/octet-stream',
                 content_encoding => undef,
-                headers          => {},
                 delivery_mode    => 1,
                 priority         => 1,
                 correlation_id   => undef,
@@ -410,6 +436,7 @@ sub _header {
                 app_id           => undef,
                 cluster_id       => undef,
                 %$args,
+                headers          => $headers,
             ),
         ),
         $self->{id},
@@ -436,7 +463,8 @@ sub consume {
     return $self if !$self->_check_open($failure_cb);
 
     my $consumer_cb = delete $args{on_consume} || sub {};
-    
+    my $cancel_cb   = delete $args{on_cancel} || sub {};
+
     $self->{connection}->_push_write_and_read(
         'Basic::Consume',
         {
@@ -444,16 +472,16 @@ sub consume {
             no_local     => 0,
             no_ack       => 1,
             exclusive    => 0,
+
             %args, # queue
             ticket       => 0,
             nowait       => 0, # FIXME
         },
-        'Basic::ConsumeOk', 
+        'Basic::ConsumeOk',
         sub {
             my $frame = shift;
-            $self->{_consumer_cbs}->{
-                $frame->method_frame->consumer_tag
-            } = $consumer_cb;
+            my $tag = $frame->method_frame->consumer_tag;
+            $self->{_consumer_cbs}->{$tag} = [ $consumer_cb, $cancel_cb ];
             $cb->($frame);
         },
         $failure_cb,
@@ -474,12 +502,12 @@ sub cancel {
         return $self;
     }
 
-    if (!$self->{_consumer_cbs}->{$args{consumer_tag}}) {
+    my $cons_cbs = $self->{_consumer_cbs}->{$args{consumer_tag}};
+    unless ($cons_cbs) {
         $failure_cb->('Unknown consumer_tag');
         return $self;
     }
-
-    $self->{_consumer_cans}{$args{consumer_tag}} = $cb;
+    push @$cons_cbs, $cb;
 
     $self->{connection}->_push_write(
         Net::AMQP::Protocol::Basic::Cancel->new(
@@ -492,6 +520,20 @@ sub cancel {
     return $self;
 }
 
+sub _canceled {
+    my $self = shift;
+    my ($tag, $frame,) = @_;
+
+    my $cons_cbs = delete $self->{_consumer_cbs}->{$tag}
+      or return 0;
+
+    shift @$cons_cbs; # no more deliveries
+    for my $cb (reverse @$cons_cbs) {
+        $cb->($frame);
+    }
+    return 1;
+}
+
 sub get {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
@@ -696,20 +738,19 @@ sub push_queue_or_consume {
                 Net::AMQP::Protocol::Channel::CloseOk->new(),
                 $self->{id},
             );
-            $self->_close($frame, 0);
+            $self->_closed($frame);
             return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
-            my $cb = $self->{_consumer_cbs}->{
-                $method_frame->consumer_tag
-            } || sub {};
+            my $cons_cbs = $self->{_consumer_cbs}->{$method_frame->consumer_tag};
+            my $cb = ($cons_cbs && $cons_cbs->[0]) || sub {};
             $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
             return $self;
-        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
-            my $can_cb = delete $self->{_consumer_cans}{$method_frame->consumer_tag};
-            if ($can_cb) {
-                $can_cb->($method_frame);
-            }
-            else {
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk') ||
+                 $method_frame->isa('Net::AMQP::Protocol::Basic::Cancel')) {
+            # CancelOk means we asked for a cancel.
+            # Cancel means queue was deleted; it is not AMQP, but RMQ supports it.
+            if (!$self->_canceled($method_frame->consumer_tag, $frame)
+                  && $method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
                 $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
             }
             return $self;
@@ -721,10 +762,10 @@ sub push_queue_or_consume {
                 my $headers = $ret->{header}->headers || {};
                 my $onret_cb;
                 if (defined(my $tag = $headers->{_ar_return})) {
-                    my $cbs = delete $me->{_publish_cbs}{$tag};
+                    my $cbs = delete $me->{_publish_cbs}->{$tag};
                     $onret_cb = $cbs->[2] if $cbs;
                 }
-                $onret_cb ||= $me->{on_return} || $me->{connection}{on_return} || sub {};  # oh well
+                $onret_cb ||= $me->{on_return} || $me->{connection}->{on_return} || sub {};  # oh well
                 $onret_cb->($frame);
             };
             $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
@@ -749,7 +790,7 @@ sub push_queue_or_consume {
                 my $cbi = ($resp eq 'Ack') ? 0 : 1;
                 for my $tag (@tags) {
                     my $cbs;
-                    if (not $cbs = delete $self->{_publish_cbs}{$tag}) {
+                    if (not $cbs = delete $self->{_publish_cbs}->{$tag}) {
                         $failure_cb->("Received $resp of unknown delivery tag $tag");
                     }
                     elsif ($cbs->[$cbi]) {
@@ -767,7 +808,7 @@ sub push_queue_or_consume {
                 $self->{id},
             );
             my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
-            my $cb = $self->{$cbname} || $self->{connection}{$cbname} || sub {};
+            my $cb = $self->{$cbname} || $self->{connection}->{$cbname} || sub {};
             $cb->($frame);
             return $self;
         }
@@ -872,8 +913,6 @@ sub DESTROY {
 }
 
 1;
-
-1;
 __END__
 
 =head1 NAME
@@ -1022,6 +1061,12 @@ Arguments:
 
 Callback called with an argument of the message which has been consumed.
 
+=item on_cancel
+
+Callback called if consumption is canceled.  This may be at client request
+or as a side effect of queue deletion.  (Notification of queue deletion is a
+RabbitMQ extension.)
+
 =item consumer_tag
 
 Identifies this consumer, will be auto-generated if you do not provide it, but you must
diff --git a/xt/05_multi_channel.t b/xt/05_multi_channel.t
index 2c37b11..d814013 100644
--- a/xt/05_multi_channel.t
+++ b/xt/05_multi_channel.t
@@ -24,7 +24,7 @@ eval {
 
 plan skip_all => 'Connection failure: '
                . $conf{host} . ':' . $conf{port} if $@;
-plan tests => 1;
+plan tests => 3;
 
 use AnyEvent::RabbitMQ;
 
@@ -36,14 +36,19 @@ my @queues = map {
     declare_queue($ch, $queue,);
 
     my $done = AnyEvent->condvar;
+    my $cdone = AnyEvent->condvar;
     consume($ch, $queue, sub {
         my $response = shift;
         return if 'stop' ne $response->{body}->payload;
         $done->send();
+    }, sub {
+        $cdone->send();
     });
-    {name => $queue, cv => $done};
+    {name => $queue, cv => $done, ccv => $cdone};
 } (1..5);
 
+pass('queue setup');
+
 my $ch = open_channel($ar);
 for my $queue (@queues) {
     publish($ch, $queue->{name}, 'hello');
@@ -62,6 +67,14 @@ for my $queue (@queues) {
     delete_queue($ch, $queue->{name});
 }
 
+my $ccount = 0;
+for my $queue (@queues) {
+    $queue->{ccv}->recv;
+    $ccount++;
+}
+
+is($ccount, 5, 'cancel count');
+
 close_ar($ar);
 
 sub connect_ar {
@@ -70,7 +83,7 @@ sub connect_ar {
         (map {$_ => $conf{$_}} qw(host port user pass vhost)),
         timeout    => 1,
         on_success => sub {$done->send(1)},
-        on_failure => sub {$done->send()},
+        on_failure => sub { diag @_; $done->send()},
         on_close   => \&handle_close,
     );
     die 'Connection failure' if !$done->recv;
@@ -83,7 +96,7 @@ sub close_ar {
     my $done = AnyEvent->condvar;
     $ar->close(
         on_success => sub {$done->send(1)},
-        on_failure => sub {$done->send()},
+        on_failure => sub { diag @_; $done->send()},
     );
     die 'Close failure' if !$done->recv;
 
@@ -135,7 +148,7 @@ sub delete_queue {
 }
 
 sub consume {
-    my ($ch, $queue, $handle_consume,) = @_;
+    my ($ch, $queue, $handle_consume, $handle_cancel,) = @_;
 
     my $done = AnyEvent->condvar;
     $ch->consume(
@@ -143,6 +156,7 @@ sub consume {
         on_success => sub {$done->send(1)},
         on_failure => sub {$done->send()},
         on_consume => $handle_consume,
+        on_cancel  => $handle_cancel,
     );
     die 'Consume failure' if !$done->recv;
 

-- 
libanyevent-rabbitmq-perl Debian packaging



More information about the Pkg-perl-cvs-commits mailing list