Skip to content

Commit

Permalink
Add KafkaAdmin class
Browse files Browse the repository at this point in the history
Requires cluster version > 0.10.0.0, and uses new wire protocol
classes to do many things via broker connection that previously
needed to be done directly in zookeeper.
  • Loading branch information
llamahunter committed Jul 12, 2018
1 parent 9ac3cb1 commit b046d25
Show file tree
Hide file tree
Showing 11 changed files with 683 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def emit(self, record):
logging.getLogger(__name__).addHandler(NullHandler())


from kafka.admin import KafkaAdmin
from kafka.consumer import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.producer import KafkaProducer
Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(self, *args, **kwargs):


__all__ = [
'KafkaAdmin',
'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection',
'SimpleClient', 'SimpleProducer', 'KeyedProducer',
'RoundRobinPartitioner', 'HashedPartitioner',
Expand Down
10 changes: 10 additions & 0 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.kafka import KafkaAdmin
from kafka.admin.new_topic import NewTopic
from kafka.admin.new_partitions import NewPartitions

__all__ = [
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions'
]
35 changes: 35 additions & 0 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import absolute_import

import sys
if sys.version_info < (3, 4):
from enum34 import IntEnum # pylint: disable=import-error
else:
from enum import IntEnum


class ConfigResourceType(IntEnum):
"""An enumerated type of config resources"""

BROKER = 4,
TOPIC = 2


class ConfigResource(object):
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
name (string): The name of the kafka resource
configs ({key : value}): A maps of config keys to values.
"""

def __init__(
self,
resource_type,
name,
configs=None
):
if not isinstance(resource_type, (ConfigResourceType)):
resource_type = ConfigResourceType[str(resource_type).upper()]
self.resource_type = resource_type
self.name = name
self.configs = configs
504 changes: 504 additions & 0 deletions kafka/admin/kafka.py

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import absolute_import


class NewPartitions(object):
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
total_count (int): the total number of partitions that should exist on the topic
new_assignments ([[int]]): an array of arrays of replica assignments for new partitions.
If not set, broker assigns replicas per an internal algorithm.
"""

def __init__(
self,
total_count,
new_assignments=None
):
self.total_count = total_count
self.new_assignments = new_assignments
34 changes: 34 additions & 0 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
""" A class for new topic creation
Arguments:
name (string): name of the topic
num_partitions (int): number of partitions
or -1 if replica_assignment has been specified
replication_factor (int): replication factor or -1 if
replica assignment is specified
replica_assignment (dict of int: [int]): A mapping containing
partition id and replicas to assign to it.
topic_configs (dict of str: str): A mapping of config key
and value for the topic.
"""

def __init__(
self,
name,
num_partitions,
replication_factor,
replica_assignments=None,
topic_configs=None,
):
if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None):
raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified')
self.name = name
self.num_partitions = num_partitions
self.replication_factor = replication_factor
self.replica_assignments = replica_assignments or {}
self.topic_configs = topic_configs or {}
16 changes: 16 additions & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def __init__(self, **configs):
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
Expand Down Expand Up @@ -798,6 +799,17 @@ def refresh_done(val_or_error):
# to let us know the selected connection might be usable again.
return float('inf')

def get_api_versions(self):
"""Return the ApiVersions map, if available.
Note: A call to check_version must previously have succeeded and returned
version 0.10.0 or later
Returns: a map of dict mapping {api_key : (min_version, max_version)},
or None if ApiVersion is not supported by the kafka cluster.
"""
return self._api_versions

def check_version(self, node_id=None, timeout=2, strict=False):
"""Attempt to guess the version of a Kafka broker.
Expand Down Expand Up @@ -831,6 +843,10 @@ def check_version(self, node_id=None, timeout=2, strict=False):
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict)
if version >= (0, 10, 0):
# cache the api versions map if it's available (starting
# in 0.10 cluster version)
self._api_versions = conn.get_api_versions()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
Expand Down
10 changes: 10 additions & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,16 @@ def _handle_api_version_response(self, response):
])
return self._api_versions

def get_api_versions(self):
version = self.check_version()
if version < (0, 10, 0):
raise Errors.UnsupportedVersionError(
"ApiVersion not supported by cluster version {} < 0.10.0"
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
return self._api_versions;

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in reverse order. As soon as we find one that works, return it
Expand Down
5 changes: 5 additions & 0 deletions kafka/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@
33: 'AlterConfigs',
36: 'SaslAuthenticate',
37: 'CreatePartitions',
38: 'CreateDelegationToken',
39: 'RenewDelegationToken',
40: 'ExpireDelegationToken',
41: 'DescribeDelegationToken',
42: 'DeleteGroups',
}
47 changes: 47 additions & 0 deletions test/test_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pytest

import kafka.admin
from kafka.errors import IllegalArgumentError


def test_config_resource():
with pytest.raises(KeyError):
bad_resource = kafka.admin.ConfigResource('something', 'foo')
good_resource = kafka.admin.ConfigResource('broker', 'bar')
assert(good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER)
assert(good_resource.name == 'bar')
assert(good_resource.configs is None)
good_resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob' : 'nob'})
assert(good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC)
assert(good_resource.name == 'baz')
assert(good_resource.configs == {'frob' : 'nob'})


def test_new_partitions():
good_partitions = kafka.admin.NewPartitions(6)
assert(good_partitions.total_count == 6)
assert(good_partitions.new_assignments is None)
good_partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]])
assert(good_partitions.total_count == 7)
assert(good_partitions.new_assignments == [[1, 2, 3]])


def test_new_topic():
with pytest.raises(IllegalArgumentError):
bad_topic = kafka.admin.NewTopic('foo', -1, -1)
with pytest.raises(IllegalArgumentError):
bad_topic = kafka.admin.NewTopic('foo', 1, -1)
with pytest.raises(IllegalArgumentError):
bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1 : [1, 1, 1]})
good_topic = kafka.admin.NewTopic('foo', 1, 2)
assert(good_topic.name == 'foo')
assert(good_topic.num_partitions == 1)
assert(good_topic.replication_factor == 2)
assert(good_topic.replica_assignments == {})
assert(good_topic.topic_configs == {})
good_topic = kafka.admin.NewTopic('bar', -1, -1, {1 : [1, 2, 3]}, {'key' : 'value'})
assert(good_topic.name == 'bar')
assert(good_topic.num_partitions == -1)
assert(good_topic.replication_factor == -1)
assert(good_topic.replica_assignments == {1: [1, 2, 3]})
assert(good_topic.topic_configs == {'key' : 'value'})
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ deps =
crc32c
py26: unittest2
decorator
py27: enum34
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =
Expand Down

0 comments on commit b046d25

Please sign in to comment.