Use LogByteSizeMergePolicy instead of TieredMergePolicy for time-based data. #92684

Merged
merged 17 commits into from Feb 9, 2023
6 changes: 6 additions & 0 deletions docs/changelog/92684.yaml
@@ -0,0 +1,6 @@
pr: 92684
summary: Use `LogByteSizeMergePolicy` instead of `TieredMergePolicy` for time-based
data
area: Engine
type: enhancement
issues: []
@@ -89,13 +89,15 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING,
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING,
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_TYPE_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING,
IndexSortConfig.INDEX_SORT_FIELD_SETTING,
IndexSortConfig.INDEX_SORT_ORDER_SETTING,
IndexSortConfig.INDEX_SORT_MISSING_SETTING,
@@ -784,6 +784,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
mergePolicyConfig::setCompoundFormatThreshold
);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_TYPE_SETTING, mergePolicyConfig::setMergePolicyType);
scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
mergePolicyConfig::setDeletesPctAllowed
@@ -809,6 +810,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
mergePolicyConfig::setSegmentsPerTier
);
scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING,
mergePolicyConfig::setMergeFactor
);

scopedSettings.addSettingsUpdateConsumer(
MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING,
@@ -1216,8 +1221,8 @@ public long getGcDeletesInMillis() {
/**
* Returns the merge policy that should be used for this index.
*/
public MergePolicy getMergePolicy() {
return mergePolicyConfig.getMergePolicy();
public MergePolicy getMergePolicy(boolean isTimeBasedIndex) {
return mergePolicyConfig.getMergePolicy(isTimeBasedIndex);
}

public <T> T getValue(Setting<T> setting) {
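For context on how these newly registered settings would be used, here is a minimal sketch, not part of this diff: it builds per-index settings that exercise the new `index.merge.policy.type` and `index.merge.policy.merge_factor` keys defined in `MergePolicyConfig` below. The class name, the example values, and the lowercase enum spellings are illustrative assumptions.

```java
import org.elasticsearch.common.settings.Settings;

public class MergePolicySettingsSketch {
    public static void main(String[] args) {
        // Opt a regular (non time-based) index into the adjacent-only policy and raise the merge factor.
        Settings logByteSize = Settings.builder()
            .put("index.merge.policy.type", "log_byte_size")
            .put("index.merge.policy.merge_factor", 32)
            .build();

        // Pin a time-based index to the previous behavior instead of letting the new default kick in.
        Settings tiered = Settings.builder()
            .put("index.merge.policy.type", "tiered")
            .build();

        System.out.println(logByteSize.get("index.merge.policy.type")); // log_byte_size
        System.out.println(tiered.get("index.merge.policy.type"));      // tiered
    }
}
```

Both settings are declared dynamic, so the update consumers registered above would also apply them to an existing index.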
147 changes: 126 additions & 21 deletions server/src/main/java/org/elasticsearch/index/MergePolicyConfig.java
@@ -9,6 +9,7 @@
package org.elasticsearch.index;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
@@ -101,15 +102,31 @@
*/

public final class MergePolicyConfig {
private final TieredMergePolicy mergePolicy = new TieredMergePolicy();
private final TieredMergePolicy tieredMergePolicy = new TieredMergePolicy();
private final LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy();
private final Logger logger;
private final boolean mergesEnabled;
private volatile Type mergePolicyType;

public static final double DEFAULT_EXPUNGE_DELETES_ALLOWED = 10d;
public static final ByteSizeValue DEFAULT_FLOOR_SEGMENT = new ByteSizeValue(2, ByteSizeUnit.MB);
public static final int DEFAULT_MAX_MERGE_AT_ONCE = 10;
public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB);
Contributor:

Should we consider raising this default "a bit" when using the log-byte-size policy, to get to a similar segment size as with the tiered merge policy?

I say this because I could imagine us wanting to merge up all the remaining small segments after rollover. And with just below 5GB, that may give 11 segments rather than 10, which seems wasteful. It would be good to have a ~5.1GB average large-segment size, to match our default of 50GB shards.

Contributor Author:

For the record, TieredMergePolicy may stop merging anywhere between 2.5GB and 5GB (https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/index/TieredMergePolicy.java#L375-L390). This happens less often than with LogByteSizeMergePolicy because TieredMergePolicy can pack small segments together with large ones thanks to its ability to merge non-adjacent segments, but if one of the background merges produces a 4GB segment, that segment won't get merged again, because the merge would mostly consist of rewriting this already-large segment, which is wasteful.

In my opinion, what we should do depends on what we think is the best practice for time-based indices:

  • If we think that old indices should get force-merged to a single segment, it might make more sense to lower the max merged segment size to something like 1GB in order to save merging while the index is actively being written: since all segments will get rewritten in the end, it doesn't really matter where we end up with background merges, and leaning towards smaller segments would help decrease the merging overhead on ingestion.
  • If we think that it's a better practice to only merge the smaller segments and end up with segments on the order of 5GB on average, as you are suggesting, then it might indeed make more sense to set the max merged segment size to something like 6GB.
  • Or maybe we should not configure a maximum merged segment size at all on time-based indices, because the rollover size already bounds the maximum size of segments. If you have ten 1GB segments, why would you do two 5-segment merges to get two 5GB segments when a single 10-segment merge has roughly the same cost and results in fewer segments?

Contributor:

I like the 3rd option, unless there are practical limits we could run into (primarily heap and memory). We may still want some "desired segment size" for tail merging, but that can be added as part of that effort (if we implement it). Given that the merge factor is now 16, it seems very likely that with any limit we will be doing several smaller merges to fit under the indicated roof. Compared to the 5GB limit we have now, we should not see 16 of those being merged together. Correct me if I am wrong, but removing the roof is thus unlikely to lead to more actual merging than with the 5GB limit?
I suppose we might be able to ease the merging a bit with a 1GB roof. Perhaps it's worth trying out what the amplification would be in the two cases?

Contributor Author:

There are no practical limits that I can think of, and I agree that there shouldn't be more merging than what we're seeing now, because 5GB is already within one merge factor of the 50GB rollover size.

Contributor Author:

@henningandersen Here's a video that shows the difference between max segment size = 1GB and an unbounded max segment size: https://photos.app.goo.gl/cyWZeTmthhWwC3fZ9.

Contributor Author:

Sorry, I forgot to mention that this video was created with 300kB flushes on average rather than 1MB.

Contributor:

It would be good to have a new video, then, comparing a 5GB max segment size to unbounded, with the same flush size, just to verify our mental model of the write-amplification effect of the max segment size.
I wonder if we'd still want something like a 100GB max segment size as protection against users using a non-default rollover size, or against hiccups in ILM and the rollover process.

Contributor Author:

Here's a video that compares 5GB to unbounded with TieredMergePolicy: https://photos.app.goo.gl/JEDJdGFVcUAbXaKP9, and here is the same comparison with LogByteSizeMergePolicy (mergeFactor=16, same parameters as TieredMergePolicy otherwise): https://photos.app.goo.gl/cp3my5RzEbTHrAXSA. Very similar numbers of segments and write amplification, but the biggest segments are bigger when the max segment size is unbounded. You'd need to ingest more than 50GB into an index to start seeing a bigger difference.

Contributor:

This looks good to me; I think a 100GB (or 50GB) roof should be our new default with the log-byte-size merge policy.

Contributor Author:

I pushed a change that sets a roof of 100GB on time-based data.
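To make the write-amplification argument above concrete, here is a rough standalone simulation, not part of this PR and not Lucene code: it flushes fixed-size segments, only merges `mergeFactor` adjacent equal-sized segments at a time (mimicking a log merge policy with no maximum segment size), and reports how often data is rewritten before reaching a 50GB rollover point. The 1MB flush size, the 50GB rollover size, and all names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeAmplificationSketch {
    public static void main(String[] args) {
        final long flushBytes = 1L << 20;            // assume ~1MB flushes
        final long rolloverBytes = 50L * (1L << 30); // assume a 50GB rollover size
        final int mergeFactor = 16;                  // the new LogByteSizeMergePolicy default

        List<Long> segments = new ArrayList<>();     // adjacent segments, newest last
        long flushed = 0, bytesWrittenByMerges = 0;
        while (flushed < rolloverBytes) {
            segments.add(flushBytes);
            flushed += flushBytes;
            // Merge whenever the newest mergeFactor segments have the same size, i.e.
            // level-by-level, adjacent-only merging with no roof on the merged segment size.
            while (segments.size() >= mergeFactor && sameSizeTail(segments, mergeFactor)) {
                long merged = 0;
                for (int i = 0; i < mergeFactor; i++) {
                    merged += segments.remove(segments.size() - 1);
                }
                segments.add(merged);
                bytesWrittenByMerges += merged;
            }
        }
        // Write amplification = (bytes flushed + bytes rewritten by merges) / bytes flushed
        System.out.printf("segments=%d, write amplification=%.2f%n",
                segments.size(), (double) (flushed + bytesWrittenByMerges) / flushed);
    }

    private static boolean sameSizeTail(List<Long> segments, int n) {
        long newest = segments.get(segments.size() - 1);
        for (int i = segments.size() - n; i < segments.size(); i++) {
            if (segments.get(i) != newest) {
                return false;
            }
        }
        return true;
    }
}
```

With these assumed numbers the simulation reaches rollover with about 20 segments, each flushed byte having been rewritten roughly three times by merges.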

/**
* Time-based data generally gets rolled over, so there is not much value in enforcing a maximum segment size, which has the side effect
* of merging fewer segments together than the merge factor, which in turn increases write amplification. So we set an arbitrarily high
* roof that serves as a protection that we expect to never hit.
*/
public static final ByteSizeValue DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT = new ByteSizeValue(100, ByteSizeUnit.GB);
public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d;
/**
* A default value for {@link LogByteSizeMergePolicy}'s merge factor: 16. This default value differs from the Lucene default of 10 in
* order to account for the fact that Elasticsearch uses {@link LogByteSizeMergePolicy} for time-based data, where it usually makes
* sense to merge data less aggressively, and because {@link LogByteSizeMergePolicy} merges segments more aggressively than
* {@link TieredMergePolicy} for the same number of segments per tier / merge factor because {@link TieredMergePolicy} makes decisions
* at the whole index level, while {@link LogByteSizeMergePolicy} makes decisions on a per-tier basis.
*/
public static final int DEFAULT_MERGE_FACTOR = 16;
public static final double DEFAULT_DELETES_PCT_ALLOWED = 20.0d;
private static final String INDEX_COMPOUND_FORMAT_SETTING_KEY = "index.compound_format";
public static final Setting<CompoundFileThreshold> INDEX_COMPOUND_FORMAT_SETTING = new Setting<>(
@@ -120,6 +137,57 @@ public final class MergePolicyConfig {
Property.IndexScope
);

public enum Type {
UNSET {
@Override
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {
if (isTimeBasedIndex) {
// With time-based data, it's important that the merge policy only merges adjacent segments, so that segments end up
// with non-overlapping time ranges if data gets indexed in order. This makes queries more efficient, as range filters
// on the timestamp are more likely to either fully match a segment or not match it at all, which Lucene handles more
// efficiently than a partially matching segment. This also plays nicely with the fact that recent data is more heavily
// queried than older data, so some segments are more likely to not get touched at all by queries if they don't
// intersect with the query's range.

// The downside of only doing adjacent merges is that it may result in slightly less efficient merging if there is a lot
// of variance in the size of flushes. Allowing merges of non-adjacent segments also makes it possible to reclaim
// deletes a bit more efficiently by merging together segments that have the most deletes, even though they might not be
// adjacent. But overall, the benefits of only doing adjacent merging exceed the downsides for time-based data.

// LogByteSizeMergePolicy is similar to TieredMergePolicy, as it also tries to organize segments into tiers of
// exponential sizes. The main difference is that it never merges non-adjacent segments, which is an interesting
// property for time-based data as described above.

return config.logByteSizeMergePolicy;
} else {
return config.tieredMergePolicy;
}
}
},
TIERED {
@Override
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {
return config.tieredMergePolicy;
}
},
LOG_BYTE_SIZE {
@Override
MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeBasedIndex) {
return config.logByteSizeMergePolicy;
}
};

abstract MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeries);
}

public static final Setting<Type> INDEX_MERGE_POLICY_TYPE_SETTING = Setting.enumSetting(
Type.class,
"index.merge.policy.type",
Type.UNSET,
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Double> INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING = Setting.doubleSetting(
"index.merge.policy.expunge_deletes_allowed",
DEFAULT_EXPUNGE_DELETES_ALLOWED,
@@ -150,7 +218,8 @@ public final class MergePolicyConfig {
);
public static final Setting<ByteSizeValue> INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING = Setting.byteSizeSetting(
"index.merge.policy.max_merged_segment",
DEFAULT_MAX_MERGED_SEGMENT,
// We're not using DEFAULT_MAX_MERGED_SEGMENT here as we want different defaults for time-based data vs. non-time based
new ByteSizeValue(0, ByteSizeUnit.BYTES),
Property.Dynamic,
Property.IndexScope
);
@@ -161,6 +230,13 @@
Property.Dynamic,
Property.IndexScope
);
public static final Setting<Integer> INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting(
"index.merge.policy.merge_factor",
DEFAULT_MERGE_FACTOR,
2,
Property.Dynamic,
Property.IndexScope
);
public static final Setting<Double> INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING = Setting.doubleSetting(
"index.merge.policy.deletes_pct_allowed",
DEFAULT_DELETES_PCT_ALLOWED,
@@ -174,13 +250,15 @@

MergePolicyConfig(Logger logger, IndexSettings indexSettings) {
this.logger = logger;
Type mergePolicyType = indexSettings.getValue(INDEX_MERGE_POLICY_TYPE_SETTING);
double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage
ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING);
int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING);
// TODO is this really a good default number for max_merge_segment, what happens for large indices,
// won't they end up with many segments?
ByteSizeValue maxMergedSegment = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING);
double segmentsPerTier = indexSettings.getValue(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING);
int mergeFactor = indexSettings.getValue(INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING);
double deletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING);
this.mergesEnabled = indexSettings.getSettings().getAsBoolean(INDEX_MERGE_ENABLED, true);
if (mergesEnabled == false) {
@@ -190,15 +268,17 @@ public final class MergePolicyConfig {
);
}
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING).configure(mergePolicy);
mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
mergePolicy.setFloorSegmentMB(floorSegment.getMbFrac());
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
mergePolicy.setSegmentsPerTier(segmentsPerTier);
mergePolicy.setDeletesPctAllowed(deletesPctAllowed);
setMergePolicyType(mergePolicyType);
setCompoundFormatThreshold(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING));
setExpungeDeletesAllowed(forceMergeDeletesPctAllowed);
setFloorSegmentSetting(floorSegment);
setMaxMergesAtOnce(maxMergeAtOnce);
setMaxMergedSegment(maxMergedSegment);
setSegmentsPerTier(segmentsPerTier);
setMergeFactor(mergeFactor);
setDeletesPctAllowed(deletesPctAllowed);
logger.trace(
"using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}],"
"using merge policy with expunge_deletes_allowed[{}], floor_segment[{}],"
+ " max_merge_at_once[{}], max_merged_segment[{}], segments_per_tier[{}],"
+ " deletes_pct_allowed[{}]",
forceMergeDeletesPctAllowed,
@@ -210,32 +290,54 @@
);
}

void setSegmentsPerTier(Double segmentsPerTier) {
mergePolicy.setSegmentsPerTier(segmentsPerTier);
void setMergePolicyType(Type type) {
this.mergePolicyType = type;
}

void setSegmentsPerTier(double segmentsPerTier) {
tieredMergePolicy.setSegmentsPerTier(segmentsPerTier);
// LogByteSizeMergePolicy ignores this parameter, it always tries to have between 1 and merge_factor - 1 segments per tier.
}

void setMergeFactor(int mergeFactor) {
// TieredMergePolicy ignores this setting, it configures a number of segments per tier instead, which has different semantics.
logByteSizeMergePolicy.setMergeFactor(mergeFactor);
}

void setMaxMergedSegment(ByteSizeValue maxMergedSegment) {
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
// We use 0 as a placeholder for "unset".
if (maxMergedSegment.getBytes() == 0) {
tieredMergePolicy.setMaxMergedSegmentMB(DEFAULT_MAX_MERGED_SEGMENT.getMbFrac());
logByteSizeMergePolicy.setMaxMergeMB(DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT.getMbFrac());
} else {
tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
logByteSizeMergePolicy.setMaxMergeMB(maxMergedSegment.getMbFrac());
}
}

void setMaxMergesAtOnce(Integer maxMergeAtOnce) {
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
void setMaxMergesAtOnce(int maxMergeAtOnce) {
tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
// LogByteSizeMergePolicy ignores this parameter, it always merges merge_factor segments at once.
}

void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) {
mergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
tieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
logByteSizeMergePolicy.setMinMergeMB(floorSegementSetting.getMbFrac());
}

void setExpungeDeletesAllowed(Double value) {
mergePolicy.setForceMergeDeletesPctAllowed(value);
tieredMergePolicy.setForceMergeDeletesPctAllowed(value);
// LogByteSizeMergePolicy doesn't have a similar configuration option
}

void setCompoundFormatThreshold(CompoundFileThreshold compoundFileThreshold) {
compoundFileThreshold.configure(mergePolicy);
compoundFileThreshold.configure(tieredMergePolicy);
compoundFileThreshold.configure(logByteSizeMergePolicy);
}

void setDeletesPctAllowed(Double deletesPctAllowed) {
mergePolicy.setDeletesPctAllowed(deletesPctAllowed);
tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed);
// LogByteSizeMergePolicy doesn't have a similar configuration option
}

private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) {
@@ -258,8 +360,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerT
}

@SuppressForbidden(reason = "we always use an appropriate merge scheduler alongside this policy so NoMergePolic#INSTANCE is ok")
MergePolicy getMergePolicy() {
return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE;
MergePolicy getMergePolicy(boolean isTimeBasedIndex) {
if (mergesEnabled == false) {
return NoMergePolicy.INSTANCE;
}
return mergePolicyType.getMergePolicy(this, isTimeBasedIndex);
}

private static CompoundFileThreshold parseCompoundFormat(String noCFSRatio) {
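As a summary of the selection logic this file now implements, here is a standalone sketch, not Elasticsearch code: `index.merge.enabled: false` overrides everything, an explicit `index.merge.policy.type` wins next, and only when the type is left unset does the time-based flag pick `LogByteSizeMergePolicy`. Class and method names are illustrative.

```java
public class MergePolicySelectionSketch {
    enum PolicyType { UNSET, TIERED, LOG_BYTE_SIZE }

    static String effectivePolicy(boolean mergesEnabled, PolicyType type, boolean isTimeBasedIndex) {
        if (mergesEnabled == false) {
            return "NoMergePolicy"; // index.merge.enabled=false disables merging entirely
        }
        return switch (type) {
            case TIERED -> "TieredMergePolicy";
            case LOG_BYTE_SIZE -> "LogByteSizeMergePolicy";
            case UNSET -> isTimeBasedIndex ? "LogByteSizeMergePolicy" : "TieredMergePolicy";
        };
    }

    public static void main(String[] args) {
        System.out.println(effectivePolicy(true, PolicyType.UNSET, true));   // LogByteSizeMergePolicy
        System.out.println(effectivePolicy(true, PolicyType.UNSET, false));  // TieredMergePolicy
        System.out.println(effectivePolicy(true, PolicyType.TIERED, true));  // TieredMergePolicy
        System.out.println(effectivePolicy(false, PolicyType.UNSET, true));  // NoMergePolicy
    }
}
```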
@@ -3269,14 +3269,14 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
final boolean isTimeseriesIndex = mapperService == null ? false : mapperService.mappingLookup().hasTimestampField();
final boolean isTimeBasedIndex = mapperService == null ? false : mapperService.mappingLookup().hasTimestampField();
return new EngineConfig(
shardId,
threadPool,
indexSettings,
warmer,
store,
indexSettings.getMergePolicy(),
indexSettings.getMergePolicy(isTimeBasedIndex),
buildIndexAnalyzer(mapperService),
similarityService.similarity(mapperService == null ? null : mapperService::fieldType),
codecService,
@@ -3293,7 +3293,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
replicationTracker::getRetentionLeases,
this::getOperationPrimaryTerm,
snapshotCommitSupplier,
isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
isTimeBasedIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
relativeTimeInNanosSupplier,
indexCommitListener,
routingEntry().isPromotableToPrimary()