From 5132fa81ea0b92084bf32a8fa6f54f5383f6c12a Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Fri, 16 Nov 2018 22:58:46 +0100 Subject: [PATCH 1/8] Add KafkaAdmin ACL methods --- kafka/admin/__init__.py | 5 +- kafka/admin/acl_resource.py | 84 ++++++++++++++++++++++ kafka/admin/client.py | 134 +++++++++++++++++++++++++++++++++++- 3 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 kafka/admin/acl_resource.py diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index a300301c6..b1e102673 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -2,9 +2,12 @@ from kafka.admin.config_resource import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient +from kafka.admin.acl_resource import (AclResource, AclOperation, AclResourceType, AclPermissionType, + AclResourcePatternType) from kafka.admin.new_topic import NewTopic from kafka.admin.new_partitions import NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions' + 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'AclResource', 'AclOperation', + 'AclResourceType', 'AclPermissionType', 'AclResourcePatternType' ] diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py new file mode 100644 index 000000000..ff3df4ac8 --- /dev/null +++ b/kafka/admin/acl_resource.py @@ -0,0 +1,84 @@ +from __future__ import absolute_import + +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + +class AclResourceType(IntEnum): + """An enumerated type of config resources""" + + ANY = 1, + BROKER = 4, + DELEGATION_TOKEN = 6, + GROUP = 3, + TOPIC = 2, + TRANSACTIONAL_ID = 5 + +class AclOperation(IntEnum): + """An enumerated type of acl operations""" + + ANY = 1, + ALL = 2, + READ = 3, + WRITE = 4, + CREATE = 5, + DELETE = 6, + ALTER = 7, + DESCRIBE = 8, + CLUSTER_ACTION = 9, + DESCRIBE_CONFIGS = 10, + ALTER_CONFIGS = 11, + IDEMPOTENT_WRITE = 12 + + +class AclPermissionType(IntEnum): + """An enumerated type of permissions""" + + ANY = 1, + DENY = 2, + ALLOW = 3 + +class AclResourcePatternType(IntEnum): + """An enumerated type of resource patterns""" + + ANY = 1, + MATCH = 2, + LITERAL = 3, + PREFIXED = 4 + +class AclResource(object): + """A class for specifying config resources. + Arguments: + resource_type (ConfigResourceType): the type of kafka resource + name (string): The name of the kafka resource + configs ({key : value}): A maps of config keys to values. + """ + + def __init__( + self, + resource_type, + operation, + permission_type, + name=None, + principal=None, + host=None, + pattern_type=AclResourcePatternType.LITERAL + ): + if not isinstance(resource_type, AclResourceType): + resource_type = AclResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object + self.resource_type = resource_type + if not isinstance(operation, AclOperation): + operation = AclOperation[str(operation).upper()] # pylint: disable-msg:unsubscriptable-object + self.operation = operation + if not isinstance(permission_type, AclPermissionType): + permission_type = AclPermissionType[str(permission_type).upper()] # pylint: disable-msg=unsubscriptable-object + self.permission_type = permission_type + self.name = name + self.principal = principal + self.host = host + if not isinstance(pattern_type, AclResourcePatternType): + pattern_type = AclResourcePatternType[str(pattern_type).upper()] # pylint: disable-msg=unsubscriptable-object + self.pattern_type = pattern_type diff --git a/kafka/admin/client.py b/kafka/admin/client.py index e25afe7d8..42156e93e 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -11,14 +11,15 @@ import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError, - UnrecognizedBrokerVersion) + UnrecognizedBrokerVersion, IllegalArgumentError) from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, - ListGroupsRequest, DescribeGroupsRequest) + ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata +from kafka.admin.acl_resource import AclOperation, AclPermissionType from kafka.version import __version__ @@ -430,12 +431,141 @@ def delete_topics(self, topics, timeout_ms=None): # describe_acls protocol not yet implemented # Note: send the request to the least_loaded_node() + def describe_acls(self, acl_resource): + """Describe a set of ACLs + """ + + version = self._matching_api_version(DescribeAclsRequest) + if version == 0: + request = DescribeAclsRequest[version]( + resource_type=acl_resource.resource_type, + resource_name=acl_resource.name, + principal=acl_resource.principal, + host=acl_resource.host, + operation=acl_resource.operation, + permission_type=acl_resource.permission_type + ) + elif version <= 1: + request = DescribeAclsRequest[version]( + resource_type=acl_resource.resource_type, + resource_name=acl_resource.name, + resource_pattern_type_filter=acl_resource.pattern_type, + principal=acl_resource.principal, + host=acl_resource.host, + operation=acl_resource.operation, + permission_type=acl_resource.permission_type + + ) + else: + raise UnsupportedVersionError( + "missing implementation of DescribeAcls for library supported version {}" + .format(version) + ) + + return self._send(request) + + @staticmethod + def _convert_create_acls_resource_request_v0(acl_resource): + if acl_resource.operation == AclOperation.ANY: + raise IllegalArgumentError("operation must not be ANY") + if acl_resource.permission_type == AclPermissionType.ANY: + raise IllegalArgumentError("permission_type must not be ANY") + + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) + + @staticmethod + def _convert_create_acls_resource_request_v1(acl_resource): + + if acl_resource.operation == AclOperation.ANY: + raise IllegalArgumentError("operation must not be ANY") + if acl_resource.permission_type == AclPermissionType.ANY: + raise IllegalArgumentError("permission_type must not be ANY") + + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.pattern_type, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) # create_acls protocol not yet implemented # Note: send the request to the least_loaded_node() + def create_acls(self, acl_resources): + """Create a set of ACLs""" + + version = self._matching_api_version(DescribeAclsRequest) + if version == 0: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources] + ) + elif version <= 1: + request = CreateAclsRequest[version]( + creations=[self._convert_create_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources] + ) + else: + raise UnsupportedVersionError( + "missing implementation of DescribeAcls for library supported version {}" + .format(version) + ) + + + return self._send(request) + + @staticmethod + def _convert_delete_acls_resource_request_v0(acl_resource): + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) + + @staticmethod + def _convert_delete_acls_resource_request_v1(acl_resource): + return ( + acl_resource.resource_type, + acl_resource.name, + acl_resource.pattern_type, + acl_resource.principal, + acl_resource.host, + acl_resource.operation, + acl_resource.permission_type + ) # delete_acls protocol not yet implemented # Note: send the request to the least_loaded_node() + def delete_acls(self, acl_resources): + """Delete a set of ACLSs""" + + version = self._matching_api_version(DescribeAclsRequest) + + if version == 0: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources] + ) + elif version <= 1: + request = DeleteAclsRequest[version]( + filters=[self._convert_delete_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources] + ) + else: + raise UnsupportedVersionError( + "missing implementation of DescribeAcls for library supported version {}" + .format(version) + ) + + return self._send(request) @staticmethod def _convert_describe_config_resource_request(config_resource): From 628cc48e3150112cf63aec6cb46a7a04f0a1d753 Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Sat, 17 Nov 2018 01:15:23 +0100 Subject: [PATCH 2/8] Add basic test for AclResource --- test/test_admin.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/test_admin.py b/test/test_admin.py index 300d5bced..afa222e0d 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -26,6 +26,14 @@ def test_new_partitions(): assert good_partitions.new_assignments == [[1, 2, 3]] +def test_acl_resource(): + good_resource = kafka.admin.AclResource("TOPIC", "ALL", "ALLOW", "foo", + "User:bar", "*", "LITERAL") + assert(good_resource.resource_type == kafka.admin.AclResourceType.TOPIC) + assert(good_resource.operation == kafka.admin.AclOperation.ALL) + assert(good_resource.permission_type == kafka.admin.AclPermissionType.ALLOW) + assert(good_resource.pattern_type == kafka.admin.AclResourcePatternType.LITERAL) + def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1) From fc54f94839abcf625a76e904872608f1a7934d2c Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Mon, 19 Nov 2018 18:24:03 +0100 Subject: [PATCH 3/8] Use new client errors --- kafka/admin/client.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 42156e93e..3b32ff7fa 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -457,8 +457,8 @@ def describe_acls(self, acl_resource): ) else: - raise UnsupportedVersionError( - "missing implementation of DescribeAcls for library supported version {}" + raise NotImplementedError( + "Support for DescribeAcls v{} has not yet been added to KafkaAdmin." .format(version) ) @@ -513,12 +513,11 @@ def create_acls(self, acl_resources): creations=[self._convert_create_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources] ) else: - raise UnsupportedVersionError( - "missing implementation of DescribeAcls for library supported version {}" + raise NotImplementedError( + "Support for CreateAcls v{} has not yet been added to KafkaAdmin." .format(version) ) - return self._send(request) @staticmethod @@ -560,8 +559,8 @@ def delete_acls(self, acl_resources): filters=[self._convert_delete_acls_resource_request_v1(acl_resource) for acl_resource in acl_resources] ) else: - raise UnsupportedVersionError( - "missing implementation of DescribeAcls for library supported version {}" + raise NotImplementedError( + "Support for DeleteAcls v{} has not yet been added to KafkaAdmin." .format(version) ) From 7fa3c51531c09875fd9f4e22ed047381d1e2eedb Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Mon, 19 Nov 2018 18:26:18 +0100 Subject: [PATCH 4/8] Bugfix, match against correct schema --- kafka/admin/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 3b32ff7fa..ce6540d98 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -503,7 +503,7 @@ def _convert_create_acls_resource_request_v1(acl_resource): def create_acls(self, acl_resources): """Create a set of ACLs""" - version = self._matching_api_version(DescribeAclsRequest) + version = self._matching_api_version(CreateAclsRequest) if version == 0: request = CreateAclsRequest[version]( creations=[self._convert_create_acls_resource_request_v0(acl_resource) for acl_resource in acl_resources] @@ -548,7 +548,7 @@ def _convert_delete_acls_resource_request_v1(acl_resource): def delete_acls(self, acl_resources): """Delete a set of ACLSs""" - version = self._matching_api_version(DescribeAclsRequest) + version = self._matching_api_version(DeleteAclsRequest) if version == 0: request = DeleteAclsRequest[version]( From ae9ab37328a4ead3d1f213519543d780dfc797f9 Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Mon, 19 Nov 2018 18:42:46 +0100 Subject: [PATCH 5/8] Send Acl requests to least loaded node --- kafka/admin/client.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index ce6540d98..47640d013 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -429,8 +429,6 @@ def delete_topics(self, topics, timeout_ms=None): # describe cluster functionality is in ClusterMetadata # Note: if implemented here, send the request to the least_loaded_node() - # describe_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() def describe_acls(self, acl_resource): """Describe a set of ACLs """ @@ -462,7 +460,7 @@ def describe_acls(self, acl_resource): .format(version) ) - return self._send(request) + return self._send_request_to_node(self._client.least_loaded_node(), request) @staticmethod def _convert_create_acls_resource_request_v0(acl_resource): @@ -498,8 +496,6 @@ def _convert_create_acls_resource_request_v1(acl_resource): acl_resource.permission_type ) - # create_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() def create_acls(self, acl_resources): """Create a set of ACLs""" @@ -518,7 +514,7 @@ def create_acls(self, acl_resources): .format(version) ) - return self._send(request) + return self._send_request_to_node(self._client.least_loaded_node(), request) @staticmethod def _convert_delete_acls_resource_request_v0(acl_resource): @@ -543,8 +539,6 @@ def _convert_delete_acls_resource_request_v1(acl_resource): acl_resource.permission_type ) - # delete_acls protocol not yet implemented - # Note: send the request to the least_loaded_node() def delete_acls(self, acl_resources): """Delete a set of ACLSs""" @@ -564,7 +558,7 @@ def delete_acls(self, acl_resources): .format(version) ) - return self._send(request) + return self._send_request_to_node(self._client.least_loaded_node(), request) @staticmethod def _convert_describe_config_resource_request(config_resource): From 0349bc9d89a1ae692fbe77dad57934e46e9e9ed3 Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Mon, 19 Nov 2018 21:15:57 +0100 Subject: [PATCH 6/8] Capitalize ACL types properly --- kafka/admin/__init__.py | 8 ++++---- kafka/admin/acl_resource.py | 28 ++++++++++++++-------------- kafka/admin/client.py | 10 +++++----- test/test_admin.py | 10 +++++----- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index b1e102673..9e5c914d4 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -2,12 +2,12 @@ from kafka.admin.config_resource import ConfigResource, ConfigResourceType from kafka.admin.client import KafkaAdminClient -from kafka.admin.acl_resource import (AclResource, AclOperation, AclResourceType, AclPermissionType, - AclResourcePatternType) +from kafka.admin.acl_resource import (ACLResource, ACLOperation, ACLResourceType, ACLPermissionType, + ACLResourcePatternType) from kafka.admin.new_topic import NewTopic from kafka.admin.new_partitions import NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions', 'AclResource', 'AclOperation', - 'AclResourceType', 'AclPermissionType', 'AclResourcePatternType' + 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClent', 'NewTopic', 'NewPartitions', 'ACLResource', 'ACLOperation', + 'ACLResourceType', 'ACLPermissionType', 'ACLResourcePatternType' ] diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index ff3df4ac8..3bd9052a6 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -7,7 +7,7 @@ # vendored backport module from kafka.vendor.enum34 import IntEnum -class AclResourceType(IntEnum): +class ACLResourceType(IntEnum): """An enumerated type of config resources""" ANY = 1, @@ -17,7 +17,7 @@ class AclResourceType(IntEnum): TOPIC = 2, TRANSACTIONAL_ID = 5 -class AclOperation(IntEnum): +class ACLOperation(IntEnum): """An enumerated type of acl operations""" ANY = 1, @@ -34,14 +34,14 @@ class AclOperation(IntEnum): IDEMPOTENT_WRITE = 12 -class AclPermissionType(IntEnum): +class ACLPermissionType(IntEnum): """An enumerated type of permissions""" ANY = 1, DENY = 2, ALLOW = 3 -class AclResourcePatternType(IntEnum): +class ACLResourcePatternType(IntEnum): """An enumerated type of resource patterns""" ANY = 1, @@ -49,7 +49,7 @@ class AclResourcePatternType(IntEnum): LITERAL = 3, PREFIXED = 4 -class AclResource(object): +class ACLResource(object): """A class for specifying config resources. Arguments: resource_type (ConfigResourceType): the type of kafka resource @@ -65,20 +65,20 @@ def __init__( name=None, principal=None, host=None, - pattern_type=AclResourcePatternType.LITERAL + pattern_type=ACLResourcePatternType.LITERAL ): - if not isinstance(resource_type, AclResourceType): - resource_type = AclResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object + if not isinstance(resource_type, ACLResourceType): + resource_type = ACLResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object self.resource_type = resource_type - if not isinstance(operation, AclOperation): - operation = AclOperation[str(operation).upper()] # pylint: disable-msg:unsubscriptable-object + if not isinstance(operation, ACLOperation): + operation = ACLOperation[str(operation).upper()] # pylint: disable-msg:unsubscriptable-object self.operation = operation - if not isinstance(permission_type, AclPermissionType): - permission_type = AclPermissionType[str(permission_type).upper()] # pylint: disable-msg=unsubscriptable-object + if not isinstance(permission_type, ACLPermissionType): + permission_type = ACLPermissionType[str(permission_type).upper()] # pylint: disable-msg=unsubscriptable-object self.permission_type = permission_type self.name = name self.principal = principal self.host = host - if not isinstance(pattern_type, AclResourcePatternType): - pattern_type = AclResourcePatternType[str(pattern_type).upper()] # pylint: disable-msg=unsubscriptable-object + if not isinstance(pattern_type, ACLResourcePatternType): + pattern_type = ACLResourcePatternType[str(pattern_type).upper()] # pylint: disable-msg=unsubscriptable-object self.pattern_type = pattern_type diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 47640d013..b121b4bf4 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -19,7 +19,7 @@ from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest from kafka.structs import TopicPartition, OffsetAndMetadata -from kafka.admin.acl_resource import AclOperation, AclPermissionType +from kafka.admin.acl_resource import ACLOperation, ACLPermissionType from kafka.version import __version__ @@ -464,9 +464,9 @@ def describe_acls(self, acl_resource): @staticmethod def _convert_create_acls_resource_request_v0(acl_resource): - if acl_resource.operation == AclOperation.ANY: + if acl_resource.operation == ACLOperation.ANY: raise IllegalArgumentError("operation must not be ANY") - if acl_resource.permission_type == AclPermissionType.ANY: + if acl_resource.permission_type == ACLPermissionType.ANY: raise IllegalArgumentError("permission_type must not be ANY") return ( @@ -481,9 +481,9 @@ def _convert_create_acls_resource_request_v0(acl_resource): @staticmethod def _convert_create_acls_resource_request_v1(acl_resource): - if acl_resource.operation == AclOperation.ANY: + if acl_resource.operation == ACLOperation.ANY: raise IllegalArgumentError("operation must not be ANY") - if acl_resource.permission_type == AclPermissionType.ANY: + if acl_resource.permission_type == ACLPermissionType.ANY: raise IllegalArgumentError("permission_type must not be ANY") return ( diff --git a/test/test_admin.py b/test/test_admin.py index afa222e0d..2883521db 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -27,12 +27,12 @@ def test_new_partitions(): def test_acl_resource(): - good_resource = kafka.admin.AclResource("TOPIC", "ALL", "ALLOW", "foo", + good_resource = kafka.admin.ACLResource("TOPIC", "ALL", "ALLOW", "foo", "User:bar", "*", "LITERAL") - assert(good_resource.resource_type == kafka.admin.AclResourceType.TOPIC) - assert(good_resource.operation == kafka.admin.AclOperation.ALL) - assert(good_resource.permission_type == kafka.admin.AclPermissionType.ALLOW) - assert(good_resource.pattern_type == kafka.admin.AclResourcePatternType.LITERAL) + assert(good_resource.resource_type == kafka.admin.ACLResourceType.TOPIC) + assert(good_resource.operation == kafka.admin.ACLOperation.ALL) + assert(good_resource.permission_type == kafka.admin.ACLPermissionType.ALLOW) + assert(good_resource.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) def test_new_topic(): with pytest.raises(IllegalArgumentError): From 7c7f5bf8aebda714f9f6e2f2a45d23fd777c1dda Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Mon, 19 Nov 2018 21:17:04 +0100 Subject: [PATCH 7/8] Fix pep-8 blank line issue --- kafka/admin/acl_resource.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index 3bd9052a6..445b16e5a 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -7,6 +7,7 @@ # vendored backport module from kafka.vendor.enum34 import IntEnum + class ACLResourceType(IntEnum): """An enumerated type of config resources""" From 6f400519f0f2186a676f33ce55a8661a66252865 Mon Sep 17 00:00:00 2001 From: Ulrik Johansson Date: Mon, 19 Nov 2018 21:43:14 +0100 Subject: [PATCH 8/8] Require resource type objects when creating an ACLResource --- kafka/admin/acl_resource.py | 9 +++++---- test/test_admin.py | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/kafka/admin/acl_resource.py b/kafka/admin/acl_resource.py index 445b16e5a..fc273f8d1 100644 --- a/kafka/admin/acl_resource.py +++ b/kafka/admin/acl_resource.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from kafka.errors import IllegalArgumentError # enum in stdlib as of py3.4 try: @@ -69,17 +70,17 @@ def __init__( pattern_type=ACLResourcePatternType.LITERAL ): if not isinstance(resource_type, ACLResourceType): - resource_type = ACLResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object + raise IllegalArgumentError("resource_param must be of type ACLResourceType") self.resource_type = resource_type if not isinstance(operation, ACLOperation): - operation = ACLOperation[str(operation).upper()] # pylint: disable-msg:unsubscriptable-object + raise IllegalArgumentError("operation must be of type ACLOperation") self.operation = operation if not isinstance(permission_type, ACLPermissionType): - permission_type = ACLPermissionType[str(permission_type).upper()] # pylint: disable-msg=unsubscriptable-object + raise IllegalArgumentError("permission_type must be of type ACLPermissionType") self.permission_type = permission_type self.name = name self.principal = principal self.host = host if not isinstance(pattern_type, ACLResourcePatternType): - pattern_type = ACLResourcePatternType[str(pattern_type).upper()] # pylint: disable-msg=unsubscriptable-object + raise IllegalArgumentError("pattern_type must be of type ACLResourcePatternType") self.pattern_type = pattern_type diff --git a/test/test_admin.py b/test/test_admin.py index 2883521db..371e66e8f 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -27,13 +27,23 @@ def test_new_partitions(): def test_acl_resource(): - good_resource = kafka.admin.ACLResource("TOPIC", "ALL", "ALLOW", "foo", - "User:bar", "*", "LITERAL") + good_resource = kafka.admin.ACLResource( + kafka.admin.ACLResourceType.TOPIC, + kafka.admin.ACLOperation.ALL, + kafka.admin.ACLPermissionType.ALLOW, + "foo", + "User:bar", + "*", + kafka.admin.ACLResourcePatternType.LITERAL + ) assert(good_resource.resource_type == kafka.admin.ACLResourceType.TOPIC) assert(good_resource.operation == kafka.admin.ACLOperation.ALL) assert(good_resource.permission_type == kafka.admin.ACLPermissionType.ALLOW) assert(good_resource.pattern_type == kafka.admin.ACLResourcePatternType.LITERAL) + with pytest.raises(IllegalArgumentError): + bad_resource = kafka.admin.ACLResource("TOPIC", "ALL", "ALLOW", "foo", "User:bar", "*", "LITERAL") + def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1)