[libanyevent-rabbitmq-perl] 12/151: Added test scripts for AnyEvent.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit 6aa9832a7757826f033010ae22360e1573f87a73
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Mon Feb 8 22:34:45 2010 +0900

    Added test scripts for AnyEvent.
---
 lib/AnyEvent/RabbitMQ.pm |   2 +-
 xt/05_anyevent.t         | 367 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 368 insertions(+), 1 deletion(-)

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index d0aebf4..87c08f3 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -281,7 +281,7 @@ sub _open {
         'Connection::OpenOk', 
         sub {
             $self->{_is_open} = 1;
-            $args->{on_success}->();
+            $args->{on_success}->($self);
         },
         $args->{on_failure},
     );
diff --git a/xt/05_anyevent.t b/xt/05_anyevent.t
new file mode 100644
index 0000000..ffdc917
--- /dev/null
+++ b/xt/05_anyevent.t
@@ -0,0 +1,367 @@
+use Test::More;
+use Test::Exception;
+
+use FindBin;
+use JSON::Syck;
+
+my $conf = JSON::Syck::LoadFile($FindBin::Bin . '/../config.json');
+
+eval {
+    use IO::Socket::INET;
+
+    my $socket = IO::Socket::INET->new(
+        Proto    => 'tcp',
+        PeerAddr => $conf->{host},
+        PeerPort => $conf->{port},
+        Timeout  => 1,
+    ) or die 'Error connecting to AMQP Server!';
+
+    close $socket;
+};
+
+plan skip_all => 'Connection failure: '
+               . $conf->{host} . ':' . $conf->{port} if $@;
+plan tests => 24;
+
+use AnyEvent::RabbitMQ;
+
+my $ar = AnyEvent::RabbitMQ->new({timeout => 1, verbose => 1,});
+
+lives_ok sub {
+    $ar->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
+}, 'load xml spec';
+
+my $done = AnyEvent->condvar;
+$ar->connect({
+    (map {$_ => $conf->{$_}} qw(host port user pass vhost)),
+    on_success => sub {
+        my $ar = shift;
+        isa_ok($ar, 'AnyEvent::RabbitMQ');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+my $ch;
+$ar->open_channel({
+    on_success => sub {
+        $ch = shift;
+        isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->declare_exchange({
+    exchange   => 'test_x',
+    on_success => sub {
+        pass('declare exchange');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->declare_queue({
+    queue      => 'test_q',
+    on_success => sub {
+        pass('declare queue');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->bind_queue({
+    queue       => 'test_q',
+    exchange    => 'test_x',
+    routing_key => 'test_r',
+    on_success  => sub {
+        pass('bound queue');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->consume({
+    queue      => 'test_q',
+    on_consume => sub {
+        my $response = shift;
+        my $message  = $response->{body}->payload;
+
+        pass('publish and consume message');
+        return if $message ne 'cancel';
+
+        $ch->cancel({
+            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
+            on_success   => sub {
+                pass('cancel');
+                $done->send;
+            },
+            on_failure   => failure_cb($done),
+        });
+    },
+    on_failure => failure_cb($done),
+});
+publish($ch, 'Hello RabbitMQ', $done,);
+publish($ch, 'cancel', $done,);
+$done->recv;
+
+$done = AnyEvent->condvar;
+publish($ch, 'I love RabbitMQ', $done,);
+$ch->get({
+    queue      => 'test_q',
+    on_success => sub {
+        my $response = shift;
+        ok(defined $response->{ok}, 'getok');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->get({
+    queue      => 'test_q',
+    on_success => sub {
+        my $response = shift;
+        ok(defined $response->{empty}, 'empty');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->consume({
+    queue      => 'test_q',
+    no_ack     => 0,
+    on_consume => sub {
+        my $response = shift;
+        $ch->ack({
+            delivery_tag => $response->{deliver}->method_frame->delivery_tag
+        });
+        pass('ack deliver');
+
+        $ch->cancel({
+            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
+            on_success   => sub {
+                pass('cancel');
+                $done->send;
+            },
+            on_failure   => failure_cb($done),
+        });
+    },
+    on_failure => failure_cb($done),
+});
+publish($ch, 'NO RabbitMQ, NO LIFE', $done,);
+$done->recv;
+
+$done = AnyEvent->condvar;
+publish($ch, 'RabbitMQ is cool', $done,);
+$ch->get({
+    queue      => 'test_q',
+    no_ack     => 0,
+    on_success => sub {
+        my $response = shift;
+        $ch->ack({
+            delivery_tag => $response->{ok}->method_frame->delivery_tag
+        });
+        pass('ack get');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+my @responses;
+$ch->qos({
+    prefetch_count => 2,
+    on_success => sub {
+        $ch->consume({
+            queue      => 'test_q',
+            no_ack     => 0,
+            on_consume => sub {
+                my $response = shift;
+                push @responses, $response;
+                return if 2 > scalar @responses;
+                $done->send;
+            },
+            on_failure => failure_cb($done),
+        });
+    },
+    on_failure => failure_cb($done),
+});
+publish($ch, 'RabbitMQ is excellent', $done,);
+publish($ch, 'RabbitMQ is fantastic', $done,);
+$done->recv;
+pass('qos');
+
+for my $response (@responses) {
+    $ch->ack({
+        delivery_tag => $response->{deliver}->method_frame->delivery_tag,
+    });
+}
+
+$done = AnyEvent->condvar;
+$ch->cancel({
+    consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
+    on_success   => sub {
+        $ch->qos({
+            on_success => sub {
+                $done->send;
+            },
+            on_failure => failure_cb($done),
+        });
+    },
+    on_failure   => failure_cb($done),
+});
+$done->recv;
+ 
+$done = AnyEvent->condvar;
+my $recover_count = 0;
+$ch->consume({
+    queue      => 'test_q',
+    no_ack     => 0,
+    on_consume => sub {
+        my $response = shift;
+
+        if (5 > ++$recover_count) {
+            $ch->recover({});
+            return;
+        }
+
+        $ch->ack({
+            delivery_tag => $response->{deliver}->method_frame->delivery_tag
+        });
+
+        $ch->cancel({
+            consumer_tag => $response->{deliver}->method_frame->consumer_tag,
+            on_success   => sub {
+                $done->send;
+            },
+            on_failure   => failure_cb($done),
+        });
+    },
+    on_failure => failure_cb($done),
+});
+publish($ch, 'RabbitMQ is powerful', $done,);
+$done->recv;
+pass('recover');
+
+$done = AnyEvent->condvar;
+$ch->select_tx({
+    on_success => sub {
+        pass('select tx');
+        publish($ch, 'RabbitMQ is highly reliable systems', $done,);
+
+        $ch->rollback_tx({
+            on_success => sub {
+                pass('rollback tx');
+                publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
+
+                $ch->commit_tx({
+                    on_success => sub {
+                        pass('commit tx');
+                        $done->send;
+                    },
+                    on_failure => failure_cb($done),
+                });
+            },
+            on_failure => failure_cb($done),
+        });
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->purge_queue({
+    queue      => 'test_q',
+    on_success => sub {
+        pass('purge queue');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->unbind_queue({
+    queue       => 'test_q',
+    exchange    => 'test_x',
+    routing_key => 'test_r',
+    on_success  => sub {
+        pass('unbind queue');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->delete_queue({
+    queue      => 'test_q',
+    on_success => sub {
+        pass('delete queue');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ch->delete_exchange({
+    exchange   => 'test_x',
+    on_success => sub {
+        pass('delete exchange');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+$done = AnyEvent->condvar;
+$ar->close({
+    on_success => sub {
+        pass('close');
+        $done->send;
+    },
+    on_failure => failure_cb($done),
+});
+$done->recv;
+
+sub failure_cb {
+    my ($cv,) = @_;
+    return sub {
+        fail(join(' ', 'on_failure:', @_));
+        $cv->send;
+    };
+}
+
+sub publish {
+    my ($ch, $message, $cv,) = @_;
+
+    $ch->publish({
+        exchange    => 'test_x',
+        routing_key => 'test_r',
+        body        => $message,
+        on_return   => sub {
+            my $response = shift;
+            fail('on_return: ' . Dumper($response));
+            $cv->send;
+        },
+    });
+
+    return;
+}
+ 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list