diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index fa1b411289c17..f7b867a6b01c3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
@@ -318,6 +319,15 @@ private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
                 node.create(record.key()).setContents(record.value() + "");
                 break;
             }
+            case PRODUCER_IDS_RECORD: {
+                ProducerIdsRecord record = (ProducerIdsRecord) message;
+                DirectoryNode producerIds = data.root.mkdirs("producerIds");
+                producerIds.create("lastBlockBrokerId").setContents(record.brokerId() + "");
+                producerIds.create("lastBlockBrokerEpoch").setContents(record.brokerEpoch() + "");
+
+                producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + "");
+                break;
+            }
             default:
                 throw new RuntimeException("Unhandled metadata record type");
         }
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
index 81483f5290cc9..f0cfffb28178b 100644
--- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
@@ -35,6 +36,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -276,7 +278,7 @@ public void testClientQuotaRecord() {
             "user", "kraft").children().containsKey("producer_byte_rate"));
 
         record = new ClientQuotaRecord()
-            .setEntity(Arrays.asList(
+            .setEntity(Collections.singletonList(
                 new ClientQuotaRecord.EntityData()
                     .setEntityType("user")
                     .setEntityName(null)
@@ -290,4 +292,41 @@ record = new ClientQuotaRecord()
         metadataNodeManager.getData().root().directory("client-quotas",
             "user", "").file("producer_byte_rate").contents());
     }
+
+    @Test
+    public void testProducerIdsRecord() {
+        // generate a producerId record
+        ProducerIdsRecord record1 = new ProducerIdsRecord()
+            .setBrokerId(0)
+            .setBrokerEpoch(1)
+            .setNextProducerId(10000);
+        metadataNodeManager.handleMessage(record1);
+
+        assertEquals(
+            "0",
+            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerId").contents());
+        assertEquals(
+            "1",
+            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerEpoch").contents());
+        assertEquals(
+            10000 + "",
+            metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
+
+        // generate another producerId record
+        ProducerIdsRecord record2 = new ProducerIdsRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2)
+            .setNextProducerId(11000);
+        metadataNodeManager.handleMessage(record2);
+
+        assertEquals(
+            "1",
+            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerId").contents());
+        assertEquals(
+            "2",
+            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerEpoch").contents());
+        assertEquals(
+            11000 + "",
+            metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
+    }
 }