Implemented benefits based middle range compaction
JingQianCloud committed Aug 17, 2023
1 parent dbb41b0 commit 97f8515
Showing 3 changed files with 103 additions and 4 deletions.
20 changes: 20 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/config/StoreConfig.java
@@ -232,6 +232,20 @@ public class StoreConfig {
@Default("1")
public final int storeMinLogSegmentCountToReclaimToTriggerCompaction;

/**
* Only log segments whose valid data percentage is less than or equal to the specified number
* will be qualified for compaction.
*/
@Config("store.max.log.segment.valid.data.percentage.to.qualify.compaction")
public final double storeMaxLogSegmentValidDataPercentageToQualifyCompaction;

/**
* The time interval at which to run the "middle range compaction" in the stats based compaction.
* The interval is in milliseconds.
*/
@Config("store.stats.based.middle.range.compaction.interval.in.ms")
public final long storeStatsBasedMiddleRangeCompactionIntervalInMs;

/**
* The number of buckets for stats bucketing, a value of 0 will disable bucketing.
*/
@@ -615,6 +629,12 @@ public StoreConfig(VerifiableProperties verifiableProperties) {
"com.github.ambry.store.CompactAllPolicyFactory");
storeMinLogSegmentCountToReclaimToTriggerCompaction =
verifiableProperties.getIntInRange("store.min.log.segment.count.to.reclaim.to.trigger.compaction", 1, 1, 1000);
storeMaxLogSegmentValidDataPercentageToQualifyCompaction =
verifiableProperties.getDoubleInRange("store.max.log.segment.valid.data.percentage.to.qualify.compaction", 0.30,
0.0, 1.0);
storeStatsBasedMiddleRangeCompactionIntervalInMs =
verifiableProperties.getLongInRange("store.stats.based.middle.range.compaction.interval.in.ms", 0, 0,
Long.MAX_VALUE);
storeStatsBucketCount = verifiableProperties.getIntInRange("store.stats.bucket.count", 0, 0, 10000);
storeStatsBucketSpanInMinutes =
verifiableProperties.getLongInRange("store.stats.bucket.span.in.minutes", 60, 1, 10000);
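Before the policy changes below, a minimal sketch of how the two new settings might be supplied; the class name MiddleRangeCompactionConfigExample and the values used (0.30 and a one-day interval) are hypothetical assumptions, not taken from this commit.

// Hedged sketch (not part of this commit): wiring the new middle range compaction
// settings through VerifiableProperties. The chosen values are illustrative only.
import com.github.ambry.config.StoreConfig;
import com.github.ambry.config.VerifiableProperties;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class MiddleRangeCompactionConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // log segments holding at most 30% valid data qualify as endpoints of the middle range
    props.setProperty("store.max.log.segment.valid.data.percentage.to.qualify.compaction", "0.30");
    // attempt the middle range compaction at most once per day; the default of 0 disables it
    props.setProperty("store.stats.based.middle.range.compaction.interval.in.ms",
        String.valueOf(TimeUnit.DAYS.toMillis(1)));
    StoreConfig storeConfig = new StoreConfig(new VerifiableProperties(props));
    System.out.println(storeConfig.storeMaxLogSegmentValidDataPercentageToQualifyCompaction); // 0.3
    System.out.println(storeConfig.storeStatsBasedMiddleRangeCompactionIntervalInMs);         // 86400000
  }
}

Leaving store.stats.based.middle.range.compaction.interval.in.ms at its default of 0 keeps the new path disabled, since the policy change below only enters it when the interval is non-zero.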
@@ -14,8 +14,10 @@
package com.github.ambry.store;

import com.github.ambry.config.StoreConfig;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Time;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@ public CompactionPolicy getCompactionPolicy() {
*/
class CompactAllPolicy implements CompactionPolicy {

static final long ERROR_MARGIN_MS = 1000 * 60 * 60;
private final StoreConfig storeConfig;
private final Time time;
private final long messageRetentionTimeInMs;
@@ -58,13 +61,27 @@ class CompactAllPolicy implements CompactionPolicy {

@Override
public CompactionDetails getCompactionDetails(long totalCapacity, long usedCapacity, long segmentCapacity,
long segmentHeaderSize, List<LogSegmentName> logSegmentsNotInJournal, BlobStoreStats blobStoreStats, String dataDir) {
long segmentHeaderSize, List<LogSegmentName> logSegmentsNotInJournal, BlobStoreStats blobStoreStats,
String dataDir) throws StoreException {
CompactionDetails details = null;
logger.trace("UsedCapacity {} vs TotalCapacity {}", usedCapacity, totalCapacity);
if (usedCapacity >= (storeConfig.storeMinUsedCapacityToTriggerCompactionInPercentage / 100.0) * totalCapacity) {
if (logSegmentsNotInJournal != null) {
details = new CompactionDetails(time.milliseconds() - messageRetentionTimeInMs, logSegmentsNotInJournal, null);
logger.info("Generating CompactionDetails {} using CompactAllPolicy", details);

Pair<Long, NavigableMap<LogSegmentName, Long>> validDataSizeByLogSegment =
blobStoreStats.getValidDataSizeByLogSegment(
new TimeRange(time.milliseconds() - messageRetentionTimeInMs - ERROR_MARGIN_MS, ERROR_MARGIN_MS));
final StringBuilder sizeLog = new StringBuilder(
"Valid data size for " + dataDir + " from BlobStoreStats " + validDataSizeByLogSegment.getFirst()
+ " segments: ");
validDataSizeByLogSegment.getSecond().forEach((logSegmentName, validDataSize) -> {
sizeLog.append(
logSegmentName + " " + validDataSize / 1000 / 1000 / 1000.0 + "GB " + validDataSize / segmentCapacity
+ "%;");
});
logger.info(sizeLog.toString());
}
}
return details;
@@ -17,6 +17,7 @@
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Time;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -35,12 +36,15 @@ class StatsBasedCompactionPolicy implements CompactionPolicy {
private final Time time;
private final StoreConfig storeConfig;
private final long messageRetentionTimeInMs;
// map from store id to the last time the "middle range compaction" was run for that store
private final Map<String, Long> blobToLastMiddleRangeCompaction;
private static final Logger logger = LoggerFactory.getLogger(StatsBasedCompactionPolicy.class);

StatsBasedCompactionPolicy(StoreConfig storeConfig, Time time) {
this.storeConfig = storeConfig;
this.time = time;
this.messageRetentionTimeInMs = TimeUnit.MINUTES.toMillis(storeConfig.storeDeletedMessageRetentionMinutes);
this.blobToLastMiddleRangeCompaction = new HashMap<>();
}

@Override
@@ -54,14 +58,23 @@ public CompactionDetails getCompactionDetails(long totalCapacity, long usedCapac
Pair<Long, NavigableMap<LogSegmentName, Long>> validDataSizeByLogSegment =
blobStoreStats.getValidDataSizeByLogSegment(
new TimeRange(time.milliseconds() - messageRetentionTimeInMs - ERROR_MARGIN_MS, ERROR_MARGIN_MS));
logger.info("Valid data size for {} from BlobStoreStats {} ", dataDir, validDataSizeByLogSegment);
final StringBuilder sizeLog = new StringBuilder(
"Valid data size for " + dataDir + " from BlobStoreStats " + validDataSizeByLogSegment.getFirst()
+ " segments: ");
validDataSizeByLogSegment.getSecond().forEach((logSegmentName, validDataSize) -> {
sizeLog.append(
logSegmentName + " " + validDataSize / 1000 / 1000 / 1000.0 + "GB " + validDataSize / segmentCapacity
+ "%;");
});
logger.info(sizeLog.toString());

NavigableMap<LogSegmentName, Long> potentialLogSegmentValidSizeMap = validDataSizeByLogSegment.getSecond()
.subMap(logSegmentsNotInJournal.get(0), true,
logSegmentsNotInJournal.get(logSegmentsNotInJournal.size() - 1), true);

CostBenefitInfo bestCandidateToCompact =
getBestCandidateToCompact(potentialLogSegmentValidSizeMap, segmentCapacity, segmentHeaderSize,
blobStoreStats.getMaxBlobSize());
blobStoreStats.getMaxBlobSize(), blobStoreStats);
if (bestCandidateToCompact != null) {
details =
new CompactionDetails(validDataSizeByLogSegment.getFirst(), bestCandidateToCompact.getSegmentsToCompact(),
@@ -85,10 +98,59 @@ public CompactionDetails getCompactionDetails(long totalCapacity, long usedCapac
* @return the {@link CostBenefitInfo} for the best candidate to compact. {@code null} if there isn't any.
*/
private CostBenefitInfo getBestCandidateToCompact(NavigableMap<LogSegmentName, Long> validDataPerLogSegments,
long segmentCapacity, long segmentHeaderSize, long maxBlobSize) {
long segmentCapacity, long segmentHeaderSize, long maxBlobSize, BlobStoreStats blobStoreStats) {
Map.Entry<LogSegmentName, Long> firstEntry = validDataPerLogSegments.firstEntry();
Map.Entry<LogSegmentName, Long> lastEntry = validDataPerLogSegments.lastEntry();
CostBenefitInfo bestCandidateToCompact = null;

// weigh more on compaction benefit
// try to reclaim as many log segments as possible with reasonable cost.
if (storeConfig.storeStatsBasedMiddleRangeCompactionIntervalInMs != 0) {
String storeId = blobStoreStats.getStoreId();
// on startup, give the store a chance to try the "middle range compaction"
if (!blobToLastMiddleRangeCompaction.containsKey(storeId)) {
blobToLastMiddleRangeCompaction.put(storeId,
System.currentTimeMillis() - storeConfig.storeStatsBasedMiddleRangeCompactionIntervalInMs);
}

if (System.currentTimeMillis() - blobToLastMiddleRangeCompaction.get(storeId)
>= storeConfig.storeStatsBasedMiddleRangeCompactionIntervalInMs) {
// find the first log segment whose valid data percentage is <= the percentage threshold
while (firstEntry != null) {
if (firstEntry.getValue() / (segmentCapacity * 1.0)
<= storeConfig.storeMaxLogSegmentValidDataPercentageToQualifyCompaction) {
break;
}
firstEntry = validDataPerLogSegments.higherEntry(firstEntry.getKey());
}
if (firstEntry != null) {
// find the last log segment whose valid data percentage is <= the percentage threshold
while (lastEntry != null && firstEntry.getKey().compareTo(lastEntry.getKey()) < 0) {
if (lastEntry.getValue() / (segmentCapacity * 1.0)
<= storeConfig.storeMaxLogSegmentValidDataPercentageToQualifyCompaction) {
break;
}
lastEntry = validDataPerLogSegments.lowerEntry(lastEntry.getKey());
}
if (lastEntry != null && firstEntry.getKey().compareTo(lastEntry.getKey()) < 0) {
CostBenefitInfo costBenefitInfo =
getCostBenefitInfo(firstEntry.getKey(), lastEntry.getKey(), validDataPerLogSegments, segmentCapacity,
segmentHeaderSize, maxBlobSize);
if (costBenefitInfo.getBenefit() > 1) {
logger.info("Merging middle log segments which are qualified for compaction {} ", costBenefitInfo);
blobToLastMiddleRangeCompaction.put(storeId, System.currentTimeMillis());
return costBenefitInfo;
}
}
}
logger.info("Merging middle log segments, no qualified middle range. ");
}
}

// weigh more on least IO effort
// try to reclaim some log segments with the least IO effort
firstEntry = validDataPerLogSegments.firstEntry();
lastEntry = validDataPerLogSegments.lastEntry();
while (firstEntry != null) {
Map.Entry<LogSegmentName, Long> endEntry = lastEntry;
while (endEntry != null && firstEntry.getKey().compareTo(endEntry.getKey()) <= 0) {
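To make the selection logic above easier to follow, here is a self-contained sketch (plain Java, not Ambry code) of the middle range scan: walk inward from both ends of the per-segment valid-data map until both endpoints fall at or below the valid-data-percentage threshold, and take everything in between as the candidate range. The class name, segment capacity, and data values are hypothetical; in the actual policy the candidate is additionally run through getCostBenefitInfo and only returned when its benefit exceeds one reclaimed segment.

// Hedged sketch of the middle range endpoint scan; names and values are assumptions.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class MiddleRangeSketch {
  static final long SEGMENT_CAPACITY = 16L * 1024 * 1024 * 1024; // 16 GiB per log segment, assumed
  static final double THRESHOLD = 0.30; // mirrors the default of the new config

  // Returns the names of the segments in the range whose two endpoints each hold
  // at most THRESHOLD of valid data, or an empty list if no such range exists.
  static List<String> findMiddleRange(NavigableMap<String, Long> validDataPerSegment) {
    Map.Entry<String, Long> first = validDataPerSegment.firstEntry();
    Map.Entry<String, Long> last = validDataPerSegment.lastEntry();
    // move the left endpoint inward until it qualifies
    while (first != null && first.getValue() / (double) SEGMENT_CAPACITY > THRESHOLD) {
      first = validDataPerSegment.higherEntry(first.getKey());
    }
    // move the right endpoint inward until it qualifies, staying to the right of the left endpoint
    while (first != null && last != null && first.getKey().compareTo(last.getKey()) < 0
        && last.getValue() / (double) SEGMENT_CAPACITY > THRESHOLD) {
      last = validDataPerSegment.lowerEntry(last.getKey());
    }
    if (first == null || last == null || first.getKey().compareTo(last.getKey()) >= 0) {
      return new ArrayList<>(); // need at least two distinct qualifying endpoints
    }
    return new ArrayList<>(validDataPerSegment.subMap(first.getKey(), true, last.getKey(), true).keySet());
  }

  public static void main(String[] args) {
    NavigableMap<String, Long> validData = new TreeMap<>();
    long gb = 1024L * 1024 * 1024;
    validData.put("0_1", 15 * gb); // ~94% valid, does not qualify as an endpoint
    validData.put("0_2", 3 * gb);  // ~19% valid, qualifies
    validData.put("0_3", 14 * gb); // ~88% valid, swept in as an interior segment
    validData.put("0_4", 4 * gb);  // 25% valid, qualifies
    validData.put("0_5", 15 * gb); // ~94% valid, does not qualify as an endpoint
    System.out.println(findMiddleRange(validData)); // prints [0_2, 0_3, 0_4]
  }
}

Note that interior segments above the threshold (0_3 here) are still swept into the candidate range; the benefit comes from compacting the whole contiguous span at once.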
