Skip to content
Merged
3 changes: 2 additions & 1 deletion checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
<allow pkg="org.apache.kafka.common.protocol.types" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.resource" />
</subpackage>

<subpackage name="record">
Expand Down Expand Up @@ -146,7 +147,7 @@
</subpackage>

<subpackage name="utils">
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common" />
</subpackage>
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.ResourceFilter;
import org.apache.kafka.common.resource.ResourceType;

import java.util.Objects;

Expand All @@ -36,9 +35,7 @@ public class AclBindingFilter {
/**
* A filter which matches any ACL binding.
*/
public static final AclBindingFilter ANY = new AclBindingFilter(
new ResourceFilter(ResourceType.ANY, null),
new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
public static final AclBindingFilter ANY = new AclBindingFilter(ResourceFilter.ANY, AccessControlEntryFilter.ANY);

/**
* Create an instance of this filter with the provided parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.protocol;

import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.resource.ResourceNameType;

public class CommonFields {
public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms",
Expand Down Expand Up @@ -45,6 +46,8 @@ public class CommonFields {
public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type");
public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name");
public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter");
public static final Field.Int8 RESOURCE_NAME_TYPE = new Field.Int8("resource_name_type", "The resource name type", ResourceNameType.LITERAL.code());
public static final Field.Int8 RESOURCE_NAME_TYPE_FILTER = new Field.Int8("resource_name_type_filter", "The resource name type filter", ResourceNameType.LITERAL.code());
public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal");
public static final Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter");
public static final Field.Str HOST = new Field.Str("host", "The ACL host");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public static class Int8 extends Field {
public Int8(String name, String docString) {
super(name, Type.INT8, docString, false, null);
}
public Int8(String name, String docString, byte defaultValue) {
super(name, Type.INT8, docString, true, defaultValue);
}
}

public static class Int32 extends Field {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourceNameType;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
Expand All @@ -36,6 +38,7 @@
import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;

public class CreateAclsRequest extends AbstractRequest {
Expand All @@ -51,9 +54,20 @@ public class CreateAclsRequest extends AbstractRequest {
PERMISSION_TYPE))));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
* Version 1 adds RESOURCE_NAME_TYPE.
* Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
*
* For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
*/
private static final Schema CREATE_ACLS_REQUEST_V1 = CREATE_ACLS_REQUEST_V0;
private static final Schema CREATE_ACLS_REQUEST_V1 = new Schema(
new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
RESOURCE_TYPE,
RESOURCE_NAME,
RESOURCE_NAME_TYPE,
PRINCIPAL,
HOST,
OPERATION,
PERMISSION_TYPE))));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't figure out how to add a comment to the correct line using github's terrible user interface. But on line 110 where you have the code:

        @Override
        public CreateAclsRequest build(short version) {
            return new CreateAclsRequest(version, creations);
        }

You must add verification that the version you are using supports the ACLs you are trying to send. If someone tries to set a prefix ACL when using CREATE_ACLS_REQUEST_V0, the code must throw UnsupportedVersionException. In fact, anything besides ResourceNameType.LITERAL should trigger a UVE there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider it done.

public static Schema[] schemaVersions() {
return new Schema[]{CREATE_ACLS_REQUEST_V0, CREATE_ACLS_REQUEST_V1};
Expand Down Expand Up @@ -111,6 +125,8 @@ public String toString() {
CreateAclsRequest(short version, List<AclCreation> aclCreations) {
super(version);
this.aclCreations = aclCreations;

validate(aclCreations);
}

public CreateAclsRequest(Struct struct, short version) {
Expand Down Expand Up @@ -158,4 +174,17 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable
public static CreateAclsRequest parse(ByteBuffer buffer, short version) {
return new CreateAclsRequest(ApiKeys.CREATE_ACLS.parseRequest(version, buffer), version);
}

private void validate(List<AclCreation> aclCreations) {
if (version() == 0) {
final boolean unsupported = aclCreations.stream()
.map(AclCreation::acl)
.map(AclBinding::resource)
.map(Resource::nameType)
.anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
if (unsupported) {
throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
Expand All @@ -43,7 +43,7 @@ public class CreateAclsResponse extends AbstractResponse {
ERROR_MESSAGE))));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
* The version number is bumped to indicate that, on quota violation, brokers send out responses before throttling.
*/
private static final Schema CREATE_ACLS_RESPONSE_V1 = CREATE_ACLS_RESPONSE_V0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.ResourceFilter;
import org.apache.kafka.common.resource.ResourceNameType;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
Expand All @@ -37,6 +39,7 @@
import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE_FILTER;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;

public class DeleteAclsRequest extends AbstractRequest {
Expand All @@ -52,9 +55,20 @@ public class DeleteAclsRequest extends AbstractRequest {
PERMISSION_TYPE))));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
* V1 sees a new `RESOURCE_NAME_TYPE_FILTER` that controls how the filter handles different resource name types.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to preserve the comment on quota violation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

* Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
*
* For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
*/
private static final Schema DELETE_ACLS_REQUEST_V1 = DELETE_ACLS_REQUEST_V0;
private static final Schema DELETE_ACLS_REQUEST_V1 = new Schema(
new Field(FILTERS, new ArrayOf(new Schema(
RESOURCE_TYPE,
RESOURCE_NAME_FILTER,
RESOURCE_NAME_TYPE_FILTER,
PRINCIPAL_FILTER,
HOST_FILTER,
OPERATION,
PERMISSION_TYPE))));

public static Schema[] schemaVersions() {
return new Schema[]{DELETE_ACLS_REQUEST_V0, DELETE_ACLS_REQUEST_V1};
Expand Down Expand Up @@ -84,6 +98,8 @@ public String toString() {
DeleteAclsRequest(short version, List<AclBindingFilter> filters) {
super(version);
this.filters = filters;

validate(version, filters);
}

public DeleteAclsRequest(Struct struct, short version) {
Expand Down Expand Up @@ -136,4 +152,16 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable
public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {
return new DeleteAclsRequest(DELETE_ACLS.parseRequest(version, buffer), version);
}

private void validate(short version, List<AclBindingFilter> filters) {
if (version == 0) {
final boolean unsupported = filters.stream()
.map(AclBindingFilter::resourceFilter)
.map(ResourceFilter::nameType)
.anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
if (unsupported) {
throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourceNameType;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +45,7 @@
import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;

Expand All @@ -51,7 +54,7 @@ public class DeleteAclsResponse extends AbstractResponse {
private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses";
private final static String MATCHING_ACLS_KEY_NAME = "matching_acls";

private static final Schema MATCHING_ACL = new Schema(
private static final Schema MATCHING_ACL_V0 = new Schema(
ERROR_CODE,
ERROR_MESSAGE,
RESOURCE_TYPE,
Expand All @@ -61,18 +64,43 @@ public class DeleteAclsResponse extends AbstractResponse {
OPERATION,
PERMISSION_TYPE);

/**
* V1 sees a new `RESOURCE_NAME_TYPE` that describes how the resource name is interpreted.
*
* For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
*/
private static final Schema MATCHING_ACL_V1 = new Schema(
ERROR_CODE,
ERROR_MESSAGE,
RESOURCE_TYPE,
RESOURCE_NAME,
RESOURCE_NAME_TYPE,
PRINCIPAL,
HOST,
OPERATION,
PERMISSION_TYPE);

private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
THROTTLE_TIME_MS,
new Field(FILTER_RESPONSES_KEY_NAME,
new ArrayOf(new Schema(
ERROR_CODE,
ERROR_MESSAGE,
new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL), "The matching ACLs")))));
new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V0), "The matching ACLs")))));

/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
* V1 sees a new `RESOURCE_NAME_TYPE` field added to MATCHING_ACL_V1, that describes how the resource name is interpreted
* and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
*
* For more info, see {@link org.apache.kafka.common.resource.ResourceNameType}.
*/
private static final Schema DELETE_ACLS_RESPONSE_V1 = DELETE_ACLS_RESPONSE_V0;
private static final Schema DELETE_ACLS_RESPONSE_V1 = new Schema(
THROTTLE_TIME_MS,
new Field(FILTER_RESPONSES_KEY_NAME,
new ArrayOf(new Schema(
ERROR_CODE,
ERROR_MESSAGE,
new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V1), "The matching ACLs")))));

public static Schema[] schemaVersions() {
return new Schema[]{DELETE_ACLS_RESPONSE_V0, DELETE_ACLS_RESPONSE_V1};
Expand Down Expand Up @@ -161,6 +189,8 @@ public DeleteAclsResponse(Struct struct) {

@Override
protected Struct toStruct(short version) {
validate(version);

Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
struct.set(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> responseStructs = new ArrayList<>();
Expand Down Expand Up @@ -211,4 +241,18 @@ public String toString() {
public boolean shouldClientThrottle(short version) {
return version >= 1;
}

private void validate(short version) {
if (version == 0) {
final boolean unsupported = responses.stream()
.flatMap(r -> r.deletions.stream())
.map(AclDeletionResult::acl)
.map(AclBinding::resource)
.map(Resource::nameType)
.anyMatch(nameType -> nameType != ResourceNameType.LITERAL);
if (unsupported) {
throw new UnsupportedVersionException("Version 0 only supports literal resource name types");
}
}
}
}
Loading