Skip to content

Commit 481f880

Browse files
llamahunter
authored and jeffwidman committed
Add KafkaAdmin class
Requires cluster version > 0.10.0.0, and uses new wire protocol classes to do many things via broker connection that previously needed to be done directly in zookeeper.
1 parent ac9d562 commit 481f880

10 files changed

+684
-0
lines changed

kafka/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def emit(self, record):
1818
logging.getLogger(__name__).addHandler(NullHandler())
1919

2020

21+
from kafka.admin import KafkaAdmin
2122
from kafka.consumer import KafkaConsumer
2223
from kafka.consumer.subscription_state import ConsumerRebalanceListener
2324
from kafka.producer import KafkaProducer
@@ -46,6 +47,7 @@ def __init__(self, *args, **kwargs):
4647

4748

4849
__all__ = [
50+
'KafkaAdmin',
4951
'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection',
5052
'SimpleClient', 'SimpleProducer', 'KeyedProducer',
5153
'RoundRobinPartitioner', 'HashedPartitioner',

kafka/admin/__init__.py

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.admin.config_resource import ConfigResource, ConfigResourceType
4+
from kafka.admin.kafka import KafkaAdmin
5+
from kafka.admin.new_topic import NewTopic
6+
from kafka.admin.new_partitions import NewPartitions
7+
8+
__all__ = [
9+
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions'
10+
]

kafka/admin/config_resource.py

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
    from enum import IntEnum  # pylint: disable=import-error
except ImportError:
    # vendored backport module
    from kafka.vendor.enum34 import IntEnum


class ConfigResourceType(IntEnum):
    """An enumerated type of config resources.

    Values match the Kafka protocol's resource-type codes.
    """

    # NOTE: fixed a stray trailing comma after the BROKER value; the raw
    # member value was the 1-tuple (4,), which IntEnum only coerced to 4
    # by accident of the mixin constructor.
    BROKER = 4
    TOPIC = 2


class ConfigResource(object):
    """A class for specifying config resources.

    Arguments:
        resource_type (ConfigResourceType): the type of kafka resource
        name (string): The name of the kafka resource
        configs ({key : value}): A map of config keys to values.
    """

    def __init__(
            self,
            resource_type,
            name,
            configs=None
    ):
        # Accept a string such as 'broker' or 'topic' and coerce it to the
        # enum; an unrecognized name raises KeyError from the enum lookup.
        if not isinstance(resource_type, ConfigResourceType):
            resource_type = ConfigResourceType[str(resource_type).upper()]  # pylint: disable-msg=unsubscriptable-object
        self.resource_type = resource_type
        self.name = name
        self.configs = configs

kafka/admin/kafka.py

+505
Large diffs are not rendered by default.

kafka/admin/new_partitions.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from __future__ import absolute_import
2+
3+
4+
class NewPartitions(object):
    """Describe a partition-count increase for an existing topic.

    Note that the length of new_assignments, if specified, must be the
    difference between the new total number of partitions and the existing
    number of partitions.

    Arguments:
        total_count (int): the total number of partitions that should exist on the topic
        new_assignments ([[int]]): an array of arrays of replica assignments for new partitions.
            If not set, broker assigns replicas per an internal algorithm.
    """

    def __init__(self, total_count, new_assignments=None):
        self.total_count = total_count
        self.new_assignments = new_assignments

kafka/admin/new_topic.py

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.errors import IllegalArgumentError
4+
5+
6+
class NewTopic(object):
    """ A class for new topic creation
    Arguments:
        name (string): name of the topic
        num_partitions (int): number of partitions
            or -1 if replica_assignments has been specified
        replication_factor (int): replication factor or -1 if
            replica_assignments is specified
        replica_assignments (dict of int: [int]): A mapping containing
            partition id and replicas to assign to it.
        topic_configs (dict of str: str): A mapping of config key
            and value for the topic.
    Raises:
        IllegalArgumentError: if neither, or both, of
            num_partitions/replication_factor and replica_assignments
            are specified.
    """

    def __init__(
        self,
        name,
        num_partitions,
        replication_factor,
        replica_assignments=None,
        topic_configs=None,
    ):
        # Exactly one of (num_partitions AND replication_factor) or
        # replica_assignments must be provided: XOR of "counts unset"
        # (-1 sentinel) against "assignments unset" (None).
        if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None):
            # NOTE: message corrected to name the actual parameter,
            # 'replica_assignments' (docstring previously disagreed).
            raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignments must be specified')
        self.name = name
        self.num_partitions = num_partitions
        self.replication_factor = replication_factor
        self.replica_assignments = replica_assignments or {}
        self.topic_configs = topic_configs or {}

kafka/client_async.py

+16
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ def __init__(self, **configs):
196196
self._metadata_refresh_in_progress = False
197197
self._selector = self.config['selector']()
198198
self._conns = Dict() # object to support weakrefs
199+
self._api_versions = None
199200
self._connecting = set()
200201
self._refresh_on_disconnects = True
201202
self._last_bootstrap = 0
@@ -808,6 +809,17 @@ def refresh_done(val_or_error):
808809
# to let us know the selected connection might be usable again.
809810
return float('inf')
810811

812+
def get_api_versions(self):
813+
"""Return the ApiVersions map, if available.
814+
815+
Note: A call to check_version must previously have succeeded and returned
816+
version 0.10.0 or later
817+
818+
Returns: a map of dict mapping {api_key : (min_version, max_version)},
819+
or None if ApiVersion is not supported by the kafka cluster.
820+
"""
821+
return self._api_versions
822+
811823
def check_version(self, node_id=None, timeout=2, strict=False):
812824
"""Attempt to guess the version of a Kafka broker.
813825
@@ -841,6 +853,10 @@ def check_version(self, node_id=None, timeout=2, strict=False):
841853
try:
842854
remaining = end - time.time()
843855
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
856+
if version >= (0, 10, 0):
857+
# cache the api versions map if it's available (starting
858+
# in 0.10 cluster version)
859+
self._api_versions = conn.get_api_versions()
844860
return version
845861
except Errors.NodeNotReadyError:
846862
# Only raise to user if this is a node-specific request

kafka/conn.py

+10
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,16 @@ def _handle_api_version_response(self, response):
873873
])
874874
return self._api_versions
875875

876+
def get_api_versions(self):
    """Return the broker's supported API versions, probing if necessary.

    Returns: a dict mapping {api_key : (min_version, max_version)}.

    Raises:
        UnsupportedVersionError: if the broker version is older than
            0.10.0 and therefore does not support ApiVersionsRequest.
    """
    version = self.check_version()
    if version < (0, 10, 0):
        raise Errors.UnsupportedVersionError(
            "ApiVersion not supported by cluster version {} < 0.10.0"
            .format(version))
    # _api_versions is set as a side effect of check_version() on a cluster
    # that supports 0.10.0 or later
    return self._api_versions
885+
876886
def _infer_broker_version_from_api_versions(self, api_versions):
877887
# The logic here is to check the list of supported request versions
878888
# in reverse order. As soon as we find one that works, return it

kafka/protocol/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@
4444
33: 'AlterConfigs',
4545
36: 'SaslAuthenticate',
4646
37: 'CreatePartitions',
47+
38: 'CreateDelegationToken',
48+
39: 'RenewDelegationToken',
49+
40: 'ExpireDelegationToken',
50+
41: 'DescribeDelegationToken',
51+
42: 'DeleteGroups',
4752
}

test/test_admin.py

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import pytest
2+
3+
import kafka.admin
4+
from kafka.errors import IllegalArgumentError
5+
6+
7+
def test_config_resource():
    # An unrecognized resource-type string fails the enum lookup.
    with pytest.raises(KeyError):
        kafka.admin.ConfigResource('something', 'foo')

    # A type given as a string is coerced to the enum member.
    resource = kafka.admin.ConfigResource('broker', 'bar')
    assert(resource.resource_type == kafka.admin.ConfigResourceType.BROKER)
    assert(resource.name == 'bar')
    assert(resource.configs is None)

    # An enum member is accepted as-is, along with an optional config map.
    resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob' : 'nob'})
    assert(resource.resource_type == kafka.admin.ConfigResourceType.TOPIC)
    assert(resource.name == 'baz')
    assert(resource.configs == {'frob' : 'nob'})
18+
19+
20+
def test_new_partitions():
    # Assignments default to None when only a total count is given.
    partitions = kafka.admin.NewPartitions(6)
    assert(partitions.total_count == 6)
    assert(partitions.new_assignments is None)

    # Explicit replica assignments are stored verbatim.
    partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]])
    assert(partitions.total_count == 7)
    assert(partitions.new_assignments == [[1, 2, 3]])
27+
28+
29+
def test_new_topic():
    # Invalid argument combinations: neither counts nor assignments,
    # a partial count, and counts together with assignments.
    with pytest.raises(IllegalArgumentError):
        kafka.admin.NewTopic('foo', -1, -1)
    with pytest.raises(IllegalArgumentError):
        kafka.admin.NewTopic('foo', 1, -1)
    with pytest.raises(IllegalArgumentError):
        kafka.admin.NewTopic('foo', 1, 1, {1 : [1, 1, 1]})

    # Counts only: assignment and config maps default to empty dicts.
    topic = kafka.admin.NewTopic('foo', 1, 2)
    assert(topic.name == 'foo')
    assert(topic.num_partitions == 1)
    assert(topic.replication_factor == 2)
    assert(topic.replica_assignments == {})
    assert(topic.topic_configs == {})

    # Assignments only: counts carry the -1 sentinel.
    topic = kafka.admin.NewTopic('bar', -1, -1, {1 : [1, 2, 3]}, {'key' : 'value'})
    assert(topic.name == 'bar')
    assert(topic.num_partitions == -1)
    assert(topic.replication_factor == -1)
    assert(topic.replica_assignments == {1: [1, 2, 3]})
    assert(topic.topic_configs == {'key' : 'value'})

0 commit comments

Comments
 (0)