[libanyevent-rabbitmq-perl] 26/151: Exclude the Moose from the RabbitFoot core module.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:01 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit b55556f94e6c6747183fe5ba04eb4d84e1cb705d
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Wed Feb 24 12:47:23 2010 +0900

    Exclude the Moose from the RabbitFoot core module.
---
 lib/AnyEvent/RabbitMQ.pm            | 202 ++++++++++--------------------
 lib/AnyEvent/RabbitMQ/Channel.pm    | 239 ++++++++++++++++--------------------
 lib/AnyEvent/RabbitMQ/LocalQueue.pm |  62 ++++------
 lib/RabbitFoot.pm                   |  51 ++++----
 lib/RabbitFoot/Channel.pm           |  80 ++++++------
 5 files changed, 257 insertions(+), 377 deletions(-)

diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 96a08ff..18172ad 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -1,5 +1,8 @@
 package AnyEvent::RabbitMQ;
 
+use strict;
+use warnings;
+
 use Data::Dumper;
 use List::MoreUtils qw(none);
 
@@ -12,62 +15,33 @@ use Net::AMQP::Common qw(:all);
 use AnyEvent::RabbitMQ::Channel;
 use AnyEvent::RabbitMQ::LocalQueue;
 
-use Moose;
-use MooseX::AttributeHelpers;
-
-our $VERSION = '0.01';
-
-has verbose => (
-    isa => 'Bool',
-    is  => 'rw',
-);
-
-has _connect_guard => (
-    isa     => 'Guard',
-    is      => 'ro',
-    clearer => 'clear_connect_guard',
-);
-
-has _handle => (
-    isa     => 'AnyEvent::Handle',
-    is      => 'ro',
-    clearer => 'clear_handle',
-);
-
-has _is_open => (
-    isa     => 'Bool',
-    is      => 'ro',
-    default => 0,
-);
-
-has channels => (
-    metaclass => 'Collection::Hash',
-    is        => 'ro',
-    isa       => 'HashRef[AnyEvent::RabbitMQ::Channel]',
-    default   => sub {{}},
-    provides  => {
-        set    => 'set_channel',
-        get    => 'get_channel',
-        delete => 'delete_channel',
-        keys   => 'channel_ids',
-        count  => 'count_channels',
-    },
-);
-
-has _queue => (
-    isa     => 'AnyEvent::RabbitMQ::LocalQueue',
-    is      => 'ro',
-    default => sub {
-        AnyEvent::RabbitMQ::LocalQueue->new
-    },
-);
-
-__PACKAGE__->meta->make_immutable;
-no Moose;
+our $VERSION = '1.00';
+
+sub new {
+    my $class = shift;
+    return bless {
+        verbose   => 0,
+        @_,
+        _is_open  => 0,
+        _queue    => AnyEvent::RabbitMQ::LocalQueue->new,
+        _channels => {},
+    }, $class;
+}
+
+sub channels {
+    my $self = shift;
+    return $self->{_channels};
+}
+
+sub delete_channel {
+    my $self = shift;
+    my ($id) = @_;
+    return delete $self->{_channels}->{$id};
+}
 
 sub load_xml_spec {
-    my ($self, $file,) = @_;
-    Net::AMQP::Protocol->load_xml_spec($file); # die when fail in this line.
+    my $self = shift;
+    Net::AMQP::Protocol->load_xml_spec(@_); # die when fail in this line.
     return $self;
 }
 
@@ -79,7 +53,7 @@ sub connect {
     $args{on_read_failure} ||= sub {die @_};
     $args{timeout}         ||= 0;
 
-    if ($self->verbose) {
+    if ($self->{verbose}) {
         print STDERR 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
     }
 
@@ -93,7 +67,7 @@ sub connect {
                 fh       => $fh,
                 on_error => sub {
                     my ($handle, $fatal, $message) = @_;
-                    $self->clear_handle;
+                    delete $self->{_handle};
                     $args{on_failure}->($message);
                 }
             );
@@ -111,9 +85,9 @@ sub connect {
 sub _read_loop {
     my ($self, $close_cb, $failure_cb,) = @_;
 
-    return if !$self->_handle;
+    return if !defined $self->{_handle}; # called on_error
 
-    $self->_handle->push_read(chunk => 8, sub {
+    $self->{_handle}->push_read(chunk => 8, sub {
         my $data = $_[1];
         my $stack = $_[1];
 
@@ -130,11 +104,11 @@ sub _read_loop {
             goto &_read_loop;
         }
 
-        $self->_handle->push_read(chunk => $length, sub {
+        $self->{_handle}->push_read(chunk => $length, sub {
             $stack .= $_[1];
             my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
 
-            if ($self->verbose) {
+            if ($self->{verbose}) {
                 print STDERR '[C] <-- [S] ' . Dumper($frame);
                 print STDERR '-----------', "\n";
             }
@@ -146,15 +120,11 @@ sub _read_loop {
 
             if (0 == $id) {
                 return if !$self->_check_close_and_clean($frame, $close_cb, $id,);
-                $self->_queue->push($frame);
+                $self->{_queue}->push($frame);
             } else {
-                my $channel = $self->get_channel($id);
-                if ($channel) {
-                    if (
-                        $self->_check_channel_close_and_clean($frame, $id, $channel)
-                    ) {
-                        $channel->_push_queue_or_consume($frame, $failure_cb);
-                    }
+                my $channel = $self->{_channels}->{$id};
+                if (defined $channel) {
+                    $channel->push_queue_or_consume($frame, $failure_cb);
                 } else {
                     $failure_cb->('Unknown channel id: ' . $frame->channel);
                 }
@@ -169,7 +139,8 @@ sub _read_loop {
 }
 
 sub _check_close_and_clean {
-    my ($self, $frame, $close_cb, $id,) = @_;
+    my $self = shift;
+    my ($frame, $close_cb, $id,) = @_;
 
     return 1 if !$frame->isa('Net::AMQP::Frame::Method');
 
@@ -183,30 +154,15 @@ sub _check_close_and_clean {
     return;
 }
 
-sub _check_channel_close_and_clean {
-    my ($self, $frame, $id, $channel,) = @_;
-
-    return 1 if !$frame->isa('Net::AMQP::Frame::Method');
-
-    my $method_frame = $frame->method_frame;
-    return 1 if !$method_frame->isa('Net::AMQP::Protocol::Channel::Close');
-
-    $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
-    $channel->{_is_open} = 0;
-    $channel->on_close->($frame);
-    $self->delete_channel($id);
-    return;
-}
-
 sub _start {
     my $self = shift;
     my %args = @_;
 
-    if ($self->verbose) {
+    if ($self->{verbose}) {
         print STDERR 'post header', "\n";
     }
 
-    $self->_handle->push_write(Net::AMQP::Protocol->header);
+    $self->{_handle}->push_write(Net::AMQP::Protocol->header);
 
     $self->_push_read_and_valid(
         'Connection::Start',
@@ -311,15 +267,12 @@ sub close {
         return $self;
     };
 
-#   if (0 == $self->count_channels) {
-    if (0 == scalar keys %{$self->channels}) { # FIXME
+    if (0 == scalar keys %{$self->{_channels}}) {
         return $close_cb->();
     }
 
-#    for my $id ($self->channel_ids) {
-    for my $id (keys %{$self->channels}) { # FIXME
-#        $self->get_channel($id)->close(
-         $self->channels->{$id}->close( # FIXME
+    for my $id (keys %{$self->{_channels}}) {
+         $self->{_channels}->{$id}->close(
             on_success => $close_cb,
             on_failure => sub {
                 $close_cb->();
@@ -332,9 +285,10 @@ sub close {
 }
 
 sub _close {
-    my ($self, $cb, $failure_cb,) = @_;
+    my $self = shift;
+    my ($cb, $failure_cb,) = @_;
 
-    return $self if !$self->_is_open || 0 < $self->count_channels;
+    return $self if !$self->{_is_open} || 0 < scalar keys %{$self->{_channels}};
 
     $self->_push_write_and_read(
         'Connection::Close', {}, 'Connection::CloseOk',
@@ -346,12 +300,12 @@ sub _close {
 }
 
 sub _disconnect {
-    my ($self,) = @_;
+    my $self = shift;
 
-    $self->clear_connect_guard;
-    $self->clear_handle;
+    delete $self->{_handle};
+    delete $self->{_connect_guard};
 
-    return;
+    return $self;
 }
 
 sub open_channel {
@@ -362,11 +316,11 @@ sub open_channel {
 
     my $id = $args{id};
     return $args{on_failure}->("Channel id $id is already in use")
-        if $id && $self->get_channel($id);
+        if $id && $self->{_channels}->{$id};
 
     if (!$id) {
         for my $candidate_id (1 .. (2**16 - 1)) { # FIXME
-            next if $self->get_channel($candidate_id);
+            next if defined $self->{_channels}->{$candidate_id};
             $id = $candidate_id;
             last;
         }
@@ -379,7 +333,7 @@ sub open_channel {
         on_close   => $args{on_close},
     );
 
-    $self->set_channel($id => $channel);
+    $self->{_channels}->{$id} = $channel;
 
     $channel->open(
         on_success => sub {
@@ -395,7 +349,8 @@ sub open_channel {
 }
 
 sub _push_write_and_read {
-    my ($self, $method, $args, $exp, $cb, $failure_cb, $id,) = @_;
+    my $self = shift;
+    my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;
 
     $method = 'Net::AMQP::Protocol::' . $method;
     $self->_push_write(
@@ -409,10 +364,11 @@ sub _push_write_and_read {
 }
 
 sub _push_read_and_valid {
-    my ($self, $exp, $cb, $failure_cb, $id,) = @_;
+    my $self = shift;
+    my ($exp, $cb, $failure_cb, $id,) = @_;
     $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
 
-    my $queue = $id ? $self->get_channel($id)->_queue : $self->_queue;
+    my $queue = $id ? $self->{_channels}->{$id}->queue : $self->{_queue};
 
     $queue->get(sub {
         my $frame = shift;
@@ -433,45 +389,20 @@ sub _push_read_and_valid {
     });
 }
 
-sub _push_read {
-    my ($self, $cb, $failure_cb) = @_;
-
-    $self->_handle->push_read(chunk => 8, sub {
-        my $data = $_[1];
-        my $stack = $_[1];
-        return $failure_cb->('Disconnect') if length($data) <= 0;
-
-        my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
-        return $failure_cb->('Broken data was received')
-            if !defined $type_id || !defined $channel || !defined $length;
-
-        $self->_handle->push_read(chunk => $length, sub {
-            $stack .= $_[1];
-            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
-
-            if ($self->verbose) {
-                print STDERR '[C] <-- [S] ' . Dumper($frame);
-                print STDERR '-----------', "\n";
-            }
-
-            $cb->($frame);
-        });
-    });
-}
-
 sub _push_write {
-    my ($self, $output, $id,) = @_;
+    my $self = shift;
+    my ($output, $id,) = @_;
 
     if ($output->isa('Net::AMQP::Protocol::Base')) {
         $output = $output->frame_wrap;
     }
     $output->channel($id || 0);
 
-    if ($self->verbose) {
+    if ($self->{verbose}) {
         print STDERR '[C] --> [S] ', Dumper($output), "\n";
     }
 
-    $self->_handle->push_write($output->to_raw_frame());
+    $self->{_handle}->push_write($output->to_raw_frame());
     return;
 }
 
@@ -485,9 +416,8 @@ sub _set_cbs {
     return %args;
 }
 
-sub DEMOLISH {
-    my ($self) = @_;
-
+sub DESTROY {
+    my $self = shift;
     $self->close();
     return;
 }
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 79c4188..dc52f72 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -1,84 +1,43 @@
 package AnyEvent::RabbitMQ::Channel;
 
+use strict;
+use warnings;
+
+use Scalar::Util qw(weaken);
 use AnyEvent::RabbitMQ::LocalQueue;
 
-use Moose;
-
-our $VERSION = '0.01';
-
-has id => (
-    isa      => 'Int',
-    is       => 'rw',
-    required => 1,
-);
-
-has connection => (
-    isa      => 'AnyEvent::RabbitMQ',
-    is       => 'rw',
-    required => 1,
-    weak_ref => 1,
-);
-
-has on_close => (
-#   isa      => 'CodeRef',
-    is       => 'rw',
-    required => 1,
-);
-
-has _is_open => (
-    isa     => 'Bool',
-    is      => 'rw',
-    default => 0,
-);
-
-has _queue => (
-    isa     => 'AnyEvent::RabbitMQ::LocalQueue',
-    is      => 'ro',
-    default => sub {
-        AnyEvent::RabbitMQ::LocalQueue->new
-    },
-);
-
-has consumer_cbs => (
-    metaclass => 'Collection::Hash',
-    is        => 'ro',
-    isa       => 'HashRef[CodeRef]',
-    default   => sub {{}},
-    provides  => {
-        set    => 'set_consumer_cbs',
-        get    => 'get_consumer_cbs',
-        delete => 'delete_consumer_cbs',
-        keys   => 'consumer_tags',
-        count  => 'count_consumer_cbs',
-    },
-);
-
-has return_cbs => (
-    metaclass => 'Collection::Hash',
-    is        => 'ro',
-    isa       => 'HashRef[CodeRef]',
-    default   => sub {{}},
-    provides  => {
-        set    => 'set_return_cbs',
-        get    => 'get_return_cbs',
-    },
-);
-
-__PACKAGE__->meta->make_immutable;
-no Moose;
+our $VERSION = '1.00';
+
+sub new {
+    my $class = shift;
+    my $self = bless {
+        @_, # id, connection, on_close
+        _is_open      => 0,
+        _queue        => AnyEvent::RabbitMQ::LocalQueue->new,
+        _consumer_cbs => {},
+        _return_cbs   => {},
+    }, $class;
+    weaken($self->{connection});
+    return $self;
+}
+
+sub queue {
+    my $self = shift;
+    return $self->{_queue};
+}
 
 sub open {
     my $self = shift;
     my %args = @_;
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Channel::Open', {}, 'Channel::OpenOk',
         sub {
             $self->{_is_open} = 1;
             $args{on_success}->();
         },
         $args{on_failur},
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -86,13 +45,13 @@ sub open {
 
 sub close {
     my $self = shift;
-    my %args = $self->connection->_set_cbs(@_);
+    my %args = $self->{connection}->_set_cbs(@_);
 
-    return $self if !$self->_is_open;
+    return $self if !$self->{_is_open};
 
-    return $self->_close(%args) if 0 == $self->count_consumer_cbs;
+    return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}};
 
-    for my $consumer_tag ($self->consumer_tags) {
+    for my $consumer_tag (keys %{$self->{_consumer_cbs}}) {
         $self->cancel(
             consumer_tag => $consumer_tag,
             on_success   => sub {
@@ -112,19 +71,19 @@ sub _close {
     my $self = shift;
     my %args = @_;
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Channel::Close', {}, 'Channel::CloseOk',
         sub {
             $self->{_is_open} = 0;
-            $self->connection->delete_channel($self->id);
+            $self->{connection}->delete_channel($self->{id});
             $args{on_success}->();
         },
         sub {
             $self->{_is_open} = 0;
-            $self->connection->delete_channel($self->id);
+            $self->{connection}->delete_channel($self->{id});
             $args{on_failur}->();
         },
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -134,7 +93,7 @@ sub declare_exchange {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Exchange::Declare',
         {
             type        => 'direct',
@@ -149,7 +108,7 @@ sub declare_exchange {
         'Exchange::DeclareOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -159,7 +118,7 @@ sub delete_exchange {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Exchange::Delete',
         {
             if_unused => 0,
@@ -170,7 +129,7 @@ sub delete_exchange {
         'Exchange::DeleteOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -180,7 +139,7 @@ sub declare_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Queue::Declare',
         {
             queue       => '',
@@ -196,7 +155,7 @@ sub declare_queue {
         'Queue::DeclareOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 }
 
@@ -204,7 +163,7 @@ sub bind_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Queue::Bind',
         {
             %args, # queue, exchange, routing_key
@@ -214,7 +173,7 @@ sub bind_queue {
         'Queue::BindOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -224,7 +183,7 @@ sub unbind_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Queue::Unbind',
         {
             %args, # queue, exchange, routing_key
@@ -233,7 +192,7 @@ sub unbind_queue {
         'Queue::UnbindOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -243,7 +202,7 @@ sub purge_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Queue::Purge',
         {
             %args, # queue
@@ -253,7 +212,7 @@ sub purge_queue {
         'Queue::PurgeOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -263,7 +222,7 @@ sub delete_queue {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Queue::Delete',
         {
             if_unused => 0,
@@ -275,7 +234,7 @@ sub delete_queue {
         'Queue::DeleteOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -299,9 +258,9 @@ sub publish {
 
     return $self if !$args{mandatory} && !$args{immediate};
 
-    $self->set_return_cbs(
-        ($args{exchange} || '') . '_' . $args{routing_key} => $return_cb
-    );
+    $self->{_return_cbs}->{
+        ($args{exchange} || '') . '_' . $args{routing_key}
+    } = $return_cb;
 
     return $self;
 }
@@ -310,7 +269,7 @@ sub _publish {
     my $self = shift;
     my %args = @_;
 
-    $self->connection->_push_write(
+    $self->{connection}->_push_write(
         Net::AMQP::Protocol::Basic::Publish->new(
             exchange  => '',
             mandatory => 0,
@@ -318,7 +277,7 @@ sub _publish {
             %args, # routing_key
             ticket    => 0,
         ),
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -327,7 +286,7 @@ sub _publish {
 sub _header {
     my ($self, $args, $body,) = @_;
 
-    $self->connection->_push_write(
+    $self->{connection}->_push_write(
         Net::AMQP::Frame::Header->new(
             weight       => $args->{weight} || 0,
             body_size    => length($body),
@@ -349,7 +308,7 @@ sub _header {
                 %$args,
             ),
         ),
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -358,9 +317,9 @@ sub _header {
 sub _body {
     my ($self, $body,) = @_;
 
-    $self->connection->_push_write(
+    $self->{connection}->_push_write(
         Net::AMQP::Frame::Body->new(payload => $body),
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -372,7 +331,7 @@ sub consume {
 
     my $consumer_cb = delete $args{on_consume} || sub {};
     
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Basic::Consume',
         {
             consumer_tag => '',
@@ -386,13 +345,13 @@ sub consume {
         'Basic::ConsumeOk', 
         sub {
             my $frame = shift;
-            $self->set_consumer_cbs(
-                $frame->method_frame->consumer_tag => $consumer_cb
-            );
+            $self->{_consumer_cbs}->{
+                $frame->method_frame->consumer_tag
+            } = $consumer_cb;
             $cb->($frame);
         },
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -406,9 +365,9 @@ sub cancel {
         if !defined $args{consumer_tag};
 
     return $failure_cb->('Unknown consumer_tag')
-        if !$self->get_consumer_cbs($args{consumer_tag});
+        if !$self->{_consumer_cbs}->{$args{consumer_tag}};
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Basic::Cancel',
         {
             %args, # consumer_tag
@@ -417,11 +376,11 @@ sub cancel {
         'Basic::CancelOk', 
         sub {
             my $frame = shift;
-            $self->delete_consumer_cbs($args{consumer_tag});
+            delete $self->{_consumer_cbs}->{$args{consumer_tag}};
             $cb->($frame);
         },
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -431,7 +390,7 @@ sub get {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Basic::Get',
         {
             no_ack => 1,
@@ -446,7 +405,7 @@ sub get {
             $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
         },
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -456,7 +415,7 @@ sub ack {
     my $self = shift;
     my %args = @_;
 
-    $self->connection->_push_write(
+    $self->{connection}->_push_write(
         Net::AMQP::Protocol::Basic::Ack->new(
             delivery_tag => 0,
             multiple     => (
@@ -464,7 +423,7 @@ sub ack {
             ),
             %args,
         ),
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -474,7 +433,7 @@ sub qos {
     my $self = shift;
     my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Basic::Qos',
         {
             prefetch_count => 1,
@@ -485,7 +444,7 @@ sub qos {
         'Basic::QosOk', 
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -495,12 +454,12 @@ sub recover {
     my $self = shift;
     my %args = @_;
 
-    $self->connection->_push_write(
+    $self->{connection}->_push_write(
         Net::AMQP::Protocol::Basic::Recover->new(
             requeue => 0,
             %args,
         ),
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -510,11 +469,11 @@ sub select_tx {
     my $self = shift;
     my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Tx::Select', {}, 'Tx::SelectOk',
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -524,11 +483,11 @@ sub commit_tx {
     my $self = shift;
     my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Tx::Commit', {}, 'Tx::CommitOk',
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
@@ -538,45 +497,56 @@ sub rollback_tx {
     my $self = shift;
     my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
 
-    $self->connection->_push_write_and_read(
+    $self->{connection}->_push_write_and_read(
         'Tx::Rollback', {}, 'Tx::RollbackOk',
         $cb,
         $failure_cb,
-        $self->id,
+        $self->{id},
     );
 
     return $self;
 }
 
-sub _push_queue_or_consume {
-    my ($self, $frame, $failure_cb,) = @_;
+sub push_queue_or_consume {
+    my $self = shift;
+    my ($frame, $failure_cb,) = @_;
 
     if ($frame->isa('Net::AMQP::Frame::Method')) {
         my $method_frame = $frame->method_frame;
-        if ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
-            my $cb = $self->get_consumer_cbs(
+        if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
+            $self->{connection}->_push_write(
+                Net::AMQP::Protocol::Channel::CloseOk->new(),
+                $self->{id},
+            );
+            $self->{_is_open} = 0;
+            $self->{connection}->delete_channel($self->{id});
+            $self->{on_close}->($frame);
+            return $self;
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
+            my $cb = $self->{_consumer_cbs}->{
                 $method_frame->consumer_tag
-            ) || sub {};
+            } || sub {};
             $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
             return $self;
-        } elsif ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
-            my $cb = $self->get_return_cbs(
+        } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
+            my $cb = $self->{_return_cbs}->{
                 $method_frame->exchange . '_' . $method_frame->routing_key
-            ) || sub {};
+            } || sub {};
             $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
             return $self;
         }
     }
 
-    $self->_queue->push($frame);
+    $self->{_queue}->push($frame);
     return $self;
 }
 
 sub _push_read_header_and_body {
-    my ($self, $type, $frame, $cb, $failure_cb,) = @_;
+    my $self = shift;
+    my ($type, $frame, $cb, $failure_cb,) = @_;
     my $response = {$type => $frame};
 
-    $self->_queue->get(sub{
+    $self->{_queue}->get(sub{
         my $frame = shift;
 
         return $failure_cb->('Received data is not header frame')
@@ -591,7 +561,7 @@ sub _push_read_header_and_body {
         $response->{header} = $header_frame;
     });
 
-    $self->_queue->get(sub{
+    $self->{_queue}->get(sub{
         my $frame = shift;
 
         return $failure_cb->('Received data is not body frame')
@@ -614,9 +584,8 @@ sub _delete_cbs {
     return $cb, $failure_cb, %args;
 }
 
-sub DEMOLISH {
-    my ($self) = @_;
-
+sub DESTROY {
+    my $self = shift;
     $self->close();
     return;
 }
diff --git a/lib/AnyEvent/RabbitMQ/LocalQueue.pm b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
index 56b0a8c..4504971 100644
--- a/lib/AnyEvent/RabbitMQ/LocalQueue.pm
+++ b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
@@ -1,59 +1,45 @@
 package AnyEvent::RabbitMQ::LocalQueue;
 
-use Moose;
-use MooseX::AttributeHelpers;
-
-our $VERSION = '0.01';
-
-has _message_queue => (
-    metaclass => 'Collection::Array',
-    is        => 'ro',
-    isa       => 'ArrayRef[Any]',
-    default   => sub {[]},
-    provides  => {
-        push  => '_push_message_queue',
-        shift => '_shift_message_queue',
-        count => '_count_message_queue',
-    },
-);
-
-has _drain_code_queue => (
-    metaclass => 'Collection::Array',
-    is        => 'ro',
-    isa       => 'ArrayRef[CodeRef]',
-    default   => sub {[]},
-    provides  => {
-        push  => '_push_drain_code_queue',
-        shift => '_shift_drain_code_queue',
-        count => '_count_drain_code_queue',
-    },
-);
-
-__PACKAGE__->meta->make_immutable;
-no Moose;
+use strict;
+use warnings;
+
+our $VERSION = '1.00';
+
+sub new {
+    my $class = shift;
+    return bless {
+        _message_queue    => [],
+        _drain_code_queue => [],
+    }, $class;
+}
 
 sub push {
     my $self = shift;
-    $self->_push_message_queue(@_);
+
+    CORE::push @{$self->{_message_queue}}, @_;
     return $self->_drain_queue();
 }
 
 sub get {
     my $self = shift;
-    $self->_push_drain_code_queue(@_);
+
+    CORE::push @{$self->{_drain_code_queue}}, @_;
     return $self->_drain_queue();
 }
 
 sub _drain_queue {
     my $self = shift;
 
-    my $count
-        = $self->_count_message_queue < $self->_count_drain_code_queue
-        ? $self->_count_message_queue : $self->_count_drain_code_queue
-        ;
+    my $message_count = scalar @{$self->{_message_queue}};
+    my $drain_code_count = scalar @{$self->{_drain_code_queue}};
+
+    my $count = $message_count < $drain_code_count
+              ? $message_count : $drain_code_count;
 
     for (1 .. $count) {
-        $self->_shift_drain_code_queue->($self->_shift_message_queue);
+        &{shift @{$self->{_drain_code_queue}}}(
+            shift @{$self->{_message_queue}}
+        );
     }
 
     return $self;
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index ee86a9c..456b16a 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -1,41 +1,32 @@
 package RabbitFoot;
 
+use strict;
+use warnings;
+
 use AnyEvent::RabbitMQ;
 use Coro;
 use Coro::AnyEvent;
 
 use RabbitFoot::Channel;
 
-use Moose;
-
-our $VERSION = '0.02';
-
-has verbose => (
-    isa => 'Bool',
-    is  => 'rw',
-);
-
-has _ar => (
-    isa     => 'AnyEvent::RabbitMQ',
-    is      => 'ro',
-);
-
-for my $method (qw(connect close)) {
-    __PACKAGE__->meta->add_method($method, sub {
-        my $self = shift;
-        $self->_do($method, @_);
-        return $self;
-    });
+our $VERSION = '1.00';
+
+BEGIN {
+    for my $method (qw(connect close)) {
+        no strict 'refs';
+        *{__PACKAGE__ . '::' . $method} = sub {
+            my $self = shift;
+            $self->_do($method, @_);
+            return $self;
+        };
+    }
 }
 
-__PACKAGE__->meta->make_immutable;
-no Moose;
-
-sub BUILD {
-    my $self = shift;
-    $self->{_ar} = AnyEvent::RabbitMQ->new(
-        verbose => $self->verbose,
-    );
+sub new {
+    my $class = shift;
+    return bless {
+        _ar => AnyEvent::RabbitMQ->new(@_),
+    }, $class;
 }
 
 sub load_xml_spec {
@@ -46,7 +37,7 @@ sub load_xml_spec {
 
 sub open_channel {
     my $self = shift;
-    return RabbitFoot::Channel->new(arc => $self->_do('open_channel', @_));
+    return RabbitFoot::Channel->new(arc => $self->_do('open_channel', @_,));
 }
 
 sub _do {
@@ -58,7 +49,7 @@ sub _do {
     $args{on_success} = sub {$cb->(1, @_);},
     $args{on_failure} = sub {$cb->(0, @_);},
 
-    $self->_ar->$method(%args);
+    $self->{_ar}->$method(%args);
     my ($is_success, @responses) = Coro::rouse_wait;
     die @responses if !$is_success;
     return @responses;
diff --git a/lib/RabbitFoot/Channel.pm b/lib/RabbitFoot/Channel.pm
index c5d2538..83eb127 100644
--- a/lib/RabbitFoot/Channel.pm
+++ b/lib/RabbitFoot/Channel.pm
@@ -1,50 +1,54 @@
 package RabbitFoot::Channel;
 
+use strict;
+use warnings;
+
 use Coro;
 use Coro::AnyEvent;
 
 use AnyEvent::RabbitMQ::Channel;
 
-use Moose;
-
-our $VERSION = '0.01';
-
-has arc => (
-    isa => 'AnyEvent::RabbitMQ::Channel',
-    is  => 'ro',
-);
-
-for my $method (qw(
-    close
-    declare_exchange delete_exchange
-    declare_queue bind_queue unbind_queue purge_queue delete_queue
-    consume cancel get qos
-    select_tx commit_tx rollback_tx
-)) {
-    __PACKAGE__->meta->add_method($method, sub {
-        my $self = shift;
-        my %args = @_;
-
-        my $cb = Coro::rouse_cb;
-        $args{on_success} = sub {$cb->(1, @_);},
-        $args{on_failure} = sub {$cb->(0, @_);},
-
-        $self->arc->$method(%args);
-        my ($is_success, @responses) = Coro::rouse_wait;
-        die @responses if !$is_success;
-        return $responses[0];
-    });
+our $VERSION = '1.00';
+
+BEGIN {
+    for my $method (qw(
+        close
+        declare_exchange delete_exchange
+        declare_queue bind_queue unbind_queue purge_queue delete_queue
+        consume cancel get qos
+        select_tx commit_tx rollback_tx
+    )) {
+        no strict 'refs';
+        *{__PACKAGE__ . '::' . $method} = sub {
+            my $self = shift;
+            my %args = @_;
+
+            my $cb = Coro::rouse_cb;
+            $args{on_success} = sub {$cb->(1, @_);},
+            $args{on_failure} = sub {$cb->(0, @_);},
+
+            $self->{arc}->$method(%args);
+            my ($is_success, @responses) = Coro::rouse_wait;
+            die @responses if !$is_success;
+            return $responses[0];
+        };
+    }
+
+    for my $method (qw(publish ack recover)) {
+        no strict 'refs';
+        *{__PACKAGE__ . '::' . $method} = sub {
+            my $self = shift;
+            $self->{arc}->$method(@_);
+            return $self;
+        };
+    }
 }
 
-for my $method (qw(publish ack recover)) {
-    __PACKAGE__->meta->add_method($method, sub {
-        my $self = shift;
-        $self->arc->$method(@_);
-        return $self;
-    });
+sub new {
+    my $class = shift;
+    return bless {
+        @_, # arc
+    }, $class;
 }
 
-__PACKAGE__->meta->make_immutable;
-no Moose;
-
 1;

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list