[libanyevent-rabbitmq-perl] 123/151: Handle flow control (do not throw messages away!); handle out-of-order CancelOk; handle server-sent Nack; add missing accessors, e.g. is_open, is_active and verbose; fix a potential memory leak
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:09 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit b867a9b226978034988595df1b7fc25fd92e2852
Author: Chip Salzenberg <chip at topsy.com>
Date: Tue Sep 18 13:28:42 2012 -0700
Handle flow control (do not throw messages away!)
Handle out-of-order CancelOk
Handle server-sent Nack
Add missing accessors, e.g. is_open and is_active and verbose
Fix a potential memory leak
---
lib/AnyEvent/RabbitMQ.pm | 12 ++++-
lib/AnyEvent/RabbitMQ/Channel.pm | 106 +++++++++++++++++++++++++++++----------
2 files changed, 90 insertions(+), 28 deletions(-)
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index e0f9114..25169cb 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -47,6 +47,16 @@ sub new {
}, $class;
}
+sub verbose {
+ my $self = shift;
+ @_ ? ($self->{verbose} = shift) : $self->{verbose}
+}
+
+sub is_open {
+ my $self = shift;
+ $self->{_is_open}
+}
+
sub channels {
my $self = shift;
return $self->{_channels};
@@ -55,7 +65,7 @@ sub channels {
sub delete_channel {
my $self = shift;
my ($id) = @_;
- return delete $self->{_channels}->{$id};
+ return defined delete $self->{_channels}->{$id};
}
sub login_user {
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index f542676..da0b1dd 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -14,7 +14,7 @@ sub new {
my $class = shift;
my $self = bless {
- @_, # id, connection, on_return, on_close
+ @_, # id, connection, on_return, on_close, on_inactive, on_active
_queue => AnyEvent::RabbitMQ::LocalQueue->new,
_content_queue => AnyEvent::RabbitMQ::LocalQueue->new,
}, $class;
@@ -32,12 +32,28 @@ sub _reset {
_publish_tag => 0,
_publish_cbs => {},
_consumer_cbs => {},
+ _consumer_cans => {},
);
@$self{keys %a} = values %a;
return $self;
}
+sub is_open {
+ my $self = shift;
+ return $self->{_is_open};
+}
+
+sub is_active {
+ my $self = shift;
+ return $self->{_is_active};
+}
+
+sub is_confirm {
+ my $self = shift;
+ return $self->{_is_confirm};
+}
+
sub queue {
my $self = shift;
return $self->{_queue};
@@ -123,8 +139,10 @@ sub _close {
my $on_close = $self->{on_close};
$self->{_is_open} = 0;
- $self->{_queue}->_flush($frame);
- $self->{_content_queue}->_flush($frame);
+ if ($frame) {
+ $self->{_queue}->_flush($frame);
+ $self->{_content_queue}->_flush($frame);
+ }
$self->_reset;
$connection->delete_channel($self->{id}) if $connection;
@@ -132,7 +150,7 @@ sub _close {
if (defined $on_close) {
local $@;
$on_close->($frame);
- warn "Error in callback, ignored:\n $@ " if $@;
+ warn "Error in channel on_close callback, ignored:\n $@ " if $@;
}
return $self;
@@ -307,18 +325,26 @@ sub publish {
my $self = shift;
my %args = @_;
- return $self if !$self->{_is_active};
+ # Docs should advise channel-level callback over this, but still, better to give user an out
+ unless ($self->{_is_active}) {
+ if (defined $args{on_inactive}) {
+ $args{on_inactive}->();
+ return $self;
+ }
+ croak "Can't publish on inactive channel (server flow control); provide on_inactive callback";
+ }
my $header_args = delete $args{header};
my $body = delete $args{body};
my $ack_cb = delete $args{on_ack};
+ my $nack_cb = delete $args{on_nack};
my $return_cb = delete $args{on_return};
defined($header_args) or $header_args = {};
defined($body) or $body = '';
- defined($ack_cb) or defined($return_cb)
+ defined($ack_cb) or defined($nack_cb) or defined($return_cb)
and !$self->{_is_confirm}
- and croak "Can't set on_ack or on_return callback when not in confirm mode";
+ and croak "Can't set on_ack/on_nack/on_return callback when not in confirm mode";
my $tag;
if ($self->{_is_confirm}) {
@@ -328,7 +354,7 @@ sub publish {
$header_args = { %$header_args };
$header_args->{headers}{_return} = $tag; # just reuse the same value, why not
}
- $self->{_publish_cbs}{$tag} = [$ack_cb, $return_cb];
+ $self->{_publish_cbs}{$tag} = [$ack_cb, $nack_cb, $return_cb];
}
$self->_publish(
@@ -453,19 +479,13 @@ sub cancel {
return $self;
}
- $self->{connection}->_push_write_and_read(
- 'Basic::Cancel',
- {
+ $self->{_consumer_cans}{$args{consumer_tag}} = $cb;
+
+ $self->{connection}->_push_write(
+ Net::AMQP::Protocol::Basic::Cancel->new(
%args, # consumer_tag
nowait => 0,
- },
- 'Basic::CancelOk',
- sub {
- my $frame = shift;
- delete $self->{_consumer_cbs}->{$args{consumer_tag}};
- $cb->($frame);
- },
- $failure_cb,
+ ),
$self->{id},
);
@@ -684,28 +704,57 @@ sub push_queue_or_consume {
} || sub {};
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
return $self;
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::CancelOk')) {
+ my $can_cb = delete $self->{_consumer_cans}{$method_frame->consumer_tag};
+ if ($can_cb) {
+ $can_cb->($method_frame);
+ }
+ else {
+ $failure_cb->("Received CancelOk for unknown consumer tag " . $method_frame->consumer_tag);
+ }
+ return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
+ weaken(my $wself = $self);
my $cb = sub {
my $ret = shift;
+ my $me = $wself or return;
my $headers = $ret->{header}->headers || {};
my $tag = $headers->{_return_tag};
- my $cbs = $self->{_publish_cbs}{$headers->{_return}};
- my $onret_cb = ($cbs && $cbs->[1]) || $self->{on_return} || $self->{connection}{on_return} || sub {}; # oh well
+ my $cbs = $me->{_publish_cbs}{$headers->{_return}};
+ my $onret_cb = ($cbs && $cbs->[1]) || $me->{on_return} || $me->{connection}{on_return} || sub {}; # oh well
$onret_cb->($frame);
};
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
- } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack')) {
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Ack') ||
+ $method_frame->isa('Net::AMQP::Protocol::Basic::Nack')) {
+ (my $resp = ref($method_frame)) =~ s/.*:://;
my $cbs;
if (!$self->{_is_confirm}) {
- $failure_cb->("Received Ack when not in confirm mode");
- }
- elsif (not $cbs = delete $self->{_publish_cbs}{$method_frame->{delivery_tag}}) {
- $failure_cb->("Received Ack of unknown delivery tag $method_frame->{delivery_tag}");
+ $failure_cb->("Received $resp when not in confirm mode");
}
else {
- $cbs->[0]->();
+ my @tags;
+ if ($method_frame->{multiple}) {
+ @tags = sort { $a <=> $b }
+ grep { $_ <= $method_frame->{delivery_tag} }
+ keys %{$self->{_publish_cbs}};
+ }
+ else {
+ @tags = ($method_frame->{delivery_tag});
+ }
+ my $cbi = ($resp eq 'Ack') ? 0 : 1;
+ for my $tag (@tags) {
+ my $cbs;
+ if (not $cbs = delete $self->{_publish_cbs}{$tag}) {
+ $failure_cb->("Received $resp of unknown delivery tag $tag");
+ }
+ elsif ($cbs->[$cbi]) {
+ $cbs->[$cbi]->();
+ }
+ }
}
+ return $self;
} elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Flow')) {
$self->{_is_active} = $method_frame->active;
$self->{connection}->_push_write(
@@ -714,6 +763,9 @@ sub push_queue_or_consume {
),
$self->{id},
);
+ my $cbname = $self->{_is_active} ? 'on_active' : 'on_inactive';
+ my $cb = $self->{$cbname} || $self->{connection}{$cbname} || sub {};
+ $cb->($frame);
return $self;
}
$self->{_queue}->push($frame);
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits mailing list