[libanyevent-rabbitmq-perl] 13/151: Fixed APIs.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit 411d5747a8c06f8240338469527b37b5799510b2
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Tue Feb 9 12:29:31 2010 +0900

    Fixed APIs.
---
 lib/AnyEvent/RabbitMQ.pm         | 129 +++++++++++++++--------------
 lib/AnyEvent/RabbitMQ/Channel.pm | 175 ++++++++++++++++++++-------------------
 xt/05_anyevent.t                 | 126 ++++++++++++++--------------
 3 files changed, 220 insertions(+), 210 deletions(-)

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 87c08f3..b4f9a27 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -2,8 +2,10 @@ package AnyEvent::RabbitMQ;
 
 use Data::Dumper;
 use List::MoreUtils qw(none);
+
 use AnyEvent::Handle;
 use AnyEvent::Socket;
+
 use Net::AMQP;
 use Net::AMQP::Common qw(:all);
 
@@ -77,31 +79,29 @@ sub load_xml_spec {
 }
 
 sub connect {
-    my ($self, $args,) = @_;
-
-    $args->{on_success} ||= sub {};
-    $args->{on_failure} ||= sub {die @_};
+    my $self = shift;
+    my %args = $self->_set_cbs(@_);
 
     if ($self->verbose) {
-        print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n";
+        print STDERR 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
     }
 
     $self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
-        $args->{host},
-        $args->{port},
+        $args{host},
+        $args{port},
         sub {
             my $fh = shift
-                or return $args->{on_failure}->('Error connecting to AMQP Server!');
+                or return $args{on_failure}->('Error connecting to AMQP Server!');
             $self->{_handle} = AnyEvent::Handle->new(
                 fh       => $fh,
                 on_error => sub {
                     my ($handle, $fatal, $message) = @_;
                     $self->clear_handle;
-                    $args->{on_failure}->($message);
+                    $args{on_failure}->($message);
                 }
             );
-            $self->_read_loop($args->{on_failure});
-            $self->_start($args,);
+            $self->_read_loop($args{on_failure});
+            $self->_start(%args,);
         },
         sub {
             return $self->timeout;
@@ -198,7 +198,8 @@ sub _check_close_and_clean {
 }
 
 sub _start {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     if ($self->verbose) {
         print STDERR 'post header', "\n";
@@ -212,11 +213,11 @@ sub _start {
             my $frame = shift;
 
             my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
-            return $args->{on_failure}->('AMQPLAIN is not found in mechanisms')
+            return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
                 if none {$_ eq 'AMQPLAIN'} @mechanisms;
 
             my @locales = split /\s/, $frame->method_frame->locales;
-            return $args->{on_failure}->('en_US is not found in locales')
+            return $args{on_failure}->('en_US is not found in locales')
                 if none {$_ eq 'en_US'} @locales;
 
             $self->_push_write(
@@ -229,23 +230,24 @@ sub _start {
                     },
                     mechanism => 'AMQPLAIN',
                     response => {
-                        LOGIN    => $args->{user},
-                        PASSWORD => $args->{pass},
+                        LOGIN    => $args{user},
+                        PASSWORD => $args{pass},
                     },
                     locale => 'en_US',
                 ),
             );
 
-            $self->_tune($args,);
+            $self->_tune(%args,);
         },
-        $args->{on_failure},
+        $args{on_failure},
     );
 
     return $self;
 }
 
 sub _tune {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     $self->_push_read_and_valid(
         'Connection::Tune',
@@ -260,50 +262,49 @@ sub _tune {
                 ),
             );
 
-            $self->_open($args,);
+            $self->_open(%args,);
         },
-        $args->{on_failure},
+        $args{on_failure},
     );
 
     return $self;
 }
 
 sub _open {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     $self->_push_write_and_read(
         'Connection::Open',
         {
-            virtual_host => $args->{vhost},
+            virtual_host => $args{vhost},
             capabilities => '',
             insist       => 1,
         },
         'Connection::OpenOk', 
         sub {
             $self->{_is_open} = 1;
-            $args->{on_success}->($self);
+            $args{on_success}->($self);
         },
-        $args->{on_failure},
+        $args{on_failure},
     );
 
     return $self;
 }
 
 sub close {
-    my ($self, $args,) = @_;
-
-    $args->{on_success} ||= sub {};
-    $args->{on_failure}  ||= sub {die @_};
+    my $self = shift;
+    my %args = $self->_set_cbs(@_);
 
     my $close_cb = sub {
         $self->_close(
             sub {
                 $self->_disconnect();
-                $args->{on_success}->(@_);
+                $args{on_success}->(@_);
             },
             sub {
                 $self->_disconnect();
-                $args->{on_failure}->(@_);
+                $args{on_failure}->(@_);
             }
         );
         return $self;
@@ -316,14 +317,14 @@ sub close {
 
 #    for my $id ($self->channel_ids) {
     for my $id (keys %{$self->channels}) { # FIXME
-#        $self->get_channel($id)->close({
-         $self->channels->{$id}->close({ # FIXME
+#        $self->get_channel($id)->close(
+         $self->channels->{$id}->close( # FIXME
             on_success => $close_cb,
             on_failure => sub {
                 $close_cb->();
-                $args->{on_failure}->(@_);
+                $args{on_failure}->(@_);
             },
-        });
+        );
     }
 
     return $self;
@@ -353,13 +354,11 @@ sub _disconnect {
 }
 
 sub open_channel {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = $self->_set_cbs(@_);
 
-    $args->{on_success} ||= sub {};
-    $args->{on_failure} ||= sub {die @_};
-
-    my $id = $args->{id};
-    return $args->{on_failure}->("Channel id $id is already in use")
+    my $id = $args{id};
+    return $args{on_failure}->("Channel id $id is already in use")
         if $id && $self->has_channels($id);
 
     if (!$id) {
@@ -368,25 +367,25 @@ sub open_channel {
             $id = $candidate_id;
             last;
         }
-        return $args->{on_failure}->('Ran out of channel ids') if !$id;
+        return $args{on_failure}->('Ran out of channel ids') if !$id;
     }
 
-    my $channel = AnyEvent::RabbitMQ::Channel->new({
+    my $channel = AnyEvent::RabbitMQ::Channel->new(
         id         => $id,
         connection => $self,
-    });
+    );
 
     $self->set_channel($id => $channel);
 
-    $channel->open({
+    $channel->open(
         on_success => sub {
-            $args->{on_success}->($channel);
+            $args{on_success}->($channel);
         },
         on_failure => sub {
             $self->delete_channel($id);
-            $args->{on_failure}->(@_);
+            $args{on_failure}->(@_);
         },
-    });
+    );
 
     return $self;
 }
@@ -472,6 +471,16 @@ sub _push_write {
     return;
 }
 
+sub _set_cbs {
+    my $self = shift;
+    my %args = @_;
+
+    $args{on_success} ||= sub {};
+    $args{on_failure} ||= sub {die @_};
+
+    return %args;
+}
+
 sub DEMOLISH {
     my ($self) = @_;
 
@@ -484,7 +493,7 @@ __END__
 
 =head1 NAME
 
-RabbitFoot - A synchronous and single channel Perl AMQP client.
+AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
 
 =head1 SYNOPSIS
 
@@ -492,41 +501,41 @@ RabbitFoot - A synchronous and single channel Perl AMQP client.
 
   my $cv = AnyEvent->condvar;
 
-  my $ar = AnyEvent::RabbitMQ->new({
+  my $ar = AnyEvent::RabbitMQ->new(
       timeout => 1,
-  })->load_xml_spec(
+  )->load_xml_spec(
       '/path/to/amqp0-8.xml',
-  )->connect({
+  )->connect(
       host       => 'localhosti',
       port       => 5672,
       user       => 'guest',
       port       => 'guest',
       vhost      => '/',
       on_success => sub {
-          $ar->open_channel({
+          $ar->open_channel(
               on_success => sub {
                   my $channel = shift;
-                  $channel->declare_exchange({
+                  $channel->declare_exchange(
                       exchange   => 'test_exchange',
                       on_success => sub {
                           $cv->send('Declared exchange');
                       },
                       on_failure => $cv,
-                  });
+                  );
               },
               on_failure => $cv,
-          });
+          );
       },
       on_failure => $cv,
-  });
+  );
 
   print $cv->recv, "\n";
 
 =head1 DESCRIPTION
 
-RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a Asynchronous fashion.
+AnyEvent::RabbitMQ is an AMQP (Advanced Message Queuing Protocol) client library that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
 
-You can use RabbitFoot to -
+You can use AnyEvent::RabbitMQ to -
 
   * Declare and delete exchanges
   * Declare, delete, bind and unbind queues
@@ -534,7 +543,7 @@ You can use RabbitFoot to -
   * Publish, consume, get, ack and recover messages
   * Select, commit and rollback transactions
 
-RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+AnyEvent::RabbitMQ is known to work with RabbitMQ version 1.7.1 and version 0-8 of the AMQP specification.
 
 =head1 AUTHOR
 
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 9054591..aab1ab7 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -63,18 +63,16 @@ __PACKAGE__->meta->make_immutable;
 no Moose;
 
 sub open {
-    my ($self, $args,) = @_;
-
-    $args->{on_success} ||= sub {};
-    $args->{on_failur}  ||= sub {die @_};
+    my $self = shift;
+    my %args = @_;
 
     $self->connection->_push_write_and_read(
         'Channel::Open', {}, 'Channel::OpenOk',
         sub {
             $self->{_is_open} = 1;
-            $args->{on_success}->();
+            $args{on_success}->();
         },
-        $args->{on_failur},
+        $args{on_failur},
         $self->id,
     );
 
@@ -82,24 +80,22 @@ sub open {
 }
 
 sub close {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = $self->connection->_set_cbs(@_);
 
     return $self if !$self->_is_open;
 
-    $args->{on_success} ||= sub {};
-    $args->{on_failur}  ||= sub {die @_};
-
-    return $self->_close($args) if 0 == $self->count_consumer_cbs;
+    return $self->_close(%args) if 0 == $self->count_consumer_cbs;
 
     for my $consumer_tag ($self->consumer_tags) {
         $self->cancel({
             consumer_tag => $consumer_tag,
             on_success   => sub {
-                $self->_close($args);
+                $self->_close(%args);
             },
             on_failure   => sub {
-                $self->_close($args);
-                $args->{on_failure}->(@_);
+                $self->_close(%args);
+                $args{on_failure}->(@_);
             }
         });
     }
@@ -108,19 +104,20 @@ sub close {
 }
 
 sub _close {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     $self->connection->_push_write_and_read(
         'Channel::Close', {}, 'Channel::CloseOk',
         sub {
             $self->{_is_open} = 0;
             $self->connection->delete_channel($self->id);
-            $args->{on_success}->();
+            $args{on_success}->();
         },
         sub {
             $self->{_is_open} = 0;
             $self->connection->delete_channel($self->id);
-            $args->{on_failur}->();
+            $args{on_failur}->();
         },
         $self->id,
     );
@@ -129,8 +126,8 @@ sub _close {
 }
 
 sub declare_exchange {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Exchange::Declare',
@@ -140,7 +137,7 @@ sub declare_exchange {
             durable     => 0,
             auto_delete => 0,
             internal    => 0,
-            %$args, # exchange
+            %args, # exchange
             ticket      => 0,
             nowait      => 0, # FIXME
         },
@@ -154,14 +151,14 @@ sub declare_exchange {
 }
 
 sub delete_exchange {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Exchange::Delete',
         {
             if_unused => 0,
-            %$args, # exchange
+            %args, # exchange
             ticket    => 0,
             nowait    => 0, # FIXME
         },
@@ -175,8 +172,8 @@ sub delete_exchange {
 }
 
 sub declare_queue {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Queue::Declare',
@@ -187,7 +184,7 @@ sub declare_queue {
             exclusive   => 0,
             auto_delete => 0,
             no_ack      => 1,
-            %$args,
+            %args,
             ticket      => 0,
             nowait      => 0, # FIXME
         },
@@ -199,15 +196,15 @@ sub declare_queue {
 }
 
 sub bind_queue {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Queue::Bind',
         {
-            %$args, # queue, exchange, routing_key
-            ticket      => 0,
-            nowait      => 0, # FIXME
+            %args, # queue, exchange, routing_key
+            ticket => 0,
+            nowait => 0, # FIXME
         },
         'Queue::BindOk', 
         $cb,
@@ -219,14 +216,14 @@ sub bind_queue {
 }
 
 sub unbind_queue {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Queue::Unbind',
         {
-            %$args, # queue, exchange, routing_key
-            ticket      => 0,
+            %args, # queue, exchange, routing_key
+            ticket => 0,
         },
         'Queue::UnbindOk', 
         $cb,
@@ -238,13 +235,13 @@ sub unbind_queue {
 }
 
 sub purge_queue {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Queue::Purge',
         {
-            %$args, # queue
+            %args, # queue
             ticket => 0,
             nowait => 0, # FIXME
         },
@@ -258,15 +255,15 @@ sub purge_queue {
 }
 
 sub delete_queue {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Queue::Delete',
         {
             if_unused => 0,
             if_empty  => 0,
-            %$args, # queue
+            %args, # queue
             ticket    => 0,
             nowait    => 0, # FIXME
         },
@@ -280,39 +277,40 @@ sub delete_queue {
 }
 
 sub publish {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
-    my $header_args = delete $args->{header}    || {};
-    my $body        = delete $args->{body}      || '';
-    my $return_cb   = delete $args->{on_return} || sub {};
+    my $header_args = delete $args{header}    || {};
+    my $body        = delete $args{body}      || '';
+    my $return_cb   = delete $args{on_return} || sub {};
 
     $self->_publish(
-        $args,
+        %args,
     )->_header(
         $header_args, $body,
     )->_body(
         $body,
     );
 
-    return $self if !$args->{mandatory} && !$args->{immediate};
+    return $self if !$args{mandatory} && !$args{immediate};
 
     $self->set_return_cbs(
-        ($args->{exchange} || '') . '_' . $args->{routing_key}
-            => $return_cb
+        ($args{exchange} || '') . '_' . $args{routing_key} => $return_cb
     );
 
     return $self;
 }
 
 sub _publish {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     $self->connection->_push_write(
         Net::AMQP::Protocol::Basic::Publish->new(
             exchange  => '',
             mandatory => 0,
             immediate => 0,
-            %$args, # routing_key
+            %args, # routing_key
             ticket    => 0,
         ),
         $self->id,
@@ -364,10 +362,10 @@ sub _body {
 }
 
 sub consume {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    my $consumer_cb = delete $args->{on_consume} || sub {};
+    my $consumer_cb = delete $args{on_consume} || sub {};
     
     $self->connection->_push_write_and_read(
         'Basic::Consume',
@@ -376,7 +374,7 @@ sub consume {
             no_local     => 0,
             no_ack       => 1,
             exclusive    => 0,
-            %$args, # queue
+            %args, # queue
             ticket       => 0,
             nowait       => 0, # FIXME
         },
@@ -396,25 +394,25 @@ sub consume {
 }
 
 sub cancel {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     return $failure_cb->('consumer_tag is not set')
-        if !defined $args->{consumer_tag};
+        if !defined $args{consumer_tag};
 
     return $failure_cb->('Unknown consumer_tag')
-        if !$self->has_consumer_cbs($args->{consumer_tag});
+        if !$self->has_consumer_cbs($args{consumer_tag});
 
     $self->connection->_push_write_and_read(
         'Basic::Cancel',
         {
-            %$args, # consumer_tag
+            %args, # consumer_tag
             nowait => 0,
         },
         'Basic::CancelOk', 
         sub {
             my $frame = shift;
-            $self->delete_consumer_cbs($args->{consumer_tag});
+            $self->delete_consumer_cbs($args{consumer_tag});
             $cb->($frame);
         },
         $failure_cb,
@@ -425,14 +423,14 @@ sub cancel {
 }
 
 sub get {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Basic::Get',
         {
             no_ack => 1,
-            %$args, # queue
+            %args, # queue
             ticket => 0,
         },
         [qw(Basic::GetOk Basic::GetEmpty)], 
@@ -450,15 +448,16 @@ sub get {
 }
 
 sub ack {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     $self->connection->_push_write(
         Net::AMQP::Protocol::Basic::Ack->new(
             delivery_tag => 0,
             multiple     => (
-                defined $args->{delivery_tag} && $args->{delivery_tag} != 0 ? 0 : 1
+                defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
             ),
-            %$args,
+            %args,
         ),
         $self->id,
     );
@@ -467,14 +466,14 @@ sub ack {
 }
 
 sub qos {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Basic::Qos',
         {
             prefetch_count => 1,
-            %$args,
+            %args,
             prefetch_size  => 0,
             global         => 0,
         },
@@ -488,12 +487,13 @@ sub qos {
 }
 
 sub recover {
-    my ($self, $args,) = @_;
+    my $self = shift;
+    my %args = @_;
 
     $self->connection->_push_write(
         Net::AMQP::Protocol::Basic::Recover->new(
             requeue => 0,
-            %$args,
+            %args,
         ),
         $self->id,
     );
@@ -502,8 +502,8 @@ sub recover {
 }
 
 sub select_tx {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Tx::Select', {}, 'Tx::SelectOk',
@@ -516,8 +516,8 @@ sub select_tx {
 }
 
 sub commit_tx {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Tx::Commit', {}, 'Tx::CommitOk',
@@ -530,8 +530,8 @@ sub commit_tx {
 }
 
 sub rollback_tx {
-    my ($self, $args,) = @_;
-    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+    my $self = shift;
+    my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
     $self->connection->_push_write_and_read(
         'Tx::Rollback', {}, 'Tx::RollbackOk',
@@ -543,15 +543,6 @@ sub rollback_tx {
     return $self;
 }
 
-sub _delete_callbacks {
-    my ($self, $args,) = @_;
-
-    my $cb         = delete $args->{on_success} || sub {};
-    my $failure_cb = delete $args->{on_failure} || sub {die @_};
-
-    return $cb, $failure_cb;
-}
-
 sub _push_queue_or_consume {
     my ($self, $frame, $failure_cb,) = @_;
 
@@ -608,6 +599,16 @@ sub _push_read_header_and_body {
     return $self;
 }
 
+sub _delete_cbs {
+    my $self = shift;
+    my %args = @_;
+
+    my $cb         = delete $args{on_success} || sub {};
+    my $failure_cb = delete $args{on_failure} || sub {die @_};
+
+    return $cb, $failure_cb, %args;
+}
+
 sub DEMOLISH {
     my ($self) = @_;
 
diff --git a/xt/05_anyevent.t b/xt/05_anyevent.t
index ffdc917..f734e0b 100644
--- a/xt/05_anyevent.t
+++ b/xt/05_anyevent.t
@@ -25,14 +25,14 @@ plan tests => 24;
 
 use AnyEvent::RabbitMQ;
 
-my $ar = AnyEvent::RabbitMQ->new({timeout => 1, verbose => 1,});
+my $ar = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 1,);
 
 lives_ok sub {
     $ar->load_xml_spec($FindBin::Bin . '/../fixed_amqp0-8.xml')
 }, 'load xml spec';
 
 my $done = AnyEvent->condvar;
-$ar->connect({
+$ar->connect(
     (map {$_ => $conf->{$_}} qw(host port user pass vhost)),
     on_success => sub {
         my $ar = shift;
@@ -40,45 +40,45 @@ $ar->connect({
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
 my $ch;
-$ar->open_channel({
+$ar->open_channel(
     on_success => sub {
         $ch = shift;
         isa_ok($ch, 'AnyEvent::RabbitMQ::Channel');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->declare_exchange({
+$ch->declare_exchange(
     exchange   => 'test_x',
     on_success => sub {
         pass('declare exchange');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->declare_queue({
+$ch->declare_queue(
     queue      => 'test_q',
     on_success => sub {
         pass('declare queue');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->bind_queue({
+$ch->bind_queue(
     queue       => 'test_q',
     exchange    => 'test_x',
     routing_key => 'test_r',
@@ -87,11 +87,11 @@ $ch->bind_queue({
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->consume({
+$ch->consume(
     queue      => 'test_q',
     on_consume => sub {
         my $response = shift;
@@ -100,24 +100,24 @@ $ch->consume({
         pass('publish and consume message');
         return if $message ne 'cancel';
 
-        $ch->cancel({
+        $ch->cancel(
             consumer_tag => $response->{deliver}->method_frame->consumer_tag,
             on_success   => sub {
                 pass('cancel');
                 $done->send;
             },
             on_failure   => failure_cb($done),
-        });
+        );
     },
     on_failure => failure_cb($done),
-});
+);
 publish($ch, 'Hello RabbitMQ', $done,);
 publish($ch, 'cancel', $done,);
 $done->recv;
 
 $done = AnyEvent->condvar;
 publish($ch, 'I love RabbitMQ', $done,);
-$ch->get({
+$ch->get(
     queue      => 'test_q',
     on_success => sub {
         my $response = shift;
@@ -125,11 +125,11 @@ $ch->get({
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->get({
+$ch->get(
     queue      => 'test_q',
     on_success => sub {
         my $response = shift;
@@ -137,57 +137,57 @@ $ch->get({
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->consume({
+$ch->consume(
     queue      => 'test_q',
     no_ack     => 0,
     on_consume => sub {
         my $response = shift;
-        $ch->ack({
+        $ch->ack(
             delivery_tag => $response->{deliver}->method_frame->delivery_tag
-        });
+        );
         pass('ack deliver');
 
-        $ch->cancel({
+        $ch->cancel(
             consumer_tag => $response->{deliver}->method_frame->consumer_tag,
             on_success   => sub {
                 pass('cancel');
                 $done->send;
             },
             on_failure   => failure_cb($done),
-        });
+        );
     },
     on_failure => failure_cb($done),
-});
+);
 publish($ch, 'NO RabbitMQ, NO LIFE', $done,);
 $done->recv;
 
 $done = AnyEvent->condvar;
 publish($ch, 'RabbitMQ is cool', $done,);
-$ch->get({
+$ch->get(
     queue      => 'test_q',
     no_ack     => 0,
     on_success => sub {
         my $response = shift;
-        $ch->ack({
+        $ch->ack(
             delivery_tag => $response->{ok}->method_frame->delivery_tag
-        });
+        );
         pass('ack get');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
 my @responses;
-$ch->qos({
+$ch->qos(
     prefetch_count => 2,
     on_success => sub {
-        $ch->consume({
+        $ch->consume(
             queue      => 'test_q',
             no_ack     => 0,
             on_consume => sub {
@@ -197,39 +197,39 @@ $ch->qos({
                 $done->send;
             },
             on_failure => failure_cb($done),
-        });
+        );
     },
     on_failure => failure_cb($done),
-});
+);
 publish($ch, 'RabbitMQ is excellent', $done,);
 publish($ch, 'RabbitMQ is fantastic', $done,);
 $done->recv;
 pass('qos');
 
 for my $response (@responses) {
-    $ch->ack({
+    $ch->ack(
         delivery_tag => $response->{deliver}->method_frame->delivery_tag,
-    });
+    );
 }
 
 $done = AnyEvent->condvar;
-$ch->cancel({
+$ch->cancel(
     consumer_tag => $responses[0]->{deliver}->method_frame->consumer_tag,
     on_success   => sub {
-        $ch->qos({
+        $ch->qos(
             on_success => sub {
                 $done->send;
             },
             on_failure => failure_cb($done),
-        });
+        );
     },
     on_failure   => failure_cb($done),
-});
+);
 $done->recv;
  
 $done = AnyEvent->condvar;
 my $recover_count = 0;
-$ch->consume({
+$ch->consume(
     queue      => 'test_q',
     no_ack     => 0,
     on_consume => sub {
@@ -240,63 +240,63 @@ $ch->consume({
             return;
         }
 
-        $ch->ack({
+        $ch->ack(
             delivery_tag => $response->{deliver}->method_frame->delivery_tag
-        });
+        );
 
-        $ch->cancel({
+        $ch->cancel(
             consumer_tag => $response->{deliver}->method_frame->consumer_tag,
             on_success   => sub {
                 $done->send;
             },
             on_failure   => failure_cb($done),
-        });
+        );
     },
     on_failure => failure_cb($done),
-});
+);
 publish($ch, 'RabbitMQ is powerful', $done,);
 $done->recv;
 pass('recover');
 
 $done = AnyEvent->condvar;
-$ch->select_tx({
+$ch->select_tx(
     on_success => sub {
         pass('select tx');
         publish($ch, 'RabbitMQ is highly reliable systems', $done,);
 
-        $ch->rollback_tx({
+        $ch->rollback_tx(
             on_success => sub {
                 pass('rollback tx');
                 publish($ch, 'RabbitMQ is highly reliable systems.', $done,);
 
-                $ch->commit_tx({
+                $ch->commit_tx(
                     on_success => sub {
                         pass('commit tx');
                         $done->send;
                     },
                     on_failure => failure_cb($done),
-                });
+                );
             },
             on_failure => failure_cb($done),
-        });
+        );
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->purge_queue({
+$ch->purge_queue(
     queue      => 'test_q',
     on_success => sub {
         pass('purge queue');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->unbind_queue({
+$ch->unbind_queue(
     queue       => 'test_q',
     exchange    => 'test_x',
     routing_key => 'test_r',
@@ -305,39 +305,39 @@ $ch->unbind_queue({
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->delete_queue({
+$ch->delete_queue(
     queue      => 'test_q',
     on_success => sub {
         pass('delete queue');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ch->delete_exchange({
+$ch->delete_exchange(
     exchange   => 'test_x',
     on_success => sub {
         pass('delete exchange');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 $done = AnyEvent->condvar;
-$ar->close({
+$ar->close(
     on_success => sub {
         pass('close');
         $done->send;
     },
     on_failure => failure_cb($done),
-});
+);
 $done->recv;
 
 sub failure_cb {
@@ -351,7 +351,7 @@ sub failure_cb {
 sub publish {
     my ($ch, $message, $cv,) = @_;
 
-    $ch->publish({
+    $ch->publish(
         exchange    => 'test_x',
         routing_key => 'test_r',
         body        => $message,
@@ -360,7 +360,7 @@ sub publish {
             fail('on_return: ' . Dumper($response));
             $cv->send;
         },
-    });
+    );
 
     return;
 }

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list