Skip to content

Commit

Permalink
RATIS-2186. Raft log should not purge index lower than the log start …
Browse files Browse the repository at this point in the history
…index
  • Loading branch information
ivandika3 committed Nov 9, 2024
1 parent e75a0d5 commit 5cfb015
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ public final CompletableFuture<Long> purge(long suggestedIndex) {
if (suggestedIndex - lastPurge < purgeGap) {
return CompletableFuture.completedFuture(lastPurge);
}
long startIndex = getStartIndex();
if (suggestedIndex < startIndex) {
LOG.info("{}: purge is skipped since the suggested index {} is lower than " +
"log start index {}",
getName(), suggestedIndex, startIndex);
return CompletableFuture.completedFuture(lastPurge);
}
LOG.info("{}: purge {}", getName(), suggestedIndex);
final long finalSuggestedIndex = suggestedIndex;
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties
.build();
}

private SegmentedRaftLog newSegmentedRaftLogWithSnapshotIndex(RaftStorage storage, RaftProperties properties,
SegmentedRaftLog newSegmentedRaftLogWithSnapshotIndex(RaftStorage storage, RaftProperties properties,
LongSupplier getSnapshotIndexFromStateMachine) {
return SegmentedRaftLog.newBuilder()
.setMemberId(MEMBER_ID)
Expand Down Expand Up @@ -566,7 +566,8 @@ public void testPurgeOnOpenSegment() throws Exception {
int segmentSize = 200;
long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex);
long purgePreservation = 0L;
purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex, 0, 0);
}

@Test
Expand All @@ -576,7 +577,7 @@ public void testPurgeOnClosedSegments() throws Exception {
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex, 0, 0);
}

@Test
Expand All @@ -587,7 +588,7 @@ public void testPurgeLogMetric() throws Exception {
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry(MEMBER_ID);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex, 0, 0);
final DefaultTimekeeperImpl purge = (DefaultTimekeeperImpl) metricRegistryForLogWorker.timer("purgeLog");
assertTrue(purge.getTimer().getCount() > 0);
}
Expand All @@ -599,18 +600,34 @@ public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = RaftLog.LEAST_VALID_LOG_INDEX;
purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex);
purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex, 0, 0);
}

@Test
public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws Exception {
int startTerm = 0;
int endTerm = 5;
int segmentSize = 200;
long endIndex = segmentSize * (endTerm - startTerm) - 1;
// start index is set so that the suggested index will not be negative, which will not trigger any purge
long startIndex = 200;
// purge preservation is larger than the total size of the log entries
// which causes suggested index to be lower than the start index
long purgePreservation = (segmentSize * (endTerm - startTerm )) + 100;
// if the suggested index is lower than the start index due to the purge preservation, we should not purge anything
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, startIndex, purgePreservation);
}

private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
long expectedIndex) throws Exception {
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 0);
long expectedIndex, long startIndex, long purgePreservation) throws Exception {
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, startIndex);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);

final RaftProperties p = new RaftProperties();
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
RaftServerConfigKeys.Log.setPurgePreservationLogNum(p, purgePreservation);
try (SegmentedRaftLog raftLog = newSegmentedRaftLogWithSnapshotIndex(storage, p, () -> startIndex - 1)) {
raftLog.open(startIndex - 1, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
final Long purged = f.get();
Expand Down

0 comments on commit 5cfb015

Please sign in to comment.