Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ public static Schema[] schemaVersions() {
*
* REQUEST_TIMED_OUT(7)
* INVALID_TOPIC_EXCEPTION(17)
* CLUSTER_AUTHORIZATION_FAILED(31)
* TOPIC_AUTHORIZATION_FAILED(29)
* TOPIC_ALREADY_EXISTS(36)
* INVALID_PARTITIONS(37)
* INVALID_REPLICATION_FACTOR(38)
* INVALID_REPLICA_ASSIGNMENT(39)
* INVALID_CONFIG(40)
* NOT_CONTROLLER(41)
* INVALID_REQUEST(42)
* POLICY_VIOLATION(44)
*/

private final Map<String, ApiError> errors;
Expand Down
25 changes: 14 additions & 11 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object AclCommand extends Logging {

val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All),
Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All),
Group -> Set(Read, Describe, Delete, All),
Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All),
TransactionalId -> Set(Describe, Write, All),
Expand Down Expand Up @@ -153,13 +153,16 @@ object AclCommand extends Logging {
val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId)
val enableIdempotence = opts.options.has(opts.idempotentOpt)

val acls = getAcl(opts, Set(Write, Describe))
val topicAcls = getAcl(opts, Set(Write, Describe, Create))
val transactionalIdAcls = getAcl(opts, Set(Write, Describe))

//Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds
topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] +
(Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++
(if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl])))
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
topics.map(_ -> topicAcls).toMap ++
transactionalIds.map(_ -> transactionalIdAcls).toMap ++
(if (enableIdempotence)
Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite)))
else
Map.empty)
}

private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
Expand All @@ -168,12 +171,12 @@ object AclCommand extends Logging {
val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
val groups: Set[Resource] = resources.filter(_.resourceType == Group)

//Read,Describe on topic, Read on consumerGroup + Create on cluster
//Read, Describe on topic, Read on consumerGroup

val acls = getAcl(opts, Set(Read, Describe))

topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
topics.map(_ -> acls).toMap ++
groups.map(_ -> getAcl(opts, Set(Read))).toMap
}

private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
Expand Down Expand Up @@ -355,7 +358,7 @@ object AclCommand extends Logging {
.ofType(classOf[String])

val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ")
"This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.")

val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")
Expand Down
26 changes: 17 additions & 9 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails

/**
* Logic to handle the various Kafka requests
Expand Down Expand Up @@ -1040,8 +1041,10 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
if (!authorize(request.session, Create, Resource.ClusterResource)) {
authorizedTopics --= nonExistingTopics
unauthorizedForCreateTopics ++= nonExistingTopics
unauthorizedForCreateTopics = nonExistingTopics.filter { topic =>
!authorize(request.session, Create, new Resource(Topic, topic))
}
authorizedTopics --= unauthorizedForCreateTopics
}
}
}
Expand Down Expand Up @@ -1424,16 +1427,20 @@ class KafkaApis(val requestChannel: RequestChannel,
(topic, new ApiError(Errors.NOT_CONTROLLER, null))
}
sendResponseCallback(results)
} else if (!authorize(request.session, Create, Resource.ClusterResource)) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
}
sendResponseCallback(results)
} else {
val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) =>
!createTopicsRequest.duplicateTopics.contains(topic)
}

val (authorizedTopics, unauthorizedTopics) =
if (authorize(request.session, Create, Resource.ClusterResource)) {
(validTopics, Map[String, TopicDetails]())
} else {
validTopics.partition { case (topic, _) =>
authorize(request.session, Create, new Resource(Topic, topic))
}
}

// Special handling to add duplicate topics to the response
def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = {

Expand All @@ -1447,14 +1454,15 @@ class KafkaApis(val requestChannel: RequestChannel,
duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty

val completeResults = results ++ duplicatedTopicsResults
val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults
sendResponseCallback(completeResults)
}

adminManager.createTopics(
createTopicsRequest.timeout,
createTopicsRequest.validateOnly,
validTopics,
authorizedTopics,
sendResponseWithDuplicatesCallback
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.util
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
import java.time.Duration

import kafka.admin.AdminClient
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
Expand Down Expand Up @@ -73,6 +74,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val groupResource = new Resource(Group, group)
val deleteTopicResource = new Resource(Topic, deleteTopic)
val transactionalIdResource = new Resource(TransactionalId, transactionalId)
val createTopicResource = new Resource(Topic, createTopic)

val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
Expand All @@ -82,6 +84,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)))
val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
Expand Down Expand Up @@ -207,7 +210,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEADER_AND_ISR -> clusterAcl,
ApiKeys.STOP_REPLICA -> clusterAcl,
ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
ApiKeys.CREATE_TOPICS -> clusterCreateAcl,
ApiKeys.CREATE_TOPICS -> topicCreateAcl,
ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
ApiKeys.DELETE_RECORDS -> topicDeleteAcl,
ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
Expand Down Expand Up @@ -492,6 +495,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}

@Test
def testCreateTopicAuthorizationWithClusterCreate() {
removeAllAcls()
val resources = Set[ResourceType](Topic)

sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = false)

for ((resource, acls) <- clusterCreateAcl)
addAndVerifyAcls(acls, resource)
sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = true)
}

@Test
def testFetchFollowerRequest() {
val key = ApiKeys.FETCH
Expand Down Expand Up @@ -551,18 +566,30 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}

@Test
def testCreatePermissionNeededForWritingToNonExistentTopic() {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
def testCreatePermissionOnTopicToWriteToNonExistentTopic() {
testCreatePermissionNeededToWriteToNonExistentTopic(Topic)
}

@Test
def testCreatePermissionOnClusterToWriteToNonExistentTopic() {
testCreatePermissionNeededToWriteToNonExistentTopic(Cluster)
}

private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) {
val topicPartition = new TopicPartition(createTopic, 0)
val newTopicResource = new Resource(Topic, createTopic)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
try {
sendRecords(numRecords, topicPartition)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics())
}

addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), resource)

sendRecords(numRecords, topicPartition)
}

Expand Down Expand Up @@ -800,27 +827,37 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}

@Test
def testCreatePermissionNeededToReadFromNonExistentTopic() {
val newTopic = "newTopic"
def testCreatePermissionOnTopicToReadFromNonExistentTopic() {
testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
Topic)
}

@Test
def testCreatePermissionOnClusterToReadFromNonExistentTopic() {
testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)),
Cluster)
}

private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) {
val topicPartition = new TopicPartition(newTopic, 0)
val newTopicResource = new Resource(Topic, newTopic)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource)
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource)
try {
this.consumers.head.assign(List(topicPartition).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
}
this.consumers.head.assign(List(topicPartition).asJava)
val unauthorizedTopics = intercept[TopicAuthorizationException] {
(0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L)))
}.unauthorizedTopics
assertEquals(Collections.singleton(newTopic), unauthorizedTopics)

addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource)
addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource
addAndVerifyAcls(acls, resource)

sendRecords(numRecords, topicPartition)
consumeRecords(this.consumers.head, topic = newTopic, part = 0)
TestUtils.waitUntilTrue(() => {
this.consumers.head.poll(Duration.ofMillis(50L))
this.zkClient.topicExists(newTopic)
}, "Expected topic was not created")
}

@Test(expected = classOf[AuthorizationException])
Expand Down
Loading