Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandika3 committed Nov 12, 2024
1 parent 5cfb015 commit a2a4a59
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,27 +318,28 @@ public final CompletableFuture<Long> truncate(long index) {

@Override
public final CompletableFuture<Long> purge(long suggestedIndex) {
final long adjustedIndex;
if (purgePreservation > 0) {
final long currentIndex = getNextIndex() - 1;
suggestedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
adjustedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
} else {
adjustedIndex = suggestedIndex;
}
final long lastPurge = purgeIndex.get();
if (suggestedIndex - lastPurge < purgeGap) {
if (adjustedIndex - lastPurge < purgeGap) {
return CompletableFuture.completedFuture(lastPurge);
}
long startIndex = getStartIndex();
if (suggestedIndex < startIndex) {
LOG.info("{}: purge is skipped since the suggested index {} is lower than " +
"log start index {}",
getName(), suggestedIndex, startIndex);
final long startIndex = getStartIndex();
if (adjustedIndex < startIndex) {
LOG.info("{}: purge({}) is skipped: adjustedIndex = {} < startIndex = {}, purgePreservation = {}",
getName(), suggestedIndex, adjustedIndex, startIndex, purgePreservation);
return CompletableFuture.completedFuture(lastPurge);
}
LOG.info("{}: purge {}", getName(), suggestedIndex);
final long finalSuggestedIndex = suggestedIndex;
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
LOG.info("{}: purge {}", getName(), adjustedIndex );
return purgeImpl(adjustedIndex).whenComplete((purged, e) -> {
updatePurgeIndex(purged);
if (e != null) {
LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
LOG.warn(getName() + ": Failed to purge " + adjustedIndex, e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ TruncationSegments purge(long index) {
list.addAll(segments);
segments.clear();
sizeInBytes = 0;
} else if (segmentIndex == -1) {
// nothing to purge
return null;
} else if (segmentIndex >= 0) {
// we start to purge the closedSegments which do not overlap with index.
LogSegment overlappedSegment = segments.get(segmentIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public void testPurgeOnOpenSegment() throws Exception {
long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
long purgePreservation = 0L;
purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex, 0, 0);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex);
}

@Test
Expand All @@ -577,7 +577,7 @@ public void testPurgeOnClosedSegments() throws Exception {
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex, 0, 0);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
}

@Test
Expand All @@ -588,7 +588,7 @@ public void testPurgeLogMetric() throws Exception {
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
final RatisMetricRegistry metricRegistryForLogWorker = RaftLogMetricsBase.createRegistry(MEMBER_ID);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex, 0, 0);
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndexOfClosedSegment, expectedIndex);
final DefaultTimekeeperImpl purge = (DefaultTimekeeperImpl) metricRegistryForLogWorker.timer("purgeLog");
assertTrue(purge.getTimer().getCount() > 0);
}
Expand All @@ -600,7 +600,7 @@ public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
int segmentSize = 200;
long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1) - 1;
long expectedIndex = RaftLog.LEAST_VALID_LOG_INDEX;
purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex, 0, 0);
purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex);
}

@Test
Expand All @@ -618,6 +618,11 @@ public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws Excepti
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, startIndex, purgePreservation);
}

private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
long expectedIndex) throws Exception {
purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex, expectedIndex, 0, 0);
}

private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
long expectedIndex, long startIndex, long purgePreservation) throws Exception {
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, startIndex);
Expand Down

0 comments on commit a2a4a59

Please sign in to comment.