[libanyevent-rabbitmq-perl] 122/151: finish on_return at message, channel, and connection level

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:09 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit a31379e003651123b28b057d041a59c9cee97859
Author: Chip Salzenberg <chip at topsy.com>
Date:   Thu Sep 13 18:39:28 2012 -0700

    finish on_return at message, channel, and connection level
---
 lib/AnyEvent/RabbitMQ.pm         | 10 ++++--
 lib/AnyEvent/RabbitMQ/Channel.pm | 66 ++++++++++++++++++++++++----------------
 2 files changed, 47 insertions(+), 29 deletions(-)

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 99b7065..e0f9114 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -167,8 +167,8 @@ sub _read_loop {
             my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
 
             if ($self->{verbose}) {
-                warn '[C] <-- [S] ' . Dumper($frame);
-                warn '-----------', "\n";
+                warn '[C] <-- [S] ', Dumper($frame),
+                     '-----------', "\n";
             }
 
             my $id = $frame->channel;
@@ -583,6 +583,10 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
       },
       on_failure => $cv,
       on_read_failure => sub {die @_},
+      on_return  => sub {
+          my $frame = shift;
+          die "Unable to deliver ", Dumper($frame);
+      }
       on_close   => sub {
           my $method_frame = shift->method_frame;
           die $method_frame->reply_code, $method_frame->reply_text;
@@ -599,7 +603,7 @@ You can use AnyEvent::RabbitMQ to -
 
   * Declare and delete exchanges
   * Declare, delete, bind and unbind queues
-  * Set QoS
+  * Set QoS and confirm mode
   * Publish, consume, get, ack, recover and reject messages
   * Select, commit and rollback transactions
 
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index f3390e2..f542676 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -3,8 +3,9 @@ package AnyEvent::RabbitMQ::Channel;
 use strict;
 use warnings;
 
-use Scalar::Util qw(weaken);
 use AnyEvent::RabbitMQ::LocalQueue;
+use Scalar::Util qw(weaken);
+use Carp qw(croak);
 BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
 
 our $VERSION = '1.08';
@@ -311,10 +312,24 @@ sub publish {
     my $header_args = delete $args{header};
     my $body        = delete $args{body};
     my $ack_cb      = delete $args{on_ack};
+    my $return_cb   = delete $args{on_return};
 
     defined($header_args) or $header_args = {};
     defined($body) or $body = '';
-    defined($ack_cb) or $ack_cb = sub {};
+    defined($ack_cb) or defined($return_cb)
+       and !$self->{_is_confirm}
+       and croak "Can't set on_ack or on_return callback when not in confirm mode";
+
+    my $tag;
+    if ($self->{_is_confirm}) {
+        # yeah, delivery tags in acks are sequential.  see Java client
+        $tag = ++$self->{_publish_tag};
+        if ($return_cb) {
+            $header_args = { %$header_args };
+            $header_args->{headers}{_return} = $tag;  # just reuse the same value, why not
+        }
+        $self->{_publish_cbs}{$tag} = [$ack_cb, $return_cb];
+    }
 
     $self->_publish(
         %args,
@@ -324,16 +339,6 @@ sub publish {
         $body,
     );
 
-    if ($self->{_is_confirm}) {
-        # yeah, they're sequential.  see Java client
-        my $tag = ++$self->{_publish_tag};
-        $self->{_publish_cbs}{$tag} = $ack_cb;
-    }
-    else {
-        # we do not expect an ack, so assume success
-        $ack_cb->();
-    }
-
     return $self;
 }
 
@@ -356,11 +361,13 @@ sub _publish {
 }
 
 sub _header {
-    my ($self, $args, $body,) = @_;
+    my ($self, $args, $body) = @_;
+
+    my $weight = delete $args->{weight} || 0;
 
     $self->{connection}->_push_write(
         Net::AMQP::Frame::Header->new(
-            weight       => $args->{weight} || 0,
+            weight       => $weight,
             body_size    => length($body),
             header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
                 content_type     => 'application/octet-stream',
@@ -545,19 +552,19 @@ sub confirm {
     weaken(my $wself = $self);
 
     $self->{connection}->_push_write_and_read(
-	'Confirm::Select',
-	{
-	    %args,
-	    nowait       => 0, # FIXME
-	},
-	'Confirm::SelectOk',
+        'Confirm::Select',
+        {
+            %args,
+            nowait       => 0, # FIXME
+        },
+        'Confirm::SelectOk',
         sub {
             my $me = $wself or return;
             $me->{_is_confirm} = 1;
             $cb->();
         },
-	$failure_cb,
-	$self->{id},
+        $failure_cb,
+        $self->{id},
     );
 
     return $self;
@@ -678,19 +685,26 @@ sub push_queue_or_consume {
             $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
             return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
-            my $cb = $self->{on_return} || $failure_cb;
+            my $cb = sub {
+                my $ret = shift;
+                my $headers = $ret->{header}->headers || {};
+                my $tag = $headers->{_return_tag};
+                my $cbs = $self->{_publish_cbs}{$headers->{_return}};
+                my $onret_cb = ($cbs && $cbs->[1]) || $self->{on_return} || $self->{connection}{on_return} || sub {};  # oh well
+                $onret_cb->($frame);
+            };
             $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
             return $self;
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack')) {
-            my $pub_cb;
+            my $cbs;
             if (!$self->{_is_confirm}) {
                 $failure_cb->("Received Ack when not in confirm mode");
             }
-            elsif (not $pub_cb = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
+            elsif (not $cbs = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
                 $failure_cb->("Received Ack of unknown delivery tag $method_frame->{delivery_tag}");
             }
             else {
-                $pub_cb->();
+                $cbs->[0]->();
             }
         } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
             $self->{_is_active} = $method_frame->active;

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list