-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Requires cluster version > 0.10.0.0, and uses new wire protocol classes to do many things via broker connection that previously needed to be done directly in zookeeper.
- Loading branch information
1 parent
a7e28ae
commit 6eeaf63
Showing
10 changed files
with
683 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from __future__ import absolute_import | ||
|
||
from kafka.admin.config_resource import ConfigResource, ConfigResourceType | ||
from kafka.admin.kafka import KafkaAdmin | ||
from kafka.admin.new_topic import NewTopic | ||
from kafka.admin.new_partitions import NewPartitions | ||
|
||
__all__ = [ | ||
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions' | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from __future__ import absolute_import | ||
|
||
# enum in stdlib as of py3.4 | ||
try: | ||
from enum import IntEnum # pylint: disable=import-error | ||
except ImportError: | ||
# vendored backport module | ||
from kafka.vendor.enum34 import IntEnum | ||
|
||
|
||
class ConfigResourceType(IntEnum): | ||
"""An enumerated type of config resources""" | ||
|
||
BROKER = 4, | ||
TOPIC = 2 | ||
|
||
|
||
class ConfigResource(object): | ||
"""A class for specifying config resources. | ||
Arguments: | ||
resource_type (ConfigResourceType): the type of kafka resource | ||
name (string): The name of the kafka resource | ||
configs ({key : value}): A maps of config keys to values. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
resource_type, | ||
name, | ||
configs=None | ||
): | ||
if not isinstance(resource_type, (ConfigResourceType)): | ||
resource_type = ConfigResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object | ||
self.resource_type = resource_type | ||
self.name = name | ||
self.configs = configs |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from __future__ import absolute_import | ||
|
||
|
||
class NewPartitions(object): | ||
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified, | ||
must be the difference between the new total number of partitions and the existing number of partitions. | ||
Arguments: | ||
total_count (int): the total number of partitions that should exist on the topic | ||
new_assignments ([[int]]): an array of arrays of replica assignments for new partitions. | ||
If not set, broker assigns replicas per an internal algorithm. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
total_count, | ||
new_assignments=None | ||
): | ||
self.total_count = total_count | ||
self.new_assignments = new_assignments |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from __future__ import absolute_import | ||
|
||
from kafka.errors import IllegalArgumentError | ||
|
||
|
||
class NewTopic(object): | ||
""" A class for new topic creation | ||
Arguments: | ||
name (string): name of the topic | ||
num_partitions (int): number of partitions | ||
or -1 if replica_assignment has been specified | ||
replication_factor (int): replication factor or -1 if | ||
replica assignment is specified | ||
replica_assignment (dict of int: [int]): A mapping containing | ||
partition id and replicas to assign to it. | ||
topic_configs (dict of str: str): A mapping of config key | ||
and value for the topic. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
name, | ||
num_partitions, | ||
replication_factor, | ||
replica_assignments=None, | ||
topic_configs=None, | ||
): | ||
if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None): | ||
raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified') | ||
self.name = name | ||
self.num_partitions = num_partitions | ||
self.replication_factor = replication_factor | ||
self.replica_assignments = replica_assignments or {} | ||
self.topic_configs = topic_configs or {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import pytest | ||
|
||
import kafka.admin | ||
from kafka.errors import IllegalArgumentError | ||
|
||
|
||
def test_config_resource(): | ||
with pytest.raises(KeyError): | ||
bad_resource = kafka.admin.ConfigResource('something', 'foo') | ||
good_resource = kafka.admin.ConfigResource('broker', 'bar') | ||
assert(good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER) | ||
assert(good_resource.name == 'bar') | ||
assert(good_resource.configs is None) | ||
good_resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob' : 'nob'}) | ||
assert(good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC) | ||
assert(good_resource.name == 'baz') | ||
assert(good_resource.configs == {'frob' : 'nob'}) | ||
|
||
|
||
def test_new_partitions(): | ||
good_partitions = kafka.admin.NewPartitions(6) | ||
assert(good_partitions.total_count == 6) | ||
assert(good_partitions.new_assignments is None) | ||
good_partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]]) | ||
assert(good_partitions.total_count == 7) | ||
assert(good_partitions.new_assignments == [[1, 2, 3]]) | ||
|
||
|
||
def test_new_topic(): | ||
with pytest.raises(IllegalArgumentError): | ||
bad_topic = kafka.admin.NewTopic('foo', -1, -1) | ||
with pytest.raises(IllegalArgumentError): | ||
bad_topic = kafka.admin.NewTopic('foo', 1, -1) | ||
with pytest.raises(IllegalArgumentError): | ||
bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1 : [1, 1, 1]}) | ||
good_topic = kafka.admin.NewTopic('foo', 1, 2) | ||
assert(good_topic.name == 'foo') | ||
assert(good_topic.num_partitions == 1) | ||
assert(good_topic.replication_factor == 2) | ||
assert(good_topic.replica_assignments == {}) | ||
assert(good_topic.topic_configs == {}) | ||
good_topic = kafka.admin.NewTopic('bar', -1, -1, {1 : [1, 2, 3]}, {'key' : 'value'}) | ||
assert(good_topic.name == 'bar') | ||
assert(good_topic.num_partitions == -1) | ||
assert(good_topic.replication_factor == -1) | ||
assert(good_topic.replica_assignments == {1: [1, 2, 3]}) | ||
assert(good_topic.topic_configs == {'key' : 'value'}) |