Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ final class BrokerServerMetrics private (
)

addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
lastAppliedImageProvenance.get.offset()
lastAppliedImageProvenance.get.lastContainedOffset()
}

addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
Expand Down Expand Up @@ -132,7 +132,7 @@ final class BrokerServerMetrics private (
override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
lastAppliedImageProvenance.set(provenance)

override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().offset()
override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()

def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ final class BrokerServerMetricsTest {
val expectedValue = 1000
brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
expectedValue,
brokerMetrics.lastAppliedImageProvenance.get().epoch(),
brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
brokerMetrics.lastAppliedTimestamp()));
assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
}
Expand All @@ -90,7 +90,7 @@ final class BrokerServerMetricsTest {

brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
brokerMetrics.lastAppliedOffset(),
brokerMetrics.lastAppliedImageProvenance.get().epoch(),
brokerMetrics.lastAppliedImageProvenance.get().lastContainedEpoch(),
timestamp))
assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ public MetadataProvenance provenance() {
}

public OffsetAndEpoch highestOffsetAndEpoch() {
return new OffsetAndEpoch(provenance.offset(), provenance.epoch());
return new OffsetAndEpoch(provenance.lastContainedOffset(), provenance.lastContainedEpoch());
}

public long offset() {
return provenance.offset();
return provenance.lastContainedOffset();
}

public FeaturesImage features() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.image;

import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.snapshot.Snapshots;

import java.util.Objects;

Expand All @@ -28,30 +29,30 @@
public final class MetadataProvenance {
public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, -1, -1L);

private final long offset;
private final int epoch;
private final long lastContainedOffset;
private final int lastContainedEpoch;
private final long lastContainedLogTimeMs;

public MetadataProvenance(
long offset,
int epoch,
long lastContainedOffset,
int lastContainedEpoch,
long lastContainedLogTimeMs
) {
this.offset = offset;
this.epoch = epoch;
this.lastContainedOffset = lastContainedOffset;
this.lastContainedEpoch = lastContainedEpoch;
this.lastContainedLogTimeMs = lastContainedLogTimeMs;
}

public OffsetAndEpoch offsetAndEpoch() {
return new OffsetAndEpoch(offset, epoch);
public OffsetAndEpoch snapshotId() {
return new OffsetAndEpoch(lastContainedOffset + 1, lastContainedEpoch);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most important change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment about how the snapshot ID contains an offset one greater than the last contained offset? It could otherwise be easy to miss.

Probably include a reference to this JIRA as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'll do this when I implement https://issues.apache.org/jira/browse/KAFKA-14620.

}

public long offset() {
return offset;
public long lastContainedOffset() {
return lastContainedOffset;
}

public int epoch() {
return epoch;
public int lastContainedEpoch() {
return lastContainedEpoch;
}

public long lastContainedLogTimeMs() {
Expand All @@ -62,30 +63,30 @@ public long lastContainedLogTimeMs() {
* Returns the name that a snapshot with this provenance would have.
*/
public String snapshotName() {
return String.format("snapshot %020d-%010d", offset, epoch);
return String.format("snapshot %s", Snapshots.filenameFromSnapshotId(snapshotId()));
}

@Override
public boolean equals(Object o) {
if (o == null || !o.getClass().equals(this.getClass())) return false;
MetadataProvenance other = (MetadataProvenance) o;
return offset == other.offset &&
epoch == other.epoch &&
return lastContainedOffset == other.lastContainedOffset &&
lastContainedEpoch == other.lastContainedEpoch &&
lastContainedLogTimeMs == other.lastContainedLogTimeMs;
}

@Override
public int hashCode() {
return Objects.hash(offset,
epoch,
return Objects.hash(lastContainedOffset,
lastContainedEpoch,
lastContainedLogTimeMs);
}

@Override
public String toString() {
return "MetadataProvenance(" +
"offset=" + offset +
", epoch=" + epoch +
"lastContainedOffset=" + lastContainedOffset +
", lastContainedEpoch=" + lastContainedEpoch +
", lastContainedLogTimeMs=" + lastContainedLogTimeMs +
")";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void updateBatchSize(int size) { }

@Override
public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
this.lastAppliedOffset = provenance.offset();
this.lastAppliedOffset = provenance.lastContainedOffset();
}

@Override
Expand Down Expand Up @@ -278,18 +278,18 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
LogDeltaManifest manifest = loadLogDelta(delta, reader);
if (log.isDebugEnabled()) {
log.debug("Generated a metadata delta between {} and {} from {} batch(es) " +
"in {} us.", image.offset(), manifest.provenance().offset(),
"in {} us.", image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
}
try {
image = delta.apply(manifest.provenance());
} catch (Throwable e) {
faultHandler.handleFault("Error generating new metadata image from " +
"metadata delta between offset " + image.offset() +
" and " + manifest.provenance().offset(), e);
" and " + manifest.provenance().lastContainedOffset(), e);
return;
}
if (stillNeedToCatchUp(manifest.provenance().offset())) {
if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
return;
}
log.debug("Publishing new image with provenance {}.", image.provenance());
Expand All @@ -298,7 +298,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
publisher.publishLogDelta(delta, image, manifest);
} catch (Throwable e) {
faultHandler.handleFault("Unhandled error publishing the new metadata " +
"image ending at " + manifest.provenance().offset() +
"image ending at " + manifest.provenance().lastContainedOffset() +
" with publisher " + publisher.name(), e);
}
}
Expand Down Expand Up @@ -332,8 +332,8 @@ LogDeltaManifest loadLogDelta(
long startNs = time.nanoseconds();
int numBatches = 0;
long numBytes = 0L;
long lastOffset = image.provenance().offset();
int lastEpoch = image.provenance().epoch();
long lastOffset = image.provenance().lastContainedOffset();
int lastEpoch = image.provenance().lastContainedEpoch();
long lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();

while (reader.hasNext()) {
Expand Down Expand Up @@ -376,7 +376,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
SnapshotManifest manifest = loadSnapshot(delta, reader);
if (log.isDebugEnabled()) {
log.debug("Generated a metadata delta from a snapshot at offset {} " +
"in {} us.", manifest.provenance().offset(),
"in {} us.", manifest.provenance().lastContainedOffset(),
NANOSECONDS.toMicros(manifest.elapsedNs()));
}
try {
Expand All @@ -386,7 +386,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
"snapshot at offset " + reader.lastContainedLogOffset(), e);
return;
}
if (stillNeedToCatchUp(manifest.provenance().offset())) {
if (stillNeedToCatchUp(manifest.provenance().lastContainedOffset())) {
return;
}
log.debug("Publishing new snapshot image with provenance {}.", image.provenance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ private SnapshotEmitter(
@Override
public void maybeEmit(MetadataImage image) {
MetadataProvenance provenance = image.provenance();
Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter =
raftClient.createSnapshot(provenance.offsetAndEpoch(),
provenance.lastContainedLogTimeMs());
Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter = raftClient.createSnapshot(
provenance.snapshotId(),
provenance.lastContainedLogTimeMs()
);
if (!snapshotWriter.isPresent()) {
log.error("Not generating {} because it already exists.", provenance.snapshotName());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,7 @@ public void publishSnapshot(
MetadataImage newImage,
SnapshotManifest manifest
) {
log.debug("Resetting the snapshot counters because we just read a snapshot at offset {}.",
newImage.provenance().offset());
log.debug("Resetting the snapshot counters because we just read {}.", newImage.provenance().snapshotName());
resetSnapshotCounters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
Expand Down Expand Up @@ -154,13 +154,11 @@ static MockSnapshotReader fromRecordLists(
MetadataProvenance provenance,
List<List<ApiMessageAndVersion>> lists
) {
List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
lists.forEach(records -> batches.add(Batch.data(
provenance.offset(),
provenance.epoch(),
provenance.lastContainedLogTimeMs(),
0,
records)));
List<Batch<ApiMessageAndVersion>> batches = lists
.stream()
.map(records -> Batch.data(0, 0, 0, 0, records))
.collect(Collectors.toList());

return new MockSnapshotReader(provenance, batches);
}

Expand All @@ -179,17 +177,17 @@ MockSnapshotReader setTime(MockTime time) {

@Override
public OffsetAndEpoch snapshotId() {
return provenance.offsetAndEpoch();
return provenance.snapshotId();
}

@Override
public long lastContainedLogOffset() {
return provenance.offset();
return provenance.lastContainedOffset();
}

@Override
public int lastContainedLogEpoch() {
return provenance.epoch();
return provenance.lastContainedEpoch();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public OffsetAndEpoch snapshotId() {

@Override
public long lastContainedLogOffset() {
return snapshotId.offset();
return snapshotId.offset() - 1;
}

@Override
Expand Down Expand Up @@ -190,7 +190,7 @@ public void testEmit() throws Exception {
build();
emitter.maybeEmit(MetadataImageTest.IMAGE1);
MockSnapshotWriter writer = mockRaftClient.writers.get(
MetadataImageTest.IMAGE1.highestOffsetAndEpoch());
MetadataImageTest.IMAGE1.provenance().snapshotId());
assertNotNull(writer);
assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().offset(),
writer.lastContainedLogOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static Path snapshotDir(Path logDir) {
return logDir;
}

static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
public static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) {
return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset()), EPOCH_FORMATTER.format(snapshotId.epoch()));
}

Expand Down