Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -56,29 +58,29 @@ public class AclBindingTest {
new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));

@Test
public void testMatching() throws Exception {
assertTrue(ACL1.equals(ACL1));
public void testMatching() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took the opportunity to fix the compiler warnings in this test file, as the PR was small...

assertEquals(ACL1, ACL1);
final AclBinding acl1Copy = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "mytopic", ResourceNameType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW));
assertTrue(ACL1.equals(acl1Copy));
assertTrue(acl1Copy.equals(ACL1));
assertTrue(ACL2.equals(ACL2));
assertFalse(ACL1.equals(ACL2));
assertFalse(ACL2.equals(ACL1));
assertEquals(ACL1, acl1Copy);
assertEquals(acl1Copy, ACL1);
assertEquals(ACL2, ACL2);
assertNotEquals(ACL1, ACL2);
assertNotEquals(ACL2, ACL1);
assertTrue(AclBindingFilter.ANY.matches(ACL1));
assertFalse(AclBindingFilter.ANY.equals(ACL1));
assertNotEquals(AclBindingFilter.ANY, ACL1);
assertTrue(AclBindingFilter.ANY.matches(ACL2));
assertFalse(AclBindingFilter.ANY.equals(ACL2));
assertNotEquals(AclBindingFilter.ANY, ACL2);
assertTrue(AclBindingFilter.ANY.matches(ACL3));
assertFalse(AclBindingFilter.ANY.equals(ACL3));
assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY));
assertNotEquals(AclBindingFilter.ANY, ACL3);
assertEquals(AclBindingFilter.ANY, AclBindingFilter.ANY);
assertTrue(ANY_ANONYMOUS.matches(ACL1));
assertFalse(ANY_ANONYMOUS.equals(ACL1));
assertNotEquals(ANY_ANONYMOUS, ACL1);
assertFalse(ANY_ANONYMOUS.matches(ACL2));
assertFalse(ANY_ANONYMOUS.equals(ACL2));
assertNotEquals(ANY_ANONYMOUS, ACL2);
assertTrue(ANY_ANONYMOUS.matches(ACL3));
assertFalse(ANY_ANONYMOUS.equals(ACL3));
assertNotEquals(ANY_ANONYMOUS, ACL3);
assertFalse(ANY_DENY.matches(ACL1));
assertFalse(ANY_DENY.matches(ACL2));
assertTrue(ANY_DENY.matches(ACL3));
Expand All @@ -87,12 +89,12 @@ public void testMatching() throws Exception {
assertFalse(ANY_MYTOPIC.matches(ACL3));
assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL));
assertTrue(ANY_DENY.matches(UNKNOWN_ACL));
assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL));
assertEquals(UNKNOWN_ACL, UNKNOWN_ACL);
assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL));
}

@Test
public void testUnknowns() throws Exception {
public void testUnknowns() {
assertFalse(ACL1.isUnknown());
assertFalse(ACL2.isUnknown());
assertFalse(ACL3.isUnknown());
Expand All @@ -103,12 +105,32 @@ public void testUnknowns() throws Exception {
}

@Test
public void testMatchesAtMostOne() throws Exception {
public void testMatchesAtMostOne() {
assertNull(ACL1.toFilter().findIndefiniteField());
assertNull(ACL2.toFilter().findIndefiniteField());
assertNull(ACL3.toFilter().findIndefiniteField());
assertFalse(ANY_ANONYMOUS.matchesAtMostOne());
assertFalse(ANY_DENY.matchesAtMostOne());
assertFalse(ANY_MYTOPIC.matchesAtMostOne());
}

@Test
public void shouldNotThrowOnUnknownResourceNameType() {
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.UNKNOWN), ACL1.entry());
}

@Test
public void shouldNotThrowOnUnknownResourceType() {
new AclBinding(new ResourcePattern(ResourceType.UNKNOWN, "foo", ResourceNameType.LITERAL), ACL1.entry());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnAnyResourceNameType() {
new AclBinding(new ResourcePattern(ResourceType.TOPIC, "foo", ResourceNameType.ANY), ACL1.entry());
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnAnyResourceType() {
new AclBinding(new ResourcePattern(ResourceType.ANY, "foo", ResourceNameType.LITERAL), ACL1.entry());
}
}
22 changes: 11 additions & 11 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import kafka.utils._
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType => JResourceNameType, ResourceType => JResourceType, Resource => JResource}
import org.apache.kafka.common.resource.{ResourcePatternFilter, ResourceNameType, ResourceType => JResourceType, Resource => JResource}

import scala.collection.JavaConverters._

object AclCommand extends Logging {

val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, JResourceNameType.LITERAL)
val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, ResourceNameType.LITERAL)

private val Newline = scala.util.Properties.lineSeparator

Expand Down Expand Up @@ -87,13 +87,13 @@ object AclCommand extends Logging {
}

private def addAcl(opts: AclCommandOptions) {
if (opts.options.valueOf(opts.resourceNameType) == JResourceNameType.ANY)
if (opts.options.valueOf(opts.resourceNameType) == ResourceNameType.ANY)
CommandLineUtils.printUsageAndDie(opts.parser, "A '--resource-name-type' value of 'Any' is not valid when adding acls.")

withAuthorizer(opts) { authorizer =>
val resourceToAcl = getResourceFilterToAcls(opts).map {
case (filter, acls) =>
Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), ResourceNameType.fromJava(filter.nameType())) -> acls
Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.nameType()) -> acls
}

if (resourceToAcl.values.exists(_.isEmpty))
Expand Down Expand Up @@ -262,13 +262,13 @@ object AclCommand extends Logging {
}

private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
val resourceNameType: JResourceNameType = opts.options.valueOf(opts.resourceNameType)
val resourceNameType: ResourceNameType = opts.options.valueOf(opts.resourceNameType)

var resourceFilters = Set.empty[ResourcePatternFilter]
if (opts.options.has(opts.topicOpt))
opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, resourceNameType))

if (resourceNameType == JResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
if (resourceNameType == ResourceNameType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
resourceFilters += ClusterResourceFilter

if (opts.options.has(opts.groupOpt))
Expand Down Expand Up @@ -349,7 +349,7 @@ object AclCommand extends Logging {
.withRequiredArg()
.ofType(classOf[String])
.withValuesConvertedBy(new ResourceNameTypeConverter())
.defaultsTo(JResourceNameType.LITERAL)
.defaultsTo(ResourceNameType.LITERAL)

val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
Expand Down Expand Up @@ -429,17 +429,17 @@ object AclCommand extends Logging {

}

class ResourceNameTypeConverter extends EnumConverter[JResourceNameType](classOf[JResourceNameType]) {
class ResourceNameTypeConverter extends EnumConverter[ResourceNameType](classOf[ResourceNameType]) {

override def convert(value: String): JResourceNameType = {
override def convert(value: String): ResourceNameType = {
val nameType = super.convert(value)
if (nameType.isUnknown)
throw new ValueConversionException("Unknown resourceNameType: " + value)

nameType
}

override def valuePattern: String = JResourceNameType.values
.filter(_ != JResourceNameType.UNKNOWN)
override def valuePattern: String = ResourceNameType.values
.filter(_ != ResourceNameType.UNKNOWN)
.mkString("|")
}
7 changes: 3 additions & 4 deletions core/src/main/scala/kafka/security/SecurityUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package kafka.security

import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceNameType, ResourceType}
import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
Expand All @@ -32,11 +32,10 @@ object SecurityUtils {
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
(for {
resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType))
resourceNameType <- Try(ResourceNameType.fromJava(filter.patternFilter.nameType))
principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
resource = Resource(resourceType, filter.patternFilter.name, resourceNameType)
resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.nameType)
acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
} yield (resource, acl)) match {
case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
Expand All @@ -45,7 +44,7 @@ object SecurityUtils {
}

def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType.toJava)
val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.nameType)
val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
acl.operation.toJava, acl.permissionType.toJava)
new AclBinding(resourcePattern, entry)
Expand Down
34 changes: 21 additions & 13 deletions core/src/main/scala/kafka/security/auth/Authorizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,19 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
*
* // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
*
* // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param acls set of acls to add to existing acls
* @param resource the resource path to which these acls should be attached
* @param resource the resource path to which these acls should be attached.
* the supplied resource will have a specific resource name type,
* i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
*/
def addAcls(acls: Set[Acl], resource: Resource): Unit

Expand All @@ -67,17 +69,19 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Literal))
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
*
* // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", Literal))
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
*
* // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", Prefixed))
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param acls set of acls to be removed.
* @param resource resource path from which the acls should be removed.
* the supplied resource will have a specific resource name type,
* i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
* @return true if some acl got removed, false if no acl was removed.
*/
def removeAcls(acls: Set[Acl], resource: Resource): Boolean
Expand All @@ -87,16 +91,18 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", Literal))
* authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
*
* // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
* authorizer.removeAcls(Resource(Topic, "*", Literal))
* authorizer.removeAcls(Resource(Topic, "*", LITERAL))
*
* // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
* authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param resource the resource path from which these acls should be removed.
* the supplied resource will have a specific resource name type,
* i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
* @return
*/
def removeAcls(resource: Resource): Boolean
Expand All @@ -106,16 +112,18 @@ trait Authorizer extends Configurable {
*
* {code}
* // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", Literal))
* authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
*
* // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
* authorizer.removeAcls(Resource(Topic, "*", Literal))
* authorizer.removeAcls(Resource(Topic, "*", LITERAL))
*
* // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", Prefixed))
* authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param resource the resource path to which the acls belong.
* the supplied resource will have a specific resource name type,
* i.e. the resource name type will not be ``ResourceNameType.ANY`` or ``ResourceNameType.UNKNOWN``.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? I thought the current implementation allows you to pass in ResourceNameType.ANY to retrieve ACLs specified on either LITERAL or PREFIX?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is true. SimpleAclAuthorizer expects a concrete Resource, not something which could match multiple resources. The code is here:

  override def removeAcls(resource: Resource): Boolean = {
    inWriteLock(lock) {
      val result = zkClient.deleteResource(resource)
      updateCache(resource, VersionedAcls(Set(), 0))
      updateAclChangedFlag(resource)
      result
    }
  }

The messiness here is that this constraint is not expressed in the type system. We use the same enum to describe the name types a concrete resource can have vs. a resource filter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. The enum has ANY and UNKNOWN values, but the code won't allow them to be passed to the authorizer. I documented so that people implementing their own would know they aren't going to get these values. It's part of the downside to standardizing on the Java ResourceNameType.

* @return empty set if no acls are found, otherwise the acls for the resource.
*/
def getAcls(resource: Resource): Set[Acl]
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/kafka/security/auth/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package kafka.security.auth

import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.{ResourceNameType, ResourcePattern}

object Resource {
val Separator = ":"
val ClusterResourceName = "kafka-cluster"
val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName, Literal)
val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, ResourceNameType.LITERAL)
val ProducerIdResourceName = "producer-id"
val WildCardResource = "*"

Expand All @@ -34,7 +34,7 @@ object Resource {
}
case _ =>
str.split(Separator, 2) match {
case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, Literal)
case Array(resourceType, name, _*) => new Resource(ResourceType.fromString(resourceType), name, ResourceNameType.LITERAL)
case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
}
}
Expand All @@ -50,6 +50,12 @@ object Resource {
*/
case class Resource(resourceType: ResourceType, name: String, nameType: ResourceNameType) {

if (nameType == ResourceNameType.ANY)
throw new IllegalArgumentException("nameType must not be ANY")

if (nameType == ResourceNameType.UNKNOWN)
throw new IllegalArgumentException("nameType must not be UNKNOWN")

/**
* Create an instance of this class with the provided parameters.
* Resource name type would default to ResourceNameType.LITERAL.
Expand All @@ -60,11 +66,11 @@ case class Resource(resourceType: ResourceType, name: String, nameType: Resource
*/
@deprecated("Use Resource(ResourceType, String, ResourceNameType")
def this(resourceType: ResourceType, name: String) {
this(resourceType, name, Literal)
this(resourceType, name, ResourceNameType.LITERAL)
}

def toPattern: ResourcePattern = {
new ResourcePattern(resourceType.toJava, name, nameType.toJava)
new ResourcePattern(resourceType.toJava, name, nameType)
}

override def toString: String = {
Expand Down
Loading