
Commit 52ae7a1
Merge pull request #269 from Parsely/feature/producer-futures

Producer futures

emmettbutler committed Oct 30, 2015
2 parents: cf90b74 + 7aa90e6
Showing 9 changed files with 232 additions and 138 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -23,7 +23,7 @@ notifications:


install:
- pip install codecov kazoo tox testinstances
- pip install codecov kazoo tox testinstances futures

before_script:
- "python -m pykafka.test.kafka_instance 3 --download-dir /home/travis/kafka-bin &"
36 changes: 35 additions & 1 deletion README.rst
@@ -55,10 +55,44 @@ producing messages.

.. sourcecode:: python

>>> with topic.get_producer() as producer:
>>> with topic.get_sync_producer() as producer:
...     for i in range(4):
...         producer.produce('test message ' + str(i ** 2))

The example above would produce to Kafka synchronously; that is, the call only
returns after we have confirmation that the message made it to the cluster.

To achieve higher throughput, however, we recommend using the ``Producer`` in
asynchronous mode. In that configuration, ``produce()`` calls will return a
``concurrent.futures.Future`` (`docs`_), which you may evaluate later (or, if
reliable delivery is not a concern, you're free to discard it unevaluated).
Here's a rough usage example:

.. sourcecode:: python

>>> import concurrent.futures
>>> with topic.get_producer() as producer:
...     count = 0
...     pending = []
...     while True:
...         count += 1
...         future = producer.produce('test message',
...                                   partition_key='{}'.format(count))
...         pending.append(future)
...         if count % 10**5 == 0:  # adjust this or bring lots of RAM ;)
...             done, not_done = concurrent.futures.wait(pending, timeout=.001)
...             for future in done:
...                 message_key = future.kafka_msg.partition_key
...                 if future.exception() is not None:
...                     print 'Failed to deliver message {}: {}'.format(
...                         message_key, repr(future.exception()))
...                 else:
...                     print 'Successfully delivered message {}'.format(
...                         message_key)
...             pending = list(not_done)

.. _docs: https://pythonhosted.org/futures/#future-objects

You can also consume messages from this topic using a `Consumer` instance.

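For anyone trying the new API this README change describes, here is a minimal
sketch (the broker address and topic name are placeholder assumptions) that
confirms delivery of a single message by evaluating its future right away;
per the producer.py changes below, the synchronous producer does essentially
this internally by returning ``future.result()``:

.. sourcecode:: python

    >>> from pykafka import KafkaClient
    >>> client = KafkaClient(hosts='127.0.0.1:9092')  # placeholder broker
    >>> topic = client.topics['test']                 # placeholder topic
    >>> with topic.get_producer() as producer:
    ...     future = producer.produce('hello world')
    ...     future.result()               # blocks; raises on delivery failure
    ...     print future.kafka_msg.value  # the message rides on the future
    hello world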
2 changes: 1 addition & 1 deletion pykafka/common.py
@@ -33,7 +33,7 @@ class Message(object):
:ivar key: (optional) Message key
:ivar offset: Message offset
"""
pass
__slots__ = []


class CompressionType(object):
151 changes: 81 additions & 70 deletions pykafka/producer.py
@@ -19,25 +19,22 @@
"""
__all__ = ["Producer"]
from collections import deque
import itertools
from concurrent import futures
import logging
import sys
import time
import traceback
import weakref

from .common import CompressionType
from .exceptions import (
InvalidMessageError,
ERROR_CODES,
InvalidMessageSize,
LeaderNotAvailable,
MessageSizeTooLarge,
NotLeaderForPartition,
ProduceFailureError,
ProducerQueueFullError,
ProducerStoppedException,
RequestTimedOut,
SocketDisconnectedError,
UnknownTopicOrPartition
)
from .partitioners import random_partitioner
from .protocol import Message, ProduceRequest
@@ -229,6 +226,13 @@ def produce(self, message, partition_key=None):
:param partition_key: The key to use when deciding which partition to send this
message to
:type partition_key: bytes
:returns: a `Future` if the producer was created with `sync=False`,
or `None` for `sync=True` (and in that case, any exceptions that
the future would have carried are raised here directly). The
`Future` carries the (successfully or unsuccessfully) produced
:class:`pykafka.protocol.Message` in an extra field, `kafka_msg`.
:rtype: `concurrent.futures.Future`
"""
if not (isinstance(message, bytes) or message is None):
raise TypeError("Producer.produce accepts a bytes object, but it "
@@ -237,24 +241,33 @@ def produce(self, message, partition_key=None):
raise ProducerStoppedException()
partitions = list(self._topic.partitions.values())
partition_id = self._partitioner(partitions, partition_key).id
message_partition_tup = (partition_key, message), partition_id, 0
self._produce(message_partition_tup)
if self._synchronous:
self._wait_all()

future = futures.Future()
msg = Message(value=message,
partition_key=partition_key,
partition_id=partition_id,
# prevent circular ref; see future.kafka_msg below
delivery_future=weakref.ref(future))
self._produce(msg)

self._raise_worker_exceptions()
if self._synchronous:
return future.result()

future.kafka_msg = msg
return future

def _produce(self, message_partition_tup):
def _produce(self, message):
"""Enqueue a message for the relevant broker
:param message_partition_tup: Message with partition assigned.
:type message_partition_tup: ((bytes, bytes), int) tuple
:param message: Message with valid `partition_id`, ready to be sent
:type message: `pykafka.protocol.Message`
"""
kv, partition_id, attempts = message_partition_tup
success = False
while not success:
leader_id = self._topic.partitions[partition_id].leader.id
leader_id = self._topic.partitions[message.partition_id].leader.id
if leader_id in self._owned_brokers:
self._owned_brokers[leader_id].enqueue([(kv, partition_id, attempts)])
self._owned_brokers[leader_id].enqueue(message)
success = True
else:
success = False
@@ -263,7 +276,7 @@ def _send_request(self, message_batch, owned_broker):
"""Send the produce request to the broker and handle the response.
:param message_batch: An iterable of messages to send
:type message_batch: iterable of `((key, value), partition_id)` tuples
:type message_batch: iterable of `pykafka.protocol.Message`
:param owned_broker: The broker to which to send the request
:type owned_broker: :class:`pykafka.producer.OwnedBroker`
"""
@@ -272,84 +285,83 @@ def _send_request(self, message_batch, owned_broker):
required_acks=self._required_acks,
timeout=self._ack_timeout_ms
)
for (key, value), partition_id, msg_attempt in message_batch:
req.add_message(
Message(value, partition_key=key, produce_attempt=msg_attempt),
self._topic.name,
partition_id
)
for msg in message_batch:
req.add_message(msg, self._topic.name, msg.partition_id)
log.debug("Sending %d messages to broker %d",
len(message_batch), owned_broker.broker.id)

def _get_partition_msgs(partition_id, req):
"""Get all the messages for the partitions from the request."""
messages = itertools.chain.from_iterable(
mset.messages
return (
mset
for topic, partitions in iteritems(req.msets)
for p_id, mset in iteritems(partitions)
if p_id == partition_id
)
for message in messages:
yield (message.partition_key, message.value), partition_id, message.produce_attempt

def mark_as_delivered(message_batch):
owned_broker.increment_messages_pending(-1 * len(message_batch))
for msg in message_batch:
f = msg.delivery_future()
if f is not None: # else user discarded future already
f.set_result(None)

try:
response = owned_broker.broker.produce_messages(req)
if self._required_acks == 0: # and thus, `response` is None
owned_broker.increment_messages_pending(
-1 * len(message_batch))
mark_as_delivered(message_batch)
return
to_retry = []

# Kafka either atomically appends or rejects whole MessageSets, so
# we define a list of potential retries thus:
to_retry = [] # (MessageSet, Exception) tuples

for topic, partitions in iteritems(response.topics):
for partition, presponse in iteritems(partitions):
if presponse.err == 0:
# mark msg_count messages as successfully delivered
msg_count = len(req.msets[topic][partition].messages)
owned_broker.increment_messages_pending(-1 * msg_count)
mark_as_delivered(req.msets[topic][partition].messages)
continue # All's well
if presponse.err == UnknownTopicOrPartition.ERROR_CODE:
log.warning('Unknown topic: %s or partition: %s. '
'Retrying.', topic, partition)
elif presponse.err == NotLeaderForPartition.ERROR_CODE:
log.warning('Partition leader for %s/%s changed. '
'Retrying.', topic, partition)
if presponse.err == NotLeaderForPartition.ERROR_CODE:
# Update cluster metadata to get new leader
self._update()
elif presponse.err == RequestTimedOut.ERROR_CODE:
log.warning('Produce request to %s:%s timed out. '
'Retrying.', owned_broker.broker.host,
owned_broker.broker.port)
elif presponse.err == LeaderNotAvailable.ERROR_CODE:
log.warning('Leader not available for partition %s.'
'Retrying.', partition)
elif presponse.err == InvalidMessageError.ERROR_CODE:
log.warning('Encountered InvalidMessageError')
elif presponse.err == InvalidMessageSize.ERROR_CODE:
log.warning('Encountered InvalidMessageSize')
continue
elif presponse.err == MessageSizeTooLarge.ERROR_CODE:
log.warning('Encountered MessageSizeTooLarge')
continue
to_retry.extend(_get_partition_msgs(partition, req))
except SocketDisconnectedError:
info = "Produce request for {}/{} to {}:{} failed.".format(
topic,
partition,
owned_broker.broker.host,
owned_broker.broker.port)
log.warning(info)
exc = ERROR_CODES[presponse.err](info)
to_retry.extend(
(mset, exc)
for mset in _get_partition_msgs(partition, req))
except SocketDisconnectedError as exc:
log.warning('Broker %s:%s disconnected. Retrying.',
owned_broker.broker.host,
owned_broker.broker.port)
self._update()
to_retry = [
((message.partition_key, message.value), p_id, message.produce_attempt)
(mset, exc)
for topic, partitions in iteritems(req.msets)
for p_id, mset in iteritems(partitions)
for message in mset.messages
]

if to_retry:
time.sleep(self._retry_backoff_ms / 1000)
owned_broker.increment_messages_pending(-1 * len(to_retry))
for kv, partition_id, msg_attempt in to_retry:
if msg_attempt >= self._max_retries:
raise ProduceFailureError("Message failed to send after %d "
"retries.", self._max_retries)
self._produce((kv, partition_id, msg_attempt + 1))
for mset, exc in to_retry:
# XXX arguably, we should try to check these non_recoverables
# for individual messages in _produce and raise errors there
# right away, rather than failing a whole batch here?
non_recoverable = type(exc) in (InvalidMessageSize,
MessageSizeTooLarge)
for msg in mset.messages:
if (non_recoverable or msg.produce_attempt >= self._max_retries):
f = msg.delivery_future()
if f is not None: # else user discarded future already
f.set_exception(exc)
else:
msg.produce_attempt += 1
self._produce(msg)

def _wait_all(self):
"""Block until all pending messages are sent
@@ -425,17 +437,16 @@ def message_is_pending(self):
"""
return self.messages_pending > 0

def enqueue(self, messages):
"""Push messages onto the queue
def enqueue(self, message):
"""Push message onto the queue
:param messages: The messages to push onto the queue
:type messages: iterable of tuples of the form
`((key, value), partition_id)`
:param message: The message to push onto the queue
:type message: `pykafka.protocol.Message`
"""
self._wait_for_slot_available()
with self.lock:
self.queue.extendleft(messages)
self.increment_messages_pending(len(messages))
self.queue.appendleft(message)
self.increment_messages_pending(1)
if len(self.queue) >= self.producer._min_queued_messages:
if not self.flush_ready.is_set():
self.flush_ready.set()
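A note on the ``weakref.ref(future)`` seen in ``produce()`` above: the future
strongly references its message through ``future.kafka_msg``, while the message
holds only a weak reference back, so no reference cycle forms and a future the
caller discards becomes collectible immediately. A minimal sketch of the
pattern, using a made-up stand-in class rather than the real
``pykafka.protocol.Message``:

.. sourcecode:: python

    import weakref
    from concurrent import futures

    class Msg(object):
        """Minimal stand-in for pykafka.protocol.Message."""
        pass

    def deliver(msg):
        # What the producer does on delivery: dereference the weakref first.
        f = msg.delivery_future()
        if f is not None:      # else the user already discarded the future
            f.set_result(None)

    future = futures.Future()
    msg = Msg()
    msg.delivery_future = weakref.ref(future)  # weak: message -> future
    future.kafka_msg = msg                     # strong: future -> message

    deliver(msg)
    assert future.result() is None             # delivery confirmed

    orphan = Msg()
    f2 = futures.Future()
    orphan.delivery_future = weakref.ref(f2)
    del f2                     # caller discards its future (freed at once
    deliver(orphan)            # in CPython), so setting a result is skipped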
22 changes: 19 additions & 3 deletions pykafka/protocol.py
@@ -137,8 +137,9 @@ class Message(Message, Serializable):
  Value => bytes
:class:`pykafka.protocol.Message` also contains `partition` and
`partition_id` fields. Both of these have meaningless default values when
:class:`pykafka.protocol.Message` is used by the producer.
`partition_id` fields. Both of these have meaningless default values. When
:class:`pykafka.protocol.Message` is used by the producer, `partition_id`
identifies the Message's destination partition.
When used in a :class:`pykafka.protocol.FetchRequest`, `partition_id`
is set to the id of the partition from which the message was sent on
receipt of the message. In the :class:`pykafka.simpleconsumer.SimpleConsumer`,
@@ -150,16 +151,29 @@ class Message(Message, Serializable):
:ivar value: The payload associated with this message
:ivar offset: The offset of the message
:ivar partition_id: The id of the partition to which this message belongs
:ivar delivery_future: For use by :class:`pykafka.producer.Producer`
"""
MAGIC = 0

__slots__ = [
"compression_type",
"partition_key",
"value",
"offset",
"partition_id",
"partition",
"produce_attempt",
"delivery_future",
]

def __init__(self,
value,
partition_key=None,
compression_type=CompressionType.NONE,
offset=-1,
partition_id=-1,
produce_attempt=0):
produce_attempt=0,
delivery_future=None):
self.compression_type = compression_type
self.partition_key = partition_key
self.value = value
@@ -170,6 +184,8 @@ def __init__(self,
# self.partition is set by the consumer
self.partition = None
self.produce_attempt = produce_attempt
# delivery_future is used by the producer
self.delivery_future = delivery_future

def __len__(self):
size = 4 + 1 + 1 + 4 + 4
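A side note on the ``__slots__`` declarations this commit adds here, in
``pykafka.common.Message``, and in ``Serializable`` below: slots only
eliminate the per-instance ``__dict__`` if every class in the inheritance
chain defines them, which is presumably why even the bodyless base classes
receive an empty ``__slots__ = []``. A small illustrative sketch with made-up
class names:

.. sourcecode:: python

    import sys

    class Base(object):
        __slots__ = []          # empty slots keep __dict__ off the base, too

    class SlottedMsg(Base):
        __slots__ = ["value", "offset"]

    class DictMsg(object):
        pass

    s, d = SlottedMsg(), DictMsg()
    s.value, s.offset = b"payload", -1
    d.value, d.offset = b"payload", -1

    assert not hasattr(s, "__dict__")   # no per-instance dict at all
    assert hasattr(d, "__dict__")

    # Rough footprint comparison; exact numbers vary by interpreter:
    print(sys.getsizeof(s))
    print(sys.getsizeof(d) + sys.getsizeof(d.__dict__))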
2 changes: 2 additions & 0 deletions pykafka/utils/__init__.py
@@ -16,6 +16,8 @@


class Serializable(object):
__slots__ = []

def __len__(self):
"""Length of the bytes that will be sent to the Kafka server."""
raise NotImplementedError()
1 change: 1 addition & 0 deletions setup.py
@@ -28,6 +28,7 @@ def get_version():
version_file.read()).group('version')

install_requires = [
'futures',
'kazoo',
'tabulate',
]
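The new ``futures`` requirement is the PyPI backport of Python 3's
``concurrent.futures`` for Python 2; it is what lets producer.py's
``from concurrent import futures`` work on both interpreter lines. A quick,
illustrative sanity check of the small API surface the producer relies on:

.. sourcecode:: python

    from concurrent import futures

    f = futures.Future()
    f.set_result(None)

    done, not_done = futures.wait([f], timeout=.001)
    assert len(done) == 1 and len(not_done) == 0
    assert f.exception() is None    # no delivery error was recorded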