[DRE-commits] [ruby-bunny] 01/01: Imported Upstream version 1.1.2
Jonas Genannt
jonas at brachium-system.net
Fri Feb 14 17:13:26 UTC 2014
This is an automated email from the git hooks/post-receive script.
hggh-guest pushed a commit to annotated tag upstream/1.1.2
in repository ruby-bunny.
commit 0a31701ec79725a56b41fbe71b8bc4d47f3e2cf6
Author: Jonas Genannt <jonas at brachium-system.net>
Date: Fri Feb 14 18:13:05 2014 +0100
Imported Upstream version 1.1.2
---
ChangeLog.md | 33 +++++-
checksums.yaml.gz | Bin 270 -> 267 bytes
lib/bunny/channel.rb | 21 +++-
lib/bunny/exchange.rb | 8 ++
lib/bunny/version.rb | 2 +-
metadata.yml | 4 +-
.../integration/basic_consume_spec.rb | 119 +++++++++++++++++++++
.../integration/exchange_declare_spec.rb | 23 ++++
8 files changed, 203 insertions(+), 7 deletions(-)
diff --git a/ChangeLog.md b/ChangeLog.md
index 05f09ae..d213305 100644
--- a/ChangeLog.md
+++ b/ChangeLog.md
@@ -1,4 +1,35 @@
-## Changes between Bunny 1.1.0.rc1 and 1.1.0.rc2
+## Changes between Bunny 1.1.1 and 1.1.2
+
+### Internal Exchanges
+
+Exchanges now can be declared as internal:
+
+``` ruby
+ch = conn.create_channel
+x = ch.fanout("bunny.tests.exchanges.internal", :internal => true)
+```
+
+Internal exchanges cannot be published to by clients and are solely used
+for [Exchange-to-Exchange bindings](http://rabbitmq.com/e2e.html) and various
+plugins but apps may still need to bind them. Now it is possible
+to do so with Bunny.
+
+
+## Changes between Bunny 1.1.0 and 1.1.1
+
+### Uncaught Consumer Exceptions
+
+Uncaught consumer exceptions are now handled by uncaught exceptions
+handler that can be defined per channel:
+
+``` ruby
+ch.on_uncaught_exception do |e, consumer|
+ # ...
+end
+```
+
+
+## Changes between Bunny 1.1.0.rc1 and 1.1.0
### Synchronized Session#create_channel and Session#close_channel
diff --git a/checksums.yaml.gz b/checksums.yaml.gz
index ced0bc9..695ce47 100644
Binary files a/checksums.yaml.gz and b/checksums.yaml.gz differ
diff --git a/lib/bunny/channel.rb b/lib/bunny/channel.rb
index dad9e13..701e98e 100644
--- a/lib/bunny/channel.rb
+++ b/lib/bunny/channel.rb
@@ -193,6 +193,9 @@ module Bunny
@next_publish_seq_no = 0
@recoveries_counter = Bunny::Concurrent::AtomicFixnum.new(0)
+ @uncaught_exception_handler = Proc.new do |e, consumer|
+ @logger.error "Uncaught exception from consumer #{consumer.to_s}: #{e.message}"
+ end
end
attr_reader :recoveries_counter
@@ -1112,8 +1115,8 @@ module Bunny
opts.fetch(:passive, false),
opts.fetch(:durable, false),
opts.fetch(:auto_delete, false),
- false,
- false,
+ opts.fetch(:internal, false),
+ false, # nowait
opts[:arguments]))
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_declare_ok = wait_on_continuations
@@ -1406,6 +1409,13 @@ module Bunny
@on_error = block
end
+ # Defines a handler for uncaught exceptions in consumers
+ # (e.g. delivered message handlers).
+ #
+ # @api public
+ def on_uncaught_exception(&block)
+ @uncaught_exception_handler = block
+ end
#
# Recovery
@@ -1569,6 +1579,7 @@ module Bunny
consumer.handle_cancellation(method)
rescue Exception => e
@logger.error "Got exception when notifying consumer #{method.consumer_tag} about cancellation!"
+ @uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
end
end
else
@@ -1627,7 +1638,11 @@ module Bunny
consumer = @consumers[basic_deliver.consumer_tag]
if consumer
@work_pool.submit do
- consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
+ begin
+ consumer.call(DeliveryInfo.new(basic_deliver, consumer, self), MessageProperties.new(properties), content)
+ rescue StandardError => e
+ @uncaught_exception_handler.call(e, consumer) if @uncaught_exception_handler
+ end
end
else
@logger.warn "No consumer for tag #{basic_deliver.consumer_tag} on channel #{@id}!"
diff --git a/lib/bunny/exchange.rb b/lib/bunny/exchange.rb
index cf84cde..10026ee 100644
--- a/lib/bunny/exchange.rb
+++ b/lib/bunny/exchange.rb
@@ -85,6 +85,7 @@ module Bunny
@durable = @options[:durable]
@auto_delete = @options[:auto_delete]
+ @internal = @options[:internal]
@arguments = @options[:arguments]
declare! unless opts[:no_declare] || predeclared? || (@name == AMQ::Protocol::EMPTY_STRING)
@@ -104,6 +105,12 @@ module Bunny
@auto_delete
end # auto_delete?
+ # @return [Boolean] true if this exchange is internal (used solely for exchange-to-exchange
+ # bindings and cannot be published to by clients)
+ def internal?
+ @internal
+ end
+
# @return [Hash] Additional optional arguments (typically used by RabbitMQ extensions and plugins)
# @api public
def arguments
@@ -261,6 +268,7 @@ module Bunny
:passive => false,
:durable => false,
:auto_delete => false,
+ :internal => false,
:arguments => nil
}.merge(h)
else
diff --git a/lib/bunny/version.rb b/lib/bunny/version.rb
index 327ceeb..e7a7ec3 100644
--- a/lib/bunny/version.rb
+++ b/lib/bunny/version.rb
@@ -2,5 +2,5 @@
module Bunny
# @return [String] Version of the library
- VERSION = "1.1.0"
+ VERSION = "1.1.2"
end
diff --git a/metadata.yml b/metadata.yml
index 68034a3..7242337 100644
--- a/metadata.yml
+++ b/metadata.yml
@@ -1,7 +1,7 @@
--- !ruby/object:Gem::Specification
name: bunny
version: !ruby/object:Gem::Version
- version: 1.1.0
+ version: 1.1.2
platform: ruby
authors:
- Chris Duncan
@@ -12,7 +12,7 @@ authors:
autorequire:
bindir: bin
cert_chain: []
-date: 2014-01-12 00:00:00.000000000 Z
+date: 2014-02-04 00:00:00.000000000 Z
dependencies:
- !ruby/object:Gem::Dependency
name: amq-protocol
diff --git a/spec/higher_level_api/integration/basic_consume_spec.rb b/spec/higher_level_api/integration/basic_consume_spec.rb
index c3fe938..544df04 100644
--- a/spec/higher_level_api/integration/basic_consume_spec.rb
+++ b/spec/higher_level_api/integration/basic_consume_spec.rb
@@ -222,4 +222,123 @@ describe Bunny::Queue, "#subscribe" do
end
end
+
+ context "with uncaught exceptions in delivery handler" do
+ context "and defined exception handler" do
+ let(:queue_name) { "bunny.basic_consume#{rand}" }
+
+ it "uses exception handler" do
+ caught = nil
+ t = Thread.new do
+ ch = connection.create_channel
+ q = ch.queue(queue_name, :auto_delete => true, :durable => false)
+
+ ch.on_uncaught_exception do |e, consumer|
+ caught = e
+ end
+
+ q.subscribe(:exclusive => false, :manual_ack => false) do |delivery_info, properties, payload|
+ raise RuntimeError.new(queue_name)
+ end
+ end
+ t.abort_on_exception = true
+ sleep 0.5
+
+ ch = connection.create_channel
+ x = ch.default_exchange
+ x.publish("hello", :routing_key => queue_name)
+ sleep 0.5
+
+ caught.message.should == queue_name
+
+ ch.close
+ end
+ end
+
+
+ context "and default exception handler" do
+ let(:queue_name) { "bunny.basic_consume#{rand}" }
+
+ it "uses exception handler" do
+ t = Thread.new do
+ ch = connection.create_channel
+ q = ch.queue(queue_name, :auto_delete => true, :durable => false)
+
+ q.subscribe(:exclusive => false, :manual_ack => false) do |delivery_info, properties, payload|
+ raise RuntimeError.new(queue_name)
+ end
+ end
+ t.abort_on_exception = true
+ sleep 0.5
+
+ ch = connection.create_channel
+ x = ch.default_exchange
+ 5.times { x.publish("hello", :routing_key => queue_name) }
+ sleep 1.5
+
+ ch.close
+ end
+ end
+
+
+ context "with a single consumer" do
+ let(:queue_name) { "bunny.basic_consume#{rand}" }
+
+ it "provides delivery tag access" do
+ delivery_tags = SortedSet.new
+
+ cch = connection.create_channel
+ q = cch.queue(queue_name, :auto_delete => true, :durable => false)
+ q.subscribe(:exclusive => false, :manual_ack => false) do |delivery_info, properties, payload|
+ delivery_tags << delivery_info.delivery_tag
+ end
+ sleep 0.5
+
+ ch = connection.create_channel
+ x = ch.default_exchange
+ 100.times do
+ x.publish("hello", :routing_key => queue_name)
+ end
+
+ sleep 1.0
+ delivery_tags.should == SortedSet.new(Range.new(1, 100).to_a)
+
+ ch.queue(queue_name, :auto_delete => true, :durable => false).message_count.should == 0
+
+ ch.close
+ end
+ end
+
+
+ context "with multiple consumers on the same channel" do
+ let(:queue_name) { "bunny.basic_consume#{rand}" }
+
+ it "provides delivery tag access" do
+ delivery_tags = SortedSet.new
+
+ cch = connection.create_channel
+ q = cch.queue(queue_name, :auto_delete => true, :durable => false)
+
+ 7.times do
+ q.subscribe(:exclusive => false, :manual_ack => false) do |delivery_info, properties, payload|
+ delivery_tags << delivery_info.delivery_tag
+ end
+ end
+ sleep 1.0
+
+ ch = connection.create_channel
+ x = ch.default_exchange
+ 100.times do
+ x.publish("hello", :routing_key => queue_name)
+ end
+
+ sleep 1.5
+ delivery_tags.should == SortedSet.new(Range.new(1, 100).to_a)
+
+ ch.queue(queue_name, :auto_delete => true, :durable => false).message_count.should == 0
+
+ ch.close
+ end
+ end
+ end
end # describe
diff --git a/spec/higher_level_api/integration/exchange_declare_spec.rb b/spec/higher_level_api/integration/exchange_declare_spec.rb
index df647c2..027479f 100644
--- a/spec/higher_level_api/integration/exchange_declare_spec.rb
+++ b/spec/higher_level_api/integration/exchange_declare_spec.rb
@@ -201,4 +201,27 @@ describe Bunny::Exchange do
end
end
end
+
+
+ context "that is internal" do
+ it "can be declared" do
+ ch = connection.create_channel
+ x = ch.fanout("bunny.tests.exchanges.internal", :internal => true)
+ x.should be_internal
+ x.delete
+
+ ch.close
+ end
+ end
+
+ context "not declared as internal" do
+ it "is not internal" do
+ ch = connection.create_channel
+ x = ch.fanout("bunny.tests.exchanges.non-internal")
+ x.should_not be_internal
+ x.delete
+
+ ch.close
+ end
+ end
end
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ruby-extras/ruby-bunny.git
More information about the Pkg-ruby-extras-commits
mailing list