[libanyevent-rabbitmq-perl] 122/151: finish on_return at message, channel, and connection level
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:09 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit a31379e003651123b28b057d041a59c9cee97859
Author: Chip Salzenberg <chip at topsy.com>
Date: Thu Sep 13 18:39:28 2012 -0700
finish on_return at message, channel, and connection level
---
lib/AnyEvent/RabbitMQ.pm | 10 ++++--
lib/AnyEvent/RabbitMQ/Channel.pm | 66 ++++++++++++++++++++++++----------------
2 files changed, 47 insertions(+), 29 deletions(-)
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 99b7065..e0f9114 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -167,8 +167,8 @@ sub _read_loop {
my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
if ($self->{verbose}) {
- warn '[C] <-- [S] ' . Dumper($frame);
- warn '-----------', "\n";
+ warn '[C] <-- [S] ', Dumper($frame),
+ '-----------', "\n";
}
my $id = $frame->channel;
@@ -583,6 +583,10 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
},
on_failure => $cv,
on_read_failure => sub {die @_},
+ on_return => sub {
+ my $frame = shift;
+ die "Unable to deliver ", Dumper($frame);
+ }
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
@@ -599,7 +603,7 @@ You can use AnyEvent::RabbitMQ to -
* Declare and delete exchanges
* Declare, delete, bind and unbind queues
- * Set QoS
+ * Set QoS and confirm mode
* Publish, consume, get, ack, recover and reject messages
* Select, commit and rollback transactions
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index f3390e2..f542676 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -3,8 +3,9 @@ package AnyEvent::RabbitMQ::Channel;
use strict;
use warnings;
-use Scalar::Util qw(weaken);
use AnyEvent::RabbitMQ::LocalQueue;
+use Scalar::Util qw(weaken);
+use Carp qw(croak);
BEGIN { *Dumper = \&AnyEvent::RabbitMQ::Dumper }
our $VERSION = '1.08';
@@ -311,10 +312,24 @@ sub publish {
my $header_args = delete $args{header};
my $body = delete $args{body};
my $ack_cb = delete $args{on_ack};
+ my $return_cb = delete $args{on_return};
defined($header_args) or $header_args = {};
defined($body) or $body = '';
- defined($ack_cb) or $ack_cb = sub {};
+ defined($ack_cb) or defined($return_cb)
+ and !$self->{_is_confirm}
+ and croak "Can't set on_ack or on_return callback when not in confirm mode";
+
+ my $tag;
+ if ($self->{_is_confirm}) {
+ # yeah, delivery tags in acks are sequential. see Java client
+ $tag = ++$self->{_publish_tag};
+ if ($return_cb) {
+ $header_args = { %$header_args };
+ $header_args->{headers}{_return} = $tag; # just reuse the same value, why not
+ }
+ $self->{_publish_cbs}{$tag} = [$ack_cb, $return_cb];
+ }
$self->_publish(
%args,
@@ -324,16 +339,6 @@ sub publish {
$body,
);
- if ($self->{_is_confirm}) {
- # yeah, they're sequential. see Java client
- my $tag = ++$self->{_publish_tag};
- $self->{_publish_cbs}{$tag} = $ack_cb;
- }
- else {
- # we do not expect an ack, so assume success
- $ack_cb->();
- }
-
return $self;
}
@@ -356,11 +361,13 @@ sub _publish {
}
sub _header {
- my ($self, $args, $body,) = @_;
+ my ($self, $args, $body) = @_;
+
+ my $weight = delete $args->{weight} || 0;
$self->{connection}->_push_write(
Net::AMQP::Frame::Header->new(
- weight => $args->{weight} || 0,
+ weight => $weight,
body_size => length($body),
header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
content_type => 'application/octet-stream',
@@ -545,19 +552,19 @@ sub confirm {
weaken(my $wself = $self);
$self->{connection}->_push_write_and_read(
- 'Confirm::Select',
- {
- %args,
- nowait => 0, # FIXME
- },
- 'Confirm::SelectOk',
+ 'Confirm::Select',
+ {
+ %args,
+ nowait => 0, # FIXME
+ },
+ 'Confirm::SelectOk',
sub {
my $me = $wself or return;
$me->{_is_confirm} = 1;
$cb->();
},
- $failure_cb,
- $self->{id},
+ $failure_cb,
+ $self->{id},
);
return $self;
@@ -678,19 +685,26 @@ sub push_queue_or_consume {
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
- my $cb = $self->{on_return} || $failure_cb;
+ my $cb = sub {
+ my $ret = shift;
+ my $headers = $ret->{header}->headers || {};
+ my $tag = $headers->{_return_tag};
+ my $cbs = $self->{_publish_cbs}{$headers->{_return}};
+ my $onret_cb = ($cbs && $cbs->[1]) || $self->{on_return} || $self->{connection}{on_return} || sub {}; # oh well
+ $onret_cb->($frame);
+ };
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack')) {
- my $pub_cb;
+ my $cbs;
if (!$self->{_is_confirm}) {
$failure_cb->("Received Ack when not in confirm mode");
}
- elsif (not $pub_cb = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
+ elsif (not $cbs = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
$failure_cb->("Received Ack of unknown delivery tag $method_frame->{delivery_tag}");
}
else {
- $pub_cb->();
+ $cbs->[0]->();
}
} elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
$self->{_is_active} = $method_frame->active;
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list