Added max_bytes option and FetchRequest_v3 usage. (#962)
* Added `max_bytes` option and FetchRequest_v3 usage.
* Add checks for versions above 0.10 based on ApiVersionResponse
tvoinarovskyi authored and dpkp committed Mar 6, 2017
1 parent ff6f7bf commit 9c19ea7
Showing 6 changed files with 119 additions and 9 deletions.
2 changes: 2 additions & 0 deletions kafka/client_async.py
@@ -156,6 +156,8 @@ class KafkaClient(object):
'sasl_plain_password': None,
}
API_VERSIONS = [
(0, 10, 1),
(0, 10, 0),
(0, 10),
(0, 9),
(0, 8, 2),
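
Since (0, 10, 1) is now an accepted value, users who already know their broker version can pin it and skip the startup probe entirely. A minimal sketch, assuming a local broker; the topic name and address are placeholders:

from kafka import KafkaConsumer

# Pinning api_version skips the automatic check_version() probe on connect.
# 'my-topic' and 'localhost:9092' are placeholders for this example.
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    api_version=(0, 10, 1),
)
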
25 changes: 24 additions & 1 deletion kafka/conn.py
@@ -18,6 +18,7 @@
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__

@@ -760,6 +761,24 @@ def _next_correlation_id(self):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id

def _check_version_above_0_10(self, response):
test_cases = [
# format: (<broker version>, <needed struct>)
((0, 10, 1), MetadataRequest[2])
]

error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
max_versions = dict([
(api_key, max_version)
for api_key, _, max_version in response.api_versions
])
# Return the best match among the test cases
for broker_version, struct in test_cases:
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
return broker_version
return (0, 10, 0)

def check_version(self, timeout=2, strict=False):
"""Attempt to guess the broker version.
@@ -784,7 +803,6 @@ def check_version(self, timeout=2, strict=False):
# socket.error (32, 54, or 104)
from .protocol.admin import ApiVersionRequest, ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from .protocol.metadata import MetadataRequest

# Socket errors are logged as exceptions and can alarm users. Mute them
from logging import Filter
@@ -798,6 +816,7 @@ def filter(self, record):
log.addFilter(log_filter)

test_cases = [
# All cases starting from 0.10 will be based on ApiVersionResponse
((0, 10), ApiVersionRequest[0]()),
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
@@ -838,6 +857,10 @@ def connect():
self._sock.setblocking(False)

if f.succeeded():
if version == (0, 10):
# Starting with broker 0.10 we determine the exact version
# from the ApiVersionResponse
version = self._check_version_above_0_10(f.value)
log.info('Broker version identified as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
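
The idea behind _check_version_above_0_10 can be illustrated standalone: build a map of the maximum supported version per API key from the ApiVersionResponse entries, then return the first test-case broker version whose required request struct is covered. The tuples and the hard-coded Metadata API key below are only illustrative, not the library's API:

def infer_broker_version(api_versions):
    # api_versions mimics the (api_key, min_version, max_version) tuples
    # carried by an ApiVersionResponse.
    test_cases = [
        # (<broker version>, (<api_key>, <needed version>))
        ((0, 10, 1), (3, 2)),  # Metadata (api_key 3) v2 => broker 0.10.1+
    ]
    max_versions = {api_key: max_version
                    for api_key, _, max_version in api_versions}
    for broker_version, (api_key, needed) in test_cases:
        if max_versions.get(api_key, -1) >= needed:
            return broker_version
    return (0, 10, 0)

# A broker advertising Metadata up to v2 is classified as 0.10.1+:
assert infer_broker_version([(3, 0, 2)]) == (0, 10, 1)
assert infer_broker_version([(3, 0, 1)]) == (0, 10, 0)
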
43 changes: 36 additions & 7 deletions kafka/consumer/fetcher.py
@@ -40,6 +40,7 @@ class Fetcher(six.Iterator):
'value_deserializer': None,
'fetch_min_bytes': 1,
'fetch_max_wait_ms': 500,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1048576,
'max_poll_records': sys.maxsize,
'check_crcs': True,
@@ -64,6 +65,15 @@ def __init__(self, client, subscriptions, metrics, **configs):
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
fetch_max_bytes (int): The maximum amount of data the server should
return for a fetch request. This is not an absolute maximum; if
the first message in the first non-empty partition of the fetch
is larger than this value, the message will still be returned
to ensure that the consumer can make progress. NOTE: the consumer
performs fetches to multiple brokers in parallel, so memory
usage will depend on the number of brokers containing
partitions for the topic.
Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
max_partition_fetch_bytes (int): The maximum amount of data
per-partition the server will return. The maximum total memory
used for a request = #partitions * max_partition_fetch_bytes.
@@ -617,7 +627,7 @@ def _handle_offset_response(self, partition, future, response):
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
Errors.UnknownTopicOrPartitionError):
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
@@ -664,19 +674,38 @@ def _create_fetch_requests(self):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)

if self.config['api_version'] >= (0, 10):
if self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10):
version = 2
elif self.config['api_version'] == (0, 9):
version = 1
else:
version = 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest[version](
-1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],
partition_data.items())
if version < 3:
requests[node_id] = FetchRequest[version](
-1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],
partition_data.items())
else:
# As of version 3, partitions are returned in the order they
# are requested, so to avoid starvation with the
# `fetch_max_bytes` option we need this shuffle.
# NOTE: partition_data is already in a somewhat random order
# because it comes from unordered structures like dicts, but
# that does not guarantee an equal distribution, and starting
# with Python 3.6 dicts retain insertion order.
partition_data = list(partition_data.items())
random.shuffle(partition_data)
requests[node_id] = FetchRequest[version](
-1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],
self.config['fetch_max_bytes'],
partition_data)
return requests

def _handle_fetch_response(self, request, send_time, response):
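
The version selection and the v3 shuffle above can be sketched outside the Fetcher, with the request represented by a plain tuple instead of the real protocol struct; the helper names here are illustrative only:

import random

def pick_fetch_version(api_version):
    # Mirrors the selection in _create_fetch_requests: v3 adds max_bytes.
    if api_version >= (0, 10, 1):
        return 3
    elif api_version >= (0, 10):
        return 2
    elif api_version == (0, 9):
        return 1
    return 0

def build_fetch_payload(version, partition_data, fetch_max_bytes=52428800):
    partitions = list(partition_data.items())
    if version >= 3:
        # Shuffle so one partition cannot repeatedly consume the whole
        # fetch_max_bytes budget and starve the others.
        random.shuffle(partitions)
        return (version, fetch_max_bytes, partitions)
    return (version, partitions)

print(pick_fetch_version((0, 10, 1)))  # -> 3
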
9 changes: 9 additions & 0 deletions kafka/consumer/group.py
@@ -65,6 +65,14 @@ class KafkaConsumer(six.Iterator):
the server will block before answering the fetch request if
there isn't sufficient data to immediately satisfy the
requirement given by fetch_min_bytes. Default: 500.
fetch_max_bytes (int): The maximum amount of data the server should
return for a fetch request. This is not an absolute maximum; if the
first message in the first non-empty partition of the fetch is
larger than this value, the message will still be returned to
ensure that the consumer can make progress. NOTE: the consumer
performs fetches to multiple brokers in parallel, so memory usage
will depend on the number of brokers containing partitions for the
topic. Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
max_partition_fetch_bytes (int): The maximum amount of data
per-partition the server will return. The maximum total memory
used for a request = #partitions * max_partition_fetch_bytes.
@@ -212,6 +220,7 @@ class KafkaConsumer(six.Iterator):
'value_deserializer': None,
'fetch_max_wait_ms': 500,
'fetch_min_bytes': 1,
'fetch_max_bytes': 52428800,
'max_partition_fetch_bytes': 1 * 1024 * 1024,
'request_timeout_ms': 40 * 1000,
'retry_backoff_ms': 100,
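
A usage sketch of the new consumer option (topic and broker address are placeholders): capping the overall fetch size per broker while still allowing an oversized first message through so the consumer can make progress.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',                       # hypothetical topic
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    fetch_max_bytes=1 * 1024 * 1024,  # well below the 50 MB default
)
for record in consumer:
    print(record.partition, record.offset, len(record.value))
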
46 changes: 46 additions & 0 deletions test/test_consumer_integration.py
@@ -2,6 +2,7 @@
import os

from six.moves import xrange
import six

from . import unittest
from kafka import (
@@ -572,3 +573,48 @@ def test_kafka_consumer__offset_commit_resume(self):
output_msgs2.append(m)
self.assert_message_count(output_msgs2, 20)
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_simple(self):
self.send_messages(0, range(100, 200))
self.send_messages(1, range(200, 300))

# Start a consumer
consumer = self.kafka_consumer(
auto_offset_reset='earliest', fetch_max_bytes=300)
fetched_size = 0
seen_partitions = set([])
for i in range(10):
poll_res = consumer.poll(timeout_ms=100)
for partition, msgs in six.iteritems(poll_res):
for msg in msgs:
fetched_size += len(msg.value)
seen_partitions.add(partition)

# Check that we fetched at least 1 message from both partitions
self.assertEqual(
seen_partitions, set([
TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
self.assertLess(fetched_size, 3000)

@kafka_versions('>=0.10.1')
def test_kafka_consumer_max_bytes_one_msg(self):
# We send to only 1 partition so we don't have parallel requests to 2
# nodes for data.
self.send_messages(0, range(100, 200))

# Start a consumer. FetchResponse_v3 should always include at least 1
# full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
consumer = self.kafka_consumer(
auto_offset_reset='earliest', fetch_max_bytes=1)
fetched_msgs = []
# A bit hacky, but we need this in order for message count to be exact
consumer._coordinator.ensure_active_group()
for i in range(10):
poll_res = consumer.poll(timeout_ms=2000)
print(poll_res)
for partition, msgs in six.iteritems(poll_res):
for msg in msgs:
fetched_msgs.append(msg)

self.assertEqual(len(fetched_msgs), 10)
3 changes: 2 additions & 1 deletion test/test_fetcher.py
@@ -58,7 +58,8 @@ def test_send_fetches(fetcher, mocker):


@pytest.mark.parametrize(("api_version", "fetch_version"), [
((0, 10), 2),
((0, 10, 1), 3),
((0, 10, 0), 2),
((0, 9), 1),
((0, 8), 0)
])
