From 9260ca66e427179e408d46f6015ac8349e52f822 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Thu, 3 Apr 2025 04:57:24 +0000 Subject: [PATCH 1/4] Increase default maxMergeAtOnce from 10 to 30 Signed-off-by: Varun Bansal --- .../index/ClusterMaxMergesAtOnceIT.java | 134 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 1 + .../org/opensearch/index/IndexModule.java | 12 +- .../org/opensearch/index/IndexService.java | 17 ++- .../org/opensearch/index/IndexSettings.java | 26 +++- .../index/TieredMergePolicyProvider.java | 43 +++--- .../opensearch/indices/IndicesService.java | 45 +++++- .../opensearch/index/IndexModuleTests.java | 3 +- 8 files changed, 249 insertions(+), 32 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java b/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java new file mode 100644 index 0000000000000..789df49335d72 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.index; + +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.indices.IndicesService; +import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2) +public class ClusterMaxMergesAtOnceIT extends AbstractSnapshotIntegTestCase { + + @Override + public Settings indexSettings() { + return Settings.builder().put(super.indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + internalCluster().startClusterManagerOnlyNode(); + } + + public void testClusterLevelDefaultUpdatesMergePolicy() throws ExecutionException, InterruptedException { + String clusterManagerName = internalCluster().getClusterManagerName(); 
+ List dataNodes = new ArrayList<>(internalCluster().getDataNodeNames()); + + String indexName = "log-myindex-1"; + createIndex(indexName); + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes)); + String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); + assertEquals(30, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.getKey(), 20)) + .get(); + + indexName = "log-myindex-2"; + createIndex(indexName); + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes)); + uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService secondIndexService = indicesService.indexService(new Index(indexName, uuid)); + assertEquals(20, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(20, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + + // Create index with index level override in settings + indexName = "log-myindex-3"; + createIndex( + indexName, + Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 15).build() + ); + 
ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + indicesService = internalCluster().getInstance(IndicesService.class, randomFrom(dataNodes)); + uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService thirdIndexService = indicesService.indexService(new Index(indexName, uuid)); + assertEquals(15, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + + // changing cluster level default should only affect indices without index level override + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.getKey(), 35)) + .get(); + assertEquals(35, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(35, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(15, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + + // removing index level override should pick up the cluster level default + final UpdateSettingsRequestBuilder builder = client().admin().indices().prepareUpdateSettings(indexName); + builder.setSettings( + Settings.builder().putNull(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey()).build() + ); + builder.execute().actionGet(); + + assertEquals(35, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(35, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(35, ((OpenSearchTieredMergePolicy) 
thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 9d8b458d70966..bcd779334b244 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -297,6 +297,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING, IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING, IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING, + IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING, IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 3dfcad48fb9d5..c764a172f26e4 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -632,7 +632,8 @@ public IndexService newIndexService( Supplier clusterDefaultRefreshIntervalSupplier, Supplier fixedRefreshIntervalSchedulingEnabled, RecoverySettings recoverySettings, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + Supplier clusterDefaultMaxMergeAtOnceSupplier ) throws IOException { return newIndexService( indexCreationContext, @@ -658,7 +659,8 @@ public IndexService newIndexService( recoverySettings, remoteStoreSettings, (s) -> {}, - shardId -> ReplicationStats.empty() + shardId -> ReplicationStats.empty(), + clusterDefaultMaxMergeAtOnceSupplier ); } @@ -686,7 +688,8 @@ public IndexService newIndexService( RecoverySettings recoverySettings, RemoteStoreSettings 
remoteStoreSettings, Consumer replicator, - Function segmentReplicationStatsProvider + Function segmentReplicationStatsProvider, + Supplier clusterDefaultMaxMergeAtOnceSupplier ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -750,7 +753,8 @@ public IndexService newIndexService( fileCache, compositeIndexSettings, replicator, - segmentReplicationStatsProvider + segmentReplicationStatsProvider, + clusterDefaultMaxMergeAtOnceSupplier ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index de269f1676f1c..210729234a9e4 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -239,7 +239,8 @@ public IndexService( FileCache fileCache, CompositeIndexSettings compositeIndexSettings, Consumer replicator, - Function segmentReplicationStatsProvider + Function segmentReplicationStatsProvider, + Supplier clusterDefaultMaxMergeAtOnceSupplier ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -328,6 +329,7 @@ public IndexService( this.fileCache = fileCache; this.replicator = replicator; this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; + indexSettings.setDefaultMaxMergesAtOnce(clusterDefaultMaxMergeAtOnceSupplier.get()); updateFsyncTaskIfNecessary(); } @@ -366,7 +368,8 @@ public IndexService( Supplier clusterDefaultRefreshIntervalSupplier, Supplier fixedRefreshIntervalSchedulingEnabled, RecoverySettings recoverySettings, - RemoteStoreSettings remoteStoreSettings + RemoteStoreSettings remoteStoreSettings, + Supplier clusterDefaultMaxMergeAtOnce ) { this( indexSettings, @@ -407,7 +410,8 @@ public IndexService( null, null, s -> {}, - (shardId) -> ReplicationStats.empty() + (shardId) -> ReplicationStats.empty(), + clusterDefaultMaxMergeAtOnce ); } 
@@ -1126,6 +1130,13 @@ private void updateReplicationTask() { } } + /** + * Called whenever the cluster level {@code cluster.default.index.max_merge_at_once} changes. + */ + public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) { + indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce); + } + /** * Called whenever the refresh interval changes. This can happen in 2 cases - * 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 38604ffd8bf8f..019390c9ff638 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1096,7 +1096,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti ); scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, - tieredMergePolicyProvider::setMaxMergesAtOnce + this::updateMaxMergesAtOnce ); scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, @@ -1250,6 +1250,30 @@ private void setRefreshInterval(TimeValue timeValue) { this.refreshInterval = timeValue; } + /** + * Update the default maxMergesAtOnce + * 1. sets the new default in {@code TieredMergePolicyProvide} + * 2. 
sets the maxMergesAtOnce on the actual TieredMergePolicy used by the engine if no index level override exists + */ + void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) { + tieredMergePolicyProvider.setDefaultMaxMergesAtOnce(newDefaultMaxMergesAtOnce); + if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(getSettings()) == false) { + tieredMergePolicyProvider.setMaxMergesAtOnceToDefault(); + } + } + + /** + * Updates the maxMergesAtOnce for actual TieredMergePolicy used by the engine. + * Sets it to default maxMergesAtOnce if index level settings is being removed + */ + void updateMaxMergesAtOnce(int newMaxMergesAtOnce) { + if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(getSettings()) == false) { + tieredMergePolicyProvider.setMaxMergesAtOnce(newMaxMergesAtOnce); + } else { + tieredMergePolicyProvider.setMaxMergesAtOnceToDefault(); + } + } + /** * Returns the settings for this index. These settings contain the node and index level settings where * settings that are specified on both index and node level are overwritten by the index settings. 
diff --git a/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java index 2eeb25dee88c3..bd23acfc49d57 100644 --- a/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java +++ b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java @@ -134,6 +134,7 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider { private final Logger logger; private final boolean mergesEnabled; + private int defaultMaxMergeAtOnce = 30; public static final double DEFAULT_EXPUNGE_DELETES_ALLOWED = 10d; @@ -143,7 +144,9 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider { */ public static final ByteSizeValue DEFAULT_FLOOR_SEGMENT = new ByteSizeValue(16, ByteSizeUnit.MB); - public static final int DEFAULT_MAX_MERGE_AT_ONCE = 10; + public static final int MIN_DEFAULT_MAX_MERGE_AT_ONCE = 2; + public static final int DEFAULT_MAX_MERGE_AT_ONCE = 30; + public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB); public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d; public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d; @@ -173,7 +176,7 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider { public static final Setting INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING = Setting.intSetting( "index.merge.policy.max_merge_at_once", DEFAULT_MAX_MERGE_AT_ONCE, - 2, + MIN_DEFAULT_MAX_MERGE_AT_ONCE, Property.Dynamic, Property.IndexScope ); @@ -225,7 +228,7 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider { INDEX_MERGE_ENABLED ); } - maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier); + tieredMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); tieredMergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); 
tieredMergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); @@ -247,6 +250,21 @@ void setMaxMergesAtOnce(Integer maxMergeAtOnce) { tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); } + /** + * Update the value for maxMergesAtOnce in TieredMergePolicy used by engine to default value. + * This would happen if index level override is being removed and we need to fallback to cluster level default + */ + void setMaxMergesAtOnceToDefault() { + tieredMergePolicy.setMaxMergeAtOnce(defaultMaxMergeAtOnce); + } + + /** + * Update the default value for maxMergesAtOnce. It is used when index level override is not present + */ + void setDefaultMaxMergesAtOnce(Integer defaultMaxMergesAtOnce) { + this.defaultMaxMergeAtOnce = defaultMaxMergesAtOnce; + } + void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) { tieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); } @@ -263,25 +281,6 @@ void setDeletesPctAllowed(Double deletesPctAllowed) { tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); } - private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) { - // fixing maxMergeAtOnce, see TieredMergePolicy#setMaxMergeAtOnce - if (!(segmentsPerTier >= maxMergeAtOnce)) { - int newMaxMergeAtOnce = (int) segmentsPerTier; - // max merge at once should be at least 2 - if (newMaxMergeAtOnce <= 1) { - newMaxMergeAtOnce = 2; - } - logger.debug( - "changing max_merge_at_once from [{}] to [{}] because segments_per_tier [{}] has to be higher or " + "equal to it", - maxMergeAtOnce, - newMaxMergeAtOnce, - segmentsPerTier - ); - maxMergeAtOnce = newMaxMergeAtOnce; - } - return maxMergeAtOnce; - } - public MergePolicy getMergePolicy() { return mergesEnabled ? 
tieredMergePolicy : NoMergePolicy.INSTANCE; } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index ec60fa61d0c99..cac74e328d90a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -209,6 +209,8 @@ import static org.opensearch.core.common.util.CollectionUtils.arrayAsArrayList; import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX; import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION; +import static org.opensearch.index.TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE; +import static org.opensearch.index.TieredMergePolicyProvider.MIN_DEFAULT_MAX_MERGE_AT_ONCE; import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; @@ -275,6 +277,21 @@ public class IndicesService extends AbstractLifecycleComponent Property.Dynamic ); + /** + * This setting is used to set the maxMergeAtOnce parameter for {@code TieredMergePolicy} + * when the {@code index.merge.policy.max_merge_at_once} index setting is not provided during index creation + * or when the existing {@code index.merge.policy.max_merge_at_once} index setting is set to null. + * This comes in handy when the user wants to change the maxMergeAtOnce across all indexes created in a cluster + * to a value which is different from the default. 
+ */ + public static final Setting CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING = Setting.intSetting( + "cluster.default.index.max_merge_at_once", + DEFAULT_MAX_MERGE_AT_ONCE, + MIN_DEFAULT_MAX_MERGE_AT_ONCE, + Property.NodeScope, + Property.Dynamic + ); + /** * This setting is used to set the minimum refresh interval applicable for all indexes in a cluster. The * {@code cluster.default.index.refresh_interval} setting value needs to be higher than this setting's value. Index @@ -381,6 +398,7 @@ public class IndicesService extends AbstractLifecycleComponent private final Consumer replicator; private final Function segmentReplicationStatsProvider; private volatile int maxSizeInRequestCache; + private volatile int defaultMaxMergeAtOnce; @Override protected void doStart() { @@ -544,6 +562,10 @@ protected void closeInternal() { this.maxSizeInRequestCache = INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING, this::setMaxSizeInRequestCache); + + this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate); } public IndicesService( @@ -628,6 +650,22 @@ private void onRefreshIntervalUpdate(TimeValue clusterDefaultRefreshInterval) { } } + /** + * The changes to dynamic cluster setting {@code cluster.default.index.max_merge_at_once} needs to be updated. This + * method gets called whenever the setting changes. We set the instance variable with the updated value as this is + * also a supplier to all IndexService that have been created on the node. We also notify the change to all + * IndexService instances that are created on this node. 
+ * + * @param newDefaultMaxMergeAtOnce the updated cluster default maxMergeAtOnce. + */ + private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) { + this.defaultMaxMergeAtOnce = newDefaultMaxMergeAtOnce; // read by getClusterDefaultMaxMergeAtOnce(), the supplier passed to each newly created IndexService + for (Map.Entry entry : indices.entrySet()) { + IndexService indexService = entry.getValue(); + indexService.onDefaultMaxMergeAtOnceChanged(newDefaultMaxMergeAtOnce); + } + } + private static BiFunction getTranslogFactorySupplier( Supplier repositoriesServiceSupplier, ThreadPool threadPool, @@ -1031,7 +1069,8 @@ private synchronized IndexService createIndexService( this.recoverySettings, this.remoteStoreSettings, replicator, - segmentReplicationStatsProvider + segmentReplicationStatsProvider, + this::getClusterDefaultMaxMergeAtOnce ); } @@ -2177,6 +2216,10 @@ private TimeValue getClusterDefaultRefreshInterval() { return this.clusterDefaultRefreshInterval; } + private Integer getClusterDefaultMaxMergeAtOnce() { + return this.defaultMaxMergeAtOnce; + } + public RemoteStoreSettings getRemoteStoreSettings() { return this.remoteStoreSettings; } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index b12c1c0eca628..023964c954057 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -267,7 +267,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, s -> {}, - null + null, + () -> TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE ); } From db4b0c6540cc95d7a300da8e03606729e2c6a12b Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Fri, 4 Apr 2025 06:17:38 +0000 Subject: [PATCH 2/4] add changelog entry Signed-off-by: Varun Bansal --- CHANGELOG.md | 2 ++ 1 file changed, 2 
insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 
455184632ea35..5fd2616ce6d3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803)) - Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768)) - Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822)) +- Introduce a new dynamic cluster setting to tune maxMergeAtOnce at cluster level ([#17774](https://github.com/opensearch-project/OpenSearch/pull/17774)) ### Changed - Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912)) @@ -39,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Unwrap singleton DocValues in date histogram aggregation. ([#17643](https://github.com/opensearch-project/OpenSearch/pull/17643)) - Introduce 512 byte limit to search and ingest pipeline IDs ([#17786](https://github.com/opensearch-project/OpenSearch/pull/17786)) - Avoid skewed segment replication lag metric ([#17831](https://github.com/opensearch-project/OpenSearch/pull/17831)) +- Increase the default maxMergeAtOnce to 30 ([#17774](https://github.com/opensearch-project/OpenSearch/pull/17774)) ### Dependencies - Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669)) From 1b75c1f5aa66897665c88596977fb6dfcebb868f Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Wed, 9 Apr 2025 19:36:40 +0000 Subject: [PATCH 3/4] Handle the cases when the index setting is removed and added Signed-off-by: Varun Bansal --- .../opensearch/index/ClusterMaxMergesAtOnceIT.java | 12 +++++++++++- .../java/org/opensearch/index/IndexSettings.java | 14 
++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java b/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java index 789df49335d72..7e53595715770 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/ClusterMaxMergesAtOnceIT.java @@ -120,7 +120,7 @@ public void testClusterLevelDefaultUpdatesMergePolicy() throws ExecutionExceptio assertEquals(15, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); // removing index level override should pick up the cluster level default - final UpdateSettingsRequestBuilder builder = client().admin().indices().prepareUpdateSettings(indexName); + UpdateSettingsRequestBuilder builder = client().admin().indices().prepareUpdateSettings(indexName); builder.setSettings( Settings.builder().putNull(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey()).build() ); @@ -130,5 +130,15 @@ public void testClusterLevelDefaultUpdatesMergePolicy() throws ExecutionExceptio assertEquals(35, ((OpenSearchTieredMergePolicy) secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); assertEquals(35, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + // update index level setting to override cluster level default + builder = client().admin().indices().prepareUpdateSettings(indexName); + builder.setSettings( + Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 17).build() + ); + builder.execute().actionGet(); + + assertEquals(35, ((OpenSearchTieredMergePolicy) indexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(35, ((OpenSearchTieredMergePolicy) 
secondIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); + assertEquals(17, ((OpenSearchTieredMergePolicy) thirdIndexService.getIndexSettings().getMergePolicy(true)).getMaxMergeAtOnce()); } } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 019390c9ff638..ea8f362550a60 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1095,8 +1095,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti tieredMergePolicyProvider::setFloorSegmentSetting ); scopedSettings.addSettingsUpdateConsumer( - TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, - this::updateMaxMergesAtOnce + this::updateMaxMergesAtOnce, + List.of(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING) ); scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, @@ -1266,11 +1266,13 @@ void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) { * Updates the maxMergesAtOnce for actual TieredMergePolicy used by the engine. 
* Sets it to default maxMergesAtOnce if index level settings is being removed */ - void updateMaxMergesAtOnce(int newMaxMergesAtOnce) { - if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(getSettings()) == false) { - tieredMergePolicyProvider.setMaxMergesAtOnce(newMaxMergesAtOnce); - } else { + void updateMaxMergesAtOnce(Settings updatedSettings) { + if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(updatedSettings) == false) { tieredMergePolicyProvider.setMaxMergesAtOnceToDefault(); + } else { + tieredMergePolicyProvider.setMaxMergesAtOnce( + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.get(updatedSettings) + ); } } From 5eb2146431c937d77301e8d5b12e15e8337ad236 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Thu, 10 Apr 2025 03:35:58 +0000 Subject: [PATCH 4/4] Improve logging Signed-off-by: Varun Bansal --- server/src/main/java/org/opensearch/index/IndexSettings.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index ea8f362550a60..a869a1b7dc231 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1252,7 +1252,7 @@ private void setRefreshInterval(TimeValue timeValue) { /** * Update the default maxMergesAtOnce - * 1. sets the new default in {@code TieredMergePolicyProvide} + * 1. sets the new default in {@code TieredMergePolicyProvider} * 2. 
sets the maxMergesAtOnce on the actual TieredMergePolicy used by the engine if no index level override exists */ void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) { @@ -1268,6 +1268,7 @@ void setDefaultMaxMergesAtOnce(int newDefaultMaxMergesAtOnce) { */ void updateMaxMergesAtOnce(Settings updatedSettings) { if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(updatedSettings) == false) { + logger.debug("Resetting maxMergesAtOnce to cluster default"); tieredMergePolicyProvider.setMaxMergesAtOnceToDefault(); } else { tieredMergePolicyProvider.setMaxMergesAtOnce(