[libanyevent-rabbitmq-perl] 14/151: Used Coro.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit 8f8ca410c14b549ca918869838d7feb271c47858
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Wed Feb 10 11:37:56 2010 +0900

    Used Coro.
---
 Makefile.PL                         |   2 +
 lib/AnyEvent/RabbitMQ.pm            |   2 +-
 lib/RabbitFoot.pm                   | 744 +++---------------------------------
 lib/RabbitFoot/Channel.pm           |  50 +++
 xt/{05_anyevent.t => 04_anyevent.t} |  53 +--
 xt/04_use_server.t                  | 197 ----------
 xt/05_coro.t                        | 220 +++++++++++
 7 files changed, 348 insertions(+), 920 deletions(-)

diff --git a/Makefile.PL b/Makefile.PL
index 81d4b82..2cede9f 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -12,6 +12,8 @@ requires 'List::MoreUtils';
 requires 'Sys::SigAction';
 requires 'Net::AMQP';
 requires 'AnyEvent';
+requires 'Coro';
+requires 'Coro::AnyEvent';
 
 tests 't/*.t';
 author_tests 'xt';
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index b4f9a27..9496192 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -188,7 +188,7 @@ sub _check_close_and_clean {
             return;
         }
 
-        $channel->_is_open(0);
+        $channel->{_is_open} = 0;
         $self->delete_channel($id);
         $failure_cb->($message);
         return;
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index e08f850..a19800b 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -1,15 +1,14 @@
 package RabbitFoot;
 
-use Data::Dumper;
-use List::MoreUtils qw(none);
-use IO::Socket::INET;
-use Sys::SigAction qw(timeout_call);
-use Net::AMQP;
-use Net::AMQP::Common qw(:all);
+use AnyEvent::RabbitMQ;
+use Coro;
+use Coro::AnyEvent;
+
+use RabbitFoot::Channel;
 
 use Moose;
 
-our $VERSION = '0.01';
+our $VERSION = '0.02';
 
 has verbose => (
     isa => 'Bool',
@@ -22,706 +21,54 @@ has timeout => (
     default => 1,
 );
 
-has publish_timeout => (
-    isa     => 'Int',
-    is      => 'rw',
-    default => 1,
-);
-
-has _socket => (
-    isa     => 'IO::Socket::INET',
-    is      => 'rw',
-    clearer => 'clear_socket',
-);
-
-has _is_open => (
-    isa     => 'Bool',
-    is      => 'rw',
-    default => 0,
-);
-
-has _is_oepn_channel => (
-    isa     => 'Bool',
-    is      => 'rw',
-    default => 0,
+has _ar => (
+    isa     => 'AnyEvent::RabbitMQ',
+    is      => 'ro',
 );
 
-has _consume_tag => (
-    isa     => 'Str',
-    is      => 'rw',
-    default => '',
-);
+for my $method (qw(connect close)) {
+    __PACKAGE__->meta->add_method($method, sub {
+        my $self = shift;
+        $self->_do($method, @_);
+        return $self;
+    });
+}
 
 __PACKAGE__->meta->make_immutable;
 no Moose;
 
-sub load_xml_spec {
-    my ($self, $file,) = @_;
-
-    eval {
-        Net::AMQP::Protocol->load_xml_spec($file);
-    };
-    die $@, "\n" if $@;
-
-    return $self;
-}
-
-sub connect {
-    my ($self, $args) = @_;
-
-    eval {
-        $self->_connect(
-            $args,
-        )->_start(
-            $args,
-        )->_tune(
-        )->_open(
-            $args,
-        )->_open_channel(
-        );
-    };
-
-    return $self if !$@;
-
-    my $exception = $@;
-    $self->close();
-    die $exception;
-}
-
-sub _connect {
-    my ($self, $args,) = @_;
-
-    if ($self->verbose) {
-        print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n";
-    }
-
-    my $socket = IO::Socket::INET->new(
-        Proto    => 'tcp',
-        PeerAddr => $args->{host},
-        PeerPort => $args->{port},
-        Timeout  => $self->timeout,
-    ) or die 'Error connecting to AMQP Server!', "\n";
-
-    $self->_socket($socket);
-    return $self;
-}
-
-sub _start {
-    my ($self, $args,) = @_;
-
-    if ($self->verbose) {
-        print STDERR 'post header', "\n";
-    }
-
-    print {$self->_socket} Net::AMQP::Protocol->header;
-
-    my $frame = $self->_read_and_valid('Connection::Start');
-
-    my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
-    die 'AMQPLAIN is not found in mechanisms', "\n"
-        if none {$_ eq 'AMQPLAIN'} @mechanisms;
-
-    my @locales = split /\s/, $frame->method_frame->locales;
-    die 'en_US is not found in locales', "\n"
-        if none {$_ eq 'en_US'} @locales;
-
-    $self->_post(
-        Net::AMQP::Protocol::Connection::StartOk->new(
-            client_properties => {
-                platform    => 'Perl',
-                product     => __PACKAGE__,
-                information => 'http://d.hatena.ne.jp/cooldaemon/',
-                version     => '0.01',
-            },
-            mechanism => 'AMQPLAIN',
-            response => {
-                LOGIN    => $args->{user},
-                PASSWORD => $args->{pass},
-            },
-            locale => 'en_US',
-        ),
-    );
-
-    return $self;
-}
-
-sub _tune {
-    my ($self,) = @_;
-
-    my $frame = $self->_read_and_valid('Connection::Tune');
-
-    $self->_post(
-        Net::AMQP::Protocol::Connection::TuneOk->new(
-            channel_max => $frame->method_frame->channel_max,
-            frame_max   => $frame->method_frame->frame_max,
-            heartbeat   => $frame->method_frame->heartbeat,
-        ),
-    );
-
-    return $self;
-}
-
-sub _open {
-    my ($self, $args,) = @_;
-
-    $self->_post_and_read(
-        'Connection::Open',
-        {
-            virtual_host => $args->{vhost},
-            capabilities => '',
-            insist       => 1,
-        },
-        'Connection::OpenOk', 
-    );
-    $self->_is_open(1);
-
-    return $self;
-}
-
-sub close {
-    my ($self,) = @_;
-
-    for my $method (qw(cancel _close_channel _close _disconnect)) {
-        eval {$self->$method()};
-    }
-
-    return $self;
-}
-
-sub _close {
-    my ($self,) = @_;
-
-    return $self if !$self->_is_open;
-
-    $self->_post_and_read('Connection::Close', {}, 'Connection::CloseOk',);
-    $self->_is_open(0);
-
-    return $self;
-}
-
-sub _disconnect {
-    my ($self,) = @_;
-
-    return if !$self->_socket;
-
-    CORE::close $self->_socket;
-    $self->clear_socket;
-
-    return;
-}
-
-sub _open_channel {
-    my ($self) = @_;
-
-    $self->_post_and_read('Channel::Open', {}, 'Channel::OpenOk', 1,);
-    $self->_is_oepn_channel(1);
-
-    return $self;
-}
-
-sub _close_channel {
-    my ($self,) = @_;
-
-    return $self if !$self->_is_oepn_channel;
-
-    $self->_post_and_read('Channel::Close', {}, 'Channel::CloseOk', 1,);
-    $self->_is_oepn_channel(0);
-
-    return $self;
-}
-
-sub declare_exchange {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Exchange::Declare',
-        {
-            type        => 'direct',
-            passive     => 0,
-            durable     => 0,
-            auto_delete => 0,
-            internal    => 0,
-            %$args, # exchange
-            ticket      => 0,
-            nowait      => 0,
-        },
-        'Exchange::DeclareOk', 
-        1,
-    );
-}
-
-sub delete_exchange {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Exchange::Delete',
-        {
-            if_unused => 0,
-            %$args, # exchange
-            ticket    => 0,
-            nowait    => 0,
-        },
-        'Exchange::DeleteOk', 
-        1,
-    );
-}
-
-sub declare_queue {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Queue::Declare',
-        {
-            queue       => '',
-            passive     => 0,
-            durable     => 0,
-            exclusive   => 0,
-            auto_delete => 0,
-            %$args,
-            ticket      => 0,
-            no_ack      => 1,
-            nowait      => 0,
-        },
-        'Queue::DeclareOk', 
-        1,
-    );
-}
-
-sub bind_queue {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Queue::Bind',
-        {
-            %$args, # queue, exchange, routing_key
-            ticket      => 0,
-            nowait      => 0,
-        },
-        'Queue::BindOk', 
-        1,
+sub BUILD {
+    my $self = shift;
+    $self->{_ar} = AnyEvent::RabbitMQ->new(
+        verbose => $self->verbose,
+        timeout => $self->timeout,
     );
 }
 
-sub unbind_queue {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Queue::Unbind',
-        {
-            %$args, # queue, exchange, routing_key
-            ticket      => 0,
-        },
-        'Queue::UnbindOk', 
-        1,
-    );
-}
-
-sub purge_queue {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Queue::Purge',
-        {
-            %$args, # queue
-            ticket => 0,
-            nowait => 0,
-        },
-        'Queue::PurgeOk', 
-        1,
-    );
-}
-
-sub delete_queue {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Queue::Delete',
-        {
-            if_unused => 0,
-            if_empty  => 0,
-            %$args, # queue
-            ticket    => 0,
-            nowait    => 0,
-        },
-        'Queue::DeleteOk', 
-        1,
-    );
-}
-
-sub publish {
-    my ($self, $publish_args, $header_args, $message,) = @_;
-
-    $self->_publish(
-        $publish_args,
-    )->_header(
-        $header_args, $message,
-    )->_body(
-        $message,
-    );
-
-    return if !$publish_args->{mandatory} && !$publish_args->{immediate};
-
-    my $frame = eval {
-        $self->_read_and_valid('Basic::Return', $self->publish_timeout);
-    };
-
-    if ($@) {
-        return if $@ =~ '^Read\stimed\sout';
-        die $@;
-    }
-
-    return {
-        return => $frame,
-        header => $self->_read_header_and_valid(),
-        body   => $self->_read_body_and_valid(),
-    };
-}
-
-sub _publish {
-    my ($self, $args,) = @_;
-
-    $self->_post(
-        Net::AMQP::Protocol::Basic::Publish->new(
-            exchange  => '',
-            mandatory => 0,
-            immediate => 0,
-            %$args, # routing_key
-            ticket    => 0,
-        ),
-        1,
-    );
-
-    return $self;
-}
-
-sub _header {
-    my ($self, $args, $message,) = @_;
-
-    $self->_post(
-        Net::AMQP::Frame::Header->new(
-            weight       => $args->{weight} || 0,
-            body_size    => length($message),
-            header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
-                content_type     => 'application/octet-stream',
-                content_encoding => '',
-                headers          => {},
-                delivery_mode    => 1,
-                priority         => 1,
-                correlation_id   => '',
-                reply_to         => '',
-                expiration       => '',
-                message_id       => '',
-                timestamp        => time,
-                type             => '',
-                user_id          => '',
-                app_id           => '',
-                cluster_id       => '',
-                %$args,
-            ),
-        ),
-        1,
-    );
-
-    return $self;
-}
-
-sub _body {
-    my ($self, $message,) = @_;
-    $self->_post(Net::AMQP::Frame::Body->new(payload => $message), 1);
-    return $self;
-}
-
-sub consume {
-    my ($self, $args,) = @_;
-
-    die 'Has already been consuming message', "\n" if $self->_consume_tag;
-
-    my $frame = $self->_post_and_read(
-        'Basic::Consume',
-        {
-            consumer_tag => '',
-            no_local     => 0,
-            no_ack       => 1,
-            exclusive    => 0,
-            %$args, # queue
-            ticket       => 0,
-            nowait       => 0,
-        },
-        'Basic::ConsumeOk', 
-        1,
-    );
-
-    $self->_consume_tag($frame->method_frame->consumer_tag);
-    return $frame;
-}
-
-sub cancel {
-    my ($self,) = @_;
-
-    return if !$self->_consume_tag;
-
-    my $frame = $self->_post_and_read(
-        'Basic::Cancel',
-        {
-            consumer_tag => $self->_consume_tag,
-            nowait       => 0,
-        },
-        'Basic::CancelOk', 
-        1,
-    );
-
-    $self->_consume_tag('');
-    return $frame;
-}
-
-sub poll {
-    my ($self, $args,) = @_;
-
-    my $timeout = $args && $args->{timeout} ? $args->{timeout} : 'infinite';
-    return {
-        deliver => $self->_read_and_valid('Basic::Deliver', $timeout),
-        header  => $self->_read_header_and_valid(),
-        body    => $self->_read_body_and_valid(),
-    };
-}
-
-sub get {
-    my ($self, $args,) = @_;
-
-    my $frame = $self->_post_and_read(
-        'Basic::Get',
-        {
-            no_ack => 1,
-            %$args, # queue
-            ticket => 0,
-        },
-        [qw(Basic::GetOk Basic::GetEmpty)], 
-        1,
-    );
-
-    return $frame
-        if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
-
-    return {
-        getok  => $frame,
-        header => $self->_read_header_and_valid(),
-        body   => $self->_read_body_and_valid(),
-    };
-}
-
-sub ack {
-    my ($self, $args,) = @_;
-
-    $self->_post(
-        Net::AMQP::Protocol::Basic::Ack->new(
-            delivery_tag => 0,
-            multiple     => (
-                defined $args->{delivery_tag} && $args->{delivery_tag} != 0 ? 0 : 1
-            ),
-            %$args,
-        ),
-        1,
-    );
-    return;
-}
-
-sub qos {
-    my ($self, $args,) = @_;
-
-    return $self->_post_and_read(
-        'Basic::Qos',
-        {
-            prefetch_count => 1,
-            %$args,
-            prefetch_size  => 0,
-            global         => 0,
-        },
-        'Basic::QosOk', 
-        1,
-    );
-}
-
-sub recover {
-    my ($self, $args,) = @_;
-
-    $self->_post(
-        Net::AMQP::Protocol::Basic::Recover->new(
-            requeue => 0,
-            %$args,
-        ),
-        1,
-    );
-    return;
-}
-
-sub select_tx {
-    my ($self,) = @_;
-    return $self->_post_and_read('Tx::Select', {}, 'Tx::SelectOk', 1,);
-}
-
-sub commit_tx {
-    my ($self,) = @_;
-    return $self->_post_and_read('Tx::Commit', {}, 'Tx::CommitOk', 1,);
-}
-
-sub rollback_tx {
-    my ($self,) = @_;
-    return $self->_post_and_read('Tx::Rollback', {}, 'Tx::RollbackOk', 1,);
-}
-
-sub _post_and_read {
-    my ($self, $method, $args, $exp, $id,) = @_;
-
-    $method = 'Net::AMQP::Protocol::' . $method;
-    $self->_post(
-        Net::AMQP::Frame::Method->new(
-            method_frame => $method->new(%$args)
-        ),
-        $id,
-    );
-    return $self->_read_and_valid($exp);
-}
-
-sub _read_and_valid {
-    my ($self, $exp, $timeout,) = @_;
-    $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
-
-    my $frame = $self->_read($timeout);
-    die 'Received data is not method frame', "\n"
-        if !$frame->isa('Net::AMQP::Frame::Method');
-
-    my $method_frame = $frame->method_frame;
-    for my $exp_elem (@$exp) {
-        return $frame if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
-    }
-
-    $self->_check_close_and_clean($frame);
-    die 'Method is not ', join(',', @$exp), "\n",
-        'Method was ', ref $method_frame, "\n"
-            if !$method_frame->isa('Net::AMQP::Protocol::Connection::Close');
-}
-
-sub _read_header_and_valid {
-    my ($self,) = @_;
-
-    my $frame = $self->_read();
-    if (!$frame->isa('Net::AMQP::Frame::Header')) {
-        $self->_check_close_and_clean($frame);
-        die 'Received data is not header frame', "\n";
-    }
-
-    my $header_frame = $frame->header_frame;
-    die 'Header is not Protocol::Basic::ContentHeader',
-        'Header was ', ref $header_frame, "\n"
-            if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
-
-    return $frame;
-}
-
-sub _read_body_and_valid {
-    my ($self,) = @_;
-
-    my $frame = $self->_read();
-    return $frame if $frame->isa('Net::AMQP::Frame::Body');
-
-    $self->_check_close_and_clean($frame);
-    die 'Received data is not body frame', "\n";
-}
-
-sub _check_close_and_clean {
-    my ($self, $frame,) = @_;
-
-    return $self if !$frame->isa('Net::AMQP::Frame::Method');
-
-    my $method_frame = $frame->method_frame;
-
-    if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
-        $self->_is_oepn_channel(0);
-        $self->_post(Net::AMQP::Protocol::Connection::CloseOk->new());
-        $self->_is_open(0);
-        $self->_disconnect();
-        die $method_frame->reply_code, ' ', $method_frame->reply_text, "\n";
-    } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
-        $self->_post(Net::AMQP::Protocol::Channel::CloseOk->new(), 1);
-        $self->_is_oepn_channel(0);
-        $self->_close()->_disconnect();
-        die $method_frame->reply_code, ' ', $method_frame->reply_text, "\n";
-    }
-
+sub load_xml_spec {
+    my $self = shift;
+    $self->{_ar}->load_xml_spec(@_);
     return $self;
 }
 
-sub _read {
-    my ($self, $timeout,) = @_;
-
-    $timeout ||= $self->timeout;
-
-    my $frame;
-
-    if ($timeout eq 'infinite') {
-        $frame = $self->_do_read();
-    } else {
-        if (timeout_call($timeout, sub {$frame = $self->_do_read()})) {
-            die 'Read timed out after', $timeout, "\n";
-        }
-    }
-
-    return $frame;
+sub open_channel {
+    my $self = shift;
+    return RabbitFoot::Channel->new(arc => $self->_do('open_channel', @_));
 }
 
-sub _do_read {
-    my ($self,) = @_;
-
-    my $stack;
-    my $data;
-
-    read $self->_socket, $data, 8;
-    if (length($data) <= 0) {
-        die 'Disconnect', "\n";
-    }
-
-    $stack .= $data;
-
-    my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
-    if (!defined $type_id || !defined $channel || !defined $length) {
-        die 'Broken data was received', "\n";
-    }
-
-    while ($length > 0) {
-        $length -= read $self->_socket, $data, $length;
-        $stack .= $data;
-    }
-
-    my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
-    if ($self->verbose) {
-        print STDERR '[C] <-- [S] ' . Dumper($frame);
-        print STDERR '-----------', "\n";
-    }
-
-    return $frame;
-}
-
-sub _post {
-    my ($self, $output, $id,) = @_;
-
-    if ($output->isa('Net::AMQP::Protocol::Base')) {
-        $output = $output->frame_wrap;
-    }
-    $output->channel($id || 0);
-
-    if ($self->verbose) {
-        print STDERR '[C] --> [S] ', Dumper($output), "\n";
-    }
-    print {$self->_socket} $output->to_raw_frame();
-
-    return;
-}
+sub _do {
+    my $self   = shift;
+    my $method = shift;
+    my %args   = @_;
 
-sub DEMOLISH {
-    my ($self) = @_;
+    my $cb = Coro::rouse_cb;
+    $args{on_success} = sub {$cb->(1, @_);},
+    $args{on_failure} = sub {$cb->(0, @_);},
 
-    $self->close();
-    return;
+    $self->_ar->$method(%args);
+    my ($is_success, @responses) = Coro::rouse_wait;
+    die @responses if !$is_success;
+    return @responses;
 }
 
 1;
@@ -729,29 +76,30 @@ __END__
 
 =head1 NAME
 
-RabbitFoot - A synchronous and single channel Perl AMQP client.
+RabbitFoot - An Asynchronous and single channel Perl AMQP client.
 
 =head1 SYNOPSIS
 
   use RabbitFoot;
 
-  my $rf = RabbitFoot->new({
+  my $rf = RabbitFoot->new(
       timeout => 1,
-  })->load_xml_spec(
+  )->load_xml_spec(
       '/path/to/amqp0-8.xml',
-  )->connect({
+  )->connect(
       host  => 'localhosti',
       port  => 5672,
       user  => 'guest',
       port  => 'guest',
       vhost => '/',
-  });
+  );
 
-  $rf->declare_exchange({exchange => 'test_exchange'});
+  my $ch = $rf->open_channel();
+  $ch->declare_exchange(exchange => 'test_exchange');
 
 =head1 DESCRIPTION
 
-RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a synchronous fashion.
+RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
 
 You can use RabbitFoot to -
 
diff --git a/lib/RabbitFoot/Channel.pm b/lib/RabbitFoot/Channel.pm
new file mode 100644
index 0000000..c5d2538
--- /dev/null
+++ b/lib/RabbitFoot/Channel.pm
@@ -0,0 +1,50 @@
+package RabbitFoot::Channel;
+
+use Coro;
+use Coro::AnyEvent;
+
+use AnyEvent::RabbitMQ::Channel;
+
+use Moose;
+
+our $VERSION = '0.01';
+
+has arc => (
+    isa => 'AnyEvent::RabbitMQ::Channel',
+    is  => 'ro',
+);
+
+for my $method (qw(
+    close
+    declare_exchange delete_exchange
+    declare_queue bind_queue unbind_queue purge_queue delete_queue
+    consume cancel get qos
+    select_tx commit_tx rollback_tx
+)) {
+    __PACKAGE__->meta->add_method($method, sub {
+        my $self = shift;
+        my %args = @_;
+
+        my $cb = Coro::rouse_cb;
+        $args{on_success} = sub {$cb->(1, @_);},
+        $args{on_failure} = sub {$cb->(0, @_);},
+
+        $self->arc->$method(%args);
+        my ($is_success, @responses) = Coro::rouse_wait;
+        die @responses if !$is_success;
+        return $responses[0];
+    });
+}
+
+for my $method (qw(publish ack recover)) {
+    __PACKAGE__->meta->add_method($method, sub {
+        my $self = shift;
+        $self->arc->$method(@_);
+        return $self;
+    });
+}
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+1;
diff --git a/xt/05_anyevent.t b/xt/04_anyevent.t
similarity index 89%
rename from xt/05_anyevent.t
rename to xt/04_anyevent.t
index f734e0b..1765f37 100644
--- a/xt/05_anyevent.t
+++ b/xt/04_anyevent.t
@@ -25,7 +25,7 @@ plan tests => 24;
 
 use AnyEvent::RabbitMQ;
 
-my $ar = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 1,);
+my $ar = AnyEvent::RabbitMQ->new(timeout => 1,);
 
 lives_ok sub {
     $ar->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
@@ -91,32 +91,37 @@ $ch->bind_queue(
 $done->recv;
 
 $done = AnyEvent->condvar;
+my $consumer_tag;
 $ch->consume(
     queue      => 'test_q',
+    on_success => sub {
+        my $frame = shift;
+        $consumer_tag = $frame->method_frame->consumer_tag;
+        pass('consume');
+    },
     on_consume => sub {
         my $response = shift;
-        my $message  = $response->{body}->payload;
-
-        pass('publish and consume message');
-        return if $message ne 'cancel';
-
-        $ch->cancel(
-            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
-            on_success   => sub {
-                pass('cancel');
-                $done->send;
-            },
-            on_failure   => failure_cb($done),
-        );
+        ok($response->{body}->payload, 'publish');
+        $done->send;
     },
     on_failure => failure_cb($done),
 );
-publish($ch, 'Hello RabbitMQ', $done,);
-publish($ch, 'cancel', $done,);
+publish($ch, 'Hello RabbitMQ.', $done,);
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->cancel(
+    consumer_tag => $consumer_tag,
+    on_success   => sub {
+        pass('cancel');
+        $done->send;
+    },
+    on_failure   => failure_cb($done),
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-publish($ch, 'I love RabbitMQ', $done,);
+publish($ch, 'I love RabbitMQ.', $done,);
 $ch->get(
     queue      => 'test_q',
     on_success => sub {
@@ -162,11 +167,11 @@ $ch->consume(
     },
     on_failure => failure_cb($done),
 );
-publish($ch, 'NO RabbitMQ, NO LIFE', $done,);
+publish($ch, 'NO RabbitMQ, NO LIFE.', $done,);
 $done->recv;
 
 $done = AnyEvent->condvar;
-publish($ch, 'RabbitMQ is cool', $done,);
+publish($ch, 'RabbitMQ is cool.', $done,);
 $ch->get(
     queue      => 'test_q',
     no_ack     => 0,
@@ -201,8 +206,8 @@ $ch->qos(
     },
     on_failure => failure_cb($done),
 );
-publish($ch, 'RabbitMQ is excellent', $done,);
-publish($ch, 'RabbitMQ is fantastic', $done,);
+publish($ch, 'RabbitMQ is excellent.', $done,);
+publish($ch, 'RabbitMQ is fantastic.', $done,);
 $done->recv;
 pass('qos');
 
@@ -236,7 +241,7 @@ $ch->consume(
         my $response = shift;
 
         if (5 > ++$recover_count) {
-            $ch->recover({});
+            $ch->recover();
             return;
         }
 
@@ -254,7 +259,7 @@ $ch->consume(
     },
     on_failure => failure_cb($done),
 );
-publish($ch, 'RabbitMQ is powerful', $done,);
+publish($ch, 'RabbitMQ is powerful.', $done,);
 $done->recv;
 pass('recover');
 
@@ -262,7 +267,7 @@ $done = AnyEvent->condvar;
 $ch->select_tx(
     on_success => sub {
         pass('select tx');
-        publish($ch, 'RabbitMQ is highly reliable systems', $done,);
+        publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
 
         $ch->rollback_tx(
             on_success => sub {
diff --git a/xt/04_use_server.t b/xt/04_use_server.t
deleted file mode 100644
index 26a006f..0000000
--- a/xt/04_use_server.t
+++ /dev/null
@@ -1,197 +0,0 @@
-use Test::More;
-use Test::Exception;
-
-use FindBin;
-use JSON::Syck;
-
-my $conf = JSON::Syck::LoadFile($FindBin::Bin . '/../config.json');
-
-eval {
-    use IO::Socket::INET;
-
-    my $socket = IO::Socket::INET->new(
-        Proto    => 'tcp',
-        PeerAddr => $conf->{host},
-        PeerPort => $conf->{port},
-        Timeout  => 1,
-    ) or die 'Error connecting to AMQP Server!';
-
-    close $socket;
-};
-
-plan skip_all => 'Connection failure: '
-               . $conf->{host} . ':' . $conf->{port} if $@;
-plan tests => 23;
-
-use RabbitFoot;
-
-my $rf = RabbitFoot->new({timeout => 1, verbose => 1,});
-
-lives_ok sub {
-    $rf->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
-}, 'load xml spec';
- 
-lives_ok sub {
-    $rf->connect({(map {$_ => $conf->{$_}} qw(host port user pass vhost))});
-}, 'connect';
-
-lives_ok sub {
-    $rf->declare_exchange({exchange => 'test_x'});
-}, 'declare_exchange';
-
-lives_ok sub {
-    $rf->declare_queue({queue => 'test_q'});
-}, 'declare_queue';
-
-lives_ok sub {
-    $rf->bind_queue({
-        queue       => 'test_q',
-        exchange    => 'test_x',
-        routing_key => 'test_r',
-    });
-}, 'bind_queue';
-
-lives_ok sub {
-    publish($rf, 'Hello RabbitMQ.');
-}, 'publish for consume';
-
-lives_ok sub {
-    $rf->consume({queue => 'test_q'});
-}, 'consume';
-
-lives_ok sub {
-    $rf->poll({timeout => 1});
-}, 'poll';
-
-lives_ok sub {
-    $rf->cancel();
-}, 'cancel';
-
-lives_ok sub {
-    publish($rf, 'I love RabbitMQ.');
-    $rf->get({queue => 'test_q'});
-}, 'get';
-
-lives_ok sub {
-    $rf->get({queue => 'test_q'});
-}, 'get empty';
-
-lives_ok sub {
-    publish($rf, 'NO RabbitMQ, NO LIFE.');
-    $rf->consume({
-        queue  => 'test_q',
-        no_ack => 0,
-    });
-    my $response = $rf->poll({timeout => 1});
-    $rf->ack({
-        delivery_tag => $response->{deliver}->method_frame->delivery_tag,
-    });
-    $rf->cancel();
-}, 'ack deliver';
-
-lives_ok sub {
-    publish($rf, 'RabbitMQ is cool.');
-    my $response = $rf->get({
-        queue  => 'test_q',
-        no_ack => 0,
-    });
-    $rf->ack({
-        delivery_tag => $response->{getok}->method_frame->delivery_tag,
-    });
-}, 'ack get';
-
-lives_ok sub {
-    publish($rf, 'RabbitMQ is excellent.');
-    publish($rf, 'RabbitMQ is fantastic.');
-    $rf->qos({prefetch_count => 2});
-
-    $rf->consume({
-        queue  => 'test_q',
-        no_ack => 0,
-    });
- 
-    my @responses = map {$rf->poll({timeout => 1})} (1, 2);
-    for my $response (@responses) {
-        $rf->ack({
-            delivery_tag => $response->{deliver}->method_frame->delivery_tag,
-        });
-    }
-
-    $rf->cancel();
-    $rf->qos({});
-}, 'qos';
-
-lives_ok sub {
-    publish($rf, 'RabbitMQ is powerful.');
-
-    $rf->consume({
-        queue  => 'test_q',
-        no_ack => 0,
-    });
- 
-    for (1..5) {
-        my $response = $rf->poll({timeout => 1});
-        $rf->recover({});
-    }
-
-    my $response = $rf->poll({timeout => 1});
-    $rf->ack({
-        delivery_tag => $response->{deliver}->method_frame->delivery_tag,
-    });
-
-    $rf->cancel();
-}, 'recover';
-
-lives_ok sub {
-    $rf->select_tx();
-}, 'select_tx';
-
-lives_ok sub {
-    publish($rf, 'RabbitMQ is highly reliable systems.');
-    $rf->rollback_tx();
-}, 'rollback_tx';
-
-lives_ok sub {
-    publish($rf, 'RabbitMQ is highly reliable systems.');
-    $rf->commit_tx();
-}, 'commit_tx';
-
-lives_ok sub {
-    $rf->purge_queue({queue => 'test_q'});
-}, 'purge_queue';
-
-lives_ok sub {
-    $rf->unbind_queue({
-        queue       => 'test_q',
-        exchange    => 'test_x',
-        routing_key => 'test_r',
-    });
-}, 'unbind_queue';
-
-lives_ok sub {
-    $rf->delete_queue({queue => 'test_q'});
-}, 'delete_queue';
-
-lives_ok sub {
-    $rf->delete_exchange({exchange => 'test_x'});
-}, 'delete_exchange';
-
-lives_ok sub {
-    $rf->close();
-}, 'close';
-
-sub publish {
-    my ($rf, $message,) = @_;
-
-    $rf->publish(
-        {
-            exchange    => 'test_x',
-            routing_key => 'test_r',
-        },
-        {},
-        $message,
-    );
-
-    return;
-}
-
diff --git a/xt/05_coro.t b/xt/05_coro.t
new file mode 100644
index 0000000..a8b3a0d
--- /dev/null
+++ b/xt/05_coro.t
@@ -0,0 +1,220 @@
+use Test::More;
+use Test::Exception;
+
+use FindBin;
+use JSON::Syck;
+
+my $conf = JSON::Syck::LoadFile($FindBin::Bin . '/../config.json');
+
+eval {
+    use IO::Socket::INET;
+
+    my $socket = IO::Socket::INET->new(
+        Proto    => 'tcp',
+        PeerAddr => $conf->{host},
+        PeerPort => $conf->{port},
+        Timeout  => 1,
+    ) or die 'Error connecting to AMQP Server!';
+
+    close $socket;
+};
+
+plan skip_all => 'Connection failure: '
+               . $conf->{host} . ':' . $conf->{port} if $@;
+plan tests => 23;
+
+use RabbitFoot;
+
+#my $rf = RabbitFoot->new(timeout => 1, verbose => 1,);
+my $rf = RabbitFoot->new(timeout => 1,);
+
+lives_ok sub {
+    $rf->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
+}, 'load xml spec';
+ 
+lives_ok sub {
+    $rf->connect((map {$_ => $conf->{$_}} qw(host port user pass vhost)));
+}, 'connect';
+
+my $ch;
+lives_ok sub {$ch = $rf->open_channel();}, 'open channel';
+
+lives_ok sub {
+    $ch->declare_exchange(exchange => 'test_x');
+}, 'declare_exchange';
+
+lives_ok sub {
+    $ch->declare_queue(queue => 'test_q');
+}, 'declare_queue';
+
+lives_ok sub {
+    $ch->bind_queue(
+        queue       => 'test_q',
+        exchange    => 'test_x',
+        routing_key => 'test_r',
+    );
+}, 'bind_queue';
+
+lives_ok sub {
+    publish($ch, 'Hello RabbitMQ.');
+}, 'publish';
+
+my $done = AnyEvent->condvar;
+lives_ok sub {
+    $ch->consume(
+        queue      => 'test_q',
+        on_consume => sub {
+            my $response = shift;
+            $done->send($response->{deliver}->method_frame->consumer_tag);
+        },
+    );
+}, 'consume';
+
+lives_ok sub {
+    $ch->cancel(consumer_tag => $done->recv,);
+}, 'cancel';
+
+lives_ok sub {
+    publish($ch, 'I love RabbitMQ.');
+    $ch->get(queue => 'test_q');
+}, 'get';
+
+lives_ok sub {
+    $ch->get(queue => 'test_q');
+}, 'empty';
+
+$done = AnyEvent->condvar;
+lives_ok sub {
+    $ch->consume(
+        queue  => 'test_q',
+        no_ack => 0,
+        on_consume => sub {
+            my $response = shift;
+            $ch->ack(
+                delivery_tag => $response->{deliver}->method_frame->delivery_tag
+            );
+            $done->send($response->{deliver}->method_frame->consumer_tag);
+        }
+    );
+    publish($ch, 'NO RabbitMQ, NO LIFE.');
+    $ch->cancel(consumer_tag => $done->recv,);
+}, 'ack deliver';
+
+lives_ok sub {
+    publish($ch, 'RabbitMQ is cool.');
+    my $response = $ch->get(
+        queue  => 'test_q',
+        no_ack => 0,
+    );
+    $ch->ack(delivery_tag => $response->{ok}->method_frame->delivery_tag,);
+}, 'ack get';
+
+lives_ok sub {
+    $ch->qos(prefetch_count => 2);
+
+    $done = AnyEvent->condvar;
+    my @responses;
+    my $frame = $ch->consume(
+        queue  => 'test_q',
+        no_ack => 0,
+        on_consume => sub {
+            my $response = shift;
+            push @responses, $response;
+            return if 2 > scalar @responses;
+            $done->send;
+        },
+    );
+    publish($ch, 'RabbitMQ is excellent.');
+    publish($ch, 'RabbitMQ is fantastic.');
+    $done->recv;
+
+    for my $response (@responses) {
+        $ch->ack(
+            delivery_tag => $response->{deliver}->method_frame->delivery_tag,
+        );
+    }
+
+    $ch->cancel(consumer_tag => $frame->method_frame->consumer_tag,);
+    $ch->qos();
+}, 'qos';
+
+lives_ok sub {
+    $done = AnyEvent->condvar;
+    my $recover_count = 0;
+    $ch->consume(
+        queue  => 'test_q',
+        no_ack => 0,
+        on_consume => sub {
+            my $response = shift;
+
+            if (5 > ++$recover_count) {
+                $ch->recover();
+                return;
+            }
+
+            $ch->ack(
+                delivery_tag => $response->{deliver}->method_frame->delivery_tag
+            );
+
+            $done->send($response->{deliver}->method_frame->consumer_tag);
+        }
+    );
+    publish($ch, 'RabbitMQ is powerful.');
+    $ch->cancel(consumer_tag => $done->recv,);
+}, 'recover';
+
+lives_ok sub {
+    $ch->select_tx();
+}, 'select_tx';
+
+lives_ok sub {
+    publish($ch, 'RabbitMQ is highly reliable systems.');
+    $ch->rollback_tx();
+}, 'rollback_tx';
+
+lives_ok sub {
+    publish($ch, 'RabbitMQ is highly reliable systems.');
+    $ch->commit_tx();
+}, 'commit_tx';
+
+lives_ok sub {
+    $ch->purge_queue(queue => 'test_q');
+}, 'purge_queue';
+
+lives_ok sub {
+    $ch->unbind_queue(
+        queue       => 'test_q',
+        exchange    => 'test_x',
+        routing_key => 'test_r',
+    );
+}, 'unbind_queue';
+
+lives_ok sub {
+    $ch->delete_queue(queue => 'test_q');
+}, 'delete_queue';
+
+lives_ok sub {
+    $ch->delete_exchange(exchange => 'test_x');
+}, 'delete_exchange';
+
+lives_ok sub {
+    $rf->close();
+}, 'close';
+
+sub publish {
+    my ($ch, $message,) = @_;
+
+    $ch->publish(
+        exchange    => 'test_x',
+        routing_key => 'test_r',
+        body        => $message,
+        on_return   => sub {
+            my $response = shift;
+            my $error_message = 'on_return: ' . Dumper($response);
+            die $error_message;
+        },
+    );
+
+    return;
+}
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list