[DRE-commits] [ruby-stomp] 01/04: Imported Upstream version 1.3.1

Jonas Genannt jonas at brachium-system.net
Tue Oct 22 19:05:02 UTC 2013


This is an automated email from the git hooks/post-receive script.

hggh-guest pushed a commit to branch master
in repository ruby-stomp.

commit 8abc7250eb623734e0a49ee3c769c01c6aa07c93
Author: Jonas Genannt <jonas at brachium-system.net>
Date:   Tue Oct 22 20:52:51 2013 +0200

    Imported Upstream version 1.3.1
---
 CHANGELOG.rdoc                     |   14 +
 README.rdoc                        |    4 +
 checksums.yaml.gz                  |  Bin 427 -> 423 bytes
 lib/client/utils.rb                |   90 ++++---
 lib/connection/heartbeats.rb       |   23 +-
 lib/connection/netio.rb            |   10 +-
 lib/connection/utils.rb            |    2 +
 lib/stomp.rb                       |    1 +
 lib/stomp/client.rb                |   44 ++-
 lib/stomp/connection.rb            |   53 ++--
 lib/stomp/errors.rb                |   51 ++++
 lib/stomp/null_logger.rb           |   28 ++
 {examples => lib/stomp}/slogger.rb |   16 +-
 lib/stomp/version.rb               |    4 +-
 metadata.yml                       |   14 +-
 spec/client_spec.rb                |   40 ++-
 spec/connection_spec.rb            |    3 +
 stomp.gemspec                      |   27 +-
 test/test_anonymous.rb             |  523 ++++++++++++++++++++++++++++++++++++
 test/test_helper.rb                |   15 ++
 20 files changed, 839 insertions(+), 123 deletions(-)

diff --git a/CHANGELOG.rdoc b/CHANGELOG.rdoc
index b9711c6..a98c1c3 100644
--- a/CHANGELOG.rdoc
+++ b/CHANGELOG.rdoc
@@ -1,3 +1,17 @@
+== 1.3.1 20131002
+
+* Method calls to the logger object should check for that method first (#83)
+
+== 1.3.0 20130930
+
+* ERROR frames now raise an exception in the Stomp::Client thread(#73, #81)
+* Allow anonymous connections (#75)
+* Fix for subscription id handling in STOMP 1.1 (#78)
+* Added a NullLogger (#77)
+* Added :tcp_nodelay option (disable Nagle's algorithm) (#76)
+* Read receipt ids are now UUIDs
+* Added a :start_timeout parameter
+
 == 1.2.16 20130812
 
 * Stomp::Client's should expose connection's host params
diff --git a/README.rdoc b/README.rdoc
index 34615a3..2f8508d 100644
--- a/README.rdoc
+++ b/README.rdoc
@@ -12,6 +12,8 @@ An implementation of the Stomp protocol for Ruby. See:
 
 See _CHANGELOG.rdoc_ for details.
 
+* Gem version 1.3.1. Bugfix for logging.
+* Gem version 1.3.0. Added ERROR frame raising as exception, added anonymous connections, miscellaneous other fixes.
 * Gem version 1.2.16. Fixed Stomp::Client to expose its connection's host parameters.
 * Gem version 1.2.15. Timeout cleanup, added license info to gemspec.
 * Gem version 1.2.14. Cleanup.
@@ -63,6 +65,8 @@ See _CHANGELOG.rdoc_ for details.
                                           # For fast heartbeat senders.  'fast' == YMMV.  If not
                                           # correct for your environment, expect unnecessary fail overs
       :connread_timeout => 0,             # Timeout during CONNECT for read of CONNECTED/ERROR, secs
+      :tcp_nodelay => true,               # Turns on the TCP_NODELAY socket option; disables Nagle's algorithm
+      :start_timeout => 10,               # Timeout around initialization
     }
 
     # for client
diff --git a/checksums.yaml.gz b/checksums.yaml.gz
index 88fb18b..950688a 100644
Binary files a/checksums.yaml.gz and b/checksums.yaml.gz differ
diff --git a/lib/client/utils.rb b/lib/client/utils.rb
index 925a092..fcc096d 100644
--- a/lib/client/utils.rb
+++ b/lib/client/utils.rb
@@ -71,33 +71,22 @@ module Stomp
       end
     end
 
-    # Register a receipt listener.
-    def register_receipt_listener(listener)
-      id = -1
-      @id_mutex.synchronize do
-        id = @ids.to_s
-        @ids = @ids.succ
+    # Parse a stomp URL.
+    def parse_hosts(url)
+      hosts = []
+      host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/
+      url.scan(host_match).each do |match|
+        host = {}
+        host[:ssl] = match[0] == "+ssl" ? true : false
+        host[:login] =  match[3] || ""
+        host[:passcode] = match[4] || ""
+        host[:host] = match[5]
+        host[:port] = match[6].to_i
+        hosts << host
       end
-      @receipt_listeners[id] = listener
-      id
+      hosts
     end
 
-# Parse a stomp URL.
-def parse_hosts(url)
-  hosts = []
-  host_match = /stomp(\+ssl)?:\/\/#{URL_REPAT}/
-  url.scan(host_match).each do |match|
-    host = {}
-    host[:ssl] = match[0] == "+ssl" ? true : false
-    host[:login] =  match[3] || ""
-    host[:passcode] = match[4] || ""
-    host[:host] = match[5]
-    host[:port] = match[6].to_i
-    hosts << host
-  end
-  hosts
-end
-
     # A very basic check of required arguments.
     def check_arguments!()
       first_host = @parameters && @parameters[:hosts] && @parameters[:hosts].first
@@ -131,35 +120,54 @@ end
         set_subscription_id_if_missing(message.headers['destination'], message.headers)
         subscription_id = message.headers[:id]
       end
-      @listeners[subscription_id]
+
+      listener = @listeners[subscription_id]
+      listener.call(message) if listener
     end
 
-    # Start a single listener thread.  Misnamed I think.
-    def start_listeners()
+    # Register a receipt listener.
+    def register_receipt_listener(listener)
+      id = uuid
+      @receipt_listeners[id] = listener
+      id
+    end
+
+    def find_receipt_listener(message)
+      listener = @receipt_listeners[message.headers['receipt-id']]
+      listener.call(message) if listener
+    end
+
+    def create_listener_maps
       @listeners = {}
       @receipt_listeners = {}
       @replay_messages_by_txn = {}
 
+      @listener_map = Hash.new do |message|
+        @logger.on_miscerr(@connection.log_params, "Received unknown frame type: '#{message.command}'\n")
+      end
+
+      @listener_map[Stomp::CMD_MESSAGE] = lambda {|message| find_listener(message) }
+      @listener_map[Stomp::CMD_RECEIPT] = lambda {|message| find_receipt_listener(message) }
+      @listener_map[Stomp::CMD_ERROR]   = @error_listener
+    end
+
+    # Start a single listener thread.  Misnamed I think.
+    def start_listeners()
+      create_listener_maps
+
       @listener_thread = Thread.start do
-        while true
+        loop do
           message = @connection.receive
           # AMQ specific behavior
           if message.nil? && (!@parameters[:reliable])
             raise Stomp::Error::NilMessageError
           end
-          if message # message can be nil on rapid AMQ stop / start sequences
-          # OK, we have some real data
-            if message.command == Stomp::CMD_MESSAGE
-              if listener = find_listener(message)
-                listener.call(message)
-              end
-            elsif message.command == Stomp::CMD_RECEIPT
-              if listener = @receipt_listeners[message.headers['receipt-id']]
-                listener.call(message)
-              end
-            end
-          end
-        end # while true
+
+          next unless message # message can be nil on rapid AMQ stop/start sequences
+
+          @listener_map[message.command].call(message)
+        end
+
       end
     end # method start_listeners
 
diff --git a/lib/connection/heartbeats.rb b/lib/connection/heartbeats.rb
index 624cb98..3e4acbe 100644
--- a/lib/connection/heartbeats.rb
+++ b/lib/connection/heartbeats.rb
@@ -100,9 +100,11 @@ module Stomp
           sleep(slt)
           next unless @socket # nil under some circumstances ??
           curt = Time.now.to_f
+
           if @logger && @logger.respond_to?(:on_hbfire)
             @logger.on_hbfire(log_params, "send_fire", :curt => curt, :last_sleep => slt)
           end
+
           delta = curt - @ls
           # Be tolerant (minus), and always do this the first time through.
           # Reintroduce logic removed in d922fa.
@@ -111,8 +113,8 @@ module Stomp
             first_time = false
             if @logger && @logger.respond_to?(:on_hbfire)
               @logger.on_hbfire(log_params, "send_heartbeat", :last_sleep => slt,
-                :curt => curt, :last_send => @ls, :delta => delta,
-                :compval => compval)
+                                :curt => curt, :last_send => @ls, :delta => delta,
+                                :compval => compval)
             end
             # Send a heartbeat
             @transmit_semaphore.synchronize do
@@ -126,7 +128,7 @@ module Stomp
                 @hb_sent = false # Set the warning flag
                 if @logger && @logger.respond_to?(:on_hbwrite_fail)
                   @logger.on_hbwrite_fail(log_params, {"ticker_interval" => sleeptime,
-                  "exception" => sendex})
+                                                       "exception" => sendex})
                 end
                 if @hbser
                   raise # Re-raise if user requested this, otherwise ignore
@@ -203,8 +205,9 @@ module Stomp
                   read_fail_count += 1
                   if @logger && @logger.respond_to?(:on_hbread_fail)
                     @logger.on_hbread_fail(log_params, {"ticker_interval" => sleeptime,
-                      "read_fail_count" => read_fail_count, "lock_fail" => false,
-                      "lock_fail_count" => lock_fail_count})
+                                                         "read_fail_count" => read_fail_count,
+                                                         "lock_fail" => false,
+                                                         "lock_fail_count" => lock_fail_count})
                   end
                 end
               else  # try_lock failed
@@ -214,8 +217,9 @@ module Stomp
                 lock_fail_count += 1
                 if @logger && @logger.respond_to?(:on_hbread_fail)
                   @logger.on_hbread_fail(log_params, {"ticker_interval" => sleeptime,
-                    "read_fail_count" => read_fail_count, "lock_fail" => true,
-                    "lock_fail_count" => lock_fail_count})
+                                                      "read_fail_count" => read_fail_count,
+                                                      "lock_fail" => true,
+                                                      "lock_fail_count" => lock_fail_count})
                 end
               end # of the try_lock
 
@@ -227,8 +231,9 @@ module Stomp
           rescue Exception => recvex
             if @logger && @logger.respond_to?(:on_hbread_fail)
               @logger.on_hbread_fail(log_params, {"ticker_interval" => sleeptime,
-                "exception" => recvex, "read_fail_count" => read_fail_count,
-                "lock_fail_count" => lock_fail_count})
+                                                  "exception" => recvex,
+                                                  "read_fail_count" => read_fail_count,
+                                                  "lock_fail_count" => lock_fail_count})
             end
             fail_hard = true
           end
diff --git a/lib/connection/netio.rb b/lib/connection/netio.rb
index 9a4c99c..8a3459e 100644
--- a/lib/connection/netio.rb
+++ b/lib/connection/netio.rb
@@ -343,11 +343,15 @@ module Stomp
       @closed = false
       if @parameters # nil in some rspec tests
         unless @reconnect_delay
-          @reconnect_delay = @parameters[:initial_reconnect_delay] ? @parameters[:initial_reconnect_delay] : 0.01
+          @reconnect_delay = @parameters[:initial_reconnect_delay] || 0.01
         end
       end
       # Use keepalive
       used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
+
+      # TCP_NODELAY option (disables Nagle's algorithm)
+      used_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, !!(@parameters && @parameters[:tcp_nodelay]))
+
       used_socket
     end
 
@@ -355,8 +359,8 @@ module Stomp
     def connect(used_socket)
       @connect_headers = {} unless @connect_headers # Caller said nil/false
       headers = @connect_headers.clone
-      headers[:login] = @login
-      headers[:passcode] = @passcode
+      headers[:login] = @login unless @login.to_s.empty?
+      headers[:passcode] = @passcode unless @login.to_s.empty?
       _pre_connect
       if !@hhas10 && @stompconn
         _transmit(used_socket, Stomp::CMD_STOMP, headers)
diff --git a/lib/connection/utils.rb b/lib/connection/utils.rb
index 624b114..5f6d696 100644
--- a/lib/connection/utils.rb
+++ b/lib/connection/utils.rb
@@ -183,6 +183,7 @@ module Stomp
         :max_hbrlck_fails => 0,
         :fast_hbs_adjust => 0.0,
         :connread_timeout => 0,
+        :tcp_nodelay => true
       }
 
       res_params = default_params.merge(params)
@@ -241,6 +242,7 @@ module Stomp
           else
             $stderr.print errstr
           end
+
           # !!! This initiates a re-connect !!!
           _reconn_prep()
         end
diff --git a/lib/stomp.rb b/lib/stomp.rb
index 9856372..53393d3 100644
--- a/lib/stomp.rb
+++ b/lib/stomp.rb
@@ -24,6 +24,7 @@ require 'stomp/version'         # Stomp#Version#STRING
 require 'stomp/errors'          # All Stomp# exceptions
 require 'stomp/codec'           # Stomp 1.1 codec
 require 'stomp/sslparams'       # Stomp SSL support
+require 'stomp/null_logger'     # A NullLogger class
 
 # Private methods in #Client
 require 'client/utils'          # private Client Utility methods
diff --git a/lib/stomp/client.rb b/lib/stomp/client.rb
index 223c5a6..0368e93 100644
--- a/lib/stomp/client.rb
+++ b/lib/stomp/client.rb
@@ -2,6 +2,7 @@
 
 require 'thread'
 require 'digest/sha1'
+require 'timeout'
 require 'forwardable'
 
 module Stomp
@@ -80,13 +81,31 @@ module Stomp
 
       check_arguments!()
 
-      @id_mutex = Mutex.new()
-      @ids = 1
+      @logger = @parameters[:logger] ||= Stomp::NullLogger.new
 
-      create_connection(autoflush)
+      @start_timeout = @parameters[:start_timeout] || 10
+      Timeout.timeout(@start_timeout, Stomp::Error::StartTimeoutException.new(@start_timeout)) do
+        create_error_handler
+        create_connection(autoflush)
+        start_listeners()
+      end
+    end
+
+    def create_error_handler
+      client_thread = Thread.current
 
-      start_listeners()
+      @error_listener = lambda do |error|
+        exception = case error.body
+                      when /ResourceAllocationException/i
+                        Stomp::Error::ProducerFlowControlException.new(error)
+                      when /ProtocolException/i
+                        Stomp::Error::ProtocolException.new(error)
+                      else
+                        Stomp::Error::BrokerException.new(error)
+                    end
 
+        client_thread.raise exception
+      end
     end
 
     def create_connection(autoflush)
@@ -119,9 +138,7 @@ module Stomp
       replay_list = @replay_messages_by_txn[name]
       if replay_list
         replay_list.each do |message|
-          if listener = find_listener(message)
-            listener.call(message)
-          end
+          find_listener(message) # find_listener also calls the listener
         end
       end
     end
@@ -159,7 +176,7 @@ module Stomp
     # Acknowledge a message, used when a subscription has specified
     # client acknowledgement ( connection.subscribe("/queue/a",{:ack => 'client'}).
     # Accepts a transaction header ( :transaction => 'some_transaction_id' ).
-    def acknowledge(message, headers = {})
+    def ack(message, headers = {})
       txn_id = headers[:transaction]
       if txn_id
         # lets keep around messages ack'd in this transaction in case we rollback
@@ -175,14 +192,20 @@ module Stomp
       end
       if protocol() == Stomp::SPL_12
         @connection.ack(message.headers['ack'], headers)
+      elsif protocol == Stomp::SPL_11
+        headers.merge!(:subscription => message.headers['subscription'])
+        @connection.ack(message.headers['message-id'], headers)
       else
         @connection.ack(message.headers['message-id'], headers)
       end
     end
 
+    # For posterity, we alias:
+    alias acknowledge ack
+
     # Stomp 1.1+ NACK.
-    def nack(message_id, headers = {})
-      @connection.nack(message_id, headers)
+    def nack(message, headers = {})
+      @connection.nack(message, headers)
     end
 
     # Unreceive a message, sending it back to its queue or to the DLQ.
@@ -240,6 +263,7 @@ module Stomp
 
     # set_logger identifies a new callback logger.
     def set_logger(logger)
+      @logger = logger
       @connection.set_logger(logger)
     end
 
diff --git a/lib/stomp/connection.rb b/lib/stomp/connection.rb
index 181d2b8..a203f8b 100644
--- a/lib/stomp/connection.rb
+++ b/lib/stomp/connection.rb
@@ -113,7 +113,7 @@ module Stomp
         @parameters = nil
         @parse_timeout = 5		# To override, use hashed parameters
         @connect_timeout = 0	# To override, use hashed parameters
-        @logger = nil     		# To override, use hashed parameters
+        @logger = Stomp::NullLogger.new	# To override, use hashed parameters
         @autoflush = false    # To override, use hashed parameters or setter
         @closed_check = true  # Run closed check in each protocol method
         @hbser = false        # Raise if heartbeat send exception
@@ -142,14 +142,13 @@ module Stomp
     # hashed_initialize prepares a new connection with a Hash of initialization
     # parameters.
     def hashed_initialize(params)
-
       @parameters = refine_params(params)
       @reliable =  @parameters[:reliable]
       @reconnect_delay = @parameters[:initial_reconnect_delay]
       @connect_headers = @parameters[:connect_headers]
       @parse_timeout =  @parameters[:parse_timeout]
       @connect_timeout =  @parameters[:connect_timeout]
-      @logger =  @parameters[:logger]
+      @logger = @parameters[:logger] || Stomp::NullLogger.new
       @autoflush = @parameters[:autoflush]
       @closed_check = @parameters[:closed_check]
       @hbser = @parameters[:hbser]
@@ -184,9 +183,7 @@ module Stomp
       headers = headers.symbolize_keys
       headers[:transaction] = name
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_begin)
-        @logger.on_begin(log_params, headers)
-      end
+      @logger.on_begin(log_params, headers)
       transmit(Stomp::CMD_BEGIN, headers)
     end
 
@@ -217,9 +214,7 @@ module Stomp
           headers[:'message-id'] = message_id
       end
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_ack)
-        @logger.on_ack(log_params, headers)
-      end
+      @logger.on_ack(log_params, headers)
       transmit(Stomp::CMD_ACK, headers)
     end
 
@@ -243,9 +238,7 @@ module Stomp
           raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
       end
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_nack)
-        @logger.on_nack(log_params, headers)
-      end
+      @logger.on_nack(log_params, headers)
       transmit(Stomp::CMD_NACK, headers)
     end
 
@@ -255,9 +248,7 @@ module Stomp
       headers = headers.symbolize_keys
       headers[:transaction] = name
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_commit)
-        @logger.on_commit(log_params, headers)
-      end
+      @logger.on_commit(log_params, headers)
       transmit(Stomp::CMD_COMMIT, headers)
     end
 
@@ -267,9 +258,7 @@ module Stomp
       headers = headers.symbolize_keys
       headers[:transaction] = name
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_abort)
-        @logger.on_abort(log_params, headers)
-      end
+      @logger.on_abort(log_params, headers)
       transmit(Stomp::CMD_ABORT, headers)
     end
 
@@ -284,9 +273,7 @@ module Stomp
         headers[:id] = subId if headers[:id].nil?
       end
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_subscribe)
-        @logger.on_subscribe(log_params, headers)
-      end
+      @logger.on_subscribe(log_params, headers)
 
       # Store the subscription so that we can replay if we reconnect.
       if @reliable
@@ -309,9 +296,7 @@ module Stomp
         headers[:id] = subId unless headers[:id]
       end
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_unsubscribe)
-        @logger.on_unsubscribe(log_params, headers)
-      end
+      @logger.on_unsubscribe(log_params, headers)
       transmit(Stomp::CMD_UNSUBSCRIBE, headers)
       if @reliable
         subId = dest if subId.nil?
@@ -327,9 +312,7 @@ module Stomp
       headers = headers.symbolize_keys
       headers[:destination] = destination
       _headerCheck(headers)
-      if @logger && @logger.respond_to?(:on_publish)
-        @logger.on_publish(log_params, message, headers)
-      end
+      @logger.on_publish(log_params, message, headers)
       transmit(Stomp::CMD_SEND, headers, message)
     end
 
@@ -392,9 +375,7 @@ module Stomp
       end
       transmit(Stomp::CMD_DISCONNECT, headers)
       @disconnect_receipt = receive if headers[:receipt]
-      if @logger && @logger.respond_to?(:on_disconnect)
-        @logger.on_disconnect(log_params)
-      end
+      @logger.on_disconnect(log_params)
       close_socket
     end
 
@@ -417,11 +398,9 @@ module Stomp
       super_result = __old_receive()
       if super_result.nil? && @reliable && !closed?
         errstr = "connection.receive returning EOF as nil - resetting connection.\n"
-        if @logger && @logger.respond_to?(:on_miscerr)
-          @logger.on_miscerr(log_params, "es_recv: " + errstr)
-        else
-          $stderr.print errstr
-        end
+        @logger.on_miscerr(log_params, "es_recv: " + errstr)
+        $stderr.print errstr
+
         # !!! This initiates a re-connect !!!
         # The call to __old_receive() will in turn call socket().  Before
         # that we should change the target host, otherwise the host that
@@ -438,9 +417,7 @@ module Stomp
         @closed = true
         warn 'warning: broker sent EOF, and connection not reliable' unless defined?(Test)
       end
-      if @logger && @logger.respond_to?(:on_receive)
-        @logger.on_receive(log_params, super_result)
-      end
+      @logger.on_receive(log_params, super_result)
       return super_result
     end
 
diff --git a/lib/stomp/errors.rb b/lib/stomp/errors.rb
index bf1e526..998a8fd 100644
--- a/lib/stomp/errors.rb
+++ b/lib/stomp/errors.rb
@@ -221,6 +221,57 @@ module Stomp
       end
     end
 
+    class StompException < RuntimeError; end
+
+    class BrokerException < StompException
+      attr_reader :headers, :message, :receipt_id, :broker_backtrace
+
+      def initialize(message)
+        @message          = message.headers.delete('message')
+        @receipt_id       = message.headers.delete('receipt-id') || 'no receipt id'
+        @headers          = message.headers
+        @broker_backtrace = message.body
+      end
+    end
+
+    class ProducerFlowControlException < BrokerException
+      attr_reader :producer_id, :dest_name
+
+      def initialize(message)
+        super(message)
+        msg_headers = /.*producer\s+\((.*)\).*to\s+prevent\s+flooding\s+([^\s]*)\.\s+/i.match(@message)
+
+        @producer_id = msg_headers && msg_headers[1]
+        @dest_name   = msg_headers && msg_headers[2]
+      end
+    end
+
+    class ProtocolException < BrokerException
+      def initialize(message)
+        super(message)
+      end
+    end
+
+    class StartTimeoutException < StompException
+      def initialize(timeout)
+        @timeout = timeout
+      end
+
+      def message
+        "Client failed to start in #{@timeout} seconds"
+      end
+    end
+
+    class ReadReceiptTimeoutException < StompException
+      def initialize(timeout)
+        @timeout = timeout
+      end
+
+      def message
+        "Read receipt not received after #{@timeout} seconds"
+      end
+    end
+
   end # module Error
 
 end # module Stomp
diff --git a/lib/stomp/null_logger.rb b/lib/stomp/null_logger.rb
new file mode 100644
index 0000000..1f2d511
--- /dev/null
+++ b/lib/stomp/null_logger.rb
@@ -0,0 +1,28 @@
+module Stomp
+  class NullLogger
+    def on_miscerr(parms, error_msg)
+      $stderr.print parms
+      $stderr.print error_msg
+    end
+
+    def on_connecting(parms); end
+    def on_connected(parms); end
+    def on_connectfail(parms); end
+    def on_disconnect(parms); end
+    def on_subscribe(parms, headers); end
+    def on_unsubscribe(parms, headers); end
+    def on_publish(parms, message, headers); end
+    def on_receive(parms, result); end
+    def on_begin(parms, headers); end
+    def on_ack(parms, headers); end
+    def on_nack(parms, headers); end
+    def on_commit(parms, headers); end
+    def on_abort(parms, headers); end
+    def on_hbread_fail(parms, ticker_data); end
+    def on_hbwrite_fail(parms, ticker_data); end
+    def on_ssl_connecting(parms); end
+    def on_ssl_connected(parms); end
+    def on_ssl_connectfail(parms); end
+    def on_hbfire(parms, srind, curt); end
+  end
+end
diff --git a/examples/slogger.rb b/lib/stomp/slogger.rb
similarity index 98%
rename from examples/slogger.rb
rename to lib/stomp/slogger.rb
index 278a1e5..c623bac 100644
--- a/examples/slogger.rb
+++ b/lib/stomp/slogger.rb
@@ -51,13 +51,25 @@ require 'logger'	# use the standard Ruby logger .....
 # Callback parameters: are a copy of the @parameters instance variable for
 # the Stomp::Connection.
 #
-class Slogger
+class Slogger < Stomp::NullLogger
 
   # Initialize a new callback logger instance.
   def initialize(init_parms = nil)
+    _init
+    @log.info("Logger initialization complete.")
+  end
+
+  def _init
     @log = Logger::new(STDOUT)		# User preference
     @log.level = Logger::DEBUG		# User preference
-    @log.info("Logger initialization complete.")
+  end
+
+  def marshal_dump
+    []
+  end
+
+  def marshal_load(array)
+    _init
   end
 
   # Log connecting events
diff --git a/lib/stomp/version.rb b/lib/stomp/version.rb
index 7070208..2744a05 100644
--- a/lib/stomp/version.rb
+++ b/lib/stomp/version.rb
@@ -5,8 +5,8 @@ module Stomp
   # Define the gem version.
   module Version  #:nodoc: all
     MAJOR = 1
-    MINOR = 2
-    PATCH = 16
+    MINOR = 3
+    PATCH = 1
     STRING = "#{MAJOR}.#{MINOR}.#{PATCH}"
   end
 end
diff --git a/metadata.yml b/metadata.yml
index 88f9646..ca88dc6 100644
--- a/metadata.yml
+++ b/metadata.yml
@@ -1,7 +1,7 @@
 --- !ruby/object:Gem::Specification
 name: stomp
 version: !ruby/object:Gem::Version
-  version: 1.2.16
+  version: 1.3.1
 platform: ruby
 authors:
 - Brian McCallister
@@ -11,7 +11,7 @@ authors:
 autorequire: 
 bindir: bin
 cert_chain: []
-date: 2013-08-19 00:00:00.000000000 Z
+date: 2013-10-02 00:00:00.000000000 Z
 dependencies:
 - !ruby/object:Gem::Dependency
   name: rspec
@@ -55,7 +55,6 @@ extra_rdoc_files:
 - examples/publisher.rb
 - examples/put11conn_ex1.rb
 - examples/putget11_rh1.rb
-- examples/slogger.rb
 - examples/ssl_uc1.rb
 - examples/ssl_uc1_ciphers.rb
 - examples/ssl_uc2.rb
@@ -81,8 +80,11 @@ extra_rdoc_files:
 - lib/stomp/errors.rb
 - lib/stomp/ext/hash.rb
 - lib/stomp/message.rb
+- lib/stomp/null_logger.rb
+- lib/stomp/slogger.rb
 - lib/stomp/sslparams.rb
 - lib/stomp/version.rb
+- test/test_anonymous.rb
 - test/test_client.rb
 - test/test_codec.rb
 - test/test_connection.rb
@@ -112,7 +114,6 @@ files:
 - examples/publisher.rb
 - examples/put11conn_ex1.rb
 - examples/putget11_rh1.rb
-- examples/slogger.rb
 - examples/ssl_uc1.rb
 - examples/ssl_uc1_ciphers.rb
 - examples/ssl_uc2.rb
@@ -138,6 +139,8 @@ files:
 - lib/stomp/errors.rb
 - lib/stomp/ext/hash.rb
 - lib/stomp/message.rb
+- lib/stomp/null_logger.rb
+- lib/stomp/slogger.rb
 - lib/stomp/sslparams.rb
 - lib/stomp/version.rb
 - notes/heartbeat_readme.txt
@@ -147,6 +150,7 @@ files:
 - spec/message_spec.rb
 - spec/spec_helper.rb
 - stomp.gemspec
+- test/test_anonymous.rb
 - test/test_client.rb
 - test/test_codec.rb
 - test/test_connection.rb
@@ -178,7 +182,7 @@ requirements: []
 rubyforge_project: 
 rubygems_version: 2.0.5
 signing_key: 
-specification_version: 3
+specification_version: 4
 summary: Ruby client for the Stomp messaging protocol
 test_files: []
 has_rdoc: 
diff --git a/spec/client_spec.rb b/spec/client_spec.rb
index 5e305e4..33919f8 100644
--- a/spec/client_spec.rb
+++ b/spec/client_spec.rb
@@ -5,8 +5,10 @@ require 'client_shared_examples'
 
 
 describe Stomp::Client do
+  let(:null_logger) { double("mock Stomp::NullLogger") }
 
   before(:each) do
+    Stomp::NullLogger.stub(:new).and_return(null_logger)
     @mock_connection = double('connection', :autoflush= => true)
     Stomp::Connection.stub(:new).and_return(@mock_connection)
   end
@@ -130,6 +132,7 @@ describe Stomp::Client do
                                                               :passcode => 'testpassword',
                                                               :host => 'localhost',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('testlogin', 'testpassword', 'localhost', '12345', false)
     end
@@ -139,7 +142,6 @@ describe Stomp::Client do
   end
 
   describe "(created with non-authenticating stomp:// URL and non-TLD host)" do
-
     before(:each) do
       @client = Stomp::Client.new('stomp://foobar:12345')
     end
@@ -149,6 +151,7 @@ describe Stomp::Client do
                                                               :passcode => '',
                                                               :host => 'foobar',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('stomp://foobar:12345')
     end
@@ -168,6 +171,7 @@ describe Stomp::Client do
                                                               :passcode => '',
                                                               :host => 'foo-bar',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('stomp://foo-bar:12345')
     end
@@ -187,6 +191,7 @@ describe Stomp::Client do
                                                               :passcode => 'testpasscode',
                                                               :host => 'foobar',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('stomp://test-login:testpasscode@foobar:12345')
     end
@@ -206,6 +211,7 @@ describe Stomp::Client do
                                                               :passcode => 'testpasscode',
                                                               :host => 'foo-bar',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('stomp://test-login:testpasscode@foo-bar:12345')
     end
@@ -228,6 +234,7 @@ describe Stomp::Client do
                                                               :passcode => '',
                                                               :host => 'host.foobar.com',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('stomp://host.foobar.com:12345')
     end
@@ -247,6 +254,7 @@ describe Stomp::Client do
                                                               :passcode => 'testpasscode',
                                                               :host => 'host.foobar.com',
                                                               :port => 12345}],
+                                                  :logger => null_logger,
                                                   :reliable => false)
       Stomp::Client.new('stomp://testlogin:testpasscode@host.foobar.com:12345')
     end
@@ -276,6 +284,8 @@ describe Stomp::Client do
         {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
         {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
       ]
+
+      @parameters.merge!({:logger => null_logger})
       
       Stomp::Connection.should_receive(:new).with(@parameters)
       
@@ -292,6 +302,8 @@ describe Stomp::Client do
         {:login => "login3", :passcode => "passcode3", :host => "remotehost2", :port => 61618, :ssl => false}
       ]
       
+      @parameters.merge!({:logger => null_logger})
+      
       Stomp::Connection.should_receive(:new).with(@parameters)
       
       client = Stomp::Client.new(url)
@@ -306,6 +318,8 @@ describe Stomp::Client do
         {:login => "", :passcode => "", :host => "remotehost", :port => 61617, :ssl => false}
       ]
       
+      @parameters.merge!({:logger => null_logger})
+      
       Stomp::Connection.should_receive(:new).with(@parameters)
       
       client = Stomp::Client.new(url)
@@ -320,6 +334,8 @@ describe Stomp::Client do
         {:login => "", :passcode => "", :host => "remotehost", :port => 61617, :ssl => false}
       ]
       
+      @parameters.merge!({:logger => null_logger})
+      
       Stomp::Connection.should_receive(:new).with(@parameters)
       
       client = Stomp::Client.new(url)
@@ -349,6 +365,8 @@ describe Stomp::Client do
         {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
       ]
       
+      @parameters.merge!({:logger => null_logger})
+      
       Stomp::Connection.should_receive(:new).with(@parameters)
       
       client = Stomp::Client.new(url)
@@ -357,4 +375,24 @@ describe Stomp::Client do
     
   end
 
+
+  describe '#error_listener' do
+    context 'on getting a ResourceAllocationException' do
+      let(:message) do
+        message = Stomp::Message.new('')
+        message.body = "javax.jms.ResourceAllocationException: Usage"
+        message.headers = {'message' => %q{message = "Usage Manager Memory Limit reached. Stopping producer (ID:producer) to prevent flooding queue://errors. See } }
+        message.command = Stomp::CMD_ERROR
+        message
+      end
+  
+      it 'should handle ProducerFlowControlException errors by raising' do
+        expect do
+          @client = Stomp::Client.new
+          @error_listener = @client.instance_variable_get(:@error_listener)
+          @error_listener.call(message)
+        end.to raise_exception(Stomp::Error::ProducerFlowControlException)
+      end
+    end
+  end
 end
diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb
index 81cf532..56b8dc1 100644
--- a/spec/connection_spec.rb
+++ b/spec/connection_spec.rb
@@ -29,6 +29,7 @@ describe Stomp::Connection do
       :max_hbrlck_fails => 0,
       :fast_hbs_adjust => 0.0,
       :connread_timeout => 0,
+      :tcp_nodelay => true,
    }
         
     #POG:
@@ -353,6 +354,7 @@ describe Stomp::Connection do
           :max_hbrlck_fails => 0,
           :fast_hbs_adjust => 0.0,
           :connread_timeout => 0,
+          :tcp_nodelay => true,
         }
         
         used_hash =  {
@@ -393,6 +395,7 @@ describe Stomp::Connection do
           :max_hbrlck_fails => 456,
           :fast_hbs_adjust => 0.2,
           :connread_timeout => 42,
+          :tcp_nodelay => true,
         }
         
         @connection = Stomp::Connection.new(used_hash)
diff --git a/stomp.gemspec b/stomp.gemspec
index 7a866eb..d82369e 100644
--- a/stomp.gemspec
+++ b/stomp.gemspec
@@ -4,14 +4,13 @@
 # -*- encoding: utf-8 -*-
 
 Gem::Specification.new do |s|
-  s.name = %q{stomp}
-  s.version = "1.2.16"
-  s.license = "Apache 2.0"
+  s.name = "stomp"
+  s.version = "1.3.1"
 
   s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
   s.authors = ["Brian McCallister", "Marius Mathiesen", "Thiago Morello", "Guy M. Allard"]
-  s.date = %q{2013-08-19}
-  s.description = %q{Ruby client for the Stomp messaging protocol.  Note that this gem is no longer supported on rubyforge.}
+  s.date = "2013-10-02"
+  s.description = "Ruby client for the Stomp messaging protocol.  Note that this gem is no longer supported on rubyforge."
   s.email = ["brianm at apache.org", "marius at stones.com", "morellon at gmail.com", "allard.guy.m at gmail.com"]
   s.executables = ["catstomp", "stompcat"]
   s.extra_rdoc_files = [
@@ -31,7 +30,6 @@ Gem::Specification.new do |s|
     "examples/publisher.rb",
     "examples/put11conn_ex1.rb",
     "examples/putget11_rh1.rb",
-    "examples/slogger.rb",
     "examples/ssl_uc1.rb",
     "examples/ssl_uc1_ciphers.rb",
     "examples/ssl_uc2.rb",
@@ -57,8 +55,11 @@ Gem::Specification.new do |s|
     "lib/stomp/errors.rb",
     "lib/stomp/ext/hash.rb",
     "lib/stomp/message.rb",
+    "lib/stomp/null_logger.rb",
+    "lib/stomp/slogger.rb",
     "lib/stomp/sslparams.rb",
     "lib/stomp/version.rb",
+    "test/test_anonymous.rb",
     "test/test_client.rb",
     "test/test_codec.rb",
     "test/test_connection.rb",
@@ -89,7 +90,6 @@ Gem::Specification.new do |s|
     "examples/publisher.rb",
     "examples/put11conn_ex1.rb",
     "examples/putget11_rh1.rb",
-    "examples/slogger.rb",
     "examples/ssl_uc1.rb",
     "examples/ssl_uc1_ciphers.rb",
     "examples/ssl_uc2.rb",
@@ -115,6 +115,8 @@ Gem::Specification.new do |s|
     "lib/stomp/errors.rb",
     "lib/stomp/ext/hash.rb",
     "lib/stomp/message.rb",
+    "lib/stomp/null_logger.rb",
+    "lib/stomp/slogger.rb",
     "lib/stomp/sslparams.rb",
     "lib/stomp/version.rb",
     "notes/heartbeat_readme.txt",
@@ -124,6 +126,7 @@ Gem::Specification.new do |s|
     "spec/message_spec.rb",
     "spec/spec_helper.rb",
     "stomp.gemspec",
+    "test/test_anonymous.rb",
     "test/test_client.rb",
     "test/test_codec.rb",
     "test/test_connection.rb",
@@ -134,14 +137,14 @@ Gem::Specification.new do |s|
     "test/test_urlogin.rb",
     "test/tlogger.rb"
   ]
-  s.homepage = %q{https://github.com/stompgem/stomp}
+  s.homepage = "https://github.com/stompgem/stomp"
+  s.licenses = ["Apache 2.0"]
   s.require_paths = ["lib"]
-  s.rubygems_version = %q{1.3.7}
-  s.summary = %q{Ruby client for the Stomp messaging protocol}
+  s.rubygems_version = "2.0.5"
+  s.summary = "Ruby client for the Stomp messaging protocol"
 
   if s.respond_to? :specification_version then
-    current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
-    s.specification_version = 3
+    s.specification_version = 4
 
     if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
       s.add_development_dependency(%q<rspec>, [">= 2.3"])
diff --git a/test/test_anonymous.rb b/test/test_anonymous.rb
new file mode 100644
index 0000000..003f1d6
--- /dev/null
+++ b/test/test_anonymous.rb
@@ -0,0 +1,523 @@
+# -*- encoding: utf-8 -*-
+
+if Kernel.respond_to?(:require_relative)
+  require_relative("test_helper")
+else
+  $:.unshift(File.dirname(__FILE__))
+  require 'test_helper'
+end
+
+=begin
+
+  Main class for testing Stomp::Connection instances.
+
+=end
+class TestConnection < Test::Unit::TestCase
+  include TestBase
+
+  def setup
+    @conn = get_anonymous_connection()
+    # Data for multi_thread tests
+    @max_threads = 20
+    @max_msgs = 100
+  end
+
+  def teardown
+    @conn.disconnect if @conn.open? # allow tests to disconnect
+  end
+
+  # Test basic connection creation.
+  def test_connection_exists
+    assert_not_nil @conn
+  end
+
+  # Test asynchronous polling.
+  def test_poll_async
+    @conn.subscribe("/queue/do.not.put.messages.on.this.queue", :id => "a.no.messages.queue")
+    # If the test 'hangs' here, Connection#poll is broken.
+    m = @conn.poll
+    assert m.nil?
+  end
+
+  # Test suppression of content length header.
+  def test_no_length
+    conn_subscribe make_destination
+    #
+    @conn.publish make_destination, "test_stomp#test_no_length",
+                  { :suppress_content_length => true }
+    msg = @conn.receive
+    assert_equal "test_stomp#test_no_length", msg.body
+    #
+    @conn.publish make_destination, "test_stomp#test_\000_length",
+                  { :suppress_content_length => true }
+    msg2 = @conn.receive
+    assert_equal "test_stomp#test_", msg2.body
+    checkEmsg(@conn)
+  end unless ENV['STOMP_RABBIT']
+
+  # Test direct / explicit receive.
+  def test_explicit_receive
+    conn_subscribe make_destination
+    @conn.publish make_destination, "test_stomp#test_explicit_receive"
+    msg = @conn.receive
+    assert_equal "test_stomp#test_explicit_receive", msg.body
+  end
+
+  # Test asking for a receipt.
+  def test_receipt
+    conn_subscribe make_destination, :receipt => "abc"
+    msg = @conn.receive
+    assert_equal "abc", msg.headers['receipt-id']
+    checkEmsg(@conn)
+  end
+
+  # Test asking for a receipt on disconnect.
+  def test_disconnect_receipt
+    @conn.disconnect :receipt => "abc123"
+    assert_nothing_raised {
+      assert_not_nil(@conn.disconnect_receipt, "should have a receipt")
+      assert_equal(@conn.disconnect_receipt.headers['receipt-id'],
+                   "abc123", "receipt sent and received should match")
+    }
+  end
+
+  # Test ACKs for Stomp 1.0
+  def test_client_ack_with_symbol_10
+    if @conn.protocol != Stomp::SPL_10
+      assert true
+      return
+    end
+    queue = make_destination()
+    @conn.subscribe queue, :ack => :client
+    @conn.publish queue, "test_stomp#test_client_ack_with_symbol_10"
+    msg = @conn.receive
+    assert_nothing_raised {
+      # ACK has one required header, message-id, which must contain a value 
+      # matching the message-id for the MESSAGE being acknowledged.
+      @conn.ack msg.headers['message-id']
+    }
+    checkEmsg(@conn)
+  end
+
+  # Test ACKs for Stomp 1.1
+  def test_client_ack_with_symbol_11
+    if @conn.protocol != Stomp::SPL_11
+      assert true
+      return
+    end
+    sid = @conn.uuid()
+    queue = make_destination()
+    @conn.subscribe queue, :ack => :client, :id => sid
+    @conn.publish queue, "test_stomp#test_client_ack_with_symbol_11"
+    msg = @conn.receive
+    assert_nothing_raised {
+      # ACK has two REQUIRED headers: message-id, which MUST contain a value 
+      # matching the message-id for the MESSAGE being acknowledged and 
+      # subscription, which MUST be set to match the value of the subscription's 
+      # id header.
+      @conn.ack msg.headers['message-id'], :subscription => msg.headers['subscription']
+    }
+    checkEmsg(@conn)
+  end
+
+  # Test ACKs for Stomp 1.2
+  def test_client_ack_with_symbol_12
+    if @conn.protocol != Stomp::SPL_12
+      assert true
+      return
+    end
+    sid = @conn.uuid()
+    queue = make_destination()
+    @conn.subscribe queue, :ack => :client, :id => sid
+    @conn.publish queue, "test_stomp#test_client_ack_with_symbol_11"
+    msg = @conn.receive
+    assert_nothing_raised {
+      # The ACK frame MUST include an id header matching the ack header 
+      # of the MESSAGE being acknowledged.
+      @conn.ack msg.headers['ack']
+    }
+    checkEmsg(@conn)
+  end
+
+  # Test a message with 0x00 embedded in the body.
+  def test_embedded_null
+    conn_subscribe make_destination
+    @conn.publish make_destination, "a\0"
+    msg = @conn.receive
+    assert_equal "a\0" , msg.body
+    checkEmsg(@conn)
+  end
+
+  # Test connection open checking.
+  def test_connection_open?
+    assert_equal true , @conn.open?
+    @conn.disconnect
+    assert_equal false, @conn.open?
+  end
+
+  # Test connection closed checking.
+  def test_connection_closed?
+    assert_equal false, @conn.closed?
+    @conn.disconnect
+    assert_equal true, @conn.closed?
+  end
+
+  # Test that methods detect a closed connection.
+  def test_closed_checks_conn
+    @conn.disconnect
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.ack("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.begin("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.commit("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.abort("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      conn_subscribe("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.unsubscribe("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.publish("dummy_data","dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.unreceive("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      @conn.disconnect("dummy_data")
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      m = @conn.receive
+    end
+    #
+    assert_raise Stomp::Error::NoCurrentConnection do
+      m = @conn.poll
+    end
+  end
+
+  # Test that we receive a Stomp::Message.
+  def test_response_is_instance_of_message_class
+    conn_subscribe make_destination
+    @conn.publish make_destination, "a\0"
+    msg = @conn.receive
+    assert_instance_of Stomp::Message , msg
+    checkEmsg(@conn)
+  end
+
+  # Test converting a Message to a string.
+  def test_message_to_s
+    conn_subscribe make_destination
+    @conn.publish make_destination, "a\0"
+    msg = @conn.receive
+    assert_match /^<Stomp::Message headers=/ , msg.to_s
+    checkEmsg(@conn)
+  end
+
+  # Test that a connection frame is present.
+  def test_connection_frame
+    assert_not_nil @conn.connection_frame
+  end
+
+  # Test messages with multiple line ends.
+  def test_messages_with_multipleLine_ends
+    conn_subscribe make_destination
+    @conn.publish make_destination, "a\n\n"
+    @conn.publish make_destination, "b\n\na\n\n"
+
+    msg_a = @conn.receive
+    msg_b = @conn.receive
+
+    assert_equal "a\n\n", msg_a.body
+    assert_equal "b\n\na\n\n", msg_b.body
+    checkEmsg(@conn)
+  end
+
+  # Test publishing multiple messages.
+  def test_publish_two_messages
+    conn_subscribe make_destination
+    @conn.publish make_destination, "a\0"
+    @conn.publish make_destination, "b\0"
+    msg_a = @conn.receive
+    msg_b = @conn.receive
+
+    assert_equal "a\0", msg_a.body
+    assert_equal "b\0", msg_b.body
+    checkEmsg(@conn)
+  end
+
+  def test_thread_hang_one
+    received = nil
+    Thread.new(@conn) do |amq|
+      while true
+        received = amq.receive
+      end
+    end
+    #
+    conn_subscribe( make_destination )
+    message = Time.now.to_s
+    @conn.publish(make_destination, message)
+    sleep 1
+    assert_not_nil received
+    assert_equal message, received.body
+    checkEmsg(@conn)
+  end
+
+  # Test polling with a single thread.
+  def test_thread_poll_one
+    received = nil
+    max_sleep = (RUBY_VERSION =~ /1\.8/) ? 10 : 1
+    Thread.new(@conn) do |amq|
+      while true
+        received = amq.poll
+        # One message is needed
+        Thread.exit if received
+        sleep max_sleep
+      end
+    end
+    #
+    conn_subscribe( make_destination )
+    message = Time.now.to_s
+    @conn.publish(make_destination, message)
+    sleep max_sleep+1
+    assert_not_nil received
+    assert_equal message, received.body
+    checkEmsg(@conn)
+  end
+
+  # Test receiving with multiple threads.
+  def test_multi_thread_receive
+    lock = Mutex.new
+    msg_ctr = 0
+    dest = make_destination
+    #
+    1.upto(@max_threads) do |tnum|
+      Thread.new(@conn) do |amq|
+        while true
+          received = amq.receive
+          lock.synchronize do
+            msg_ctr += 1
+          end
+          # Simulate message processing
+          sleep 0.05
+        end
+      end
+    end
+    #
+    conn_subscribe( dest )
+    1.upto(@max_msgs) do |mnum|
+      msg = Time.now.to_s + " #{mnum}"
+      @conn.publish(dest, msg)
+    end
+    #
+    max_sleep = (RUBY_VERSION =~ /1\.8/) ? 30 : 5
+    max_sleep = 30 if RUBY_ENGINE =~ /mingw/
+    sleep_incr = 0.10
+    total_slept = 0
+    while true
+      break if @max_msgs == msg_ctr
+      total_slept += sleep_incr
+      break if total_slept > max_sleep
+      sleep sleep_incr
+    end
+    assert_equal @max_msgs, msg_ctr
+    checkEmsg(@conn)
+  end unless RUBY_ENGINE =~ /jruby/
+
+  # Test polling with multiple threads.
+  def test_multi_thread_poll
+    #
+    lock = Mutex.new
+    msg_ctr = 0
+    dest = make_destination
+    #
+    1.upto(@max_threads) do |tnum|
+      Thread.new(@conn) do |amq|
+        while true
+          received = amq.poll
+          if received
+            lock.synchronize do
+              msg_ctr += 1
+            end
+            # Simulate message processing
+            sleep 0.05
+          else
+            # Wait a bit for more work
+            sleep 0.05
+          end
+        end
+      end
+    end
+    #
+    conn_subscribe( dest )
+    1.upto(@max_msgs) do |mnum|
+      msg = Time.now.to_s + " #{mnum}"
+      @conn.publish(dest, msg)
+    end
+    #
+    max_sleep = (RUBY_VERSION =~ /1\.8\.6/) ? 30 : 5
+    max_sleep = 30 if RUBY_ENGINE =~ /mingw/
+    sleep_incr = 0.10
+    total_slept = 0
+    while true
+      break if @max_msgs == msg_ctr
+      total_slept += sleep_incr
+      break if total_slept > max_sleep
+      sleep sleep_incr
+    end
+    assert_equal @max_msgs, msg_ctr
+    checkEmsg(@conn)
+  end unless RUBY_ENGINE =~ /jruby/
+
+  # Test using a nil body.
+  def test_nil_body
+    dest = make_destination
+    assert_nothing_raised {
+      @conn.publish dest, nil
+    }
+    conn_subscribe dest
+    msg = @conn.receive
+    assert_equal "", msg.body
+    checkEmsg(@conn)
+  end
+
+  # Test transaction message sequencing.
+  def test_transaction
+    conn_subscribe make_destination
+
+    @conn.begin "txA"
+    @conn.publish make_destination, "txn message", 'transaction' => "txA"
+
+    @conn.publish make_destination, "first message"
+
+    msg = @conn.receive
+    assert_equal "first message", msg.body
+
+    @conn.commit "txA"
+    msg = @conn.receive
+    assert_equal "txn message", msg.body
+    checkEmsg(@conn)
+  end
+
+  # Test duplicate subscriptions.
+  def test_duplicate_subscription
+    @conn.disconnect # not reliable
+    @conn = Stomp::Connection.open(nil, nil, host, port, true, nil, nil) # reliable
+    dest = make_destination
+    conn_subscribe dest
+                     #
+    assert_raise Stomp::Error::DuplicateSubscription do
+      conn_subscribe dest
+    end
+    checkEmsg(@conn)
+  end
+
+  # Test nil 1.1 connection parameters.
+  def test_nil_connparms
+    @conn.disconnect
+    #
+    assert_nothing_raised do
+      @conn = Stomp::Connection.open(nil, nil, host, port, false, 5, nil)
+    end
+    checkEmsg(@conn)
+  end
+
+  # Basic NAK test.
+  def test_nack11p_0010
+    if @conn.protocol == Stomp::SPL_10
+      assert_raise Stomp::Error::UnsupportedProtocolError do
+        @conn.nack "dummy msg-id"
+      end
+    else
+      dest = make_destination
+      smsg = "test_stomp#test_nack01: #{Time.now.to_f}"
+      @conn.publish dest, smsg
+      #
+      sid = @conn.uuid()
+      @conn.subscribe dest, :ack => :client, :id => sid
+      msg = @conn.receive
+      assert_equal smsg, msg.body
+      case @conn.protocol
+        when Stomp::SPL_12
+          assert_nothing_raised {
+            @conn.nack msg.headers["ack"]
+            sleep 0.05 # Give racy brokers a chance to handle the last nack before unsubscribe
+            @conn.unsubscribe dest, :id => sid
+          }
+        else # Stomp::SPL_11
+          assert_nothing_raised {
+            @conn.nack msg.headers["message-id"], :subscription => sid
+            sleep 0.05 # Give racy brokers a chance to handle the last nack before unsubscribe
+            @conn.unsubscribe dest, :id => sid
+          }
+      end
+
+      # phase 2
+      teardown()
+      setup()
+      sid = @conn.uuid()
+      @conn.subscribe dest, :ack => :auto, :id => sid
+      msg2 = @conn.receive
+      assert_equal smsg, msg2.body
+      checkEmsg(@conn)
+    end
+  end unless ENV['STOMP_AMQ11'] # AMQ sends NACK'd messages to a DLQ
+
+  # Test to illustrate Issue #44.  Prior to a fix for #44, these tests would
+  # fail only when connecting to a pure STOMP 1.0 server that does not 
+  # return a 'version' header at all.
+  def test_conn10_simple
+    @conn.disconnect
+    #
+    vhost = ENV['STOMP_RABBIT'] ? "/" : host
+    hash = { :hosts => [
+        {:host => host, :port => port, :ssl => false},
+    ],
+             :connect_headers => {"accept-version" => "1.0", "host" => vhost},
+             :reliable => false,
+    }
+    c = nil
+    assert_nothing_raised {
+      c = Stomp::Connection.new(hash)
+    }
+    c.disconnect if c
+    #
+    hash = { :hosts => [
+        {:host => host, :port => port, :ssl => false},
+    ],
+             :connect_headers => {"accept-version" => "3.14159,1.0,12.0", "host" => vhost},
+             :reliable => false,
+    }
+    c = nil
+    assert_nothing_raised {
+      c = Stomp::Connection.new(hash)
+    }
+    c.disconnect if c
+  end
+
+  # test JRuby detection
+  def test_jruby_presence
+    if defined?(RUBY_ENGINE) && RUBY_ENGINE =~ /jruby/
+      assert @conn.jruby
+    else
+      assert !@conn.jruby
+    end
+  end
+
+end
+
diff --git a/test/test_helper.rb b/test/test_helper.rb
index f1ee9df..9a22747 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -81,6 +81,21 @@ module TestBase
     conn
   end
 
+  # Get a Stomp Anonymous Connection.
+  def get_anonymous_connection()
+    ch = get_conn_headers()
+    hash = { :hosts => [
+        {:host => host, :port => port, :ssl => nil},
+    ],
+             :reliable => false,
+             :connect_headers => ch,
+             :stompconn => get_stomp_conn(),
+             :usecrlf => get_crlf(),
+    }
+    conn = Stomp::Connection.open(hash)
+    conn
+  end
+
   # Get a Stomp SSL Connection.
   def get_ssl_connection()
     ch = get_conn_headers()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-ruby-extras/ruby-stomp.git



More information about the Pkg-ruby-extras-commits mailing list