
Use LogByteSizeMergePolicy instead of TieredMergePolicy for time-based data. #92684

Merged Feb 9, 2023 · 17 commits · Changes from 4 commits
6 changes: 6 additions & 0 deletions docs/changelog/92684.yaml
@@ -0,0 +1,6 @@
+pr: 92684
+summary: Use `LogByteSizeMergePolicy` instead of `TieredMergePolicy` for time-based
+  data
+area: Engine
+type: enhancement
+issues: []
@@ -89,6 +89,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING,
         IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING,
         MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
+        MergePolicyConfig.INDEX_MERGE_POLICY_TYPE_SETTING,
         MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
         MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
         MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
@@ -778,6 +778,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
             MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
             mergePolicyConfig::setCompoundFormatThreshold
         );
+        scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_TYPE_SETTING, mergePolicyConfig::setMergePolicyType);
         scopedSettings.addSettingsUpdateConsumer(
             MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
             mergePolicyConfig::setDeletesPctAllowed
@@ -1210,8 +1211,8 @@ public long getGcDeletesInMillis() {
     /**
      * Returns the merge policy that should be used for this index.
      */
-    public MergePolicy getMergePolicy() {
-        return mergePolicyConfig.getMergePolicy();
+    public MergePolicy getMergePolicy(boolean isTimeseriesIndex) {
+        return mergePolicyConfig.getMergePolicy(isTimeseriesIndex);
     }

     public <T> T getValue(Setting<T> setting) {
103 changes: 83 additions & 20 deletions server/src/main/java/org/elasticsearch/index/MergePolicyConfig.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.index;

 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.TieredMergePolicy;
@@ -101,9 +102,11 @@
  */

 public final class MergePolicyConfig {
-    private final TieredMergePolicy mergePolicy = new TieredMergePolicy();
+    private final TieredMergePolicy tieredMergePolicy = new TieredMergePolicy();
+    private final LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy();
     private final Logger logger;
     private final boolean mergesEnabled;
+    private volatile Type mergePolicyType;

     public static final double DEFAULT_EXPUNGE_DELETES_ALLOWED = 10d;
     public static final ByteSizeValue DEFAULT_FLOOR_SEGMENT = new ByteSizeValue(2, ByteSizeUnit.MB);
@@ -120,6 +123,45 @@ public final class MergePolicyConfig {
         Property.IndexScope
     );

+    public enum Type {
+        UNSET {
+            @Override
+            MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeriesIndex) {
+                if (isTimeSeriesIndex) {
+                    // TieredMergePolicy is better than LogByteSizeMergePolicy at computing cheaper merges, but it does so by allowing

[Inline review comment] Contributor:
This is formulated as if it could in practice be (substantially) more expensive to use the log merge policy. Perhaps we can elaborate here on why this is unlikely to be the case for data streams?

[Reply] Contributor (author):
Agreed, I pushed an update that makes it sound like a better trade-off.

+                    // itself to merge non-adjacent segments. An important property we get when only merging adjacent segments and data gets
+                    // indexed in order is that segments have non-overlapping time ranges. This means that a range query on the time field
+                    // will only partially match 2 segments at most, and other segments will either fully match or not match at all.
+                    return config.logByteSizeMergePolicy;
+                } else {
+                    return config.tieredMergePolicy;
+                }
+            }
+        },
+        TIERED {
+            @Override
+            MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeriesIndex) {
+                return config.tieredMergePolicy;
+            }
+        },
+        LOG_BYTE_SIZE {
+            @Override
+            MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeriesIndex) {
+                return config.logByteSizeMergePolicy;
+            }
+        };
+
+        abstract MergePolicy getMergePolicy(MergePolicyConfig config, boolean isTimeSeries);
+    }
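
A quick way to see the property the UNSET comment describes: with disjoint, time-ordered segment ranges (which adjacent-only merges preserve when data is indexed in time order), a range query can partially overlap at most the two boundary segments. A minimal stand-alone sketch, with made-up ranges and values rather than code from this PR:

public class DisjointRangeDemo {
    public static void main(String[] args) {
        // Disjoint, ordered time ranges per segment (illustrative values only).
        long[][] segments = { { 0, 99 }, { 100, 199 }, { 200, 299 }, { 300, 399 } };
        long from = 150, to = 250; // a range query on the time field
        int partial = 0, full = 0;
        for (long[] seg : segments) {
            boolean overlaps = seg[0] <= to && seg[1] >= from;
            boolean contained = from <= seg[0] && seg[1] <= to;
            if (contained) full++;
            else if (overlaps) partial++; // only the two boundary segments can land here
        }
        System.out.println("full=" + full + ", partial=" + partial); // partial is never > 2
    }
}
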

+    public static final Setting<Type> INDEX_MERGE_POLICY_TYPE_SETTING = Setting.enumSetting(
+        Type.class,
+        "index.merge.policy.type",
+        Type.UNSET,
+        Property.Dynamic,
+        Property.IndexScope
+    );
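
For illustration, a minimal usage sketch of this new dynamic setting when building index settings. Only the setting name and default come from this diff; the lowercase value spelling assumes the case-insensitive enum parsing that Setting.enumSetting normally performs on the Type constants:

import org.elasticsearch.common.settings.Settings;

public class MergePolicyTypeSettingExample {
    public static void main(String[] args) {
        // Force the log-byte-size policy on an index; "tiered" and "unset" would be
        // the other Type constants. The setting is dynamic, so it can also be
        // updated on a live index.
        Settings indexSettings = Settings.builder()
            .put("index.merge.policy.type", "log_byte_size")
            .build();
        System.out.println(indexSettings.get("index.merge.policy.type"));
    }
}
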

     public static final Setting<Double> INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING = Setting.doubleSetting(
         "index.merge.policy.expunge_deletes_allowed",
         DEFAULT_EXPUNGE_DELETES_ALLOWED,
@@ -174,6 +216,7 @@ public final class MergePolicyConfig {

     MergePolicyConfig(Logger logger, IndexSettings indexSettings) {
         this.logger = logger;
+        Type mergePolicyType = indexSettings.getValue(INDEX_MERGE_POLICY_TYPE_SETTING);
         double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage
         ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING);
         int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING);
@@ -190,15 +233,16 @@ public final class MergePolicyConfig {
             );
         }
         maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
-        indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING).configure(mergePolicy);
-        mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
-        mergePolicy.setFloorSegmentMB(floorSegment.getMbFrac());
-        mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
-        mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
-        mergePolicy.setSegmentsPerTier(segmentsPerTier);
-        mergePolicy.setDeletesPctAllowed(deletesPctAllowed);
+        setMergePolicyType(mergePolicyType);
+        setCompoundFormatThreshold(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING));
+        setExpungeDeletesAllowed(forceMergeDeletesPctAllowed);
+        setFloorSegmentSetting(floorSegment);
+        setMaxMergesAtOnce(maxMergeAtOnce);
+        setMaxMergedSegment(maxMergedSegment);
+        setSegmentsPerTier(segmentsPerTier);
+        setDeletesPctAllowed(deletesPctAllowed);
         logger.trace(
-            "using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}],"
+            "using merge policy with expunge_deletes_allowed[{}], floor_segment[{}],"
                 + " max_merge_at_once[{}], max_merged_segment[{}], segments_per_tier[{}],"
                 + " deletes_pct_allowed[{}]",
             forceMergeDeletesPctAllowed,
@@ -210,32 +254,48 @@
             );
     }

-    void setSegmentsPerTier(Double segmentsPerTier) {
-        mergePolicy.setSegmentsPerTier(segmentsPerTier);
+    void setMergePolicyType(Type type) {
+        this.mergePolicyType = type;
     }

+    void setSegmentsPerTier(double segmentsPerTier) {
+        tieredMergePolicy.setSegmentsPerTier(segmentsPerTier);
+        logByteSizeMergePolicy.setMergeFactor((int) segmentsPerTier);
+    }
+
     void setMaxMergedSegment(ByteSizeValue maxMergedSegment) {
-        mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
+        tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
+        // Note: max merge MB has different semantics on LogByteSizeMergePolicy: it's the maximum size for a segment to be considered for a

[Inline review comment] Contributor:
I think this opens us up to degenerate cases at both ends, either stopping at 2.5GB or going up to 25GB. I imagine this being provoked by different input styles (document size, frequency, etc.), i.e., some data streams work well (hitting around 5GB) whereas others may suffer from worse search or indexing performance (due to the larger merges)?
I wonder if we could adapt the log merge policy to be closer to the tiered merge policy here, i.e., if it can merge together 2 adjacent segments but not 3, then merge the 2?

[Reply] Contributor (author):
Thanks for your question, it made me check the actual LogByteSizeMergePolicy behavior, and my comment is not right: LogByteSizeMergePolicy has worked as you describe since May last year via apache/lucene#935, which introduced that behavior as part of fixing another problem with this merge policy. The fact that this merge policy packs segments together is even tested.

+        // merge, ie. max input segment size, while for TieredMergePolicy, it's the max output segment size. Also LogByteSizeMergePolicy
+        // doesn't try to pack as many segments together as necessary to get as close as possible to the max merged segment size. To
+        // account for that, we divide the max segment size by 2, and in practice, the maximum segment size in an index will be somewhere in
+        // [maxMergedSegment / 2, maxMergedSegment * 5] (assuming a merge factor of 10).
+        logByteSizeMergePolicy.setMaxMergeMB(maxMergedSegment.getMbFrac() / 2);
     }
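
To make the bracketed range in that comment concrete, here is a back-of-the-envelope sketch. It assumes the 5GB default max merged segment and a merge factor of 10 (both assumptions for illustration; only the divide-by-2 logic comes from this diff):

public class MaxMergeBoundsSketch {
    public static void main(String[] args) {
        double maxMergedSegmentMb = 5 * 1024.0;     // assume the 5 GB default
        int mergeFactor = 10;                       // segments-per-tier value reused as merge factor
        double maxInputMb = maxMergedSegmentMb / 2; // what setMaxMergedSegment passes to setMaxMergeMB
        // A segment just below maxInputMb is still merge-eligible, so one merge of
        // mergeFactor such segments can approach maxInputMb * mergeFactor:
        double upperBoundMb = maxInputMb * mergeFactor; // ~25,600 MB, i.e. maxMergedSegment * 5
        // A segment just above maxInputMb is never merged again, so sizes bottom out near:
        double lowerBoundMb = maxInputMb;               // ~2,560 MB, i.e. maxMergedSegment / 2
        System.out.printf("final segment sizes roughly in [%.0f MB, %.0f MB]%n", lowerBoundMb, upperBoundMb);
    }
}
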

-    void setMaxMergesAtOnce(Integer maxMergeAtOnce) {
-        mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
+    void setMaxMergesAtOnce(int maxMergeAtOnce) {
+        tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
+        // LogByteSizeMergePolicy ignores this parameter, it always merges "segments per tier" segments at once.
     }

     void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) {
-        mergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
+        tieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
+        logByteSizeMergePolicy.setMinMergeMB(floorSegementSetting.getMbFrac());
     }

     void setExpungeDeletesAllowed(Double value) {
-        mergePolicy.setForceMergeDeletesPctAllowed(value);
+        tieredMergePolicy.setForceMergeDeletesPctAllowed(value);
+        // LogByteSizeMergePolicy doesn't have a similar configuration option
     }

     void setCompoundFormatThreshold(CompoundFileThreshold compoundFileThreshold) {
-        compoundFileThreshold.configure(mergePolicy);
+        compoundFileThreshold.configure(tieredMergePolicy);
+        compoundFileThreshold.configure(logByteSizeMergePolicy);
     }

     void setDeletesPctAllowed(Double deletesPctAllowed) {
-        mergePolicy.setDeletesPctAllowed(deletesPctAllowed);
+        tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed);
+        // LogByteSizeMergePolicy doesn't have a similar configuration option
     }

     private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) {
@@ -258,8 +318,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) {
     }

     @SuppressForbidden(reason = "we always use an appropriate merge scheduler alongside this policy so NoMergePolic#INSTANCE is ok")
-    MergePolicy getMergePolicy() {
-        return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE;
+    MergePolicy getMergePolicy(boolean isTimeSeriesIndex) {
+        if (mergesEnabled == false) {
+            return NoMergePolicy.INSTANCE;
+        }
+        return mergePolicyType.getMergePolicy(this, isTimeSeriesIndex);
    }
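
Taken together with the Type enum above, the selection logic can be modeled in isolation. A hedged, stand-alone sketch using plain Lucene types; the class and method names here are illustrative, not the PR's, and the real code routes through MergePolicyConfig's shared policy instances:

import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;

public class PolicySelectionSketch {
    enum Type { UNSET, TIERED, LOG_BYTE_SIZE }

    static MergePolicy select(Type type, boolean isTimeSeriesIndex, boolean mergesEnabled) {
        if (mergesEnabled == false) {
            return NoMergePolicy.INSTANCE; // mirrors the early return above
        }
        return switch (type) {
            case TIERED -> new TieredMergePolicy();
            case LOG_BYTE_SIZE -> new LogByteSizeMergePolicy();
            // UNSET: only time-based data gets the adjacent-only policy
            case UNSET -> isTimeSeriesIndex ? new LogByteSizeMergePolicy() : new TieredMergePolicy();
        };
    }

    public static void main(String[] args) {
        System.out.println(select(Type.UNSET, true, true).getClass().getSimpleName());  // LogByteSizeMergePolicy
        System.out.println(select(Type.UNSET, false, true).getClass().getSimpleName()); // TieredMergePolicy
    }
}
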

     private static CompoundFileThreshold parseCompoundFormat(String noCFSRatio) {
@@ -3255,7 +3255,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
             indexSettings,
             warmer,
             store,
-            indexSettings.getMergePolicy(),
+            indexSettings.getMergePolicy(isTimeseriesIndex),
             buildIndexAnalyzer(mapperService),
             similarityService.similarity(mapperService == null ? null : mapperService::fieldType),
             codecService,