[libanyevent-rabbitmq-perl] 26/151: Exclude the Moose from the RabbitFoot core module.
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:01 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit b55556f94e6c6747183fe5ba04eb4d84e1cb705d
Author: cooldaemon <cooldaemon at gmail.com>
Date: Wed Feb 24 12:47:23 2010 +0900
Exclude the Moose from the RabbitFoot core module.
---
lib/AnyEvent/RabbitMQ.pm | 202 ++++++++++--------------------
lib/AnyEvent/RabbitMQ/Channel.pm | 239 ++++++++++++++++--------------------
lib/AnyEvent/RabbitMQ/LocalQueue.pm | 62 ++++------
lib/RabbitFoot.pm | 51 ++++----
lib/RabbitFoot/Channel.pm | 80 ++++++------
5 files changed, 257 insertions(+), 377 deletions(-)
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
index 96a08ff..18172ad 100644
--- a/lib/AnyEvent/RabbitMQ.pm
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -1,5 +1,8 @@
package AnyEvent::RabbitMQ;
+use strict;
+use warnings;
+
use Data::Dumper;
use List::MoreUtils qw(none);
@@ -12,62 +15,33 @@ use Net::AMQP::Common qw(:all);
use AnyEvent::RabbitMQ::Channel;
use AnyEvent::RabbitMQ::LocalQueue;
-use Moose;
-use MooseX::AttributeHelpers;
-
-our $VERSION = '0.01';
-
-has verbose => (
- isa => 'Bool',
- is => 'rw',
-);
-
-has _connect_guard => (
- isa => 'Guard',
- is => 'ro',
- clearer => 'clear_connect_guard',
-);
-
-has _handle => (
- isa => 'AnyEvent::Handle',
- is => 'ro',
- clearer => 'clear_handle',
-);
-
-has _is_open => (
- isa => 'Bool',
- is => 'ro',
- default => 0,
-);
-
-has channels => (
- metaclass => 'Collection::Hash',
- is => 'ro',
- isa => 'HashRef[AnyEvent::RabbitMQ::Channel]',
- default => sub {{}},
- provides => {
- set => 'set_channel',
- get => 'get_channel',
- delete => 'delete_channel',
- keys => 'channel_ids',
- count => 'count_channels',
- },
-);
-
-has _queue => (
- isa => 'AnyEvent::RabbitMQ::LocalQueue',
- is => 'ro',
- default => sub {
- AnyEvent::RabbitMQ::LocalQueue->new
- },
-);
-
-__PACKAGE__->meta->make_immutable;
-no Moose;
+our $VERSION = '1.00';
+
+sub new {
+ my $class = shift;
+ return bless {
+ verbose => 0,
+ @_,
+ _is_open => 0,
+ _queue => AnyEvent::RabbitMQ::LocalQueue->new,
+ _channels => {},
+ }, $class;
+}
+
+sub channels {
+ my $self = shift;
+ return $self->{_channels};
+}
+
+sub delete_channel {
+ my $self = shift;
+ my ($id) = @_;
+ return delete $self->{_channels}->{$id};
+}
sub load_xml_spec {
- my ($self, $file,) = @_;
- Net::AMQP::Protocol->load_xml_spec($file); # die when fail in this line.
+ my $self = shift;
+ Net::AMQP::Protocol->load_xml_spec(@_); # die when fail in this line.
return $self;
}
@@ -79,7 +53,7 @@ sub connect {
$args{on_read_failure} ||= sub {die @_};
$args{timeout} ||= 0;
- if ($self->verbose) {
+ if ($self->{verbose}) {
print STDERR 'connect to ', $args{host}, ':', $args{port}, '...', "\n";
}
@@ -93,7 +67,7 @@ sub connect {
fh => $fh,
on_error => sub {
my ($handle, $fatal, $message) = @_;
- $self->clear_handle;
+ delete $self->{_handle};
$args{on_failure}->($message);
}
);
@@ -111,9 +85,9 @@ sub connect {
sub _read_loop {
my ($self, $close_cb, $failure_cb,) = @_;
- return if !$self->_handle;
+ return if !defined $self->{_handle}; # called on_error
- $self->_handle->push_read(chunk => 8, sub {
+ $self->{_handle}->push_read(chunk => 8, sub {
my $data = $_[1];
my $stack = $_[1];
@@ -130,11 +104,11 @@ sub _read_loop {
goto &_read_loop;
}
- $self->_handle->push_read(chunk => $length, sub {
+ $self->{_handle}->push_read(chunk => $length, sub {
$stack .= $_[1];
my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
- if ($self->verbose) {
+ if ($self->{verbose}) {
print STDERR '[C] <-- [S] ' . Dumper($frame);
print STDERR '-----------', "\n";
}
@@ -146,15 +120,11 @@ sub _read_loop {
if (0 == $id) {
return if !$self->_check_close_and_clean($frame, $close_cb, $id,);
- $self->_queue->push($frame);
+ $self->{_queue}->push($frame);
} else {
- my $channel = $self->get_channel($id);
- if ($channel) {
- if (
- $self->_check_channel_close_and_clean($frame, $id, $channel)
- ) {
- $channel->_push_queue_or_consume($frame, $failure_cb);
- }
+ my $channel = $self->{_channels}->{$id};
+ if (defined $channel) {
+ $channel->push_queue_or_consume($frame, $failure_cb);
} else {
$failure_cb->('Unknown channel id: ' . $frame->channel);
}
@@ -169,7 +139,8 @@ sub _read_loop {
}
sub _check_close_and_clean {
- my ($self, $frame, $close_cb, $id,) = @_;
+ my $self = shift;
+ my ($frame, $close_cb, $id,) = @_;
return 1 if !$frame->isa('Net::AMQP::Frame::Method');
@@ -183,30 +154,15 @@ sub _check_close_and_clean {
return;
}
-sub _check_channel_close_and_clean {
- my ($self, $frame, $id, $channel,) = @_;
-
- return 1 if !$frame->isa('Net::AMQP::Frame::Method');
-
- my $method_frame = $frame->method_frame;
- return 1 if !$method_frame->isa('Net::AMQP::Protocol::Channel::Close');
-
- $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
- $channel->{_is_open} = 0;
- $channel->on_close->($frame);
- $self->delete_channel($id);
- return;
-}
-
sub _start {
my $self = shift;
my %args = @_;
- if ($self->verbose) {
+ if ($self->{verbose}) {
print STDERR 'post header', "\n";
}
- $self->_handle->push_write(Net::AMQP::Protocol->header);
+ $self->{_handle}->push_write(Net::AMQP::Protocol->header);
$self->_push_read_and_valid(
'Connection::Start',
@@ -311,15 +267,12 @@ sub close {
return $self;
};
-# if (0 == $self->count_channels) {
- if (0 == scalar keys %{$self->channels}) { # FIXME
+ if (0 == scalar keys %{$self->{_channels}}) {
return $close_cb->();
}
-# for my $id ($self->channel_ids) {
- for my $id (keys %{$self->channels}) { # FIXME
-# $self->get_channel($id)->close(
- $self->channels->{$id}->close( # FIXME
+ for my $id (keys %{$self->{_channels}}) {
+ $self->{_channels}->{$id}->close(
on_success => $close_cb,
on_failure => sub {
$close_cb->();
@@ -332,9 +285,10 @@ sub close {
}
sub _close {
- my ($self, $cb, $failure_cb,) = @_;
+ my $self = shift;
+ my ($cb, $failure_cb,) = @_;
- return $self if !$self->_is_open || 0 < $self->count_channels;
+ return $self if !$self->{_is_open} || 0 < scalar keys %{$self->{_channels}};
$self->_push_write_and_read(
'Connection::Close', {}, 'Connection::CloseOk',
@@ -346,12 +300,12 @@ sub _close {
}
sub _disconnect {
- my ($self,) = @_;
+ my $self = shift;
- $self->clear_connect_guard;
- $self->clear_handle;
+ delete $self->{_handle};
+ delete $self->{_connect_guard};
- return;
+ return $self;
}
sub open_channel {
@@ -362,11 +316,11 @@ sub open_channel {
my $id = $args{id};
return $args{on_failure}->("Channel id $id is already in use")
- if $id && $self->get_channel($id);
+ if $id && $self->{_channels}->{$id};
if (!$id) {
for my $candidate_id (1 .. (2**16 - 1)) { # FIXME
- next if $self->get_channel($candidate_id);
+ next if defined $self->{_channels}->{$candidate_id};
$id = $candidate_id;
last;
}
@@ -379,7 +333,7 @@ sub open_channel {
on_close => $args{on_close},
);
- $self->set_channel($id => $channel);
+ $self->{_channels}->{$id} = $channel;
$channel->open(
on_success => sub {
@@ -395,7 +349,8 @@ sub open_channel {
}
sub _push_write_and_read {
- my ($self, $method, $args, $exp, $cb, $failure_cb, $id,) = @_;
+ my $self = shift;
+ my ($method, $args, $exp, $cb, $failure_cb, $id,) = @_;
$method = 'Net::AMQP::Protocol::' . $method;
$self->_push_write(
@@ -409,10 +364,11 @@ sub _push_write_and_read {
}
sub _push_read_and_valid {
- my ($self, $exp, $cb, $failure_cb, $id,) = @_;
+ my $self = shift;
+ my ($exp, $cb, $failure_cb, $id,) = @_;
$exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
- my $queue = $id ? $self->get_channel($id)->_queue : $self->_queue;
+ my $queue = $id ? $self->{_channels}->{$id}->queue : $self->{_queue};
$queue->get(sub {
my $frame = shift;
@@ -433,45 +389,20 @@ sub _push_read_and_valid {
});
}
-sub _push_read {
- my ($self, $cb, $failure_cb) = @_;
-
- $self->_handle->push_read(chunk => 8, sub {
- my $data = $_[1];
- my $stack = $_[1];
- return $failure_cb->('Disconnect') if length($data) <= 0;
-
- my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
- return $failure_cb->('Broken data was received')
- if !defined $type_id || !defined $channel || !defined $length;
-
- $self->_handle->push_read(chunk => $length, sub {
- $stack .= $_[1];
- my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
-
- if ($self->verbose) {
- print STDERR '[C] <-- [S] ' . Dumper($frame);
- print STDERR '-----------', "\n";
- }
-
- $cb->($frame);
- });
- });
-}
-
sub _push_write {
- my ($self, $output, $id,) = @_;
+ my $self = shift;
+ my ($output, $id,) = @_;
if ($output->isa('Net::AMQP::Protocol::Base')) {
$output = $output->frame_wrap;
}
$output->channel($id || 0);
- if ($self->verbose) {
+ if ($self->{verbose}) {
print STDERR '[C] --> [S] ', Dumper($output), "\n";
}
- $self->_handle->push_write($output->to_raw_frame());
+ $self->{_handle}->push_write($output->to_raw_frame());
return;
}
@@ -485,9 +416,8 @@ sub _set_cbs {
return %args;
}
-sub DEMOLISH {
- my ($self) = @_;
-
+sub DESTROY {
+ my $self = shift;
$self->close();
return;
}
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
index 79c4188..dc52f72 100644
--- a/lib/AnyEvent/RabbitMQ/Channel.pm
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -1,84 +1,43 @@
package AnyEvent::RabbitMQ::Channel;
+use strict;
+use warnings;
+
+use Scalar::Util qw(weaken);
use AnyEvent::RabbitMQ::LocalQueue;
-use Moose;
-
-our $VERSION = '0.01';
-
-has id => (
- isa => 'Int',
- is => 'rw',
- required => 1,
-);
-
-has connection => (
- isa => 'AnyEvent::RabbitMQ',
- is => 'rw',
- required => 1,
- weak_ref => 1,
-);
-
-has on_close => (
-# isa => 'CodeRef',
- is => 'rw',
- required => 1,
-);
-
-has _is_open => (
- isa => 'Bool',
- is => 'rw',
- default => 0,
-);
-
-has _queue => (
- isa => 'AnyEvent::RabbitMQ::LocalQueue',
- is => 'ro',
- default => sub {
- AnyEvent::RabbitMQ::LocalQueue->new
- },
-);
-
-has consumer_cbs => (
- metaclass => 'Collection::Hash',
- is => 'ro',
- isa => 'HashRef[CodeRef]',
- default => sub {{}},
- provides => {
- set => 'set_consumer_cbs',
- get => 'get_consumer_cbs',
- delete => 'delete_consumer_cbs',
- keys => 'consumer_tags',
- count => 'count_consumer_cbs',
- },
-);
-
-has return_cbs => (
- metaclass => 'Collection::Hash',
- is => 'ro',
- isa => 'HashRef[CodeRef]',
- default => sub {{}},
- provides => {
- set => 'set_return_cbs',
- get => 'get_return_cbs',
- },
-);
-
-__PACKAGE__->meta->make_immutable;
-no Moose;
+our $VERSION = '1.00';
+
+sub new {
+ my $class = shift;
+ my $self = bless {
+ @_, # id, connection, on_close
+ _is_open => 0,
+ _queue => AnyEvent::RabbitMQ::LocalQueue->new,
+ _consumer_cbs => {},
+ _return_cbs => {},
+ }, $class;
+ weaken($self->{connection});
+ return $self;
+}
+
+sub queue {
+ my $self = shift;
+ return $self->{_queue};
+}
sub open {
my $self = shift;
my %args = @_;
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Channel::Open', {}, 'Channel::OpenOk',
sub {
$self->{_is_open} = 1;
$args{on_success}->();
},
$args{on_failur},
- $self->id,
+ $self->{id},
);
return $self;
@@ -86,13 +45,13 @@ sub open {
sub close {
my $self = shift;
- my %args = $self->connection->_set_cbs(@_);
+ my %args = $self->{connection}->_set_cbs(@_);
- return $self if !$self->_is_open;
+ return $self if !$self->{_is_open};
- return $self->_close(%args) if 0 == $self->count_consumer_cbs;
+ return $self->_close(%args) if 0 == scalar keys %{$self->{_consumer_cbs}};
- for my $consumer_tag ($self->consumer_tags) {
+ for my $consumer_tag (keys %{$self->{_consumer_cbs}}) {
$self->cancel(
consumer_tag => $consumer_tag,
on_success => sub {
@@ -112,19 +71,19 @@ sub _close {
my $self = shift;
my %args = @_;
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Channel::Close', {}, 'Channel::CloseOk',
sub {
$self->{_is_open} = 0;
- $self->connection->delete_channel($self->id);
+ $self->{connection}->delete_channel($self->{id});
$args{on_success}->();
},
sub {
$self->{_is_open} = 0;
- $self->connection->delete_channel($self->id);
+ $self->{connection}->delete_channel($self->{id});
$args{on_failur}->();
},
- $self->id,
+ $self->{id},
);
return $self;
@@ -134,7 +93,7 @@ sub declare_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Exchange::Declare',
{
type => 'direct',
@@ -149,7 +108,7 @@ sub declare_exchange {
'Exchange::DeclareOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -159,7 +118,7 @@ sub delete_exchange {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Exchange::Delete',
{
if_unused => 0,
@@ -170,7 +129,7 @@ sub delete_exchange {
'Exchange::DeleteOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -180,7 +139,7 @@ sub declare_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Queue::Declare',
{
queue => '',
@@ -196,7 +155,7 @@ sub declare_queue {
'Queue::DeclareOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
}
@@ -204,7 +163,7 @@ sub bind_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Queue::Bind',
{
%args, # queue, exchange, routing_key
@@ -214,7 +173,7 @@ sub bind_queue {
'Queue::BindOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -224,7 +183,7 @@ sub unbind_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Queue::Unbind',
{
%args, # queue, exchange, routing_key
@@ -233,7 +192,7 @@ sub unbind_queue {
'Queue::UnbindOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -243,7 +202,7 @@ sub purge_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Queue::Purge',
{
%args, # queue
@@ -253,7 +212,7 @@ sub purge_queue {
'Queue::PurgeOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -263,7 +222,7 @@ sub delete_queue {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Queue::Delete',
{
if_unused => 0,
@@ -275,7 +234,7 @@ sub delete_queue {
'Queue::DeleteOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -299,9 +258,9 @@ sub publish {
return $self if !$args{mandatory} && !$args{immediate};
- $self->set_return_cbs(
- ($args{exchange} || '') . '_' . $args{routing_key} => $return_cb
- );
+ $self->{_return_cbs}->{
+ ($args{exchange} || '') . '_' . $args{routing_key}
+ } = $return_cb;
return $self;
}
@@ -310,7 +269,7 @@ sub _publish {
my $self = shift;
my %args = @_;
- $self->connection->_push_write(
+ $self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Publish->new(
exchange => '',
mandatory => 0,
@@ -318,7 +277,7 @@ sub _publish {
%args, # routing_key
ticket => 0,
),
- $self->id,
+ $self->{id},
);
return $self;
@@ -327,7 +286,7 @@ sub _publish {
sub _header {
my ($self, $args, $body,) = @_;
- $self->connection->_push_write(
+ $self->{connection}->_push_write(
Net::AMQP::Frame::Header->new(
weight => $args->{weight} || 0,
body_size => length($body),
@@ -349,7 +308,7 @@ sub _header {
%$args,
),
),
- $self->id,
+ $self->{id},
);
return $self;
@@ -358,9 +317,9 @@ sub _header {
sub _body {
my ($self, $body,) = @_;
- $self->connection->_push_write(
+ $self->{connection}->_push_write(
Net::AMQP::Frame::Body->new(payload => $body),
- $self->id,
+ $self->{id},
);
return $self;
@@ -372,7 +331,7 @@ sub consume {
my $consumer_cb = delete $args{on_consume} || sub {};
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Basic::Consume',
{
consumer_tag => '',
@@ -386,13 +345,13 @@ sub consume {
'Basic::ConsumeOk',
sub {
my $frame = shift;
- $self->set_consumer_cbs(
- $frame->method_frame->consumer_tag => $consumer_cb
- );
+ $self->{_consumer_cbs}->{
+ $frame->method_frame->consumer_tag
+ } = $consumer_cb;
$cb->($frame);
},
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -406,9 +365,9 @@ sub cancel {
if !defined $args{consumer_tag};
return $failure_cb->('Unknown consumer_tag')
- if !$self->get_consumer_cbs($args{consumer_tag});
+ if !$self->{_consumer_cbs}->{$args{consumer_tag}};
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Basic::Cancel',
{
%args, # consumer_tag
@@ -417,11 +376,11 @@ sub cancel {
'Basic::CancelOk',
sub {
my $frame = shift;
- $self->delete_consumer_cbs($args{consumer_tag});
+ delete $self->{_consumer_cbs}->{$args{consumer_tag}};
$cb->($frame);
},
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -431,7 +390,7 @@ sub get {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Basic::Get',
{
no_ack => 1,
@@ -446,7 +405,7 @@ sub get {
$self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
},
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -456,7 +415,7 @@ sub ack {
my $self = shift;
my %args = @_;
- $self->connection->_push_write(
+ $self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Ack->new(
delivery_tag => 0,
multiple => (
@@ -464,7 +423,7 @@ sub ack {
),
%args,
),
- $self->id,
+ $self->{id},
);
return $self;
@@ -474,7 +433,7 @@ sub qos {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Basic::Qos',
{
prefetch_count => 1,
@@ -485,7 +444,7 @@ sub qos {
'Basic::QosOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -495,12 +454,12 @@ sub recover {
my $self = shift;
my %args = @_;
- $self->connection->_push_write(
+ $self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Recover->new(
requeue => 0,
%args,
),
- $self->id,
+ $self->{id},
);
return $self;
@@ -510,11 +469,11 @@ sub select_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Tx::Select', {}, 'Tx::SelectOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -524,11 +483,11 @@ sub commit_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Tx::Commit', {}, 'Tx::CommitOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
@@ -538,45 +497,56 @@ sub rollback_tx {
my $self = shift;
my ($cb, $failure_cb,) = $self->_delete_cbs(@_);
- $self->connection->_push_write_and_read(
+ $self->{connection}->_push_write_and_read(
'Tx::Rollback', {}, 'Tx::RollbackOk',
$cb,
$failure_cb,
- $self->id,
+ $self->{id},
);
return $self;
}
-sub _push_queue_or_consume {
- my ($self, $frame, $failure_cb,) = @_;
+sub push_queue_or_consume {
+ my $self = shift;
+ my ($frame, $failure_cb,) = @_;
if ($frame->isa('Net::AMQP::Frame::Method')) {
my $method_frame = $frame->method_frame;
- if ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
- my $cb = $self->get_consumer_cbs(
+ if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
+ $self->{connection}->_push_write(
+ Net::AMQP::Protocol::Channel::CloseOk->new(),
+ $self->{id},
+ );
+ $self->{_is_open} = 0;
+ $self->{connection}->delete_channel($self->{id});
+ $self->{on_close}->($frame);
+ return $self;
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
+ my $cb = $self->{_consumer_cbs}->{
$method_frame->consumer_tag
- ) || sub {};
+ } || sub {};
$self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
return $self;
- } elsif ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
- my $cb = $self->get_return_cbs(
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
+ my $cb = $self->{_return_cbs}->{
$method_frame->exchange . '_' . $method_frame->routing_key
- ) || sub {};
+ } || sub {};
$self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
return $self;
}
}
- $self->_queue->push($frame);
+ $self->{_queue}->push($frame);
return $self;
}
sub _push_read_header_and_body {
- my ($self, $type, $frame, $cb, $failure_cb,) = @_;
+ my $self = shift;
+ my ($type, $frame, $cb, $failure_cb,) = @_;
my $response = {$type => $frame};
- $self->_queue->get(sub{
+ $self->{_queue}->get(sub{
my $frame = shift;
return $failure_cb->('Received data is not header frame')
@@ -591,7 +561,7 @@ sub _push_read_header_and_body {
$response->{header} = $header_frame;
});
- $self->_queue->get(sub{
+ $self->{_queue}->get(sub{
my $frame = shift;
return $failure_cb->('Received data is not body frame')
@@ -614,9 +584,8 @@ sub _delete_cbs {
return $cb, $failure_cb, %args;
}
-sub DEMOLISH {
- my ($self) = @_;
-
+sub DESTROY {
+ my $self = shift;
$self->close();
return;
}
diff --git a/lib/AnyEvent/RabbitMQ/LocalQueue.pm b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
index 56b0a8c..4504971 100644
--- a/lib/AnyEvent/RabbitMQ/LocalQueue.pm
+++ b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
@@ -1,59 +1,45 @@
package AnyEvent::RabbitMQ::LocalQueue;
-use Moose;
-use MooseX::AttributeHelpers;
-
-our $VERSION = '0.01';
-
-has _message_queue => (
- metaclass => 'Collection::Array',
- is => 'ro',
- isa => 'ArrayRef[Any]',
- default => sub {[]},
- provides => {
- push => '_push_message_queue',
- shift => '_shift_message_queue',
- count => '_count_message_queue',
- },
-);
-
-has _drain_code_queue => (
- metaclass => 'Collection::Array',
- is => 'ro',
- isa => 'ArrayRef[CodeRef]',
- default => sub {[]},
- provides => {
- push => '_push_drain_code_queue',
- shift => '_shift_drain_code_queue',
- count => '_count_drain_code_queue',
- },
-);
-
-__PACKAGE__->meta->make_immutable;
-no Moose;
+use strict;
+use warnings;
+
+our $VERSION = '1.00';
+
+sub new {
+ my $class = shift;
+ return bless {
+ _message_queue => [],
+ _drain_code_queue => [],
+ }, $class;
+}
sub push {
my $self = shift;
- $self->_push_message_queue(@_);
+
+ CORE::push @{$self->{_message_queue}}, @_;
return $self->_drain_queue();
}
sub get {
my $self = shift;
- $self->_push_drain_code_queue(@_);
+
+ CORE::push @{$self->{_drain_code_queue}}, @_;
return $self->_drain_queue();
}
sub _drain_queue {
my $self = shift;
- my $count
- = $self->_count_message_queue < $self->_count_drain_code_queue
- ? $self->_count_message_queue : $self->_count_drain_code_queue
- ;
+ my $message_count = scalar @{$self->{_message_queue}};
+ my $drain_code_count = scalar @{$self->{_drain_code_queue}};
+
+ my $count = $message_count < $drain_code_count
+ ? $message_count : $drain_code_count;
for (1 .. $count) {
- $self->_shift_drain_code_queue->($self->_shift_message_queue);
+ &{shift @{$self->{_drain_code_queue}}}(
+ shift @{$self->{_message_queue}}
+ );
}
return $self;
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index ee86a9c..456b16a 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -1,41 +1,32 @@
package RabbitFoot;
+use strict;
+use warnings;
+
use AnyEvent::RabbitMQ;
use Coro;
use Coro::AnyEvent;
use RabbitFoot::Channel;
-use Moose;
-
-our $VERSION = '0.02';
-
-has verbose => (
- isa => 'Bool',
- is => 'rw',
-);
-
-has _ar => (
- isa => 'AnyEvent::RabbitMQ',
- is => 'ro',
-);
-
-for my $method (qw(connect close)) {
- __PACKAGE__->meta->add_method($method, sub {
- my $self = shift;
- $self->_do($method, @_);
- return $self;
- });
+our $VERSION = '1.00';
+
+BEGIN {
+ for my $method (qw(connect close)) {
+ no strict 'refs';
+ *{__PACKAGE__ . '::' . $method} = sub {
+ my $self = shift;
+ $self->_do($method, @_);
+ return $self;
+ };
+ }
}
-__PACKAGE__->meta->make_immutable;
-no Moose;
-
-sub BUILD {
- my $self = shift;
- $self->{_ar} = AnyEvent::RabbitMQ->new(
- verbose => $self->verbose,
- );
+sub new {
+ my $class = shift;
+ return bless {
+ _ar => AnyEvent::RabbitMQ->new(@_),
+ }, $class;
}
sub load_xml_spec {
@@ -46,7 +37,7 @@ sub load_xml_spec {
sub open_channel {
my $self = shift;
- return RabbitFoot::Channel->new(arc => $self->_do('open_channel', @_));
+ return RabbitFoot::Channel->new(arc => $self->_do('open_channel', @_,));
}
sub _do {
@@ -58,7 +49,7 @@ sub _do {
$args{on_success} = sub {$cb->(1, @_);},
$args{on_failure} = sub {$cb->(0, @_);},
- $self->_ar->$method(%args);
+ $self->{_ar}->$method(%args);
my ($is_success, @responses) = Coro::rouse_wait;
die @responses if !$is_success;
return @responses;
diff --git a/lib/RabbitFoot/Channel.pm b/lib/RabbitFoot/Channel.pm
index c5d2538..83eb127 100644
--- a/lib/RabbitFoot/Channel.pm
+++ b/lib/RabbitFoot/Channel.pm
@@ -1,50 +1,54 @@
package RabbitFoot::Channel;
+use strict;
+use warnings;
+
use Coro;
use Coro::AnyEvent;
use AnyEvent::RabbitMQ::Channel;
-use Moose;
-
-our $VERSION = '0.01';
-
-has arc => (
- isa => 'AnyEvent::RabbitMQ::Channel',
- is => 'ro',
-);
-
-for my $method (qw(
- close
- declare_exchange delete_exchange
- declare_queue bind_queue unbind_queue purge_queue delete_queue
- consume cancel get qos
- select_tx commit_tx rollback_tx
-)) {
- __PACKAGE__->meta->add_method($method, sub {
- my $self = shift;
- my %args = @_;
-
- my $cb = Coro::rouse_cb;
- $args{on_success} = sub {$cb->(1, @_);},
- $args{on_failure} = sub {$cb->(0, @_);},
-
- $self->arc->$method(%args);
- my ($is_success, @responses) = Coro::rouse_wait;
- die @responses if !$is_success;
- return $responses[0];
- });
+our $VERSION = '1.00';
+
+BEGIN {
+ for my $method (qw(
+ close
+ declare_exchange delete_exchange
+ declare_queue bind_queue unbind_queue purge_queue delete_queue
+ consume cancel get qos
+ select_tx commit_tx rollback_tx
+ )) {
+ no strict 'refs';
+ *{__PACKAGE__ . '::' . $method} = sub {
+ my $self = shift;
+ my %args = @_;
+
+ my $cb = Coro::rouse_cb;
+ $args{on_success} = sub {$cb->(1, @_);},
+ $args{on_failure} = sub {$cb->(0, @_);},
+
+ $self->{arc}->$method(%args);
+ my ($is_success, @responses) = Coro::rouse_wait;
+ die @responses if !$is_success;
+ return $responses[0];
+ };
+ }
+
+ for my $method (qw(publish ack recover)) {
+ no strict 'refs';
+ *{__PACKAGE__ . '::' . $method} = sub {
+ my $self = shift;
+ $self->{arc}->$method(@_);
+ return $self;
+ };
+ }
}
-for my $method (qw(publish ack recover)) {
- __PACKAGE__->meta->add_method($method, sub {
- my $self = shift;
- $self->arc->$method(@_);
- return $self;
- });
+sub new {
+ my $class = shift;
+ return bless {
+ @_, # arc
+ }, $class;
}
-__PACKAGE__->meta->make_immutable;
-no Moose;
-
1;
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list