[libanyevent-rabbitmq-perl] 13/151: Fixed APIs.
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit 411d5747a8c06f8240338469527b37b5799510b2
Author: cooldaemon <cooldaemon at gmail.com>
Date: Tue Feb 9 12:29:31 2010 +0900
Fixed APIs.
---
lib/AnyEvent/RabbitMQ.pm | 129 +++++++++++++++--------------
lib/AnyEvent/RabbitMQ/Channel.pm | 175 ++++++++++++++++++++-------------------
xt/05_anyevent.t | 126 ++++++++++++++--------------
3 files changed, 220 insertions(+), 210 deletions(-)
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 87c08f3..b4f9a27 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -2,8 +2,10 @@ package AnyEvent::RabbitMQ;
use Data::Dumper;
use List::MoreUtils qw(none);
+
use AnyEvent::Handle;
use AnyEvent::Socket;
+
use Net::AMQP;
use Net::AMQP::Common qw(:all);
@@ -77,31 +79,29 @@ sub load_xml_spec {
}
sub connect {
- my ($self, $args,) = @_;
-
- $args->{on_success} ||= sub {};
- $args->{on_failure} ||= sub {die @_};
+ my $self = shift;
+ my %args = $self->_set_cbs(@_);
if ($self->verbose) {
- print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n";
+ print STDERR 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
}
$self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
- $args->{host},
- $args->{port},
+ $args{host},
+ $args{port},
sub {
my $fh = shift
- or return $args->{on_failure}->('Error connecting to AMQP Server!');
+ or return $args{on_failure}->('Error connecting to AMQP Server!');
$self->{_handle} = AnyEvent::Handle->new(
fh => $fh,
on_error => sub {
my ($handle, $fatal, $message) = @_;
$self->clear_handle;
- $args->{on_failure}->($message);
+ $args{on_failure}->($message);
}
);
- $self->_read_loop($args->{on_failure});
- $self->_start($args,);
+ $self->_read_loop($args{on_failure});
+ $self->_start(%args,);
},
sub {
return $self->timeout;
@@ -198,7 +198,8 @@ sub _check_close_and_clean {
}
sub _start {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
if ($self->verbose) {
print STDERR 'post header', "\n";
@@ -212,11 +213,11 @@ sub _start {
my $frame = shift;
my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
- return $args->{on_failure}->('AMQPLAIN is not found in mechanisms')
+ return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
if none {$_ eq 'AMQPLAIN'} @mechanisms;
my @locales = split /\s/, $frame->method_frame->locales;
- return $args->{on_failure}->('en_US is not found in locales')
+ return $args{on_failure}->('en_US is not found in locales')
if none {$_ eq 'en_US'} @locales;
$self->_push_write(
@@ -229,23 +230,24 @@ sub _start {
},
mechanism => 'AMQPLAIN',
response => {
- LOGIN => $args->{user},
- PASSWORD => $args->{pass},
+ LOGIN => $args{user},
+ PASSWORD => $args{pass},
},
locale => 'en_US',
),
);
- $self->_tune($args,);
+ $self->_tune(%args,);
},
- $args->{on_failure},
+ $args{on_failure},
);
return $self;
}
sub _tune {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
$self->_push_read_and_valid(
'Connection::Tune',
@@ -260,50 +262,49 @@ sub _tune {
),
);
- $self->_open($args,);
+ $self->_open(%args,);
},
- $args->{on_failure},
+ $args{on_failure},
);
return $self;
}
sub _open {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
$self->_push_write_and_read(
'Connection::Open',
{
- virtual_host => $args->{vhost},
+ virtual_host => $args{vhost},
capabilities => '',
insist => 1,
},
'Connection::OpenOk',
sub {
$self->{_is_open} = 1;
- $args->{on_success}->($self);
+ $args{on_success}->($self);
},
- $args->{on_failure},
+ $args{on_failure},
);
return $self;
}
sub close {
- my ($self, $args,) = @_;
-
- $args->{on_success} ||= sub {};
- $args->{on_failure} ||= sub {die @_};
+ my $self = shift;
+ my %args = $self->_set_cbs(@_);
my $close_cb = sub {
$self->_close(
sub {
$self->_disconnect();
- $args->{on_success}->(@_);
+ $args{on_success}->(@_);
},
sub {
$self->_disconnect();
- $args->{on_failure}->(@_);
+ $args{on_failure}->(@_);
}
);
return $self;
@@ -316,14 +317,14 @@ sub close {
# for my $id ($self->channel_ids) {
for my $id (keys %{$self->channels}) { # FIXME
-# $self->get_channel($id)->close({
- $self->channels->{$id}->close({ # FIXME
+# $self->get_channel($id)->close(
+ $self->channels->{$id}->close( # FIXME
on_success => $close_cb,
on_failure => sub {
$close_cb->();
- $args->{on_failure}->(@_);
+ $args{on_failure}->(@_);
},
- });
+ );
}
return $self;
@@ -353,13 +354,11 @@ sub _disconnect {
}
sub open_channel {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = $self->_set_cbs(@_);
- $args->{on_success} ||= sub {};
- $args->{on_failure} ||= sub {die @_};
-
- my $id = $args->{id};
- return $args->{on_failure}->("Channel id $id is already in use")
+ my $id = $args{id};
+ return $args{on_failure}->("Channel id $id is already in use")
if $id && $self->has_channels($id);
if (!$id) {
@@ -368,25 +367,25 @@ sub open_channel {
$id = $candidate_id;
last;
}
- return $args->{on_failure}->('Ran out of channel ids') if !$id;
+ return $args{on_failure}->('Ran out of channel ids') if !$id;
}
- my $channel = AnyEvent::RabbitMQ::Channel->new({
+ my $channel = AnyEvent::RabbitMQ::Channel->new(
id => $id,
connection => $self,
- });
+ );
$self->set_channel($id => $channel);
- $channel->open({
+ $channel->open(
on_success => sub {
- $args->{on_success}->($channel);
+ $args{on_success}->($channel);
},
on_failure => sub {
$self->delete_channel($id);
- $args->{on_failure}->(@_);
+ $args{on_failure}->(@_);
},
- });
+ );
return $self;
}
@@ -472,6 +471,16 @@ sub _push_write {
return;
}
+sub _set_cbs {
+ my $self = shift;
+ my %args = @_;
+
+ $args{on_success} ||= sub {};
+ $args{on_failure} ||= sub {die @_};
+
+ return %args;
+}
+
sub DEMOLISH {
my ($self) = @_;
@@ -484,7 +493,7 @@ __END__
=head1 NAME
-RabbitFoot - A synchronous and single channel Perl AMQP client.
+AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
=head1 SYNOPSIS
@@ -492,41 +501,41 @@ RabbitFoot - A synchronous and single channel Perl AMQP client.
my $cv = AnyEvent->condvar;
- my $ar = AnyEvent::RabbitMQ->new({
+ my $ar = AnyEvent::RabbitMQ->new(
timeout => 1,
- })->load_xml_spec(
+ )->load_xml_spec(
'/path/to/amqp0-8.xml',
- )->connect({
+ )->connect(
host => 'localhosti',
port => 5672,
user => 'guest',
port => 'guest',
vhost => '/',
on_success => sub {
- $ar->open_channel({
+ $ar->open_channel(
on_success => sub {
my $channel = shift;
- $channel->declare_exchange({
+ $channel->declare_exchange(
exchange => 'test_exchange',
on_success => sub {
$cv->send('Declared exchange');
},
on_failure => $cv,
- });
+ );
},
on_failure => $cv,
- });
+ );
},
on_failure => $cv,
- });
+ );
print $cv->recv, "\n";
=head1 DESCRIPTION
-RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a Asynchronous fashion.
+AnyEvent::RabbitMQ is an AMQP (Advanced Message Queuing Protocol) client library that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
-You can use RabbitFoot to -
+You can use AnyEvent::RabbitMQ to -
* Declare and delete exchanges
* Declare, delete, bind and unbind queues
@@ -534,7 +543,7 @@ You can use RabbitFoot to -
* Publish, consume, get, ack and recover messages
* Select, commit and rollback transactions
-RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+AnyEvent::RabbitMQ is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
=head1 AUTHOR
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 9054591..aab1ab7 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -63,18 +63,16 @@ __PACKAGE__->meta->make_immutable;
no Moose;
sub open {
- my ($self, $args,) = @_;
-
- $args->{on_success} ||= sub {};
- $args->{on_failur} ||= sub {die @_};
+ my $self = shift;
+ my %args = @_;
$self->connection->_push_write_and_read(
'Channel::Open', {}, 'Channel::OpenOk',
sub {
$self->{_is_open} = 1;
- $args->{on_success}->();
+ $args{on_success}->();
},
- $args->{on_failur},
+ $args{on_failur},
$self->id,
);
@@ -82,24 +80,22 @@ sub open {
}
sub close {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = $self->connection->_set_cbs(@_);
return $self if !$self->_is_open;
- $args->{on_success} ||= sub {};
- $args->{on_failur} ||= sub {die @_};
-
- return $self->_close($args) if 0 == $self->count_consumer_cbs;
+ return $self->_close(%args) if 0 == $self->count_consumer_cbs;
for my $consumer_tag ($self->consumer_tags) {
$self->cancel({
consumer_tag => $consumer_tag,
on_success => sub {
- $self->_close($args);
+ $self->_close(%args);
},
on_failure => sub {
- $self->_close($args);
- $args->{on_failure}->(@_);
+ $self->_close(%args);
+ $args{on_failure}->(@_);
}
});
}
@@ -108,19 +104,20 @@ sub close {
}
sub _close {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
$self->connection->_push_write_and_read(
'Channel::Close', {}, 'Channel::CloseOk',
sub {
$self->{_is_open} = 0;
$self->connection->delete_channel($self->id);
- $args->{on_success}->();
+ $args{on_success}->();
},
sub {
$self->{_is_open} = 0;
$self->connection->delete_channel($self->id);
- $args->{on_failur}->();
+ $args{on_failur}->();
},
$self->id,
);
@@ -129,8 +126,8 @@ sub _close {
}
sub declare_exchange {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Exchange::Declare',
@@ -140,7 +137,7 @@ sub declare_exchange {
durable => 0,
auto_delete => 0,
internal => 0,
- %$args, # exchange
+ %args, # exchange
ticket => 0,
nowait => 0, # FIXME
},
@@ -154,14 +151,14 @@ sub declare_exchange {
}
sub delete_exchange {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Exchange::Delete',
{
if_unused => 0,
- %$args, # exchange
+ %args, # exchange
ticket => 0,
nowait => 0, # FIXME
},
@@ -175,8 +172,8 @@ sub delete_exchange {
}
sub declare_queue {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Queue::Declare',
@@ -187,7 +184,7 @@ sub declare_queue {
exclusive => 0,
auto_delete => 0,
no_ack => 1,
- %$args,
+ %args,
ticket => 0,
nowait => 0, # FIXME
},
@@ -199,15 +196,15 @@ sub declare_queue {
}
sub bind_queue {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Queue::Bind',
{
- %$args, # queue, exchange, routing_key
- ticket => 0,
- nowait => 0, # FIXME
+ %args, # queue, exchange, routing_key
+ ticket => 0,
+ nowait => 0, # FIXME
},
'Queue::BindOk',
$cb,
@@ -219,14 +216,14 @@ sub bind_queue {
}
sub unbind_queue {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Queue::Unbind',
{
- %$args, # queue, exchange, routing_key
- ticket => 0,
+ %args, # queue, exchange, routing_key
+ ticket => 0,
},
'Queue::UnbindOk',
$cb,
@@ -238,13 +235,13 @@ sub unbind_queue {
}
sub purge_queue {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Queue::Purge',
{
- %$args, # queue
+ %args, # queue
ticket => 0,
nowait => 0, # FIXME
},
@@ -258,15 +255,15 @@ sub purge_queue {
}
sub delete_queue {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Queue::Delete',
{
if_unused => 0,
if_empty => 0,
- %$args, # queue
+ %args, # queue
ticket => 0,
nowait => 0, # FIXME
},
@@ -280,39 +277,40 @@ sub delete_queue {
}
sub publish {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
- my $header_args = delete $args->{header} || {};
- my $body = delete $args->{body} || '';
- my $return_cb = delete $args->{on_return} || sub {};
+ my $header_args = delete $args{header} || {};
+ my $body = delete $args{body} || '';
+ my $return_cb = delete $args{on_return} || sub {};
$self->_publish(
- $args,
+ %args,
)->_header(
$header_args, $body,
)->_body(
$body,
);
- return $self if !$args->{mandatory} && !$args->{immediate};
+ return $self if !$args{mandatory} && !$args{immediate};
$self->set_return_cbs(
- ($args->{exchange} || '') . '_' . $args->{routing_key}
- => $return_cb
+ ($args{exchange} || '') . '_' . $args{routing_key} => $return_cb
);
return $self;
}
sub _publish {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
$self->connection->_push_write(
Net::AMQP::Protocol::Basic::Publish->new(
exchange => '',
mandatory => 0,
immediate => 0,
- %$args, # routing_key
+ %args, # routing_key
ticket => 0,
),
$self->id,
@@ -364,10 +362,10 @@ sub _body {
}
sub consume {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- my $consumer_cb = delete $args->{on_consume} || sub {};
+ my $consumer_cb = delete $args{on_consume} || sub {};
$self->connection->_push_write_and_read(
'Basic::Consume',
@@ -376,7 +374,7 @@ sub consume {
no_local => 0,
no_ack => 1,
exclusive => 0,
- %$args, # queue
+ %args, # queue
ticket => 0,
nowait => 0, # FIXME
},
@@ -396,25 +394,25 @@ sub consume {
}
sub cancel {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $failure_cb->('consumer_tag is not set')
- if !defined $args->{consumer_tag};
+ if !defined $args{consumer_tag};
return $failure_cb->('Unknown consumer_tag')
- if !$self->has_consumer_cbs($args->{consumer_tag});
+ if !$self->has_consumer_cbs($args{consumer_tag});
$self->connection->_push_write_and_read(
'Basic::Cancel',
{
- %$args, # consumer_tag
+ %args, # consumer_tag
nowait => 0,
},
'Basic::CancelOk',
sub {
my $frame = shift;
- $self->delete_consumer_cbs($args->{consumer_tag});
+ $self->delete_consumer_cbs($args{consumer_tag});
$cb->($frame);
},
$failure_cb,
@@ -425,14 +423,14 @@ sub cancel {
}
sub get {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Basic::Get',
{
no_ack => 1,
- %$args, # queue
+ %args, # queue
ticket => 0,
},
[qw(Basic::GetOk Basic::GetEmpty)],
@@ -450,15 +448,16 @@ sub get {
}
sub ack {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
$self->connection->_push_write(
Net::AMQP::Protocol::Basic::Ack->new(
delivery_tag => 0,
multiple => (
- defined $args->{delivery_tag} && $args->{delivery_tag} != 0 ? 0 : 1
+ defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
),
- %$args,
+ %args,
),
$self->id,
);
@@ -467,14 +466,14 @@ sub ack {
}
sub qos {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Basic::Qos',
{
prefetch_count => 1,
- %$args,
+ %args,
prefetch_size => 0,
global => 0,
},
@@ -488,12 +487,13 @@ sub qos {
}
sub recover {
- my ($self, $args,) = @_;
+ my $self = shift;
+ my %args = @_;
$self->connection->_push_write(
Net::AMQP::Protocol::Basic::Recover->new(
requeue => 0,
- %$args,
+ %args,
),
$self->id,
);
@@ -502,8 +502,8 @@ sub recover {
}
sub select_tx {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Tx::Select', {}, 'Tx::SelectOk',
@@ -516,8 +516,8 @@ sub select_tx {
}
sub commit_tx {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Tx::Commit', {}, 'Tx::CommitOk',
@@ -530,8 +530,8 @@ sub commit_tx {
}
sub rollback_tx {
- my ($self, $args,) = @_;
- my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+ my $self = shift;
+ my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
$self->connection->_push_write_and_read(
'Tx::Rollback', {}, 'Tx::RollbackOk',
@@ -543,15 +543,6 @@ sub rollback_tx {
return $self;
}
-sub _delete_callbacks {
- my ($self, $args,) = @_;
-
- my $cb = delete $args->{on_success} || sub {};
- my $failure_cb = delete $args->{on_failure} || sub {die @_};
-
- return $cb, $failure_cb;
-}
-
sub _push_queue_or_consume {
my ($self, $frame, $failure_cb,) = @_;
@@ -608,6 +599,16 @@ sub _push_read_header_and_body {
return $self;
}
+sub _delete_cbs {
+ my $self = shift;
+ my %args = @_;
+
+ my $cb = delete $args{on_success} || sub {};
+ my $failure_cb = delete $args{on_failure} || sub {die @_};
+
+ return $cb, $failure_cb, %args;
+}
+
sub DEMOLISH {
my ($self) = @_;
diff --git a/xt/05_anyevent.t b/xt/05_anyevent.t
index ffdc917..f734e0b 100644
--- a/xt/05_anyevent.t
+++ b/xt/05_anyevent.t
@@ -25,14 +25,14 @@ plan tests => 24;
use AnyEvent::RabbitMQ;
-my $ar = AnyEvent::RabbitMQ->new({timeout => 1, verbose => 1,});
+my $ar = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 1,);
lives_ok sub {
$ar->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
}, 'load xml spec';
my $done = AnyEvent->condvar;
-$ar->connect({
+$ar->connect(
(map {$_ => $conf->{$_}} qw(host port user pass vhost)),
on_success => sub {
my $ar = shift;
@@ -40,45 +40,45 @@ $ar->connect({
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
my $ch;
-$ar->open_channel({
+$ar->open_channel(
on_success => sub {
$ch = shift;
isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->declare_exchange({
+$ch->declare_exchange(
exchange => 'test_x',
on_success => sub {
pass('declare exchange');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->declare_queue({
+$ch->declare_queue(
queue => 'test_q',
on_success => sub {
pass('declare queue');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->bind_queue({
+$ch->bind_queue(
queue => 'test_q',
exchange => 'test_x',
routing_key => 'test_r',
@@ -87,11 +87,11 @@ $ch->bind_queue({
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->consume({
+$ch->consume(
queue => 'test_q',
on_consume => sub {
my $response = shift;
@@ -100,24 +100,24 @@ $ch->consume({
pass('publish and consume message');
return if $message ne 'cancel';
- $ch->cancel({
+ $ch->cancel(
consumer_tag => $response->{deliver}->method_frame->consumer_tag,
on_success => sub {
pass('cancel');
$done->send;
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
-});
+);
publish($ch, 'Hello RabbitMQ', $done,);
publish($ch, 'cancel', $done,);
$done->recv;
$done = AnyEvent->condvar;
publish($ch, 'I love RabbitMQ', $done,);
-$ch->get({
+$ch->get(
queue => 'test_q',
on_success => sub {
my $response = shift;
@@ -125,11 +125,11 @@ $ch->get({
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->get({
+$ch->get(
queue => 'test_q',
on_success => sub {
my $response = shift;
@@ -137,57 +137,57 @@ $ch->get({
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->consume({
+$ch->consume(
queue => 'test_q',
no_ack => 0,
on_consume => sub {
my $response = shift;
- $ch->ack({
+ $ch->ack(
delivery_tag => $response->{deliver}->method_frame->delivery_tag
- });
+ );
pass('ack deliver');
- $ch->cancel({
+ $ch->cancel(
consumer_tag => $response->{deliver}->method_frame->consumer_tag,
on_success => sub {
pass('cancel');
$done->send;
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
-});
+);
publish($ch, 'NO RabbitMQ, NO LIFE', $done,);
$done->recv;
$done = AnyEvent->condvar;
publish($ch, 'RabbitMQ is cool', $done,);
-$ch->get({
+$ch->get(
queue => 'test_q',
no_ack => 0,
on_success => sub {
my $response = shift;
- $ch->ack({
+ $ch->ack(
delivery_tag => $response->{ok}->method_frame->delivery_tag
- });
+ );
pass('ack get');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
my @responses;
-$ch->qos({
+$ch->qos(
prefetch_count => 2,
on_success => sub {
- $ch->consume({
+ $ch->consume(
queue => 'test_q',
no_ack => 0,
on_consume => sub {
@@ -197,39 +197,39 @@ $ch->qos({
$done->send;
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
-});
+);
publish($ch, 'RabbitMQ is excellent', $done,);
publish($ch, 'RabbitMQ is fantastic', $done,);
$done->recv;
pass('qos');
for my $response (@responses) {
- $ch->ack({
+ $ch->ack(
delivery_tag => $response->{deliver}->method_frame->delivery_tag,
- });
+ );
}
$done = AnyEvent->condvar;
-$ch->cancel({
+$ch->cancel(
consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
on_success => sub {
- $ch->qos({
+ $ch->qos(
on_success => sub {
$done->send;
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
my $recover_count = 0;
-$ch->consume({
+$ch->consume(
queue => 'test_q',
no_ack => 0,
on_consume => sub {
@@ -240,63 +240,63 @@ $ch->consume({
return;
}
- $ch->ack({
+ $ch->ack(
delivery_tag => $response->{deliver}->method_frame->delivery_tag
- });
+ );
- $ch->cancel({
+ $ch->cancel(
consumer_tag => $response->{deliver}->method_frame->consumer_tag,
on_success => sub {
$done->send;
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
-});
+);
publish($ch, 'RabbitMQ is powerful', $done,);
$done->recv;
pass('recover');
$done = AnyEvent->condvar;
-$ch->select_tx({
+$ch->select_tx(
on_success => sub {
pass('select tx');
publish($ch, 'RabbitMQ is highly reliable systems', $done,);
- $ch->rollback_tx({
+ $ch->rollback_tx(
on_success => sub {
pass('rollback tx');
publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
- $ch->commit_tx({
+ $ch->commit_tx(
on_success => sub {
pass('commit tx');
$done->send;
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
- });
+ );
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->purge_queue({
+$ch->purge_queue(
queue => 'test_q',
on_success => sub {
pass('purge queue');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->unbind_queue({
+$ch->unbind_queue(
queue => 'test_q',
exchange => 'test_x',
routing_key => 'test_r',
@@ -305,39 +305,39 @@ $ch->unbind_queue({
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->delete_queue({
+$ch->delete_queue(
queue => 'test_q',
on_success => sub {
pass('delete queue');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ch->delete_exchange({
+$ch->delete_exchange(
exchange => 'test_x',
on_success => sub {
pass('delete exchange');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
$done = AnyEvent->condvar;
-$ar->close({
+$ar->close(
on_success => sub {
pass('close');
$done->send;
},
on_failure => failure_cb($done),
-});
+);
$done->recv;
sub failure_cb {
@@ -351,7 +351,7 @@ sub failure_cb {
sub publish {
my ($ch, $message, $cv,) = @_;
- $ch->publish({
+ $ch->publish(
exchange => 'test_x',
routing_key => 'test_r',
body => $message,
@@ -360,7 +360,7 @@ sub publish {
fail('on_return: ' . Dumper($response));
$cv->send;
},
- });
+ );
return;
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list