[libanyevent-rabbitmq-perl] 11/151: Added AnyEvent::RabbitMQ. Fixed a bug.
Damyan Ivanov
dmn at moszumanska.debian.org
Thu Jan 16 11:03:00 UTC 2014
This is an automated email from the git hooks/post-receive script.
dmn pushed a commit to annotated tag debian/1.12-1
in repository libanyevent-rabbitmq-perl.
commit 7283cd69b00e33e2ebb4e1826241a966183f7fb2
Author: cooldaemon <cooldaemon at gmail.com>
Date: Sun Feb 7 21:30:30 2010 +0900
Added AnyEvent::RabbitMQ. Fixed a bug.
---
Makefile.PL | 2 +
lib/AnyEvent/RabbitMQ.pm | 550 ++++++++++++++++++++++++++++++++
lib/AnyEvent/RabbitMQ/Channel.pm | 618 ++++++++++++++++++++++++++++++++++++
lib/AnyEvent/RabbitMQ/LocalQueue.pm | 63 ++++
lib/RabbitFoot.pm | 6 +-
t/00_compile.t | 6 +-
t/01_localqueue.t | 34 ++
7 files changed, 1275 insertions(+), 4 deletions(-)
diff --git a/Makefile.PL b/Makefile.PL
index 36a390d..81d4b82 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -3,6 +3,7 @@ name 'RabbitFoot';
all_from 'lib/RabbitFoot.pm';
requires 'Moose';
+requires 'MooseX::AttributeHelpers';
requires 'MooseX::App::Cmd';
requires 'MooseX::ConfigFromFile';
requires 'Config::Any';
@@ -10,6 +11,7 @@ requires 'JSON::Syck';
requires 'List::MoreUtils';
requires 'Sys::SigAction';
requires 'Net::AMQP';
+requires 'AnyEvent';
tests 't/*.t';
author_tests 'xt';
diff --git a/lib/AnyEvent/RabbitMQ.pm b/lib/AnyEvent/RabbitMQ.pm
new file mode 100644
index 0000000..d0aebf4
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ.pm
@@ -0,0 +1,550 @@
+package AnyEvent::RabbitMQ;
+
+use Data::Dumper;
+use List::MoreUtils qw(none);
+use AnyEvent::Handle;
+use AnyEvent::Socket;
+use Net::AMQP;
+use Net::AMQP::Common qw(:all);
+
+use AnyEvent::RabbitMQ::Channel;
+use AnyEvent::RabbitMQ::LocalQueue;
+
+use Moose;
+use MooseX::AttributeHelpers;
+
+our $VERSION = '0.01';
+
+has verbose => (
+ isa => 'Bool',
+ is => 'rw',
+);
+
+has timeout => (
+ isa => 'Int',
+ is => 'rw',
+ default => 1,
+);
+
+has _connect_guard => (
+ isa => 'Guard',
+ is => 'ro',
+ clearer => 'clear_connect_guard',
+);
+
+has _handle => (
+ isa => 'AnyEvent::Handle',
+ is => 'ro',
+ clearer => 'clear_handle',
+);
+
+has _is_open => (
+ isa => 'Bool',
+ is => 'ro',
+ default => 0,
+);
+
+has channels => (
+ metaclass => 'Collection::Hash',
+ is => 'ro',
+ isa => 'HashRef[AnyEvent::RabbitMQ::Channel]',
+ default => sub {{}},
+ provides => {
+ set => 'set_channel',
+ get => 'get_channel',
+ empty => 'has_channels',
+ delete => 'delete_channel',
+ keys => 'channel_ids',
+ count => 'count_channels',
+ },
+);
+
+has _queue => (
+ isa => 'AnyEvent::RabbitMQ::LocalQueue',
+ is => 'ro',
+ default => sub {
+ AnyEvent::RabbitMQ::LocalQueue->new
+ },
+);
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+sub load_xml_spec {
+ my ($self, $file,) = @_;
+ Net::AMQP::Protocol->load_xml_spec($file); # die when fail in this line.
+ return $self;
+}
+
+sub connect {
+ my ($self, $args,) = @_;
+
+ $args->{on_success} ||= sub {};
+ $args->{on_failure} ||= sub {die @_};
+
+ if ($self->verbose) {
+ print STDERR 'connect to ', $args->{host}, ':', $args->{port}, '...', "\n";
+ }
+
+ $self->{_connect_guard} = AnyEvent::Socket::tcp_connect(
+ $args->{host},
+ $args->{port},
+ sub {
+ my $fh = shift
+ or return $args->{on_failure}->('Error connecting to AMQP Server!');
+ $self->{_handle} = AnyEvent::Handle->new(
+ fh => $fh,
+ on_error => sub {
+ my ($handle, $fatal, $message) = @_;
+ $self->clear_handle;
+ $args->{on_failure}->($message);
+ }
+ );
+ $self->_read_loop($args->{on_failure});
+ $self->_start($args,);
+ },
+ sub {
+ return $self->timeout;
+ },
+ );
+
+ return $self;
+}
+
+sub _read_loop {
+ my ($self, $failure_cb,) = @_;
+
+ return if !$self->_handle;
+
+ $self->_handle->push_read(chunk => 8, sub {
+ my $data = $_[1];
+ my $stack = $_[1];
+
+ if (length($data) <= 0) {
+ $failure_cb->('Disconnect');
+ @_ = ($self, $failure_cb,);
+ goto &_read_loop;
+ }
+
+ my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
+ if (!defined $type_id || !defined $channel || !defined $length) {
+ $failure_cb->('Broken data was received');
+ @_ = ($self, $failure_cb,);
+ goto &_read_loop;
+ }
+
+ $self->_handle->push_read(chunk => $length, sub {
+ $stack .= $_[1];
+ my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
+
+ if ($self->verbose) {
+ print STDERR '[C] <-- [S] ' . Dumper($frame);
+ print STDERR '-----------', "\n";
+ }
+
+ return if !$self->_check_close_and_clean($frame, $failure_cb,);
+
+ my $id = $frame->channel;
+ if (0 == $id) {
+ $self->_queue->push($frame);
+ } elsif ($self->has_channels($id)) {
+ $self->get_channel($id)->_push_queue_or_consume($frame, $failure_cb);
+ } else {
+ $failure_cb->('Unknown channel id: ' . $frame->channel);
+ }
+
+ @_ = ($self, $failure_cb,);
+ goto &_read_loop;
+ });
+ });
+
+ return $self;
+}
+
+sub _check_close_and_clean {
+ my ($self, $frame, $failure_cb, $id,) = @_;
+
+ return 1 if !$frame->isa('Net::AMQP::Frame::Method');
+
+ my $method_frame = $frame->method_frame;
+
+ if ($method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
+ $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
+ $self->{_is_open} = 0;
+ $self->_disconnect();
+ $failure_cb->(
+ $method_frame->reply_code . ' ' . $method_frame->reply_text
+ );
+ return;
+ } elsif ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
+ $self->_push_write(Net::AMQP::Protocol::Channel::CloseOk->new(), $id,);
+
+ my $id = $frame->channel;
+ my $message = $method_frame->reply_code . ' ' . $method_frame->reply_text;
+
+ my $channel = $self->get_channel($id);
+ if (!$channel) {
+ $failure_cb->("Unknown channel id: ${id}\n${message}");
+ return;
+ }
+
+ $channel->_is_open(0);
+ $self->delete_channel($id);
+ $failure_cb->($message);
+ return;
+ }
+
+ return 1;
+}
+
+sub _start {
+ my ($self, $args,) = @_;
+
+ if ($self->verbose) {
+ print STDERR 'post header', "\n";
+ }
+
+ $self->_handle->push_write(Net::AMQP::Protocol->header);
+
+ $self->_push_read_and_valid(
+ 'Connection::Start',
+ sub {
+ my $frame = shift;
+
+ my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
+ return $args->{on_failure}->('AMQPLAIN is not found in mechanisms')
+ if none {$_ eq 'AMQPLAIN'} @mechanisms;
+
+ my @locales = split /\s/, $frame->method_frame->locales;
+ return $args->{on_failure}->('en_US is not found in locales')
+ if none {$_ eq 'en_US'} @locales;
+
+ $self->_push_write(
+ Net::AMQP::Protocol::Connection::StartOk->new(
+ client_properties => {
+ platform => 'Perl',
+ product => __PACKAGE__,
+ information => 'http://d.hatena.ne.jp/cooldaemon/',
+ version => '0.01',
+ },
+ mechanism => 'AMQPLAIN',
+ response => {
+ LOGIN => $args->{user},
+ PASSWORD => $args->{pass},
+ },
+ locale => 'en_US',
+ ),
+ );
+
+ $self->_tune($args,);
+ },
+ $args->{on_failure},
+ );
+
+ return $self;
+}
+
+sub _tune {
+ my ($self, $args,) = @_;
+
+ $self->_push_read_and_valid(
+ 'Connection::Tune',
+ sub {
+ my $frame = shift;
+
+ $self->_push_write(
+ Net::AMQP::Protocol::Connection::TuneOk->new(
+ channel_max => $frame->method_frame->channel_max,
+ frame_max => $frame->method_frame->frame_max,
+ heartbeat => $frame->method_frame->heartbeat,
+ ),
+ );
+
+ $self->_open($args,);
+ },
+ $args->{on_failure},
+ );
+
+ return $self;
+}
+
+sub _open {
+ my ($self, $args,) = @_;
+
+ $self->_push_write_and_read(
+ 'Connection::Open',
+ {
+ virtual_host => $args->{vhost},
+ capabilities => '',
+ insist => 1,
+ },
+ 'Connection::OpenOk',
+ sub {
+ $self->{_is_open} = 1;
+ $args->{on_success}->();
+ },
+ $args->{on_failure},
+ );
+
+ return $self;
+}
+
+sub close {
+ my ($self, $args,) = @_;
+
+ $args->{on_success} ||= sub {};
+ $args->{on_failure} ||= sub {die @_};
+
+ my $close_cb = sub {
+ $self->_close(
+ sub {
+ $self->_disconnect();
+ $args->{on_success}->(@_);
+ },
+ sub {
+ $self->_disconnect();
+ $args->{on_failure}->(@_);
+ }
+ );
+ return $self;
+ };
+
+# if (0 == $self->count_channels) {
+ if (0 == scalar keys %{$self->channels}) { # FIXME
+ return $close_cb->();
+ }
+
+# for my $id ($self->channel_ids) {
+ for my $id (keys %{$self->channels}) { # FIXME
+# $self->get_channel($id)->close({
+ $self->channels->{$id}->close({ # FIXME
+ on_success => $close_cb,
+ on_failure => sub {
+ $close_cb->();
+ $args->{on_failure}->(@_);
+ },
+ });
+ }
+
+ return $self;
+}
+
+sub _close {
+ my ($self, $cb, $failure_cb,) = @_;
+
+ return $self if !$self->_is_open || 0 < $self->count_channels;
+
+ $self->_push_write_and_read(
+ 'Connection::Close', {}, 'Connection::CloseOk',
+ $cb, $failure_cb,
+ );
+ $self->{_is_open} = 0;
+
+ return $self;
+}
+
+sub _disconnect {
+ my ($self,) = @_;
+
+ $self->clear_connect_guard;
+ $self->clear_handle;
+
+ return;
+}
+
+sub open_channel {
+ my ($self, $args,) = @_;
+
+ $args->{on_success} ||= sub {};
+ $args->{on_failure} ||= sub {die @_};
+
+ my $id = $args->{id};
+ return $args->{on_failure}->("Channel id $id is already in use")
+ if $id && $self->has_channels($id);
+
+ if (!$id) {
+ for my $candidate_id (1 .. (2**16 - 1)) { # FIXME
+ next if $self->has_channels($candidate_id);
+ $id = $candidate_id;
+ last;
+ }
+ return $args->{on_failure}->('Ran out of channel ids') if !$id;
+ }
+
+ my $channel = AnyEvent::RabbitMQ::Channel->new({
+ id => $id,
+ connection => $self,
+ });
+
+ $self->set_channel($id => $channel);
+
+ $channel->open({
+ on_success => sub {
+ $args->{on_success}->($channel);
+ },
+ on_failure => sub {
+ $self->delete_channel($id);
+ $args->{on_failure}->(@_);
+ },
+ });
+
+ return $self;
+}
+
+sub _push_write_and_read {
+ my ($self, $method, $args, $exp, $cb, $failure_cb, $id,) = @_;
+
+ $method = 'Net::AMQP::Protocol::' . $method;
+ $self->_push_write(
+ Net::AMQP::Frame::Method->new(
+ method_frame => $method->new(%$args)
+ ),
+ $id,
+ );
+
+ return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id,);
+}
+
+sub _push_read_and_valid {
+ my ($self, $exp, $cb, $failure_cb, $id,) = @_;
+ $exp = ref($exp) eq 'ARRAY' ? $exp : [$exp];
+
+ my $queue = $id ? $self->get_channel($id)->_queue : $self->_queue;
+
+ $queue->get(sub {
+ my $frame = shift;
+
+ return $failure_cb->('Received data is not method frame')
+ if !$frame->isa('Net::AMQP::Frame::Method');
+
+ my $method_frame = $frame->method_frame;
+ for my $exp_elem (@$exp) {
+ return $cb->($frame)
+ if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);
+ }
+
+ $failure_cb->(
+ 'Method is not ' . join(',', @$exp) . "\n"
+ . 'Method was ' . ref $method_frame
+ );
+ });
+}
+
+sub _push_read {
+ my ($self, $cb, $failure_cb) = @_;
+
+ $self->_handle->push_read(chunk => 8, sub {
+ my $data = $_[1];
+ my $stack = $_[1];
+ return $failure_cb->('Disconnect') if length($data) <= 0;
+
+ my ($type_id, $channel, $length,) = unpack 'CnN', substr $data, 0, 7, '';
+ return $failure_cb->('Broken data was received')
+ if !defined $type_id || !defined $channel || !defined $length;
+
+ $self->_handle->push_read(chunk => $length, sub {
+ $stack .= $_[1];
+ my ($frame) = Net::AMQP->parse_raw_frames(\$stack);
+
+ if ($self->verbose) {
+ print STDERR '[C] <-- [S] ' . Dumper($frame);
+ print STDERR '-----------', "\n";
+ }
+
+ $cb->($frame);
+ });
+ });
+}
+
+sub _push_write {
+ my ($self, $output, $id,) = @_;
+
+ if ($output->isa('Net::AMQP::Protocol::Base')) {
+ $output = $output->frame_wrap;
+ }
+ $output->channel($id || 0);
+
+ if ($self->verbose) {
+ print STDERR '[C] --> [S] ', Dumper($output), "\n";
+ }
+
+ $self->_handle->push_write($output->to_raw_frame());
+ return;
+}
+
+sub DEMOLISH {
+ my ($self) = @_;
+
+ $self->close();
+ return;
+}
+
+1;
+__END__
+
+=head1 NAME
+
+RabbitFoot - A synchronous and single channel Perl AMQP client.
+
+=head1 SYNOPSIS
+
+ use AnyEvent::RabbitMQ;
+
+ my $cv = AnyEvent->condvar;
+
+ my $ar = AnyEvent::RabbitMQ->new({
+ timeout => 1,
+ })->load_xml_spec(
+ '/path/to/amqp0-8.xml',
+ )->connect({
+ host => 'localhosti',
+ port => 5672,
+ user => 'guest',
+ port => 'guest',
+ vhost => '/',
+ on_success => sub {
+ $ar->open_channel({
+ on_success => sub {
+ my $channel = shift;
+ $channel->declare_exchange({
+ exchange => 'test_exchange',
+ on_success => sub {
+ $cv->send('Declared exchange');
+ },
+ on_failure => $cv,
+ });
+ },
+ on_failure => $cv,
+ });
+ },
+ on_failure => $cv,
+ });
+
+ print $cv->recv, "\n";
+
+=head1 DESCRIPTION
+
+RabbitFoot is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in a Asynchronous fashion.
+
+You can use RabbitFoot to -
+
+ * Declare and delete exchanges
+ * Declare, delete, bind and unbind queues
+ * Set QoS
+ * Publish, consume, get, ack and recover messages
+ * Select, commit and rollback transactions
+
+RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
+
+=head1 AUTHOR
+
+Masahito Ikuta E<lt>cooldaemon at gmail.comE<gt>
+
+=head1 SEE ALSO
+
+=head1 LICENSE
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself.
+
+=cut
diff --git a/lib/AnyEvent/RabbitMQ/Channel.pm b/lib/AnyEvent/RabbitMQ/Channel.pm
new file mode 100644
index 0000000..9054591
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ/Channel.pm
@@ -0,0 +1,618 @@
+package AnyEvent::RabbitMQ::Channel;
+
+use AnyEvent::RabbitMQ::LocalQueue;
+
+use Moose;
+
+our $VERSION = '0.01';
+
+has id => (
+ isa => 'Int',
+ is => 'rw',
+ required => 1,
+);
+
+has connection => (
+ isa => 'AnyEvent::RabbitMQ',
+ is => 'rw',
+ required => 1,
+ weak_ref => 1,
+);
+
+has _is_open => (
+ isa => 'Bool',
+ is => 'rw',
+ default => 0,
+);
+
+has _queue => (
+ isa => 'AnyEvent::RabbitMQ::LocalQueue',
+ is => 'ro',
+ default => sub {
+ AnyEvent::RabbitMQ::LocalQueue->new
+ },
+);
+
+has consumer_cbs => (
+ metaclass => 'Collection::Hash',
+ is => 'ro',
+ isa => 'HashRef[CodeRef]',
+ default => sub {{}},
+ provides => {
+ set => 'set_consumer_cbs',
+ get => 'get_consumer_cbs',
+ empty => 'has_consumer_cbs',
+ delete => 'delete_consumer_cbs',
+ keys => 'consumer_tags',
+ count => 'count_consumer_cbs',
+ },
+);
+
+has return_cbs => (
+ metaclass => 'Collection::Hash',
+ is => 'ro',
+ isa => 'HashRef[CodeRef]',
+ default => sub {{}},
+ provides => {
+ set => 'set_return_cbs',
+ get => 'get_return_cbs',
+ },
+);
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+sub open {
+ my ($self, $args,) = @_;
+
+ $args->{on_success} ||= sub {};
+ $args->{on_failur} ||= sub {die @_};
+
+ $self->connection->_push_write_and_read(
+ 'Channel::Open', {}, 'Channel::OpenOk',
+ sub {
+ $self->{_is_open} = 1;
+ $args->{on_success}->();
+ },
+ $args->{on_failur},
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub close {
+ my ($self, $args,) = @_;
+
+ return $self if !$self->_is_open;
+
+ $args->{on_success} ||= sub {};
+ $args->{on_failur} ||= sub {die @_};
+
+ return $self->_close($args) if 0 == $self->count_consumer_cbs;
+
+ for my $consumer_tag ($self->consumer_tags) {
+ $self->cancel({
+ consumer_tag => $consumer_tag,
+ on_success => sub {
+ $self->_close($args);
+ },
+ on_failure => sub {
+ $self->_close($args);
+ $args->{on_failure}->(@_);
+ }
+ });
+ }
+
+ return $self;
+}
+
+sub _close {
+ my ($self, $args,) = @_;
+
+ $self->connection->_push_write_and_read(
+ 'Channel::Close', {}, 'Channel::CloseOk',
+ sub {
+ $self->{_is_open} = 0;
+ $self->connection->delete_channel($self->id);
+ $args->{on_success}->();
+ },
+ sub {
+ $self->{_is_open} = 0;
+ $self->connection->delete_channel($self->id);
+ $args->{on_failur}->();
+ },
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub declare_exchange {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Exchange::Declare',
+ {
+ type => 'direct',
+ passive => 0,
+ durable => 0,
+ auto_delete => 0,
+ internal => 0,
+ %$args, # exchange
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Exchange::DeclareOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub delete_exchange {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Exchange::Delete',
+ {
+ if_unused => 0,
+ %$args, # exchange
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Exchange::DeleteOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub declare_queue {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Queue::Declare',
+ {
+ queue => '',
+ passive => 0,
+ durable => 0,
+ exclusive => 0,
+ auto_delete => 0,
+ no_ack => 1,
+ %$args,
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Queue::DeclareOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+}
+
+sub bind_queue {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Queue::Bind',
+ {
+ %$args, # queue, exchange, routing_key
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Queue::BindOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub unbind_queue {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Queue::Unbind',
+ {
+ %$args, # queue, exchange, routing_key
+ ticket => 0,
+ },
+ 'Queue::UnbindOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub purge_queue {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Queue::Purge',
+ {
+ %$args, # queue
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Queue::PurgeOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub delete_queue {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Queue::Delete',
+ {
+ if_unused => 0,
+ if_empty => 0,
+ %$args, # queue
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Queue::DeleteOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub publish {
+ my ($self, $args,) = @_;
+
+ my $header_args = delete $args->{header} || {};
+ my $body = delete $args->{body} || '';
+ my $return_cb = delete $args->{on_return} || sub {};
+
+ $self->_publish(
+ $args,
+ )->_header(
+ $header_args, $body,
+ )->_body(
+ $body,
+ );
+
+ return $self if !$args->{mandatory} && !$args->{immediate};
+
+ $self->set_return_cbs(
+ ($args->{exchange} || '') . '_' . $args->{routing_key}
+ => $return_cb
+ );
+
+ return $self;
+}
+
+sub _publish {
+ my ($self, $args,) = @_;
+
+ $self->connection->_push_write(
+ Net::AMQP::Protocol::Basic::Publish->new(
+ exchange => '',
+ mandatory => 0,
+ immediate => 0,
+ %$args, # routing_key
+ ticket => 0,
+ ),
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub _header {
+ my ($self, $args, $body,) = @_;
+
+ $self->connection->_push_write(
+ Net::AMQP::Frame::Header->new(
+ weight => $args->{weight} || 0,
+ body_size => length($body),
+ header_frame => Net::AMQP::Protocol::Basic::ContentHeader->new(
+ content_type => 'application/octet-stream',
+ content_encoding => '',
+ headers => {},
+ delivery_mode => 1,
+ priority => 1,
+ correlation_id => '',
+ reply_to => '',
+ expiration => '',
+ message_id => '',
+ timestamp => time,
+ type => '',
+ user_id => '',
+ app_id => '',
+ cluster_id => '',
+ %$args,
+ ),
+ ),
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub _body {
+ my ($self, $body,) = @_;
+
+ $self->connection->_push_write(
+ Net::AMQP::Frame::Body->new(payload => $body),
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub consume {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ my $consumer_cb = delete $args->{on_consume} || sub {};
+
+ $self->connection->_push_write_and_read(
+ 'Basic::Consume',
+ {
+ consumer_tag => '',
+ no_local => 0,
+ no_ack => 1,
+ exclusive => 0,
+ %$args, # queue
+ ticket => 0,
+ nowait => 0, # FIXME
+ },
+ 'Basic::ConsumeOk',
+ sub {
+ my $frame = shift;
+ $self->set_consumer_cbs(
+ $frame->method_frame->consumer_tag => $consumer_cb
+ );
+ $cb->($frame);
+ },
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub cancel {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ return $failure_cb->('consumer_tag is not set')
+ if !defined $args->{consumer_tag};
+
+ return $failure_cb->('Unknown consumer_tag')
+ if !$self->has_consumer_cbs($args->{consumer_tag});
+
+ $self->connection->_push_write_and_read(
+ 'Basic::Cancel',
+ {
+ %$args, # consumer_tag
+ nowait => 0,
+ },
+ 'Basic::CancelOk',
+ sub {
+ my $frame = shift;
+ $self->delete_consumer_cbs($args->{consumer_tag});
+ $cb->($frame);
+ },
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub get {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Basic::Get',
+ {
+ no_ack => 1,
+ %$args, # queue
+ ticket => 0,
+ },
+ [qw(Basic::GetOk Basic::GetEmpty)],
+ sub {
+ my $frame = shift;
+ return $cb->({empty => $frame})
+ if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
+ $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
+ },
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub ack {
+ my ($self, $args,) = @_;
+
+ $self->connection->_push_write(
+ Net::AMQP::Protocol::Basic::Ack->new(
+ delivery_tag => 0,
+ multiple => (
+ defined $args->{delivery_tag} && $args->{delivery_tag} != 0 ? 0 : 1
+ ),
+ %$args,
+ ),
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub qos {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Basic::Qos',
+ {
+ prefetch_count => 1,
+ %$args,
+ prefetch_size => 0,
+ global => 0,
+ },
+ 'Basic::QosOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub recover {
+ my ($self, $args,) = @_;
+
+ $self->connection->_push_write(
+ Net::AMQP::Protocol::Basic::Recover->new(
+ requeue => 0,
+ %$args,
+ ),
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub select_tx {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Tx::Select', {}, 'Tx::SelectOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub commit_tx {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Tx::Commit', {}, 'Tx::CommitOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub rollback_tx {
+ my ($self, $args,) = @_;
+ my ($cb, $failure_cb,) = $self->_delete_callbacks($args);
+
+ $self->connection->_push_write_and_read(
+ 'Tx::Rollback', {}, 'Tx::RollbackOk',
+ $cb,
+ $failure_cb,
+ $self->id,
+ );
+
+ return $self;
+}
+
+sub _delete_callbacks {
+ my ($self, $args,) = @_;
+
+ my $cb = delete $args->{on_success} || sub {};
+ my $failure_cb = delete $args->{on_failure} || sub {die @_};
+
+ return $cb, $failure_cb;
+}
+
+sub _push_queue_or_consume {
+ my ($self, $frame, $failure_cb,) = @_;
+
+ if ($frame->isa('Net::AMQP::Frame::Method')) {
+ my $method_frame = $frame->method_frame;
+ if ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::Deliver')) {
+ my $cb = $self->get_consumer_cbs(
+ $method_frame->consumer_tag
+ ) || sub {};
+ $self->_push_read_header_and_body('deliver', $frame, $cb, $failure_cb);
+ return $self;
+ } elsif ($frame->method_frame->isa('Net::AMQP::Protocol::Basic::Return')) {
+ my $cb = $self->get_return_cbs(
+ $method_frame->exchange . '_' . $method_frame->routing_key
+ ) || sub {};
+ $self->_push_read_header_and_body('return', $frame, $cb, $failure_cb);
+ return $self;
+ }
+ }
+
+ $self->_queue->push($frame);
+ return $self;
+}
+
+sub _push_read_header_and_body {
+ my ($self, $type, $frame, $cb, $failure_cb,) = @_;
+ my $response = {$type => $frame};
+
+ $self->_queue->get(sub{
+ my $frame = shift;
+
+ return $failure_cb->('Received data is not header frame')
+ if !$frame->isa('Net::AMQP::Frame::Header');
+
+ my $header_frame = $frame->header_frame;
+ return $failure_cb->(
+ 'Header is not Protocol::Basic::ContentHeader'
+ . 'Header was ' . ref $header_frame
+ ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
+
+ $response->{header} = $header_frame;
+ });
+
+ $self->_queue->get(sub{
+ my $frame = shift;
+
+ return $failure_cb->('Received data is not body frame')
+ if !$frame->isa('Net::AMQP::Frame::Body');
+
+ $response->{body} = $frame;
+ $cb->($response);
+ });
+
+ return $self;
+}
+
+sub DEMOLISH {
+ my ($self) = @_;
+
+ $self->close();
+ return;
+}
+
+1;
diff --git a/lib/AnyEvent/RabbitMQ/LocalQueue.pm b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
new file mode 100644
index 0000000..56b0a8c
--- /dev/null
+++ b/lib/AnyEvent/RabbitMQ/LocalQueue.pm
@@ -0,0 +1,63 @@
+package AnyEvent::RabbitMQ::LocalQueue;
+
+use Moose;
+use MooseX::AttributeHelpers;
+
+our $VERSION = '0.01';
+
+has _message_queue => (
+ metaclass => 'Collection::Array',
+ is => 'ro',
+ isa => 'ArrayRef[Any]',
+ default => sub {[]},
+ provides => {
+ push => '_push_message_queue',
+ shift => '_shift_message_queue',
+ count => '_count_message_queue',
+ },
+);
+
+has _drain_code_queue => (
+ metaclass => 'Collection::Array',
+ is => 'ro',
+ isa => 'ArrayRef[CodeRef]',
+ default => sub {[]},
+ provides => {
+ push => '_push_drain_code_queue',
+ shift => '_shift_drain_code_queue',
+ count => '_count_drain_code_queue',
+ },
+);
+
+__PACKAGE__->meta->make_immutable;
+no Moose;
+
+sub push {
+ my $self = shift;
+ $self->_push_message_queue(@_);
+ return $self->_drain_queue();
+}
+
+sub get {
+ my $self = shift;
+ $self->_push_drain_code_queue(@_);
+ return $self->_drain_queue();
+}
+
+sub _drain_queue {
+ my $self = shift;
+
+ my $count
+ = $self->_count_message_queue < $self->_count_drain_code_queue
+ ? $self->_count_message_queue : $self->_count_drain_code_queue
+ ;
+
+ for (1 .. $count) {
+ $self->_shift_drain_code_queue->($self->_shift_message_queue);
+ }
+
+ return $self;
+}
+
+1;
+
diff --git a/lib/RabbitFoot.pm b/lib/RabbitFoot.pm
index 3ca1322..e08f850 100644
--- a/lib/RabbitFoot.pm
+++ b/lib/RabbitFoot.pm
@@ -606,7 +606,7 @@ sub _read_header_and_valid {
my $frame = $self->_read();
if (!$frame->isa('Net::AMQP::Frame::Header')) {
- $self->_check_close_and_cleanup($frame);
+ $self->_check_close_and_clean($frame);
die 'Received data is not header frame', "\n";
}
@@ -624,7 +624,7 @@ sub _read_body_and_valid {
my $frame = $self->_read();
return $frame if $frame->isa('Net::AMQP::Frame::Body');
- $self->_check_close_and_cleanup($frame);
+ $self->_check_close_and_clean($frame);
die 'Received data is not body frame', "\n";
}
@@ -761,7 +761,7 @@ You can use RabbitFoot to -
* Publish, consume, get, ack and recover messages
* Select, commit and rollback transactions
-RabbitFoot is known to work with RabbitMQ versions 1.7.0 and version 0-8 of the AMQP specification.
+RabbitFoot is known to work with RabbitMQ versions 1.7.1 and version 0-8 of the AMQP specification.
=head1 AUTHOR
diff --git a/t/00_compile.t b/t/00_compile.t
index fcda7d3..73bca90 100644
--- a/t/00_compile.t
+++ b/t/00_compile.t
@@ -1,5 +1,5 @@
use strict;
-use Test::More tests => 8;
+use Test::More tests => 11;
BEGIN {
use_ok 'RabbitFoot';
@@ -10,4 +10,8 @@ BEGIN {
use_ok 'RabbitFoot::Cmd::Command::bind_queue';
use_ok 'RabbitFoot::Cmd::Command::purge_queue';
use_ok 'RabbitFoot::Cmd::Command::declare_exchange';
+
+ use_ok 'AnyEvent::RabbitMQ';
+ use_ok 'AnyEvent::RabbitMQ::Channel';
+ use_ok 'AnyEvent::RabbitMQ::LocalQueue';
}
diff --git a/t/01_localqueue.t b/t/01_localqueue.t
new file mode 100644
index 0000000..3f1ad8e
--- /dev/null
+++ b/t/01_localqueue.t
@@ -0,0 +1,34 @@
+use Test::More tests => 10;
+
+use AnyEvent::RabbitMQ::LocalQueue;
+
+my $q = AnyEvent::RabbitMQ::LocalQueue->new;
+
+$q->push(1);
+$q->get(sub {is $_[0], 1, 'push -> get';});
+
+$q->get(sub {is $_[0], 2, 'get -> push';});
+$q->push(2);
+
+$q->push(3, 4);
+$q->push(5, 6);
+$q->get(
+ sub {is $_[0], 3, '';},
+ sub {is $_[0], 4, '';},
+);
+$q->get(
+ sub {is $_[0], 5, '';},
+ sub {is $_[0], 6, '';},
+);
+
+$q->get(
+ sub {is $_[0], 7, '';},
+ sub {is $_[0], 8, '';},
+);
+$q->get(
+ sub {is $_[0], 9, '';},
+ sub {is $_[0], 10, '';},
+);
+$q->push(7, 8);
+$q->push(9, 10);
+
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libanyevent-rabbitmq-perl.git
More information about the Pkg-perl-cvs-commits
mailing list