[libanyevent-rabbitmq-perl] 14/151: Used Coro.
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit 8f8ca410c14b549ca918869838d7feb271c47858
Author: cooldaemon <cooldaemon at gmail.com>
Date: Wed Feb 10 11:37:56 2010 +0900
Used Coro.
---
Makefile.PL | 2 +
lib/AnyEvent/RabbitMQ.pm | 2 +-
lib/RabbitFoot.pm | 744 +++---------------------------------
lib/RabbitFoot/Channel.pm | 50 +++
xt/{05_anyevent.t => 04_anyevent.t} | 53 +--
xt/04_use_server.t | 197 ----------
xt/05_coro.t | 220 +++++++++++
7 files changed, 348 insertions(+), 920 deletions(-)
diff --git a/Makefile.PL b/Makefile.PL
index 81d4b82..2cede9f 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -12,6 +12,8 @@ requires 'List::MoreUtils';
requires 'Sys::SigAction';
requires 'Net::AMQP';
requires 'AnyEvent';
+requires 'Coro';
+requires 'Coro::AnyEvent';
tests 't/*.t';
author_tests 'xt';
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index b4f9a27..9496192 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -188,7 +188,7 @@ sub _check_close_and_clean {
return;
}
- $channel->_is_open(0);
+ $channel->{_is_open} = 0;
$self->delete_channel($id);
$failure_cb->($message);
return;
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index e08f850..a19800b 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -1,15 +1,14 @@
package RabbitFoot;
-use Data::Dumper;
-use List::MoreUtils qw(none);
-use IO::Socket::INET;
-use Sys::SigAction qw(timeout_call);
-use Net::AMQP;
-use Net::AMQP::Common qw(:all);
+use AnyEvent::RabbitMQ;
+use Coro;
+use Coro::AnyEvent;
+
+use RabbitFoot::Channel;
use Moose;
-our $VERSION = '0.01';
+our $VERSION = '0.02';
has verbose => (
isa => 'Bool',
@@ -22,706 +21,54 @@ has timeout => (
default => 1,
);
-has publish_timeout => (
- isa => 'Int',
- is => 'rw',
- default => 1,
-);
-
-has _socket => (
- isa => 'IO::Socket::INET',
- is => 'rw',
- clearer => 'clear_socket',
-);
-
-has _is_open => (
- isa => 'Bool',
- is => 'rw',
- default => 0,
-);
-
-has _is_oepn_channel => (
- isa => 'Bool',
- is => 'rw',
- default => 0,
+has _ar => (
+ isa => 'AnyEvent::RabbitMQ',
+ is => 'ro',
);
-has _consume_tag => (
- isa => 'Str',
- is => 'rw',
- default => '',
-);
+for my $method (qw(connect close)) {
+ __PACKAGE__->meta->add_method($method, sub {
+ my $self = shift;
+ $self->_do($method, @_);
+ return $self;
+ });
+}
__PACKAGE__->meta->make_immutable;
no Moose;
-sub load_xml_spec {
- my ($self, $file,) = @_;
-
- eval {
- Net::AMQP::Protocol->load_xml_spec($file);
- };
- die $@, "\n" if $@;
-
- return $self;
-}
-
-sub connect {
- my ($self, $args) = @_;
-
- eval {
- $self->_connect(
- $args,
- )->_start(
- $args,
- )->_tune(
- )->_open(
- $args,
- )->_open_channel(
- );
- };
-
- return $self if !$@;
-
- my $exception = $@;
- $self->close();
- die $exception;
-}
-
-sub _connect {
- my ($self, $args,) = @_;
-
- if ($self->verbose) {
- print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n";
- }
-
- my $socket = IO::Socket::INET->new(
- Proto => 'tcp',
- PeerAddr => $args->{host},
- PeerPort => $args->{port},
- Timeout => $self->timeout,
- ) or die 'Error connecting to AMQP Server!', "\n";
-
- $self->_socket($socket);
- return $self;
-}
-
-sub _start {
- my ($self, $args,) = @_;
-
- if ($self->verbose) {
- print STDERR 'post header', "\n";
- }
-
- print {$self->_socket} Net::AMQP::Protocol->header;
-
- my $frame = $self->_read_and_valid('Connection::Start');
-
- my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
- die 'AMQPLAIN is not found in mechanisms', "\n"
- if none {$_ eq 'AMQPLAIN'} @mechanisms;
-
- my @locales = split /\s/, $frame->method_frame->locales;
- die 'en_US is not found in locales', "\n"
- if none {$_ eq 'en_US'} @locales;
-
- $self->_post(
- Net::AMQP::Protocol::Connection::StartOk->new(
- client_properties => {
- platform => 'Perl',
- product => __PACKAGE__,
- information => 'http://d.hatena.ne.jp/cooldaemon/',
- version => '0.01',
- },
- mechanism => 'AMQPLAIN',
- response => {
- LOGIN => $args->{user},
- PASSWORD => $args->{pass},
- },
- locale => 'en_US',
- ),
- );
-
- return $self;
-}
-
-sub _tune {
- my ($self,) = @_;
-
- my $frame = $self->_read_and_valid('Connection::Tune');
-
- $self->_post(
- Net::AMQP::Protocol::Connection::TuneOk->new(
- channel_max => $frame->method_frame->channel_max,
- frame_max => $frame->method_frame->frame_max,
- heartbeat => $frame->method_frame->heartbeat,
- ),
- );
-
- return $self;
-}
-
-sub _open {
- my ($self, $args,) = @_;
-
- $self->_post_and_read(
- 'Connection::Open',
- {
- virtual_host => $args->{vhost},
- capabilities => '',
- insist => 1,
- },
- 'Connection::OpenOk',
- );
- $self->_is_open(1);
-
- return $self;
-}
-
-sub close {
- my ($self,) = @_;
-
- for my $method (qw(cancel _close_channel _close _disconnect)) {
- eval {$self->$method()};
- }
-
- return $self;
-}
-
-sub _close {
- my ($self,) = @_;
-
- return $self if !$self->_is_open;
-
- $self->_post_and_read('Connection::Close', {}, 'Connection::CloseOk',);
- $self->_is_open(0);
-
- return $self;
-}
-
-sub _disconnect {
- my ($self,) = @_;
-
- return if !$self->_socket;
-
- CORE::close $self->_socket;
- $self->clear_socket;
-
- return;
-}
-
-sub _open_channel {
- my ($self) = @_;
-
- $self->_post_and_read('Channel::Open', {}, 'Channel::OpenOk', 1,);
- $self->_is_oepn_channel(1);
-
- return $self;
-}
-
-sub _close_channel {
- my ($self,) = @_;
-
- return $self if !$self->_is_oepn_channel;
-
- $self->_post_and_read('Channel::Close', {}, 'Channel::CloseOk', 1,);
- $self->_is_oepn_channel(0);
-
- return $self;
-}
-
-sub declare_exchange {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Exchange::Declare',
- {
- type => 'direct',
- passive => 0,
- durable => 0,
- auto_delete => 0,
- internal => 0,
- %$args, # exchange
- ticket => 0,
- nowait => 0,
- },
- 'Exchange::DeclareOk',
- 1,
- );
-}
-
-sub delete_exchange {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Exchange::Delete',
- {
- if_unused => 0,
- %$args, # exchange
- ticket => 0,
- nowait => 0,
- },
- 'Exchange::DeleteOk',
- 1,
- );
-}
-
-sub declare_queue {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Queue::Declare',
- {
- queue => '',
- passive => 0,
- durable => 0,
- exclusive => 0,
- auto_delete => 0,
- %$args,
- ticket => 0,
- no_ack => 1,
- nowait => 0,
- },
- 'Queue::DeclareOk',
- 1,
- );
-}
-
-sub bind_queue {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Queue::Bind',
- {
- %$args, # queue, exchange, routing_key
- ticket => 0,
- nowait => 0,
- },
- 'Queue::BindOk',
- 1,
+sub BUILD {
+ my $self = shift;
+ $self->{_ar} = AnyEvent::RabbitMQ->new(
+ verbose => $self->verbose,
+ timeout => $self->timeout,
);
}
-sub unbind_queue {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Queue::Unbind',
- {
- %$args, # queue, exchange, routing_key
- ticket => 0,
- },
- 'Queue::UnbindOk',
- 1,
- );
-}
-
-sub purge_queue {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Queue::Purge',
- {
- %$args, # queue
- ticket => 0,
- nowait => 0,
- },
- 'Queue::PurgeOk',
- 1,
- );
-}
-
-sub delete_queue {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Queue::Delete',
- {
- if_unused => 0,
- if_empty => 0,
- %$args, # queue
- ticket => 0,
- nowait => 0,
- },
- 'Queue::DeleteOk',
- 1,
- );
-}
-
-sub publish {
- my ($self, $publish_args, $header_args, $message,) = @_;
-
- $self->_publish(
- $publish_args,
- )->_header(
- $header_args, $message,
- )->_body(
- $message,
- );
-
- return if !$publish_args->{mandatory} && !$publish_args->{immediate};
-
- my $frame = eval {
- $self->_read_and_valid('Basic::Return', $self->publish_timeout);
- };
-
- if ($@) {
- return if $@ =~ '^Read\stimed\sout';
- die $@;
- }
-
- return {
- return => $frame,
- header => $self->_read_header_and_valid(),
- body => $self->_read_body_and_valid(),
- };
-}
-
-sub _publish {
- my ($self, $args,) = @_;
-
- $self->_post(
- Net::AMQP::Protocol::Basic::Publish->new(
- exchange => '',
- mandatory => 0,
- immediate => 0,
- %$args, # routing_key
- ticket => 0,
- ),
- 1,
- );
-
- return $self;
-}
-
-sub _header {
- my ($self, $args, $message,) = @_;
-
- $self->_post(
- Net::AMQP::Frame::Header->new(
- weight => $args->{weight} || 0,
- body_size => length($message),
- header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
- content_type => 'application/octet-stream',
- content_encoding => '',
- headers => {},
- delivery_mode => 1,
- priority => 1,
- correlation_id => '',
- reply_to => '',
- expiration => '',
- message_id => '',
- timestamp => time,
- type => '',
- user_id => '',
- app_id => '',
- cluster_id => '',
- %$args,
- ),
- ),
- 1,
- );
-
- return $self;
-}
-
-sub _body {
- my ($self, $message,) = @_;
- $self->_post(Net::AMQP::Frame::Body->new(payload => $message), 1);
- return $self;
-}
-
-sub consume {
- my ($self, $args,) = @_;
-
- die 'Has already been consuming message', "\n" if $self->_consume_tag;
-
- my $frame = $self->_post_and_read(
- 'Basic::Consume',
- {
- consumer_tag => '',
- no_local => 0,
- no_ack => 1,
- exclusive => 0,
- %$args, # queue
- ticket => 0,
- nowait => 0,
- },
- 'Basic::ConsumeOk',
- 1,
- );
-
- $self->_consume_tag($frame->method_frame->consumer_tag);
- return $frame;
-}
-
-sub cancel {
- my ($self,) = @_;
-
- return if !$self->_consume_tag;
-
- my $frame = $self->_post_and_read(
- 'Basic::Cancel',
- {
- consumer_tag => $self->_consume_tag,
- nowait => 0,
- },
- 'Basic::CancelOk',
- 1,
- );
-
- $self->_consume_tag('');
- return $frame;
-}
-
-sub poll {
- my ($self, $args,) = @_;
-
- my $timeout = $args && $args->{timeout} ? $args->{timeout} : 'infinite';
- return {
- deliver => $self->_read_and_valid('Basic::Deliver', $timeout),
- header => $self->_read_header_and_valid(),
- body => $self->_read_body_and_valid(),
- };
-}
-
-sub get {
- my ($self, $args,) = @_;
-
- my $frame = $self->_post_and_read(
- 'Basic::Get',
- {
- no_ack => 1,
- %$args, # queue
- ticket => 0,
- },
- [qw(Basic::GetOk Basic::GetEmpty)],
- 1,
- );
-
- return $frame
- if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
-
- return {
- getok => $frame,
- header => $self->_read_header_and_valid(),
- body => $self->_read_body_and_valid(),
- };
-}
-
-sub ack {
- my ($self, $args,) = @_;
-
- $self->_post(
- Net::AMQP::Protocol::Basic::Ack->new(
- delivery_tag => 0,
- multiple => (
- defined $args->{delivery_tag} && $args->{delivery_tag} != 0 ? 0 : 1
- ),
- %$args,
- ),
- 1,
- );
- return;
-}
-
-sub qos {
- my ($self, $args,) = @_;
-
- return $self->_post_and_read(
- 'Basic::Qos',
- {
- prefetch_count => 1,
- %$args,
- prefetch_size => 0,
- global => 0,
- },
- 'Basic::QosOk',
- 1,
- );
-}
-
-sub recover {
- my ($self, $args,) = @_;
-
- $self->_post(
- Net::AMQP::Protocol::Basic::Recover->new(
- requeue => 0,
- %$args,
- ),
- 1,
- );
- return;
-}
-
-sub select_tx {
- my ($self,) = @_;
- return $self->_post_and_read('Tx::Select', {}, 'Tx::SelectOk', 1,);
-}
-
-sub commit_tx {
- my ($self,) = @_;
- return $self->_post_and_read('Tx::Commit', {}, 'Tx::CommitOk', 1,);
-}
-
-sub rollback_tx {
- my ($self,) = @_;
- return $self->_post_and_read('Tx::Rollback', {}, 'Tx::RollbackOk', 1,);
-}
-
-sub _post_and_read {
- my ($self, $method, $args, $exp, $id,) = @_;
-
- $method = 'Net::AMQP::Protocol::' . $method;
- $self->_post(
- Net::AMQP::Frame::Method->new(
- method_frame => $method->new(%$args)
- ),
- $id,
- );
- return $self->_read_and_valid($exp);
-}
-
-sub _read_and_valid {
- my ($self, $exp, $timeout,) = @_;
- $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
-
- my $frame = $self->_read($timeout);
- die 'Received data is not method frame', "\n"
- if !$frame->isa('Net::AMQP::Frame::Method');
-
- my $method_frame = $frame->method_frame;
- for my $exp_elem (@$exp) {
- return $frame if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
- }
-
- $self->_check_close_and_clean($frame);
- die 'Method is not ', join(',', @$exp), "\n",
- 'Method was ', ref $method_frame, "\n"
- if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
-}
-
-sub _read_header_and_valid {
- my ($self,) = @_;
-
- my $frame = $self->_read();
- if (!$frame->isa('Net::AMQP::Frame::Header')) {
- $self->_check_close_and_clean($frame);
- die 'Received data is not header frame', "\n";
- }
-
- my $header_frame = $frame->header_frame;
- die 'Header is not Protocol::Basic::ContentHeader',
- 'Header was ', ref $header_frame, "\n"
- if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
-
- return $frame;
-}
-
-sub _read_body_and_valid {
- my ($self,) = @_;
-
- my $frame = $self->_read();
- return $frame if $frame->isa('Net::AMQP::Frame::Body');
-
- $self->_check_close_and_clean($frame);
- die 'Received data is not body frame', "\n";
-}
-
-sub _check_close_and_clean {
- my ($self, $frame,) = @_;
-
- return $self if !$frame->isa('Net::AMQP::Frame::Method');
-
- my $method_frame = $frame->method_frame;
-
- if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
- $self->_is_oepn_channel(0);
- $self->_post(Net::AMQP::Protocol::Connection::CloseOk->new());
- $self->_is_open(0);
- $self->_disconnect();
- die $method_frame->reply_code, ' ', $method_frame->reply_text, "\n";
- } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
- $self->_post(Net::AMQP::Protocol::Channel::CloseOk->new(), 1);
- $self->_is_oepn_channel(0);
- $self->_close()->_disconnect();
- die $method_frame->reply_code, ' ', $method_frame->reply_text, "\n";
- }
-
+sub load_xml_spec {
+ my $self = shift;
+ $self->{_ar}->load_xml_spec(@_);
return $self;
}
-sub _read {
- my ($self, $timeout,) = @_;
-
- $timeout ||= $self->timeout;
-
- my $frame;
-
- if ($timeout eq 'infinite') {
- $frame = $self->_do_read();
- } else {
- if (timeout_call($timeout, sub {$frame = $self->_do_read()})) {
- die 'Read timed out after', $timeout, "\n";
- }
- }
-
- return $frame;
+sub open_channel {
+ my $self = shift;
+ return RabbitFoot::Channel->new(arc => $self->_do('open_channel', @_));
}
-sub _do_read {
- my ($self,) = @_;
-
- my $stack;
- my $data;
-
- read $self->_socket, $data, 8;
- if (length($data) <= 0) {
- die 'Disconnect', "\n";
- }
-
- $stack .= $data;
-
- my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
- if (!defined $type_id || !defined $channel || !defined $length) {
- die 'Broken data was received', "\n";
- }
-
- while ($length > 0) {
- $length -= read $self->_socket, $data, $length;
- $stack .= $data;
- }
-
- my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
- if ($self->verbose) {
- print STDERR '[C] <-- [S] ' . Dumper($frame);
- print STDERR '-----------', "\n";
- }
-
- return $frame;
-}
-
-sub _post {
- my ($self, $output, $id,) = @_;
-
- if ($output->isa('Net::AMQP::Protocol::Base')) {
- $output = $output->frame_wrap;
- }
- $output->channel($id || 0);
-
- if ($self->verbose) {
- print STDERR '[C] --> [S] ', Dumper($output), "\n";
- }
- print {$self->_socket} $output->to_raw_frame();
-
- return;
-}
+sub _do {
+ my $self = shift;
+ my $method = shift;
+ my %args = @_;
-sub DEMOLISH {
- my ($self) = @_;
+ my $cb = Coro::rouse_cb;
+ $args{on_success} = sub {$cb->(1, @_);},
+ $args{on_failure} = sub {$cb->(0, @_);},
- $self->close();
- return;
+ $self->_ar->$method(%args);
+ my ($is_success, @responses) = Coro::rouse_wait;
+ die @responses if !$is_success;
+ return @responses;
}
1;
@@ -729,29 +76,30 @@ __END__
=head1 NAME
-RabbitFoot - A synchronous and single channel Perl AMQP client.
+RabbitFoot - An Asynchronous and single channel Perl AMQP client.
=head1 SYNOPSIS
use RabbitFoot;
- my $rf = RabbitFoot->new({
+ my $rf = RabbitFoot->new(
timeout => 1,
- })->load_xml_spec(
+ )->load_xml_spec(
'/path/to/amqp0-8.xml',
- )->connect({
+ )->connect(
host => 'localhosti',
port => 5672,
user => 'guest',
port => 'guest',
vhost => '/',
- });
+ );
- $rf->declare_exchange({exchange => 'test_exchange'});
+ my $ch = $rf->open_channel();
+ $ch->declare_exchange(exchange => 'test_exchange');
=head1 DESCRIPTION
-RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a synchronous fashion.
+RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
You can use RabbitFoot to -
diff --git a/lib/RabbitFoot/Channel.pm b/lib/RabbitFoot/Channel.pm
new file mode 100644
index 0000000..c5d2538
--- /dev/null
+++ b/lib/RabbitFoot/Channel.pm
@@ -0,0 +1,50 @@
+package RabbitFoot::Channel;
+
+use Coro;
+use Coro::AnyEvent;
+
+use AnyEvent::RabbitMQ::Channel;
+
+use Moose;
+
+our $VERSION = '0.01';
+
+has arc => (
+ isa => 'AnyEvent::RabbitMQ::Channel',
+ is => 'ro',
+);
+
+for my $method (qw(
+ close
+ declare_exchange delete_exchange
+ declare_queue bind_queue unbind_queue purge_queue delete_queue
+ consume cancel get qos
+ select_tx commit_tx rollback_tx
+)) {
+ __PACKAGE__->meta->add_method($method, sub {
+ my $self = shift;
+ my %args = @_;
+
+ my $cb = Coro::rouse_cb;
+ $args{on_success} = sub {$cb->(1, @_);},
+ $args{on_failure} = sub {$cb->(0, @_);},
+
+ $self->arc->$method(%args);
+ my ($is_success, @responses) = Coro::rouse_wait;
+ die @responses if !$is_success;
+ return $responses[0];
+ });
+}
+
+for my $method (qw(publish ack recover)) {
+ __PACKAGE__->meta->add_method($method, sub {
+ my $self = shift;
+ $self->arc->$method(@_);
+ return $self;
+ });
+}
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+1;
diff --git a/xt/05_anyevent.t b/xt/04_anyevent.t
similarity index 89%
rename from xt/05_anyevent.t
rename to xt/04_anyevent.t
index f734e0b..1765f37 100644
--- a/xt/05_anyevent.t
+++ b/xt/04_anyevent.t
@@ -25,7 +25,7 @@ plan tests => 24;
use AnyEvent::RabbitMQ;
-my $ar = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 1,);
+my $ar = AnyEvent::RabbitMQ->new(timeout => 1,);
lives_ok sub {
$ar->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
@@ -91,32 +91,37 @@ $ch->bind_queue(
$done->recv;
$done = AnyEvent->condvar;
+my $consumer_tag;
$ch->consume(
queue => 'test_q',
+ on_success => sub {
+ my $frame = shift;
+ $consumer_tag = $frame->method_frame->consumer_tag;
+ pass('consume');
+ },
on_consume => sub {
my $response = shift;
- my $message = $response->{body}->payload;
-
- pass('publish and consume message');
- return if $message ne 'cancel';
-
- $ch->cancel(
- consumer_tag => $response->{deliver}->method_frame->consumer_tag,
- on_success => sub {
- pass('cancel');
- $done->send;
- },
- on_failure => failure_cb($done),
- );
+ ok($response->{body}->payload, 'publish');
+ $done->send;
},
on_failure => failure_cb($done),
);
-publish($ch, 'Hello RabbitMQ', $done,);
-publish($ch, 'cancel', $done,);
+publish($ch, 'Hello RabbitMQ.', $done,);
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->cancel(
+ consumer_tag => $consumer_tag,
+ on_success => sub {
+ pass('cancel');
+ $done->send;
+ },
+ on_failure => failure_cb($done),
+);
$done->recv;
$done = AnyEvent->condvar;
-publish($ch, 'I love RabbitMQ', $done,);
+publish($ch, 'I love RabbitMQ.', $done,);
$ch->get(
queue => 'test_q',
on_success => sub {
@@ -162,11 +167,11 @@ $ch->consume(
},
on_failure => failure_cb($done),
);
-publish($ch, 'NO RabbitMQ, NO LIFE', $done,);
+publish($ch, 'NO RabbitMQ, NO LIFE.', $done,);
$done->recv;
$done = AnyEvent->condvar;
-publish($ch, 'RabbitMQ is cool', $done,);
+publish($ch, 'RabbitMQ is cool.', $done,);
$ch->get(
queue => 'test_q',
no_ack => 0,
@@ -201,8 +206,8 @@ $ch->qos(
},
on_failure => failure_cb($done),
);
-publish($ch, 'RabbitMQ is excellent', $done,);
-publish($ch, 'RabbitMQ is fantastic', $done,);
+publish($ch, 'RabbitMQ is excellent.', $done,);
+publish($ch, 'RabbitMQ is fantastic.', $done,);
$done->recv;
pass('qos');
@@ -236,7 +241,7 @@ $ch->consume(
my $response = shift;
if (5 > ++$recover_count) {
- $ch->recover({});
+ $ch->recover();
return;
}
@@ -254,7 +259,7 @@ $ch->consume(
},
on_failure => failure_cb($done),
);
-publish($ch, 'RabbitMQ is powerful', $done,);
+publish($ch, 'RabbitMQ is powerful.', $done,);
$done->recv;
pass('recover');
@@ -262,7 +267,7 @@ $done = AnyEvent->condvar;
$ch->select_tx(
on_success => sub {
pass('select tx');
- publish($ch, 'RabbitMQ is highly reliable systems', $done,);
+ publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
$ch->rollback_tx(
on_success => sub {
diff --git a/xt/04_use_server.t b/xt/04_use_server.t
deleted file mode 100644
index 26a006f..0000000
--- a/xt/04_use_server.t
+++ /dev/null
@@ -1,197 +0,0 @@
-use Test::More;
-use Test::Exception;
-
-use FindBin;
-use JSON::Syck;
-
-my $conf = JSON::Syck::LoadFile($FindBin::Bin . '/../config.json');
-
-eval {
- use IO::Socket::INET;
-
- my $socket = IO::Socket::INET->new(
- Proto => 'tcp',
- PeerAddr => $conf->{host},
- PeerPort => $conf->{port},
- Timeout => 1,
- ) or die 'Error connecting to AMQP Server!';
-
- close $socket;
-};
-
-plan skip_all => 'Connection failure: '
- . $conf->{host} . ':' . $conf->{port} if $@;
-plan tests => 23;
-
-use RabbitFoot;
-
-my $rf = RabbitFoot->new({timeout => 1, verbose => 1,});
-
-lives_ok sub {
- $rf->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
-}, 'load xml spec';
-
-lives_ok sub {
- $rf->connect({(map {$_ => $conf->{$_}} qw(host port user pass vhost))});
-}, 'connect';
-
-lives_ok sub {
- $rf->declare_exchange({exchange => 'test_x'});
-}, 'declare_exchange';
-
-lives_ok sub {
- $rf->declare_queue({queue => 'test_q'});
-}, 'declare_queue';
-
-lives_ok sub {
- $rf->bind_queue({
- queue => 'test_q',
- exchange => 'test_x',
- routing_key => 'test_r',
- });
-}, 'bind_queue';
-
-lives_ok sub {
- publish($rf, 'Hello RabbitMQ.');
-}, 'publish for consume';
-
-lives_ok sub {
- $rf->consume({queue => 'test_q'});
-}, 'consume';
-
-lives_ok sub {
- $rf->poll({timeout => 1});
-}, 'poll';
-
-lives_ok sub {
- $rf->cancel();
-}, 'cancel';
-
-lives_ok sub {
- publish($rf, 'I love RabbitMQ.');
- $rf->get({queue => 'test_q'});
-}, 'get';
-
-lives_ok sub {
- $rf->get({queue => 'test_q'});
-}, 'get empty';
-
-lives_ok sub {
- publish($rf, 'NO RabbitMQ, NO LIFE.');
- $rf->consume({
- queue => 'test_q',
- no_ack => 0,
- });
- my $response = $rf->poll({timeout => 1});
- $rf->ack({
- delivery_tag => $response->{deliver}->method_frame->delivery_tag,
- });
- $rf->cancel();
-}, 'ack deliver';
-
-lives_ok sub {
- publish($rf, 'RabbitMQ is cool.');
- my $response = $rf->get({
- queue => 'test_q',
- no_ack => 0,
- });
- $rf->ack({
- delivery_tag => $response->{getok}->method_frame->delivery_tag,
- });
-}, 'ack get';
-
-lives_ok sub {
- publish($rf, 'RabbitMQ is excellent.');
- publish($rf, 'RabbitMQ is fantastic.');
- $rf->qos({prefetch_count => 2});
-
- $rf->consume({
- queue => 'test_q',
- no_ack => 0,
- });
-
- my @responses = map {$rf->poll({timeout => 1})} (1, 2);
- for my $response (@responses) {
- $rf->ack({
- delivery_tag => $response->{deliver}->method_frame->delivery_tag,
- });
- }
-
- $rf->cancel();
- $rf->qos({});
-}, 'qos';
-
-lives_ok sub {
- publish($rf, 'RabbitMQ is powerful.');
-
- $rf->consume({
- queue => 'test_q',
- no_ack => 0,
- });
-
- for (1..5) {
- my $response = $rf->poll({timeout => 1});
- $rf->recover({});
- }
-
- my $response = $rf->poll({timeout => 1});
- $rf->ack({
- delivery_tag => $response->{deliver}->method_frame->delivery_tag,
- });
-
- $rf->cancel();
-}, 'recover';
-
-lives_ok sub {
- $rf->select_tx();
-}, 'select_tx';
-
-lives_ok sub {
- publish($rf, 'RabbitMQ is highly reliable systems.');
- $rf->rollback_tx();
-}, 'rollback_tx';
-
-lives_ok sub {
- publish($rf, 'RabbitMQ is highly reliable systems.');
- $rf->commit_tx();
-}, 'commit_tx';
-
-lives_ok sub {
- $rf->purge_queue({queue => 'test_q'});
-}, 'purge_queue';
-
-lives_ok sub {
- $rf->unbind_queue({
- queue => 'test_q',
- exchange => 'test_x',
- routing_key => 'test_r',
- });
-}, 'unbind_queue';
-
-lives_ok sub {
- $rf->delete_queue({queue => 'test_q'});
-}, 'delete_queue';
-
-lives_ok sub {
- $rf->delete_exchange({exchange => 'test_x'});
-}, 'delete_exchange';
-
-lives_ok sub {
- $rf->close();
-}, 'close';
-
-sub publish {
- my ($rf, $message,) = @_;
-
- $rf->publish(
- {
- exchange => 'test_x',
- routing_key => 'test_r',
- },
- {},
- $message,
- );
-
- return;
-}
-
diff --git a/xt/05_coro.t b/xt/05_coro.t
new file mode 100644
index 0000000..a8b3a0d
--- /dev/null
+++ b/xt/05_coro.t
@@ -0,0 +1,220 @@
+use Test::More;
+use Test::Exception;
+
+use FindBin;
+use JSON::Syck;
+
+my $conf = JSON::Syck::LoadFile($FindBin::Bin . '/../config.json');
+
+eval {
+ use IO::Socket::INET;
+
+ my $socket = IO::Socket::INET->new(
+ Proto => 'tcp',
+ PeerAddr => $conf->{host},
+ PeerPort => $conf->{port},
+ Timeout => 1,
+ ) or die 'Error connecting to AMQP Server!';
+
+ close $socket;
+};
+
+plan skip_all => 'Connection failure: '
+ . $conf->{host} . ':' . $conf->{port} if $@;
+plan tests => 23;
+
+use RabbitFoot;
+
+#my $rf = RabbitFoot->new(timeout => 1, verbose => 1,);
+my $rf = RabbitFoot->new(timeout => 1,);
+
+lives_ok sub {
+ $rf->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
+}, 'load xml spec';
+
+lives_ok sub {
+ $rf->connect((map {$_ => $conf->{$_}} qw(host port user pass vhost)));
+}, 'connect';
+
+my $ch;
+lives_ok sub {$ch = $rf->open_channel();}, 'open channel';
+
+lives_ok sub {
+ $ch->declare_exchange(exchange => 'test_x');
+}, 'declare_exchange';
+
+lives_ok sub {
+ $ch->declare_queue(queue => 'test_q');
+}, 'declare_queue';
+
+lives_ok sub {
+ $ch->bind_queue(
+ queue => 'test_q',
+ exchange => 'test_x',
+ routing_key => 'test_r',
+ );
+}, 'bind_queue';
+
+lives_ok sub {
+ publish($ch, 'Hello RabbitMQ.');
+}, 'publish';
+
+my $done = AnyEvent->condvar;
+lives_ok sub {
+ $ch->consume(
+ queue => 'test_q',
+ on_consume => sub {
+ my $response = shift;
+ $done->send($response->{deliver}->method_frame->consumer_tag);
+ },
+ );
+}, 'consume';
+
+lives_ok sub {
+ $ch->cancel(consumer_tag => $done->recv,);
+}, 'cancel';
+
+lives_ok sub {
+ publish($ch, 'I love RabbitMQ.');
+ $ch->get(queue => 'test_q');
+}, 'get';
+
+lives_ok sub {
+ $ch->get(queue => 'test_q');
+}, 'empty';
+
+$done = AnyEvent->condvar;
+lives_ok sub {
+ $ch->consume(
+ queue => 'test_q',
+ no_ack => 0,
+ on_consume => sub {
+ my $response = shift;
+ $ch->ack(
+ delivery_tag => $response->{deliver}->method_frame->delivery_tag
+ );
+ $done->send($response->{deliver}->method_frame->consumer_tag);
+ }
+ );
+ publish($ch, 'NO RabbitMQ, NO LIFE.');
+ $ch->cancel(consumer_tag => $done->recv,);
+}, 'ack deliver';
+
+lives_ok sub {
+ publish($ch, 'RabbitMQ is cool.');
+ my $response = $ch->get(
+ queue => 'test_q',
+ no_ack => 0,
+ );
+ $ch->ack(delivery_tag => $response->{ok}->method_frame->delivery_tag,);
+}, 'ack get';
+
+lives_ok sub {
+ $ch->qos(prefetch_count => 2);
+
+ $done = AnyEvent->condvar;
+ my @responses;
+ my $frame = $ch->consume(
+ queue => 'test_q',
+ no_ack => 0,
+ on_consume => sub {
+ my $response = shift;
+ push @responses, $response;
+ return if 2 > scalar @responses;
+ $done->send;
+ },
+ );
+ publish($ch, 'RabbitMQ is excellent.');
+ publish($ch, 'RabbitMQ is fantastic.');
+ $done->recv;
+
+ for my $response (@responses) {
+ $ch->ack(
+ delivery_tag => $response->{deliver}->method_frame->delivery_tag,
+ );
+ }
+
+ $ch->cancel(consumer_tag => $frame->method_frame->consumer_tag,);
+ $ch->qos();
+}, 'qos';
+
+lives_ok sub {
+ $done = AnyEvent->condvar;
+ my $recover_count = 0;
+ $ch->consume(
+ queue => 'test_q',
+ no_ack => 0,
+ on_consume => sub {
+ my $response = shift;
+
+ if (5 > ++$recover_count) {
+ $ch->recover();
+ return;
+ }
+
+ $ch->ack(
+ delivery_tag => $response->{deliver}->method_frame->delivery_tag
+ );
+
+ $done->send($response->{deliver}->method_frame->consumer_tag);
+ }
+ );
+ publish($ch, 'RabbitMQ is powerful.');
+ $ch->cancel(consumer_tag => $done->recv,);
+}, 'recover';
+
+lives_ok sub {
+ $ch->select_tx();
+}, 'select_tx';
+
+lives_ok sub {
+ publish($ch, 'RabbitMQ is highly reliable systems.');
+ $ch->rollback_tx();
+}, 'rollback_tx';
+
+lives_ok sub {
+ publish($ch, 'RabbitMQ is highly reliable systems.');
+ $ch->commit_tx();
+}, 'commit_tx';
+
+lives_ok sub {
+ $ch->purge_queue(queue => 'test_q');
+}, 'purge_queue';
+
+lives_ok sub {
+ $ch->unbind_queue(
+ queue => 'test_q',
+ exchange => 'test_x',
+ routing_key => 'test_r',
+ );
+}, 'unbind_queue';
+
+lives_ok sub {
+ $ch->delete_queue(queue => 'test_q');
+}, 'delete_queue';
+
+lives_ok sub {
+ $ch->delete_exchange(exchange => 'test_x');
+}, 'delete_exchange';
+
+lives_ok sub {
+ $rf->close();
+}, 'close';
+
+sub publish {
+ my ($ch, $message,) = @_;
+
+ $ch->publish(
+ exchange => 'test_x',
+ routing_key => 'test_r',
+ body => $message,
+ on_return => sub {
+ my $response = shift;
+ my $error_message = 'on_return: ' . Dumper($response);
+ die $error_message;
+ },
+ );
+
+ return;
+}
+
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list