[libanyevent-rabbitmq-perl] 123/151: Handle flow control (do not throw messages away!) Handle out-of-order CancelOk Handle server-sent Nack Add missing accessors, e.g. is_open and is_active and verbose Fix a potential memory leak

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:09 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit b867a9b226978034988595df1b7fc25fd92e2852
Author: Chip Salzenberg <chip at topsy.com>
Date:   Tue Sep 18 13:28:42 2012 -0700

    Handle flow control (do not throw messages away!)
    Handle out-of-order CancelOk
    Handle server-sent Nack
    Add missing accessors, e.g. is_open and is_active and verbose
    Fix a potential memory leak
---
 lib/AnyEvent/RabbitMQ.pm         |  12 ++++-
 lib/AnyEvent/RabbitMQ/Channel.pm | 106 +++++++++++++++++++++++++++++----------
 2 files changed, 90 insertions(+), 28 deletions(-)

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index e0f9114..25169cb 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -47,6 +47,16 @@ sub new {
     }, $class;
 }
 
+sub verbose {
+    my $self = shift;
+    @_ ? ($self->{verbose} = shift) : $self->{verbose}
+}
+
+sub is_open {
+    my $self = shift;
+    $self->{_is_open}
+}
+
 sub channels {
     my $self = shift;
     return $self->{_channels};
@@ -55,7 +65,7 @@ sub channels {
 sub delete_channel {
     my $self = shift;
     my ($id) = @_;
-    return delete $self->{_channels}->{$id};
+    return defined delete $self->{_channels}->{$id};
 }
 
 sub login_user {
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index f542676..da0b1dd 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -14,7 +14,7 @@ sub new {
     my $class = shift;
 
     my $self = bless {
-        @_,    # id, connection, on_return, on_close
+        @_,    # id, connection, on_return, on_close, on_inactive, on_active
         _queue         => AnyEvent::RabbitMQ::LocalQueue->new,
         _content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
     }, $class;
@@ -32,12 +32,28 @@ sub _reset {
         _publish_tag   => 0,
         _publish_cbs   => {},
         _consumer_cbs  => {},
+        _consumer_cans => {},
     );
     @$self{keys %a} = values %a;
 
     return $self;
 }
 
+sub is_open {
+    my $self = shift;
+    return $self->{_is_open};
+}
+
+sub is_active {
+    my $self = shift;
+    return $self->{_is_active};
+}
+
+sub is_confirm {
+    my $self = shift;
+    return $self->{_is_confirm};
+}
+
 sub queue {
     my $self = shift;
     return $self->{_queue};
@@ -123,8 +139,10 @@ sub _close {
     my $on_close = $self->{on_close};
 
     $self->{_is_open} = 0;
-    $self->{_queue}->_flush($frame);
-    $self->{_content_queue}->_flush($frame);
+    if ($frame) {
+        $self->{_queue}->_flush($frame);
+        $self->{_content_queue}->_flush($frame);
+    }
     $self->_reset;
 
     $connection->delete_channel($self->{id}) if $connection;
@@ -132,7 +150,7 @@ sub _close {
     if (defined $on_close) {
         local $@;
         $on_close->($frame);
-        warn "Error in callback, ignored:\n  $@  " if $@;
+        warn "Error in channel on_close callback, ignored:\n  $@  " if $@;
     }
 
     return $self;
@@ -307,18 +325,26 @@ sub publish {
     my $self = shift;
     my %args = @_;
 
-    return $self if !$self->{_is_active};
+    # Docs should advise channel-level callback over this, but still, better to give user an out
+    unless ($self->{_is_active}) {
+        if (defined $args{on_inactive}) {
+            $args{on_inactive}->();
+            return $self;
+        }
+        croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";
+    }
 
     my $header_args = delete $args{header};
     my $body        = delete $args{body};
     my $ack_cb      = delete $args{on_ack};
+    my $nack_cb     = delete $args{on_nack};
     my $return_cb   = delete $args{on_return};
 
     defined($header_args) or $header_args = {};
     defined($body) or $body = '';
-    defined($ack_cb) or defined($return_cb)
+    defined($ack_cb) or defined($nack_cb) or defined($return_cb)
        and !$self->{_is_confirm}
-       and croak "Can't set on_ack or on_return callback when not in confirm mode";
+       and croak "Can't set on_ack/on_nack/on_return callback when not in confirm mode";
 
     my $tag;
     if ($self->{_is_confirm}) {
@@ -328,7 +354,7 @@ sub publish {
             $header_args = { %$header_args };
             $header_args->{headers}{_return} = $tag;  # just reuse the same value, why not
         }
-        $self->{_publish_cbs}{$tag} = [$ack_cb, $return_cb];
+        $self->{_publish_cbs}{$tag} = [$ack_cb, $nack_cb, $return_cb];
     }
 
     $self->_publish(
@@ -453,19 +479,13 @@ sub cancel {
         return $self;
     }
 
-    $self->{connection}->_push_write_and_read(
-        'Basic::Cancel',
-        {
+    $self->{_consumer_cans}{$args{consumer_tag}} = $cb;
+
+    $self->{connection}->_push_write(
+        Net::AMQP::Protocol::Basic::Cancel->new(
             %args, # consumer_tag
             nowait => 0,
-        },
-        'Basic::CancelOk', 
-        sub {
-            my $frame = shift;
-            delete $self->{_consumer_cbs}->{$args{consumer_tag}};
-            $cb->($frame);
-        },
-        $failure_cb,
+        ),
         $self->{id},
     );
 
@@ -684,28 +704,57 @@ sub push_queue_or_consume {
             } || sub {};
             $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
             return $self;
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
+            my $can_cb = delete $self->{_consumer_cans}{$method_frame->consumer_tag};
+            if ($can_cb) {
+                $can_cb->($method_frame);
+            }
+            else {
+                $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
+            }
+            return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
+            weaken(my $wself = $self);
             my $cb = sub {
                 my $ret = shift;
+                my $me = $wself or return;
                 my $headers = $ret->{header}->headers || {};
                 my $tag = $headers->{_return_tag};
-                my $cbs = $self->{_publish_cbs}{$headers->{_return}};
-                my $onret_cb = ($cbs && $cbs->[1]) || $self->{on_return} || $self->{connection}{on_return} || sub {};  # oh well
+                my $cbs = $me->{_publish_cbs}{$headers->{_return}};
+                my $onret_cb = ($cbs && $cbs->[1]) || $me->{on_return} || $me->{connection}{on_return} || sub {};  # oh well
                 $onret_cb->($frame);
             };
             $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
             return $self;
-        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack')) {
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
+                 $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
+            (my $resp = ref($method_frame)) =~ s/.*:://;
             my $cbs;
             if (!$self->{_is_confirm}) {
-                $failure_cb->("Received Ack when not in confirm mode");
-            }
-            elsif (not $cbs = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
-                $failure_cb->("Received Ack of unknown delivery tag $method_frame->{delivery_tag}");
+                $failure_cb->("Received $resp when not in confirm mode");
             }
             else {
-                $cbs->[0]->();
+                my @tags;
+                if ($method_frame->{multiple}) {
+                    @tags = sort { $a <=> $b }
+                              grep { $_ <= $method_frame->{delivery_tag} }
+                                keys %{$self->{_publish_cbs}};
+                }
+                else {
+                    @tags = ($method_frame->{delivery_tag});
+                }
+                my $cbi = ($resp eq 'Ack') ? 0 : 1;
+                for my $tag (@tags) {
+                    my $cbs;
+                    if (not $cbs = delete $self->{_publish_cbs}{$tag}) {
+                        $failure_cb->("Received $resp of unknown delivery tag $tag");
+                    }
+                    elsif ($cbs->[$cbi]) {
+                        $cbs->[$cbi]->();
+                    }
+                }
             }
+            return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
             $self->{_is_active} = $method_frame->active;
             $self->{connection}->_push_write(
@@ -714,6 +763,9 @@ sub push_queue_or_consume {
                 ),
                 $self->{id},
             );
+            my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
+            my $cb = $self->{$cbname} || $self->{connection}{$cbname} || sub {};
+            $cb->($frame);
             return $self;
         }
         $self->{_queue}->push($frame);

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list