[PKG-Openstack-devel] Bug#903031: python-kafka: Misc fixes for Python 3.7 compatibility

James Page james.page at ubuntu.com
Thu Jul 5 11:55:45 BST 2018


Package: python-kafka
Version: 1.3.3-3
Severity: normal
Tags: patch
User: ubuntu-devel at lists.ubuntu.com
Usertags: origin-ubuntu cosmic ubuntu-patch

Dear Maintainer,

In Ubuntu, the attached patch was applied to achieve the following:

  * d/p/py37-compat.patch: Python 3.7 compatibility fixes.

The patch was picked from the upstream VCS.

Thanks for considering the patch.


-- System Information:
Debian Release: buster/sid
  APT prefers cosmic
  APT policy: (500, 'cosmic')
Architecture: amd64 (x86_64)
Foreign Architectures: i386

Kernel: Linux 4.15.0-23-generic (SMP w/4 CPU cores)
Locale: LANG=en_GB.UTF-8, LC_CTYPE=en_GB.UTF-8 (charmap=UTF-8), LANGUAGE=en_GB:en (charmap=UTF-8)
Shell: /bin/sh linked to /bin/dash
Init: systemd (via /run/systemd/system)
LSM: AppArmor: enabled
-------------- next part --------------
diff -Nru python-kafka-1.3.3/debian/patches/1 python-kafka-1.3.3/debian/patches/1
--- python-kafka-1.3.3/debian/patches/1	1969-12-31 18:00:00.000000000 -0600
+++ python-kafka-1.3.3/debian/patches/1	2018-07-05 05:26:10.000000000 -0500
@@ -0,0 +1,318 @@
+From b62006aeb86258b4b1ef2735bebb1fe99459b82d Mon Sep 17 00:00:00 2001
+From: Dana Powers <dana.powers at gmail.com>
+Date: Fri, 23 Mar 2018 05:58:55 -0700
+Subject: [PATCH] Change SimpleProducer to use async_send (async is reserved in
+ py37) (#1454)
+
+---
+ docs/simple.rst                   |  8 ++++----
+ kafka/producer/base.py            | 38 +++++++++++++++++++++++---------------
+ kafka/producer/keyed.py           |  2 +-
+ kafka/producer/simple.py          |  2 +-
+ test/test_failover_integration.py |  8 ++++----
+ test/test_producer_integration.py |  8 ++++----
+ test/test_producer_legacy.py      | 10 +++++-----
+ 7 files changed, 42 insertions(+), 34 deletions(-)
+
+diff --git a/docs/simple.rst b/docs/simple.rst
+index 8192a8b7..afdb9756 100644
+--- a/docs/simple.rst
++++ b/docs/simple.rst
+@@ -49,7 +49,7 @@ Asynchronous Mode
+ 
+     # To send messages asynchronously
+     client = SimpleClient('localhost:9092')
+-    producer = SimpleProducer(client, async=True)
++    producer = SimpleProducer(client, async_send=True)
+     producer.send_messages('my-topic', b'async message')
+ 
+     # To send messages in batch. You can use any of the available
+@@ -60,7 +60,7 @@ Asynchronous Mode
+     # * If the producer dies before the messages are sent, there will be losses
+     # * Call producer.stop() to send the messages and cleanup
+     producer = SimpleProducer(client,
+-                              async=True,
++                              async_send=True,
+                               batch_send_every_n=20,
+                               batch_send_every_t=60)
+ 
+@@ -73,7 +73,7 @@ Synchronous Mode
+ 
+     # To send messages synchronously
+     client = SimpleClient('localhost:9092')
+-    producer = SimpleProducer(client, async=False)
++    producer = SimpleProducer(client, async_send=False)
+ 
+     # Note that the application is responsible for encoding messages to type bytes
+     producer.send_messages('my-topic', b'some message')
+@@ -88,7 +88,7 @@ Synchronous Mode
+     # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+     #                            by all in sync replicas before sending a response
+     producer = SimpleProducer(client,
+-                              async=False,
++                              async_send=False,
+                               req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                               ack_timeout=2000,
+                               sync_fail_on_error=False)
+diff --git a/kafka/producer/base.py b/kafka/producer/base.py
+index c038bd3a..e8d6c3d2 100644
+--- a/kafka/producer/base.py
++++ b/kafka/producer/base.py
+@@ -226,7 +226,7 @@ class Producer(object):
+ 
+     Arguments:
+         client (kafka.SimpleClient): instance to use for broker
+-            communications. If async=True, the background thread will use
++            communications. If async_send=True, the background thread will use
+             :meth:`client.copy`, which is expected to return a thread-safe
+             object.
+         codec (kafka.protocol.ALL_CODECS): compression codec to use.
+@@ -238,11 +238,11 @@ class Producer(object):
+         sync_fail_on_error (bool, optional): whether sync producer should
+             raise exceptions (True), or just return errors (False),
+             defaults to True.
+-        async (bool, optional): send message using a background thread,
++        async_send (bool, optional): send message using a background thread,
+             defaults to False.
+-        batch_send_every_n (int, optional): If async is True, messages are
++        batch_send_every_n (int, optional): If async_send is True, messages are
+             sent in batches of this size, defaults to 20.
+-        batch_send_every_t (int or float, optional): If async is True,
++        batch_send_every_t (int or float, optional): If async_send is True,
+             messages are sent immediately after this timeout in seconds, even
+             if there are fewer than batch_send_every_n, defaults to 20.
+         async_retry_limit (int, optional): number of retries for failed messages
+@@ -268,8 +268,10 @@ class Producer(object):
+             defaults to 30.
+ 
+     Deprecated Arguments:
++        async (bool, optional): send message using a background thread,
++            defaults to False. Deprecated, use 'async_send'
+         batch_send (bool, optional): If True, messages are sent by a background
+-            thread in batches, defaults to False. Deprecated, use 'async'
++            thread in batches, defaults to False. Deprecated, use 'async_send'
+     """
+     ACK_NOT_REQUIRED = 0            # No ack is required
+     ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+@@ -282,8 +284,8 @@ def __init__(self, client,
+                  codec=None,
+                  codec_compresslevel=None,
+                  sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
+-                 async=False,
+-                 batch_send=False,  # deprecated, use async
++                 async_send=False,
++                 batch_send=False,  # deprecated, use async_send
+                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                  async_retry_limit=ASYNC_RETRY_LIMIT,
+@@ -292,15 +294,21 @@ def __init__(self, client,
+                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
+                  async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+-                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
++                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
++                 **kwargs):
++
++        # async renamed async_send for python3.7 support
++        if 'async' in kwargs:
++            log.warning('Deprecated async option found -- use async_send')
++            async_send = kwargs['async']
+ 
+-        if async:
++        if async_send:
+             assert batch_send_every_n > 0
+             assert batch_send_every_t > 0
+             assert async_queue_maxsize >= 0
+ 
+         self.client = client
+-        self.async = async
++        self.async_send = async_send
+         self.req_acks = req_acks
+         self.ack_timeout = ack_timeout
+         self.stopped = False
+@@ -313,7 +321,7 @@ def __init__(self, client,
+         self.codec = codec
+         self.codec_compresslevel = codec_compresslevel
+ 
+-        if self.async:
++        if self.async_send:
+             # Messages are sent through this queue
+             self.queue = Queue(async_queue_maxsize)
+             self.async_queue_put_timeout = async_queue_put_timeout
+@@ -400,7 +408,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
+         if key is not None and not isinstance(key, six.binary_type):
+             raise TypeError("the key must be type bytes")
+ 
+-        if self.async:
++        if self.async_send:
+             for idx, m in enumerate(msg):
+                 try:
+                     item = (TopicPartition(topic, partition), m, key)
+@@ -435,7 +443,7 @@ def stop(self, timeout=None):
+             log.warning('timeout argument to stop() is deprecated - '
+                         'it will be removed in future release')
+ 
+-        if not self.async:
++        if not self.async_send:
+             log.warning('producer.stop() called, but producer is not async')
+             return
+ 
+@@ -443,7 +451,7 @@ def stop(self, timeout=None):
+             log.warning('producer.stop() called, but producer is already stopped')
+             return
+ 
+-        if self.async:
++        if self.async_send:
+             self.queue.put((STOP_ASYNC_PRODUCER, None, None))
+             self.thread_stop_event.set()
+             self.thread.join()
+@@ -471,5 +479,5 @@ def stop(self, timeout=None):
+         self.stopped = True
+ 
+     def __del__(self):
+-        if self.async and not self.stopped:
++        if self.async_send and not self.stopped:
+             self.stop()
+diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
+index 8de3ad80..62bb733f 100644
+--- a/kafka/producer/keyed.py
++++ b/kafka/producer/keyed.py
+@@ -46,4 +46,4 @@ def send(self, topic, key, msg):
+         return self.send_messages(topic, key, msg)
+ 
+     def __repr__(self):
+-        return '<KeyedProducer batch=%s>' % self.async
++        return '<KeyedProducer batch=%s>' % self.async_send
+diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
+index 589363c9..91e0abc4 100644
+--- a/kafka/producer/simple.py
++++ b/kafka/producer/simple.py
+@@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
+         )
+ 
+     def __repr__(self):
+-        return '<SimpleProducer batch=%s>' % self.async
++        return '<SimpleProducer batch=%s>' % self.async_send
+diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
+index 8531cfbe..797e1c8e 100644
+--- a/test/test_failover_integration.py
++++ b/test/test_failover_integration.py
+@@ -60,7 +60,7 @@ def test_switch_leader(self):
+         # require that the server commit messages to all in-sync replicas
+         # so that failover doesn't lose any messages on server-side
+         # and we can assert that server-side message count equals client-side
+-        producer = Producer(self.client, async=False,
++        producer = Producer(self.client, async_send=False,
+                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+ 
+         # Send 100 random messages to a specific partition
+@@ -101,7 +101,7 @@ def test_switch_leader_async(self):
+         partition = 0
+ 
+         # Test the base class Producer -- send_messages to a specific partition
+-        producer = Producer(self.client, async=True,
++        producer = Producer(self.client, async_send=True,
+                             batch_send_every_n=15,
+                             batch_send_every_t=3,
+                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+@@ -146,7 +146,7 @@ def test_switch_leader_async(self):
+     def test_switch_leader_keyed_producer(self):
+         topic = self.topic
+ 
+-        producer = KeyedProducer(self.client, async=False)
++        producer = KeyedProducer(self.client, async_send=False)
+ 
+         # Send 10 random messages
+         for _ in range(10):
+@@ -182,7 +182,7 @@ def test_switch_leader_keyed_producer(self):
+             producer.send_messages(topic, key, msg)
+ 
+     def test_switch_leader_simple_consumer(self):
+-        producer = Producer(self.client, async=False)
++        producer = Producer(self.client, async_send=False)
+         consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
+         self._send_random_messages(producer, self.topic, 0, 2)
+         consumer.get_messages()
+diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
+index 6cd3d13a..2b810476 100644
+--- a/test/test_producer_integration.py
++++ b/test/test_producer_integration.py
+@@ -216,7 +216,7 @@ def test_async_simple_producer(self):
+         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+         start_offset = self.current_offset(self.topic, partition)
+ 
+-        producer = SimpleProducer(self.client, async=True, random_start=False)
++        producer = SimpleProducer(self.client, async_send=True, random_start=False)
+         resp = producer.send_messages(self.topic, self.msg("one"))
+         self.assertEqual(len(resp), 0)
+ 
+@@ -235,7 +235,7 @@ def test_batched_simple_producer__triggers_by_message(self):
+         batch_interval = 5
+         producer = SimpleProducer(
+             self.client,
+-            async=True,
++            async_send=True,
+             batch_send_every_n=batch_messages,
+             batch_send_every_t=batch_interval,
+             random_start=False)
+@@ -300,7 +300,7 @@ def test_batched_simple_producer__triggers_by_time(self):
+         batch_interval = 5
+         producer = SimpleProducer(
+             self.client,
+-            async=True,
++            async_send=True,
+             batch_send_every_n=100,
+             batch_send_every_t=batch_interval,
+             random_start=False)
+@@ -432,7 +432,7 @@ def test_async_keyed_producer(self):
+ 
+         producer = KeyedProducer(self.client,
+                                  partitioner=RoundRobinPartitioner,
+-                                 async=True,
++                                 async_send=True,
+                                  batch_send_every_t=1)
+ 
+         resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
+index 9b87c766..6d00116c 100644
+--- a/test/test_producer_legacy.py
++++ b/test/test_producer_legacy.py
+@@ -73,7 +73,7 @@ def partitions(topic):
+     @patch('kafka.producer.base._send_upstream')
+     def test_producer_async_queue_overfilled(self, mock):
+         queue_size = 2
+-        producer = Producer(MagicMock(), async=True,
++        producer = Producer(MagicMock(), async_send=True,
+                             async_queue_maxsize=queue_size)
+ 
+         topic = b'test-topic'
+@@ -95,25 +95,25 @@ def test_producer_sync_fail_on_error(self):
+                     with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
+ 
+                         client = SimpleClient(MagicMock())
+-                        producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
++                        producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
+ 
+                         # This should not raise
+                         (response,) = producer.send_messages('foobar', b'test message')
+                         self.assertEqual(response, error)
+ 
+-                        producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
++                        producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
+                         with self.assertRaises(FailedPayloadsError):
+                             producer.send_messages('foobar', b'test message')
+ 
+     def test_cleanup_is_not_called_on_stopped_producer(self):
+-        producer = Producer(MagicMock(), async=True)
++        producer = Producer(MagicMock(), async_send=True)
+         producer.stopped = True
+         with patch.object(producer, 'stop') as mocked_stop:
+             producer._cleanup_func(producer)
+             self.assertEqual(mocked_stop.call_count, 0)
+ 
+     def test_cleanup_is_called_on_running_producer(self):
+-        producer = Producer(MagicMock(), async=True)
++        producer = Producer(MagicMock(), async_send=True)
+         producer.stopped = False
+         with patch.object(producer, 'stop') as mocked_stop:
+             producer._cleanup_func(producer)
diff -Nru python-kafka-1.3.3/debian/patches/py37-compat.patch python-kafka-1.3.3/debian/patches/py37-compat.patch
--- python-kafka-1.3.3/debian/patches/py37-compat.patch	1969-12-31 18:00:00.000000000 -0600
+++ python-kafka-1.3.3/debian/patches/py37-compat.patch	2018-07-05 05:26:23.000000000 -0500
@@ -0,0 +1,304 @@
+From b62006aeb86258b4b1ef2735bebb1fe99459b82d Mon Sep 17 00:00:00 2001
+From: Dana Powers <dana.powers at gmail.com>
+Date: Fri, 23 Mar 2018 05:58:55 -0700
+Subject: [PATCH] Change SimpleProducer to use async_send (async is reserved in
+ py37) (#1454)
+
+---
+ docs/simple.rst                   |  8 ++++----
+ kafka/producer/base.py            | 38 +++++++++++++++++++++++---------------
+ kafka/producer/keyed.py           |  2 +-
+ kafka/producer/simple.py          |  2 +-
+ test/test_failover_integration.py |  8 ++++----
+ test/test_producer_integration.py |  8 ++++----
+ test/test_producer_legacy.py      | 10 +++++-----
+ 7 files changed, 42 insertions(+), 34 deletions(-)
+
+--- a/docs/simple.rst
++++ b/docs/simple.rst
+@@ -49,7 +49,7 @@ Asynchronous Mode
+ 
+     # To send messages asynchronously
+     client = SimpleClient('localhost:9092')
+-    producer = SimpleProducer(client, async=True)
++    producer = SimpleProducer(client, async_send=True)
+     producer.send_messages('my-topic', b'async message')
+ 
+     # To send messages in batch. You can use any of the available
+@@ -60,7 +60,7 @@ Asynchronous Mode
+     # * If the producer dies before the messages are sent, there will be losses
+     # * Call producer.stop() to send the messages and cleanup
+     producer = SimpleProducer(client,
+-                              async=True,
++                              async_send=True,
+                               batch_send_every_n=20,
+                               batch_send_every_t=60)
+ 
+@@ -73,7 +73,7 @@ Synchronous Mode
+ 
+     # To send messages synchronously
+     client = SimpleClient('localhost:9092')
+-    producer = SimpleProducer(client, async=False)
++    producer = SimpleProducer(client, async_send=False)
+ 
+     # Note that the application is responsible for encoding messages to type bytes
+     producer.send_messages('my-topic', b'some message')
+@@ -88,7 +88,7 @@ Synchronous Mode
+     # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+     #                            by all in sync replicas before sending a response
+     producer = SimpleProducer(client,
+-                              async=False,
++                              async_send=False,
+                               req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                               ack_timeout=2000,
+                               sync_fail_on_error=False)
+--- a/kafka/producer/base.py
++++ b/kafka/producer/base.py
+@@ -227,7 +227,7 @@ class Producer(object):
+ 
+     Arguments:
+         client (kafka.SimpleClient): instance to use for broker
+-            communications. If async=True, the background thread will use
++            communications. If async_send=True, the background thread will use
+             :meth:`client.copy`, which is expected to return a thread-safe
+             object.
+         codec (kafka.protocol.ALL_CODECS): compression codec to use.
+@@ -239,11 +239,11 @@ class Producer(object):
+         sync_fail_on_error (bool, optional): whether sync producer should
+             raise exceptions (True), or just return errors (False),
+             defaults to True.
+-        async (bool, optional): send message using a background thread,
++        async_send (bool, optional): send message using a background thread,
+             defaults to False.
+-        batch_send_every_n (int, optional): If async is True, messages are
++        batch_send_every_n (int, optional): If async_send is True, messages are
+             sent in batches of this size, defaults to 20.
+-        batch_send_every_t (int or float, optional): If async is True,
++        batch_send_every_t (int or float, optional): If async_send is True,
+             messages are sent immediately after this timeout in seconds, even
+             if there are fewer than batch_send_every_n, defaults to 20.
+         async_retry_limit (int, optional): number of retries for failed messages
+@@ -269,8 +269,10 @@ class Producer(object):
+             defaults to 30.
+ 
+     Deprecated Arguments:
++        async (bool, optional): send message using a background thread,
++            defaults to False. Deprecated, use 'async_send'
+         batch_send (bool, optional): If True, messages are sent by a background
+-            thread in batches, defaults to False. Deprecated, use 'async'
++            thread in batches, defaults to False. Deprecated, use 'async_send'
+     """
+     ACK_NOT_REQUIRED = 0            # No ack is required
+     ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
+@@ -283,8 +285,8 @@ class Producer(object):
+                  codec=None,
+                  codec_compresslevel=None,
+                  sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
+-                 async=False,
+-                 batch_send=False,  # deprecated, use async
++                 async_send=False,
++                 batch_send=False,  # deprecated, use async_send
+                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
+                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                  async_retry_limit=ASYNC_RETRY_LIMIT,
+@@ -293,15 +295,21 @@ class Producer(object):
+                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
+                  async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+-                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
++                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
++                 **kwargs):
++
++        # async renamed async_send for python3.7 support
++        if 'async' in kwargs:
++            log.warning('Deprecated async option found -- use async_send')
++            async_send = kwargs['async']
+ 
+-        if async:
++        if async_send:
+             assert batch_send_every_n > 0
+             assert batch_send_every_t > 0
+             assert async_queue_maxsize >= 0
+ 
+         self.client = client
+-        self.async = async
++        self.async_send = async_send
+         self.req_acks = req_acks
+         self.ack_timeout = ack_timeout
+         self.stopped = False
+@@ -314,7 +322,7 @@ class Producer(object):
+         self.codec = codec
+         self.codec_compresslevel = codec_compresslevel
+ 
+-        if self.async:
++        if self.async_send:
+             # Messages are sent through this queue
+             self.queue = Queue(async_queue_maxsize)
+             self.async_queue_put_timeout = async_queue_put_timeout
+@@ -401,7 +409,7 @@ class Producer(object):
+         if key is not None and not isinstance(key, six.binary_type):
+             raise TypeError("the key must be type bytes")
+ 
+-        if self.async:
++        if self.async_send:
+             for idx, m in enumerate(msg):
+                 try:
+                     item = (TopicPartition(topic, partition), m, key)
+@@ -436,7 +444,7 @@ class Producer(object):
+             log.warning('timeout argument to stop() is deprecated - '
+                         'it will be removed in future release')
+ 
+-        if not self.async:
++        if not self.async_send:
+             log.warning('producer.stop() called, but producer is not async')
+             return
+ 
+@@ -444,7 +452,7 @@ class Producer(object):
+             log.warning('producer.stop() called, but producer is already stopped')
+             return
+ 
+-        if self.async:
++        if self.async_send:
+             self.queue.put((STOP_ASYNC_PRODUCER, None, None))
+             self.thread_stop_event.set()
+             self.thread.join()
+@@ -472,5 +480,5 @@ class Producer(object):
+         self.stopped = True
+ 
+     def __del__(self):
+-        if self.async and not self.stopped:
++        if self.async_send and not self.stopped:
+             self.stop()
+--- a/kafka/producer/keyed.py
++++ b/kafka/producer/keyed.py
+@@ -46,4 +46,4 @@ class KeyedProducer(Producer):
+         return self.send_messages(topic, key, msg)
+ 
+     def __repr__(self):
+-        return '<KeyedProducer batch=%s>' % self.async
++        return '<KeyedProducer batch=%s>' % self.async_send
+--- a/kafka/producer/simple.py
++++ b/kafka/producer/simple.py
+@@ -51,4 +51,4 @@ class SimpleProducer(Producer):
+         )
+ 
+     def __repr__(self):
+-        return '<SimpleProducer batch=%s>' % self.async
++        return '<SimpleProducer batch=%s>' % self.async_send
+--- a/test/test_failover_integration.py
++++ b/test/test_failover_integration.py
+@@ -61,7 +61,7 @@ class TestFailover(KafkaIntegrationTestC
+         # require that the server commit messages to all in-sync replicas
+         # so that failover doesn't lose any messages on server-side
+         # and we can assert that server-side message count equals client-side
+-        producer = Producer(self.client, async=False,
++        producer = Producer(self.client, async_send=False,
+                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+ 
+         # Send 100 random messages to a specific partition
+@@ -102,7 +102,7 @@ class TestFailover(KafkaIntegrationTestC
+         partition = 0
+ 
+         # Test the base class Producer -- send_messages to a specific partition
+-        producer = Producer(self.client, async=True,
++        producer = Producer(self.client, async_send=True,
+                             batch_send_every_n=15,
+                             batch_send_every_t=3,
+                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+@@ -147,7 +147,7 @@ class TestFailover(KafkaIntegrationTestC
+     def test_switch_leader_keyed_producer(self):
+         topic = self.topic
+ 
+-        producer = KeyedProducer(self.client, async=False)
++        producer = KeyedProducer(self.client, async_send=False)
+ 
+         # Send 10 random messages
+         for _ in range(10):
+@@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestC
+             producer.send_messages(topic, key, msg)
+ 
+     def test_switch_leader_simple_consumer(self):
+-        producer = Producer(self.client, async=False)
++        producer = Producer(self.client, async_send=False)
+         consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
+         self._send_random_messages(producer, self.topic, 0, 2)
+         consumer.get_messages()
+--- a/test/test_producer_integration.py
++++ b/test/test_producer_integration.py
+@@ -184,7 +184,7 @@ class TestKafkaProducerIntegration(Kafka
+         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
+         start_offset = self.current_offset(self.topic, partition)
+ 
+-        producer = SimpleProducer(self.client, async=True, random_start=False)
++        producer = SimpleProducer(self.client, async_send=True, random_start=False)
+         resp = producer.send_messages(self.topic, self.msg("one"))
+         self.assertEqual(len(resp), 0)
+ 
+@@ -203,7 +203,7 @@ class TestKafkaProducerIntegration(Kafka
+         batch_interval = 5
+         producer = SimpleProducer(
+             self.client,
+-            async=True,
++            async_send=True,
+             batch_send_every_n=batch_messages,
+             batch_send_every_t=batch_interval,
+             random_start=False)
+@@ -268,7 +268,7 @@ class TestKafkaProducerIntegration(Kafka
+         batch_interval = 5
+         producer = SimpleProducer(
+             self.client,
+-            async=True,
++            async_send=True,
+             batch_send_every_n=100,
+             batch_send_every_t=batch_interval,
+             random_start=False)
+@@ -400,7 +400,7 @@ class TestKafkaProducerIntegration(Kafka
+ 
+         producer = KeyedProducer(self.client,
+                                  partitioner=RoundRobinPartitioner,
+-                                 async=True,
++                                 async_send=True,
+                                  batch_send_every_t=1)
+ 
+         resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
+--- a/test/test_producer_legacy.py
++++ b/test/test_producer_legacy.py
+@@ -73,7 +73,7 @@ class TestKafkaProducer(unittest.TestCas
+     @patch('kafka.producer.base._send_upstream')
+     def test_producer_async_queue_overfilled(self, mock):
+         queue_size = 2
+-        producer = Producer(MagicMock(), async=True,
++        producer = Producer(MagicMock(), async_send=True,
+                             async_queue_maxsize=queue_size)
+ 
+         topic = b'test-topic'
+@@ -95,25 +95,25 @@ class TestKafkaProducer(unittest.TestCas
+                     with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
+ 
+                         client = SimpleClient(MagicMock())
+-                        producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
++                        producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
+ 
+                         # This should not raise
+                         (response,) = producer.send_messages('foobar', b'test message')
+                         self.assertEqual(response, error)
+ 
+-                        producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
++                        producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
+                         with self.assertRaises(FailedPayloadsError):
+                             producer.send_messages('foobar', b'test message')
+ 
+     def test_cleanup_is_not_called_on_stopped_producer(self):
+-        producer = Producer(MagicMock(), async=True)
++        producer = Producer(MagicMock(), async_send=True)
+         producer.stopped = True
+         with patch.object(producer, 'stop') as mocked_stop:
+             producer._cleanup_func(producer)
+             self.assertEqual(mocked_stop.call_count, 0)
+ 
+     def test_cleanup_is_called_on_running_producer(self):
+-        producer = Producer(MagicMock(), async=True)
++        producer = Producer(MagicMock(), async_send=True)
+         producer.stopped = False
+         with patch.object(producer, 'stop') as mocked_stop:
+             producer._cleanup_func(producer)
diff -Nru python-kafka-1.3.3/debian/patches/series python-kafka-1.3.3/debian/patches/series
--- python-kafka-1.3.3/debian/patches/series	2017-11-17 06:11:12.000000000 -0600
+++ python-kafka-1.3.3/debian/patches/series	2018-07-05 05:18:26.000000000 -0500
@@ -3,3 +3,4 @@
 do-not-test-lz4-compression.patch
 remove-multiple-privacy-breaches.patch
 remove-old-lz4-test.patch
+py37-compat.patch


More information about the Openstack-devel mailing list