Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,10 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
resetGeneration();
future.raise(new CommitFailedException());
return;
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
return;
} else {
log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
Expand Down Expand Up @@ -731,6 +735,8 @@ public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartitio
// re-discover the coordinator and retry
coordinatorDead();
future.raise(error);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.utils.ZkUtils._
import java.util.Random
import java.util.Properties
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse

Expand Down Expand Up @@ -325,7 +325,7 @@ object AdminUtils extends Logging with AdminUtilities {
case e2: Throwable => throw new AdminOperationException(e2)
}
} else {
throw new InvalidTopicException("topic %s to delete does not exist".format(topic))
throw new UnknownTopicOrPartitionException("topic %s to delete does not exist".format(topic))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
//check if there is any Deny acl match that would disallow this operation.
val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)

//if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny.
//if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny.
val ops = if (Describe == operation)
Set[Operation](operation, Read, Write)
Set[Operation](operation, Read, Write, Delete)
else
Set[Operation](operation)

Expand Down
132 changes: 77 additions & 55 deletions core/src/main/scala/kafka/server/KafkaApis.scala

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions core/src/main/scala/kafka/server/MetadataCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}

def hasTopicMetadata(topic: String): Boolean = {
inReadLock(partitionMetadataLock) {
cache.contains(topic)
}
}

def getAllTopics(): Set[String] = {
inReadLock(partitionMetadataLock) {
cache.keySet.toSet
Expand Down
Loading