diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index b1504b1acdffa..54d33a526a7ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -73,7 +73,7 @@ public static Schema[] schemaVersions() { * * REQUEST_TIMED_OUT(7) * INVALID_TOPIC_EXCEPTION(17) - * CLUSTER_AUTHORIZATION_FAILED(31) + * TOPIC_AUTHORIZATION_FAILED(29) * TOPIC_ALREADY_EXISTS(36) * INVALID_PARTITIONS(37) * INVALID_REPLICATION_FACTOR(38) @@ -81,6 +81,7 @@ public static Schema[] schemaVersions() { * INVALID_CONFIG(40) * NOT_CONTROLLER(41) * INVALID_REQUEST(42) + * POLICY_VIOLATION(44) */ private final Map errors; diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 6dd227223e026..4409a187ae233 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -31,7 +31,7 @@ object AclCommand extends Logging { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( - Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), + Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All), Group -> Set(Read, Describe, Delete, All), Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All), TransactionalId -> Set(Describe, Write, All), @@ -153,13 +153,16 @@ object AclCommand extends Logging { val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId) val enableIdempotence = opts.options.has(opts.idempotentOpt) - val acls = getAcl(opts, Set(Write, Describe)) + val topicAcls = getAcl(opts, Set(Write, Describe, Create)) + val transactionalIdAcls = getAcl(opts, Set(Write, Describe)) - //Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds - topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] + - (Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++ - (if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl]))) + //Write, Describe, Create permission on topics, Write, Describe on transactionalIds + topics.map(_ -> topicAcls).toMap ++ + transactionalIds.map(_ -> transactionalIdAcls).toMap ++ + (if (enableIdempotence) + Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite))) + else + Map.empty) } private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -168,12 +171,12 @@ object AclCommand extends Logging { val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic) val groups: Set[Resource] = resources.filter(_.resourceType == Group) - //Read,Describe on topic, Read on consumerGroup + Create on cluster + //Read, Describe on topic, Read on consumerGroup val acls = getAcl(opts, Set(Read, Describe)) - topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]] + topics.map(_ -> acls).toMap ++ + groups.map(_ -> getAcl(opts, Set(Read))).toMap } private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -355,7 +358,7 @@ object AclCommand extends Logging { .ofType(classOf[String]) val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + - "This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") + "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.") val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9f1ab62f03dc3..98672c8287a27 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -62,6 +62,7 @@ import scala.collection.JavaConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} +import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails /** * Logic to handle the various Kafka requests @@ -1040,8 +1041,10 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { - authorizedTopics --= nonExistingTopics - unauthorizedForCreateTopics ++= nonExistingTopics + unauthorizedForCreateTopics = nonExistingTopics.filter { topic => + !authorize(request.session, Create, new Resource(Topic, topic)) + } + authorizedTopics --= unauthorizedForCreateTopics } } } @@ -1424,16 +1427,20 @@ class KafkaApis(val requestChannel: RequestChannel, (topic, new ApiError(Errors.NOT_CONTROLLER, null)) } sendResponseCallback(results) - } else if (!authorize(request.session, Create, Resource.ClusterResource)) { - val results = createTopicsRequest.topics.asScala.map { case (topic, _) => - (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)) - } - sendResponseCallback(results) } else { val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) => !createTopicsRequest.duplicateTopics.contains(topic) } + val (authorizedTopics, unauthorizedTopics) = + if (authorize(request.session, Create, Resource.ClusterResource)) { + (validTopics, Map[String, TopicDetails]()) + } else { + validTopics.partition { case (topic, _) => + authorize(request.session, Create, new Resource(Topic, topic)) + } + } + // Special handling to add duplicate topics to the response def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = { @@ -1447,14 +1454,15 @@ class KafkaApis(val requestChannel: RequestChannel, duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap } else Map.empty - val completeResults = results ++ duplicatedTopicsResults + val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) + val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults sendResponseCallback(completeResults) } adminManager.createTopics( createTopicsRequest.timeout, createTopicsRequest.validateOnly, - validTopics, + authorizedTopics, sendResponseWithDuplicatesCallback ) } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4af9b83f3b840..ea5a155b5dcca 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -17,6 +17,7 @@ import java.util import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} +import java.time.Duration import kafka.admin.AdminClient import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} @@ -73,6 +74,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val groupResource = new Resource(Group, group) val deleteTopicResource = new Resource(Topic, deleteTopic) val transactionalIdResource = new Resource(TransactionalId, transactionalId) + val createTopicResource = new Resource(Topic, createTopic) val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) @@ -82,6 +84,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite))) + val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create))) val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write))) val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) @@ -207,7 +210,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEADER_AND_ISR -> clusterAcl, ApiKeys.STOP_REPLICA -> clusterAcl, ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, - ApiKeys.CREATE_TOPICS -> clusterCreateAcl, + ApiKeys.CREATE_TOPICS -> topicCreateAcl, ApiKeys.DELETE_TOPICS -> topicDeleteAcl, ApiKeys.DELETE_RECORDS -> topicDeleteAcl, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, @@ -492,6 +495,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + @Test + def testCreateTopicAuthorizationWithClusterCreate() { + removeAllAcls() + val resources = Set[ResourceType](Topic) + + sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = false) + + for ((resource, acls) <- clusterCreateAcl) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = true) + } + @Test def testFetchFollowerRequest() { val key = ApiKeys.FETCH @@ -551,18 +566,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testCreatePermissionNeededForWritingToNonExistentTopic() { - val newTopic = "newTopic" - val topicPartition = new TopicPartition(newTopic, 0) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) + def testCreatePermissionOnTopicToWriteToNonExistentTopic() { + testCreatePermissionNeededToWriteToNonExistentTopic(Topic) + } + + @Test + def testCreatePermissionOnClusterToWriteToNonExistentTopic() { + testCreatePermissionNeededToWriteToNonExistentTopic(Cluster) + } + + private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) { + val topicPartition = new TopicPartition(createTopic, 0) + val newTopicResource = new Resource(Topic, createTopic) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) try { sendRecords(numRecords, topicPartition) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics()) } - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), resource) + sendRecords(numRecords, topicPartition) } @@ -800,27 +827,37 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testCreatePermissionNeededToReadFromNonExistentTopic() { - val newTopic = "newTopic" + def testCreatePermissionOnTopicToReadFromNonExistentTopic() { + testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Topic) + } + + @Test + def testCreatePermissionOnClusterToReadFromNonExistentTopic() { + testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Cluster) + } + + private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) { val topicPartition = new TopicPartition(newTopic, 0) val newTopicResource = new Resource(Topic, newTopic) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) - addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource) - try { - this.consumers.head.assign(List(topicPartition).asJava) - consumeRecords(this.consumers.head) - Assert.fail("should have thrown exception") - } catch { - case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) - } + this.consumers.head.assign(List(topicPartition).asJava) + val unauthorizedTopics = intercept[TopicAuthorizationException] { + (0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L))) + }.unauthorizedTopics + assertEquals(Collections.singleton(newTopic), unauthorizedTopics) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource + addAndVerifyAcls(acls, resource) - sendRecords(numRecords, topicPartition) - consumeRecords(this.consumers.head, topic = newTopic, part = 0) + TestUtils.waitUntilTrue(() => { + this.consumers.head.poll(Duration.ofMillis(50L)) + this.zkClient.topicExists(newTopic) + }, "Expected topic was not created") } @Test(expected = classOf[AuthorizationException]) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 4189ce3610b3b..c81b32d121bdf 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override val serverCount = 3 override def configureSecurityBeforeServersStart() { - AclCommand.main(clusterAclArgs) + AclCommand.main(clusterActionArgs) AclCommand.main(topicBrokerReadAclArgs) } @@ -82,23 +82,20 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val wildcardTopicResource = new Resource(Topic, wildcard) val wildcardGroupResource = new Resource(Group, wildcard) - // Arguments to AclCommand to set ACLs. There are three definitions here: - // 1- Provides read and write access to topic - // 2- Provides only write access to topic - // 3- Provides read access to consumer group - def clusterAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + // Arguments to AclCommand to set ACLs. + def clusterActionArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--cluster", + s"--operation=ClusterAction", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") - def produceAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$wildcard", + s"--operation=Read", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", s"--topic=$topic", @@ -124,13 +121,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--topic=$topic", s"--operation=Write", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def consumeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--group=$group", + s"--consumer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def groupAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -138,13 +135,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--operation=Read", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$wildcard", + s"--group=$wildcard", + s"--consumer", + s"--producer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read)) @@ -152,6 +149,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe)) + def TopicCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create)) // The next two configuration parameters enable ZooKeeper secure ACLs // and sets the Kafka authorizer, both necessary to enable security. this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") @@ -160,6 +158,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") + this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") /** @@ -200,14 +199,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testProduceConsumeViaAssign(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head, numRecords) } @Test def testProduceConsumeViaSubscribe(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) consumers.head.subscribe(List(topic).asJava) consumeRecords(this.consumers.head, numRecords) } @@ -223,16 +222,25 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas private def setWildcardResourceAcls() { AclCommand.main(produceConsumeWildcardAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource) } } - protected def setAclsAndProduce() { - AclCommand.main(produceAclArgs) - AclCommand.main(consumeAclArgs) + @Test + def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = { + // topic2 is not created on setup() + val tp2 = new TopicPartition("topic2", 0) + setAclsAndProduce(tp2) + consumers.head.assign(List(tp2).asJava) + consumeRecords(this.consumers.head, numRecords, topic = tp2.topic) + } + + protected def setAclsAndProduce(tp: TopicPartition) { + AclCommand.main(produceAclArgs(tp.topic)) + AclCommand.main(consumeAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic)) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) @@ -283,10 +291,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithoutDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } @@ -328,10 +336,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) @@ -343,9 +351,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testNoGroupAcl(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) } sendRecords(numRecords, tp) consumers.head.assign(List(tp).asJava) diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index a5bf33171a4a0..643cd4ce6afbb 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -56,7 +56,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { */ @Test(timeout = 15000) def testTwoConsumersWithDifferentSaslCredentials(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) val consumer1 = consumers.head val consumer2Config = new Properties diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 9b6272860ed7a..9197f79882f08 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -49,8 +49,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( - TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs), - Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete", + TopicResources -> (Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs), + Array("--operation", "Read" , "--operation", "Write", "--operation", "Create", "--operation", "Describe", "--operation", "Delete", "--operation", "DescribeConfigs", "--operation", "AlterConfigs")), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite), Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs", @@ -61,10 +61,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]]( - TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), + TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, Create), Hosts), TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), - Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Some(Create), - if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) + Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, + Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) ) private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]]( diff --git a/docs/security.html b/docs/security.html index 06dd8fbd1fc03..0ef37d75e929b 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1133,7 +1133,7 @@

Command Line Interface --producer Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, - DESCRIBE on topic and CREATE on cluster. + DESCRIBE and CREATE on topic. Convenience diff --git a/docs/upgrade.html b/docs/upgrade.html index 532c8bc572802..4f1c5b34767d9 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -98,13 +98,10 @@
Notable changes in 2 will be removed in a future version.
  • The internal method kafka.admin.AdminClient.deleteRecordsBefore has been removed. Users are encouraged to migrate to org.apache.kafka.clients.admin.AdminClient.deleteRecords.
  • The tool kafka.tools.ReplayLogProducer has been removed.
  • -
  • KIP-176 finally removes - the --new-consumer option for all consumer based tools as kafka-console-consumer, kafka-consumer-perf-test - and kafka-consumer-groups. - The new consumer is automatically used if the bootstrap servers list is provided on the command line - otherwise, when the zookeeper connection is provided, the old consumer is used. - The --new-consumer option had already been ignored as the way of selecting the consumer since Kafka 1.0.0, - this KIP just removes the option. +
  • The AclCommand tool --producer convenience option uses the KIP-277 finer grained ACL on the given topic.
  • +
  • KIP-176 removes + the --new-consumer option for all consumer based tools. This option is redundant since the new consumer is automatically + used if --bootstrap-server is defined.