diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e7e2e4dca414f..8506bba5c509f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -286,6 +286,7 @@
+
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c0c1c65df349c..3d432159ce4c9 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -35,8 +35,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
-import org.apache.kafka.raft.BatchReader.Batch
-import org.apache.kafka.raft.{BatchReader, RaftClient, RaftConfig, RecordSerde}
+import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig, RecordSerde}
+import org.apache.kafka.snapshot.SnapshotReader
import scala.jdk.CollectionConverters._
@@ -150,6 +150,7 @@ class TestRaftServer(
case class HandleClaim(epoch: Int) extends RaftEvent
case object HandleResign extends RaftEvent
case class HandleCommit(reader: BatchReader[Array[Byte]]) extends RaftEvent
+ case class HandleSnapshot(reader: SnapshotReader[Array[Byte]]) extends RaftEvent
case object Shutdown extends RaftEvent
private val eventQueue = new LinkedBlockingDeque[RaftEvent]()
@@ -175,6 +176,10 @@ class TestRaftServer(
eventQueue.offer(HandleCommit(reader))
}
+ override def handleSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
+ eventQueue.offer(HandleSnapshot(reader))
+ }
+
override def initiateShutdown(): Boolean = {
val initiated = super.initiateShutdown()
eventQueue.offer(Shutdown)
@@ -226,7 +231,11 @@ class TestRaftServer(
reader.close()
}
- case _ =>
+ case HandleSnapshot(reader) =>
+ // Ignore snapshots; only interested in records appended by this leader
+ reader.close()
+
+ case Shutdown => // Ignore shutdown command
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/Batch.java b/raft/src/main/java/org/apache/kafka/raft/Batch.java
new file mode 100644
index 0000000000000..daa1e05c07a12
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/Batch.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A batch of records.
+ *
+ * This type contains a list of records `T` along with the information associated with those records.
+ */
+public final class Batch implements Iterable {
+ private final long baseOffset;
+ private final int epoch;
+ private final long lastOffset;
+ private final List records;
+
+ private Batch(long baseOffset, int epoch, long lastOffset, List records) {
+ this.baseOffset = baseOffset;
+ this.epoch = epoch;
+ this.lastOffset = lastOffset;
+ this.records = records;
+ }
+
+ /**
+ * The offset of the last record in the batch.
+ */
+ public long lastOffset() {
+ return lastOffset;
+ }
+
+ /**
+ * The offset of the first record in the batch.
+ */
+ public long baseOffset() {
+ return baseOffset;
+ }
+
+ /**
+ * The list of records in the batch.
+ */
+ public List records() {
+ return records;
+ }
+
+ /**
+ * The epoch of the leader that appended the record batch.
+ */
+ public int epoch() {
+ return epoch;
+ }
+
+ @Override
+ public Iterator iterator() {
+ return records.iterator();
+ }
+
+ @Override
+ public String toString() {
+ return "Batch(" +
+ "baseOffset=" + baseOffset +
+ ", epoch=" + epoch +
+ ", lastOffset=" + lastOffset +
+ ", records=" + records +
+ ')';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Batch> batch = (Batch>) o;
+ return baseOffset == batch.baseOffset &&
+ epoch == batch.epoch &&
+ Objects.equals(records, batch.records);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(baseOffset, epoch, records);
+ }
+
+ /**
+ * Create a batch without any records.
+ *
+ * Internally this is used to propagate offset information for control batches which do not decode to the type T.
+ *
+ * @param baseOffset offset of the batch
+ * @param epoch epoch of the leader that created this batch
+ * @param lastOffset offset of the last record of this batch
+ */
+ public static Batch empty(long baseOffset, int epoch, long lastOffset) {
+ return new Batch<>(baseOffset, epoch, lastOffset, Collections.emptyList());
+ }
+
+ /**
+ * Create a batch with the given base offset, epoch and records.
+ *
+ * @param baseOffset offset of the first record in the batch
+ * @param epoch epoch of the leader that created this batch
+ * @param records the list of records in this batch
+ */
+ public static Batch of(long baseOffset, int epoch, List records) {
+ if (records.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Batch must contain at least one record; baseOffset = %s; epoch = %s",
+ baseOffset,
+ epoch
+ )
+ );
+ }
+
+ return new Batch<>(baseOffset, epoch, baseOffset + records.size() - 1, records);
+ }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java
index e5f9e38612a0d..6469af2095f57 100644
--- a/raft/src/main/java/org/apache/kafka/raft/BatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/BatchReader.java
@@ -17,8 +17,6 @@
package org.apache.kafka.raft;
import java.util.Iterator;
-import java.util.List;
-import java.util.Objects;
import java.util.OptionalLong;
/**
@@ -32,7 +30,7 @@
*
* @param record type (see {@link org.apache.kafka.raft.RecordSerde})
*/
-public interface BatchReader extends Iterator>, AutoCloseable {
+public interface BatchReader extends Iterator>, AutoCloseable {
/**
* Get the base offset of the readable batches. Note that this value is a constant
@@ -59,57 +57,4 @@ public interface BatchReader extends Iterator>, AutoClos
*/
@Override
void close();
-
- class Batch {
- private final long baseOffset;
- private final int epoch;
- private final List records;
-
- public Batch(long baseOffset, int epoch, List records) {
- this.baseOffset = baseOffset;
- this.epoch = epoch;
- this.records = records;
- }
-
- public long lastOffset() {
- return baseOffset + records.size() - 1;
- }
-
- public long baseOffset() {
- return baseOffset;
- }
-
- public List records() {
- return records;
- }
-
- public int epoch() {
- return epoch;
- }
-
- @Override
- public String toString() {
- return "Batch(" +
- "baseOffset=" + baseOffset +
- ", epoch=" + epoch +
- ", records=" + records +
- ')';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Batch> batch = (Batch>) o;
- return baseOffset == batch.baseOffset &&
- epoch == batch.epoch &&
- Objects.equals(records, batch.records);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(baseOffset, epoch, records);
- }
- }
-
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 8bfad3abc71af..0c277fb661c13 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -159,6 +159,8 @@ public String toString() {
", epoch=" + epoch +
", leaderId=" + leaderId +
", voters=" + voters +
+ ", highWatermark=" + highWatermark +
+ ", fetchingSnapshot=" + fetchingSnapshot +
')';
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index a3dbbdd3a66fd..e010122e5af38 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -72,6 +72,7 @@
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
@@ -137,7 +138,6 @@
* than the leader's log start offset. This API is similar to the Fetch API since the snapshot is stored
* as FileRecords, but we use {@link UnalignedRecords} in FetchSnapshotResponse because the records
* are not necessarily offset-aligned.
- *
*/
public class KafkaRaftClient implements RaftClient {
private static final int RETRY_BACKOFF_BASE_MS = 100;
@@ -311,8 +311,19 @@ private void updateListenersProgress(long highWatermark) {
private void updateListenersProgress(List listenerContexts, long highWatermark) {
for (ListenerContext listenerContext : listenerContexts) {
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
- if (nextExpectedOffset < log.startOffset()) {
- listenerContext.fireHandleSnapshot(log.startOffset());
+ if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
+ SnapshotReader snapshot = latestSnapshot().orElseThrow(() -> {
+ return new IllegalStateException(
+ String.format(
+ "Snapshot expected since next offset of %s is %s, log start offset is %s and high-watermark is %s",
+ listenerContext.listener.getClass().getTypeName(),
+ nextExpectedOffset,
+ log.startOffset(),
+ highWatermark
+ )
+ );
+ });
+ listenerContext.fireHandleSnapshot(snapshot);
}
});
@@ -326,6 +337,16 @@ private void updateListenersProgress(List listenerContexts, lon
}
}
+ private Optional> latestSnapshot() {
+ return log.latestSnapshotId().flatMap(snapshotId ->
+ log
+ .readSnapshot(snapshotId)
+ .map(reader ->
+ SnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
+ )
+ );
+ }
+
private void maybeFireHandleCommit(long baseOffset, int epoch, List records) {
for (ListenerContext listenerContext : listenerContexts) {
OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
@@ -355,24 +376,28 @@ private void fireHandleResign(int epoch) {
}
@Override
- public void initialize() throws IOException {
- quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
+ public void initialize() {
+ try {
+ quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()));
- long currentTimeMs = time.milliseconds();
- if (quorum.isLeader()) {
- throw new IllegalStateException("Voter cannot initialize as a Leader");
- } else if (quorum.isCandidate()) {
- onBecomeCandidate(currentTimeMs);
- } else if (quorum.isFollower()) {
- onBecomeFollower(currentTimeMs);
- }
+ long currentTimeMs = time.milliseconds();
+ if (quorum.isLeader()) {
+ throw new IllegalStateException("Voter cannot initialize as a Leader");
+ } else if (quorum.isCandidate()) {
+ onBecomeCandidate(currentTimeMs);
+ } else if (quorum.isFollower()) {
+ onBecomeFollower(currentTimeMs);
+ }
- // When there is only a single voter, become candidate immediately
- if (quorum.isVoter()
- && quorum.remoteVoters().isEmpty()
- && !quorum.isLeader()
- && !quorum.isCandidate()) {
- transitionToCandidate(currentTimeMs);
+ // When there is only a single voter, become candidate immediately
+ if (quorum.isVoter()
+ && quorum.remoteVoters().isEmpty()
+ && !quorum.isCandidate()) {
+
+ transitionToCandidate(currentTimeMs);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@@ -1131,6 +1156,7 @@ private boolean handleFetchResponse(
if (records.sizeInBytes() > 0) {
appendAsFollower(records);
}
+
OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
updateFollowerHighWatermark(state, highWatermark);
@@ -1260,7 +1286,8 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
}
try (RawSnapshotReader snapshot = snapshotOpt.get()) {
- if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) {
+ long snapshotSize = snapshot.sizeInBytes();
+ if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) {
return FetchSnapshotResponse.singleton(
log.topicPartition(),
responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
@@ -1268,20 +1295,25 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
);
}
+ if (partitionSnapshot.position() > Integer.MAX_VALUE) {
+ throw new IllegalStateException(
+ String.format(
+ "Trying to fetch a snapshot with size (%s) and a position (%s) larger than %s",
+ snapshotSize,
+ partitionSnapshot.position(),
+ Integer.MAX_VALUE
+ )
+ );
+ }
+
int maxSnapshotSize;
try {
- maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
+ maxSnapshotSize = Math.toIntExact(snapshotSize);
} catch (ArithmeticException e) {
maxSnapshotSize = Integer.MAX_VALUE;
}
- if (partitionSnapshot.position() > Integer.MAX_VALUE) {
- throw new IllegalStateException(String.format("Trying to fetch a snapshot with position: %d lager than Int.MaxValue", partitionSnapshot.position()));
- }
-
- UnalignedRecords records = snapshot.read(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
-
- long snapshotSize = snapshot.sizeInBytes();
+ UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
return FetchSnapshotResponse.singleton(
log.topicPartition(),
@@ -1386,10 +1418,15 @@ private boolean handleFetchSnapshotResponse(
);
}
- if (!(partitionSnapshot.unalignedRecords() instanceof MemoryRecords)) {
+ final UnalignedMemoryRecords records;
+ if (partitionSnapshot.unalignedRecords() instanceof MemoryRecords) {
+ records = new UnalignedMemoryRecords(((MemoryRecords) partitionSnapshot.unalignedRecords()).buffer());
+ } else if (partitionSnapshot.unalignedRecords() instanceof UnalignedMemoryRecords) {
+ records = (UnalignedMemoryRecords) partitionSnapshot.unalignedRecords();
+ } else {
throw new IllegalStateException(String.format("Received unexpected fetch snapshot response: %s", partitionSnapshot));
}
- snapshot.append(new UnalignedMemoryRecords(((MemoryRecords) partitionSnapshot.unalignedRecords()).buffer()));
+ snapshot.append(records);
if (snapshot.sizeInBytes() == partitionSnapshot.size()) {
// Finished fetching the snapshot.
@@ -2103,7 +2140,7 @@ private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs)
}
private long pollCurrentState(long currentTimeMs) throws IOException {
- maybeUpdateOldestSnapshotId();
+ maybeDeleteBeforeSnapshot();
if (quorum.isLeader()) {
return pollLeader(currentTimeMs);
@@ -2167,8 +2204,14 @@ private boolean maybeCompleteShutdown(long currentTimeMs) {
return false;
}
- private void maybeUpdateOldestSnapshotId() {
- log.latestSnapshotId().ifPresent(log::deleteBeforeSnapshot);
+ private void maybeDeleteBeforeSnapshot() {
+ log.latestSnapshotId().ifPresent(snapshotId -> {
+ quorum.highWatermark().ifPresent(highWatermark -> {
+ if (highWatermark.offset >= snapshotId.offset) {
+ log.deleteBeforeSnapshot(snapshotId);
+ }
+ });
+ });
}
private void wakeup() {
@@ -2257,7 +2300,7 @@ public CompletableFuture shutdown(int timeoutMs) {
}
@Override
- public SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException {
+ public SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) {
return new SnapshotWriter<>(
log.createSnapshot(snapshotId),
MAX_BATCH_SIZE_BYTES,
@@ -2365,14 +2408,16 @@ public synchronized OptionalLong nextExpectedOffset() {
}
/**
- * This API is used when the Listener needs to be notified of a new Snapshot. This happens
- * when the context last acked end offset is less that then log start offset.
+ * This API is used when the Listener needs to be notified of a new snapshot. This happens
+ * when the context's next offset is less than the log start offset.
*/
- public void fireHandleSnapshot(long logStartOffset) {
+ public void fireHandleSnapshot(SnapshotReader reader) {
synchronized (this) {
- nextOffset = logStartOffset;
+ nextOffset = reader.snapshotId().offset;
lastSent = null;
}
+
+ listener.handleSnapshot(reader);
}
/**
@@ -2382,10 +2427,16 @@ public void fireHandleSnapshot(long logStartOffset) {
* data in memory, we let the state machine read the records from disk.
*/
public void fireHandleCommit(long baseOffset, Records records) {
- BufferSupplier bufferSupplier = BufferSupplier.create();
- RecordsBatchReader reader = new RecordsBatchReader<>(baseOffset, records,
- serde, bufferSupplier, this);
- fireHandleCommit(reader);
+ fireHandleCommit(
+ RecordsBatchReader.of(
+ baseOffset,
+ records,
+ serde,
+ BufferSupplier.create(),
+ MAX_BATCH_SIZE_BYTES,
+ this
+ )
+ );
}
/**
@@ -2396,7 +2447,7 @@ public void fireHandleCommit(long baseOffset, Records records) {
* followers.
*/
public void fireHandleCommit(long baseOffset, int epoch, List records) {
- BatchReader.Batch batch = new BatchReader.Batch<>(baseOffset, epoch, records);
+ Batch batch = Batch.of(baseOffset, epoch, records);
MemoryBatchReader reader = new MemoryBatchReader<>(Collections.singletonList(batch), this);
fireHandleCommit(reader);
}
@@ -2425,6 +2476,7 @@ void fireHandleResign(int epoch) {
public synchronized void onClose(BatchReader reader) {
OptionalLong lastOffset = reader.lastOffset();
+
if (lastOffset.isPresent()) {
nextOffset = lastOffset.getAsLong() + 1;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 762769b12c51f..ac5c4ba1df05f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -300,12 +300,13 @@ else if (!that.endOffset.isPresent())
@Override
public String toString() {
- return "ReplicaState(" +
- "nodeId=" + nodeId +
- ", endOffset=" + endOffset +
- ", lastFetchTimestamp=" + lastFetchTimestamp +
- ", hasAcknowledgedLeader=" + hasAcknowledgedLeader +
- ')';
+ return String.format(
+ "ReplicaState(nodeId=%s, endOffset=%s, lastFetchTimestamp=%s, hasAcknowledgedLeader=%s)",
+ nodeId,
+ endOffset,
+ lastFetchTimestamp,
+ hasAcknowledgedLeader
+ );
}
}
@@ -318,11 +319,14 @@ public boolean canGrantVote(int candidateId, boolean isLogUpToDate) {
@Override
public String toString() {
- return "Leader(" +
- "localId=" + localId +
- ", epoch=" + epoch +
- ", epochStartOffset=" + epochStartOffset +
- ')';
+ return String.format(
+ "Leader(localId=%s, epoch=%s, epochStartOffset=%s, highWatermark=%s, voterStates=%s)",
+ localId,
+ epoch,
+ epochStartOffset,
+ highWatermark,
+ voterStates
+ );
}
@Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 74488b450ede1..7293f796eb772 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -16,19 +16,18 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
-import java.io.Closeable;
-import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-public interface RaftClient extends Closeable {
+public interface RaftClient extends AutoCloseable {
interface Listener {
/**
* Callback which is invoked for all records committed to the log.
- * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+ * It is the responsibility of this implementation to invoke {@link BatchReader#close()}
* after consuming the reader.
*
* Note that there is not a one-to-one correspondence between writes through
@@ -44,6 +43,18 @@ interface Listener {
*/
void handleCommit(BatchReader reader);
+ /**
+ * Callback which is invoked when the Listener needs to load a snapshot.
+ * It is the responsibility of this implementation to invoke {@link SnapshotReader#close()}
+ * after consuming the reader.
+ *
+ * When handling this call, the implementation must assume that all previous calls
+ * to {@link #handleCommit} contain invalid data.
+ *
+ * @param reader snapshot reader instance which must be iterated and closed
+ */
+ void handleSnapshot(SnapshotReader reader);
+
/**
* Invoked after this node has become a leader. This is only called after
* all commits up to the start of the leader's epoch have been sent to
@@ -66,12 +77,9 @@ default void handleResign(int epoch) {}
}
/**
- * Initialize the client.
- * This should only be called once on startup.
- *
- * @throws IOException For any IO errors during initialization
+ * Initialize the client. This should only be called once on startup.
*/
- void initialize() throws IOException;
+ void initialize();
/**
* Register a listener to get commit/leader notifications.
@@ -157,5 +165,5 @@ default void handleResign(int epoch) {}
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot
*/
- SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;
+ SnapshotWriter createSnapshot(OffsetAndEpoch snapshotId);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 3db4d736a53f7..634ce37392f17 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -18,9 +18,11 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
-import java.util.Optional;
+import java.util.OptionalInt;
import static java.util.Collections.singletonList;
@@ -28,10 +30,12 @@ public class ReplicatedCounter implements RaftClient.Listener {
private final int nodeId;
private final Logger log;
private final RaftClient client;
+ private final int snapshotDelayInRecords = 10;
- private int committed;
- private int uncommitted;
- private Optional claimedEpoch;
+ private int committed = 0;
+ private int uncommitted = 0;
+ private OptionalInt claimedEpoch = OptionalInt.empty();
+ private long lastSnapshotEndOffset = 0;
public ReplicatedCounter(
int nodeId,
@@ -40,11 +44,7 @@ public ReplicatedCounter(
) {
this.nodeId = nodeId;
this.client = client;
- this.log = logContext.logger(ReplicatedCounter.class);
-
- this.committed = 0;
- this.uncommitted = 0;
- this.claimedEpoch = Optional.empty();
+ log = logContext.logger(ReplicatedCounter.class);
}
public synchronized boolean isWritable() {
@@ -56,7 +56,7 @@ public synchronized void increment() {
throw new KafkaException("Counter is not currently writable");
}
- int epoch = claimedEpoch.get();
+ int epoch = claimedEpoch.getAsInt();
uncommitted += 1;
Long offset = client.scheduleAppend(epoch, singletonList(uncommitted));
if (offset != null) {
@@ -68,20 +68,72 @@ public synchronized void increment() {
@Override
public synchronized void handleCommit(BatchReader reader) {
try {
- int initialValue = this.committed;
+ int initialCommitted = committed;
+ long nextReadOffset = 0;
+ int readEpoch = 0;
+
while (reader.hasNext()) {
- BatchReader.Batch batch = reader.next();
- log.debug("Handle commit of batch with records {} at base offset {}",
- batch.records(), batch.baseOffset());
- for (Integer value : batch.records()) {
- if (value != this.committed + 1) {
- throw new AssertionError("Expected next committed value to be " +
- (this.committed + 1) + ", but instead found " + value + " on node " + nodeId);
+ Batch batch = reader.next();
+ log.debug(
+ "Handle commit of batch with records {} at base offset {}",
+ batch.records(),
+ batch.baseOffset()
+ );
+ for (Integer nextCommitted: batch.records()) {
+ if (nextCommitted != committed + 1) {
+ throw new AssertionError(
+ String.format(
+ "Expected next committed value to be %s, but instead found %s on node %s",
+ committed + 1,
+ nextCommitted,
+ nodeId
+ )
+ );
}
- this.committed = value;
+ committed = nextCommitted;
+ }
+
+ nextReadOffset = batch.lastOffset() + 1;
+ readEpoch = batch.epoch();
+ }
+ log.debug("Counter incremented from {} to {}", initialCommitted, committed);
+
+ if (lastSnapshotEndOffset + snapshotDelayInRecords < nextReadOffset) {
+ log.debug("Generating new snapshot at {} since next commit offset is {}", lastSnapshotEndOffset, nextReadOffset);
+ try (SnapshotWriter snapshot = client.createSnapshot(new OffsetAndEpoch(nextReadOffset, readEpoch))) {
+ snapshot.append(singletonList(committed));
+ snapshot.freeze();
+ lastSnapshotEndOffset = nextReadOffset;
}
}
- log.debug("Counter incremented from {} to {}", initialValue, committed);
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Override
+ public synchronized void handleSnapshot(SnapshotReader reader) {
+ try {
+ log.debug("Loading snapshot {}", reader.snapshotId());
+ while (reader.hasNext()) {
+ Batch batch = reader.next();
+ if (batch.records().size() != 1) {
+ throw new AssertionError(
+ String.format(
+ "Expected the snapshot at %s to only contain one record %s",
+ reader.snapshotId(),
+ batch.records()
+ )
+ );
+ }
+
+ for (Integer value : batch) {
+ log.debug("Setting value: {}", value);
+ committed = value;
+ uncommitted = value;
+ }
+ }
+ log.debug("Finished loading snapshot. Set value: {}", committed);
} finally {
reader.close();
}
@@ -91,15 +143,14 @@ public synchronized void handleCommit(BatchReader reader) {
public synchronized void handleClaim(int epoch) {
log.debug("Counter uncommitted value initialized to {} after claiming leadership in epoch {}",
committed, epoch);
- this.uncommitted = committed;
- this.claimedEpoch = Optional.of(epoch);
+ uncommitted = committed;
+ claimedEpoch = OptionalInt.of(epoch);
}
@Override
public synchronized void handleResign(int epoch) {
log.debug("Counter uncommitted value reset after resigning leadership");
this.uncommitted = -1;
- this.claimedEpoch = Optional.empty();
+ this.claimedEpoch = OptionalInt.empty();
}
-
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index 198a2c630a79f..6a6b74b10acbf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -22,11 +22,9 @@
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
-import java.io.Closeable;
-import java.io.IOException;
import java.util.Optional;
-public interface ReplicatedLog extends Closeable {
+public interface ReplicatedLog extends AutoCloseable {
/**
* Write a set of records to the local leader log. These messages will either
@@ -231,12 +229,13 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
/**
* Create a writable snapshot for the given snapshot id.
*
- * See {@link RawSnapshotWriter} for details on how to use this object.
+ * See {@link RawSnapshotWriter} for details on how to use this object. The caller of
+ * this method is responsible for invoking {@link RawSnapshotWriter#close()}.
*
* @param snapshotId the end offset and epoch that identifies the snapshot
* @return a writable snapshot
*/
- RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId) throws IOException;
+ RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId);
/**
* Opens a readable snapshot for the given snapshot id.
@@ -249,7 +248,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
* @return an Optional with a readable snapshot, if the snapshot exists, otherwise
* returns an empty Optional
*/
- Optional readSnapshot(OffsetAndEpoch snapshotId) throws IOException;
+ Optional readSnapshot(OffsetAndEpoch snapshotId);
/**
* Returns the latest snapshot id if one exists.
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/MemoryBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/MemoryBatchReader.java
index 2ff2b8cdda7b6..07ad1bbcb4533 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/MemoryBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/MemoryBatchReader.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft.internals;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import java.util.Iterator;
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index 0817138e3ff0a..a55815d0b5e5e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -16,136 +16,59 @@
*/
package org.apache.kafka.raft.internals;
-import org.apache.kafka.common.protocol.DataInputStreamReadable;
-import org.apache.kafka.common.protocol.Readable;
-import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.common.record.DefaultRecordBatch;
-import org.apache.kafka.common.record.FileRecords;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RecordSerde;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.OptionalLong;
-public class RecordsBatchReader implements BatchReader {
+public final class RecordsBatchReader implements BatchReader {
private final long baseOffset;
- private final Records records;
- private final RecordSerde serde;
- private final BufferSupplier bufferSupplier;
+ private final RecordsIterator iterator;
private final CloseListener> closeListener;
- private Iterator batchIterator;
private long lastReturnedOffset;
- private Batch nextBatch;
+
+ private Optional> nextBatch = Optional.empty();
private boolean isClosed = false;
- private ByteBuffer allocatedBuffer = null;
- public RecordsBatchReader(
+ private RecordsBatchReader(
long baseOffset,
- Records records,
- RecordSerde serde,
- BufferSupplier bufferSupplier,
+ RecordsIterator iterator,
CloseListener> closeListener
) {
this.baseOffset = baseOffset;
- this.records = records;
- this.serde = serde;
- this.bufferSupplier = bufferSupplier;
+ this.iterator = iterator;
this.closeListener = closeListener;
this.lastReturnedOffset = baseOffset;
}
- private void materializeIterator() throws IOException {
- if (records instanceof MemoryRecords) {
- batchIterator = ((MemoryRecords) records).batchIterator();
- } else if (records instanceof FileRecords) {
- this.allocatedBuffer = bufferSupplier.get(records.sizeInBytes());
- ((FileRecords) records).readInto(allocatedBuffer, 0);
- MemoryRecords memRecords = MemoryRecords.readableRecords(allocatedBuffer);
- batchIterator = memRecords.batchIterator();
- } else {
- throw new IllegalStateException("Unexpected Records type " + records.getClass());
- }
- }
-
- private void findNextDataBatch() {
- if (batchIterator == null) {
- try {
- materializeIterator();
- } catch (IOException e) {
- throw new RuntimeException("Failed to read records into memory", e);
- }
- }
-
- while (batchIterator.hasNext()) {
- MutableRecordBatch nextBatch = batchIterator.next();
- if (!(nextBatch instanceof DefaultRecordBatch)) {
- throw new IllegalStateException();
- }
-
- DefaultRecordBatch batch = (DefaultRecordBatch) nextBatch;
- if (!batch.isControlBatch()) {
- this.nextBatch = readBatch(batch);
- return;
- } else {
- this.lastReturnedOffset = batch.lastOffset();
- }
- }
- }
-
- private Batch readBatch(DefaultRecordBatch batch) {
- Integer numRecords = batch.countOrNull();
- if (numRecords == null) {
- throw new IllegalStateException();
- }
-
- List records = new ArrayList<>(numRecords);
- try (DataInputStreamReadable input = new DataInputStreamReadable(
- batch.recordInputStream(bufferSupplier))) {
- for (int i = 0; i < numRecords; i++) {
- T record = readRecord(input);
- records.add(record);
- }
- return new Batch<>(
- batch.baseOffset(),
- batch.partitionLeaderEpoch(),
- records
- );
- }
- }
-
@Override
public boolean hasNext() {
- if (nextBatch != null) {
- return true;
- } else {
- findNextDataBatch();
- return nextBatch != null;
+ ensureOpen();
+
+ if (!nextBatch.isPresent()) {
+ nextBatch = nextBatch();
}
+
+ return nextBatch.isPresent();
}
@Override
public Batch next() {
- if (nextBatch != null) {
- Batch res = nextBatch;
- nextBatch = null;
- lastReturnedOffset = res.lastOffset();
- return res;
- } else {
- findNextDataBatch();
- if (nextBatch == null) {
- throw new NoSuchElementException();
- }
- return next();
+ if (!hasNext()) {
+ throw new NoSuchElementException("Records batch reader doesn't have any more elements");
}
+
+ Batch batch = nextBatch.get();
+ nextBatch = Optional.empty();
+
+ lastReturnedOffset = batch.lastOffset();
+ return batch;
}
@Override
@@ -163,48 +86,46 @@ public OptionalLong lastOffset() {
@Override
public void close() {
- isClosed = true;
+ if (!isClosed) {
+ isClosed = true;
- if (allocatedBuffer != null) {
- bufferSupplier.release(allocatedBuffer);
+ iterator.close();
+ closeListener.onClose(this);
}
-
- closeListener.onClose(this);
}
- public T readRecord(Readable input) {
- // Read size of body in bytes
- input.readVarint();
-
- // Read unused attributes
- input.readByte();
-
- long timestampDelta = input.readVarlong();
- if (timestampDelta != 0) {
- throw new IllegalArgumentException();
- }
-
- // Read offset delta
- input.readVarint();
-
- int keySize = input.readVarint();
- if (keySize != -1) {
- throw new IllegalArgumentException("Unexpected key size " + keySize);
- }
+ public static RecordsBatchReader of(
+ long baseOffset,
+ Records records,
+ RecordSerde serde,
+ BufferSupplier bufferSupplier,
+ int maxBatchSize,
+ CloseListener> closeListener
+ ) {
+ return new RecordsBatchReader<>(
+ baseOffset,
+ new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize),
+ closeListener
+ );
+ }
- int valueSize = input.readVarint();
- if (valueSize < 0) {
- throw new IllegalArgumentException();
+ private void ensureOpen() {
+ if (isClosed) {
+ throw new IllegalStateException("Records batch reader was closed");
}
+ }
- T record = serde.read(input, valueSize);
+ private Optional> nextBatch() {
+ while (iterator.hasNext()) {
+ Batch batch = iterator.next();
- int numHeaders = input.readVarint();
- if (numHeaders != 0) {
- throw new IllegalArgumentException();
+ if (batch.records().isEmpty()) {
+ lastReturnedOffset = batch.lastOffset();
+ } else {
+ return Optional.of(batch);
+ }
}
- return record;
+ return Optional.empty();
}
-
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
new file mode 100644
index 0000000000000..46155b57ee4f6
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class RecordsIterator implements Iterator>, AutoCloseable {
+ private final Records records;
+ private final RecordSerde serde;
+ private final BufferSupplier bufferSupplier;
+ private final int batchSize;
+
+ private Iterator nextBatches = Collections.emptyIterator();
+ private Optional> nextBatch = Optional.empty();
+ // Buffer used as the backing store for nextBatches if needed
+ private Optional allocatedBuffer = Optional.empty();
+ // Number of bytes from records read up to now
+ private int bytesRead = 0;
+ private boolean isClosed = false;
+
+ public RecordsIterator(
+ Records records,
+ RecordSerde serde,
+ BufferSupplier bufferSupplier,
+ int batchSize
+ ) {
+ this.records = records;
+ this.serde = serde;
+ this.bufferSupplier = bufferSupplier;
+ this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+ }
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+
+ if (!nextBatch.isPresent()) {
+ nextBatch = nextBatch();
+ }
+
+ return nextBatch.isPresent();
+ }
+
+ @Override
+ public Batch next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("Batch iterator doesn't have any more elements");
+ }
+
+ Batch batch = nextBatch.get();
+ nextBatch = Optional.empty();
+
+ return batch;
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ allocatedBuffer.ifPresent(bufferSupplier::release);
+ allocatedBuffer = Optional.empty();
+ }
+
+ private void ensureOpen() {
+ if (isClosed) {
+ throw new IllegalStateException("Serde record batch itererator was closed");
+ }
+ }
+
+ private MemoryRecords readFileRecords(FileRecords fileRecords, ByteBuffer buffer) {
+ int start = buffer.position();
+ try {
+ fileRecords.readInto(buffer, bytesRead);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read records into memory", e);
+ }
+
+ bytesRead += buffer.limit() - start;
+ return MemoryRecords.readableRecords(buffer.slice());
+ }
+
+ private MemoryRecords createMemoryRecords(FileRecords fileRecords) {
+ final ByteBuffer buffer;
+ if (allocatedBuffer.isPresent()) {
+ buffer = allocatedBuffer.get();
+ buffer.compact();
+ } else {
+ buffer = bufferSupplier.get(Math.min(batchSize, records.sizeInBytes()));
+ allocatedBuffer = Optional.of(buffer);
+ }
+
+ MemoryRecords memoryRecords = readFileRecords(fileRecords, buffer);
+
+ // firstBatchSize() is always non-null because the minimum buffer is HEADER_SIZE_UP_TO_MAGIC.
+ if (memoryRecords.firstBatchSize() <= buffer.remaining()) {
+ return memoryRecords;
+ } else {
+ // Not enough bytes read; create a bigger buffer
+ ByteBuffer newBuffer = bufferSupplier.get(memoryRecords.firstBatchSize());
+ allocatedBuffer = Optional.of(newBuffer);
+
+ newBuffer.put(buffer);
+ bufferSupplier.release(buffer);
+
+ return readFileRecords(fileRecords, newBuffer);
+ }
+ }
+
+ private Iterator nextBatches() {
+ int recordSize = records.sizeInBytes();
+ if (bytesRead < recordSize) {
+ final MemoryRecords memoryRecords;
+ if (records instanceof MemoryRecords) {
+ bytesRead = recordSize;
+ memoryRecords = (MemoryRecords) records;
+ } else if (records instanceof FileRecords) {
+ memoryRecords = createMemoryRecords((FileRecords) records);
+ } else {
+ throw new IllegalStateException(String.format("Unexpected Records type %s", records.getClass()));
+ }
+
+ return memoryRecords.batchIterator();
+ }
+
+ return Collections.emptyIterator();
+ }
+
+ private Optional> nextBatch() {
+ if (!nextBatches.hasNext()) {
+ nextBatches = nextBatches();
+ }
+
+ if (nextBatches.hasNext()) {
+ MutableRecordBatch nextBatch = nextBatches.next();
+
+ // Update the buffer position to reflect the read batch
+ allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes()));
+
+ if (!(nextBatch instanceof DefaultRecordBatch)) {
+ throw new IllegalStateException(
+ String.format("DefaultRecordBatch expected by record type was %s", nextBatch.getClass())
+ );
+ }
+
+ return Optional.of(readBatch((DefaultRecordBatch) nextBatch));
+ }
+
+ return Optional.empty();
+ }
+
+ private Batch readBatch(DefaultRecordBatch batch) {
+ final Batch result;
+ if (batch.isControlBatch()) {
+ result = Batch.empty(batch.baseOffset(), batch.partitionLeaderEpoch(), batch.lastOffset());
+ } else {
+ Integer numRecords = batch.countOrNull();
+ if (numRecords == null) {
+ throw new IllegalStateException("Expected a record count for the records batch");
+ }
+
+ List records = new ArrayList<>(numRecords);
+ try (DataInputStreamReadable input = new DataInputStreamReadable(batch.recordInputStream(bufferSupplier))) {
+ for (int i = 0; i < numRecords; i++) {
+ T record = readRecord(input);
+ records.add(record);
+ }
+ }
+
+ result = Batch.of(batch.baseOffset(), batch.partitionLeaderEpoch(), records);
+ }
+
+ return result;
+ }
+
+ private T readRecord(Readable input) {
+ // Read size of body in bytes
+ input.readVarint();
+
+ // Read unused attributes
+ input.readByte();
+
+ long timestampDelta = input.readVarlong();
+ if (timestampDelta != 0) {
+ throw new IllegalArgumentException();
+ }
+
+ // Read offset delta
+ input.readVarint();
+
+ int keySize = input.readVarint();
+ if (keySize != -1) {
+ throw new IllegalArgumentException("Unexpected key size " + keySize);
+ }
+
+ int valueSize = input.readVarint();
+ if (valueSize < 0) {
+ throw new IllegalArgumentException();
+ }
+
+ T record = serde.read(input, valueSize);
+
+ int numHeaders = input.readVarint();
+ if (numHeaders != 0) {
+ throw new IllegalArgumentException();
+ }
+
+ return record;
+ }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
index 1ca63f1b9c3cd..0bcf2c67055b1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -21,9 +21,11 @@
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.snapshot.SnapshotReader;
import java.util.List;
import java.util.stream.Collectors;
@@ -113,7 +115,7 @@ public void handleCommit(BatchReader reader) {
// not a leader. We want to move this IO to the state machine so that
// it does not block Raft replication
while (reader.hasNext()) {
- BatchReader.Batch batch = reader.next();
+ Batch batch = reader.next();
List records = batch.records().stream()
.map(ApiMessageAndVersion::message)
.collect(Collectors.toList());
@@ -124,6 +126,11 @@ public void handleCommit(BatchReader reader) {
}
}
+ @Override
+ public void handleSnapshot(SnapshotReader reader) {
+ reader.close();
+ }
+
@Override
public void handleClaim(int epoch) {
listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
index d0218c79cc427..820230ee64636 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
@@ -17,14 +17,12 @@
package org.apache.kafka.snapshot;
import org.apache.kafka.common.record.FileRecords;
-import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedRecords;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.OffsetAndEpoch;
import java.io.IOException;
import java.nio.file.Path;
-import java.util.Iterator;
public final class FileRawSnapshotReader implements RawSnapshotReader {
private final FileRecords fileRecords;
@@ -46,12 +44,13 @@ public long sizeInBytes() {
}
@Override
- public Iterator iterator() {
- return Utils.covariantCast(fileRecords.batchIterator());
+ public UnalignedRecords slice(long position, int size) {
+ return fileRecords.sliceUnaligned(Math.toIntExact(position), size);
}
- public UnalignedRecords read(long position, int size) {
- return fileRecords.sliceUnaligned(Math.toIntExact(position), size);
+ @Override
+ public Records records() {
+ return fileRecords;
}
@Override
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
index 6d0c17c4e051d..e259aefaba962 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
@@ -54,28 +54,32 @@ public OffsetAndEpoch snapshotId() {
}
@Override
- public long sizeInBytes() throws IOException {
- return channel.size();
+ public long sizeInBytes() {
+ try {
+ return channel.size();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
- public void append(UnalignedMemoryRecords records) throws IOException {
- if (frozen) {
- throw new IllegalStateException(
- String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
- );
+ public void append(UnalignedMemoryRecords records) {
+ try {
+ checkIfFrozen("Append");
+ Utils.writeFully(channel, records.buffer());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- Utils.writeFully(channel, records.buffer());
}
@Override
- public void append(MemoryRecords records) throws IOException {
- if (frozen) {
- throw new IllegalStateException(
- String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
- );
+ public void append(MemoryRecords records) {
+ try {
+ checkIfFrozen("Append");
+ Utils.writeFully(channel, records.buffer());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- Utils.writeFully(channel, records.buffer());
}
@Override
@@ -84,34 +88,57 @@ public boolean isFrozen() {
}
@Override
- public void freeze() throws IOException {
- if (frozen) {
- throw new IllegalStateException(
- String.format("Freeze is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
- );
- }
+ public void freeze() {
+ try {
+ checkIfFrozen("Freeze");
- channel.close();
- frozen = true;
+ channel.close();
+ frozen = true;
- // Set readonly and ignore the result
- if (!tempSnapshotPath.toFile().setReadOnly()) {
- throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
- }
+ if (!tempSnapshotPath.toFile().setReadOnly()) {
+ throw new IllegalStateException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath));
+ }
- Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
- Utils.atomicMoveWithFallback(tempSnapshotPath, destination);
+ Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+ Utils.atomicMoveWithFallback(tempSnapshotPath, destination);
- replicatedLog.ifPresent(log -> log.onSnapshotFrozen(snapshotId));
+ replicatedLog.ifPresent(log -> log.onSnapshotFrozen(snapshotId));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
- public void close() throws IOException {
+ public void close() {
try {
channel.close();
- } finally {
// This is a noop if freeze was called before calling close
Files.deleteIfExists(tempSnapshotPath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "FileRawSnapshotWriter(path=%s, snapshotId=%s, frozen=%s)",
+ tempSnapshotPath,
+ snapshotId,
+ frozen
+ );
+ }
+
+ void checkIfFrozen(String operation) {
+ if (frozen) {
+ throw new IllegalStateException(
+ String.format(
+ "%s is not supported. Snapshot is already frozen: id = %s; temp path = %s",
+ operation,
+ snapshotId,
+ tempSnapshotPath
+ )
+ );
}
}
@@ -126,14 +153,18 @@ public static FileRawSnapshotWriter create(
Path logDir,
OffsetAndEpoch snapshotId,
Optional replicatedLog
- ) throws IOException {
- Path path = Snapshots.createTempFile(logDir, snapshotId);
+ ) {
+ try {
+ Path path = Snapshots.createTempFile(logDir, snapshotId);
- return new FileRawSnapshotWriter(
- path,
- FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, StandardOpenOption.APPEND)),
- snapshotId,
- replicatedLog
- );
+ return new FileRawSnapshotWriter(
+ path,
+ FileChannel.open(path, Utils.mkSet(StandardOpenOption.WRITE, StandardOpenOption.APPEND)),
+ snapshotId,
+ replicatedLog
+ );
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
index 6d1ff28950b33..506728d10ae41 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java
@@ -16,17 +16,16 @@
*/
package org.apache.kafka.snapshot;
-import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.UnalignedRecords;
import org.apache.kafka.raft.OffsetAndEpoch;
import java.io.Closeable;
-import java.io.IOException;
/**
* Interface for reading snapshots as a sequence of records.
*/
-public interface RawSnapshotReader extends Closeable, Iterable {
+public interface RawSnapshotReader extends Closeable {
/**
* Returns the end offset and epoch for the snapshot.
*/
@@ -34,20 +33,22 @@ public interface RawSnapshotReader extends Closeable, Iterable {
/**
* Returns the number of bytes for the snapshot.
- *
- * @throws IOException for any IO error while reading the size
*/
- long sizeInBytes() throws IOException;
+ long sizeInBytes();
/**
- * Reads bytes from position into the given buffer.
+ * Creates a slize of unaligned records from the position up to a size.
*
- * It is not guarantee that the given buffer will be filled.
+ * @param position the starting position of the slice in the snapshot
+ * @param size the maximum size of the slice
+ * @return an unaligned slice of records in the snapshot
+ */
+ UnalignedRecords slice(long position, int size);
+
+ /**
+ * Returns all of the records backing this snapshot reader.
*
- * @param size size to read from snapshot file
- * @param position the starting position in the snapshot to read
- * @return the region read from snapshot
- * @throws IOException for any IO error while reading the snapshot
+ * @return all of the records for this snapshot
*/
- UnalignedRecords read(long position, int size) throws IOException;
+ Records records();
}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java
index f8ec58f17fcd2..07d8271e953f6 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java
@@ -21,13 +21,10 @@
import org.apache.kafka.common.record.UnalignedMemoryRecords;
import org.apache.kafka.raft.OffsetAndEpoch;
-import java.io.Closeable;
-import java.io.IOException;
-
/**
* Interface for writing snapshot as a sequence of records.
*/
-public interface RawSnapshotWriter extends Closeable {
+public interface RawSnapshotWriter extends AutoCloseable {
/**
* Returns the end offset and epoch for the snapshot.
*/
@@ -35,10 +32,8 @@ public interface RawSnapshotWriter extends Closeable {
/**
* Returns the number of bytes for the snapshot.
- *
- * @throws IOException for any IO error while reading the size
*/
- long sizeInBytes() throws IOException;
+ long sizeInBytes();
/**
* Fully appends the memory record set to the snapshot.
@@ -47,9 +42,8 @@ public interface RawSnapshotWriter extends Closeable {
* snapshot.
*
* @param records the region to append
- * @throws IOException for any IO error during append
*/
- void append(MemoryRecords records) throws IOException;
+ void append(MemoryRecords records);
/**
* Fully appends the memory record set to the snapshot, the difference with {@link RawSnapshotWriter#append(MemoryRecords)}
@@ -59,9 +53,8 @@ public interface RawSnapshotWriter extends Closeable {
* snapshot.
*
* @param records the region to append
- * @throws IOException for any IO error during append
*/
- void append(UnalignedMemoryRecords records) throws IOException;
+ void append(UnalignedMemoryRecords records);
/**
* Returns true if the snapshot has been frozen, otherwise false is returned.
@@ -72,17 +65,13 @@ public interface RawSnapshotWriter extends Closeable {
/**
* Freezes the snapshot and marking it as immutable.
- *
- * @throws IOException for any IO error during freezing
*/
- void freeze() throws IOException;
+ void freeze();
/**
* Closes the snapshot writer.
*
* If close is called without first calling freeze the snapshot is aborted.
- *
- * @throws IOException for any IO error during close
*/
- void close() throws IOException;
+ void close();
}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
new file mode 100644
index 0000000000000..af00cdb2286bf
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.util.Iterator;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.raft.internals.RecordsIterator;
+
+/**
+ * A type for reading an immutable snapshot.
+ *
+ * A snapshot reader can be used to scan through all of the objects T in a snapshot. It
+ * is assumed that the content of the snapshot represents all of the objects T for the topic
+ * partition from offset 0 up to but not including the end offset in the snapshot id.
+ */
+public final class SnapshotReader implements AutoCloseable, Iterator> {
+ private final OffsetAndEpoch snapshotId;
+ private final RecordsIterator iterator;
+
+ private SnapshotReader(
+ OffsetAndEpoch snapshotId,
+ RecordsIterator iterator
+ ) {
+ this.snapshotId = snapshotId;
+ this.iterator = iterator;
+ }
+
+ /**
+ * Returns the end offset and epoch for the snapshot.
+ */
+ public OffsetAndEpoch snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Batch next() {
+ return iterator.next();
+ }
+
+ /**
+ * Closes the snapshot reader.
+ */
+ public void close() {
+ iterator.close();
+ }
+
+ public static <T> SnapshotReader<T> of(
+ RawSnapshotReader snapshot,
+ RecordSerde<T> serde,
+ BufferSupplier bufferSupplier,
+ int maxBatchSize
+ ) {
+ return new SnapshotReader<>(
+ snapshot.snapshotId(),
+ new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
+ );
+ }
+}
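
For orientation, the new reader is typically drained inside a RaftClient.Listener handleSnapshot callback: iterate the batches, apply each record, then close the reader. Below is a hedged sketch under those assumptions; the SnapshotLoaderSketch class and the Consumer that applies records are hypothetical, while hasNext(), next(), records(), and close() are the methods introduced above.

    import java.util.function.Consumer;
    import org.apache.kafka.raft.Batch;
    import org.apache.kafka.snapshot.SnapshotReader;

    // Hypothetical helper: replays every record of a snapshot into a consumer and
    // always closes the reader, mirroring what a handleSnapshot implementation does.
    final class SnapshotLoaderSketch {
        static void load(SnapshotReader<String> reader, Consumer<String> apply) {
            try (SnapshotReader<String> snapshot = reader) {
                while (snapshot.hasNext()) {
                    Batch<String> batch = snapshot.next();
                    for (String record : batch.records()) {
                        apply.accept(record);
                    }
                }
            }
        }
    }
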
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
index 653859f2e43b8..e9b3c64e0362f 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
@@ -25,12 +25,10 @@
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
-import java.io.Closeable;
-import java.io.IOException;
import java.util.List;
/**
- * A type for writing a snapshot fora given end offset and epoch.
+ * A type for writing a snapshot for a given end offset and epoch.
*
* A snapshot writer can be used to append objects until freeze is called. When freeze is
* called the snapshot is validated and marked as immutable. After freeze is called any
@@ -42,7 +40,7 @@
*
* @see org.apache.kafka.raft.RaftClient#createSnapshot(OffsetAndEpoch)
*/
-final public class SnapshotWriter<T> implements Closeable {
+final public class SnapshotWriter<T> implements AutoCloseable {
final private RawSnapshotWriter snapshot;
final private BatchAccumulator<T> accumulator;
final private Time time;
@@ -102,10 +100,9 @@ public boolean isFrozen() {
* The list of records passed is guaranteed to be written together.
*
* @param records the list of records to append to the snapshot
- * @throws IOException for any IO error while appending
* @throws IllegalStateException if append is called when isFrozen is true
*/
- public void append(List<T> records) throws IOException {
+ public void append(List<T> records) {
if (snapshot.isFrozen()) {
String message = String.format(
"Append not supported. Snapshot is already frozen: id = '%s'.",
@@ -124,10 +121,8 @@ public void append(List<T> records) throws IOException {
/**
* Freezes the snapshot by flushing all pending writes and marking it as immutable.
- *
- * @throws IOException for any IO error during freezing
*/
- public void freeze() throws IOException {
+ public void freeze() {
appendBatches(accumulator.drain());
snapshot.freeze();
accumulator.close();
@@ -137,15 +132,13 @@ public void freeze() throws IOException {
* Closes the snapshot writer.
*
* If close is called without first calling freeze the snapshot is aborted.
- *
- * @throws IOException for any IO error during close
*/
- public void close() throws IOException {
+ public void close() {
snapshot.close();
accumulator.close();
}
- private void appendBatches(List<CompletedBatch<T>> batches) throws IOException {
+ private void appendBatches(List<CompletedBatch<T>> batches) {
try {
for (CompletedBatch<T> batch : batches) {
snapshot.append(batch.data);
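
With SnapshotWriter now AutoCloseable and its append/freeze/close methods unchecked, creating a snapshot fits naturally into try-with-resources. A hedged usage sketch follows; the wrapper class, the sample records, and the broad throws Exception clause are assumptions, while createSnapshot, append, and freeze are the calls documented above.

    import java.util.Arrays;
    import org.apache.kafka.raft.OffsetAndEpoch;
    import org.apache.kafka.raft.RaftClient;
    import org.apache.kafka.snapshot.SnapshotWriter;

    // Hypothetical usage sketch; throws Exception is declared defensively because the
    // checked exceptions of createSnapshot depend on the RaftClient implementation.
    final class SnapshotWriteSketch {
        static void writeSnapshot(RaftClient<String> client, OffsetAndEpoch snapshotId) throws Exception {
            try (SnapshotWriter<String> writer = client.createSnapshot(snapshotId)) {
                writer.append(Arrays.asList("a", "b", "c")); // records are buffered until freeze
                writer.freeze();                             // validate and mark the snapshot immutable
            }                                                // closing without freeze() aborts the snapshot
        }
    }
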
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 6af2f0d4b7a8f..6fd5147421165 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -31,8 +31,9 @@
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
-import org.apache.kafka.snapshot.SnapshotWriterTest;
+import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -47,6 +48,166 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
final public class KafkaRaftClientSnapshotTest {
+ @Test
+ public void testLeaderListernerNotified() throws Exception {
+ int localId = 0;
+ int otherNodeId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c"))
+ .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f"))
+ .withEmptySnapshot(snapshotId)
+ .deleteBeforeSnapshot(snapshotId)
+ .build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ // Advance the highWatermark
+ long localLogEndOffset = context.log.endOffset().offset;
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
+ assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong());
+
+ // Check that listener was notified of the new snapshot
+ try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
+ }
+ }
+
+ @Test
+ public void testFollowerListenerNotified() throws Exception {
+ int localId = 0;
+ int leaderId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, leaderId);
+ int epoch = 2;
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c"))
+ .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f"))
+ .withEmptySnapshot(snapshotId)
+ .deleteBeforeSnapshot(snapshotId)
+ .withElectedLeader(epoch, leaderId)
+ .build();
+
+ // Advance the highWatermark
+ long localLogEndOffset = context.log.endOffset().offset;
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch);
+ context.deliverResponse(
+ fetchRequest.correlationId,
+ fetchRequest.destinationId(),
+ context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, localLogEndOffset, Errors.NONE)
+ );
+
+ context.pollUntilRequest();
+ context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch);
+
+ // Check that listener was notified of the new snapshot
+ try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
+ }
+ }
+
+ @Test
+ public void testSecondListenerNotified() throws Exception {
+ int localId = 0;
+ int leaderId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, leaderId);
+ int epoch = 2;
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c"))
+ .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f"))
+ .withEmptySnapshot(snapshotId)
+ .withElectedLeader(epoch, leaderId)
+ .build();
+
+ // Advance the highWatermark
+ long localLogEndOffset = context.log.endOffset().offset;
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch);
+ context.deliverResponse(
+ fetchRequest.correlationId,
+ fetchRequest.destinationId(),
+ context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, localLogEndOffset, Errors.NONE)
+ );
+
+ context.pollUntilRequest();
+ context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch);
+
+ RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
+ context.client.register(secondListener);
+ context.client.poll();
+
+ // Check that the second listener was notified of the new snapshot
+ try (SnapshotReader<String> snapshot = secondListener.drainHandledSnapshot().get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
+ }
+ }
+
+ @Test
+ public void testListenerRenotified() throws Exception {
+ int localId = 0;
+ int otherNodeId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c"))
+ .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f"))
+ .appendToLog(snapshotId.epoch, Arrays.asList("g", "h", "i"))
+ .withEmptySnapshot(snapshotId)
+ .deleteBeforeSnapshot(snapshotId)
+ .build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ // Stop the listener from reading commit batches
+ context.listener.updateReadCommit(false);
+
+ // Advance the highWatermark
+ long localLogEndOffset = context.log.endOffset().offset;
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, localLogEndOffset, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId));
+ assertEquals(localLogEndOffset, context.client.highWatermark().getAsLong());
+
+ // Check that listener was notified of the new snapshot
+ try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
+ }
+
+ // Generate a new snapshot
+ OffsetAndEpoch secondSnapshot = new OffsetAndEpoch(localLogEndOffset, epoch);
+ try (SnapshotWriter<String> snapshot = context.client.createSnapshot(secondSnapshot)) {
+ snapshot.freeze();
+ }
+ context.client.poll();
+
+ // Resume the listener from reading commit batches
+ context.listener.updateReadCommit(true);
+
+ context.client.poll();
+ // Check that listener was notified of the second snapshot
+ try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
+ assertEquals(secondSnapshot, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(), snapshot);
+ }
+ }
+
@Test
public void testFetchRequestOffsetLessThanLogStart() throws Exception {
int localId = 0;
@@ -128,7 +289,7 @@ public void testFetchRequestWithLargerLastFetchedEpoch() throws Exception {
}
context.client.poll();
- context.client.scheduleAppend(epoch, Arrays.asList("a", "b", "c"));
+ context.client.scheduleAppend(epoch, Arrays.asList("g", "h", "i"));
context.time.sleep(context.appendLingerMs());
context.client.poll();
@@ -412,7 +573,7 @@ public void testFetchSnapshotRequestAsLeader() throws Exception {
assertEquals(0, response.position());
assertEquals(snapshot.sizeInBytes(), response.unalignedRecords().sizeInBytes());
- UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.read(0, Math.toIntExact(snapshot.sizeInBytes()));
+ UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes()));
assertEquals(memoryRecords.buffer(), ((UnalignedMemoryRecords) response.unalignedRecords()).buffer());
}
@@ -456,7 +617,7 @@ public void testPartialFetchSnapshotRequestAsLeader() throws Exception {
assertEquals(0, response.position());
assertEquals(snapshot.sizeInBytes() / 2, response.unalignedRecords().sizeInBytes());
- UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.read(0, Math.toIntExact(snapshot.sizeInBytes()));
+ UnalignedMemoryRecords memoryRecords = (UnalignedMemoryRecords) snapshot.slice(0, Math.toIntExact(snapshot.sizeInBytes()));
ByteBuffer snapshotBuffer = memoryRecords.buffer();
ByteBuffer responseBuffer = ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes()));
@@ -747,9 +908,16 @@ public void testFetchResponseWithSnapshotId() throws Exception {
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset, snapshotId.epoch);
+ // Check that the snapshot was written to the log
try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) {
assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
- SnapshotWriterTest.assertSnapshot(Arrays.asList(records), snapshot);
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
+ }
+
+ // Check that listener was notified of the new snapshot
+ try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
}
}
@@ -844,9 +1012,16 @@ public void testFetchSnapshotResponsePartialData() throws Exception {
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset, snapshotId.epoch);
+ // Check that the snapshot was written to the log
try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) {
assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
- SnapshotWriterTest.assertSnapshot(Arrays.asList(records), snapshot);
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
+ }
+
+ // Check that listener was notified of the new snapshot
+ try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ SnapshotWriterReaderTest.assertSnapshot(Arrays.asList(records), snapshot);
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 55d4e16dc86e7..6322b0ac734f7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -2002,11 +2003,29 @@ public void testLeaderAppendSingleMemberQuorum() throws Exception {
// Now try reading it
int otherNodeId = 1;
- context.deliverRequest(context.fetchRequest(1, otherNodeId, 0L, 0, 500));
- context.pollUntilResponse();
+ List<MutableRecordBatch> batches = new ArrayList<>(2);
+ boolean appended = true;
+
+ // Continue to fetch until the leader returns an empty response
+ while (appended) {
+ long fetchOffset = 0;
+ int lastFetchedEpoch = 0;
+ if (!batches.isEmpty()) {
+ MutableRecordBatch lastBatch = batches.get(batches.size() - 1);
+ fetchOffset = lastBatch.lastOffset() + 1;
+ lastFetchedEpoch = lastBatch.partitionLeaderEpoch();
+ }
+
+ context.deliverRequest(context.fetchRequest(1, otherNodeId, fetchOffset, lastFetchedEpoch, 0));
+ context.pollUntilResponse();
+
+ MemoryRecords fetchedRecords = context.assertSentFetchPartitionResponse(Errors.NONE, 1, OptionalInt.of(localId));
+ List<MutableRecordBatch> fetchedBatch = Utils.toList(fetchedRecords.batchIterator());
+ batches.addAll(fetchedBatch);
+
+ appended = !fetchedBatch.isEmpty();
+ }
- MemoryRecords fetchedRecords = context.assertSentFetchPartitionResponse(Errors.NONE, 1, OptionalInt.of(localId));
- List<MutableRecordBatch> batches = Utils.toList(fetchedRecords.batchIterator());
assertEquals(2, batches.size());
MutableRecordBatch leaderChangeBatch = batches.get(0);
@@ -2221,6 +2240,7 @@ public void testHandleClaimCallbackFiresAfterHighWatermarkReachesEpochStartOffse
List<String> batch2 = Arrays.asList("4", "5", "6");
List<String> batch3 = Arrays.asList("7", "8", "9");
+ List<List<String>> expectedBatches = Arrays.asList(batch1, batch2, batch3);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(1, batch1)
.appendToLog(1, batch2)
@@ -2252,18 +2272,24 @@ public void testHandleClaimCallbackFiresAfterHighWatermarkReachesEpochStartOffse
// watermark advances and we can start sending committed data to the
// listener. Note that the `LeaderChange` control record is filtered.
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 500));
- context.client.poll();
- assertEquals(OptionalInt.empty(), context.listener.currentClaimedEpoch());
- assertEquals(3, context.listener.numCommittedBatches());
- assertEquals(batch1, context.listener.commitWithBaseOffset(0L));
- assertEquals(batch2, context.listener.commitWithBaseOffset(3L));
- assertEquals(batch3, context.listener.commitWithBaseOffset(6L));
- assertEquals(OptionalLong.of(8L), context.listener.lastCommitOffset());
+ context.pollUntil(() -> {
+ int committedBatches = context.listener.numCommittedBatches();
+ long baseOffset = 0;
+ for (int index = 0; index < committedBatches; index++) {
+ List<String> expectedBatch = expectedBatches.get(index);
+ assertEquals(expectedBatch, context.listener.commitWithBaseOffset(baseOffset));
+ baseOffset += expectedBatch.size();
+ }
+
+ return context.listener.currentClaimedEpoch().isPresent();
+ });
- // Now that the listener has caught up to the start of the leader epoch,
- // we expect the `handleClaim` callback.
- context.client.poll();
assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
+ // Note that last committed offset is inclusive, hence we subtract 1.
+ assertEquals(
+ OptionalLong.of(expectedBatches.stream().mapToInt(List::size).sum() - 1),
+ context.listener.lastCommitOffset()
+ );
}
@Test
@@ -2290,20 +2316,21 @@ public void testLateRegisteredListenerCatchesUp() throws Exception {
// Let the initial listener catch up
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
- context.client.poll();
+ context.pollUntil(() -> OptionalInt.of(epoch).equals(context.listener.currentClaimedEpoch()));
assertEquals(OptionalLong.of(10L), context.client.highWatermark());
- context.client.poll();
+ assertEquals(OptionalLong.of(8L), context.listener.lastCommitOffset());
assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
+ // Ensure that the `handleClaim` callback was not fired early
+ assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
// Register a second listener and allow it to catch up to the high watermark
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener();
context.client.register(secondListener);
- context.client.poll();
+ context.pollUntil(() -> OptionalInt.of(epoch).equals(secondListener.currentClaimedEpoch()));
assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
assertEquals(OptionalInt.of(epoch), context.listener.currentClaimedEpoch());
-
// Ensure that the `handleClaim` callback was not fired early
- assertEquals(9L, context.listener.claimedEpochStartOffset(epoch));
+ assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
}
@Test
@@ -2394,6 +2421,7 @@ public void testHandleCommitCallbackFiresInVotedState() throws Exception {
context.assertVotedCandidate(candidateEpoch, otherNodeId);
// Note the offset is 8 because the record at offset 9 is a control record
+ context.pollUntil(() -> secondListener.lastCommitOffset().equals(OptionalLong.of(8L)));
assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
}
@@ -2443,6 +2471,7 @@ public void testHandleCommitCallbackFiresInCandidateState() throws Exception {
context.assertVotedCandidate(candidateEpoch, localId);
// Note the offset is 8 because the record at offset 9 is a control record
+ context.pollUntil(() -> secondListener.lastCommitOffset().equals(OptionalLong.of(8L)));
assertEquals(OptionalLong.of(8L), secondListener.lastCommitOffset());
assertEquals(OptionalInt.empty(), secondListener.currentClaimedEpoch());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 3ca50b14d9657..bf03a06ee8f4f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -37,7 +37,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -363,20 +362,26 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
}
ByteBuffer buffer = ByteBuffer.allocate(512);
- LogEntry firstEntry = null;
+ int batchCount = 0;
+ LogOffsetMetadata batchStartOffset = null;
for (LogBatch batch : batches) {
// Note that start offset is inclusive while max offset is exclusive. We only return
// complete batches, so batches which end at an offset larger than the max offset are
// filtered, which is effectively the same as having the consumer drop an incomplete
// batch returned in a fetch response.
- if (batch.lastOffset() >= startOffset) {
- if (batch.lastOffset() < maxOffset) {
- buffer = batch.writeTo(buffer);
+ if (batch.lastOffset() >= startOffset && batch.lastOffset() < maxOffset && !batch.entries.isEmpty()) {
+ buffer = batch.writeTo(buffer);
+
+ if (batchStartOffset == null) {
+ batchStartOffset = batch.entries.get(0).logOffsetMetadata();
}
- if (firstEntry == null && !batch.entries.isEmpty()) {
- firstEntry = batch.entries.get(0);
+ // Read on the mock log should return at most 2 batches. This is a simple solution
+ // for testing interesting partial read scenarios.
+ batchCount += 1;
+ if (batchCount >= 2) {
+ break;
}
}
}
@@ -384,12 +389,12 @@ public LogFetchInfo read(long startOffset, Isolation isolation) {
buffer.flip();
Records records = MemoryRecords.readableRecords(buffer);
- if (firstEntry == null) {
+ if (batchStartOffset == null) {
throw new RuntimeException("Expected to find at least one entry starting from offset " +
startOffset + " but found none");
}
- return new LogFetchInfo(records, firstEntry.logOffsetMetadata());
+ return new LogFetchInfo(records, batchStartOffset);
}
@Override
@@ -426,48 +431,50 @@ public Optional<OffsetAndEpoch> earliestSnapshotId() {
public void onSnapshotFrozen(OffsetAndEpoch snapshotId) {}
@Override
- public boolean deleteBeforeSnapshot(OffsetAndEpoch logStartSnapshotId) {
- if (logStartOffset() > logStartSnapshotId.offset ||
- highWatermark.offset < logStartSnapshotId.offset) {
-
+ public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
+ if (logStartOffset() > snapshotId.offset) {
+ throw new OffsetOutOfRangeException(
+ String.format(
+ "New log start (%s) is less than the curent log start offset (%s)",
+ snapshotId,
+ logStartOffset()
+ )
+ );
+ }
+ if (highWatermark.offset < snapshotId.offset) {
throw new OffsetOutOfRangeException(
String.format(
- "New log start (%s) is less than start offset (%s) or is greater than the high watermark (%s)",
- logStartSnapshotId,
- logStartOffset(),
+ "New log start (%s) is greater than the high watermark (%s)",
+ snapshotId,
highWatermark.offset
)
);
}
boolean updated = false;
- Optional<OffsetAndEpoch> snapshotIdOpt = latestSnapshotId();
- if (snapshotIdOpt.isPresent()) {
- OffsetAndEpoch snapshotId = snapshotIdOpt.get();
- if (startOffset() < logStartSnapshotId.offset &&
- highWatermark.offset >= logStartSnapshotId.offset &&
- snapshotId.offset >= logStartSnapshotId.offset) {
+ if (snapshots.containsKey(snapshotId)) {
+ snapshots.headMap(snapshotId, false).clear();
- snapshots.headMap(logStartSnapshotId, false).clear();
+ batches.removeIf(entry -> entry.lastOffset() < snapshotId.offset);
- batches.removeIf(entry -> entry.lastOffset() < logStartSnapshotId.offset);
-
- AtomicReference<Optional<EpochStartOffset>> last = new AtomicReference<>(Optional.empty());
- epochStartOffsets.removeIf(epochStartOffset -> {
- if (epochStartOffset.startOffset <= logStartSnapshotId.offset) {
- last.set(Optional.of(epochStartOffset));
- return true;
- }
+ AtomicReference<Optional<EpochStartOffset>> last = new AtomicReference<>(Optional.empty());
+ epochStartOffsets.removeIf(epochStartOffset -> {
+ if (epochStartOffset.startOffset <= snapshotId.offset) {
+ last.set(Optional.of(epochStartOffset));
+ return true;
+ }
- return false;
- });
+ return false;
+ });
- last.get().ifPresent(epochStartOffset -> {
- epochStartOffsets.add(0, new EpochStartOffset(epochStartOffset.epoch, logStartSnapshotId.offset));
- });
+ last.get().ifPresent(epochStartOffset -> {
+ epochStartOffsets.add(
+ 0,
+ new EpochStartOffset(epochStartOffset.epoch, snapshotId.offset)
+ );
+ });
- updated = true;
- }
+ updated = true;
}
return updated;
@@ -530,6 +537,16 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(metadata, offset, record);
}
+
+ @Override
+ public String toString() {
+ return String.format(
+ "LogEntry(metadata=%s, offset=%s, record=%s)",
+ metadata,
+ offset,
+ record
+ );
+ }
}
static class LogBatch {
@@ -582,6 +599,11 @@ ByteBuffer writeTo(ByteBuffer buffer) {
builder.close();
return builder.buffer();
}
+
+ @Override
+ public String toString() {
+ return String.format("LogBatch(entries=%s, epoch=%s, isControlBatch=%s)", entries, epoch, isControlBatch);
+ }
}
private static class EpochStartOffset {
@@ -676,18 +698,18 @@ public long sizeInBytes() {
}
@Override
- public Iterator<RecordBatch> iterator() {
- return Utils.covariantCast(data.batchIterator());
- }
-
- @Override
- public UnalignedRecords read(long position, int size) {
+ public UnalignedRecords slice(long position, int size) {
ByteBuffer buffer = data.buffer();
buffer.position(Math.toIntExact(position));
buffer.limit(Math.min(buffer.limit(), Math.toIntExact(position + size)));
return new UnalignedMemoryRecords(buffer.slice());
}
+ @Override
+ public Records records() {
+ return data;
+ }
+
@Override
public void close() {}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index f219dea729a02..492410b75a2dd 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -75,49 +75,6 @@ public void testTopicId() {
assertEquals(topicId, log.topicId());
}
- @Test
- public void testAppendAsLeaderHelper() {
- int epoch = 2;
- SimpleRecord recordOne = new SimpleRecord("one".getBytes());
- appendAsLeader(Collections.singleton(recordOne), epoch);
- assertEquals(epoch, log.lastFetchedEpoch());
- assertEquals(0L, log.startOffset());
- assertEquals(1L, log.endOffset().offset);
-
- Records records = log.read(0, Isolation.UNCOMMITTED).records;
- List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
-
- RecordBatch batch = batches.get(0);
- assertEquals(0, batch.baseOffset());
- assertEquals(0, batch.lastOffset());
-
- List<Record> fetchedRecords = Utils.toList(batch.iterator());
- assertEquals(1, fetchedRecords.size());
- assertEquals(recordOne, new SimpleRecord(fetchedRecords.get(0)));
- assertEquals(0, fetchedRecords.get(0).offset());
-
- SimpleRecord recordTwo = new SimpleRecord("two".getBytes());
- SimpleRecord recordThree = new SimpleRecord("three".getBytes());
- appendAsLeader(Arrays.asList(recordTwo, recordThree), epoch);
- assertEquals(0L, log.startOffset());
- assertEquals(3L, log.endOffset().offset);
-
- records = log.read(0, Isolation.UNCOMMITTED).records;
- batches = Utils.toList(records.batches().iterator());
- assertEquals(2, batches.size());
-
- fetchedRecords = Utils.toList(records.records().iterator());
- assertEquals(3, fetchedRecords.size());
- assertEquals(recordOne, new SimpleRecord(fetchedRecords.get(0)));
- assertEquals(0, fetchedRecords.get(0).offset());
-
- assertEquals(recordTwo, new SimpleRecord(fetchedRecords.get(1)));
- assertEquals(1, fetchedRecords.get(1).offset());
-
- assertEquals(recordThree, new SimpleRecord(fetchedRecords.get(2)));
- assertEquals(2, fetchedRecords.get(2).offset());
- }
-
@Test
public void testTruncateTo() {
int epoch = 2;
@@ -174,28 +131,26 @@ public void testAssignEpochStartOffset() {
@Test
public void testAppendAsLeader() {
- SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
- final int currentEpoch = 3;
- final long initialOffset = log.endOffset().offset;
+ int epoch = 2;
+ SimpleRecord recordOne = new SimpleRecord("one".getBytes());
+ List<SimpleRecord> expectedRecords = new ArrayList<>();
- log.appendAsLeader(
- MemoryRecords.withRecords(initialOffset, CompressionType.NONE, recordFoo),
- currentEpoch
- );
+ expectedRecords.add(recordOne);
+ appendAsLeader(Collections.singleton(recordOne), epoch);
- assertEquals(0, log.startOffset());
- assertEquals(1, log.endOffset().offset);
- assertEquals(currentEpoch, log.lastFetchedEpoch());
+ assertEquals(new OffsetAndEpoch(expectedRecords.size(), epoch), log.endOffsetForEpoch(epoch));
+ assertEquals(epoch, log.lastFetchedEpoch());
+ validateReadRecords(expectedRecords, log);
- Records records = log.read(0, Isolation.UNCOMMITTED).records;
- List<ByteBuffer> extractRecords = new ArrayList<>();
- for (Record record : records.records()) {
- extractRecords.add(record.value());
- }
+ SimpleRecord recordTwo = new SimpleRecord("two".getBytes());
+ SimpleRecord recordThree = new SimpleRecord("three".getBytes());
+ expectedRecords.add(recordTwo);
+ expectedRecords.add(recordThree);
+ appendAsLeader(Arrays.asList(recordTwo, recordThree), epoch);
- assertEquals(1, extractRecords.size());
- assertEquals(recordFoo.value(), extractRecords.get(0));
- assertEquals(new OffsetAndEpoch(1, currentEpoch), log.endOffsetForEpoch(currentEpoch));
+ assertEquals(new OffsetAndEpoch(expectedRecords.size(), epoch), log.endOffsetForEpoch(epoch));
+ assertEquals(epoch, log.lastFetchedEpoch());
+ validateReadRecords(expectedRecords, log);
}
@Test
@@ -545,9 +500,9 @@ public void testUpdateLogStartOffsetWithMissingSnapshot() {
public void testFailToIncreaseLogStartPastHighWatermark() throws IOException {
int offset = 10;
int epoch = 0;
- OffsetAndEpoch snapshotId = new OffsetAndEpoch(2 * offset, 1 + epoch);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(2 * offset, epoch);
- appendBatch(offset, epoch);
+ appendBatch(3 * offset, epoch);
log.updateHighWatermark(new LogOffsetMetadata(offset));
try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
@@ -802,15 +757,32 @@ public void testValidateValidEpochAndOffset() {
}
private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
- Records records = log.read(startOffset, isolation).records;
+ // The current MockLog implementation may return only some of the batches per read, so loop until no more records are returned
+
long firstReadOffset = -1L;
long lastReadOffset = -1L;
- for (Record record : records.records()) {
- if (firstReadOffset < 0)
- firstReadOffset = record.offset();
- if (record.offset() > lastReadOffset)
- lastReadOffset = record.offset();
+
+ long currentStart = startOffset;
+ boolean foundRecord = true;
+ while (foundRecord) {
+ foundRecord = false;
+
+ Records records = log.read(currentStart, isolation).records;
+ for (Record record : records.records()) {
+ foundRecord = true;
+
+ if (firstReadOffset < 0L) {
+ firstReadOffset = record.offset();
+ }
+
+ if (record.offset() > lastReadOffset) {
+ lastReadOffset = record.offset();
+ }
+ }
+
+ currentStart = lastReadOffset + 1;
}
+
if (firstReadOffset < 0) {
return Optional.empty();
} else {
@@ -840,6 +812,11 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(startOffset, endOffset);
}
+
+ @Override
+ public String toString() {
+ return String.format("OffsetRange(startOffset=%s, endOffset=%s)", startOffset, endOffset);
+ }
}
private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
@@ -861,4 +838,30 @@ private void appendBatch(int numRecords, int epoch) {
appendAsLeader(records, epoch);
}
+
+ private static void validateReadRecords(List<SimpleRecord> expectedRecords, MockLog log) {
+ assertEquals(0L, log.startOffset());
+ assertEquals(expectedRecords.size(), log.endOffset().offset);
+
+ int currentOffset = 0;
+ while (currentOffset < log.endOffset().offset) {
+ Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records;
+ List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
+
+ assertTrue(batches.size() > 0);
+ for (RecordBatch batch : batches) {
+ assertTrue(batch.countOrNull() > 0);
+ assertEquals(currentOffset, batch.baseOffset());
+ assertEquals(currentOffset + batch.countOrNull() - 1, batch.lastOffset());
+
+ for (Record record : batch) {
+ assertEquals(currentOffset, record.offset());
+ assertEquals(expectedRecords.get(currentOffset), new SimpleRecord(record));
+ currentOffset += 1;
+ }
+
+ assertEquals(currentOffset - 1, batch.lastOffset());
+ }
+ }
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 024095af91de5..a70e5b638225b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -21,15 +21,15 @@
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -55,6 +55,8 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.mockito.Mockito;
@@ -78,12 +80,12 @@
import java.util.stream.Collectors;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class RaftClientTestContext {
- private static final StringSerde STRING_SERDE = new StringSerde();
-
+ public final RecordSerde<String> serde = Builder.SERDE;
final TopicPartition metadataPartition = Builder.METADATA_PARTITION;
final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
@@ -111,6 +113,7 @@ public final class RaftClientTestContext {
public static final class Builder {
static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000;
+ private static final RecordSerde<String> SERDE = new StringSerde();
private static final TopicPartition METADATA_PARTITION = new TopicPartition("metadata", 0);
private static final int ELECTION_BACKOFF_MAX_MS = 100;
private static final int FETCH_MAX_WAIT_MS = 0;
@@ -184,6 +187,22 @@ Builder appendToLog(int epoch, List records) {
return this;
}
+ Builder withEmptySnapshot(OffsetAndEpoch snapshotId) throws IOException {
+ try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+ snapshot.freeze();
+ }
+ return this;
+ }
+
+ Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) throws IOException {
+ if (snapshotId.offset > log.highWatermark().offset) {
+ log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset));
+ }
+ log.deleteBeforeSnapshot(snapshotId);
+
+ return this;
+ }
+
Builder withElectionTimeoutMs(int electionTimeoutMs) {
this.electionTimeoutMs = electionTimeoutMs;
return this;
@@ -210,7 +229,7 @@ public RaftClientTestContext build() throws IOException {
ELECTION_BACKOFF_MAX_MS, FETCH_TIMEOUT_MS, appendLingerMs);
KafkaRaftClient<String> client = new KafkaRaftClient<>(
- STRING_SERDE,
+ SERDE,
channel,
messageQueue,
log,
@@ -307,7 +326,7 @@ static MemoryRecords buildBatch(
ByteBuffer buffer = ByteBuffer.allocate(512);
BatchBuilder<String> builder = new BatchBuilder<>(
buffer,
- STRING_SERDE,
+ Builder.SERDE,
CompressionType.NONE,
baseOffset,
timestamp,
@@ -1033,9 +1052,12 @@ FetchResponseData divergingFetchResponse(
}
static class MockListener implements RaftClient.Listener<String> {
- private final List<BatchReader.Batch<String>> commits = new ArrayList<>();
+ private final List<Batch<String>> commits = new ArrayList<>();
+ private final List<BatchReader<String>> savedBatches = new ArrayList<>();
private final Map claimedEpochStartOffsets = new HashMap<>();
private OptionalInt currentClaimedEpoch = OptionalInt.empty();
+ private Optional<SnapshotReader<String>> snapshot = Optional.empty();
+ private boolean readCommit = true;
int numCommittedBatches() {
return commits.size();
@@ -1045,7 +1067,7 @@ Long claimedEpochStartOffset(int epoch) {
return claimedEpochStartOffsets.get(epoch);
}
- BatchReader.Batch<String> lastCommit() {
+ Batch<String> lastCommit() {
if (commits.isEmpty()) {
return null;
} else {
@@ -1081,6 +1103,42 @@ List<String> commitWithLastOffset(long lastOffset) {
.orElse(null);
}
+ Optional<SnapshotReader<String>> drainHandledSnapshot() {
+ Optional<SnapshotReader<String>> temp = snapshot;
+ snapshot = Optional.empty();
+ return temp;
+ }
+
+ void updateReadCommit(boolean readCommit) {
+ this.readCommit = readCommit;
+
+ if (readCommit) {
+ for (BatchReader<String> batch : savedBatches) {
+ readBatch(batch);
+ }
+
+ savedBatches.clear();
+ }
+ }
+
+ void readBatch(BatchReader<String> reader) {
+ try {
+ while (reader.hasNext()) {
+ long nextOffset = lastCommitOffset().isPresent() ?
+ lastCommitOffset().getAsLong() + 1 : 0L;
+ Batch<String> batch = reader.next();
+ // We expect monotonic offsets, but not necessarily sequential
+ // offsets since control records will be filtered.
+ assertTrue(batch.baseOffset() >= nextOffset,
+ "Received non-monotonic commit " + batch +
+ ". We expected an offset at least as large as " + nextOffset);
+ commits.add(batch);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
@Override
public void handleClaim(int epoch) {
// We record the next expected offset as the claimed epoch's start
@@ -1099,22 +1157,20 @@ public void handleResign(int epoch) {
@Override
public void handleCommit(BatchReader<String> reader) {
- try {
- while (reader.hasNext()) {
- long nextOffset = lastCommitOffset().isPresent() ?
- lastCommitOffset().getAsLong() + 1 : 0L;
- BatchReader.Batch<String> batch = reader.next();
- // We expect monotonic offsets, but not necessarily sequential
- // offsets since control records will be filtered.
- assertTrue(batch.baseOffset() >= nextOffset,
- "Received non-monotonic commit " + batch +
- ". We expected an offset at least as large as " + nextOffset);
- commits.add(batch);
- }
- } finally {
- reader.close();
+ if (readCommit) {
+ readBatch(reader);
+ } else {
+ savedBatches.add(reader);
}
}
- }
+ @Override
+ public void handleSnapshot(SnapshotReader<String> reader) {
+ snapshot.ifPresent(snapshot -> assertDoesNotThrow(() -> snapshot.close()));
+
+ commits.clear();
+ savedBatches.clear();
+ snapshot = Optional.of(reader);
+ }
+ }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 94b278e7cc4a1..32e701a3854e6 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -34,9 +35,9 @@
import org.apache.kafka.raft.MockLog.LogBatch;
import org.apache.kafka.raft.MockLog.LogEntry;
import org.apache.kafka.raft.internals.BatchMemoryPool;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.junit.jupiter.api.Tag;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -53,11 +54,13 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -347,6 +350,7 @@ private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) {
scheduler.addInvariant(new MonotonicEpoch(cluster));
scheduler.addInvariant(new MajorityReachedHighWatermark(cluster));
scheduler.addInvariant(new SingleLeader(cluster));
+ scheduler.addInvariant(new SnapshotAtLogStart(cluster));
scheduler.addValidation(new ConsistentCommittedData(cluster));
return scheduler;
}
@@ -717,7 +721,8 @@ void start(int nodeId) {
persistentState.store,
logContext,
time,
- random
+ random,
+ serde
);
node.initialize();
running.put(nodeId, node);
@@ -735,6 +740,7 @@ private static class RaftNode {
final ReplicatedCounter counter;
final Time time;
final Random random;
+ final RecordSerde<Integer> intSerde;
private RaftNode(
int nodeId,
@@ -745,7 +751,8 @@ private RaftNode(
MockQuorumStateStore store,
LogContext logContext,
Time time,
- Random random
+ Random random,
+ RecordSerde<Integer> intSerde
) {
this.nodeId = nodeId;
this.client = client;
@@ -757,15 +764,12 @@ private RaftNode(
this.time = time;
this.random = random;
this.counter = new ReplicatedCounter(nodeId, client, logContext);
+ this.intSerde = intSerde;
}
void initialize() {
- try {
- client.register(this.counter);
- client.initialize();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ client.register(this.counter);
+ client.initialize();
}
void poll() {
@@ -963,6 +967,52 @@ public void verify() {
}
}
+ private static class SnapshotAtLogStart implements Invariant {
+ final Cluster cluster;
+
+ private SnapshotAtLogStart(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ @Override
+ public void verify() {
+ for (Map.Entry<Integer, PersistentState> nodeEntry : cluster.nodes.entrySet()) {
+ int nodeId = nodeEntry.getKey();
+ ReplicatedLog log = nodeEntry.getValue().log;
+ log.earliestSnapshotId().ifPresent(earliestSnapshotId -> {
+ long logStartOffset = log.startOffset();
+ ValidOffsetAndEpoch validateOffsetAndEpoch = log.validateOffsetAndEpoch(
+ earliestSnapshotId.offset,
+ earliestSnapshotId.epoch
+ );
+
+ assertTrue(
+ logStartOffset <= earliestSnapshotId.offset,
+ () -> String.format(
+ "invalid log start offset (%s) and snapshotId offset (%s): nodeId = %s",
+ logStartOffset,
+ earliestSnapshotId.offset,
+ nodeId
+ )
+ );
+ assertEquals(
+ ValidOffsetAndEpoch.valid(earliestSnapshotId),
+ validateOffsetAndEpoch,
+ () -> String.format("invalid leader epoch cache: nodeId = %s", nodeId)
+ );
+
+ if (logStartOffset > 0) {
+ assertEquals(
+ logStartOffset,
+ earliestSnapshotId.offset,
+ () -> String.format("mising snapshot at log start offset: nodeId = %s", nodeId)
+ );
+ }
+ });
+ }
+ }
+ }
+
/**
* Validating the committed data is expensive, so we do this as a {@link Validation}. We depend
* on the following external invariants:
@@ -986,14 +1036,44 @@ private int parseSequenceNumber(ByteBuffer value) {
return (int) Type.INT32.read(value);
}
- private void assertCommittedData(int nodeId, KafkaRaftClient<Integer> manager, MockLog log) {
+ private void assertCommittedData(RaftNode node) {
+ final int nodeId = node.nodeId;
+ final KafkaRaftClient<Integer> manager = node.client;
+ final MockLog log = node.log;
+
OptionalLong highWatermark = manager.highWatermark();
if (!highWatermark.isPresent()) {
// We cannot do validation if the current high watermark is unknown
return;
}
- for (LogBatch batch : log.readBatches(0L, highWatermark)) {
+ AtomicLong startOffset = new AtomicLong(0);
+ log.earliestSnapshotId().ifPresent(snapshotId -> {
+ assertTrue(snapshotId.offset <= highWatermark.getAsLong());
+ startOffset.set(snapshotId.offset);
+
+ try (SnapshotReader<Integer> snapshot =
+ SnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
+ // Expect only one batch with only one record
+ assertTrue(snapshot.hasNext());
+ Batch<Integer> batch = snapshot.next();
+ assertFalse(snapshot.hasNext());
+ assertEquals(1, batch.records().size());
+
+ // The snapshotId offset is an "end offset"
+ long offset = snapshotId.offset - 1;
+ int sequence = batch.records().get(0);
+ committedSequenceNumbers.putIfAbsent(offset, sequence);
+
+ assertEquals(
+ committedSequenceNumbers.get(offset),
+ sequence,
+ String.format("Committed sequence at offset %s changed on node %s", offset, nodeId)
+ );
+ }
+ });
+
+ for (LogBatch batch : log.readBatches(startOffset.get(), highWatermark)) {
if (batch.isControlBatch) {
continue;
}
@@ -1015,7 +1095,7 @@ private void assertCommittedData(int nodeId, KafkaRaftClient manager, M
@Override
public void validate() {
- cluster.forAllRunning(node -> assertCommittedData(node.nodeId, node.client, node.log));
+ cluster.forAllRunning(this::assertCommittedData);
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/MemoryBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/MemoryBatchReaderTest.java
index 90cb52714198b..25b843129a9a2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/MemoryBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/MemoryBatchReaderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft.internals;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -31,17 +32,22 @@ class MemoryBatchReaderTest {
@Test
public void testIteration() {
- BatchReader.Batch<String> batch1 = new BatchReader.Batch<>(0L, 1,
- Arrays.asList("a", "b", "c"));
- BatchReader.Batch<String> batch2 = new BatchReader.Batch<>(3L, 2,
- Arrays.asList("d", "e"));
- BatchReader.Batch<String> batch3 = new BatchReader.Batch<>(5L, 2,
- Arrays.asList("f", "g", "h", "i"));
+ Batch<String> batch1 = Batch.of(
+ 0L, 1, Arrays.asList("a", "b", "c")
+ );
+ Batch<String> batch2 = Batch.of(
+ 3L, 2, Arrays.asList("d", "e")
+ );
+ Batch<String> batch3 = Batch.of(
+ 5L, 2, Arrays.asList("f", "g", "h", "i")
+ );
@SuppressWarnings("unchecked")
CloseListener<BatchReader<String>> listener = Mockito.mock(CloseListener.class);
MemoryBatchReader<String> reader = new MemoryBatchReader<>(
- Arrays.asList(batch1, batch2, batch3), listener);
+ Arrays.asList(batch1, batch2, batch3),
+ listener
+ );
assertEquals(0L, reader.baseOffset());
assertEquals(OptionalLong.of(8L), reader.lastOffset());
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 78ffd51befaf5..e340738965e14 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -21,7 +21,7 @@
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
-import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -34,7 +34,6 @@
import java.util.NoSuchElementException;
import java.util.Set;
-import static java.util.Arrays.asList;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -44,7 +43,6 @@
class RecordsBatchReaderTest {
private static final int MAX_BATCH_BYTES = 128;
- private final MockTime time = new MockTime();
private final StringSerde serde = new StringSerde();
@ParameterizedTest
@@ -52,13 +50,9 @@ class RecordsBatchReaderTest {
public void testReadFromMemoryRecords(CompressionType compressionType) {
long baseOffset = 57;
- List<BatchReader.Batch<String>> batches = asList(
- new BatchReader.Batch<>(baseOffset, 1, asList("a", "b", "c")),
- new BatchReader.Batch<>(baseOffset + 3, 2, asList("d", "e")),
- new BatchReader.Batch<>(baseOffset + 5, 2, asList("f"))
- );
+ List<Batch<String>> batches = RecordsIteratorTest.createBatches(baseOffset);
+ MemoryRecords memRecords = RecordsIteratorTest.buildRecords(compressionType, batches);
- MemoryRecords memRecords = buildRecords(compressionType, batches);
testBatchReader(baseOffset, memRecords, batches);
}
@@ -67,13 +61,8 @@ public void testReadFromMemoryRecords(CompressionType compressionType) {
public void testReadFromFileRecords(CompressionType compressionType) throws Exception {
long baseOffset = 57;
- List<BatchReader.Batch<String>> batches = asList(
- new BatchReader.Batch<>(baseOffset, 1, asList("a", "b", "c")),
- new BatchReader.Batch<>(baseOffset + 3, 2, asList("d", "e")),
- new BatchReader.Batch<>(baseOffset + 5, 2, asList("f"))
- );
-
- MemoryRecords memRecords = buildRecords(compressionType, batches);
+ List<Batch<String>> batches = RecordsIteratorTest.createBatches(baseOffset);
+ MemoryRecords memRecords = RecordsIteratorTest.buildRecords(compressionType, batches);
FileRecords fileRecords = FileRecords.open(tempFile());
fileRecords.append(memRecords);
@@ -81,39 +70,10 @@ public void testReadFromFileRecords(CompressionType compressionType) throws Exce
testBatchReader(baseOffset, fileRecords, batches);
}
- private MemoryRecords buildRecords(
- CompressionType compressionType,
- List<BatchReader.Batch<String>> batches
- ) {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
-
- for (BatchReader.Batch<String> batch : batches) {
- BatchBuilder<String> builder = new BatchBuilder<>(
- buffer,
- serde,
- compressionType,
- batch.baseOffset(),
- time.milliseconds(),
- false,
- batch.epoch(),
- MAX_BATCH_BYTES
- );
-
- for (String record : batch.records()) {
- builder.appendRecord(record, null);
- }
-
- builder.build();
- }
-
- buffer.flip();
- return MemoryRecords.readableRecords(buffer);
- }
-
private void testBatchReader(
long baseOffset,
Records records,
- List<BatchReader.Batch<String>> expectedBatches
+ List<Batch<String>> expectedBatches
) {
BufferSupplier bufferSupplier = Mockito.mock(BufferSupplier.class);
Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
@@ -134,15 +94,16 @@ private void testBatchReader(
@SuppressWarnings("unchecked")
CloseListener<BatchReader<String>> closeListener = Mockito.mock(CloseListener.class);
- RecordsBatchReader<String> reader = new RecordsBatchReader<>(
+ RecordsBatchReader<String> reader = RecordsBatchReader.of(
baseOffset,
records,
serde,
bufferSupplier,
+ MAX_BATCH_BYTES,
closeListener
);
- for (BatchReader.Batch<String> batch : expectedBatches) {
+ for (Batch<String> batch : expectedBatches) {
assertTrue(reader.hasNext());
assertEquals(batch, reader.next());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
new file mode 100644
index 0000000000000..e450b5266440c
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.RecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public final class RecordsIteratorTest {
+ private static final RecordSerde<String> STRING_SERDE = new StringSerde();
+
+ private static Stream<Arguments> emptyRecords() throws IOException {
+ return Stream.of(
+ FileRecords.open(TestUtils.tempFile()),
+ MemoryRecords.EMPTY
+ ).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("emptyRecords")
+ void testEmptyRecords(Records records) throws IOException {
+ testIterator(Collections.emptyList(), records);
+ }
+
+ @Property
+ public void testMemoryRecords(
+ @ForAll CompressionType compressionType,
+ @ForAll long seed
+ ) {
+ List<Batch<String>> batches = createBatches(seed);
+
+ MemoryRecords memRecords = buildRecords(compressionType, batches);
+ testIterator(batches, memRecords);
+ }
+
+ @Property
+ public void testFileRecords(
+ @ForAll CompressionType compressionType,
+ @ForAll long seed
+ ) throws IOException {
+ List<Batch<String>> batches = createBatches(seed);
+
+ MemoryRecords memRecords = buildRecords(compressionType, batches);
+ FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
+ fileRecords.append(memRecords);
+
+ testIterator(batches, fileRecords);
+ }
+
+ private void testIterator(
+ List<Batch<String>> expectedBatches,
+ Records records
+ ) {
+ Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
+
+ RecordsIterator<String> iterator = createIterator(
+ records,
+ mockBufferSupplier(allocatedBuffers)
+ );
+
+ for (Batch<String> batch : expectedBatches) {
+ assertTrue(iterator.hasNext());
+ assertEquals(batch, iterator.next());
+ }
+
+ assertFalse(iterator.hasNext());
+ assertThrows(NoSuchElementException.class, iterator::next);
+
+ iterator.close();
+ assertEquals(Collections.emptySet(), allocatedBuffers);
+ }
+
+ static RecordsIterator<String> createIterator(Records records, BufferSupplier bufferSupplier) {
+ return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC);
+ }
+
+ static BufferSupplier mockBufferSupplier(Set<ByteBuffer> buffers) {
+ BufferSupplier bufferSupplier = Mockito.mock(BufferSupplier.class);
+
+ Mockito.when(bufferSupplier.get(Mockito.anyInt())).thenAnswer(invocation -> {
+ int size = invocation.getArgument(0);
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ buffers.add(buffer);
+ return buffer;
+ });
+
+ Mockito.doAnswer(invocation -> {
+ ByteBuffer released = invocation.getArgument(0);
+ buffers.remove(released);
+ return null;
+ }).when(bufferSupplier).release(Mockito.any(ByteBuffer.class));
+
+ return bufferSupplier;
+ }
+
+ public static List<Batch<String>> createBatches(long seed) {
+ Random random = new Random(seed);
+ long baseOffset = random.nextInt(100);
+ int epoch = random.nextInt(3) + 1;
+
+ int numberOfBatches = random.nextInt(100) + 1;
+ List<Batch<String>> batches = new ArrayList<>(numberOfBatches);
+ for (int i = 0; i < numberOfBatches; i++) {
+ int numberOfRecords = random.nextInt(100) + 1;
+ List<String> records = random
+ .ints(numberOfRecords, 0, 10)
+ .mapToObj(String::valueOf)
+ .collect(Collectors.toList());
+
+ batches.add(Batch.of(baseOffset, epoch, records));
+ baseOffset += records.size();
+ if (i % 5 == 0) {
+ epoch += random.nextInt(3);
+ }
+ }
+
+ return batches;
+ }
+
+ public static MemoryRecords buildRecords(
+ CompressionType compressionType,
+ List<Batch<String>> batches
+ ) {
+ ByteBuffer buffer = ByteBuffer.allocate(102400);
+
+ for (Batch<String> batch : batches) {
+ BatchBuilder<String> builder = new BatchBuilder<>(
+ buffer,
+ STRING_SERDE,
+ compressionType,
+ batch.baseOffset(),
+ 12345L,
+ false,
+ batch.epoch(),
+ 1024
+ );
+
+ for (String record : batch.records()) {
+ builder.appendRecord(record, null);
+ }
+
+ builder.build();
+ }
+
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
+ }
+}
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
index dc4f6359d6cd1..ef38d465c5440 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
@@ -62,14 +62,14 @@ public void tearDown() throws IOException {
public void testWritingSnapshot() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
- int batches = 10;
+ int numberOfBatches = 10;
int expectedSize = 0;
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
assertEquals(0, snapshot.sizeInBytes());
UnalignedMemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
snapshot.append(records);
expectedSize += records.sizeInBytes();
}
@@ -88,13 +88,13 @@ public void testWritingSnapshot() throws IOException {
public void testWriteReadSnapshot() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
- int batches = 10;
+ int numberOfBatches = 10;
ByteBuffer expectedBuffer = ByteBuffer.wrap(randomBytes(bufferSize));
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
UnalignedMemoryRecords records = buildRecords(expectedBuffer);
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
snapshot.append(records);
}
@@ -104,7 +104,10 @@ public void testWriteReadSnapshot() throws IOException {
try (FileRawSnapshotReader snapshot = FileRawSnapshotReader.open(tempDir, offsetAndEpoch)) {
int countBatches = 0;
int countRecords = 0;
- for (RecordBatch batch : snapshot) {
+
+ Iterator<RecordBatch> batches = Utils.covariantCast(snapshot.records().batchIterator());
+ while (batches.hasNext()) {
+ RecordBatch batch = batches.next();
countBatches += 1;
Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
@@ -120,8 +123,8 @@ public void testWriteReadSnapshot() throws IOException {
}
}
- assertEquals(batches, countBatches);
- assertEquals(batches, countRecords);
+ assertEquals(numberOfBatches, countBatches);
+ assertEquals(numberOfBatches, countRecords);
}
}
@@ -151,8 +154,8 @@ public void testPartialWriteReadSnapshot() throws IOException {
int totalSize = Math.toIntExact(snapshot.sizeInBytes());
assertEquals(expectedBuffer.remaining(), totalSize);
- UnalignedFileRecords record1 = (UnalignedFileRecords) snapshot.read(0, totalSize / 2);
- UnalignedFileRecords record2 = (UnalignedFileRecords) snapshot.read(totalSize / 2, totalSize - totalSize / 2);
+ UnalignedFileRecords record1 = (UnalignedFileRecords) snapshot.slice(0, totalSize / 2);
+ UnalignedFileRecords record2 = (UnalignedFileRecords) snapshot.slice(totalSize / 2, totalSize - totalSize / 2);
assertEquals(buffer1, TestUtils.toBuffer(record1));
assertEquals(buffer2, TestUtils.toBuffer(record2));
@@ -170,10 +173,10 @@ public void testBatchWriteReadSnapshot() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batchSize = 3;
- int batches = 10;
+ int numberOfBatches = 10;
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
ByteBuffer[] buffers = IntStream
.range(0, batchSize)
.mapToObj(ignore -> ByteBuffer.wrap(randomBytes(bufferSize))).toArray(ByteBuffer[]::new);
@@ -187,7 +190,10 @@ public void testBatchWriteReadSnapshot() throws IOException {
try (FileRawSnapshotReader snapshot = FileRawSnapshotReader.open(tempDir, offsetAndEpoch)) {
int countBatches = 0;
int countRecords = 0;
- for (RecordBatch batch : snapshot) {
+
+ Iterator<RecordBatch> batches = Utils.covariantCast(snapshot.records().batchIterator());
+ while (batches.hasNext()) {
+ RecordBatch batch = batches.next();
countBatches += 1;
Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
@@ -202,8 +208,8 @@ public void testBatchWriteReadSnapshot() throws IOException {
}
}
- assertEquals(batches, countBatches);
- assertEquals(batches * batchSize, countRecords);
+ assertEquals(numberOfBatches, countBatches);
+ assertEquals(numberOfBatches * batchSize, countRecords);
}
}
@@ -212,11 +218,11 @@ public void testBufferWriteReadSnapshot() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
int batchSize = 3;
- int batches = 10;
+ int numberOfBatches = 10;
int expectedSize = 0;
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
ByteBuffer[] buffers = IntStream
.range(0, batchSize)
.mapToObj(ignore -> ByteBuffer.wrap(randomBytes(bufferSize))).toArray(ByteBuffer[]::new);
@@ -239,7 +245,9 @@ public void testBufferWriteReadSnapshot() throws IOException {
int countBatches = 0;
int countRecords = 0;
- for (RecordBatch batch : snapshot) {
+ Iterator<RecordBatch> batches = Utils.covariantCast(snapshot.records().batchIterator());
+ while (batches.hasNext()) {
+ RecordBatch batch = batches.next();
countBatches += 1;
Iterator<Record> records = batch.streamingIterator(new GrowableBufferSupplier());
@@ -254,8 +262,8 @@ public void testBufferWriteReadSnapshot() throws IOException {
}
}
- assertEquals(batches, countBatches);
- assertEquals(batches * batchSize, countRecords);
+ assertEquals(numberOfBatches, countBatches);
+ assertEquals(numberOfBatches * batchSize, countRecords);
}
}
@@ -263,11 +271,11 @@ public void testBufferWriteReadSnapshot() throws IOException {
public void testAbortedSnapshot() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(20L, 2);
int bufferSize = 256;
- int batches = 10;
+ int numberOfBatches = 10;
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
UnalignedMemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
snapshot.append(records);
}
}
@@ -281,11 +289,11 @@ public void testAbortedSnapshot() throws IOException {
public void testAppendToFrozenSnapshot() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
int bufferSize = 256;
- int batches = 10;
+ int numberOfBatches = 10;
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
UnalignedMemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
snapshot.append(records);
}
@@ -296,18 +304,18 @@ public void testAppendToFrozenSnapshot() throws IOException {
// File should exist and the size should be greater than the sum of all the buffers
assertTrue(Files.exists(Snapshots.snapshotPath(tempDir, offsetAndEpoch)));
- assertTrue(Files.size(Snapshots.snapshotPath(tempDir, offsetAndEpoch)) > bufferSize * batches);
+ assertTrue(Files.size(Snapshots.snapshotPath(tempDir, offsetAndEpoch)) > bufferSize * numberOfBatches);
}
@Test
public void testCreateSnapshotWithSameId() throws IOException {
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(20L, 2);
int bufferSize = 256;
- int batches = 1;
+ int numberOfBatches = 1;
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
UnalignedMemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
snapshot.append(records);
}
@@ -317,7 +325,7 @@ public void testCreateSnapshotWithSameId() throws IOException {
// Create another snapshot with the same id
try (FileRawSnapshotWriter snapshot = createSnapshotWriter(tempDir, offsetAndEpoch)) {
UnalignedMemoryRecords records = buildRecords(ByteBuffer.wrap(randomBytes(bufferSize)));
- for (int i = 0; i < batches; i++) {
+ for (int i = 0; i < numberOfBatches; i++) {
snapshot.append(records);
}
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
similarity index 77%
rename from raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
rename to raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index 27bdff2fda02d..be862101d73ba 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -20,19 +20,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
-import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClientTestContext;
+import org.apache.kafka.raft.internals.StringSerde;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
-final public class SnapshotWriterTest {
+final public class SnapshotWriterReaderTest {
private final int localId = 0;
private final Set<Integer> voters = Collections.singleton(localId);
@@ -49,7 +50,7 @@ public void testWritingSnapshot() throws IOException {
snapshot.freeze();
}
- try (RawSnapshotReader reader = context.log.readSnapshot(id).get()) {
+ try (SnapshotReader<String> reader = readSnapshot(context, id, Integer.MAX_VALUE)) {
assertSnapshot(expected, reader);
}
}
@@ -66,7 +67,7 @@ public void testAbortedSnapshot() throws IOException {
});
}
- assertFalse(context.log.readSnapshot(id).isPresent());
+ assertEquals(Optional.empty(), context.log.readSnapshot(id));
}
@Test
@@ -100,16 +101,37 @@ private List<List<String>> buildRecords(int recordsPerBatch, int batches) {
return result;
}
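+ // Wraps the raw snapshot stored in the test log in a SnapshotReader that deserializes records with the context's serde.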
+ private SnapshotReader<String> readSnapshot(
+ RaftClientTestContext context,
+ OffsetAndEpoch snapshotId,
+ int maxBatchSize
+ ) {
+ return SnapshotReader.of(
+ context.log.readSnapshot(snapshotId).get(),
+ context.serde,
+ BufferSupplier.create(),
+ maxBatchSize
+ );
+ }
+
public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
+ assertSnapshot(
+ batches,
+ SnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
+ );
+ }
+
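+ // Flattens the expected batches and checks that the reader yields the same records in the same order.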
+ public static void assertSnapshot(List<List<String>> batches, SnapshotReader<String> reader) {
List<String> expected = new ArrayList<>();
batches.forEach(expected::addAll);
List<String> actual = new ArrayList<>(expected.size());
- reader.forEach(batch -> {
- batch.streamingIterator(new GrowableBufferSupplier()).forEachRemaining(record -> {
- actual.add(Utils.utf8(record.value()));
- });
- });
+ while (reader.hasNext()) {
+ Batch<String> batch = reader.next();
+ for (String value : batch) {
+ actual.add(value);
+ }
+ }
assertEquals(expected, actual);
}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 739e0278d5cda..dde0e40defb2c 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -41,8 +41,10 @@
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.shell.MetadataNode.DirectoryNode;
import org.apache.kafka.shell.MetadataNode.FileNode;
import org.slf4j.Logger;
@@ -83,7 +85,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
try {
// TODO: handle lastOffset
while (reader.hasNext()) {
- BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+ Batch<ApiMessageAndVersion> batch = reader.next();
for (ApiMessageAndVersion messageAndVersion : batch.records()) {
handleMessage(messageAndVersion.message());
}
@@ -105,6 +107,20 @@ public void handleCommits(long lastOffset, List<ApiMessage> messages) {
}, null);
}
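+ // Replays each snapshot record through handleMessage, then closes the reader.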
+ @Override
+ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+ try {
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ for (ApiMessageAndVersion messageAndVersion : batch) {
+ handleMessage(messageAndVersion.message());
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
@Override
public void handleNewLeader(MetaLogLeader leader) {
appendEvent("handleNewLeader", () -> {