From 6b688376bb9e028311c93638eccac180a7662d81 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Thu, 26 May 2016 14:18:12 +0100 Subject: [PATCH 1/4] KAFKA-3396 : Unauthorized topics are returned to the user Modified KafkaApis to return Errors.UNKNOWN_TOPIC_OR_PARTITION if principal has no Describe access to topic Unit tests expanded Some paths cause the client to block due to bug https://issues.apache.org/jira/browse/KAFKA-3727?filter=-2 tests work around this by executing in separate thread --- .../main/scala/kafka/server/KafkaApis.scala | 36 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 83 +++++++-- .../kafka/api/EndToEndAuthorizationTest.scala | 161 ++++++++++++++++-- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- 4 files changed, 240 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 15e5b620c35b1..5655c9195f57d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -238,12 +238,18 @@ class KafkaApis(val requestChannel: RequestChannel, val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) => !metadataCache.contains(topicPartition.topic) } - val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { + val filteredRequestInfo1 = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys + + //filter topics with Describe ACL as if they were non-existent + val (filteredRequestInfo2, unauthorizedForDescribeRequestInfo) = filteredRequestInfo1.partition { + case (topicPartition, offsetMetadata) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) + } + + val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo2.partition { case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } - + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) @@ -254,7 +260,8 @@ class KafkaApis(val requestChannel: RequestChannel, s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") } } - val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) val responseHeader = new ResponseHeader(header.correlationId) val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) @@ -814,22 +821,29 @@ class KafkaApis(val requestChannel: RequestChannel, var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic))) + var uncreatableTopics = Set[String]() + if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (config.autoCreateTopicsEnable && 
nonExistingTopics.nonEmpty) { authorizer.foreach { az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) { authorizedTopics --= nonExistingTopics - unauthorizedTopics ++= nonExistingTopics + uncreatableTopics ++= nonExistingTopics } } } } - val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic), + val uncreatableTopicMetadata = uncreatableTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + // do not disclose the existence of the unauthorized topic + val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), + java.util.Collections.emptyList())) + // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 @@ -839,7 +853,7 @@ class KafkaApis(val requestChannel: RequestChannel, else getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) - val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata + val completeTopicMetadata = topicMetadata ++ uncreatableTopicMetadata ++ unauthorizedTopicMetadata val brokers = metadataCache.getAliveBrokers @@ -876,8 +890,6 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code) - val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code) if (header.apiVersion == 0) { @@ -902,7 +914,7 @@ class KafkaApis(val requestChannel: RequestChannel, Errors.forException(e).code)) } }.toMap - new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava) + new OffsetFetchResponse((responseInfo).asJava) } else { // version 1 reads offsets from Kafka; val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap @@ -910,7 +922,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Note that we do not need to filter the partitions in the // metadata cache as the topic partitions will be filtered // in coordinator's offset manager through the offset cache - new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava) + new OffsetFetchResponse((offsets).asJava) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 6d3b098ebc9ae..aa8538280f053 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -24,6 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{Consumer, 
ConsumerRecord, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ @@ -37,6 +38,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer + + class AuthorizerIntegrationTest extends BaseRequestTest { override def numBrokers: Int = 1 @@ -282,8 +285,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRecords(numRecords, tp) fail("sendRecords should have thrown") } catch { - case e: TopicAuthorizationException => - assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) + case e: org.apache.kafka.common.errors.TimeoutException => //expected } } @@ -386,7 +388,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + case e: TestTimeoutException => //expected + } finally { + this.consumers.foreach(_.wakeup()) } } @@ -466,7 +470,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } - @Test(expected = classOf[TopicAuthorizationException]) + @Test(expected = classOf[KafkaException]) def testCommitWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) @@ -512,11 +516,35 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.position(tp) } - @Test(expected = classOf[TopicAuthorizationException]) + @Test(expected = classOf[TestTimeoutException]) def testOffsetFetchWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) - this.consumers.head.position(tp) + + try { + var caughtExc : Exception = null + var positionSucceeded = false + val t = new java.lang.Thread() { + override def run () { + try { + AuthorizerIntegrationTest.this.consumers.head.position(tp) // consumer is stuck in org.apache.kafka.clients.consumer.internals.Fetcher.listOffset(TopicPartition, long) + positionSucceeded = true + } catch { + case e : Exception => caughtExc = e + } + } + } + t.start() + t.join(10000L) + + if (caughtExc != null) + throw caughtExc + + if (!positionSucceeded) + throw new TestTimeoutException("Failed to position in 10000 millis.") + } finally { + this.consumers.foreach(_.wakeup()) + } } @Test @@ -537,10 +565,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testListOffsetsWithNoTopicAccess() { - val e = intercept[TopicAuthorizationException] { - this.consumers.head.partitionsFor(topic) - } - assertEquals(Set(topic), e.unauthorizedTopics().asScala) + val partitionInfos = this.consumers.head.partitionsFor(topic); + assertNull(partitionInfos) } @Test @@ -635,21 +661,42 @@ class AuthorizerIntegrationTest extends BaseRequestTest { part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) + + var caughtExc 
: Exception = null + + val t = new java.lang.Thread() { + override def run () { + println("in consumeRecords.Thread.run") + try { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) + } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + } catch { + case e : Exception => caughtExc = e + } } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 } + t.start() + t.join(10000L) + + if (caughtExc != null) + throw caughtExc + + if (records.isEmpty()) + throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 8edb6f85aa89f..14c99888d035a 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -107,6 +107,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { s"--topic=$topic", s"--producer", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def describeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--operation=Describe", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--remove", + s"--force", + s"--topic=$topic", + s"--operation=Describe", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--remove", + s"--force", + s"--topic=$topic", + s"--operation=Write", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def consumeAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -135,6 +155,8 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") + //to have testNoProduceAclWithoutDescribeAcl terminate quicker + this.producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000") /** * Starts MiniKDC and only then sets up the parent trait. @@ -203,23 +225,68 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { * isn't set. 
*/ @Test - def testNoProduceAcl { + def testNoProduceAclWithoutDescribeAcl { //Produce records debug("Starting to send records") try{ sendRecords(numRecords, tp) - fail("Topic authorization exception expected") + fail("exception expected") } catch { - case e: TopicAuthorizationException => //expected + case e: org.apache.kafka.common.errors.TimeoutException => //expected } } - /** + @Test + def testNoProduceAclWithDescribeAcl { + AclCommand.main(describeAclArgs) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) + }) + //Produce records + debug("Starting to send records") + try{ + sendRecords(numRecords, tp) + fail("exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + /** * Tests that a consumer fails to consume messages without the appropriate * ACL set. */ @Test - def testNoConsumeAcl { + def testNoConsumeWithoutDescribeAclViaAssign { + noConsumeWithoutDescribeAclProlog + consumers.head.assign(List(tp).asJava) + + try { + consumeRecords(this.consumers.head) + fail("exception expected") + } catch { + case e: TestTimeoutException => //expected from consumeRecords() + } finally { + //to avoid a ConcurrentModificationException in tearDown() + //as the consumer is stuck in poll() + consumers.foreach(_.wakeup()) + } + } + + @Test + def testNoConsumeWithoutDescribeAclViaSubscribe { + noConsumeWithoutDescribeAclProlog + consumers.head.subscribe(List(topic).asJava) + + try { + consumeRecords(this.consumers.head) + fail("exception expected") + } catch { + case e: TestTimeoutException => //expected from consumeRecords() + } + } + + private def noConsumeWithoutDescribeAclProlog { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) servers.foreach(s => { @@ -229,9 +296,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { //Produce records debug("Starting to send records") sendRecords(numRecords, tp) - //Consume records + + //Deleting topic ACL without asking for console confirmation + AclCommand.main(deleteDescribeAclArgs) + AclCommand.main(deleteWriteAclArgs) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + }) + debug("Finished sending and starting to consume records") + } + + /** + * Tests that a consumer fails to consume messages without the appropriate + * ACL set. 
+ */ + @Test + def testNoConsumeAclWithDescribeAclViaAssign { + noConsumeAclWithDescribeAclProlog consumers.head.assign(List(tp).asJava) + try { consumeRecords(this.consumers.head) fail("Topic authorization exception expected") @@ -239,6 +323,33 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { case e: TopicAuthorizationException => //expected } } + + @Test + def testNoConsumeAclWithDescribeAclViaSubscribe { + noConsumeAclWithDescribeAclProlog + consumers.head.subscribe(List(topic).asJava) + + try { + consumeRecords(this.consumers.head) + fail("Topic authorization exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + private def noConsumeAclWithDescribeAclProlog { + AclCommand.main(produceAclArgs) + AclCommand.main(groupAclArgs) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + }) + //Produce records + debug("Starting to send records") + sendRecords(numRecords, tp) + //Consume records + debug("Finished sending and starting to consume records") + } /** * Tests that a consumer fails to consume messages without the appropriate @@ -284,15 +395,35 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) + + var caughtExc : Exception = null + + val t = new java.lang.Thread() { + override def run () { + try { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) + } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + } catch { + case e : Exception => caughtExc = e + } } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 } + t.start() + t.join(10000L) + + if (caughtExc != null) + throw caughtExc + + if (records.isEmpty()) + throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i @@ -302,3 +433,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } } } + +class TestTimeoutException (msg:String) + extends java.lang.RuntimeException (msg) {} + diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4e1d2784289d6..481c15150ccd3 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -455,9 +455,11 @@ object TestUtils extends Logging { props: Option[Properties] = None): KafkaProducer[K, V] = { val producerProps = props.getOrElse(new Properties) + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) - producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) + if(!producerProps.containsKey(ProducerConfig.MAX_BLOCK_MS_CONFIG)) + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) 
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) From cce5726797959a11d79c9fdde768d5e30b1c385f Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 26 Jul 2016 19:20:06 +0100 Subject: [PATCH 2/4] KAFKA-3396: Updates and cleanups following the feedback --- .../main/scala/kafka/server/KafkaApis.scala | 52 +++++------ .../kafka/api/AuthorizerIntegrationTest.scala | 90 ++++++------------- .../kafka/api/EndToEndAuthorizationTest.scala | 87 ++++++++---------- .../scala/unit/kafka/utils/TestUtils.scala | 3 - 4 files changed, 91 insertions(+), 141 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5655c9195f57d..8c6f432d6e3dd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -234,35 +234,32 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new OffsetCommitResponse(results.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - // filter non-existent topics - val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) => - !metadataCache.contains(topicPartition.topic) + + val (nonExistingRequestsInfo, existingTopicsRequestsInfo) = offsetCommitRequest.offsetData.asScala.toMap.partition { + case (topicPartition, _) => !metadataCache.contains(topicPartition.topic) } - val filteredRequestInfo1 = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys - - //filter topics with Describe ACL as if they were non-existent - val (filteredRequestInfo2, unauthorizedForDescribeRequestInfo) = filteredRequestInfo1.partition { + // treat topics with Describe ACL as if they were non-existent + val (filteredRequestInfo, unauthorizedForDescribeRequestInfo) = existingTopicsRequestsInfo.partition { case (topicPartition, offsetMetadata) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo2.partition { + + val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } - + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) + var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ nonExistingRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) - mergedCommitStatus.foreach { case (topicPartition, errorCode) => + combinedCommitStatus.foreach { case (topicPartition, errorCode) => if (errorCode != Errors.NONE.code) { debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") } } - var combinedCommitStatus = 
mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) - combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) - val responseHeader = new ResponseHeader(header.correlationId) val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) @@ -818,10 +815,10 @@ class KafkaApis(val requestChannel: RequestChannel, metadataRequest.topics.asScala.toSet } - var (authorizedTopics, unauthorizedTopics) = + var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic))) - var uncreatableTopics = Set[String]() + var unauthorizedForCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) @@ -829,21 +826,25 @@ class KafkaApis(val requestChannel: RequestChannel, authorizer.foreach { az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) { authorizedTopics --= nonExistingTopics - uncreatableTopics ++= nonExistingTopics + unauthorizedForCreateTopics ++= nonExistingTopics } } } } - val uncreatableTopicMetadata = uncreatableTopics.map(topic => + val uncreatableTopicMetadata = unauthorizedForCreateTopics.map(topic => new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) - // do not disclose the existence of the unauthorized topic - val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), - java.util.Collections.emptyList())) - + // do not disclose the existence of unauthorized topics + val unauthorizedTopicMetadata = + // In case of all topics, don't include unauthorized topics + if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics)) + Set.empty[MetadataResponse.TopicMetadata] + else + unauthorizedForDescribeTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 @@ -891,6 +892,7 @@ class KafkaApis(val requestChannel: RequestChannel, authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap if (header.apiVersion == 0) { // version 0 reads offsets from ZK @@ -914,7 +916,7 @@ class KafkaApis(val requestChannel: RequestChannel, Errors.forException(e).code)) } }.toMap - new OffsetFetchResponse((responseInfo).asJava) + new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava) } else { // version 1 reads offsets from Kafka; val offsets = 
coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap @@ -922,7 +924,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Note that we do not need to filter the partitions in the // metadata cache as the topic partitions will be filtered // in coordinator's offset manager through the offset cache - new OffsetFetchResponse((offsets).asJava) + new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index aa8538280f053..66bb46a01dcc2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -37,6 +37,12 @@ import org.junit.{After, Assert, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import org.apache.kafka.common.KafkaException @@ -283,9 +289,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def testProduceWithNoTopicAccess() { try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { - case e: org.apache.kafka.common.errors.TimeoutException => //expected + case e: TimeoutException => //expected } } @@ -294,7 +300,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) @@ -306,7 +312,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) @@ -377,7 +383,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testConsumeWithNoTopicAccess() { + def testConsumeWithoutTopicDescribeAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() @@ -388,9 +394,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TestTimeoutException => //expected - } finally { - this.consumers.foreach(_.wakeup()) + case e: KafkaException => //expected } } @@ -516,35 +520,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.position(tp) } - @Test(expected = classOf[TestTimeoutException]) + @Test(expected = classOf[KafkaException]) def testOffsetFetchWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) - - try { - var caughtExc : Exception = null - var positionSucceeded = false - val t = new java.lang.Thread() { - override def run () 
{ - try { - AuthorizerIntegrationTest.this.consumers.head.position(tp) // consumer is stuck in org.apache.kafka.clients.consumer.internals.Fetcher.listOffset(TopicPartition, long) - positionSucceeded = true - } catch { - case e : Exception => caughtExc = e - } - } - } - t.start() - t.join(10000L) - - if (caughtExc != null) - throw caughtExc - - if (!positionSucceeded) - throw new TestTimeoutException("Failed to position in 10000 millis.") - } finally { - this.consumers.foreach(_.wakeup()) - } + this.consumers.head.position(tp) } @Test @@ -619,7 +599,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val possibleErrorCodes = resources.flatMap { resourceType => if(resourceType == Topic) // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names - Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code()) + Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) else Seq(resourceType.errorCode) } @@ -661,35 +641,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest { part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - - var caughtExc : Exception = null - - val t = new java.lang.Thread() { - override def run () { - println("in consumeRecords.Thread.run") - try { - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) - } - if (iters > maxIters) - throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } - } catch { - case e : Exception => caughtExc = e + + val future = Future { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 } + records } - t.start() - t.join(10000L) - - if (caughtExc != null) - throw caughtExc - - if (records.isEmpty()) - throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + val result = Await.result(future, 10 seconds) for (i <- 0 until numRecords) { val record = records.get(i) @@ -697,6 +662,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } + } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 14c99888d035a..8567e75b021b8 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -30,13 +30,17 @@ import kafka.utils._ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig} import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig} import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.{TopicPartition} +import org.apache.kafka.common.{TopicPartition,KafkaException} import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException} +import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException,TimeoutException} import 
org.junit.Assert._ import org.junit.{Test, After, Before} import scala.collection.JavaConverters._ +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Success} /** @@ -171,9 +175,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } super.setUp AclCommand.main(topicBrokerReadAclArgs) - servers.foreach( s => + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) - ) + } // create the test topic with all the brokers as replicas TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers) } @@ -209,10 +213,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { private def setAclsAndProduce() { AclCommand.main(produceAclArgs) AclCommand.main(consumeAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -232,16 +236,16 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { sendRecords(numRecords, tp) fail("exception expected") } catch { - case e: org.apache.kafka.common.errors.TimeoutException => //expected + case e: TimeoutException => //expected } } @Test def testNoProduceAclWithDescribeAcl { AclCommand.main(describeAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) - }) + } //Produce records debug("Starting to send records") try{ @@ -265,11 +269,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { consumeRecords(this.consumers.head) fail("exception expected") } catch { - case e: TestTimeoutException => //expected from consumeRecords() - } finally { - //to avoid a ConcurrentModificationException in tearDown() - //as the consumer is stuck in poll() - consumers.foreach(_.wakeup()) + case e: KafkaException => //expected } } @@ -282,27 +282,27 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { consumeRecords(this.consumers.head) fail("exception expected") } catch { - case e: TestTimeoutException => //expected from consumeRecords() + case e: TestTimeoutException => //expected } } private def noConsumeWithoutDescribeAclProlog { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) - //Deleting topic ACL without asking for console confirmation + //Deleting topic ACL AclCommand.main(deleteDescribeAclArgs) AclCommand.main(deleteWriteAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) - }) + } debug("Finished sending and starting to consume records") } @@ -340,10 +340,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { private def noConsumeAclWithDescribeAclProlog { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) - servers.foreach(s 
=> { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -358,9 +358,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { @Test def testNoGroupAcl { AclCommand.main(produceAclArgs) - servers.foreach(s => + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) - ) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -396,33 +396,19 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - var caughtExc : Exception = null - - val t = new java.lang.Thread() { - override def run () { - try { - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) - } - if (iters > maxIters) - throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } - } catch { - case e : Exception => caughtExc = e + val future = Future { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 } + records } - t.start() - t.join(10000L) - - if (caughtExc != null) - throw caughtExc - - if (records.isEmpty()) - throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + val result = Await.result(future, 10 seconds) for (i <- 0 until numRecords) { val record = records.get(i) @@ -430,10 +416,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } } -class TestTimeoutException (msg:String) - extends java.lang.RuntimeException (msg) {} +class TestTimeoutException(msg:String) extends java.lang.RuntimeException (msg) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 481c15150ccd3..e31e9fc78b4d2 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -455,11 +455,8 @@ object TestUtils extends Logging { props: Option[Properties] = None): KafkaProducer[K, V] = { val producerProps = props.getOrElse(new Properties) - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) - if(!producerProps.containsKey(ProducerConfig.MAX_BLOCK_MS_CONFIG)) - producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) From c7aefe21e5bb5692b75270cad8c7126f2a409a2c Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 17 Aug 2016 13:34:19 +0100 Subject: [PATCH 3/4] KAFKA-3396: More updates + rebased off trunk - Added tests from 44ad3ec - Small refactorings 
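For reference, the offset-commit error mapping after this refactoring reduces to two partitions: the first folds the existence check and the Describe check together, so a topic the caller cannot Describe is handled exactly like a missing one, and the second splits the remainder by Read access. A standalone sketch of that mapping, not broker code (topicExists, mayDescribe and mayRead are hypothetical stand-ins for metadataCache.contains and the authorize(session, Describe/Read, ...) calls):

    object OffsetCommitErrorModel {
      sealed trait Error
      case object NoError extends Error
      case object UnknownTopicOrPartition extends Error  // existence is never disclosed
      case object TopicAuthorizationFailed extends Error // Describe passed, Read denied

      def classify(offsets: Map[String, Long],
                   topicExists: String => Boolean,
                   mayDescribe: String => Boolean,
                   mayRead: String => Boolean): Map[String, Error] = {
        // One partition replaces the former two-step filter: a topic the
        // caller cannot Describe is treated exactly like a non-existent one.
        val (visible, hidden) = offsets.partition {
          case (topic, _) => topicExists(topic) && mayDescribe(topic)
        }
        // Of the visible topics, only those with Read access are committed.
        val (authorized, readDenied) = visible.partition {
          case (topic, _) => mayRead(topic)
        }
        authorized.map { case (t, _) => t -> (NoError: Error) } ++
          readDenied.map { case (t, _) => t -> (TopicAuthorizationFailed: Error) } ++
          hidden.map { case (t, _) => t -> (UnknownTopicOrPartition: Error) }
      }
    }

A topic that fails the first partition is reported as UNKNOWN_TOPIC_OR_PARTITION whether or not it exists, which is the point of the change.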
--- .../main/scala/kafka/server/KafkaApis.scala | 35 +++---- .../kafka/api/AuthorizerIntegrationTest.scala | 93 +++++++++++++++++-- 2 files changed, 100 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8c6f432d6e3dd..f88c0df3c0b77 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -235,24 +235,19 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - val (nonExistingRequestsInfo, existingTopicsRequestsInfo) = offsetCommitRequest.offsetData.asScala.toMap.partition { - case (topicPartition, _) => !metadataCache.contains(topicPartition.topic) + val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { + case (topicPartition, _) => metadataCache.contains(topicPartition.topic) && authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - // treat topics with Describe ACL as if they were non-existent - val (filteredRequestInfo, unauthorizedForDescribeRequestInfo) = existingTopicsRequestsInfo.partition { - case (topicPartition, offsetMetadata) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) - } - - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { - case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) + val (authorizedTopics, unauthorizedForReadTopics) = existingOrAuthorizedForDescribeTopics.partition { + case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { - val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) - var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ nonExistingRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) - combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) + var combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++ + unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) combinedCommitStatus.foreach { case (topicPartition, errorCode) => if (errorCode != Errors.NONE.code) { @@ -265,11 +260,11 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } - if (authorizedRequestInfo.isEmpty) + if (authorizedTopics.isEmpty) sendResponseCallback(Map.empty) else if (header.apiVersion == 0) { // for version 0 always store offsets to ZK - val responseInfo = authorizedRequestInfo.map { + val responseInfo = authorizedTopics.map { case (topicPartition, partitionData) => val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { @@ -306,7 +301,7 @@ class KafkaApis(val requestChannel: 
RequestChannel, // - If v2 we use the default expiration timestamp val currentTimestamp = SystemTime.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val partitionData = authorizedRequestInfo.mapValues { partitionData => + val partitionData = authorizedTopics.mapValues { partitionData => val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata new OffsetAndMetadata( offsetMetadata = OffsetMetadata(partitionData.offset, metadata), @@ -786,7 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) } } @@ -832,12 +827,12 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val uncreatableTopicMetadata = unauthorizedForCreateTopics.map(topic => + val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) // do not disclose the existence of unauthorized topics - val unauthorizedTopicMetadata = + val unauthorizedForDescribeTopicMetadata = // In case of all topics, don't include unauthorized topics if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics)) Set.empty[MetadataResponse.TopicMetadata] @@ -854,7 +849,7 @@ class KafkaApis(val requestChannel: RequestChannel, else getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) - val completeTopicMetadata = topicMetadata ++ uncreatableTopicMetadata ++ unauthorizedTopicMetadata + val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata val brokers = metadataCache.getAliveBrokers @@ -1217,7 +1212,7 @@ class KafkaApis(val requestChannel: RequestChannel, }.toMap sendResponseCallback(results) } else { - // If no authorized topics return immediatly + // If no authorized topics return immediately if (authorizedTopics.isEmpty) sendResponseCallback(Map()) else { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 66bb46a01dcc2..0b2c2d09efbb3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -15,6 +15,7 @@ package kafka.api import java.nio.ByteBuffer import java.util import java.util.concurrent.ExecutionException +import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} import kafka.common @@ -22,9 +23,9 @@ import kafka.common.TopicAndPartition import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.KafkaException import 
org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ @@ -44,14 +45,13 @@ import scala.util.{Failure, Success} import org.apache.kafka.common.KafkaException - - class AuthorizerIntegrationTest extends BaseRequestTest { override def numBrokers: Int = 1 val brokerId: Integer = 0 val topic = "topic" + val topicPattern = "topic.*" val createTopic = "topic-new" val deleteTopic = "topic-delete" val part = 0 @@ -411,7 +411,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } @@ -429,7 +429,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } @@ -445,6 +445,83 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) } + @Test + def testPatternSubscriptionWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + this.consumers.head.poll(50) + assertTrue(this.consumers.head.subscription().isEmpty()) + } + + @Test + def testPatternSubscriptionWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + + //create a unmatched topic + val unmatchedTopic = "unmatched" + TestUtils.createTopic(zkUtils, unmatchedTopic, 1, 1, this.servers) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic)) + sendRecords(1, new TopicPartition(unmatchedTopic, part)) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val consumer = consumers.head + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + + // set the subscription pattern to an internal topic that the consumer has no read permission for, but since + // `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception + // should be thrown + consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener) + assertTrue(consumer.poll(50).isEmpty) + } + + @Test + def testPatternSubscriptionMatchingInternalTopicWithNoPermission() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), 
groupResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + } catch { + case e: TestTimeoutException => //expected + } finally consumer.close() + } + + @Test + def testPatternSubscriptionNotMatchingInternalTopic() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + } finally consumer.close() +} + @Test def testCreatePermissionNeededToReadFromNonExistentTopic() { val newTopic = "newTopic" @@ -459,7 +536,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()); + assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) @@ -545,7 +622,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testListOffsetsWithNoTopicAccess() { - val partitionInfos = this.consumers.head.partitionsFor(topic); + val partitionInfos = this.consumers.head.partitionsFor(topic) assertNull(partitionInfos) } From cf464fa76c4ff39a780f191f9a63362ff1959c99 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Wed, 21 Sep 2016 10:34:02 +0100 Subject: [PATCH 4/4] KAFKA-3396 : Unauthorized topics are returned to the user Rebased after kip-79 changes. Fixing leak of topic for LIST_OFFSETS when unauthorized. Added tests. 
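With this change an unauthorized partition in a LIST_OFFSETS request is answered exactly like a non-existent one. A minimal standalone sketch of the intended contract (PartitionData here is a simplified stand-in for the protocol class, and 3 is the wire value of Errors.UNKNOWN_TOPIC_OR_PARTITION):

    object ListOffsetsModel {
      final case class PartitionData(errorCode: Short, timestamp: Long, offset: Long)

      val UnknownTopicOrPartition: Short = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION.code

      // Both failure branches return the same error, so the response no longer
      // reveals whether the topic exists.
      def listOffsetsFor(topic: String,
                         exists: String => Boolean,
                         mayDescribe: String => Boolean): PartitionData =
        if (!exists(topic) || !mayDescribe(topic))
          PartitionData(UnknownTopicOrPartition, timestamp = -1L, offset = -1L)
        else
          PartitionData(errorCode = 0, timestamp = 0L, offset = 0L) // real lookup elided
    }

Only PRODUCE and FETCH keep returning TOPIC_AUTHORIZATION_FAILED, since the caller already named the topic; for the other topic-level APIs the updated test accepts INVALID_TOPIC_EXCEPTION or UNKNOWN_TOPIC_OR_PARTITION.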
--- .../main/scala/kafka/server/KafkaApis.scala | 6 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 36 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f88c0df3c0b77..bd40da0a9d81b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -554,7 +554,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => - new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava) + new PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava) ) val responseMap = authorizedRequestInfo.map({case (topicPartition, partitionData) => @@ -605,7 +605,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { - new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, + new PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) }) @@ -838,7 +838,7 @@ class KafkaApis(val requestChannel: RequestChannel, Set.empty[MetadataResponse.TopicMetadata] else unauthorizedForDescribeTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())) // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0b2c2d09efbb3..70801e4c0e641 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -44,6 +44,8 @@ import scala.concurrent.duration._ import scala.util.{Failure, Success} import org.apache.kafka.common.KafkaException +import java.util.HashMap +import kafka.admin.AdminUtils class AuthorizerIntegrationTest extends BaseRequestTest { @@ -58,6 +60,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val correlationId = 0 val clientId = "client-Id" val tp = new TopicPartition(topic, part) + val topicAndPartition = new TopicAndPartition(topic, part) val group = "my-group" val topicResource = new Resource(Topic, topic) @@ -285,6 +288,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + /* + * checking that whether the topic exists or not, when unauthorized, FETCH and PRODUCE do not leak the topic name + */ + @Test + def testAuthorizationProduceFetchDoNotLeakTopicName() { + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) + + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.FETCH -> createFetchRequest + ) + + for ((key, request) <- requestKeyToRequest) { + removeAllAcls + val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false) + } + } + @Test def testProduceWithNoTopicAccess() { try { @@ -627,7 +650,7 @@ class 
AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testListOfsetsWithTopicDescribe() { + def testListOffsetsWithTopicDescribe() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.partitionsFor(topic) } @@ -674,9 +697,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response) val possibleErrorCodes = resources.flatMap { resourceType => - if(resourceType == Topic) - // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names - Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + if (resourceType == Topic) + if (apiKey == ApiKeys.PRODUCE || apiKey == ApiKeys.FETCH ) { + //Only allowing TOPIC_AUTHORIZATION_FAILED as an error code + Seq(resourceType.errorCode) + } else { + // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION/UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names + Seq(Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + } else Seq(resourceType.errorCode) }
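Net effect of the series from the client side, mirrored by testListOffsetsWithNoTopicAccess above: a topic the principal cannot Describe is indistinguishable from one that does not exist. A rough illustration (hypothetical snippet; the broker address, topic name and the assumption that the principal lacks a Describe ACL on "secret-topic" are all illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.KafkaConsumer

    object DescribeAclProbe extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("group.id", "probe")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
      try {
        // Before this series partitionsFor threw TopicAuthorizationException,
        // confirming that the topic exists; now the metadata response carries
        // UNKNOWN_TOPIC_OR_PARTITION and the client sees null, exactly as it
        // would for a topic that was never created.
        assert(consumer.partitionsFor("secret-topic") == null)
      } finally consumer.close()
    }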