diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 369d2f8f23bbe..0bdf245d5b9b0 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -64,7 +64,7 @@ object MetadataCacheTest { val partialImage = new MetadataImage( new RaftOffsetAndEpoch(100, 10), image.features(), ClusterImage.EMPTY, - image.topics(), image.configs(), image.clientQuotas()) + image.topics(), image.configs(), image.clientQuotas(), image.producerIds()) val delta = new MetadataDelta(partialImage) def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 2b1a397db20d3..64d155686d3d2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.stream.IntStream import java.util.{Collections, Optional, Properties} + import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log._ @@ -50,8 +51,8 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, TopicIdPartition, Uuid} -import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicsDelta, TopicsImage} +import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, ProducerIdsImage, TopicsDelta, TopicsImage} import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch} import org.easymock.EasyMock import org.junit.jupiter.api.Assertions._ @@ -3489,7 +3490,8 @@ class ReplicaManagerTest { ClusterImageTest.IMAGE1, topicsImage, ConfigurationsImage.EMPTY, - ClientQuotasImage.EMPTY + ClientQuotasImage.EMPTY, + ProducerIdsImage.EMPTY ) } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index aa7725b9e4f4c..1d2b8c367a5a0 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; @@ -61,6 +62,8 @@ public final class MetadataDelta { private ClientQuotasDelta clientQuotasDelta = null; + private ProducerIdsDelta producerIdsDelta = null; + public MetadataDelta(MetadataImage image) { this.image = image; this.highestOffset = image.highestOffsetAndEpoch().offset; @@ -75,22 +78,58 @@ public FeaturesDelta featuresDelta() { return featuresDelta; } + public FeaturesDelta getOrCreateFeaturesDelta() { + if (featuresDelta == null) featuresDelta = new FeaturesDelta(image.features()); + return featuresDelta; + } + public ClusterDelta clusterDelta() { return clusterDelta; } + public ClusterDelta getOrCreateClusterDelta() { + if (clusterDelta == null) clusterDelta = new ClusterDelta(image.cluster()); + return clusterDelta; + } + public TopicsDelta topicsDelta() { return topicsDelta; } + public TopicsDelta getOrCreateTopicsDelta() { + if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics()); + return topicsDelta; + } + public ConfigurationsDelta configsDelta() { return configsDelta; } + public ConfigurationsDelta getOrCreateConfigsDelta() { + if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs()); + return configsDelta; + } + public ClientQuotasDelta clientQuotasDelta() { return clientQuotasDelta; } + public ClientQuotasDelta getOrCreateClientQuotasDelta() { + if (clientQuotasDelta == null) clientQuotasDelta = new ClientQuotasDelta(image.clientQuotas()); + return clientQuotasDelta; + } + + public ProducerIdsDelta producerIdsDelta() { + return producerIdsDelta; + } + + public ProducerIdsDelta getOrCreateProducerIdsDelta() { + if (producerIdsDelta == null) { + producerIdsDelta = new ProducerIdsDelta(image.producerIds()); + } + return producerIdsDelta; + } + public void read(long highestOffset, int highestEpoch, Iterator> reader) { while (reader.hasNext()) { List batch = reader.next(); @@ -140,7 +179,7 @@ public void replay(long offset, int epoch, ApiMessage record) { replay((ClientQuotaRecord) record); break; case PRODUCER_IDS_RECORD: - // Nothing to do. + replay((ProducerIdsRecord) record); break; case REMOVE_FEATURE_LEVEL_RECORD: replay((RemoveFeatureLevelRecord) record); @@ -164,60 +203,53 @@ public void replay(UnregisterBrokerRecord record) { } public void replay(TopicRecord record) { - if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics()); - topicsDelta.replay(record); + getOrCreateTopicsDelta().replay(record); } public void replay(PartitionRecord record) { - if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics()); - topicsDelta.replay(record); + getOrCreateTopicsDelta().replay(record); } public void replay(ConfigRecord record) { - if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs()); - configsDelta.replay(record); + getOrCreateConfigsDelta().replay(record); } public void replay(PartitionChangeRecord record) { - if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics()); - topicsDelta.replay(record); + getOrCreateTopicsDelta().replay(record); } public void replay(FenceBrokerRecord record) { - if (clusterDelta == null) clusterDelta = new ClusterDelta(image.cluster()); - clusterDelta.replay(record); + getOrCreateClusterDelta().replay(record); } public void replay(UnfenceBrokerRecord record) { - if (clusterDelta == null) clusterDelta = new ClusterDelta(image.cluster()); - clusterDelta.replay(record); + getOrCreateClusterDelta().replay(record); } public void replay(RemoveTopicRecord record) { - if (topicsDelta == null) topicsDelta = new TopicsDelta(image.topics()); + getOrCreateTopicsDelta().replay(record); String topicName = topicsDelta.replay(record); - if (configsDelta == null) configsDelta = new ConfigurationsDelta(image.configs()); - configsDelta.replay(record, topicName); + getOrCreateConfigsDelta().replay(record, topicName); } public void replay(FeatureLevelRecord record) { - if (featuresDelta == null) featuresDelta = new FeaturesDelta(image.features()); - featuresDelta.replay(record); + getOrCreateFeaturesDelta().replay(record); } public void replay(BrokerRegistrationChangeRecord record) { - if (clusterDelta == null) clusterDelta = new ClusterDelta(image.cluster()); - clusterDelta.replay(record); + getOrCreateClusterDelta().replay(record); } public void replay(ClientQuotaRecord record) { - if (clientQuotasDelta == null) clientQuotasDelta = new ClientQuotasDelta(image.clientQuotas()); - clientQuotasDelta.replay(record); + getOrCreateClientQuotasDelta().replay(record); + } + + public void replay(ProducerIdsRecord record) { + getOrCreateProducerIdsDelta().replay(record); } public void replay(RemoveFeatureLevelRecord record) { - if (featuresDelta == null) featuresDelta = new FeaturesDelta(image.features()); - featuresDelta.replay(record); + getOrCreateFeaturesDelta().replay(record); } /** @@ -225,11 +257,12 @@ public void replay(RemoveFeatureLevelRecord record) { * referenced in the snapshot records we just applied. */ public void finishSnapshot() { - if (featuresDelta != null) featuresDelta.finishSnapshot(); - if (clusterDelta != null) clusterDelta.finishSnapshot(); - if (topicsDelta != null) topicsDelta.finishSnapshot(); - if (configsDelta != null) configsDelta.finishSnapshot(); - if (clientQuotasDelta != null) clientQuotasDelta.finishSnapshot(); + getOrCreateFeaturesDelta().finishSnapshot(); + getOrCreateClusterDelta().finishSnapshot(); + getOrCreateTopicsDelta().finishSnapshot(); + getOrCreateConfigsDelta().finishSnapshot(); + getOrCreateClientQuotasDelta().finishSnapshot(); + getOrCreateProducerIdsDelta().finishSnapshot(); } public MetadataImage apply() { @@ -263,13 +296,20 @@ public MetadataImage apply() { } else { newClientQuotas = clientQuotasDelta.apply(); } + ProducerIdsImage newProducerIds; + if (producerIdsDelta == null) { + newProducerIds = image.producerIds(); + } else { + newProducerIds = producerIdsDelta.apply(); + } return new MetadataImage( new OffsetAndEpoch(highestOffset, highestEpoch), newFeatures, newCluster, newTopics, newConfigs, - newClientQuotas + newClientQuotas, + newProducerIds ); } @@ -283,6 +323,7 @@ public String toString() { ", topicsDelta=" + topicsDelta + ", configsDelta=" + configsDelta + ", clientQuotasDelta=" + clientQuotasDelta + + ", producerIdsDelta=" + producerIdsDelta + ')'; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java index b9fdbc17580f1..76aebe6140091 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java @@ -37,7 +37,8 @@ public final class MetadataImage { ClusterImage.EMPTY, TopicsImage.EMPTY, ConfigurationsImage.EMPTY, - ClientQuotasImage.EMPTY); + ClientQuotasImage.EMPTY, + ProducerIdsImage.EMPTY); private final OffsetAndEpoch highestOffsetAndEpoch; @@ -51,13 +52,16 @@ public final class MetadataImage { private final ClientQuotasImage clientQuotas; + private final ProducerIdsImage producerIds; + public MetadataImage( OffsetAndEpoch highestOffsetAndEpoch, FeaturesImage features, ClusterImage cluster, TopicsImage topics, ConfigurationsImage configs, - ClientQuotasImage clientQuotas + ClientQuotasImage clientQuotas, + ProducerIdsImage producerIds ) { this.highestOffsetAndEpoch = highestOffsetAndEpoch; this.features = features; @@ -65,6 +69,7 @@ public MetadataImage( this.topics = topics; this.configs = configs; this.clientQuotas = clientQuotas; + this.producerIds = producerIds; } public boolean isEmpty() { @@ -72,7 +77,8 @@ public boolean isEmpty() { cluster.isEmpty() && topics.isEmpty() && configs.isEmpty() && - clientQuotas.isEmpty(); + clientQuotas.isEmpty() && + producerIds.isEmpty(); } public OffsetAndEpoch highestOffsetAndEpoch() { @@ -99,12 +105,17 @@ public ClientQuotasImage clientQuotas() { return clientQuotas; } + public ProducerIdsImage producerIds() { + return producerIds; + } + public void write(Consumer> out) { features.write(out); cluster.write(out); topics.write(out); configs.write(out); clientQuotas.write(out); + producerIds.write(out); } @Override @@ -116,12 +127,19 @@ public boolean equals(Object o) { cluster.equals(other.cluster) && topics.equals(other.topics) && configs.equals(other.configs) && - clientQuotas.equals(other.clientQuotas); + clientQuotas.equals(other.clientQuotas) && + producerIds.equals(other.producerIds); } @Override public int hashCode() { - return Objects.hash(highestOffsetAndEpoch, features, cluster, topics, configs, clientQuotas); + return Objects.hash(highestOffsetAndEpoch, + features, + cluster, + topics, + configs, + clientQuotas, + producerIds); } @Override @@ -132,6 +150,7 @@ public String toString() { ", topics=" + topics + ", configs=" + configs + ", clientQuotas=" + clientQuotas + + ", producerIdsImage=" + producerIds + ")"; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java new file mode 100644 index 0000000000000..c0c43ea01f40b --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.ProducerIdsRecord; + + +public final class ProducerIdsDelta { + private long highestSeenProducerId; + + public ProducerIdsDelta(ProducerIdsImage image) { + this.highestSeenProducerId = image.highestSeenProducerId(); + } + + public void setHighestSeenProducerId(long highestSeenProducerId) { + this.highestSeenProducerId = highestSeenProducerId; + } + + public long highestSeenProducerId() { + return highestSeenProducerId; + } + + public void finishSnapshot() { + // Nothing to do + } + + public void replay(ProducerIdsRecord record) { + highestSeenProducerId = record.producerIdsEnd(); + } + + public ProducerIdsImage apply() { + return new ProducerIdsImage(highestSeenProducerId); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java new file mode 100644 index 0000000000000..1d497e051fcc1 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + + +/** + * Stores the highest seen producer ID in the metadata image. + * + * This class is thread-safe. + */ +public final class ProducerIdsImage { + public final static ProducerIdsImage EMPTY = new ProducerIdsImage(-1L); + + private final long highestSeenProducerId; + + public ProducerIdsImage(long highestSeenProducerId) { + this.highestSeenProducerId = highestSeenProducerId; + } + + public long highestSeenProducerId() { + return highestSeenProducerId; + } + + public void write(Consumer> out) { + if (highestSeenProducerId >= 0) { + out.accept(Collections.singletonList(new ApiMessageAndVersion( + new ProducerIdsRecord(). + setBrokerId(-1). + setBrokerEpoch(-1). + setProducerIdsEnd(highestSeenProducerId), (short) 0))); + } + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ProducerIdsImage)) return false; + ProducerIdsImage other = (ProducerIdsImage) o; + return highestSeenProducerId == other.highestSeenProducerId; + } + + @Override + public int hashCode() { + return Objects.hash(highestSeenProducerId); + } + + @Override + public String toString() { + return "ProducerIdsImage(highestSeenProducerId=" + highestSeenProducerId + ")"; + } + + public boolean isEmpty() { + return highestSeenProducerId < 0; + } +} diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index 43709ba5f2c34..73606251f9c79 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -40,7 +40,8 @@ public class MetadataImageTest { ClusterImageTest.IMAGE1, TopicsImageTest.IMAGE1, ConfigurationsImageTest.IMAGE1, - ClientQuotasImageTest.IMAGE1); + ClientQuotasImageTest.IMAGE1, + ProducerIdsImageTest.IMAGE1); DELTA1 = new MetadataDelta(IMAGE1); RecordTestUtils.replayAll(DELTA1, 200, 5, FeaturesImageTest.DELTA1_RECORDS); @@ -48,6 +49,7 @@ public class MetadataImageTest { RecordTestUtils.replayAll(DELTA1, 200, 5, TopicsImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, 200, 5, ConfigurationsImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, 200, 5, ClientQuotasImageTest.DELTA1_RECORDS); + RecordTestUtils.replayAll(DELTA1, 200, 5, ProducerIdsImageTest.DELTA1_RECORDS); IMAGE2 = new MetadataImage( new OffsetAndEpoch(200, 5), @@ -55,7 +57,8 @@ public class MetadataImageTest { ClusterImageTest.IMAGE2, TopicsImageTest.IMAGE2, ConfigurationsImageTest.IMAGE2, - ClientQuotasImageTest.IMAGE2); + ClientQuotasImageTest.IMAGE2, + ProducerIdsImageTest.IMAGE2); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java new file mode 100644 index 0000000000000..2b147bd40403b --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/image/ProducerIdsImageTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +@Timeout(value = 40) +public class ProducerIdsImageTest { + final static ProducerIdsImage IMAGE1; + + final static List DELTA1_RECORDS; + + final static ProducerIdsDelta DELTA1; + + final static ProducerIdsImage IMAGE2; + + static { + IMAGE1 = new ProducerIdsImage(123); + + DELTA1_RECORDS = new ArrayList<>(); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord(). + setBrokerId(2). + setBrokerEpoch(100). + setProducerIdsEnd(456), (short) 0)); + DELTA1_RECORDS.add(new ApiMessageAndVersion(new ProducerIdsRecord(). + setBrokerId(3). + setBrokerEpoch(100). + setProducerIdsEnd(789), (short) 0)); + + DELTA1 = new ProducerIdsDelta(IMAGE1); + RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS); + + IMAGE2 = new ProducerIdsImage(789); + } + + @Test + public void testEmptyImageRoundTrip() throws Throwable { + testToImageAndBack(ProducerIdsImage.EMPTY); + } + + @Test + public void testImage1RoundTrip() throws Throwable { + testToImageAndBack(IMAGE1); + } + + @Test + public void testApplyDelta1() throws Throwable { + assertEquals(IMAGE2, DELTA1.apply()); + } + + @Test + public void testImage2RoundTrip() throws Throwable { + testToImageAndBack(IMAGE2); + } + + private void testToImageAndBack(ProducerIdsImage image) throws Throwable { + MockSnapshotConsumer writer = new MockSnapshotConsumer(); + image.write(writer); + ProducerIdsDelta delta = new ProducerIdsDelta(ProducerIdsImage.EMPTY); + RecordTestUtils.replayAllBatches(delta, writer.batches()); + ProducerIdsImage nextImage = delta.apply(); + assertEquals(image, nextImage); + } +}