Merged
4 changes: 4 additions & 0 deletions checkstyle/suppressions.xml
@@ -278,6 +278,10 @@
<!-- Shell -->
<suppress checks="CyclomaticComplexity"
files="(GlobComponent|MetadataNodeManager).java"/>
<suppress checks="MethodLength"
files="(MetadataNodeManager).java"/>
<suppress checks="JavaNCSS"
files="(MetadataNodeManager).java"/>

<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"
metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
@@ -23,6 +23,6 @@
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The feature name." },
{ "name": "FeatureLevel", "type": "int16", "versions": "0+",
"about": "The current finalized feature level of this feature for the cluster." }
"about": "The current finalized feature level of this feature for the cluster, a value of 0 means feature not supported." }
]
}
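
As the review thread further down discusses, level 0 doubles as a removal sentinel on the read side. A minimal standalone sketch of that convention, with an illustrative class name (this is not a Kafka API):

import java.util.HashMap;
import java.util.Map;

// Illustrative reader-side state for finalized features. A FeatureLevelRecord
// carrying level 0 deletes the entry instead of storing a zero, matching the
// "0 means feature not supported" convention in the schema above.
public class FinalizedFeatures {
    private final Map<String, Short> levels = new HashMap<>();

    public void handleFeatureLevelRecord(String name, short featureLevel) {
        if (featureLevel == 0) {
            levels.remove(name);            // the feature is no longer supported
        } else {
            levels.put(name, featureLevel); // finalized at this level
        }
    }

    public static void main(String[] args) {
        FinalizedFeatures features = new FinalizedFeatures();
        features.handleFeatureLevelRecord("metadata.version", (short) 3);
        features.handleFeatureLevelRecord("metadata.version", (short) 0);
        System.out.println(features.levels); // prints {} -- the entry was removed
    }
}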
shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -21,16 +21,22 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@@ -39,6 +45,8 @@
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
@@ -302,6 +310,22 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
create("isFenced").setContents("false");
break;
}
case BROKER_REGISTRATION_CHANGE_RECORD: {
BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord) message;
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).get();
if (fencingChange != BrokerRegistrationFencingChange.NONE) {
data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
.create("isFenced").setContents(Boolean.toString(fencingChange.asBoolean().get()));
}
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).get();
if (inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
.create("inControlledShutdown").setContents(Boolean.toString(inControlledShutdownChange.asBoolean().get()));
}
break;
}
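
For context on the fromValue/asBoolean calls in the case above: a minimal sketch of the tri-state encoding, assuming wire values -1 (unfence), 0 (no change), 1 (fence) per my reading of BrokerRegistrationChangeRecord.json. The enum here is an illustration, not the real org.apache.kafka.metadata.BrokerRegistrationFencingChange:

import java.util.Optional;

// Illustrative tri-state change value as carried by
// BrokerRegistrationChangeRecord.fenced(): -1 = unfence, 0 = no change,
// 1 = fence. NONE maps to an empty Optional so callers can skip the write.
enum FencingChange {
    UNFENCE((byte) -1, Optional.of(false)),
    NONE((byte) 0, Optional.empty()),
    FENCE((byte) 1, Optional.of(true));

    private final byte value;
    private final Optional<Boolean> asBoolean;

    FencingChange(byte value, Optional<Boolean> asBoolean) {
        this.value = value;
        this.asBoolean = asBoolean;
    }

    // Decode the wire value; unknown values come back empty instead of throwing.
    static Optional<FencingChange> fromValue(byte value) {
        for (FencingChange change : values()) {
            if (change.value == value) {
                return Optional.of(change);
            }
        }
        return Optional.empty();
    }

    // of(true) -> write isFenced=true, of(false) -> write false, empty -> no change.
    Optional<Boolean> asBoolean() {
        return asBoolean;
    }
}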
case REMOVE_TOPIC_RECORD: {
RemoveTopicRecord record = (RemoveTopicRecord) message;
DirectoryNode topicsDirectory =
@@ -333,6 +357,35 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + "");
break;
}
case ACCESS_CONTROL_ENTRY_RECORD: {
AccessControlEntryRecord record = (AccessControlEntryRecord) message;
DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
FileNode file = acls.create(record.id().toString());
dengziming (Member, Author) commented on Apr 28, 2022:
Just as KIP-778 described, each ACL is shown under /acl/id/ in its JSON form. However, I should mention here that the schema in the ZK node is more hierarchical:

get /kafka/kafka-acl/${resourceType}/${resourceName}
 
{
    "version":1,
    "acls":[
        {
            "principal":"User:kafka",
            "permissionType":"Allow",
            "operation":"Read",
            "host":"1.1.1.1"
        }
    ]
}

file.setContents(AccessControlEntryRecordJsonConverter.write(record,
AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
break;
}
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
RemoveAccessControlEntryRecord record = (RemoveAccessControlEntryRecord) message;
DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
acls.rmrf(record.id().toString());
break;
}
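
To contrast with the hierarchical ZK layout quoted in the thread above, a minimal sketch of the flat /acl/id/ layout these two cases maintain; the map-backed store is an illustrative stand-in, not the shell's DirectoryNode tree:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;

// Illustrative flat ACL store: each ACL lives at /acl/id/<uuid> as one JSON
// file, so add and remove are single map operations keyed by the record id,
// with no per-resource grouping as in the ZK layout.
public class FlatAclStore {
    private final Map<Uuid, String> aclJsonById = new HashMap<>();

    public void handleAccessControlEntryRecord(Uuid id, String recordJson) {
        aclJsonById.put(id, recordJson);     // create /acl/id/<uuid>
    }

    public void handleRemoveAccessControlEntryRecord(Uuid id) {
        aclJsonById.remove(id);              // rmrf /acl/id/<uuid>
    }
}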
case FEATURE_LEVEL_RECORD: {
FeatureLevelRecord record = (FeatureLevelRecord) message;
DirectoryNode features = data.root.mkdirs("features");
if (record.featureLevel() == 0) {
features.rmrf(record.name());
Member commented on lines +377 to +378:
Sorry, could you let me know where we mentioned that featureLevel == 0 means delete it? I checked the record schema, and it doesn't mention this:

"apiKey": 12,
"type": "metadata",
"name": "FeatureLevelRecord",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The feature name." },
{ "name": "FeatureLevel", "type": "int16", "versions": "0+",
"about": "The current finalized feature level of this feature for the cluster." }

dengziming (Member, Author) replied:
Firstly, we had a RemoveFeatureLevelRecord.json, but we removed it in #12207 and now use 0-0 to represent "not supported".

Member replied:
Thanks for sharing! What a magic 0, haha! Could we add a simple note into FeatureLevelRecord.json? Something like:

{ "name": "FeatureLevel", "type": "int16", "versions": "0+", 
     "about": "The current finalized feature level of this feature for the cluster. 0 means feature not enabled." } 

dengziming (Member, Author) replied:
Good suggestion.

} else {
FileNode file = features.create(record.name());
file.setContents(FeatureLevelRecordJsonConverter.write(record,
FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
}
break;
}
case NO_OP_RECORD: {
break;
}
default:
throw new RuntimeException("Unhandled metadata record type");
}
shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
@@ -18,19 +18,31 @@
package org.apache.kafka.shell;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -256,6 +268,61 @@ public void testUnfenceBrokerRecordAndFenceBrokerRecord() {
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
}

@Test
public void testBrokerRegistrationChangeRecord() {
RegisterBrokerRecord record = new RegisterBrokerRecord()
.setBrokerId(1)
.setBrokerEpoch(2);
metadataNodeManager.handleMessage(record);
assertEquals("true",
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

// Unfence broker
BrokerRegistrationChangeRecord record1 = new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(2)
.setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
metadataNodeManager.handleMessage(record1);
assertEquals("false",
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

// Fence broker
BrokerRegistrationChangeRecord record2 = new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(2)
.setFenced(BrokerRegistrationFencingChange.FENCE.value());
metadataNodeManager.handleMessage(record2);
assertEquals("true",
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

// Unchanged
BrokerRegistrationChangeRecord record3 = new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(2)
.setFenced(BrokerRegistrationFencingChange.NONE.value());
metadataNodeManager.handleMessage(record3);
assertEquals("true",
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());

// Controlled shutdown
BrokerRegistrationChangeRecord record4 = new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(2)
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
metadataNodeManager.handleMessage(record4);
assertEquals("true",
metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());

// Unchanged
BrokerRegistrationChangeRecord record5 = new BrokerRegistrationChangeRecord()
.setBrokerId(1)
.setBrokerEpoch(2)
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.NONE.value());
metadataNodeManager.handleMessage(record5);
assertEquals("true",
metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
}

@Test
public void testClientQuotaRecord() {
ClientQuotaRecord record = new ClientQuotaRecord()
@@ -336,4 +403,43 @@ public void testProducerIdsRecord() {
11000 + "",
metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
}

@Test
public void testAccessControlEntryRecordAndRemoveAccessControlEntryRecord() {
AccessControlEntryRecord record1 = new AccessControlEntryRecord()
.setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
Member commented:
nit: since this is just for test, we can use Uuid.ZERO_UUID for testing, to avoid random strings here.

dengziming (Member, Author) replied:
This is a little overkill here: we are using random strings in a lot of test cases and it won't reduce the code quality, and Uuid.ZERO_UUID may have some special meaning in different cases.

.setHost("example.com")
.setResourceType(ResourceType.GROUP.code())
.setResourceName("group")
.setOperation(AclOperation.READ.code())
.setPermissionType(AclPermissionType.ALLOW.code())
.setPrincipal("User:kafka")
.setPatternType(PatternType.LITERAL.code());
metadataNodeManager.handleMessage(record1);
assertEquals(
AccessControlEntryRecordJsonConverter.write(record1, AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
metadataNodeManager.getData().root().directory("acl").directory("id").file("GcaQDl2UTsCNs1p9s37XkQ").contents());

RemoveAccessControlEntryRecord record2 = new RemoveAccessControlEntryRecord()
.setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
metadataNodeManager.handleMessage(record2);
assertFalse(metadataNodeManager.getData().root().directory("acl").directory("id").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
}

@Test
public void testFeatureLevelRecord() {
FeatureLevelRecord record1 = new FeatureLevelRecord()
.setName("metadata.version")
.setFeatureLevel((short) 3);
metadataNodeManager.handleMessage(record1);
assertEquals(
FeatureLevelRecordJsonConverter.write(record1, FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
metadataNodeManager.getData().root().directory("features").file("metadata.version").contents());

FeatureLevelRecord record2 = new FeatureLevelRecord()
.setName("metadata.version")
.setFeatureLevel((short) 0);
metadataNodeManager.handleMessage(record2);
assertFalse(metadataNodeManager.getData().root().directory("features").children().containsKey("metadata.version"));
}
}