
KAFKA-3396: Unauthorized topics are returned to the user #1428

Closed
edoardocomar wants to merge 4 commits into apache:trunk from edoardocomar:KAFKA-3396
Conversation

@edoardocomar
Contributor

Modified KafkaApis to return Errors.UNKNOWN_TOPIC_OR_PARTITION if
principal has no Describe access to topic

Unit tests expanded

Some paths cause the client to block due to bug
https://issues.apache.org/jira/browse/KAFKA-3727?filter=-2
The tests work around this by executing in a separate thread.
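As a rough illustration of the idea (a minimal, self-contained sketch with hypothetical names, not the actual KafkaApis code): topics the principal cannot Describe are reported with the same error a non-existent topic would get, so their existence is not disclosed.

```scala
// Sketch only: `authorizedForDescribe` stands in for the broker's Authorizer,
// and the error strings stand in for the Errors enum.
object DescribeCheckSketch {
  def metadataErrors(requested: Set[String],
                     authorizedForDescribe: String => Boolean): Map[String, String] = {
    val (authorized, unauthorized) = requested.partition(authorizedForDescribe)
    // Unauthorized topics look exactly like topics that do not exist.
    authorized.map(_ -> "NONE").toMap ++
      unauthorized.map(_ -> "UNKNOWN_TOPIC_OR_PARTITION").toMap
  }
}
```

A client without Describe access to "secret" cannot tell whether the topic exists or it simply lacks permission.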

@edoardocomar
Contributor Author

Hi, this work was done with @mimaison.

Please note that we had issues with the tests because of
https://issues.apache.org/jira/browse/KAFKA-3727
"Consumer.poll() stuck in loop on non-existent topic manually assigned"
and
https://issues.apache.org/jira/browse/KAFKA-3728

@edoardocomar force-pushed the KAFKA-3396 branch 2 times, most recently from b188a46 to 0b12cb4 on May 26, 2016 19:51
@ijuma
Member

ijuma commented Jun 4, 2016

Can you please rebase this now that #1425 has been merged?

@edoardocomar
Contributor Author

Thanks @ijuma, I did.
Some tests still look ugly because of https://issues.apache.org/jira/browse/KAFKA-3727

@ijuma
Member

ijuma commented Jun 6, 2016

@hachikuji Thoughts on how to handle this? It seems to me that we can't really fix this without fixing KAFKA-3727.

@edoardocomar
Contributor Author

@ijuma I'd be happy to work on KAFKA-3727, but I think we need a bit of discussion on what the expected behavior should be. My comments are in that Jira issue https://issues.apache.org/jira/browse/KAFKA-3727

@hachikuji
Contributor

@ijuma Seems we need to decide how the client should handle non-existent topics both before and after KIP-4. I can see a case for both retrying and raising an exception. I hate to suggest it, but perhaps the behavior should be configurable?

@mimaison
Member

mimaison commented Jun 8, 2016

@hachikuji I agree we can probably find use cases that justify both behaviours. What surprised us most was the difference between assign and subscribe.

Also, for the looping behaviour, it would be nice to be able to control it with a max timeout or retry count, as having to call wakeup() from another thread is not very nice. In the end it sounds like a lot of new options, so it might not be ideal.

@hachikuji
Contributor

@mimaison Can you clarify what surprised you about the difference between assign and subscribe?

@edoardocomar
Contributor Author

@hachikuji as we wrote in https://issues.apache.org/jira/browse/KAFKA-3727

The behavior of a consumer on poll() for a non-existent topic is surprisingly different/inconsistent
between a consumer that subscribed to the topic and one that had the topic-partition manually assigned.
The "subscribed" consumer will return an empty collection.
The "assigned" consumer will loop forever - this feels like a bug to me.

@hachikuji
Contributor

@edoardocomar Ah, I missed that. I agree the behavior is a little surprising, but it makes sense when I think of the implementation. We currently block while setting the initial position of each partition. When using subscribe(), it's not possible to be assigned a partition of a non-existent topic, so this is never a problem. But a manually assigned partition for a non-existent topic is a non-existent partition, and we'll block setting the position. So to fix the inconsistency, we'll have to think through the implications of removing that blocking.

@edoardocomar
Contributor Author

@hachikuji @ijuma why should a missing topic or partition be a retriable exception?
I am thinking at least of the case where auto-create is turned off.

@hachikuji
Contributor

hachikuji commented Jun 14, 2016

@edoardocomar Sorry for the slow response. The consumer has traditionally supported the ability to be started before any producers and that means retrying on unknown topics. Even if we had some way to inspect whether autocreate has been disabled, we probably still have to support this for compatibility. I'm also thinking about the impact of the CreateTopic API in KIP-4 (which is under discussion right now). The most likely eventual outcome is that autocreation will be taken out of the broker and pushed into the producer, so it seems that the picture doesn't really change for the consumer, but we should consider whether this outcome is ideal.

One option for the problem at hand is to keep the current behavior (i.e. do nothing). Users can use the partitionsFor() API to verify the existence of any needed topics and raise an exception themselves if they expect it to exist ahead of time. Alternatively, we would probably need a separate configuration that lets the consumer raise an exception instead of retrying. This would call for a new KIP. Topic deletion may be an interesting case to think about as well since the partitionsFor workaround may not help (I'm not too sure).

All that said, I don't think we need to block this issue on a resolution to that problem. It doesn't seem like a big difference to me whether we retry indefinitely because the user is not authorized to the topic or because we are waiting for a topic that may never be created. In either case, the user ultimately has to monitor the progress of the application and take steps when it is failing or falling behind.

@ijuma
Member

ijuma commented Jun 14, 2016

My concern is that configuring security is challenging enough as it is. Configuring authorization incorrectly is very common and harder to diagnose than a topic that hasn't been created (even the latter sometimes causes people to be confused and waste time).

Replacing an exception with indefinite blocking is making the user experience during development even worse than it is at the moment (which is already suboptimal in a number of situations). Ideally, we would be moving in the other direction.

If we decide to do this, then at the very least, we need to think about how users will diagnose the problem and update the authorization documentation with the information.

@hachikuji
Contributor

@ijuma At the very least, we can change the consumer/producer log messages to warn the user that the topic might not exist or they might not have access to it. I agree this is less than ideal, but I don't see an alternative other than what I suggested above (changing how the client handles unknown topics). That is, unless we think this patch is not worth having at all. I was a little torn on this initially, but it does seem better to avoid leaking topic information.

@ijuma
Member

ijuma commented Jun 14, 2016

@hachikuji The log warning you have suggested seems like our best bet at the moment.

@edoardocomar
Contributor Author

Hi @ijuma @hachikuji
we have made a PR #1535
to address the non existent topic or partition issue https://issues.apache.org/jira/browse/KAFKA-3727

Member

I think we need to handle the all topics case specially:

 val topics =
       // Handle old metadata request logic. Version 0 has no way to specify "no topics".
       if (requestVersion == 0) {
         if (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)
           metadataCache.getAllTopics()
         else
           metadataRequest.topics.asScala.toSet
       } else {
         if (metadataRequest.isAllTopics)
           metadataCache.getAllTopics()
         else
           metadataRequest.topics.asScala.toSet
       }

For that case, we want to filter the topics that the user doesn't have DESCRIBE access to.
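That filtering could be sketched as follows (a hypothetical helper, not the actual patch; `authorizedForDescribe` stands in for the Authorizer check):

```scala
// Sketch: for "all topics" metadata requests, silently drop topics the
// principal cannot Describe. Returning an error entry for them would
// reveal that they exist.
object AllTopicsSketch {
  def authorizedTopicSet(allTopics: Set[String],
                         authorizedForDescribe: String => Boolean): Set[String] =
    allTopics.filter(authorizedForDescribe)
}
```

The key difference from the explicit-topics path is that nothing at all is returned for unauthorized topics, rather than an UNKNOWN_TOPIC_OR_PARTITION entry.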

Contributor

Not part of this patch, but the authorize function seems to already handle this, so could we replace this and the if below with a call to authorize(request.session, Create, Resource.ClusterResource)?

Contributor

@hachikuji Sep 15, 2016

Can you fix this please? Is there any reason not to?

Contributor Author

done thanks

@mimaison
Member

Thanks for all the feedback! I've updated the PR.

@mimaison
Member

mimaison commented Sep 8, 2016

@ijuma @hachikuji I believe we've addressed all the comments. Can you have another look?

@ijuma
Member

ijuma commented Sep 8, 2016

Sorry for the delay @mimaison and thanks for the reminder. Will try to look at this soon.

@ijuma
Member

ijuma commented Sep 20, 2016

@mimaison, with the feature freeze in place, this is one of the main bugs we want to get in for 0.10.1.0. Could you please merge trunk into this branch? KIP-79 made some changes to the list offsets request so it may need some updates. Sorry about that. :(

@edoardocomar
Contributor Author

@ijuma @hachikuji - in hindsight, shouldn't we also change the offsets request handling so that it does not return TOPIC_AUTHORIZATION_FAILED where Describe authorization is checked? e.g.

private def handleOffsetRequestV0(request : RequestChannel.Request) ... {
...
    val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition {
      case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
    }

    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
      new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
    )

It seems we had missed it.
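The suggested fix for the offsets path could be sketched like this (hypothetical, with simplified types standing in for the request and response classes, and `authorizedForDescribe` standing in for the Authorizer):

```scala
// Sketch: partition the requested topic-partitions by Describe
// authorization and report unauthorized ones as UNKNOWN_TOPIC_OR_PARTITION
// instead of TOPIC_AUTHORIZATION_FAILED, hiding their existence.
object OffsetsDescribeSketch {
  def listOffsetsErrors(requested: Map[(String, Int), Long],
                        authorizedForDescribe: String => Boolean): Map[(String, Int), String] = {
    val (authorized, unauthorized) =
      requested.partition { case ((topic, _), _) => authorizedForDescribe(topic) }
    authorized.map { case (tp, _) => tp -> "NONE" } ++
      unauthorized.map { case (tp, _) => tp -> "UNKNOWN_TOPIC_OR_PARTITION" }
  }
}
```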

edoardocomar and others added 4 commits September 21, 2016 13:24
Modified KafkaApis to return Errors.UNKNOWN_TOPIC_OR_PARTITION if
principal has no Describe access to topic

Unit tests expanded

Some paths cause the client to block due to bug
https://issues.apache.org/jira/browse/KAFKA-3727?filter=-2
tests work around this by executing in separate thread
- Added tests from 44ad3ec
- Small refactorings
Rebased after kip-79 changes.
Fixing leak of topic for LIST_OFFSETS when unauthorized.
Added tests.
@edoardocomar
Contributor Author

edoardocomar commented Sep 21, 2016

@ijuma @hachikuji the Jenkins build failure
https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/5818/
looks unrelated to our changes. Can you please review? Thanks.

I'm forcing a new rebuild by rebasing on trunk rather than 0.10.1

@ijuma
Member

ijuma commented Sep 21, 2016

@edoardocomar, we should, yes. I think that was "missed" because that code was moved as part of KIP-79, but I haven't checked in detail yet. Will check the PR today or tomorrow. Thanks!

@edoardocomar
Contributor Author

@ijuma Jenkins failed with a timeout; the tests pass for me locally. Can you please kick Jenkins again? Thanks.

@edoardocomar
Contributor Author

@ijuma @hachikuji Can you please take a look at the revised PR? Thanks.

Contributor

@hachikuji left a comment

Thanks for the patch! I left some comments (mostly nitpicks). I do think it's worthwhile treating UNKNOWN_TOPIC_OR_PARTITION explicitly in OffsetFetch and OffsetCommit response handlers on the client though.

// filter non-existent topics
val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
!metadataCache.contains(topicPartition.topic)

Contributor

nit: unneeded newline?

!metadataCache.contains(topicPartition.topic)

val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
case (topicPartition, _) => metadataCache.contains(topicPartition.topic) && authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
Contributor

@hachikuji Sep 22, 2016

nit: not much difference here, but it seems more natural to check authorization first.

Member

@ijuma Sep 24, 2016

MetadataCache has two methods that do the same thing: contains and hasTopicMetadata. We should remove one of them to avoid confusion. Also, we call hasTopicMetadata in the block for if (header.apiVersion == 0), which seems unnecessary as we have already done it here (it doesn't prevent potential race conditions either although it does narrow the window a little).

Contributor Author

ok

new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
java.util.Collections.emptyList()))

// do not disclose the existence of unauthorized topics
Contributor

nit: should be obvious, but we may as well emphasize unauthorized for Describe.

Contributor Author

thanks

// do not disclose the existence of unauthorized topics
val unauthorizedForDescribeTopicMetadata =
// In case of all topics, don't include unauthorized topics
if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics))
Contributor

@hachikuji Sep 22, 2016

nit: Doesn't seem like the parenthesis around metadataRequest.isAllTopics are necessary.

if (resourceType == Topic)
if (apiKey == ApiKeys.PRODUCE || apiKey == ApiKeys.FETCH ) {
//Only allowing TOPIC_AUTHORIZATION_FAILED as an error code
Seq(resourceType.errorCode)
Contributor

Might be clearer to reference the TOPIC_AUTHORIZATION error code directly?

Contributor Author

ok

Member

I think this is not right. Both fetch and produce should also return UNKNOWN_TOPIC_OR_PARTITION if given a topic that the user doesn't have DESCRIBE permission for, right? It seems like neither checks if the user has DESCRIBE permission, which seems to defeat the point of this PR.

Contributor Author

addressed thanks

for (record <- consumer.poll(50).asScala) {
records.add(record)

val future = Future {
Contributor

It's a little weird here to bound this loop by the number of iterations. Could we refactor it to use waitUntil? For example:

    TestUtils.waitUntilTrue(() => {
        for (record <- consumer.poll(50).asScala) 
          records.add(record)
        records.size >= numRecords
    })

Either that or use the timeout directly in the loop instead of maxIters?
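The timeout-in-the-loop variant could look like the following sketch (self-contained, with `poll` stubbed in place of `consumer.poll(50).asScala`, so names and types are assumptions):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: bound the consume loop by wall-clock time rather than an
// iteration count; stops early once enough records have arrived.
object PollLoopSketch {
  def pollUntil[A](numRecords: Int, timeoutMs: Long)(poll: () => Seq[A]): Seq[A] = {
    val records = ArrayBuffer.empty[A]
    val deadline = System.currentTimeMillis + timeoutMs
    while (records.size < numRecords && System.currentTimeMillis < deadline)
      records ++= poll()
    records.toSeq
  }
}
```

The caller can then assert `records.size >= numRecords` and fail with a timeout message otherwise.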

Contributor Author

thanks


/**
@Test
def testNoProduceAclWithDescribeAcl {
Contributor

@hachikuji Sep 22, 2016

nit: The first "Acl" seems out of place. Should this be testNoProduceWithDescribeAcl? A few more like this below.

Contributor Author

thanks

}
}

private def noConsumeAclWithDescribeAclProlog {
Contributor

nit: What is "Prolog"?

Contributor Author

a Setup step. renamed

val producerProps = props.getOrElse(new Properties)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
Contributor

Why was this removed?

Contributor Author

To be able to set it to a low value in EndToEndAuthorizationTest
to allow some test to finish quickly

Member

This change causes the parameter of createNewProducer to be ignored affecting a bunch of other tests though. It seems like the easiest thing is to add a couple of protected methods in IntegrationTestHarness: createProducer and createConsumer. You can then override that in EndToEndAuthorizationTest.

Contributor Author

good point, done

for (record <- consumer.poll(50).asScala) {
records.add(record)

val future = Future {
Contributor

Same as above. Seems like we could make this a more conventional timeout loop.

Contributor Author

ok

@mimaison
Member

Thanks for the feedback, we'll update the PR on Monday.

Member

@ijuma left a comment

Thanks for your patience. I left a few more comments. The most important ones are related to how we don't seem to check DESCRIBE permissions for fetch and produce.

val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
!metadataCache.contains(topicPartition.topic)

val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
Member

@ijuma Sep 24, 2016

The first variable should be existingAndAuthorizedForDescribeTopics.

// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
var combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++
Member

Should be a val.

!metadataCache.contains(topicPartition.topic)

val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
case (topicPartition, _) => metadataCache.contains(topicPartition.topic) && authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
Member

@ijuma Sep 24, 2016

MetadataCache has two methods that do the same thing: contains and hasTopicMetadata. We should remove one of them to avoid confusion. Also, we call hasTopicMetadata in the block for if (header.apiVersion == 0), which seems unnecessary as we have already done it here (it doesn't prevent potential race conditions either although it does narrow the window a little).


addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
this.consumers.head.poll(50)
Member

No need for () in subscription and isEmpty.

consumeRecords(this.consumers.head)
}

@Test
Member

We should extend the pattern subscription tests to also include the case where the user has DESCRIBE, but not READ permission. In those cases, we should still get TopicAuthorizationException.

Contributor Author

done thanks


// set the subscription pattern to an internal topic that the consumer has no read permission for, but since
// `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception
// should be thrown
Member

Given the implementation in trunk, this comment isn't really helpful anymore. I would add an additional case where the consumer has DESCRIBE, but no READ permission for the internal topic. And then move this comment there (and tweak it slightly).

Contributor Author

test added

try {
consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
consumeRecords(consumer)
} catch {
Member

Why do we get this instead of an empty subscription?

Contributor Author

oops, fixed test

if (resourceType == Topic)
if (apiKey == ApiKeys.PRODUCE || apiKey == ApiKeys.FETCH ) {
//Only allowing TOPIC_AUTHORIZATION_FAILED as an error code
Seq(resourceType.errorCode)
Member

I think this is not right. Both fetch and produce should also return UNKNOWN_TOPIC_OR_PARTITION if given a topic that the user doesn't have DESCRIBE permission for, right? It seems like neither checks if the user has DESCRIBE permission, which seems to defeat the point of this PR.

@edoardocomar
Contributor Author

@ijuma We have handled the FETCH and PRODUCE commands.

@edoardocomar
Contributor Author

As I have squashed some commits, I am opening a new PR.

asfgit pushed a commit that referenced this pull request Oct 1, 2016
Reopening of #1428

Author: Edoardo Comar <ecomar@uk.ibm.com>
Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #1908 from edoardocomar/KAFKA-3396
asfgit pushed a commit that referenced this pull request Oct 1, 2016

(cherry picked from commit 8124f6e)
Signed-off-by: Jason Gustafson <jason@confluent.io>
efeg added a commit to efeg/kafka that referenced this pull request May 29, 2024