Update merge on refresh and merge on commit defaults in OpenSearch (Lucene 9.3) #3561

Merged: 2 commits, Jul 26, 2022
@@ -153,7 +153,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/3561")
public void testHighWatermarkNotExceeded() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
@@ -194,6 +194,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
50 changes: 46 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -32,11 +32,12 @@
package org.opensearch.index;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
@@ -53,9 +54,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
@@ -73,6 +76,9 @@
* @opensearch.internal
*/
public final class IndexSettings {
private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default";
private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";

public static final Setting<List<String>> DEFAULT_FIELD_SETTING = Setting.listSetting(
"index.query.default_field",
Collections.singletonList("*"),
@@ -526,14 +532,21 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
"index.merge_on_flush.max_full_flush_merge_wait_time",
new TimeValue(10, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
new TimeValue(1, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
"index.merge_on_flush.enabled",
false,
true, /* https://issues.apache.org/jira/browse/LUCENE-10078 */
Property.IndexScope,
Property.Dynamic
);

public static final Setting<String> INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString(
"index.merge_on_flush.policy",
MERGE_ON_FLUSH_DEFAULT_POLICY,
Property.IndexScope,
Property.Dynamic
);
@@ -632,6 +645,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
* Is merge on flush enabled or not
*/
private volatile boolean mergeOnFlushEnabled;
/**
* Specialized merge-on-flush policy if provided
*/
private volatile UnaryOperator<MergePolicy> mergeOnFlushPolicy;

/**
* Returns the default search fields for this index.
@@ -750,6 +767,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
@@ -822,6 +840,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -892,7 +911,7 @@ public String getUUID() {
* Returns <code>true</code> if the index has a custom data path
*/
public boolean hasCustomDataPath() {
return Strings.isNotEmpty(customDataPath());
return !Strings.isEmpty(customDataPath());
}

/**
@@ -1426,4 +1445,27 @@ public TimeValue getMaxFullFlushMergeWaitTime() {
public boolean isMergeOnFlushEnabled() {
return mergeOnFlushEnabled;
}

private void setMergeOnFlushPolicy(String policy) {
if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) {
mergeOnFlushPolicy = null;
} else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) {
this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new;
} else {
throw new IllegalArgumentException(
"The "
+ IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey()
+ " has unsupported policy specified: "
+ policy
+ ". Please use one of: "
+ MERGE_ON_FLUSH_DEFAULT_POLICY
+ ", "
+ MERGE_ON_FLUSH_MERGE_POLICY
);
}
}

public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
return Optional.ofNullable(mergeOnFlushPolicy);
}
}
@@ -50,7 +50,6 @@
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
@@ -133,6 +132,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -2295,14 +2295,14 @@ private IndexWriterConfig getIndexWriterConfig() {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = config().getIndexSettings().getMergeOnFlushPolicy();
if (mergeOnFlushPolicy.isPresent()) {
mergePolicy = mergeOnFlushPolicy.get().apply(mergePolicy);
}
}
} else {
// Disable merge on refresh
iwc.setMaxFullFlushMergeWaitMillis(0);
}

iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
@@ -590,10 +590,8 @@ public void testWrapAllDocsLive() throws Exception {
public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
// override 500ms default introduced in
// https://issues.apache.org/jira/browse/LUCENE-10078
config.setMaxFullFlushMergeWaitMillis(0);
.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy()))
.setMaxFullFlushMergeWaitMillis(0);
IndexWriter writer = new IndexWriter(dir, config);
int numDocs = between(1, 10);
List<String> liveDocs = new ArrayList<>();
@@ -503,8 +503,7 @@ public void testMergeSegmentsOnCommitIsDisabled() throws Exception {

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

@@ -576,7 +575,7 @@ public void testMergeSegmentsOnCommit() throws Exception {
final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
Collaborator:

Do we need to switch away from a boolean here at this point? I'm not sure we expect any merge on flush policy variations other than the default?

Collaborator (Author):

@nknize INDEX_MERGE_ON_FLUSH_POLICY is a new setting; the semantics of INDEX_MERGE_ON_FLUSH_ENABLED are kept as a boolean. The reason is that not every merge policy may implement merge-on-flush/commit, but the composite one could be used for this purpose.
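
A minimal sketch (not part of this PR's diff) of how the settings touched here fit together when building index settings. The setting keys and the accepted policy values ("default", "merge-on-flush") come from the diff above; the wrapper class, main method, and the 5s wait time are illustrative assumptions only.

import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

public class MergeOnFlushSettingsSketch {
    public static void main(String[] args) {
        Settings settings = Settings.builder()
            // Merge on refresh/commit is now enabled by default; set explicitly here for clarity.
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
            // Wait up to 5s for merges triggered by a full flush (the allowed minimum is now 1ms).
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), "5s")
            // "default" (or an empty value) keeps the configured merge policy as-is;
            // "merge-on-flush" wraps it in Lucene's MergeOnFlushMergePolicy; any other value
            // is rejected by IndexSettings#setMergeOnFlushPolicy with an IllegalArgumentException.
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
            .build();

        System.out.println(settings);
    }
}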

final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

@@ -638,14 +637,52 @@ public void testMergeSegmentsOnCommit() throws Exception {
}
}

public void testMergeSegmentsOnCommitDefault() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings());
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

final TieredMergePolicy mergePolicy = new TieredMergePolicy();
mergePolicy.setSegmentsPerTier(2);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get)
)
) {
List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));

ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
engine.refresh("test");

segments = engine.segments(true);
assertThat(segments.size(), equalTo(1));

ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null);
engine.index(indexForDoc(doc2));
engine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_3, null);
engine.index(indexForDoc(doc3));
engine.refresh("test");

segments = engine.segments(true);
assertThat(segments.size(), equalTo(2));
}
}

// this test writes documents to the engine while concurrently flushing/commit
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

@@ -56,7 +56,6 @@ public void testGetForUpdate() throws IOException {
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)

.build();
IndexMetadata metadata = IndexMetadata.builder("test")
.putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")