From 862e86e2f14d05a70e04d1138b60c865ea0f5116 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Sun, 20 Sep 2015 00:05:40 +0100 Subject: [PATCH 01/15] producer: use protocol.Messages on queues etc The tuples handed to _produce() were starting to look like full-on Messages anyway. In preparation for work that would add yet another element to the tuple (a delivery-reporting future), I figured we might as well "upgrade" to a real Message. Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 53 +++++++++++++++------------------- pykafka/protocol.py | 3 +- tests/pykafka/test_producer.py | 12 ++++---- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index ed7a74bf7..52f12002f 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -234,24 +234,25 @@ def produce(self, message, partition_key=None): raise ProducerStoppedException() partitions = list(self._topic.partitions.values()) partition_id = self._partitioner(partitions, partition_key).id - message_partition_tup = (partition_key, message), partition_id, 0 - self._produce(message_partition_tup) + msg = Message(value=message, + partition_key=partition_key, + partition_id=partition_id) + self._produce(msg) if self._synchronous: self._wait_all() self._raise_worker_exceptions() - def _produce(self, message_partition_tup): + def _produce(self, message): """Enqueue a message for the relevant broker - :param message_partition_tup: Message with partition assigned. - :type message_partition_tup: ((bytes, bytes), int) tuple + :param message: Message with partition assigned. + :type message: `pykafka.protocol.Message` """ - kv, partition_id, attempts = message_partition_tup success = False while not success: - leader_id = self._topic.partitions[partition_id].leader.id + leader_id = self._topic.partitions[message.partition_id].leader.id if leader_id in self._owned_brokers: - self._owned_brokers[leader_id].enqueue([(kv, partition_id, attempts)]) + self._owned_brokers[leader_id].enqueue(message) success = True else: success = False @@ -260,7 +261,7 @@ def _send_request(self, message_batch, owned_broker): """Send the produce request to the broker and handle the response. 
:param message_batch: An iterable of messages to send - :type message_batch: iterable of `((key, value), partition_id)` tuples + :type message_batch: iterable of `pykafka.protocol.Message` :param owned_broker: The broker to which to send the request :type owned_broker: :class:`pykafka.producer.OwnedBroker` """ @@ -269,25 +270,19 @@ def _send_request(self, message_batch, owned_broker): required_acks=self._required_acks, timeout=self._ack_timeout_ms ) - for (key, value), partition_id, msg_attempt in message_batch: - req.add_message( - Message(value, partition_key=key, produce_attempt=msg_attempt), - self._topic.name, - partition_id - ) + for msg in message_batch: + req.add_message(msg, self._topic.name, msg.partition_id) log.debug("Sending %d messages to broker %d", len(message_batch), owned_broker.broker.id) def _get_partition_msgs(partition_id, req): """Get all the messages for the partitions from the request.""" - messages = itertools.chain.from_iterable( + return itertools.chain.from_iterable( mset.messages for topic, partitions in iteritems(req.msets) for p_id, mset in iteritems(partitions) if p_id == partition_id ) - for message in messages: - yield (message.partition_key, message.value), partition_id, message.produce_attempt try: response = owned_broker.broker.produce_messages(req) @@ -329,7 +324,7 @@ def _get_partition_msgs(partition_id, req): owned_broker.broker.port) self._update() to_retry = [ - ((message.partition_key, message.value), p_id, message.produce_attempt) + message for topic, partitions in iteritems(req.msets) for p_id, mset in iteritems(partitions) for message in mset.messages @@ -338,11 +333,12 @@ def _get_partition_msgs(partition_id, req): if to_retry: time.sleep(self._retry_backoff_ms / 1000) owned_broker.increment_messages_pending(-1 * len(to_retry)) - for kv, partition_id, msg_attempt in to_retry: - if msg_attempt >= self._max_retries: + for msg in to_retry: + if msg.produce_attempt >= self._max_retries: raise ProduceFailureError("Message failed to send after %d " "retries.", self._max_retries) - self._produce((kv, partition_id, msg_attempt + 1)) + msg.produce_attempt += 1 + self._produce(msg) def _wait_all(self): """Block until all pending messages are sent @@ -418,17 +414,16 @@ def message_is_pending(self): """ return self.messages_pending > 0 - def enqueue(self, messages): - """Push messages onto the queue + def enqueue(self, message): + """Push message onto the queue - :param messages: The messages to push onto the queue - :type messages: iterable of tuples of the form - `((key, value), partition_id)` + :param message: The message to push onto the queue + :type message: `pykafka.protocol.Message` """ self._wait_for_slot_available() with self.lock: - self.queue.extendleft(messages) - self.increment_messages_pending(len(messages)) + self.queue.appendleft(message) + self.increment_messages_pending(1) if len(self.queue) >= self.producer._min_queued_messages: if not self.flush_ready.is_set(): self.flush_ready.set() diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 5269869c5..4068a2597 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -138,7 +138,8 @@ class Message(Message, Serializable): :class:`pykafka.protocol.Message` also contains `partition` and `partition_id` fields. Both of these have meaningless default values. When - :class:`pykafka.protocol.Message` is used by the producer. + :class:`pykafka.protocol.Message` is used by the producer, `partition_id` + identifies the Message's destination partition. 
When used in a :class:`pykafka.protocol.FetchRequest`, `partition_id` is set to the id of the partition from which the message was sent on receipt of the message. In the :class:`pykafka.simpleconsumer.SimpleConsumer`, diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 97a25c9a1..b9d5c0f02 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -6,6 +6,7 @@ from pykafka import KafkaClient from pykafka.exceptions import ProducerQueueFullError +from pykafka.protocol import Message from pykafka.test.utils import get_cluster, stop_cluster @@ -84,12 +85,13 @@ def test_async_produce_lingers(self): def test_async_produce_thread_exception(self): """Ensure that an exception on a worker thread is raised to the main thread""" topic = self.client.topics[self.topic_name] - with self.assertRaises(ValueError): + with self.assertRaises(AttributeError): with topic.get_producer(min_queued_messages=1) as producer: - # get some dummy data into the queue that will cause a crash when flushed - # specifically, this tuple causes a crash since its first element is - # not a two-tuple - producer._produce(("anything", 0)) + # get some dummy data into the queue that will cause a crash + # when flushed: + msg = Message("stuff", partition_id=0) + del msg.value + producer._produce(msg) while self.consumer.consume() is not None: time.sleep(.05) From c6bb40e04726075b4f0c1c8ef862e85e4b5f9a31 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Mon, 21 Sep 2015 23:38:04 +0100 Subject: [PATCH 02/15] producer: return futures from produce() I've so far only done some clumsy testing with this, where I'd manually disconnect broker connections, and wait for SocketDisconnectedError to emerge from the Future - that works. Test suite expansion coming soon. 
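
In the meantime, a sketch of the intended usage (hosts and topic name
here are illustrative, and the API may still shift while the test suite
lands):

    from pykafka import KafkaClient

    client = KafkaClient(hosts='127.0.0.1:9092')
    topic = client.topics['testtopic']

    producer = topic.get_producer()
    future = producer.produce(b'payload')
    future.result()  # None on success; raises the delivery error
                     # (e.g. SocketDisconnectedError) on failure
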
Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 64 ++++++++++++++++++++++++++++++--------------- setup.py | 1 + 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index 52f12002f..3a19aa63f 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -19,7 +19,7 @@ """ __all__ = ["Producer"] from collections import deque -import itertools +from concurrent import futures import logging import sys import time @@ -27,6 +27,7 @@ from .common import CompressionType from .exceptions import ( + ERROR_CODES, InvalidMessageError, InvalidMessageSize, LeaderNotAvailable, @@ -229,6 +230,11 @@ def produce(self, message, partition_key=None): :param partition_key: The key to use when deciding which partition to send this message to :type partition_key: bytes + + :returns: a Future, carrying any errors that occurred, or `None` if + the producer was created with `sync=True` (in that case, any + exceptions are also raised directly from `produce()`) + :rtype: `concurrent.futures.Future` """ if not self._running: raise ProducerStoppedException() @@ -237,10 +243,12 @@ def produce(self, message, partition_key=None): msg = Message(value=message, partition_key=partition_key, partition_id=partition_id) + msg.delivery_future = futures.Future() self._produce(msg) - if self._synchronous: - self._wait_all() self._raise_worker_exceptions() + if self._synchronous: + return msg.delivery_future.result() + return msg.delivery_future def _produce(self, message): """Enqueue a message for the relevant broker @@ -277,8 +285,8 @@ def _send_request(self, message_batch, owned_broker): def _get_partition_msgs(partition_id, req): """Get all the messages for the partitions from the request.""" - return itertools.chain.from_iterable( - mset.messages + return ( + mset for topic, partitions in iteritems(req.msets) for p_id, mset in iteritems(partitions) if p_id == partition_id @@ -286,13 +294,20 @@ def _get_partition_msgs(partition_id, req): try: response = owned_broker.broker.produce_messages(req) - to_retry = [] + + # Kafka either atomically appends or rejects whole MessageSets, so + # we define a list of potential retries thus: + to_retry = [] # (MessageSet, Exception) tuples + for topic, partitions in iteritems(response.topics): for partition, presponse in iteritems(partitions): if presponse.err == 0: - # mark msg_count messages as successfully delivered - msg_count = len(req.msets[topic][partition].messages) - owned_broker.increment_messages_pending(-1 * msg_count) + # mark messages as successfully delivered + delivered = req.msets[topic][partition].messages + owned_broker.increment_messages_pending( + -1 * len(delivered)) + for msg in delivered: + msg.delivery_future.set_result(None) continue # All's well if presponse.err == UnknownTopicOrPartition.ERROR_CODE: log.warning('Unknown topic: %s or partition: %s. ' @@ -313,32 +328,39 @@ def _get_partition_msgs(partition_id, req): log.warning('Encountered InvalidMessageError') elif presponse.err == InvalidMessageSize.ERROR_CODE: log.warning('Encountered InvalidMessageSize') - continue elif presponse.err == MessageSizeTooLarge.ERROR_CODE: log.warning('Encountered MessageSizeTooLarge') - continue - to_retry.extend(_get_partition_msgs(partition, req)) - except SocketDisconnectedError: + exc = ERROR_CODES[presponse.err] + to_retry.extend( + (mset, exc) + for mset in _get_partition_msgs(partition, req)) + except SocketDisconnectedError as exc: log.warning('Broker %s:%s disconnected. 
Retrying.', owned_broker.broker.host, owned_broker.broker.port) self._update() to_retry = [ - message + (mset, exc) for topic, partitions in iteritems(req.msets) for p_id, mset in iteritems(partitions) - for message in mset.messages ] if to_retry: time.sleep(self._retry_backoff_ms / 1000) owned_broker.increment_messages_pending(-1 * len(to_retry)) - for msg in to_retry: - if msg.produce_attempt >= self._max_retries: - raise ProduceFailureError("Message failed to send after %d " - "retries.", self._max_retries) - msg.produce_attempt += 1 - self._produce(msg) + for mset, exc in to_retry: + # XXX arguably, we should try to check these non_recoverables + # for individual messages in _produce and raise errors there + # right away, rather than failing a whole batch here? + non_recoverable = exc in (InvalidMessageSize, + MessageSizeTooLarge) + for msg in mset.messages: + if (non_recoverable + or msg.produce_attempt >= self._max_retries): + msg.delivery_future.set_exception(exc) + else: + msg.produce_attempt += 1 + self._produce(msg) def _wait_all(self): """Block until all pending messages are sent diff --git a/setup.py b/setup.py index da0dc382e..4a256008e 100755 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ def get_version(): version_file.read()).group('version') install_requires = [ + 'futures', 'kazoo', 'tabulate', ] From 1613635724a3e141a2c7b16db7eba3554fbda211 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Tue, 22 Sep 2015 10:55:50 +0100 Subject: [PATCH 03/15] travis-ci: add missing dependency Signed-off-by: Yung-Chin Oei --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4901f2452..4375b7505 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,7 @@ notifications: install: - - pip install python-coveralls kazoo tox testinstances + - pip install python-coveralls kazoo tox testinstances futures before_script: - "python -m pykafka.test.kafka_instance 3 --download-dir /home/travis/kafka-bin &" From 4c4e128f15135ccff8455526bbcb59d8d3be3600 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Wed, 30 Sep 2015 17:14:38 +0100 Subject: [PATCH 04/15] producer: fix pending futures for required_acks=0 See also the related fix on the (future-less) master branch, c434696c, and issue #278 for context. 
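
For context, the affected configuration (again a sketch, with
illustrative names): with required_acks=0 there is no broker response
to resolve the futures from, so before this fix the future below would
never complete:

    from pykafka import KafkaClient

    client = KafkaClient(hosts='127.0.0.1:9092')
    topic = client.topics['testtopic']

    producer = topic.get_producer(required_acks=0)  # fire-and-forget
    future = producer.produce(b'payload')
    future.result()  # used to block forever; now resolves to None as
                     # soon as the request has gone out
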
Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pykafka/producer.py b/pykafka/producer.py index 9f1d4365a..25689bfff 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -300,6 +300,8 @@ def _get_partition_msgs(partition_id, req): if self._required_acks == 0: # and thus, `response` is None owned_broker.increment_messages_pending( -1 * len(message_batch)) + for msg in message_batch: + msg.delivery_future.set_result(None) return # Kafka either atomically appends or rejects whole MessageSets, so From aff59c399fe875e292051865a8a67a0c7f5a1299 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Wed, 30 Sep 2015 21:19:31 +0100 Subject: [PATCH 05/15] producer: small refactor after parent commit Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index 25689bfff..957acfb1a 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -295,13 +295,15 @@ def _get_partition_msgs(partition_id, req): if p_id == partition_id ) + def mark_as_delivered(message_batch): + owned_broker.increment_messages_pending(-1 * len(message_batch)) + for msg in message_batch: + msg.delivery_future.set_result(None) + try: response = owned_broker.broker.produce_messages(req) if self._required_acks == 0: # and thus, `response` is None - owned_broker.increment_messages_pending( - -1 * len(message_batch)) - for msg in message_batch: - msg.delivery_future.set_result(None) + mark_as_delivered(message_batch) return # Kafka either atomically appends or rejects whole MessageSets, so @@ -311,12 +313,7 @@ def _get_partition_msgs(partition_id, req): for topic, partitions in iteritems(response.topics): for partition, presponse in iteritems(partitions): if presponse.err == 0: - # mark messages as successfully delivered - delivered = req.msets[topic][partition].messages - owned_broker.increment_messages_pending( - -1 * len(delivered)) - for msg in delivered: - msg.delivery_future.set_result(None) + mark_as_delivered(req.msets[topic][partition].messages) continue # All's well if presponse.err == UnknownTopicOrPartition.ERROR_CODE: log.warning('Unknown topic: %s or partition: %s. ' From bd5a15c1ff6946f5a4ef5d4c10c9675c747cbc43 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Fri, 2 Oct 2015 15:27:06 +0100 Subject: [PATCH 06/15] producer: add test and fix bug in future exceptions This adds a test for the new producer futures feature, and immediately exposed a bug: I pushed an exception type onto the future instead of an instance. We fix that here. Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 32 ++++++++++---------------------- tests/pykafka/test_producer.py | 12 ++++++++++-- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index 957acfb1a..8d539f0cd 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -315,28 +315,16 @@ def mark_as_delivered(message_batch): if presponse.err == 0: mark_as_delivered(req.msets[topic][partition].messages) continue # All's well - if presponse.err == UnknownTopicOrPartition.ERROR_CODE: - log.warning('Unknown topic: %s or partition: %s. ' - 'Retrying.', topic, partition) - elif presponse.err == NotLeaderForPartition.ERROR_CODE: - log.warning('Partition leader for %s/%s changed. 
' - 'Retrying.', topic, partition) + if presponse.err == NotLeaderForPartition.ERROR_CODE: # Update cluster metadata to get new leader self._update() - elif presponse.err == RequestTimedOut.ERROR_CODE: - log.warning('Produce request to %s:%s timed out. ' - 'Retrying.', owned_broker.broker.host, - owned_broker.broker.port) - elif presponse.err == LeaderNotAvailable.ERROR_CODE: - log.warning('Leader not available for partition %s.' - 'Retrying.', partition) - elif presponse.err == InvalidMessageError.ERROR_CODE: - log.warning('Encountered InvalidMessageError') - elif presponse.err == InvalidMessageSize.ERROR_CODE: - log.warning('Encountered InvalidMessageSize') - elif presponse.err == MessageSizeTooLarge.ERROR_CODE: - log.warning('Encountered MessageSizeTooLarge') - exc = ERROR_CODES[presponse.err] + info = "Produce request for {}/{} to {}:{} failed.".format( + topic, + partition, + owned_broker.broker.host, + owned_broker.broker.port) + log.warning(info) + exc = ERROR_CODES[presponse.err](info) to_retry.extend( (mset, exc) for mset in _get_partition_msgs(partition, req)) @@ -358,8 +346,8 @@ def mark_as_delivered(message_batch): # XXX arguably, we should try to check these non_recoverables # for individual messages in _produce and raise errors there # right away, rather than failing a whole batch here? - non_recoverable = exc in (InvalidMessageSize, - MessageSizeTooLarge) + non_recoverable = type(exc) in (InvalidMessageSize, + MessageSizeTooLarge) for msg in mset.messages: if (non_recoverable or msg.produce_attempt >= self._max_retries): diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 0dec0ecf9..c9913f3de 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -5,7 +5,7 @@ from uuid import uuid4 from pykafka import KafkaClient -from pykafka.exceptions import ProducerQueueFullError +from pykafka.exceptions import MessageSizeTooLarge, ProducerQueueFullError from pykafka.protocol import Message from pykafka.test.utils import get_cluster, stop_cluster @@ -39,11 +39,19 @@ def test_produce(self): message = self.consumer.consume() assert message.value == payload + def test_sync_produce_raises(self): + """Ensure response errors are raised in produce() if sync=True""" + topic = self.client.topics[self.topic_name] + with topic.get_sync_producer(min_queued_messages=1) as prod: + with self.assertRaises(MessageSizeTooLarge): + prod.produce(10**7 * b" ") + def test_async_produce(self): payload = uuid4().bytes prod = self.client.topics[self.topic_name].get_producer(min_queued_messages=1) - prod.produce(payload) + future = prod.produce(payload) + self.assertIsNone(future.result()) message = self.consumer.consume() assert message.value == payload From a63797cfa7307a0d7665304da5181e3c3b60f2fc Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Fri, 2 Oct 2015 15:55:55 +0100 Subject: [PATCH 07/15] update README re producer futures Signed-off-by: Yung-Chin Oei --- README.rst | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 14b3780c7..e6d6c8ba6 100644 --- a/README.rst +++ b/README.rst @@ -57,7 +57,14 @@ producing messages. >>> with topic.get_producer() as producer: ... for i in range(4): - ... producer.produce('test message ' + i ** 2) + ... future = producer.produce('test message ' + i ** 2) + +You're free to ignore the future returned from `produce()`, or you can later +evaluate it to assure yourself that the message made it to disk on the cluster. 
+This works as with any `concurrent.futures.Future` (`docs`_), by checking +`future.result()` (which should be `None`) or `future.exception()`. + +.. _docs: https://pythonhosted.org/futures/#future-objects You can also consume messages from this topic using a `Consumer` instance. From b932fb18c1c2ed4c82d85ee5b304018d4c43ab09 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Fri, 2 Oct 2015 17:07:51 +0100 Subject: [PATCH 08/15] producer: doc improvements Signed-off-by: Yung-Chin Oei --- README.rst | 4 ++++ pykafka/producer.py | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index e6d6c8ba6..a36f389b1 100644 --- a/README.rst +++ b/README.rst @@ -64,6 +64,10 @@ evaluate it to assure yourself that the message made it to disk on the cluster. This works as with any `concurrent.futures.Future` (`docs`_), by checking `future.result()` (which should be `None`) or `future.exception()`. +(If you would prefer your exceptions straight from the `produce()` call, you +can create the producer with `sync=True`, but this is of course a lot slower, +with network round-trips for every message produced.) + .. _docs: https://pythonhosted.org/futures/#future-objects You can also consume messages from this topic using a `Consumer` instance. diff --git a/pykafka/producer.py b/pykafka/producer.py index 8d539f0cd..a7d4cda23 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -231,9 +231,9 @@ def produce(self, message, partition_key=None): message to :type partition_key: bytes - :returns: a Future, carrying any errors that occurred, or `None` if - the producer was created with `sync=True` (in that case, any - exceptions are also raised directly from `produce()`) + :returns: a `Future` if the producer was created with `sync=False`, + or `None` for `sync=True` (and in that case, any exceptions that + the future would have carried are raised here directly) :rtype: `concurrent.futures.Future` """ if not isinstance(message, bytes): From 1a680f6399a7c7f736d53d1704d9dfd9686dfc79 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Fri, 2 Oct 2015 17:10:36 +0100 Subject: [PATCH 09/15] producer: remove unused imports Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index a7d4cda23..38afca0bc 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -28,17 +28,12 @@ from .common import CompressionType from .exceptions import ( ERROR_CODES, - InvalidMessageError, InvalidMessageSize, - LeaderNotAvailable, MessageSizeTooLarge, NotLeaderForPartition, - ProduceFailureError, ProducerQueueFullError, ProducerStoppedException, - RequestTimedOut, SocketDisconnectedError, - UnknownTopicOrPartition ) from .partitioners import random_partitioner from .protocol import Message, ProduceRequest From 6b4b1cef9080383bddd83189c24df0fd6ac15a90 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Wed, 28 Oct 2015 23:30:59 +0000 Subject: [PATCH 10/15] protocol: define Message.__slots__ As per discussion on #269. In order for this to work, parent classes also needed to be given `__slots__` (otherwise instances would still have a `__dict__`). A few tests used the instance's `__dict__` directly and thus needed a tweak. While working that out, I found that `test_snappy_decompression` wasn't running, because it had been misnamed. The test wasn't even py3-compatible yet, but luckily the code it tests was. 
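
To illustrate the parent-class pitfall (this is plain Python, nothing
pykafka-specific):

    class Base(object):
        pass                 # no __slots__ here, so ...

    class Child(Base):
        __slots__ = ["x"]

    c = Child()
    c.y = 1                  # ... this still works: instances keep a
                             # __dict__ via the parent class
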
Signed-off-by: Yung-Chin Oei --- pykafka/common.py | 2 +- pykafka/protocol.py | 13 +++++ pykafka/utils/__init__.py | 2 + tests/pykafka/test_protocol.py | 102 +++++++++++++++------------------ 4 files changed, 63 insertions(+), 56 deletions(-) diff --git a/pykafka/common.py b/pykafka/common.py index 5cb6c746e..2ae36c22f 100644 --- a/pykafka/common.py +++ b/pykafka/common.py @@ -33,7 +33,7 @@ class Message(object): :ivar key: (optional) Message key :ivar offset: Message offset """ - pass + __slots__ = [] class CompressionType(object): diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 496e142e6..906bd08f9 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -154,6 +154,17 @@ class Message(Message, Serializable): """ MAGIC = 0 + __slots__ = [ + "compression_type", + "partition_key", + "value", + "offset", + "partition_id", + "partition", + "produce_attempt", + "delivery_future", + ] + def __init__(self, value, partition_key=None, @@ -171,6 +182,8 @@ def __init__(self, # self.partition is set by the consumer self.partition = None self.produce_attempt = produce_attempt + # delivery_future is used by the producer + self.delivery_future = None def __len__(self): size = 4 + 1 + 1 + 4 + 4 diff --git a/pykafka/utils/__init__.py b/pykafka/utils/__init__.py index ca6997ae7..f1a838441 100644 --- a/pykafka/utils/__init__.py +++ b/pykafka/utils/__init__.py @@ -16,6 +16,8 @@ class Serializable(object): + __slots__ = [] + def __len__(self): """Length of the bytes that will be sent to the Kafka server.""" raise NotImplementedError() diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py index 90e9e60f1..8ff145f70 100644 --- a/tests/pykafka/test_protocol.py +++ b/tests/pykafka/test_protocol.py @@ -1,3 +1,4 @@ +import operator import unittest2 from pykafka import protocol @@ -95,6 +96,42 @@ def test_response(self): class TestFetchAPI(unittest2.TestCase): maxDiff = None + expected_data = [ + { + 'partition_key': b'asdf', + 'compression_type': 0, + 'value': b'this is a test message', + 'offset': 0, + 'partition_id': 0, + 'produce_attempt': 0, + 'delivery_future': None, + 'partition': None + }, { + 'partition_key': b'test_key', + 'compression_type': 0, + 'value': b'this is also a test message', + 'offset': 1, + 'partition_id': 0, + 'produce_attempt': 0, + 'delivery_future': None, + 'partition': None + }, { + 'partition_key': None, + 'compression_type': 0, + 'value': b"this doesn't have a partition key", + 'offset': 2, + 'partition_id': 0, + 'produce_attempt': 0, + 'delivery_future': None, + 'partition': None + }] + + def msg_to_dict(self, msg): + """Helper to extract data from Message slots""" + attr_names = protocol.Message.__slots__ + f = operator.attrgetter(*attr_names) + return dict(zip(attr_names, f(msg))) + def test_request(self): preq = protocol.PartitionFetchRequest(b'test', 0, 1) req = protocol.FetchRequest(partition_requests=[preq, ]) @@ -126,63 +163,18 @@ def test_response(self): def test_gzip_decompression(self): msg = b'\x00\x00\x00\x01\x00\ttest_gzip\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x9b\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x8f\xbb\xe7\x1f\xb8\x00\x01\xff\xff\xff\xff\x00\x00\x00\x81\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x00c`\x80\x03\r\xbe.I\x7f0\x8b%\xb18%\rH\x8b\x95dd\x16+\x00Q\xa2BIjq\x89Bnjqqbz*T=#\x10\x1b\xb2\xf3\xcb\xf4\x81y\x1c 
\x15\xf1\xd9\xa9\x95@\xb64\\_Nq>v\xcdL@\xac\x7f\xb5(\xd9\x98\x81\xe1?\x10\x00y\x8a`M)\xf9\xa9\xc5y\xea%\n\x19\x89e\xa9@\x9d\x05\x89E%\x99%\x99\xf9y\n@\x93\x01N1\x9f[\xac\x00\x00\x00' response = protocol.FetchResponse(msg) - expected1 = { - 'partition_key': b'asdf', - 'compression_type': 0, - 'value': b'this is a test message', - 'offset': 0, - 'partition_id': 0, - 'produce_attempt': 0, - 'partition': None - } - self.assertDictEqual( - response.topics[b'test_gzip'][0].messages[0].__dict__, - expected1 - ) - expected2 = { - 'partition_key': b'test_key', - 'compression_type': 0, - 'value': b'this is also a test message', - 'offset': 1, - 'partition_id': 0, - 'produce_attempt': 0, - 'partition': None - } - self.assertDictEqual( - response.topics[b'test_gzip'][0].messages[1].__dict__, - expected2 - ) - expected3 = { - 'partition_key': None, - 'compression_type': 0, - 'value': b"this doesn't have a partition key", - 'offset': 2, - 'partition_id': 0, - 'produce_attempt': 0, - 'partition': None - } - - self.assertDictEqual( - response.topics[b'test_gzip'][0].messages[2].__dict__, - expected3 - ) - return + for i in range(len(self.expected_data)): + self.assertDictEqual( + self.msg_to_dict(response.topics[b'test_gzip'][0].messages[i]), + self.expected_data[i]) - def snappy_decompression(self): - msg = '\x00\x00\x00\x01\x00\x0btest_snappy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\xb5\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\xa9\xc1\xf2\xa3\xe1\x00\x02\xff\xff\xff\xff\x00\x00\x00\x9b\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x87\xac\x01\x00\x00\x19\x01\x10(\x0e\x8a\x19O\x05\x0fx\x04asdf\x00\x00\x00\x16this is a test message\x05$(\x00\x00\x01\x00\x00\x001\x07\x0f\x1c\x8e\x05\x10\x00\x08\x01"\x1c_key\x00\x00\x00\x1b\x158\x08lsoV=\x00H\x02\x00\x00\x00/\xd5rc3\x00\x00\xff\xff\xff\xff\x00\x00\x00!\x055ldoesn\'t have a partition key' + def test_snappy_decompression(self): + msg = b'\x00\x00\x00\x01\x00\x0btest_snappy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\xb5\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\xa9\xc1\xf2\xa3\xe1\x00\x02\xff\xff\xff\xff\x00\x00\x00\x9b\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x87\xac\x01\x00\x00\x19\x01\x10(\x0e\x8a\x19O\x05\x0fx\x04asdf\x00\x00\x00\x16this is a test message\x05$(\x00\x00\x01\x00\x00\x001\x07\x0f\x1c\x8e\x05\x10\x00\x08\x01"\x1c_key\x00\x00\x00\x1b\x158\x08lsoV=\x00H\x02\x00\x00\x00/\xd5rc3\x00\x00\xff\xff\xff\xff\x00\x00\x00!\x055ldoesn\'t have a partition key' response = protocol.FetchResponse(msg) - self.assertDictEqual( - response.topics['test_snappy'][0].messages[0].__dict__, - {'partition_key': 'asdf', 'compression_type': 0, 'value': 'this is a test message', 'offset': 0}, - ) - self.assertDictEqual( - response.topics['test_snappy'][0].messages[1].__dict__, - {'partition_key': 'test_key', 'compression_type': 0, 'value': 'this is also a test message', 'offset': 1}, - ) - self.assertDictEqual( - response.topics['test_snappy'][0].messages[2].__dict__, - {'partition_key': None, 'compression_type': 0, 'value': "this doesn't have a partition key", 'offset': 2} - ) + for i in range(len(self.expected_data)): + self.assertDictEqual( + self.msg_to_dict(response.topics[b'test_snappy'][0].messages[i]), + self.expected_data[i]) class TestOffsetAPI(unittest2.TestCase): From 92d129160e7c6f1b03c8c80ce07e117c5b47f9fb Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Thu, 29 Oct 2015 00:21:28 +0000 Subject: [PATCH 11/15] tests: test producer with 
recoverable error This exercises the path in the new producer-futures code where the error encountered is recoverable. Signed-off-by: Yung-Chin Oei --- tests/pykafka/test_producer.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index be79306d1..4998f0e0a 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -56,6 +56,32 @@ def test_async_produce(self): message = self.consumer.consume() assert message.value == payload + def test_recover_disconnected(self): + """Test our retry-loop with a recoverable error""" + payload = uuid4().bytes + topic = self.client.topics[self.topic_name] + prod = topic.get_producer(min_queued_messages=1) + + # We must stop the consumer for this test, to ensure that it is the + # producer that will encounter the disconnected brokers and initiate + # a cluster update + self.consumer.stop() + for t in self.consumer._fetch_workers: + t.join() + part_offsets = self.consumer.held_offsets + + for broker in self.client.brokers.values(): + broker._connection.disconnect() + + future = prod.produce(payload) + self.assertIsNone(future.result()) + + self.consumer.start() + self.consumer.reset_offsets([(self.consumer.partitions[pid], offset) + for pid, offset in part_offsets.items()]) + message = self.consumer.consume() + self.assertEqual(message.value, payload) + def test_async_produce_context(self): """Ensure that the producer works as a context manager""" payload = uuid4().bytes From 330a0feab63dce5bf04ec76d86bdcc6c691578da Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Thu, 29 Oct 2015 15:12:32 +0000 Subject: [PATCH 12/15] protocol: add delivery_future to Message.__init__ Just some cleanup. Should have done this as part of defining Message.__slots__ Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 6 +++--- pykafka/protocol.py | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index f6872e166..e054b8da8 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -240,8 +240,8 @@ def produce(self, message, partition_key=None): partition_id = self._partitioner(partitions, partition_key).id msg = Message(value=message, partition_key=partition_key, - partition_id=partition_id) - msg.delivery_future = futures.Future() + partition_id=partition_id, + delivery_future=futures.Future()) self._produce(msg) self._raise_worker_exceptions() if self._synchronous: @@ -251,7 +251,7 @@ def produce(self, message, partition_key=None): def _produce(self, message): """Enqueue a message for the relevant broker - :param message: Message with partition assigned. 
+ :param message: Message with valid `partition_id`, ready to be sent :type message: `pykafka.protocol.Message` """ success = False diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 906bd08f9..e53a099e5 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -151,6 +151,7 @@ class Message(Message, Serializable): :ivar value: The payload associated with this message :ivar offset: The offset of the message :ivar partition_id: The id of the partition to which this message belongs + :ivar delivery_future: For use by :class:`pykafka.producer.Producer` """ MAGIC = 0 @@ -171,7 +172,8 @@ def __init__(self, compression_type=CompressionType.NONE, offset=-1, partition_id=-1, - produce_attempt=0): + produce_attempt=0, + delivery_future=None): self.compression_type = compression_type self.partition_key = partition_key self.value = value @@ -183,7 +185,7 @@ def __init__(self, self.partition = None self.produce_attempt = produce_attempt # delivery_future is used by the producer - self.delivery_future = None + self.delivery_future = delivery_future def __len__(self): size = 4 + 1 + 1 + 4 + 4 From a1d9c3b8c4533fdbb102ac56fb0019e615e59a4f Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Thu, 29 Oct 2015 15:20:13 +0000 Subject: [PATCH 13/15] tests: fix test that was added with 92d12916 Signed-off-by: Yung-Chin Oei --- tests/pykafka/test_producer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index 4998f0e0a..0e4d16b9c 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -77,8 +77,10 @@ def test_recover_disconnected(self): self.assertIsNone(future.result()) self.consumer.start() - self.consumer.reset_offsets([(self.consumer.partitions[pid], offset) - for pid, offset in part_offsets.items()]) + self.consumer.reset_offsets( + # This is just a reset_offsets, but works around issue #216: + [(self.consumer.partitions[pid], offset if offset != -1 else -2) + for pid, offset in part_offsets.items()]) message = self.consumer.consume() self.assertEqual(message.value, payload) From f4f6df1d3e59651fcdcfa23116d1883a5a95d36f Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Thu, 29 Oct 2015 22:27:13 +0000 Subject: [PATCH 14/15] producer: add kafka_msg to returned futures Users will likely always want to have the message content stapled together with the delivery-future, for context when delivery fails, so doing that for them makes things that bit more convenient. Signed-off-by: Yung-Chin Oei --- pykafka/producer.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/pykafka/producer.py b/pykafka/producer.py index e054b8da8..85c032c77 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -24,6 +24,7 @@ import sys import time import traceback +import weakref from .common import CompressionType from .exceptions import ( @@ -228,7 +229,9 @@ def produce(self, message, partition_key=None): :returns: a `Future` if the producer was created with `sync=False`, or `None` for `sync=True` (and in that case, any exceptions that - the future would have carried are raised here directly) + the future would have carried are raised here directly). The + `Future` carries the (successfully or unsuccessfully) produced + :class:`pykafka.protocol.Message` in an extra field, `kafka_msg`. 
:rtype: `concurrent.futures.Future` """ if not (isinstance(message, bytes) or message is None): @@ -238,15 +241,21 @@ def produce(self, message, partition_key=None): raise ProducerStoppedException() partitions = list(self._topic.partitions.values()) partition_id = self._partitioner(partitions, partition_key).id + + future = futures.Future() msg = Message(value=message, partition_key=partition_key, partition_id=partition_id, - delivery_future=futures.Future()) + # prevent circular ref; see future.kafka_msg below + delivery_future=weakref.ref(future)) self._produce(msg) + self._raise_worker_exceptions() if self._synchronous: - return msg.delivery_future.result() - return msg.delivery_future + return future.result() + + future.kafka_msg = msg + return future def _produce(self, message): """Enqueue a message for the relevant broker @@ -293,7 +302,9 @@ def _get_partition_msgs(partition_id, req): def mark_as_delivered(message_batch): owned_broker.increment_messages_pending(-1 * len(message_batch)) for msg in message_batch: - msg.delivery_future.set_result(None) + f = msg.delivery_future() + if f is not None: # else user discarded future already + f.set_result(None) try: response = owned_broker.broker.produce_messages(req) @@ -344,9 +355,10 @@ def mark_as_delivered(message_batch): non_recoverable = type(exc) in (InvalidMessageSize, MessageSizeTooLarge) for msg in mset.messages: - if (non_recoverable - or msg.produce_attempt >= self._max_retries): - msg.delivery_future.set_exception(exc) + if (non_recoverable or msg.produce_attempt >= self._max_retries): + f = msg.delivery_future() + if f is not None: # else user discarded future already + f.set_exception(exc) else: msg.produce_attempt += 1 self._produce(msg) From 7aa90e662ff5a34aeca70090fd96a08a974f0887 Mon Sep 17 00:00:00 2001 From: Yung-Chin Oei Date: Thu, 29 Oct 2015 23:09:51 +0000 Subject: [PATCH 15/15] README: update producer examples Signed-off-by: Yung-Chin Oei --- README.rst | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/README.rst b/README.rst index f097392d2..4d1d44108 100644 --- a/README.rst +++ b/README.rst @@ -55,18 +55,41 @@ producing messages. .. sourcecode:: python - >>> with topic.get_producer() as producer: + >>> with topic.get_sync_producer() as producer: ... for i in range(4): - ... future = producer.produce('test message ' + i ** 2) + ... producer.produce('test message ' + i ** 2) + +The example above would produce to kafka synchronously, that is, the call only +returns after we have confirmation that the message made it to the cluster. -You're free to ignore the future returned from `produce()`, or you can later -evaluate it to assure yourself that the message made it to disk on the cluster. -This works as with any `concurrent.futures.Future` (`docs`_), by checking -`future.result()` (which should be `None`) or `future.exception()`. +To achieve higher throughput however, we recommend using the ``Producer`` in +asynchronous mode. In that configuration, ``produce()`` calls will return a +``concurrent.futures.Future`` (`docs`_), which you may evaluate later (or, if +reliable delivery is not a concern, you're free to discard it unevaluated). +Here's a rough usage example: -(If you would prefer your exceptions straight from the `produce()` call, you -can create the producer with `sync=True`, but this is of course a lot slower, -with network round-trips for every message produced.) +.. sourcecode:: python + + >>> with topic.get_producer() as producer: + ... 
count = 0
+    ...     pending = []
+    ...     while True:
+    ...         count += 1
+    ...         future = producer.produce('test message',
+    ...                                   partition_key='{}'.format(count))
+    ...         pending.append(future)
+    ...         if count % 10**5 == 0:  # adjust this or bring lots of RAM ;)
+    ...             done, not_done = concurrent.futures.wait(pending,
+    ...                                                      timeout=.001)
+    ...             for future in done:
+    ...                 message_key = future.kafka_msg.partition_key
+    ...                 if future.exception() is not None:
+    ...                     print 'Failed to deliver message {}: {}'.format(
+    ...                         message_key, repr(future.exception()))
+    ...                 else:
+    ...                     print 'Successfully delivered message {}'.format(
+    ...                         message_key)
+    ...             pending = list(not_done)
 
 .. _docs: https://pythonhosted.org/futures/#future-objects
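+
+If polling with ``wait()`` doesn't suit your flow, the same futures also
+take callbacks (again just a rough sketch; note that a callback runs on
+whichever thread resolves the future -- normally pykafka's internal
+worker thread -- so keep it quick):
+
+.. sourcecode:: python
+
+    >>> def on_delivery(future):
+    ...     if future.exception() is not None:
+    ...         print 'Delivery failed: {}'.format(repr(future.exception()))
+
+    >>> future = producer.produce('test message')
+    >>> future.add_done_callback(on_delivery)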