Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -234,38 +234,37 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseBody = new OffsetCommitResponse(results.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
// filter non-existent topics
val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
!metadataCache.contains(topicPartition.topic)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unneeded newline?

val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition {
Copy link
Member

@ijuma ijuma Sep 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first variable should be existingAndAuthorizedForDescribeTopics.

case (topicPartition, _) => metadataCache.contains(topicPartition.topic) && authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
Copy link
Contributor

@hachikuji hachikuji Sep 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not much difference here, but it seems more natural to check authorization first.

Copy link
Member

@ijuma ijuma Sep 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetadataCache has two methods that do the same thing: contains and hasTopicMetadata. We should remove one of them to avoid confusion. Also, we call hasTopicMetadata in the block for if (header.apiVersion == 0), which seems unnecessary as we have already done it here (it doesn't prevent potential race conditions either although it does narrow the window a little).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys

val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
val (authorizedTopics, unauthorizedForReadTopics) = existingOrAuthorizedForDescribeTopics.partition {
case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic))
}

// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
var combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be a val.

unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))

mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
combinedCommitStatus.foreach { case (topicPartition, errorCode) =>
if (errorCode != Errors.NONE.code) {
debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also log information about the unauthorized for describe with debug enabled, probably.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the code so that these errors are logged as per my comment above. I thought that we should indicate in the server log that the issue is lack of describe instead of a missing topic. Something as simple as some extra text at the end: "as the user u does not have DESCRIBE permission" (something like that anyway).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the previous code did not include non existing topics from the server log. Not sure if that was intentional. @hachikuji, do you know?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but it seems unintentional. I can't think of a reason why we wouldn't want to log the errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check: are you going to address my comment with regards to adding additional text for the unauthorized for describe case when logging? It would be helpful to have that information on the server-side logs when debugging issues. It is possible to correlate with the authorizer logs, but that's more work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logged

}
}
val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))

val responseHeader = new ResponseHeader(header.correlationId)
val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}

if (authorizedRequestInfo.isEmpty)
if (authorizedTopics.isEmpty)
sendResponseCallback(Map.empty)
else if (header.apiVersion == 0) {
// for version 0 always store offsets to ZK
val responseInfo = authorizedRequestInfo.map {
val responseInfo = authorizedTopics.map {
case (topicPartition, partitionData) =>
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
try {
Expand Down Expand Up @@ -302,7 +301,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// - If v2 we use the default expiration timestamp
val currentTimestamp = SystemTime.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedRequestInfo.mapValues { partitionData =>
val partitionData = authorizedTopics.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
Expand Down Expand Up @@ -555,7 +554,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
new PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava)
)

val responseMap = authorizedRequestInfo.map({case (topicPartition, partitionData) =>
Expand Down Expand Up @@ -606,7 +605,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
new PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET)
})
Expand Down Expand Up @@ -782,7 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} else if (config.autoCreateTopicsEnable) {
createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
} else {
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic),
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false,
java.util.Collections.emptyList())
}
}
Expand Down Expand Up @@ -811,25 +810,36 @@ class KafkaApis(val requestChannel: RequestChannel,
metadataRequest.topics.asScala.toSet
}

var (authorizedTopics, unauthorizedTopics) =
var (authorizedTopics, unauthorizedForDescribeTopics) =
topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic)))

var unauthorizedForCreateTopics = Set[String]()

if (authorizedTopics.nonEmpty) {
val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) {
authorizer.foreach { az =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not part of this patch, but the authorize function seems to already handle this, so could we replace this and the if below with a call to authorize(request.session, Create, Resource.ClusterResource)?

Copy link
Contributor

@hachikuji hachikuji Sep 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you fix this please? Is there any reason not to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done thanks

if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
authorizedTopics --= nonExistingTopics
unauthorizedTopics ++= nonExistingTopics
unauthorizedForCreateTopics ++= nonExistingTopics
}
}
}
}

val unauthorizedTopicMetadata = unauthorizedTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic),
val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic),
java.util.Collections.emptyList()))

// do not disclose the existence of unauthorized topics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should be obvious, but we may as well emphasize unauthorized for Describe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

val unauthorizedForDescribeTopicMetadata =
// In case of all topics, don't include unauthorized topics
if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics))
Copy link
Contributor

@hachikuji hachikuji Sep 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Doesn't seem like the parenthesis around metadataRequest.isAllTopics are necessary.

Set.empty[MetadataResponse.TopicMetadata]
else
unauthorizedForDescribeTopics.map(topic =>
new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()))

// In version 0, we returned an error when brokers with replicas were unavailable,
// while in higher versions we simply don't include the broker in the returned broker list
val errorUnavailableEndpoints = requestVersion == 0
Expand All @@ -839,7 +849,7 @@ class KafkaApis(val requestChannel: RequestChannel,
else
getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)

val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata
val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata

val brokers = metadataCache.getAliveBrokers

Expand Down Expand Up @@ -876,9 +886,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
}
val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap

if (header.apiVersion == 0) {
// version 0 reads offsets from ZK
Expand Down Expand Up @@ -1203,7 +1212,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}.toMap
sendResponseCallback(results)
} else {
// If no authorized topics return immediatly
// If no authorized topics return immediately
if (authorizedTopics.isEmpty)
sendResponseCallback(Map())
else {
Expand Down
Loading