[libanyevent-rabbitmq-perl] 23/151: Fixed a function for close connection.
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:01 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit 7c3e1eba3b0dcda78ae9f8ca85c56b29e6236b68
Author: cooldaemon <cooldaemon at gmail.com>
Date: Mon Feb 22 16:23:18 2010 +0900
Fixed a function for close connection.
---
README | 2 +-
lib/AnyEvent/RabbitMQ.pm | 88 +++++++++++++++++++++++---------------
lib/AnyEvent/RabbitMQ/Channel.pm | 6 +++
lib/RabbitFoot.pm | 2 +-
lib/RabbitFoot/Cmd/Role/Command.pm | 31 ++++++++++++--
xt/04_anyevent.t | 8 ++++
6 files changed, 96 insertions(+), 41 deletions(-)
diff --git a/README b/README
index 09c2bb8..05f2035 100644
--- a/README
+++ b/README
@@ -10,7 +10,7 @@ You can use RabbitFoot to -
* Publish, consume, get, ack and recover messages
* Select, commit and rollback transactions
-RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+RabbitFoot is known to work with RabbitMQ versions 1.7.2 and version 0-8 of the AMQP specification.
INSTALLATION
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 46ee654..96a08ff 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -75,7 +75,9 @@ sub connect {
my $self = shift;
my %args = $self->_set_cbs(@_);
- $args{timeout} ||= 0;
+ $args{on_close} ||= sub {};
+ $args{on_read_failure} ||= sub {die @_};
+ $args{timeout} ||= 0;
if ($self->verbose) {
print STDERR 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
@@ -95,7 +97,7 @@ sub connect {
$args{on_failure}->($message);
}
);
- $self->_read_loop($args{on_failure});
+ $self->_read_loop($args{on_close}, $args{on_read_failure});
$self->_start(%args,);
},
sub {
@@ -107,7 +109,7 @@ sub connect {
}
sub _read_loop {
- my ($self, $failure_cb,) = @_;
+ my ($self, $close_cb, $failure_cb,) = @_;
return if !$self->_handle;
@@ -115,8 +117,8 @@ sub _read_loop {
my $data = $_[1];
my $stack = $_[1];
- if (length($data) <= 0) {
- $failure_cb->('Disconnect');
+ if (length($data) <= 7) {
+ $failure_cb->('Broken data was received');
@_ = ($self, $failure_cb,);
goto &_read_loop;
}
@@ -137,15 +139,25 @@ sub _read_loop {
print STDERR '-----------', "\n";
}
- return if !$self->_check_close_and_clean($frame, $failure_cb,);
-
my $id = $frame->channel;
+ return if !$self->_check_close_and_clean(
+ $frame, $close_cb, $failure_cb, $id,
+ );
+
if (0 == $id) {
+ return if !$self->_check_close_and_clean($frame, $close_cb, $id,);
$self->_queue->push($frame);
- } elsif ($self->get_channel($id)) {
- $self->get_channel($id)->_push_queue_or_consume($frame, $failure_cb);
} else {
- $failure_cb->('Unknown channel id: ' . $frame->channel);
+ my $channel = $self->get_channel($id);
+ if ($channel) {
+ if (
+ $self->_check_channel_close_and_clean($frame, $id, $channel)
+ ) {
+ $channel->_push_queue_or_consume($frame, $failure_cb);
+ }
+ } else {
+ $failure_cb->('Unknown channel id: ' . $frame->channel);
+ }
}
@_ = ($self, $failure_cb,);
@@ -157,39 +169,33 @@ sub _read_loop {
}
sub _check_close_and_clean {
- my ($self, $frame, $failure_cb, $id,) = @_;
+ my ($self, $frame, $close_cb, $id,) = @_;
return 1 if !$frame->isa('Net::AMQP::Frame::Method');
my $method_frame = $frame->method_frame;
+ return 1 if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
- if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
- $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
- $self->{_is_open} = 0;
- $self->_disconnect();
- $failure_cb->(
- $method_frame->reply_code . ' ' . $method_frame->reply_text
- );
- return;
- } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
- $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
+ $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
+ $self->{_is_open} = 0;
+ $self->_disconnect();
+ $close_cb->($frame);
+ return;
+}
- my $id = $frame->channel;
- my $message = $method_frame->reply_code . ' ' . $method_frame->reply_text;
+sub _check_channel_close_and_clean {
+ my ($self, $frame, $id, $channel,) = @_;
- my $channel = $self->get_channel($id);
- if (!$channel) {
- $failure_cb->("Unknown channel id: ${id}\n${message}");
- return;
- }
+ return 1 if !$frame->isa('Net::AMQP::Frame::Method');
- $channel->{_is_open} = 0;
- $self->delete_channel($id);
- $failure_cb->($message);
- return;
- }
+ my $method_frame = $frame->method_frame;
+ return 1 if !$method_frame->isa('Net::AMQP::Protocol::Channel::Close');
- return 1;
+ $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
+ $channel->{_is_open} = 0;
+ $channel->on_close->($frame);
+ $self->delete_channel($id);
+ return;
}
sub _start {
@@ -352,6 +358,8 @@ sub open_channel {
my $self = shift;
my %args = $self->_set_cbs(@_);
+ $args{on_close} ||= sub {};
+
my $id = $args{id};
return $args{on_failure}->("Channel id $id is already in use")
if $id && $self->get_channel($id);
@@ -368,6 +376,7 @@ sub open_channel {
my $channel = AnyEvent::RabbitMQ::Channel->new(
id => $id,
connection => $self,
+ on_close => $args{on_close},
);
$self->set_channel($id => $channel);
@@ -518,9 +527,18 @@ AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
);
},
on_failure => $cv,
+ on_close => sub {
+ my $method_frame = shift->method_frame;
+ die $method_frame->reply_code, $method_frame->reply_text;
+ }
);
},
on_failure => $cv,
+ on_read_failure => sub {die @_},
+ on_close => sub {
+ my $method_frame = shift->method_frame;
+ die $method_frame->reply_code, $method_frame->reply_text;
+ },
);
print $cv->recv, "\n";
@@ -537,7 +555,7 @@ You can use AnyEvent::RabbitMQ to -
* Publish, consume, get, ack and recover messages
* Select, commit and rollback transactions
-AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+AnyEvnet::RabbitMQ is known to work with RabbitMQ versions 1.7.2 and version 0-8 of the AMQP specification.
=head1 AUTHOR
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index fe34511..79c4188 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -19,6 +19,12 @@ has connection => (
weak_ref => 1,
);
+has on_close => (
+# isa => 'CodeRef',
+ is => 'rw',
+ required => 1,
+);
+
has _is_open => (
isa => 'Bool',
is => 'rw',
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index 35a160c..ee86a9c 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -101,7 +101,7 @@ You can use RabbitFoot to -
* Publish, consume, get, ack and recover messages
* Select, commit and rollback transactions
-RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+RabbitFoot is known to work with RabbitMQ versions 1.7.2 and version 0-8 of the AMQP specification.
=head1 AUTHOR
diff --git a/lib/RabbitFoot/Cmd/Role/Command.pm b/lib/RabbitFoot/Cmd/Role/Command.pm
index c6ca24a..e6b1eb7 100644
--- a/lib/RabbitFoot/Cmd/Role/Command.pm
+++ b/lib/RabbitFoot/Cmd/Role/Command.pm
@@ -1,6 +1,7 @@
package RabbitFoot::Cmd::Role::Command;
use FindBin;
+use Coro;
use RabbitFoot;
use Moose::Role;
@@ -114,16 +115,38 @@ sub execute {
my $self = shift;
my ($opt, $args,) = @_;
- my $ch = RabbitFoot->new(
+ my $rf = RabbitFoot->new(
verbose => $self->verbose,
)->load_xml_spec(
$self->spec,
)->connect(
- timeout => 5,
- (map {$_ => $self->$_} qw(host port user pass vhost))
- )->open_channel();
+ (map {$_ => $self->$_} qw(host port user pass vhost)),
+ timeout => 5,
+ on_close => unblock_sub {
+ $self->_close(shift);
+ exit; # FIXME
+ },
+ );
+
+ my $ch = $rf->open_channel(
+ on_close => unblock_sub {
+ $self->_close(shift);
+ $rf->close;
+ exit;
+ },
+ );
$self->_run($ch, @_,);
+
+ $ch->close;
+ $rf->close;
+ return;
+}
+
+sub _close {
+ my $self = shift;
+ my $method_frame = shift->method_frame;
+ print $method_frame->reply_code, ' ', $method_frame->reply_text, "\n";
return;
}
diff --git a/xt/04_anyevent.t b/xt/04_anyevent.t
index c8c6370..5cb0749 100644
--- a/xt/04_anyevent.t
+++ b/xt/04_anyevent.t
@@ -41,6 +41,10 @@ $ar->connect(
$done->send;
},
on_failure => failure_cb($done),
+ on_close => sub {
+ my $method_frame = shift->method_frame;
+ die $method_frame->reply_code, $method_frame->reply_text;
+ },
);
$done->recv;
@@ -53,6 +57,10 @@ $ar->open_channel(
$done->send;
},
on_failure => failure_cb($done),
+ on_close => sub {
+ my $method_frame = shift->method_frame;
+ die $method_frame->reply_code, $method_frame->reply_text;
+ },
);
$done->recv;
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits mailing list