Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
465d46b
Basic implementation for loading snapshot
jsancio Feb 3, 2021
1ff2ffc
Add test for snapshot loading API
jsancio Feb 9, 2021
1ee5b27
Allow for configurable buffer supplier
jsancio Feb 10, 2021
abb1441
Add snapshot reader tests
jsancio Feb 10, 2021
77d9c88
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Feb 16, 2021
d8b4e13
Use functional style when constructing SnapshotReader
jsancio Feb 18, 2021
f15ac95
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Feb 18, 2021
96099a7
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Feb 18, 2021
b139893
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Mar 11, 2021
6bbed7f
Fix test for snapshot releading
jsancio Mar 11, 2021
6346275
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Mar 11, 2021
c72053b
Change oldest to earliest
jsancio Mar 11, 2021
259ea30
Add Serde based Records iterator
jsancio Mar 19, 2021
5b9805b
Add fix the buffer allocation pattern
jsancio Mar 24, 2021
7e52209
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Mar 24, 2021
1cbf5c2
Handle the case where the max batch size is smaller than the batch
jsancio Mar 24, 2021
a859ab9
Add more documentation to RawSnapshotReader
jsancio Mar 24, 2021
033e332
Generate snapshot in ReplicatedCounter and fix simulation
jsancio Mar 24, 2021
2dd4939
Remove TODO since Jira was created
jsancio Mar 24, 2021
8adbd4c
Use Colletions.emptyIterator instead of Optional.empty
jsancio Apr 9, 2021
aba403b
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Apr 9, 2021
029fea5
Add for reading batches greater than the suggest size
jsancio Apr 10, 2021
2776ae8
Merge remote-tracking branch 'upstream/trunk' into kafka-12154-snapsh…
jsancio Apr 27, 2021
d55165e
Fix imports
jsancio Apr 27, 2021
5a3aafd
Add snapshot at log start validation for the simulation
jsancio Apr 29, 2021
7b8a320
Improve snapshot verity performance
jsancio Apr 30, 2021
fbbf953
Use supplier for assert message
jsancio Apr 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.shell"/>
<allow pkg="org.apache.kafka.snapshot"/>
<allow pkg="org.jline"/>
<allow pkg="scala.compat"/>
</subpackage>
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.BatchReader.Batch
import org.apache.kafka.raft.{BatchReader, RaftClient, RaftConfig, RecordSerde}
import org.apache.kafka.raft.{Batch, BatchReader, RaftClient, RaftConfig, RecordSerde}
import org.apache.kafka.snapshot.SnapshotReader

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -150,6 +150,7 @@ class TestRaftServer(
case class HandleClaim(epoch: Int) extends RaftEvent
case object HandleResign extends RaftEvent
case class HandleCommit(reader: BatchReader[Array[Byte]]) extends RaftEvent
case class HandleSnapshot(reader: SnapshotReader[Array[Byte]]) extends RaftEvent
case object Shutdown extends RaftEvent

private val eventQueue = new LinkedBlockingDeque[RaftEvent]()
Expand All @@ -175,6 +176,10 @@ class TestRaftServer(
eventQueue.offer(HandleCommit(reader))
}

override def handleSnapshot(reader: SnapshotReader[Array[Byte]]): Unit = {
eventQueue.offer(HandleSnapshot(reader))
}

override def initiateShutdown(): Boolean = {
val initiated = super.initiateShutdown()
eventQueue.offer(Shutdown)
Expand Down Expand Up @@ -226,7 +231,11 @@ class TestRaftServer(
reader.close()
}

case _ =>
case HandleSnapshot(reader) =>
// Ignore snapshots; only interested in records appended by this leader
reader.close()

case Shutdown => // Ignore shutdown command
}
}

Expand Down
133 changes: 133 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/Batch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
* A batch of records.
*
* This type contains a list of records `T` along with the information associated with those records.
*/
public final class Batch<T> implements Iterable<T> {
private final long baseOffset;
private final int epoch;
private final long lastOffset;
private final List<T> records;

private Batch(long baseOffset, int epoch, long lastOffset, List<T> records) {
this.baseOffset = baseOffset;
this.epoch = epoch;
this.lastOffset = lastOffset;
this.records = records;
}

/**
* The offset of the last record in the batch.
*/
public long lastOffset() {
return lastOffset;
}

/**
* The offset of the first record in the batch.
*/
public long baseOffset() {
return baseOffset;
}

/**
* The list of records in the batch.
*/
public List<T> records() {
return records;
}

/**
* The epoch of the leader that appended the record batch.
*/
public int epoch() {
return epoch;
}

@Override
public Iterator<T> iterator() {
return records.iterator();
}

@Override
public String toString() {
return "Batch(" +
"baseOffset=" + baseOffset +
", epoch=" + epoch +
", lastOffset=" + lastOffset +
", records=" + records +
')';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Batch<?> batch = (Batch<?>) o;
return baseOffset == batch.baseOffset &&
epoch == batch.epoch &&
Objects.equals(records, batch.records);
}

@Override
public int hashCode() {
return Objects.hash(baseOffset, epoch, records);
}

/**
* Create a batch without any records.
*
* Internally this is used to propagate offset information for control batches which do not decode to the type T.
*
* @param baseOffset offset of the batch
* @param epoch epoch of the leader that created this batch
* @param lastOffset offset of the last record of this batch
*/
public static <T> Batch<T> empty(long baseOffset, int epoch, long lastOffset) {
return new Batch<>(baseOffset, epoch, lastOffset, Collections.emptyList());
}

/**
* Create a batch with the given base offset, epoch and records.
*
* @param baseOffset offset of the first record in the batch
* @param epoch epoch of the leader that created this batch
* @param records the list of records in this batch
*/
public static <T> Batch<T> of(long baseOffset, int epoch, List<T> records) {
if (records.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"Batch must contain at least one record; baseOffset = %s; epoch = %s",
baseOffset,
epoch
)
);
}

return new Batch<>(baseOffset, epoch, baseOffset + records.size() - 1, records);
}
}
57 changes: 1 addition & 56 deletions raft/src/main/java/org/apache/kafka/raft/BatchReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.apache.kafka.raft;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;

/**
Expand All @@ -32,7 +30,7 @@
*
* @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
*/
public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, AutoCloseable {
public interface BatchReader<T> extends Iterator<Batch<T>>, AutoCloseable {

/**
* Get the base offset of the readable batches. Note that this value is a constant
Expand All @@ -59,57 +57,4 @@ public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, AutoClos
*/
@Override
void close();

class Batch<T> {
private final long baseOffset;
private final int epoch;
private final List<T> records;

public Batch(long baseOffset, int epoch, List<T> records) {
this.baseOffset = baseOffset;
this.epoch = epoch;
this.records = records;
}

public long lastOffset() {
return baseOffset + records.size() - 1;
}

public long baseOffset() {
return baseOffset;
}

public List<T> records() {
return records;
}

public int epoch() {
return epoch;
}

@Override
public String toString() {
return "Batch(" +
"baseOffset=" + baseOffset +
", epoch=" + epoch +
", records=" + records +
')';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Batch<?> batch = (Batch<?>) o;
return baseOffset == batch.baseOffset &&
epoch == batch.epoch &&
Objects.equals(records, batch.records);
}

@Override
public int hashCode() {
return Objects.hash(baseOffset, epoch, records);
}
}

}
2 changes: 2 additions & 0 deletions raft/src/main/java/org/apache/kafka/raft/FollowerState.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public String toString() {
", epoch=" + epoch +
", leaderId=" + leaderId +
", voters=" + voters +
", highWatermark=" + highWatermark +
", fetchingSnapshot=" + fetchingSnapshot +
')';
}

Expand Down
Loading