+ * If the filter has each three parameters fully supplied, then it will only match a resource that has exactly + * the same values, e.g. a filter of {@code new ResourceFilter(ResourceType.GROUP, "one", ResourceTypeName.PREFIXED)} + * will only match the resource {@code new Resource(ResourceType.GROUP, "one", ResourceTypeName.PREFIXED)}. + *
+ * Any of the three parameters can be set to be ignored by the filter: + *
- * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- *
- */
-object AclZNode {
- def path = "/kafka-acl"
+ * Acls for resources are stored in ZK under a root node that is determined by the [[ResourceNameType]].
+ * Under each [[ResourceNameType]] node there will be one child node per resource type (Topic, Cluster, Group, etc).
+ * Under each resourceType there will be a unique child for each resource path and the data for that child will contain
+ * list of its acls as a json object. Following gives an example:
+ *
+ *
+ * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+ * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+ * /kafka-prefixed-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
+ *
+ */
+case class ZkAclStore(nameType: ResourceNameType) {
+ val aclPath: String = nameType match {
+ case Literal => "/kafka-acl"
+ case Prefixed => "/kafka-prefixed-acl"
+ case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
+ }
+
+ val aclChangePath: String = nameType match {
+ case Literal => "/kafka-acl-changes"
+ case Prefixed => "/kafka-prefixed-acl-changes"
+ case _ => throw new IllegalArgumentException("Unknown name type:" + nameType)
+ }
+
+ def path(resourceType: ResourceType) = s"$aclPath/$resourceType"
+
+ def path(resourceType: ResourceType, resourceName: String): String = s"$aclPath/$resourceType/$resourceName"
+
+ def changeSequenceZNode: AclChangeNotificationSequenceZNode = AclChangeNotificationSequenceZNode(this)
+
+ def decode(notificationMessage: Array[Byte]): Resource = AclChangeNotificationSequenceZNode.decode(nameType, notificationMessage)
}
-object ResourceTypeZNode {
- def path(resourceType: String) = s"${AclZNode.path}/$resourceType"
+object ZkAclStore {
+ val stores: Seq[ZkAclStore] = ResourceNameType.values
+ .map(nameType => ZkAclStore(nameType))
+
+ val securePaths: Seq[String] = stores
+ .flatMap(store => List(store.aclPath, store.aclChangePath))
}
object ResourceZNode {
- def path(resource: Resource) = s"${AclZNode.path}/${resource.resourceType}/${resource.name}"
- def encode(acls: Set[Acl]): Array[Byte] = {
- Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
- }
+ def path(resource: Resource): String = ZkAclStore(resource.resourceNameType).path(resource.resourceType, resource.name)
+
+ def encode(acls: Set[Acl]): Array[Byte] = Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
}
-object AclChangeNotificationZNode {
- def path = "/kafka-acl-changes"
+object AclChangeNotificationSequenceZNode {
+ val Separator = ":"
+ def SequenceNumberPrefix = "acl_changes_"
+
+ def encode(resource: Resource): Array[Byte] = {
+ (resource.resourceType.name + Separator + resource.name).getBytes(UTF_8)
+ }
+
+ def decode(nameType: ResourceNameType, bytes: Array[Byte]): Resource = {
+ val str = new String(bytes, UTF_8)
+ str.split(Separator, 2) match {
+ case Array(resourceType, name, _*) => Resource(ResourceType.fromString(resourceType), name, nameType)
+ case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
+ }
+ }
}
-object AclChangeNotificationSequenceZNode {
- val SequenceNumberPrefix = "acl_changes_"
- def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix"
- def deletePath(sequenceNode: String) = s"${AclChangeNotificationZNode.path}/${sequenceNode}"
- def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
- def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+case class AclChangeNotificationSequenceZNode(store: ZkAclStore) {
+ def createPath = s"${store.aclChangePath}/${AclChangeNotificationSequenceZNode.SequenceNumberPrefix}"
+ def deletePath(sequenceNode: String) = s"${store.aclChangePath}/$sequenceNode"
}
object ClusterZNode {
@@ -545,11 +577,9 @@ object ZkData {
ControllerZNode.path,
ControllerEpochZNode.path,
IsrChangeNotificationZNode.path,
- AclZNode.path,
- AclChangeNotificationZNode.path,
ProducerIdBlockZNode.path,
LogDirEventNotificationZNode.path,
- DelegationTokenAuthZNode.path)
+ DelegationTokenAuthZNode.path) ++ ZkAclStore.securePaths
// These are persistent ZK paths that should exist on kafka broker startup.
val PersistentZkPaths = Seq(
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5e4b893bf3f9b..331a4491e0d63 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.junit.{After, Before, Rule, Test}
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
-import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.apache.kafka.common.resource.{Resource, ResourceNameType, ResourceType}
import org.junit.rules.Timeout
import org.junit.Assert._
@@ -933,7 +933,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
checkInvalidAlterConfigs(zkClient, servers, client)
}
- val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+ val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))
/**
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ea5a155b5dcca..b40dab7809c45 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records,
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType, Resource => AdminResource, ResourceType => AdminResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.{KafkaException, Node, TopicPartition, requests}
import org.junit.Assert._
@@ -70,11 +70,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val deleteRecordsPartition = new TopicPartition(deleteTopic, part)
val topicAndPartition = TopicAndPartition(topic, part)
val group = "my-group"
- val topicResource = new Resource(Topic, topic)
- val groupResource = new Resource(Group, group)
- val deleteTopicResource = new Resource(Topic, deleteTopic)
- val transactionalIdResource = new Resource(TransactionalId, transactionalId)
- val createTopicResource = new Resource(Topic, createTopic)
+ val topicResource = new Resource(Topic, topic, Literal)
+ val groupResource = new Resource(Group, group, Literal)
+ val deleteTopicResource = new Resource(Topic, deleteTopic, Literal)
+ val transactionalIdResource = new Resource(TransactionalId, transactionalId, Literal)
+ val createTopicResource = new Resource(Topic, createTopic, Literal)
val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
@@ -383,7 +383,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def deleteAclsRequest = new DeleteAclsRequest.Builder(
Collections.singletonList(new AclBindingFilter(
- new ResourceFilter(AdminResourceType.TOPIC, null),
+ new ResourceFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL),
new AccessControlEntryFilter(userPrincipal.toString, "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
private def alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
@@ -577,7 +577,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) {
val topicPartition = new TopicPartition(createTopic, 0)
- val newTopicResource = new Resource(Topic, createTopic)
+ val newTopicResource = new Resource(Topic, createTopic, Literal)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
try {
sendRecords(numRecords, topicPartition)
@@ -733,7 +733,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// create an unmatched topic
val unmatchedTopic = "unmatched"
createTopic(unmatchedTopic)
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic, Literal))
sendRecords(1, new TopicPartition(unmatchedTopic, part))
removeAllAcls()
@@ -746,7 +746,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// set the subscription pattern to an internal topic that the consumer has read permission to. Since
// internal topics are not included, we should not be assigned any partitions from this topic
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), new Resource(Topic,
- GROUP_METADATA_TOPIC_NAME))
+ GROUP_METADATA_TOPIC_NAME, Literal))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
assertTrue(consumer.subscription().isEmpty)
@@ -774,7 +774,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
// now authorize the user for the internal topic and verify that we can subscribe
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), Resource(Topic,
- GROUP_METADATA_TOPIC_NAME))
+ GROUP_METADATA_TOPIC_NAME, Literal))
consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
@@ -789,7 +789,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), topicResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), groupResource)
- val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME)
+ val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME, Literal)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), internalTopicResource)
val consumerConfig = new Properties
@@ -842,7 +842,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) {
val topicPartition = new TopicPartition(newTopic, 0)
- val newTopicResource = new Resource(Topic, newTopic)
+ val newTopicResource = new Resource(Topic, newTopic, Literal)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
this.consumers.head.assign(List(topicPartition).asJava)
@@ -1045,7 +1045,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteTopicsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
val version = ApiKeys.DELETE_TOPICS.latestVersion
val deleteResponse = DeleteTopicsResponse.parse(response, version)
@@ -1072,7 +1072,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testDeleteRecordsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*", Literal))
val response = connectAndSend(deleteRecordsRequest, ApiKeys.DELETE_RECORDS)
val version = ApiKeys.DELETE_RECORDS.latestVersion
val deleteRecordsResponse = DeleteRecordsResponse.parse(response, version)
@@ -1090,7 +1090,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test
def testCreatePartitionsWithWildCardAuth() {
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*", Literal))
val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
val version = ApiKeys.CREATE_PARTITIONS.latestVersion
val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
@@ -1283,7 +1283,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), transactionalIdResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), topicResource)
- addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic))
+ addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)), new Resource(Topic, deleteTopic, Literal))
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index c81b32d121bdf..b809686f80585 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -65,8 +65,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
val numRecords = 1
- val group = "group"
- val topic = "e2etopic"
+ val groupPrefix = "gr"
+ val group = s"${groupPrefix}oup"
+ val topicPrefix = "e2e"
+ val topic = s"${topicPrefix}topic"
val wildcard = "*"
val part = 0
val tp = new TopicPartition(topic, part)
@@ -76,11 +78,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
- val topicResource = new Resource(Topic, topic)
- val groupResource = new Resource(Group, group)
+ val topicResource = new Resource(Topic, topic, Literal)
+ val groupResource = new Resource(Group, group, Literal)
val clusterResource = Resource.ClusterResource
- val wildcardTopicResource = new Resource(Topic, wildcard)
- val wildcardGroupResource = new Resource(Group, wildcard)
+ val prefixedTopicResource = new Resource(Topic, topicPrefix, Prefixed)
+ val prefixedGroupResource = new Resource(Group, groupPrefix, Prefixed)
+ val wildcardTopicResource = new Resource(Topic, wildcard, Literal)
+ val wildcardGroupResource = new Resource(Group, wildcard, Literal)
// Arguments to AclCommand to set ACLs.
def clusterActionArgs: Array[String] = Array("--authorizer-properties",
@@ -142,6 +146,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
s"--consumer",
s"--producer",
s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
+ def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties",
+ s"zookeeper.connect=$zkConnect",
+ s"--add",
+ s"--topic=$topicPrefix",
+ s"--group=$groupPrefix",
+ s"--resource-name-type=prefixed",
+ s"--consumer",
+ s"--producer",
+ s"--allow-principal=$kafkaPrincipalType:$clientPrincipal")
def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction))
def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read))
@@ -169,7 +182,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
super.setUp()
servers.foreach { s =>
TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
- TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*"))
+ TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*", Literal))
}
// create the test topic with all the brokers as replicas
createTopic(topic, 1, 3)
@@ -219,12 +232,12 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumeRecords(this.consumers.head, numRecords)
}
- private def setWildcardResourceAcls() {
- AclCommand.main(produceConsumeWildcardAclArgs)
- servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
- }
+ @Test
+ def testProduceConsumeWithPrefixedAcls(): Unit = {
+ setPrefixedResourceAcls()
+ sendRecords(numRecords, tp)
+ consumers.head.subscribe(List(topic).asJava)
+ consumeRecords(this.consumers.head, numRecords)
}
@Test
@@ -236,6 +249,22 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumeRecords(this.consumers.head, numRecords, topic = tp2.topic)
}
+ private def setWildcardResourceAcls() {
+ AclCommand.main(produceConsumeWildcardAclArgs)
+ servers.foreach { s =>
+ TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+ }
+ }
+
+ private def setPrefixedResourceAcls() {
+ AclCommand.main(produceConsumePrefixedAclsArgs)
+ servers.foreach { s =>
+ TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource)
+ }
+ }
+
protected def setAclsAndProduce(tp: TopicPartition) {
AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(consumeAclArgs(tp.topic))
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 099af5247d366..b3572c0665b62 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -13,14 +13,15 @@
package kafka.api
import java.io.File
+import java.util
-import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
+import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Group, Literal, Operation, PermissionType, SimpleAclAuthorizer, Topic, Prefixed, Acl => AuthAcl, Resource => AuthResource}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
-import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
-import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}
+import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceNameType, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.Assert.assertEquals
import org.junit.{After, Assert, Before, Test}
@@ -88,25 +89,29 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
closeSasl()
}
- val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2"),
+ val anyAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*", ResourceNameType.LITERAL),
+ new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
+ val acl2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
- val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+ val acl3 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
+ new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+ val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
- val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"),
+ val prefixAcl = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
- val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id"),
+ val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
- val groupAcl = new AclBinding(new Resource(ResourceType.GROUP, "*"),
+ val groupAcl = new AclBinding(new Resource(ResourceType.GROUP, "*", ResourceNameType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
@Test
override def testAclOperations(): Unit = {
client = AdminClient.create(createConfig())
- assertEquals(7, client.describeAcls(AclBindingFilter.ANY).values.get().size)
+ assertEquals(7, getAcls(AclBindingFilter.ANY).size)
val results = client.createAcls(List(acl2, acl3).asJava)
assertEquals(Set(acl2, acl3), results.values.keySet().asScala)
results.values.values().asScala.foreach(value => value.get)
- val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
+ val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW))
val results2 = client.createAcls(List(aclUnknown).asJava)
assertEquals(Set(aclUnknown), results2.values.keySet().asScala)
@@ -118,13 +123,6 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet)
}
- def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
- TestUtils.waitUntilTrue(() => {
- val results = client.describeAcls(filter).values.get()
- acls == results.asScala.toSet
- }, s"timed out waiting for ACLs $acls")
- }
-
@Test
def testAclOperations2(): Unit = {
client = AdminClient.create(createConfig())
@@ -134,9 +132,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
- val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY)
- val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
- val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null), AccessControlEntryFilter.ANY)
+ val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
waitForDescribeAcls(client, filterA, Set(groupAcl))
waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
@@ -151,12 +149,126 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
waitForDescribeAcls(client, filterC, Set())
}
+ @Test
+ def testAclDescribe(): Unit = {
+ client = AdminClient.create(createConfig())
+ ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
+
+ val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+ val allLiteralTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val allPrefixedTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+ val literalMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val prefixedMyTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic", ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+ val allMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+ val allFooTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foobar", ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+
+ assertEquals(Set(anyAcl), getAcls(anyAcl.toFilter))
+ assertEquals(Set(prefixAcl), getAcls(prefixAcl.toFilter))
+ assertEquals(Set(acl2), getAcls(acl2.toFilter))
+ assertEquals(Set(fooAcl), getAcls(fooAcl.toFilter))
+
+ assertEquals(Set(acl2), getAcls(literalMyTopic2Acls))
+ assertEquals(Set(prefixAcl), getAcls(prefixedMyTopicAcls))
+ assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allLiteralTopicAcls))
+ assertEquals(Set(prefixAcl), getAcls(allPrefixedTopicAcls))
+ assertEquals(Set(anyAcl, acl2, prefixAcl), getAcls(allMyTopic2Acls))
+ assertEquals(Set(anyAcl, fooAcl), getAcls(allFooTopicAcls))
+ assertEquals(Set(anyAcl, acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
+ }
+
+ @Test
+ def testAclDelete(): Unit = {
+ client = AdminClient.create(createConfig())
+ ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl))
+
+ val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+ val allLiteralTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val allPrefixedTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.PREFIXED), AccessControlEntryFilter.ANY)
+
+ // Delete only ACLs on literal 'mytopic2' topic
+ var deleted = client.deleteAcls(List(acl2.toFilter).asJava).all().get().asScala.toSet
+ assertEquals(Set(acl2), deleted)
+ assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete only ACLs on literal '*' topic
+ deleted = client.deleteAcls(List(anyAcl.toFilter).asJava).all().get().asScala.toSet
+ assertEquals(Set(anyAcl), deleted)
+ assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete only ACLs on specific prefixed 'mytopic' topics:
+ deleted = client.deleteAcls(List(prefixAcl.toFilter).asJava).all().get().asScala.toSet
+ assertEquals(Set(prefixAcl), deleted)
+ assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete all literal ACLs:
+ deleted = client.deleteAcls(List(allLiteralTopicAcls).asJava).all().get().asScala.toSet
+ assertEquals(Set(anyAcl, acl2, fooAcl), deleted)
+ assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete all prefixed ACLs:
+ deleted = client.deleteAcls(List(allPrefixedTopicAcls).asJava).all().get().asScala.toSet
+ assertEquals(Set(prefixAcl), deleted)
+ assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete all topic ACLs:
+ deleted = client.deleteAcls(List(allTopicAcls).asJava).all().get().asScala.toSet
+ assertEquals(Set(), getAcls(allTopicAcls))
+ }
+
+ //noinspection ScalaDeprecation - test explicitly covers clients using legacy / deprecated constructors
+ @Test
+ def testLegacyAclOpsNeverAffectOrReturnPrefixed(): Unit = {
+ client = AdminClient.create(createConfig())
+ ensureAcls(Set(anyAcl, acl2, fooAcl, prefixAcl)) // <-- prefixed exists, but should never be returned.
+
+ val allTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.ANY), AccessControlEntryFilter.ANY)
+ val legacyAllTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, null, ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val legacyMyTopic2Acls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val legacyAnyTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "*", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+ val legacyFooTopicAcls = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "foobar", ResourceNameType.LITERAL), AccessControlEntryFilter.ANY)
+
+ assertEquals(Set(anyAcl, acl2, fooAcl), getAcls(legacyAllTopicAcls))
+ assertEquals(Set(acl2), getAcls(legacyMyTopic2Acls))
+ assertEquals(Set(anyAcl), getAcls(legacyAnyTopicAcls))
+ assertEquals(Set(fooAcl), getAcls(legacyFooTopicAcls))
+
+ // Delete only (legacy) ACLs on 'mytopic2' topic
+ var deleted = client.deleteAcls(List(legacyMyTopic2Acls).asJava).all().get().asScala.toSet
+ assertEquals(Set(acl2), deleted)
+ assertEquals(Set(anyAcl, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete only (legacy) ACLs on '*' topic
+ deleted = client.deleteAcls(List(legacyAnyTopicAcls).asJava).all().get().asScala.toSet
+ assertEquals(Set(anyAcl), deleted)
+ assertEquals(Set(acl2, fooAcl, prefixAcl), getAcls(allTopicAcls))
+
+ ensureAcls(deleted)
+
+ // Delete all (legacy) topic ACLs:
+ deleted = client.deleteAcls(List(legacyAllTopicAcls).asJava).all().get().asScala.toSet
+ assertEquals(Set(anyAcl, acl2, fooAcl), deleted)
+ assertEquals(Set(), getAcls(legacyAllTopicAcls))
+ assertEquals(Set(prefixAcl), getAcls(allTopicAcls))
+ }
+
@Test
def testAttemptToCreateInvalidAcls(): Unit = {
client = AdminClient.create(createConfig())
- val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar"),
+ val clusterAcl = new AclBinding(new Resource(ResourceType.CLUSTER, "foobar", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
- val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""),
+ val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, "", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions())
assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala)
@@ -224,7 +336,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
private def testAclGet(expectAuth: Boolean): Unit = {
TestUtils.waitUntilTrue(() => {
- val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*"),
+ val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*", ResourceNameType.LITERAL),
new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW))
val results = client.describeAcls(userAcl.toFilter)
if (expectAuth) {
@@ -276,4 +388,22 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
testAclGet(expectAuth = true)
testAclCreateGetDelete(expectAuth = false)
}
+
+ private def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = {
+ var lastResults: util.Collection[AclBinding] = null
+ TestUtils.waitUntilTrue(() => {
+ lastResults = client.describeAcls(filter).values.get()
+ acls == lastResults.asScala.toSet
+ }, s"timed out waiting for ACLs $acls.\nActual $lastResults")
+ }
+
+ private def ensureAcls(bindings: Set[AclBinding]): Unit = {
+ client.createAcls(bindings.asJava).all().get()
+
+ bindings.foreach(binding => waitForDescribeAcls(client, binding.toFilter, Set(binding)))
+ }
+
+ private def getAcls(allTopicAcls: AclBindingFilter) = {
+ client.describeAcls(allTopicAcls).values.get().asScala.toSet
+ }
}
diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
index 9197f79882f08..71754ba263318 100644
--- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala
@@ -24,21 +24,22 @@ import kafka.server.KafkaConfig
import kafka.utils.{Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.junit.Test
+import org.junit.{After, Before, Test}
class AclCommandTest extends ZooKeeperTestHarness with Logging {
+ private val principal: KafkaPrincipal = KafkaPrincipal.fromString("User:test2")
private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
- KafkaPrincipal.fromString("User:test2"),
+ principal,
KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
private val Hosts = Set("host1", "host2")
private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
- private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
- private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2"))
- private val TransactionalIdResources = Set(new Resource(TransactionalId, "t0"), new Resource(TransactionalId, "t1"))
- private val TokenResources = Set(new Resource(DelegationToken, "token1"), new Resource(DelegationToken, "token2"))
+ private val TopicResources = Set(Resource(Topic, "test-1", Literal), Resource(Topic, "test-2", Literal))
+ private val GroupResources = Set(Resource(Group, "testGroup-1", Literal), Resource(Group, "testGroup-2", Literal))
+ private val TransactionalIdResources = Set(Resource(TransactionalId, "t0", Literal), Resource(TransactionalId, "t1", Literal))
+ private val TokenResources = Set(Resource(DelegationToken, "token1", Literal), Resource(DelegationToken, "token2", Literal))
private val ResourceToCommand = Map[Set[Resource], Array[String]](
TopicResources -> Array("--topic", "test-1", "--topic", "test-2"),
@@ -82,45 +83,71 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
ProducerResourceToAcls(enableIdempotence = true).getOrElse(k, Set.empty[Acl])) }
)
- @Test
- def testAclCli() {
- val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
+ private var brokerProps: Properties = _
+ private var zkArgs: Array[String] = _
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+
+ brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
- val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
+ zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
+ }
+
+ @Test
+ def testAclCli() {
for ((resources, resourceCmd) <- ResourceToCommand) {
for (permissionType <- PermissionType.values) {
val operationToCmd = ResourceToOperations(resources)
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
- AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
+ AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
for (resource <- resources) {
- withAuthorizer(brokerProps) { authorizer =>
+ withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
}
}
- testRemove(resources, resourceCmd, args, brokerProps)
+ testRemove(resources, resourceCmd, brokerProps)
}
}
}
@Test
def testProducerConsumerCli() {
- val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
- brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
- val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
-
for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
- AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
+ AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
for ((resources, acls) <- resourcesToAcls) {
for (resource <- resources) {
- withAuthorizer(brokerProps) { authorizer =>
+ withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
}
}
}
- testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, args, brokerProps)
+ testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, brokerProps)
+ }
+ }
+
+ @Test
+ def testAclsOnPrefixedResources(): Unit = {
+ val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-name-type", "Prefixed")
+
+ AclCommand.main(zkArgs ++ cmd :+ "--add")
+
+ withAuthorizer() { authorizer =>
+ val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
+ val describeAcl = Acl(principal, Allow, Acl.WildCardHost, Describe)
+ val createAcl = Acl(principal, Allow, Acl.WildCardHost, Create)
+ TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", Prefixed))
+ }
+
+ AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
+
+ withAuthorizer() { authorizer =>
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", Literal))
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Topic, "Test-", Prefixed))
}
}
@@ -130,10 +157,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
}
- private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String], brokerProps: Properties) {
+ private def testRemove(resources: Set[Resource], resourceCmd: Array[String], brokerProps: Properties) {
for (resource <- resources) {
- AclCommand.main(args ++ resourceCmd :+ "--remove" :+ "--force")
- withAuthorizer(brokerProps) { authorizer =>
+ AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
+ withAuthorizer() { authorizer =>
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
}
}
@@ -150,8 +177,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
}
- def withAuthorizer(props: Properties)(f: Authorizer => Unit) {
- val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
+ def withAuthorizer()(f: Authorizer => Unit) {
+ val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
val authZ = new SimpleAclAuthorizer
try {
authZ.configure(kafkaConfig.originals)
diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index e46bd9b726fb8..cee0bd6b72582 100644
--- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -16,31 +16,39 @@
*/
package kafka.common
-import java.nio.charset.StandardCharsets
-
+import kafka.security.auth.{Group, Literal, Resource}
import kafka.utils.TestUtils
-import kafka.zk.{AclChangeNotificationSequenceZNode, AclChangeNotificationZNode, ZooKeeperTestHarness}
-import org.junit.Test
+import kafka.zk.{AclChangeNotificationSequenceZNode, ZkAclStore, ZooKeeperTestHarness}
+import org.junit.{After, Test}
class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
+ var notificationListener: ZkNodeChangeNotificationListener = _
+
+ @After
+ override def tearDown(): Unit = {
+ if (notificationListener != null) {
+ notificationListener.close()
+ }
+ }
+
@Test
def testProcessNotification() {
- @volatile var notification: String = null
+ @volatile var notification: Resource = null
@volatile var invocationCount = 0
val notificationHandler = new NotificationHandler {
override def processNotification(notificationMessage: Array[Byte]): Unit = {
- notification = new String(notificationMessage, StandardCharsets.UTF_8)
+ notification = AclChangeNotificationSequenceZNode.decode(Literal, notificationMessage)
invocationCount += 1
}
}
zkClient.createAclPaths()
- val notificationMessage1 = "message1"
- val notificationMessage2 = "message2"
+ val notificationMessage1 = Resource(Group, "messageA", Literal)
+ val notificationMessage2 = Resource(Group, "messageB", Literal)
val changeExpirationMs = 1000
- val notificationListener = new ZkNodeChangeNotificationListener(zkClient, AclChangeNotificationZNode.path,
+ notificationListener = new ZkNodeChangeNotificationListener(zkClient, ZkAclStore(Literal).aclChangePath,
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, notificationHandler, changeExpirationMs)
notificationListener.init()
@@ -60,7 +68,7 @@ class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2,
"Failed to send/process notification message in the timeout period.")
- (3 to 10).foreach(i => zkClient.createAclChangeNotification("message" + i))
+ (3 to 10).foreach(i => zkClient.createAclChangeNotification(Resource(Group, "message" + i, Literal)))
TestUtils.waitUntilTrue(() => invocationCount == 10 ,
s"Expected 10 invocations of processNotifications, but there were $invocationCount")
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1e18f1d7bce93..3e7f6a8020a29 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -20,7 +20,7 @@ import java.net.InetAddress
import java.util.UUID
import kafka.network.RequestChannel.Session
-import kafka.security.auth.Acl.WildCardHost
+import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
@@ -30,14 +30,22 @@ import org.junit.{After, Before, Test}
class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
+ val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
+ val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+ val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
+
+ val wildCardResource = Resource(Topic, WildCardResource, Literal)
+ val prefixedResource = Resource(Topic, "foo", Prefixed)
+
val simpleAclAuthorizer = new SimpleAclAuthorizer
val simpleAclAuthorizer2 = new SimpleAclAuthorizer
val testPrincipal = Acl.WildCardPrincipal
val testHostName = InetAddress.getByName("192.168.0.1")
- val session = Session(testPrincipal, testHostName)
var resource: Resource = null
val superUsers = "User:superuser1; User:superuser2"
val username = "alice"
+ val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
+ val session = Session(principal, testHostName)
var config: KafkaConfig = null
@Before
@@ -54,7 +62,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
config = KafkaConfig.fromProps(props)
simpleAclAuthorizer.configure(config.originals)
simpleAclAuthorizer2.configure(config.originals)
- resource = new Resource(Topic, UUID.randomUUID().toString)
+ resource = new Resource(Topic, "foo-" + UUID.randomUUID(), Literal)
}
@After
@@ -64,6 +72,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
super.tearDown()
}
+ @Test(expected = classOf[IllegalArgumentException])
+ def testAuthorizeThrowsOnNoneLiteralResource() {
+ simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", Prefixed))
+ }
+
@Test
def testTopicAcl() {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
@@ -161,7 +174,6 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.3.1")
val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
- val wildCardResource = new Resource(resource.resourceType, Resource.WildCardResource)
val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
@@ -222,10 +234,10 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
val resourceToAcls = Map[Resource, Set[Acl]](
- new Resource(Topic, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
- new Resource(Cluster, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
- new Resource(Group, Resource.WildCardResource) -> acls,
- new Resource(Group, "test-ConsumerGroup") -> acls
+ new Resource(Topic, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
+ new Resource(Cluster, Resource.WildCardResource, Literal) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
+ new Resource(Group, Resource.WildCardResource, Literal) -> acls,
+ new Resource(Group, "test-ConsumerGroup", Literal) -> acls
)
resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
@@ -253,7 +265,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
simpleAclAuthorizer.addAcls(acls, resource)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val resource1 = new Resource(Topic, "test-2")
+ val resource1 = new Resource(Topic, "test-2", Literal)
val acl2 = new Acl(user2, Deny, "host3", Read)
val acls1 = Set[Acl](acl2)
simpleAclAuthorizer.addAcls(acls1, resource1)
@@ -272,7 +284,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testLocalConcurrentModificationOfResourceAcls() {
- val commonResource = new Resource(Topic, "test")
+ val commonResource = new Resource(Topic, "test", Literal)
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -288,7 +300,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testDistributedConcurrentModificationOfResourceAcls() {
- val commonResource = new Resource(Topic, "test")
+ val commonResource = new Resource(Topic, "test", Literal)
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, WildCardHost, Read)
@@ -318,7 +330,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
@Test
def testHighConcurrencyModificationOfResourceAcls() {
- val commonResource = new Resource(Topic, "test")
+ val commonResource = new Resource(Topic, "test", Literal)
val acls = (0 to 50).map { i =>
val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
@@ -419,6 +431,127 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
}
+ @Test
+ def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
+
+ assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+ }
+
+ @Test
+ def testDeleteAclOnWildcardResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
+
+ simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), wildCardResource)
+
+ assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(wildCardResource))
+ }
+
+ @Test
+ def testDeleteAllAclOnWildcardResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
+
+ simpleAclAuthorizer.removeAcls(wildCardResource)
+
+ assertEquals(Map(), simpleAclAuthorizer.getAcls())
+ }
+
+ @Test
+ def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
+
+ assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+ }
+
+ @Test
+ def testDeleteAclOnPrefixedResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
+
+ simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), prefixedResource)
+
+ assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(prefixedResource))
+ }
+
+ @Test
+ def testDeleteAllAclOnPrefixedResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
+
+ simpleAclAuthorizer.removeAcls(prefixedResource)
+
+ assertEquals(Map(), simpleAclAuthorizer.getAcls())
+ }
+
+ @Test
+ def testAddAclsOnLiteralResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), resource)
+ simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), resource)
+
+ assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(resource))
+ assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
+ assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
+ }
+
+ @Test
+ def testAddAclsOnWildcardResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
+ simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), wildCardResource)
+
+ assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(wildCardResource))
+ assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
+ assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
+ }
+
+ @Test
+ def testAddAclsOnPrefiexedResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
+ simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), prefixedResource)
+
+ assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(prefixedResource))
+ assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
+ assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
+ }
+
+ @Test
+ def testAuthorizeWithPrefixedResource(): Unit = {
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Literal))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Prefixed))
+ simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", Literal))
+
+ simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
+
+ assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+ }
+
+ @Test
+ def testGetAclsPrincipal(): Unit = {
+ assertEquals(0, simpleAclAuthorizer.getAcls(principal).size)
+
+ val acl1 = new Acl(principal, Allow, WildCardHost, Write)
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
+ assertEquals(1, simpleAclAuthorizer.getAcls(principal).size)
+
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Topic, Acl.WildCardResource, Literal))
+ assertEquals(2, simpleAclAuthorizer.getAcls(principal).size)
+
+ val acl2 = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, "groupA", Literal))
+ assertEquals(3, simpleAclAuthorizer.getAcls(principal).size)
+
+ // add prefixed principal acl on wildcard group name
+ val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName.charAt(0) + WildCardResource), Allow, WildCardHost, Write)
+ simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, Acl.WildCardResource, Literal))
+ assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
+ }
+
private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
var acls = originalAcls
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index fe7aca2cc6494..eec7175983ef3 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -242,7 +242,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
//get all tokens for multiple owners (owner1, renewer4) and with permission
var acl = new Acl(owner1, Allow, WildCardHost, Describe)
- simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3))
+ simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId3, Literal))
tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, owner1, List(owner1, renewer4))
assert(tokens.size == 3)
@@ -257,7 +257,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
//get all tokens for multiple owners (renewer2, renewer3) which are token renewers principals and with permissions
hostSession = new Session(renewer2, InetAddress.getByName("192.168.1.1"))
acl = new Acl(renewer2, Allow, WildCardHost, Describe)
- simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2))
+ simpleAclAuthorizer.addAcls(Set(acl), new Resource(kafka.security.auth.DelegationToken, tokenId2, Literal))
tokens = getTokens(tokenManager, simpleAclAuthorizer, hostSession, renewer2, List(renewer2, renewer3))
assert(tokens.size == 2)
@@ -271,7 +271,7 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness {
List()
}
else {
- def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId))
+ def authorizeToken(tokenId: String) = simpleAclAuthorizer.authorize(hostSession, Describe, new Resource(kafka.security.auth.DelegationToken, tokenId, Literal))
def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, Option(requestedOwners), token, authorizeToken)
tokenManager.getTokens(eligible)
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3f2f66c089055..59f543b522a1f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,10 +22,9 @@ import kafka.log.LogConfig
import kafka.network.RequestChannel.Session
import kafka.security.auth._
import kafka.utils.TestUtils
-
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{ResourceFilter, ResourceNameType, Resource => AdminResource, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
@@ -318,7 +317,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DELETE_ACLS =>
new DeleteAclsRequest.Builder(Collections.singletonList(new AclBindingFilter(
- new ResourceFilter(AdminResourceType.TOPIC, null),
+ new ResourceFilter(AdminResourceType.TOPIC, null, ResourceNameType.LITERAL),
new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY))))
case ApiKeys.DESCRIBE_CONFIGS =>
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ec6c756d4531c..f50ef3a8a35d7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1142,8 +1142,11 @@ object TestUtils extends Logging {
}
def waitAndVerifyAcls(expected: Set[Acl], authorizer: Authorizer, resource: Resource) = {
+ val newLine = scala.util.Properties.lineSeparator
+
TestUtils.waitUntilTrue(() => authorizer.getAcls(resource) == expected,
- s"expected acls $expected but got ${authorizer.getAcls(resource)}", waitTime = JTestUtils.DEFAULT_MAX_WAIT_MS)
+ s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
+ s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}", waitTime = JTestUtils.DEFAULT_MAX_WAIT_MS)
}
/**
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 1aeca2203b49d..cfaf731768019 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -427,72 +427,76 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testAclManagementMethods() {
- assertFalse(zkClient.pathExists(AclZNode.path))
- assertFalse(zkClient.pathExists(AclChangeNotificationZNode.path))
- ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+ ZkAclStore.stores.foreach(store => {
+ assertFalse(zkClient.pathExists(store.aclPath))
+ assertFalse(zkClient.pathExists(store.aclChangePath))
+ ResourceType.values.foreach(resource => assertFalse(zkClient.pathExists(store.path(resource))))
+ })
// create acl paths
zkClient.createAclPaths
- assertTrue(zkClient.pathExists(AclZNode.path))
- assertTrue(zkClient.pathExists(AclChangeNotificationZNode.path))
- ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+ ZkAclStore.stores.foreach(store => {
+ assertTrue(zkClient.pathExists(store.aclPath))
+ assertTrue(zkClient.pathExists(store.aclChangePath))
+ ResourceType.values.foreach(resource => assertTrue(zkClient.pathExists(store.path(resource))))
- val resource1 = new Resource(Topic, UUID.randomUUID().toString)
- val resource2 = new Resource(Topic, UUID.randomUUID().toString)
+ val resource1 = new Resource(Topic, UUID.randomUUID().toString, store.nameType)
+ val resource2 = new Resource(Topic, UUID.randomUUID().toString, store.nameType)
- // try getting acls for non-existing resource
- var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
- assertTrue(versionedAcls.acls.isEmpty)
- assertEquals(-1, versionedAcls.zkVersion)
- assertFalse(zkClient.resourceExists(resource1))
+ // try getting acls for non-existing resource
+ var versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+ assertTrue(versionedAcls.acls.isEmpty)
+ assertEquals(-1, versionedAcls.zkVersion)
+ assertFalse(zkClient.resourceExists(resource1))
- val acl1 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"), Deny, "host1" , Read)
- val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Allow, "*", Read)
- val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read)
+ val acl1 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice"), Deny, "host1" , Read)
+ val acl2 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Allow, "*", Read)
+ val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob"), Deny, "host1", Read)
- //create acls for resources
- zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl2), 0)
- zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1, acl3), 0)
+ //create acls for resources
+ zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl2), 0)
+ zkClient.conditionalSetOrCreateAclsForResource(resource2, Set(acl1, acl3), 0)
- versionedAcls = zkClient.getVersionedAclsForResource(resource1)
- assertEquals(Set(acl1, acl2), versionedAcls.acls)
- assertEquals(0, versionedAcls.zkVersion)
- assertTrue(zkClient.resourceExists(resource1))
+ versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+ assertEquals(Set(acl1, acl2), versionedAcls.acls)
+ assertEquals(0, versionedAcls.zkVersion)
+ assertTrue(zkClient.resourceExists(resource1))
- //update acls for resource
- zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl3), 0)
+ //update acls for resource
+ zkClient.conditionalSetOrCreateAclsForResource(resource1, Set(acl1, acl3), 0)
- versionedAcls = zkClient.getVersionedAclsForResource(resource1)
- assertEquals(Set(acl1, acl3), versionedAcls.acls)
- assertEquals(1, versionedAcls.zkVersion)
+ versionedAcls = zkClient.getVersionedAclsForResource(resource1)
+ assertEquals(Set(acl1, acl3), versionedAcls.acls)
+ assertEquals(1, versionedAcls.zkVersion)
- //get resource Types
- assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes().toSet)
+ //get resource Types
+ assertTrue(ResourceType.values.map( rt => rt.name).toSet == zkClient.getResourceTypes(store.nameType).toSet)
- //get resource name
- val resourceNames = zkClient.getResourceNames(Topic.name)
- assertEquals(2, resourceNames.size)
- assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
+ //get resource name
+ val resourceNames = zkClient.getResourceNames(store.nameType, Topic)
+ assertEquals(2, resourceNames.size)
+ assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
- //delete resource
- assertTrue(zkClient.deleteResource(resource1))
- assertFalse(zkClient.resourceExists(resource1))
+ //delete resource
+ assertTrue(zkClient.deleteResource(resource1))
+ assertFalse(zkClient.resourceExists(resource1))
- //delete with invalid expected zk version
- assertFalse(zkClient.conditionalDelete(resource2, 10))
- //delete with valid expected zk version
- assertTrue(zkClient.conditionalDelete(resource2, 0))
+ //delete with invalid expected zk version
+ assertFalse(zkClient.conditionalDelete(resource2, 10))
+ //delete with valid expected zk version
+ assertTrue(zkClient.conditionalDelete(resource2, 0))
- zkClient.createAclChangeNotification("resource1")
- zkClient.createAclChangeNotification("resource2")
+ zkClient.createAclChangeNotification(Resource(Group, "resource1", store.nameType))
+ zkClient.createAclChangeNotification(Resource(Topic, "resource2", store.nameType))
- assertEquals(2, zkClient.getChildren(AclChangeNotificationZNode.path).size)
+ assertEquals(2, zkClient.getChildren(store.aclChangePath).size)
- zkClient.deleteAclChangeNotifications()
- assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty)
+ zkClient.deleteAclChangeNotifications()
+ assertTrue(zkClient.getChildren(store.aclChangePath).isEmpty)
+ })
}
@Test
diff --git a/docs/security.html b/docs/security.html
index 0ef37d75e929b..57bba4775a33b 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1099,6 +1099,19 @@ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topicNote that ``--allow-host`` and ``deny-host`` only support IP addresses (hostnames are not supported). Above examples add acls to a topic by specifying --topic [topic-name] as the resource option. Similarly user can add acls to cluster by specifying --cluster and to a consumer group by specifying --group [group-name]. + You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 198.51.200.0" + You can do that by using the wildcard resource '*', e.g. by executing the CLI with following options: +
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic *+ You can add acls on resources matching a certain prefix, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name is prefixed with 'Test-' from any host". + You can do that by executing the CLI with following options: +
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-name-type Prefixed+ Note, --resource-name-type defaults to 'literal', which only affects resources with the exact same name. The exception to this is the wildcard resource name '*', which should also be added using 'literal'.
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-name-type Prefixed
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-name-type any
NOTE: any prefixed ACLs added to a cluster will be ignored should the cluster be downgraded again. +
--new-consumer option for all consumer based tools. This option is redundant since the new consumer is automatically
used if --bootstrap-server is defined.
+ leader_epoch field.