[libanyevent-rabbitmq-perl] 11/151: Added AnyEvent::RabbitMQ. Fixed a bug.

Damyan Ivanov dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014


This is an automated email from the git hooks/post-receive script.

dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.

commit 7283cd69b00e33e2ebb4e1826241a966183f7fb2
Author: cooldaemon <cooldaemon at gmail.com>
Date:   Sun Feb 7 21:30:30 2010 +0900

    Added AnyEvent::RabbitMQ. Fixed a bug.
---
 Makefile.PL                         |   2 +
 lib/AnyEvent/RabbitMQ.pm            | 550 ++++++++++++++++++++++++++++++++
 lib/AnyEvent/RabbitMQ/Channel.pm    | 618 ++++++++++++++++++++++++++++++++++++
 lib/AnyEvent/RabbitMQ/LocalQueue.pm |  63 ++++
 lib/RabbitFoot.pm                   |   6 +-
 t/00_compile.t                      |   6 +-
 t/01_localqueue.t                   |  34 ++
 7 files changed, 1275 insertions(+), 4 deletions(-)

diff --git a/Makefile.PL b/Makefile.PL
index 36a390d..81d4b82 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -3,6 +3,7 @@ name 'RabbitFoot';
 all_from 'lib/RabbitFoot.pm';
 
 requires 'Moose';
+requires 'MooseX::AttributeHelpers';
 requires 'MooseX::App::Cmd';
 requires 'MooseX::ConfigFromFile';
 requires 'Config::Any';
@@ -10,6 +11,7 @@ requires 'JSON::Syck';
 requires 'List::MoreUtils';
 requires 'Sys::SigAction';
 requires 'Net::AMQP';
+requires 'AnyEvent';
 
 tests 't/*.t';
 author_tests 'xt';
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
new file mode 100644
index 0000000..d0aebf4
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -0,0 +1,550 @@
+package AnyEvent::RabbitMQ;
+
+use Data::Dumper;
+use List::MoreUtils qw(none);
+use AnyEvent::Handle;
+use AnyEvent::Socket;
+use Net::AMQP;
+use Net::AMQP::Common qw(:all);
+
+use AnyEvent::RabbitMQ::Channel;
+use AnyEvent::RabbitMQ::LocalQueue;
+
+use Moose;
+use MooseX::AttributeHelpers;
+
+our $VERSION = '0.01';
+
+has verbose => (
+    isa => 'Bool',
+    is  => 'rw',
+);
+
+has timeout => (
+    isa     => 'Int',
+    is      => 'rw',
+    default => 1,
+);
+
+has _connect_guard => (
+    isa     => 'Guard',
+    is      => 'ro',
+    clearer => 'clear_connect_guard',
+);
+
+has _handle => (
+    isa     => 'AnyEvent::Handle',
+    is      => 'ro',
+    clearer => 'clear_handle',
+);
+
+has _is_open => (
+    isa     => 'Bool',
+    is      => 'ro',
+    default => 0,
+);
+
+# Channels opened on this connection, keyed by channel id.
+#
+# has_channels($id) has to answer "is this id registered?", so it is mapped
+# to the per-key 'exists' provider.  The 'empty' provider takes no key and
+# only reports whether the hash holds any entry at all, which made every id
+# look taken as soon as one channel existed.
+has channels => (
+    metaclass => 'Collection::Hash',
+    is        => 'ro',
+    isa       => 'HashRef[AnyEvent::RabbitMQ::Channel]',
+    default   => sub {{}},
+    provides  => {
+        set    => 'set_channel',
+        get    => 'get_channel',
+        exists => 'has_channels',
+        delete => 'delete_channel',
+        keys   => 'channel_ids',
+        count  => 'count_channels',
+    },
+);
+
+has _queue => (
+    isa     => 'AnyEvent::RabbitMQ::LocalQueue',
+    is      => 'ro',
+    default => sub {
+        AnyEvent::RabbitMQ::LocalQueue->new
+    },
+);
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+# Load the AMQP XML specification from $file into Net::AMQP::Protocol.
+# Returns $self for chaining; per the original note, a load failure dies here.
+sub load_xml_spec {
+    my $self = shift;
+    my ($spec_file) = @_;
+
+    Net::AMQP::Protocol->load_xml_spec($spec_file);
+
+    return $self;
+}
+
+# Open a TCP connection to the broker and start the AMQP handshake.
+#
+# $args is a hash reference:
+#   host, port        - address handed to AnyEvent::Socket::tcp_connect
+#   user, pass, vhost - read later by the handshake steps (_start, _open)
+#   on_success        - called without arguments once the handshake is done
+#                       (defaults to a no-op)
+#   on_failure        - called with an error message (defaults to die)
+#
+# Returns $self so calls can be chained.
+sub connect {
+    my ($self, $args,) = @_;
+
+    $args->{on_success} ||= sub {};
+    $args->{on_failure} ||= sub {die @_};
+
+    if ($self->verbose) {
+        print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n";
+    }
+
+    $self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
+        $args->{host},
+        $args->{port},
+        sub {
+            # No file handle means the connection attempt failed.
+            my $fh = shift
+                or return $args->{on_failure}->('Error connecting to AMQP Server!');
+            $self->{_handle} = AnyEvent::Handle->new(
+                fh       => $fh,
+                on_error => sub {
+                    my ($handle, $fatal, $message) = @_;
+                    # The socket is unusable: mark the connection closed so a
+                    # later close() does not try to write to a missing handle.
+                    $self->{_is_open} = 0;
+                    $self->clear_handle;
+                    $args->{on_failure}->($message);
+                }
+            );
+            $self->_read_loop($args->{on_failure});
+            $self->_start($args,);
+        },
+        sub {
+            # Prepare callback: its return value is used as the connect timeout.
+            return $self->timeout;
+        },
+    );
+
+    return $self;
+}
+
+# Read one frame from the socket, dispatch it, then re-arm itself.
+#
+# $failure_cb is called with an error message for disconnects, broken data,
+# frames for unknown channels and server-initiated closes.
+#
+# Returns nothing when there is no handle (the loop ends), otherwise $self.
+sub _read_loop {
+    my ($self, $failure_cb,) = @_;
+
+    return if !$self->_handle;
+
+    $self->_handle->push_read(chunk => 8, sub {
+        my $data = $_[1];
+        # $stack keeps all 8 bytes; $data is consumed by substr() below.
+        my $stack = $_[1];
+
+        if (length($data) <= 0) {
+            $failure_cb->('Disconnect');
+            @_ = ($self, $failure_cb,);
+            goto &_read_loop;
+        }
+
+        # First 7 bytes: type (C), channel (n), payload length (N).
+        my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
+        if (!defined $type_id || !defined $channel || !defined $length) {
+            $failure_cb->('Broken data was received');
+            @_ = ($self, $failure_cb,);
+            goto &_read_loop;
+        }
+
+        $self->_handle->push_read(chunk => $length, sub {
+            $stack .= $_[1];
+            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
+
+            if ($self->verbose) {
+                print STDERR '[C] <-- [S] ' . Dumper($frame);
+                print STDERR '-----------', "\n";
+            }
+
+            # A false result means the frame was a Close that has already been
+            # handled; skip dispatching but keep reading.  After a
+            # Connection::Close the handle is gone and the next _read_loop
+            # call returns immediately; after a Channel::Close the other
+            # channels must still be served.
+            if ($self->_check_close_and_clean($frame, $failure_cb,)) {
+                my $id = $frame->channel;
+                if (0 == $id) {
+                    $self->_queue->push($frame);
+                } elsif ($self->has_channels($id)) {
+                    $self->get_channel($id)->_push_queue_or_consume($frame, $failure_cb);
+                } else {
+                    $failure_cb->('Unknown channel id: ' . $frame->channel);
+                }
+            }
+
+            @_ = ($self, $failure_cb,);
+            goto &_read_loop;
+        });
+    });
+
+    return $self;
+}
+
+# Handle server-initiated Connection::Close and Channel::Close frames.
+#
+# $frame      - frame just read from the socket
+# $failure_cb - called with "<reply_code> <reply_text>" when a close is seen
+#
+# Returns 1 when the frame is not a close and should be processed normally,
+# and an empty/false value when the close has been handled here.
+sub _check_close_and_clean {
+    my ($self, $frame, $failure_cb,) = @_;
+
+    return 1 if !$frame->isa('Net::AMQP::Frame::Method');
+
+    my $method_frame = $frame->method_frame;
+
+    if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
+        $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
+        $self->{_is_open} = 0;
+        $self->_disconnect();
+        $failure_cb->(
+            $method_frame->reply_code . ' ' . $method_frame->reply_text
+        );
+        return;
+    } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
+        # The CloseOk has to go out on the channel being closed, so the id is
+        # taken from the frame before replying (it used to be read from an
+        # argument no caller supplies, which sent the reply on channel 0).
+        my $id = $frame->channel;
+        $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
+
+        my $message = $method_frame->reply_code . ' ' . $method_frame->reply_text;
+
+        my $channel = $self->get_channel($id);
+        if (!$channel) {
+            $failure_cb->("Unknown channel id: ${id}\n${message}");
+            return;
+        }
+
+        $channel->_is_open(0);
+        $self->delete_channel($id);
+        $failure_cb->($message);
+        return;
+    }
+
+    return 1;
+}
+
+# Handshake step 1: send the protocol header, wait for Connection::Start,
+# answer with Connection::StartOk (AMQPLAIN / en_US) and continue with _tune.
+# Failures are reported through $args->{on_failure}.  Returns $self.
+sub _start {
+    my ($self, $args,) = @_;
+
+    print STDERR 'post header', "\n" if $self->verbose;
+
+    $self->_handle->push_write(Net::AMQP::Protocol->header);
+
+    my $on_start = sub {
+        my $frame = shift;
+        my $start = $frame->method_frame;
+
+        my @mechanisms = split /\s/, $start->mechanisms;
+        if (none {$_ eq 'AMQPLAIN'} @mechanisms) {
+            return $args->{on_failure}->('AMQPLAIN is not found in mechanisms');
+        }
+
+        my @locales = split /\s/, $start->locales;
+        if (none {$_ eq 'en_US'} @locales) {
+            return $args->{on_failure}->('en_US is not found in locales');
+        }
+
+        my $start_ok = Net::AMQP::Protocol::Connection::StartOk->new(
+            client_properties => {
+                platform    => 'Perl',
+                product     => __PACKAGE__,
+                information => 'http://d.hatena.ne.jp/cooldaemon/',
+                version     => '0.01',
+            },
+            mechanism => 'AMQPLAIN',
+            response  => {
+                LOGIN    => $args->{user},
+                PASSWORD => $args->{pass},
+            },
+            locale    => 'en_US',
+        );
+        $self->_push_write($start_ok);
+
+        $self->_tune($args,);
+    };
+
+    $self->_push_read_and_valid(
+        'Connection::Start', $on_start, $args->{on_failure},
+    );
+
+    return $self;
+}
+
+# Handshake step 2: wait for Connection::Tune, echo the proposed limits back
+# in Connection::TuneOk and continue with _open.  Returns $self.
+sub _tune {
+    my ($self, $args,) = @_;
+
+    my $on_tune = sub {
+        my $frame = shift;
+        my $tune  = $frame->method_frame;
+
+        my $tune_ok = Net::AMQP::Protocol::Connection::TuneOk->new(
+            channel_max => $tune->channel_max,
+            frame_max   => $tune->frame_max,
+            heartbeat   => $tune->heartbeat,
+        );
+        $self->_push_write($tune_ok);
+
+        $self->_open($args,);
+    };
+
+    $self->_push_read_and_valid(
+        'Connection::Tune', $on_tune, $args->{on_failure},
+    );
+
+    return $self;
+}
+
+# Handshake step 3: send Connection::Open for $args->{vhost}; when
+# Connection::OpenOk arrives, mark the connection open and call
+# $args->{on_success}.  Returns $self.
+sub _open {
+    my ($self, $args,) = @_;
+
+    my %open_fields = (
+        virtual_host => $args->{vhost},
+        capabilities => '',
+        insist       => 1,
+    );
+
+    my $on_open_ok = sub {
+        $self->{_is_open} = 1;
+        $args->{on_success}->();
+    };
+
+    $self->_push_write_and_read(
+        'Connection::Open', \%open_fields, 'Connection::OpenOk',
+        $on_open_ok, $args->{on_failure},
+    );
+
+    return $self;
+}
+
+# Close every open channel and then the connection itself.
+#
+# $args (hash reference, optional keys):
+#   on_success - called after the connection has been closed and dropped
+#   on_failure - called with an error message (defaults to die)
+#
+# Returns $self.
+sub close {
+    my ($self, $args,) = @_;
+
+    $args->{on_success} ||= sub {};
+    $args->{on_failure} ||= sub {die @_};
+
+    # Closes the connection; _close itself does nothing while channels remain.
+    my $close_connection = sub {
+        my $on_closed = sub {
+            $self->_disconnect();
+            $args->{on_success}->(@_);
+        };
+        my $on_error = sub {
+            $self->_disconnect();
+            $args->{on_failure}->(@_);
+        };
+        $self->_close($on_closed, $on_error);
+        return $self;
+    };
+
+    # FIXME: the hash is read directly here instead of going through
+    # count_channels / channel_ids / get_channel.
+    my $channels = $self->channels;
+    if (0 == scalar keys %$channels) {
+        return $close_connection->();
+    }
+
+    for my $id (keys %$channels) {
+        my $channel = $channels->{$id};
+        $channel->close({
+            on_success => $close_connection,
+            on_failure => sub {
+                $close_connection->();
+                $args->{on_failure}->(@_);
+            },
+        });
+    }
+
+    return $self;
+}
+
+# Send Connection::Close and wait for Connection::CloseOk.
+# Does nothing unless the connection is open and no channel is registered.
+# Returns $self.
+sub _close {
+    my ($self, $cb, $failure_cb,) = @_;
+
+    return $self unless $self->_is_open;
+    return $self if $self->count_channels > 0;
+
+    $self->_push_write_and_read(
+        'Connection::Close', {}, 'Connection::CloseOk', $cb, $failure_cb,
+    );
+
+    # Marked closed right after the request is queued, not on CloseOk.
+    $self->{_is_open} = 0;
+
+    return $self;
+}
+
+# Drop the connect guard and the socket handle.  Returns nothing.
+sub _disconnect {
+    my $self = shift;
+
+    for my $clearer (qw(clear_connect_guard clear_handle)) {
+        $self->$clearer();
+    }
+
+    return;
+}
+
+# Create a channel object, register it and open it on the broker.
+#
+# $args (hash reference, optional keys):
+#   id         - channel id to use; the lowest free id from 1 is picked if unset
+#   on_success - called with the AnyEvent::RabbitMQ::Channel object
+#   on_failure - called with an error message (defaults to die)
+#
+# Returns $self, or the result of on_failure when no id can be assigned.
+sub open_channel {
+    my ($self, $args,) = @_;
+
+    $args->{on_success} ||= sub {};
+    $args->{on_failure} ||= sub {die @_};
+
+    my $id = $args->{id};
+    if ($id) {
+        return $args->{on_failure}->("Channel id $id is already in use")
+            if $self->has_channels($id);
+    } else {
+        CANDIDATE:
+        for my $candidate_id (1 .. (2**16 - 1)) { # FIXME
+            if (!$self->has_channels($candidate_id)) {
+                $id = $candidate_id;
+                last CANDIDATE;
+            }
+        }
+        return $args->{on_failure}->('Ran out of channel ids') if !$id;
+    }
+
+    my $channel = AnyEvent::RabbitMQ::Channel->new({
+        id         => $id,
+        connection => $self,
+    });
+
+    # Registered before opening so replies for this id can be routed.
+    $self->set_channel($id => $channel);
+
+    my $on_opened = sub {
+        $args->{on_success}->($channel);
+    };
+    my $on_error = sub {
+        $self->delete_channel($id);
+        $args->{on_failure}->(@_);
+    };
+    $channel->open({
+        on_success => $on_opened,
+        on_failure => $on_error,
+    });
+
+    return $self;
+}
+
+# Send a method frame and register a handler for the expected reply.
+#
+# $method     - method name below Net::AMQP::Protocol::, e.g. 'Queue::Bind'
+# $args       - hash reference of fields for that method
+# $exp        - expected reply method name, or an array reference of names
+# $cb         - called with the reply frame
+# $failure_cb - called with an error message
+# $id         - channel id; false means the connection's own queue / channel 0
+sub _push_write_and_read {
+    my ($self, $method, $args, $exp, $cb, $failure_cb, $id,) = @_;
+
+    my $method_class = 'Net::AMQP::Protocol::' . $method;
+    my $method_frame = $method_class->new(%$args);
+    my $frame        = Net::AMQP::Frame::Method->new(method_frame => $method_frame);
+
+    $self->_push_write($frame, $id,);
+
+    return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
+}
+
+# Register a handler for the next frame on a queue and validate its method.
+#
+# $exp is one method name or an array reference of acceptable names (below
+# Net::AMQP::Protocol::).  $cb receives the frame when it matches; otherwise
+# $failure_cb receives an error message.  The frame is taken from the queue of
+# channel $id, or from the connection's own queue when $id is false.
+sub _push_read_and_valid {
+    my ($self, $exp, $cb, $failure_cb, $id,) = @_;
+    my @expected = ref($exp) eq 'ARRAY' ? @$exp : ($exp);
+
+    my $queue;
+    if ($id) {
+        $queue = $self->get_channel($id)->_queue;
+    } else {
+        $queue = $self->_queue;
+    }
+
+    $queue->get(sub {
+        my $frame = shift;
+
+        if (!$frame->isa('Net::AMQP::Frame::Method')) {
+            return $failure_cb->('Received data is not method frame');
+        }
+
+        my $method_frame = $frame->method_frame;
+        my $matched = grep {
+            $method_frame->isa('Net::AMQP::Protocol::' . $_)
+        } @expected;
+        return $cb->($frame) if $matched;
+
+        $failure_cb->(
+              'Method is not ' . join(',', @expected) . "\n"
+            . 'Method was ' . ref $method_frame
+        );
+    });
+}
+
+# Read a single frame from the socket and pass the parsed frame to $cb.
+# $failure_cb is called with a message on disconnect or broken data.
+sub _push_read {
+    my ($self, $cb, $failure_cb) = @_;
+
+    my $on_head = sub {
+        my $head  = $_[1];
+        # Untouched copy of the first chunk; $head is consumed by substr().
+        my $stack = $_[1];
+
+        if (length($head) <= 0) {
+            return $failure_cb->('Disconnect');
+        }
+
+        my ($type_id, $channel, $length,) = unpack 'CnN', substr $head, 0, 7, '';
+        if (!defined $type_id || !defined $channel || !defined $length) {
+            return $failure_cb->('Broken data was received');
+        }
+
+        my $on_rest = sub {
+            $stack .= $_[1];
+            my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
+
+            if ($self->verbose) {
+                print STDERR '[C] <-- [S] ' . Dumper($frame);
+                print STDERR '-----------', "\n";
+            }
+
+            $cb->($frame);
+        };
+        $self->_handle->push_read(chunk => $length, $on_rest);
+    };
+
+    $self->_handle->push_read(chunk => 8, $on_head);
+}
+
+# Write one frame to the socket.
+#
+# $output - a frame, or a Net::AMQP::Protocol::Base method object, which is
+#           wrapped into a frame first
+# $id     - channel id; channel 0 is used when it is false
+#
+# Returns nothing.
+sub _push_write {
+    my ($self, $output, $id,) = @_;
+
+    $output = $output->frame_wrap
+        if $output->isa('Net::AMQP::Protocol::Base');
+    $output->channel($id || 0);
+
+    print STDERR '[C] --> [S] ', Dumper($output), "\n"
+        if $self->verbose;
+
+    my $raw = $output->to_raw_frame();
+    $self->_handle->push_write($raw);
+    return;
+}
+
+# Moose destructor hook: close the connection when the object goes away.
+sub DEMOLISH {
+    my $self = shift;
+
+    $self->close();
+
+    return;
+}
+
+1;
+__END__
+
+=head1 NAME
+
+AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.
+
+=head1 SYNOPSIS
+
+  use AnyEvent::RabbitMQ;
+
+  my $cv = AnyEvent->condvar;
+
+  my $ar = AnyEvent::RabbitMQ->new({
+      timeout => 1,
+  })->load_xml_spec(
+      '/path/to/amqp0-8.xml',
+  )->connect({
+      host       => 'localhost',
+      port       => 5672,
+      user       => 'guest',
+      pass       => 'guest',
+      vhost      => '/',
+      on_success => sub {
+          $ar->open_channel({
+              on_success => sub {
+                  my $channel = shift;
+                  $channel->declare_exchange({
+                      exchange   => 'test_exchange',
+                      on_success => sub {
+                          $cv->send('Declared exchange');
+                      },
+                      on_failure => $cv,
+                  });
+              },
+              on_failure => $cv,
+          });
+      },
+      on_failure => $cv,
+  });
+
+  print $cv->recv, "\n";
+
+=head1 DESCRIPTION
+
+RabbitFoot is an AMQP (Advanced Message Queuing Protocol) client library that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.
+
+You can use RabbitFoot to -
+
+  * Declare and delete exchanges
+  * Declare, delete, bind and unbind queues
+  * Set QoS
+  * Publish, consume, get, ack and recover messages
+  * Select, commit and rollback transactions
+
+RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+
+=head1 AUTHOR
+
+Masahito Ikuta E<lt>cooldaemon at gmail.comE<gt>
+
+=head1 SEE ALSO
+
+=head1 LICENSE
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself.
+
+=cut
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
new file mode 100644
index 0000000..9054591
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -0,0 +1,618 @@
+package AnyEvent::RabbitMQ::Channel;
+
+use AnyEvent::RabbitMQ::LocalQueue;
+
+use Moose;
+
+our $VERSION = '0.01';
+
+has id => (
+    isa      => 'Int',
+    is       => 'rw',
+    required => 1,
+);
+
+has connection => (
+    isa      => 'AnyEvent::RabbitMQ',
+    is       => 'rw',
+    required => 1,
+    weak_ref => 1,
+);
+
+has _is_open => (
+    isa     => 'Bool',
+    is      => 'rw',
+    default => 0,
+);
+
+has _queue => (
+    isa     => 'AnyEvent::RabbitMQ::LocalQueue',
+    is      => 'ro',
+    default => sub {
+        AnyEvent::RabbitMQ::LocalQueue->new
+    },
+);
+
+# Consumer callbacks keyed by consumer tag.
+#
+# has_consumer_cbs($tag) is called with a tag, so it is mapped to the per-key
+# 'exists' provider; 'empty' ignores its argument and only reports whether
+# any callback is registered at all.
+has consumer_cbs => (
+    metaclass => 'Collection::Hash',
+    is        => 'ro',
+    isa       => 'HashRef[CodeRef]',
+    default   => sub {{}},
+    provides  => {
+        set    => 'set_consumer_cbs',
+        get    => 'get_consumer_cbs',
+        exists => 'has_consumer_cbs',
+        delete => 'delete_consumer_cbs',
+        keys   => 'consumer_tags',
+        count  => 'count_consumer_cbs',
+    },
+);
+
+has return_cbs => (
+    metaclass => 'Collection::Hash',
+    is        => 'ro',
+    isa       => 'HashRef[CodeRef]',
+    default   => sub {{}},
+    provides  => {
+        set    => 'set_return_cbs',
+        get    => 'get_return_cbs',
+    },
+);
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+# Open this channel on the broker (Channel::Open / Channel::OpenOk).
+#
+# $args (hash reference, optional keys):
+#   on_success - called without arguments once Channel::OpenOk arrives
+#   on_failure - called with an error message (defaults to die)
+#
+# Returns $self.
+sub open {
+    my ($self, $args,) = @_;
+
+    $args->{on_success} ||= sub {};
+    # The key was misspelled "on_failur", so the caller's on_failure handler
+    # was never used and every failure fell through to the die default.
+    $args->{on_failure} ||= sub {die @_};
+
+    $self->connection->_push_write_and_read(
+        'Channel::Open', {}, 'Channel::OpenOk',
+        sub {
+            $self->{_is_open} = 1;
+            $args->{on_success}->();
+        },
+        $args->{on_failure},
+        $self->id,
+    );
+
+    return $self;
+}
+
+# Cancel every consumer on this channel and then close the channel.
+#
+# $args (hash reference, optional keys):
+#   on_success - called without arguments after Channel::CloseOk
+#   on_failure - called with an error message (defaults to die)
+#
+# Does nothing when the channel is not open.  Returns $self.
+sub close {
+    my ($self, $args,) = @_;
+
+    return $self if !$self->_is_open;
+
+    $args->{on_success} ||= sub {};
+    # Was misspelled "on_failur", leaving on_failure undefined further down.
+    $args->{on_failure} ||= sub {die @_};
+
+    return $self->_close($args) if 0 == $self->count_consumer_cbs;
+
+    # Channel::Close must be sent once, after the last cancel has finished,
+    # rather than once per consumer.
+    my @consumer_tags = $self->consumer_tags;
+    my $pending       = scalar @consumer_tags;
+    my $cancel_done   = sub {
+        $self->_close($args) if --$pending == 0;
+    };
+
+    for my $consumer_tag (@consumer_tags) {
+        $self->cancel({
+            consumer_tag => $consumer_tag,
+            on_success   => $cancel_done,
+            on_failure   => sub {
+                $cancel_done->();
+                $args->{on_failure}->(@_);
+            }
+        });
+    }
+
+    return $self;
+}
+
+# Send Channel::Close and wait for Channel::CloseOk.  Either way the channel
+# is marked closed and removed from the connection before the matching
+# callback in $args runs; on_failure receives the error message.
+# Expects $args->{on_success} and $args->{on_failure} to be set (close does).
+sub _close {
+    my ($self, $args,) = @_;
+
+    $self->connection->_push_write_and_read(
+        'Channel::Close', {}, 'Channel::CloseOk',
+        sub {
+            $self->{_is_open} = 0;
+            $self->connection->delete_channel($self->id);
+            $args->{on_success}->();
+        },
+        sub {
+            $self->{_is_open} = 0;
+            $self->connection->delete_channel($self->id);
+            $args->{on_failure}->(@_);
+        },
+        $self->id,
+    );
+
+    return $self;
+}
+
+# Send Exchange::Declare and wait for Exchange::DeclareOk.
+# Fields in $args (e.g. exchange) override the defaults; ticket and nowait
+# are always forced to 0.  on_success / on_failure are taken out of $args.
+# Returns $self.
+sub declare_exchange {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        type        => 'direct',
+        passive     => 0,
+        durable     => 0,
+        auto_delete => 0,
+        internal    => 0,
+        %$args, # exchange
+        ticket      => 0,
+        nowait      => 0, # FIXME
+    );
+
+    $self->connection->_push_write_and_read(
+        'Exchange::Declare', \%fields, 'Exchange::DeclareOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Exchange::Delete and wait for Exchange::DeleteOk.
+# Fields in $args (e.g. exchange) override if_unused; ticket and nowait are
+# always forced to 0.  Returns $self.
+sub delete_exchange {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        if_unused => 0,
+        %$args, # exchange
+        ticket    => 0,
+        nowait    => 0, # FIXME
+    );
+
+    $self->connection->_push_write_and_read(
+        'Exchange::Delete', \%fields, 'Exchange::DeleteOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Queue::Declare and wait for Queue::DeclareOk.
+# Fields in $args override the defaults; ticket and nowait are always forced
+# to 0.  on_success / on_failure are taken out of $args.
+# Returns $self, like the other channel methods, so calls can be chained.
+sub declare_queue {
+    my ($self, $args,) = @_;
+    my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+    $self->connection->_push_write_and_read(
+        'Queue::Declare',
+        {
+            queue       => '',
+            passive     => 0,
+            durable     => 0,
+            exclusive   => 0,
+            auto_delete => 0,
+            no_ack      => 1,
+            %$args,
+            ticket      => 0,
+            nowait      => 0, # FIXME
+        },
+        'Queue::DeclareOk',
+        $cb,
+        $failure_cb,
+        $self->id,
+    );
+
+    # Previously the value of the call above leaked out as the return value.
+    return $self;
+}
+
+# Send Queue::Bind and wait for Queue::BindOk.
+# $args supplies queue, exchange and routing_key; ticket and nowait are
+# always forced to 0.  Returns $self.
+sub bind_queue {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        %$args, # queue, exchange, routing_key
+        ticket => 0,
+        nowait => 0, # FIXME
+    );
+
+    $self->connection->_push_write_and_read(
+        'Queue::Bind', \%fields, 'Queue::BindOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Queue::Unbind and wait for Queue::UnbindOk.
+# $args supplies queue, exchange and routing_key; ticket is forced to 0.
+# Returns $self.
+sub unbind_queue {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        %$args, # queue, exchange, routing_key
+        ticket => 0,
+    );
+
+    $self->connection->_push_write_and_read(
+        'Queue::Unbind', \%fields, 'Queue::UnbindOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Queue::Purge and wait for Queue::PurgeOk.
+# $args supplies queue; ticket and nowait are always forced to 0.
+# Returns $self.
+sub purge_queue {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        %$args, # queue
+        ticket => 0,
+        nowait => 0, # FIXME
+    );
+
+    $self->connection->_push_write_and_read(
+        'Queue::Purge', \%fields, 'Queue::PurgeOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Queue::Delete and wait for Queue::DeleteOk.
+# Fields in $args (e.g. queue) override if_unused / if_empty; ticket and
+# nowait are always forced to 0.  Returns $self.
+sub delete_queue {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        if_unused => 0,
+        if_empty  => 0,
+        %$args, # queue
+        ticket    => 0,
+        nowait    => 0, # FIXME
+    );
+
+    $self->connection->_push_write_and_read(
+        'Queue::Delete', \%fields, 'Queue::DeleteOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Publish one message: Basic::Publish, then a content header frame, then a
+# body frame.
+#
+# $args (hash reference); these keys are removed from it:
+#   header    - hash reference of content-header fields (default {})
+#   body      - payload string (default '')
+#   on_return - callback stored for returned messages; it is only registered
+#               when mandatory or immediate is set
+# The remaining keys (exchange, routing_key, mandatory, immediate, ...) are
+# passed on to Basic::Publish.
+#
+# Returns $self.
+sub publish {
+    my ($self, $args,) = @_;
+
+    my $header_args = delete $args->{header}    || {};
+    my $return_cb   = delete $args->{on_return} || sub {};
+
+    # Only an undefined body falls back to ''; a false-but-defined payload
+    # such as "0" must be sent as it is.
+    my $body = delete $args->{body};
+    $body = '' if !defined $body;
+
+    $self->_publish(
+        $args,
+    )->_header(
+        $header_args, $body,
+    )->_body(
+        $body,
+    );
+
+    return $self if !$args->{mandatory} && !$args->{immediate};
+
+    # Keyed by "<exchange>_<routing_key>", the same key that
+    # _push_queue_or_consume builds for a Basic::Return frame.
+    $self->set_return_cbs(
+        ($args->{exchange} || '') . '_' . $args->{routing_key}
+            => $return_cb
+    );
+
+    return $self;
+}
+
+# Write the Basic::Publish method for this channel.
+# Fields in $args (e.g. routing_key) override the defaults; ticket is forced
+# to 0.  Returns $self.
+sub _publish {
+    my ($self, $args,) = @_;
+
+    my $publish = Net::AMQP::Protocol::Basic::Publish->new(
+        exchange  => '',
+        mandatory => 0,
+        immediate => 0,
+        %$args, # routing_key
+        ticket    => 0,
+    );
+
+    $self->connection->_push_write($publish, $self->id,);
+
+    return $self;
+}
+
+# Write the content header frame for a message of body $body.
+# $args overrides the default Basic::ContentHeader fields; $args->{weight}
+# (default 0) is also used as the frame weight.  Returns $self.
+sub _header {
+    my ($self, $args, $body,) = @_;
+
+    my $content_header = Net::AMQP::Protocol::Basic::ContentHeader->new(
+        content_type     => 'application/octet-stream',
+        content_encoding => '',
+        headers          => {},
+        delivery_mode    => 1,
+        priority         => 1,
+        correlation_id   => '',
+        reply_to         => '',
+        expiration       => '',
+        message_id       => '',
+        timestamp        => time,
+        type             => '',
+        user_id          => '',
+        app_id           => '',
+        cluster_id       => '',
+        %$args,
+    );
+
+    my $header_frame = Net::AMQP::Frame::Header->new(
+        weight       => $args->{weight} || 0,
+        body_size    => length($body),
+        header_frame => $content_header,
+    );
+
+    $self->connection->_push_write($header_frame, $self->id,);
+
+    return $self;
+}
+
+# Write $body as a single body frame on this channel.  Returns $self.
+sub _body {
+    my ($self, $body,) = @_;
+
+    my $body_frame = Net::AMQP::Frame::Body->new(payload => $body);
+    $self->connection->_push_write($body_frame, $self->id,);
+
+    return $self;
+}
+
+# Send Basic::Consume and wait for Basic::ConsumeOk.
+#
+# $args (hash reference); on_success, on_failure and on_consume are removed
+# from it, the rest (e.g. queue) overrides the defaults.  When ConsumeOk
+# arrives, on_consume is stored under the returned consumer tag and
+# on_success is called with the frame.  Returns $self.
+sub consume {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my $consumer_cb = delete $args->{on_consume} || sub {};
+
+    my %fields = (
+        consumer_tag => '',
+        no_local     => 0,
+        no_ack       => 1,
+        exclusive    => 0,
+        %$args, # queue
+        ticket       => 0,
+        nowait       => 0, # FIXME
+    );
+
+    my $on_consume_ok = sub {
+        my $frame = shift;
+        my $tag   = $frame->method_frame->consumer_tag;
+        $self->set_consumer_cbs($tag => $consumer_cb);
+        $on_ok->($frame);
+    };
+
+    $self->connection->_push_write_and_read(
+        'Basic::Consume', \%fields, 'Basic::ConsumeOk',
+        $on_consume_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Basic::Cancel for $args->{consumer_tag} and wait for Basic::CancelOk.
+# Fails through on_failure when the tag is missing or has_consumer_cbs
+# rejects it.  On CancelOk the stored consumer callback is removed and
+# on_success is called with the frame.  Returns $self on the normal path.
+sub cancel {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    if (!defined $args->{consumer_tag}) {
+        return $on_error->('consumer_tag is not set');
+    }
+
+    if (!$self->has_consumer_cbs($args->{consumer_tag})) {
+        return $on_error->('Unknown consumer_tag');
+    }
+
+    my %fields = (
+        %$args, # consumer_tag
+        nowait => 0,
+    );
+
+    my $on_cancel_ok = sub {
+        my $frame = shift;
+        $self->delete_consumer_cbs($args->{consumer_tag});
+        $on_ok->($frame);
+    };
+
+    $self->connection->_push_write_and_read(
+        'Basic::Cancel', \%fields, 'Basic::CancelOk',
+        $on_cancel_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Basic::Get and wait for Basic::GetOk or Basic::GetEmpty.
+# on_success receives {empty => $frame} for GetEmpty; for GetOk the header
+# and body frames are read next and on_success receives the assembled
+# response ({ok => $frame, header => ..., body => ...}).  Returns $self.
+sub get {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        no_ack => 1,
+        %$args, # queue
+        ticket => 0,
+    );
+
+    my $on_reply = sub {
+        my $frame = shift;
+        if ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty')) {
+            return $on_ok->({empty => $frame});
+        }
+        $self->_push_read_header_and_body('ok', $frame, $on_ok, $on_error);
+    };
+
+    $self->connection->_push_write_and_read(
+        'Basic::Get', \%fields, [qw(Basic::GetOk Basic::GetEmpty)],
+        $on_reply, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Write Basic::Ack on this channel.
+# Without a non-zero delivery_tag in $args, delivery_tag defaults to 0 and
+# multiple to 1; with one, multiple defaults to 0.  Any field in $args
+# overrides these defaults.  Returns $self.
+sub ack {
+    my ($self, $args,) = @_;
+
+    my $has_tag = defined $args->{delivery_tag} && $args->{delivery_tag} != 0;
+
+    my $ack = Net::AMQP::Protocol::Basic::Ack->new(
+        delivery_tag => 0,
+        multiple     => ($has_tag ? 0 : 1),
+        %$args,
+    );
+
+    $self->connection->_push_write($ack, $self->id,);
+
+    return $self;
+}
+
+# Send Basic::Qos and wait for Basic::QosOk.
+# prefetch_count defaults to 1 and can be overridden through $args;
+# prefetch_size and global are always forced to 0.  Returns $self.
+sub qos {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my %fields = (
+        prefetch_count => 1,
+        %$args,
+        prefetch_size  => 0,
+        global         => 0,
+    );
+
+    $self->connection->_push_write_and_read(
+        'Basic::Qos', \%fields, 'Basic::QosOk',
+        $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Write Basic::Recover on this channel; requeue defaults to 0 and can be
+# overridden through $args.  No reply is awaited.  Returns $self.
+sub recover {
+    my ($self, $args,) = @_;
+
+    my $recover = Net::AMQP::Protocol::Basic::Recover->new(
+        requeue => 0,
+        %$args,
+    );
+
+    $self->connection->_push_write($recover, $self->id,);
+
+    return $self;
+}
+
+# Send Tx::Select and wait for Tx::SelectOk.  Returns $self.
+sub select_tx {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my $connection = $self->connection;
+    $connection->_push_write_and_read(
+        'Tx::Select', {}, 'Tx::SelectOk', $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Tx::Commit and wait for Tx::CommitOk.  Returns $self.
+sub commit_tx {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my $connection = $self->connection;
+    $connection->_push_write_and_read(
+        'Tx::Commit', {}, 'Tx::CommitOk', $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Send Tx::Rollback and wait for Tx::RollbackOk.  Returns $self.
+sub rollback_tx {
+    my ($self, $args,) = @_;
+    my ($on_ok, $on_error,) = $self->_delete_callbacks($args);
+
+    my $connection = $self->connection;
+    $connection->_push_write_and_read(
+        'Tx::Rollback', {}, 'Tx::RollbackOk', $on_ok, $on_error, $self->id,
+    );
+
+    return $self;
+}
+
+# Remove on_success / on_failure from $args and return them as a list
+# (success, failure).  Missing callbacks default to a no-op and to die.
+sub _delete_callbacks {
+    my ($self, $args,) = @_;
+
+    my $on_ok = delete $args->{on_success};
+    $on_ok ||= sub {};
+
+    my $on_error = delete $args->{on_failure};
+    $on_error ||= sub {die @_};
+
+    return $on_ok, $on_error;
+}
+
+# Route a frame received for this channel.
+# Basic::Deliver and Basic::Return start reading the header and body frames
+# and hand the result to the stored consumer / return callback (a no-op when
+# none is stored); every other frame is pushed onto the channel queue.
+# Returns $self.
+sub _push_queue_or_consume {
+    my ($self, $frame, $failure_cb,) = @_;
+
+    if (!$frame->isa('Net::AMQP::Frame::Method')) {
+        $self->_queue->push($frame);
+        return $self;
+    }
+
+    my $method_frame = $frame->method_frame;
+    my ($type, $handler);
+
+    if ($method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
+        $type    = 'deliver';
+        $handler = $self->get_consumer_cbs($method_frame->consumer_tag);
+    } elsif ($method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
+        $type    = 'return';
+        $handler = $self->get_return_cbs(
+            $method_frame->exchange . '_' . $method_frame->routing_key
+        );
+    } else {
+        $self->_queue->push($frame);
+        return $self;
+    }
+
+    $handler ||= sub {};
+    $self->_push_read_header_and_body($type, $frame, $handler, $failure_cb);
+
+    return $self;
+}
+
+# Take the next two frames from the channel queue as content header and body
+# and hand the assembled response to $cb.
+#
+# $type       - key under which the triggering method $frame is stored
+#               ('ok', 'deliver' or 'return' at the call sites)
+# $cb         - called with {$type => $frame, header => ..., body => ...}
+# $failure_cb - called with an error message when a frame is not what is
+#               expected
+#
+# Returns $self.
+sub _push_read_header_and_body {
+    my ($self, $type, $frame, $cb, $failure_cb,) = @_;
+    my $response  = {$type => $frame};
+    my $header_ok = 0;
+
+    $self->_queue->get(sub{
+        my $frame = shift;
+
+        return $failure_cb->('Received data is not header frame')
+            if !$frame->isa('Net::AMQP::Frame::Header');
+
+        my $header_frame = $frame->header_frame;
+        # The two sentences are separated by a newline, as in the
+        # "Method is not ..." message of the connection class.
+        return $failure_cb->(
+              'Header is not Protocol::Basic::ContentHeader' . "\n"
+            . 'Header was ' . ref $header_frame
+        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
+
+        $response->{header} = $header_frame;
+        $header_ok = 1;
+    });
+
+    $self->_queue->get(sub{
+        my $frame = shift;
+
+        # The header failure has already been reported; do not deliver a
+        # response without a header or report a second error.
+        return if !$header_ok;
+
+        return $failure_cb->('Received data is not body frame')
+            if !$frame->isa('Net::AMQP::Frame::Body');
+
+        $response->{body} = $frame;
+        $cb->($response);
+    });
+
+    return $self;
+}
+
+# Moose destructor hook: close the channel when the object goes away.
+sub DEMOLISH {
+    my $self = shift;
+
+    $self->close();
+
+    return;
+}
+
+1;
diff --git a/lib/AnyEvent/RabbitMQ/LocalQueue.pm b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
new file mode 100644
index 0000000..56b0a8c
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
@@ -0,0 +1,63 @@
+package AnyEvent::RabbitMQ::LocalQueue;
+
+use Moose;
+use MooseX::AttributeHelpers;
+
+our $VERSION = '0.01';
+
+has _message_queue => (
+    metaclass => 'Collection::Array',
+    is        => 'ro',
+    isa       => 'ArrayRef[Any]',
+    default   => sub {[]},
+    provides  => {
+        push  => '_push_message_queue',
+        shift => '_shift_message_queue',
+        count => '_count_message_queue',
+    },
+);
+
+has _drain_code_queue => (
+    metaclass => 'Collection::Array',
+    is        => 'ro',
+    isa       => 'ArrayRef[CodeRef]',
+    default   => sub {[]},
+    provides  => {
+        push  => '_push_drain_code_queue',
+        shift => '_shift_drain_code_queue',
+        count => '_count_drain_code_queue',
+    },
+);
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+# Append one or more messages and deliver whatever can now be delivered.
+# Returns the queue object.
+sub push {
+    my ($self, @messages) = @_;
+
+    $self->_push_message_queue(@messages);
+
+    return $self->_drain_queue();
+}
+
+# Register one or more callbacks, each of which receives one message, and
+# deliver whatever can now be delivered.  Returns the queue object.
+sub get {
+    my ($self, @callbacks) = @_;
+
+    $self->_push_drain_code_queue(@callbacks);
+
+    return $self->_drain_queue();
+}
+
+# Pair waiting callbacks with waiting messages, oldest first, until one of
+# the two lists is empty.  Returns the queue object.
+#
+# Both counts are re-checked before every delivery: a callback may call
+# push() or get() on this queue itself, and a pair count computed up front
+# could then hand undef to a later callback.
+sub _drain_queue {
+    my $self = shift;
+
+    while (   $self->_count_message_queue > 0
+           && $self->_count_drain_code_queue > 0) {
+        my $code = $self->_shift_drain_code_queue;
+        $code->($self->_shift_message_queue);
+    }
+
+    return $self;
+}
+
+1;
+
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index 3ca1322..e08f850 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -606,7 +606,7 @@ sub _read_header_and_valid {
 
     my $frame = $self->_read();
     if (!$frame->isa('Net::AMQP::Frame::Header')) {
-        $self->_check_close_and_cleanup($frame);
+        $self->_check_close_and_clean($frame);
         die 'Received data is not header frame', "\n";
     }
 
@@ -624,7 +624,7 @@ sub _read_body_and_valid {
     my $frame = $self->_read();
     return $frame if $frame->isa('Net::AMQP::Frame::Body');
 
-    $self->_check_close_and_cleanup($frame);
+    $self->_check_close_and_clean($frame);
     die 'Received data is not body frame', "\n";
 }
 
@@ -761,7 +761,7 @@ You can use RabbitFoot to -
   * Publish, consume, get, ack and recover messages
   * Select, commit and rollback transactions
 
-RabbitFoot is known to work with RabbitMQ versions 1.7.0 and version 0-8 of the AMQP specification.
+RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
 
 =head1 AUTHOR
 
diff --git a/t/00_compile.t b/t/00_compile.t
index fcda7d3..73bca90 100644
--- a/t/00_compile.t
+++ b/t/00_compile.t
@@ -1,5 +1,5 @@
 use strict;
-use Test::More tests => 8;
+use Test::More tests => 11;
 
 BEGIN {
     use_ok 'RabbitFoot';
@@ -10,4 +10,8 @@ BEGIN {
     use_ok 'RabbitFoot::Cmd::Command::bind_queue';
     use_ok 'RabbitFoot::Cmd::Command::purge_queue';
     use_ok 'RabbitFoot::Cmd::Command::declare_exchange';
+
+    use_ok 'AnyEvent::RabbitMQ';
+    use_ok 'AnyEvent::RabbitMQ::Channel';
+    use_ok 'AnyEvent::RabbitMQ::LocalQueue';
 }
diff --git a/t/01_localqueue.t b/t/01_localqueue.t
new file mode 100644
index 0000000..3f1ad8e
--- /dev/null
+++ b/t/01_localqueue.t
@@ -0,0 +1,34 @@
+use Test::More tests => 10;
+
+use AnyEvent::RabbitMQ::LocalQueue;
+
+my $q = AnyEvent::RabbitMQ::LocalQueue->new;
+
+$q->push(1);
+$q->get(sub {is $_[0], 1, 'push -> get';});
+
+$q->get(sub {is $_[0], 2, 'get -> push';});
+$q->push(2);
+
+$q->push(3, 4);
+$q->push(5, 6);
+$q->get(
+    sub {is $_[0], 3, '';},
+    sub {is $_[0], 4, '';},
+);
+$q->get(
+    sub {is $_[0], 5, '';},
+    sub {is $_[0], 6, '';},
+);
+
+$q->get(
+    sub {is $_[0], 7, '';},
+    sub {is $_[0], 8, '';},
+);
+$q->get(
+    sub {is $_[0], 9, '';},
+    sub {is $_[0], 10, '';},
+);
+$q->push(7, 8);
+$q->push(9, 10);
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git



More information about the Pkg-perl-cvs-commits mailing list