diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 465b10f1e54b8..6b7701485e293 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -93,7 +93,7 @@ final class BrokerServerMetrics private (
   )
 
   addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
-    lastAppliedImageProvenance.get.offset()
+    lastAppliedImageProvenance.get.lastContainedOffset()
   }
 
   addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
@@ -132,7 +132,7 @@ final class BrokerServerMetrics private (
   override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
     lastAppliedImageProvenance.set(provenance)
 
-  override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().offset()
+  override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
 
   def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
 }
diff --git a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
index 200deed42704a..fc1c53543b5d6 100644
--- a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
@@ -67,7 +67,7 @@ final class BrokerServerMetricsTest {
     val expectedValue = 1000
     brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
       expectedValue,
-      brokerMetrics.lastAppliedImageProvenance.get().epoch(),
+      brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
       brokerMetrics.lastAppliedTimestamp()));
     assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
   }
@@ -90,7 +90,7 @@ final class BrokerServerMetricsTest {
 
     brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
       brokerMetrics.lastAppliedOffset(),
-      brokerMetrics.lastAppliedImageProvenance.get().epoch(),
+      brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
       timestamp))
     assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
     assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 36db5dab1de6b..2202b4fe2fe62 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -91,11 +91,11 @@ public MetadataProvenance provenance() {
     }
 
     public OffsetAndEpoch highestOffsetAndEpoch() {
-        return new OffsetAndEpoch(provenance.offset(), provenance.epoch());
+        return new OffsetAndEpoch(provenance.lastContainedOffset(), provenance.lastContainedEpoch());
     }
 
     public long offset() {
-        return provenance.offset();
+        return provenance.lastContainedOffset();
     }
 
     public FeaturesImage features() {
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
index 3e65d3cfb8031..57ce8efa6df52 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.snapshot.Snapshots;
 
 import java.util.Objects;
@@ -28,30 +29,30 @@ public final class MetadataProvenance {
     public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, -1, -1L);
 
-    private final long offset;
-    private final int epoch;
+    private final long lastContainedOffset;
+    private final int lastContainedEpoch;
     private final long lastContainedLogTimeMs;
 
     public MetadataProvenance(
-        long offset,
-        int epoch,
+        long lastContainedOffset,
+        int lastContainedEpoch,
         long lastContainedLogTimeMs
     ) {
-        this.offset = offset;
-        this.epoch = epoch;
+        this.lastContainedOffset = lastContainedOffset;
+        this.lastContainedEpoch = lastContainedEpoch;
         this.lastContainedLogTimeMs = lastContainedLogTimeMs;
     }
 
-    public OffsetAndEpoch offsetAndEpoch() {
-        return new OffsetAndEpoch(offset, epoch);
+    public OffsetAndEpoch snapshotId() {
+        return new OffsetAndEpoch(lastContainedOffset + 1, lastContainedEpoch);
     }
 
-    public long offset() {
-        return offset;
+    public long lastContainedOffset() {
+        return lastContainedOffset;
     }
 
-    public int epoch() {
-        return epoch;
+    public int lastContainedEpoch() {
+        return lastContainedEpoch;
     }
 
     public long lastContainedLogTimeMs() {
@@ -62,30 +63,30 @@ public long lastContainedLogTimeMs() {
      * Returns the name that a snapshot with this provenance would have.
      */
     public String snapshotName() {
-        return String.format("snapshot %020d-%010d", offset, epoch);
+        return String.format("snapshot %s", Snapshots.filenameFromSnapshotId(snapshotId()));
     }
 
     @Override
     public boolean equals(Object o) {
         if (o == null || !o.getClass().equals(this.getClass())) return false;
         MetadataProvenance other = (MetadataProvenance) o;
-        return offset == other.offset &&
-            epoch == other.epoch &&
+        return lastContainedOffset == other.lastContainedOffset &&
+            lastContainedEpoch == other.lastContainedEpoch &&
             lastContainedLogTimeMs == other.lastContainedLogTimeMs;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(offset,
-            epoch,
+        return Objects.hash(lastContainedOffset,
+            lastContainedEpoch,
             lastContainedLogTimeMs);
     }
 
     @Override
     public String toString() {
         return "MetadataProvenance(" +
-            "offset=" + offset +
-            ", epoch=" + epoch +
+            "lastContainedOffset=" + lastContainedOffset +
+            ", lastContainedEpoch=" + lastContainedEpoch +
             ", lastContainedLogTimeMs=" + lastContainedLogTimeMs +
             ")";
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index c1fd0aa4389be..21df1a761cd5e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -79,7 +79,7 @@ public void updateBatchSize(int size) {
             }
 
             @Override
             public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
-                this.lastAppliedOffset = provenance.offset();
+                this.lastAppliedOffset = provenance.lastContainedOffset();
             }
 
             @Override
@@ -278,7 +278,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                 LogDeltaManifest manifest = loadLogDelta(delta, reader);
                 if (log.isDebugEnabled()) {
                     log.debug("Generated a metadata delta between {} and {} from {} batch(es) " +
-                        "in {} us.", image.offset(), manifest.provenance().offset(),
+                        "in {} us.", image.offset(), manifest.provenance().lastContainedOffset(),
                         manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
                 }
                 try {
@@ -286,10 +286,10 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                 } catch (Throwable e) {
                     faultHandler.handleFault("Error generating new metadata image from " +
                         "metadata delta between offset " + image.offset() +
-                        " and " + manifest.provenance().offset(), e);
+                        " and " + manifest.provenance().lastContainedOffset(), e);
                     return;
                 }
-                if (stillNeedToCatchUp(manifest.provenance().offset())) {
+                if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
                     return;
                 }
                 log.debug("Publishing new image with provenance {}.", image.provenance());
@@ -298,7 +298,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                         publisher.publishLogDelta(delta, image, manifest);
                     } catch (Throwable e) {
                         faultHandler.handleFault("Unhandled error publishing the new metadata " +
-                            "image ending at " + manifest.provenance().offset() +
+                            "image ending at " + manifest.provenance().lastContainedOffset() +
                             " with publisher " + publisher.name(), e);
                     }
                 }
@@ -332,8 +332,8 @@ LogDeltaManifest loadLogDelta(
         long startNs = time.nanoseconds();
         int numBatches = 0;
         long numBytes = 0L;
-        long lastOffset = image.provenance().offset();
-        int lastEpoch = image.provenance().epoch();
+        long lastOffset = image.provenance().lastContainedOffset();
+        int lastEpoch = image.provenance().lastContainedEpoch();
         long lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
 
         while (reader.hasNext()) {
@@ -376,7 +376,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
                 if (log.isDebugEnabled()) {
                     log.debug("Generated a metadata delta from a snapshot at offset {} " +
-                        "in {} us.", manifest.provenance().offset(),
+                        "in {} us.", manifest.provenance().lastContainedOffset(),
                         NANOSECONDS.toMicros(manifest.elapsedNs()));
                 }
                 try {
@@ -386,7 +386,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         "snapshot at offset " + reader.lastContainedLogOffset(), e);
                     return;
                 }
-                if (stillNeedToCatchUp(manifest.provenance().offset())) {
+                if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
                     return;
                 }
                 log.debug("Publishing new snapshot image with provenance {}.", image.provenance());
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
index 31ac2169be4e4..ec46bcc06faec 100644
--- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
@@ -98,9 +98,10 @@ private SnapshotEmitter(
     @Override
     public void maybeEmit(MetadataImage image) {
         MetadataProvenance provenance = image.provenance();
-        Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter =
-            raftClient.createSnapshot(provenance.offsetAndEpoch(),
-                provenance.lastContainedLogTimeMs());
+        Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter = raftClient.createSnapshot(
+            provenance.snapshotId(),
+            provenance.lastContainedLogTimeMs()
+        );
         if (!snapshotWriter.isPresent()) {
             log.error("Not generating {} because it already exists.", provenance.snapshotName());
             return;
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
index 43809de3898fe..651acc5248334 100644
--- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
@@ -205,8 +205,7 @@ public void publishSnapshot(
         MetadataImage newImage,
         SnapshotManifest manifest
     ) {
-        log.debug("Resetting the snapshot counters because we just read a snapshot at offset {}.",
-            newImage.provenance().offset());
+        log.debug("Resetting the snapshot counters because we just read {}.", newImage.provenance().snapshotName());
         resetSnapshotCounters();
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 1aa86738f980b..b7e49243c43b3 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -39,12 +39,12 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
@@ -154,13 +154,11 @@ static MockSnapshotReader fromRecordLists(
             MetadataProvenance provenance,
             List<List<ApiMessageAndVersion>> lists
         ) {
-            List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
-            lists.forEach(records -> batches.add(Batch.data(
-                provenance.offset(),
-                provenance.epoch(),
-                provenance.lastContainedLogTimeMs(),
-                0,
-                records)));
+            List<Batch<ApiMessageAndVersion>> batches = lists
+                .stream()
+                .map(records -> Batch.data(0, 0, 0, 0, records))
+                .collect(Collectors.toList());
+
             return new MockSnapshotReader(provenance, batches);
         }
@@ -179,17 +177,17 @@ MockSnapshotReader setTime(MockTime time) {
 
         @Override
         public OffsetAndEpoch snapshotId() {
-            return provenance.offsetAndEpoch();
+            return provenance.snapshotId();
         }
 
         @Override
         public long lastContainedLogOffset() {
-            return provenance.offset();
+            return provenance.lastContainedOffset();
         }
 
         @Override
         public int lastContainedLogEpoch() {
-            return provenance.epoch();
+            return provenance.lastContainedEpoch();
         }
 
         @Override
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index ef40d71460438..ca72aa058ef9d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -142,7 +142,7 @@ public OffsetAndEpoch snapshotId() {
 
         @Override
         public long lastContainedLogOffset() {
-            return snapshotId.offset();
+            return snapshotId.offset() - 1;
         }
 
         @Override
@@ -190,7 +190,7 @@ public void testEmit() throws Exception {
             build();
         emitter.maybeEmit(MetadataImageTest.IMAGE1);
         MockSnapshotWriter writer = mockRaftClient.writers.get(
-            MetadataImageTest.IMAGE1.highestOffsetAndEpoch());
+            MetadataImageTest.IMAGE1.provenance().snapshotId());
         assertNotNull(writer);
         assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().offset(),
             writer.lastContainedLogOffset());
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index e7a57ae183bc0..c04f2190616f6 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -52,7 +52,7 @@ static Path snapshotDir(Path logDir) {
         return logDir;
     }
 
-    static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
+    public static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
        return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset()), EPOCH_FORMATTER.format(snapshotId.epoch()));
     }