Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KafkaAdmin class #1540

Merged
merged 1 commit into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def emit(self, record):
logging.getLogger(__name__).addHandler(NullHandler())


from kafka.admin import KafkaAdmin
from kafka.consumer import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.producer import KafkaProducer
Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(self, *args, **kwargs):


__all__ = [
'KafkaAdmin',
'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection',
'SimpleClient', 'SimpleProducer', 'KeyedProducer',
'RoundRobinPartitioner', 'HashedPartitioner',
Expand Down
10 changes: 10 additions & 0 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.kafka import KafkaAdmin
from kafka.admin.new_topic import NewTopic
from kafka.admin.new_partitions import NewPartitions

__all__ = [
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions'
]
36 changes: 36 additions & 0 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum


class ConfigResourceType(IntEnum):
    """An enumerated type of config resources.

    Values match the resource-type codes used by the Kafka protocol
    (DescribeConfigs/AlterConfigs requests).
    """

    # NOTE: the original had a stray trailing comma (`BROKER = 4,`) which
    # made the member value a 1-tuple literal; Enum happens to unpack it,
    # but the plain int is what is intended.
    BROKER = 4
    TOPIC = 2


class ConfigResource(object):
    """A class for specifying config resources.

    Arguments:
        resource_type (ConfigResourceType or str): the type of kafka
            resource. A string (e.g. 'broker', 'topic') is coerced,
            case-insensitively, to the matching enum member.
        name (string): The name of the kafka resource
        configs ({key : value}): A map of config keys to values.

    Raises:
        KeyError: if ``resource_type`` is a string that does not name a
            ConfigResourceType member.
    """

    def __init__(
            self,
            resource_type,
            name,
            configs=None
    ):
        # Coerce string inputs like 'broker' to the enum member; the
        # redundant parens around the single isinstance type are dropped.
        if not isinstance(resource_type, ConfigResourceType):
            resource_type = ConfigResourceType[str(resource_type).upper()]  # pylint: disable-msg=unsubscriptable-object
        self.resource_type = resource_type
        self.name = name
        self.configs = configs
505 changes: 505 additions & 0 deletions kafka/admin/kafka.py

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import absolute_import


class NewPartitions(object):
    """Describes a partition-count increase for an existing topic.

    If ``new_assignments`` is supplied, its length must equal the
    difference between the new total partition count and the topic's
    current partition count.

    Arguments:
        total_count (int): the total number of partitions the topic
            should have after the change
        new_assignments ([[int]]): one list of replica broker ids per new
            partition. If None, the broker assigns replicas using its
            internal algorithm.
    """

    def __init__(self, total_count, new_assignments=None):
        self.total_count = total_count
        self.new_assignments = new_assignments
34 changes: 34 additions & 0 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
    """ A class for new topic creation
    Arguments:
        name (string): name of the topic
        num_partitions (int): number of partitions
            or -1 if replica_assignments has been specified
        replication_factor (int): replication factor or -1 if
            replica_assignments is specified
        replica_assignments (dict of int: [int]): A mapping containing
            partition id and replicas to assign to it.
        topic_configs (dict of str: str): A mapping of config key
            and value for the topic.

    Raises:
        IllegalArgumentError: unless exactly one of
            (num_partitions and replication_factor) or replica_assignments
            is specified.
    """

    def __init__(
            self,
            name,
            num_partitions,
            replication_factor,
            replica_assignments=None,
            topic_configs=None,
    ):
        # XOR: either a -1 sentinel was passed (meaning "use explicit
        # assignments") or replica_assignments was omitted (meaning "use
        # num_partitions/replication_factor") -- never both, never neither.
        if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None):
            raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified')
        self.name = name
        self.num_partitions = num_partitions
        self.replication_factor = replication_factor
        self.replica_assignments = replica_assignments or {}
        self.topic_configs = topic_configs or {}
16 changes: 16 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def __init__(self, **configs):
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
Expand Down Expand Up @@ -808,6 +809,17 @@ def refresh_done(val_or_error):
# to let us know the selected connection might be usable again.
return float('inf')

def get_api_versions(self):
    """Return the cached ApiVersions map, if available.

    Note: A call to check_version must previously have succeeded and
    returned version 0.10.0 or later for this cache to be populated.

    Returns:
        dict: a mapping of {api_key : (min_version, max_version)}, or
        None if ApiVersion is not supported by the kafka cluster (or
        check_version has not yet been called).
    """
    return self._api_versions

def check_version(self, node_id=None, timeout=2, strict=False):
"""Attempt to guess the version of a Kafka broker.

Expand Down Expand Up @@ -841,6 +853,10 @@ def check_version(self, node_id=None, timeout=2, strict=False):
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
if version >= (0, 10, 0):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
Expand Down
10 changes: 10 additions & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,16 @@ def _handle_api_version_response(self, response):
])
return self._api_versions

def get_api_versions(self):
    """Return the broker's supported api versions.

    Returns:
        dict: a mapping of {api_key : (min_version, max_version)}.

    Raises:
        UnsupportedVersionError: if the broker version is older than
            0.10.0 and therefore does not support the ApiVersions request.
    """
    version = self.check_version()
    if version < (0, 10, 0):
        raise Errors.UnsupportedVersionError(
            "ApiVersion not supported by cluster version {} < 0.10.0"
            .format(version))
    # _api_versions is set as a side effect of check_version() on a cluster
    # that supports 0.10.0 or later
    return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in reverse order. As soon as we find one that works, return it
Expand Down
5 changes: 5 additions & 0 deletions kafka/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@
33: 'AlterConfigs',
36: 'SaslAuthenticate',
37: 'CreatePartitions',
38: 'CreateDelegationToken',
39: 'RenewDelegationToken',
40: 'ExpireDelegationToken',
41: 'DescribeDelegationToken',
42: 'DeleteGroups',
}
47 changes: 47 additions & 0 deletions test/test_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pytest

import kafka.admin
from kafka.errors import IllegalArgumentError


def test_config_resource():
    """ConfigResource coerces strings to the enum and stores its fields."""
    # An unknown resource-type string fails the enum lookup.
    with pytest.raises(KeyError):
        kafka.admin.ConfigResource('something', 'foo')
    # A known string is coerced (case-insensitively) to the enum member.
    good_resource = kafka.admin.ConfigResource('broker', 'bar')
    assert good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER
    assert good_resource.name == 'bar'
    assert good_resource.configs is None
    # Enum members are accepted directly; configs are stored as given.
    good_resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob': 'nob'})
    assert good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC
    assert good_resource.name == 'baz'
    assert good_resource.configs == {'frob': 'nob'}


def test_new_partitions():
    """NewPartitions stores the requested count and optional assignments."""
    partitions = kafka.admin.NewPartitions(6)
    assert partitions.total_count == 6
    assert partitions.new_assignments is None

    partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]])
    assert partitions.total_count == 7
    assert partitions.new_assignments == [[1, 2, 3]]


def test_new_topic():
    """NewTopic validates its arguments and stores its fields."""
    # Both -1 sentinels with no replica assignment is invalid.
    with pytest.raises(IllegalArgumentError):
        kafka.admin.NewTopic('foo', -1, -1)
    # A single -1 sentinel is likewise rejected.
    with pytest.raises(IllegalArgumentError):
        kafka.admin.NewTopic('foo', 1, -1)
    # Explicit partitions/replication may not be combined with assignments.
    with pytest.raises(IllegalArgumentError):
        kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]})
    good_topic = kafka.admin.NewTopic('foo', 1, 2)
    assert good_topic.name == 'foo'
    assert good_topic.num_partitions == 1
    assert good_topic.replication_factor == 2
    assert good_topic.replica_assignments == {}
    assert good_topic.topic_configs == {}
    good_topic = kafka.admin.NewTopic('bar', -1, -1, {1: [1, 2, 3]}, {'key': 'value'})
    assert good_topic.name == 'bar'
    assert good_topic.num_partitions == -1
    assert good_topic.replication_factor == -1
    assert good_topic.replica_assignments == {1: [1, 2, 3]}
    assert good_topic.topic_configs == {'key': 'value'}