Skip to content

Commit 843a83b

Browse files
harshitakaushik-dev authored and Peter Alfonsi committed
Add support for merge rate limiting (opensearch-project#19309)
Signed-off-by: Harshita Kaushik <harycash@amazon.com>
1 parent 4232a13 commit 843a83b

File tree

9 files changed

+359
-4
lines changed

9 files changed

+359
-4
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3434
- [Rule based auto-tagging] Add Rule based auto-tagging IT ([#18550](https://github.com/opensearch-project/OpenSearch/pull/18550))
3535
- Add all-active ingestion as docrep equivalent in pull-based ingestion ([#19316](https://github.com/opensearch-project/OpenSearch/pull/19316))
3636
- Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
37+
- Add rate limiting for merges with cluster and index settings ([#19309](https://github.com/opensearch-project/OpenSearch/pull/19309))
3738
- Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
39+
- Add rate limiting for merges with cluster and index settings ([#19309](https://github.com/opensearch-project/OpenSearch/pull/19309))
3840
- Enable skip_list for @timestamp field or index sort field by default([#19480](https://github.com/opensearch-project/OpenSearch/pull/19480))
3941
- Implement GRPC MatchPhrase, MultiMatch queries ([#19449](https://github.com/opensearch-project/OpenSearch/pull/19449))
4042
- Optimize gRPC transport thread management for improved throughput ([#19278](https://github.com/opensearch-project/OpenSearch/pull/19278))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.opensearch.index.IndexModule;
115115
import org.opensearch.index.IndexSettings;
116116
import org.opensearch.index.IndexingPressure;
117+
import org.opensearch.index.MergeSchedulerConfig;
117118
import org.opensearch.index.SegmentReplicationPressureService;
118119
import org.opensearch.index.ShardIndexingPressureMemoryManager;
119120
import org.opensearch.index.ShardIndexingPressureSettings;
@@ -523,6 +524,7 @@ public void apply(Settings value, Settings current, Settings previous) {
523524
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
524525
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
525526
IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY,
527+
MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
526528
ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING,
527529
ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING,
528530
ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING,

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
9090
MergeSchedulerConfig.AUTO_THROTTLE_SETTING,
9191
MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING,
9292
MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING,
93+
MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
9394
IndexMetadata.SETTING_INDEX_VERSION_CREATED,
9495
IndexMetadata.SETTING_INDEX_CREATION_DATE,
9596
IndexMetadata.INDEX_UUID_SETTING,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1235,6 +1235,18 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
12351235
indexSettings.setDefaultMaxMergesAtOnce(newDefaultMaxMergeAtOnce);
12361236
}
12371237

1238+
/**
1239+
* Called whenever the cluster level {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} changes.
1240+
* The change is only applied if the index doesn't have its own explicit force merge MB per sec setting.
1241+
*
1242+
* @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
1243+
*/
1244+
public void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
1245+
if (!MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings())) {
1246+
indexSettings.getMergeSchedulerConfig().setMaxForceMergeMBPerSec(maxForceMergeMBPerSec);
1247+
}
1248+
}
1249+
12381250
/**
12391251
* Called whenever the refresh interval changes. This can happen in 2 cases -
12401252
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11781178
mergeSchedulerConfig::setMaxThreadAndMergeCount
11791179
);
11801180
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
1181+
scopedSettings.addSettingsUpdateConsumer(
1182+
MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
1183+
value -> mergeSchedulerConfig.updateMaxForceMergeMBPerSec(this)
1184+
);
11811185
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
11821186
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
11831187
scopedSettings.addSettingsUpdateConsumer(

server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,28 @@
6363
* unluckily suddenly requires a large merge will see that merge aggressively
6464
* throttled, while an application doing heavy indexing will see the throttle
6565
* move higher to allow merges to keep up with ongoing indexing.
66+
*
67+
* <li><code>index.merge.scheduler.max_force_merge_mb_per_sec</code>:
68+
* <p>
69+
* Controls the rate limiting for forced merges in MB per second at the index level.
70+
* The default value of Double.POSITIVE_INFINITY means no rate limiting is applied.
71+
* Setting a finite positive value will limit the throughput of forced merge operations
72+
* to the specified rate. This setting takes precedence over the cluster-level setting.
73+
*
74+
* <li><code>cluster.merge.scheduler.max_force_merge_mb_per_sec</code>:
75+
* <p>
76+
* Controls the rate limiting for forced merges in MB per second at the cluster level.
77+
* The default value of Double.POSITIVE_INFINITY means no rate limiting is applied.
78+
* This setting is used as a fallback when the index-level setting is not specified.
79+
* Index-level settings take precedence over this cluster-level setting.
80+
* </ul>
81+
*
82+
* <p><b>Setting Precedence:</b>
83+
* <ul>
84+
* <li>If only index-level setting is specified: uses index value
85+
* <li>If only cluster-level setting is specified: uses cluster value
86+
* <li>If both settings are specified: uses index value (index takes precedence)
87+
* <li>If neither setting is specified: uses default value (Double.POSITIVE_INFINITY)
6688
* </ul>
6789
*
6890
* @opensearch.api
@@ -90,16 +112,35 @@ public final class MergeSchedulerConfig {
90112
Property.Dynamic,
91113
Property.IndexScope
92114
);
115+
public static final Setting<Double> MAX_FORCE_MERGE_MB_PER_SEC_SETTING = Setting.doubleSetting(
116+
"index.merge.scheduler.max_force_merge_mb_per_sec",
117+
Double.POSITIVE_INFINITY,
118+
0.0d,
119+
Double.POSITIVE_INFINITY,
120+
Property.Dynamic,
121+
Property.IndexScope
122+
);
123+
124+
public static final Setting<Double> CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING = Setting.doubleSetting(
125+
"cluster.merge.scheduler.max_force_merge_mb_per_sec",
126+
Double.POSITIVE_INFINITY,
127+
0.0d,
128+
Double.POSITIVE_INFINITY,
129+
Property.Dynamic,
130+
Property.NodeScope
131+
);
93132

94133
private volatile boolean autoThrottle;
95134
private volatile int maxThreadCount;
96135
private volatile int maxMergeCount;
136+
private volatile double maxForceMergeMBPerSec;
97137

98138
MergeSchedulerConfig(IndexSettings indexSettings) {
99139
int maxThread = indexSettings.getValue(MAX_THREAD_COUNT_SETTING);
100140
int maxMerge = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
101141
setMaxThreadAndMergeCount(maxThread, maxMerge);
102142
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
143+
updateMaxForceMergeMBPerSec(indexSettings);
103144
}
104145

105146
/**
@@ -151,4 +192,34 @@ void setMaxThreadAndMergeCount(int maxThreadCount, int maxMergeCount) {
151192
public int getMaxMergeCount() {
152193
return maxMergeCount;
153194
}
195+
196+
/**
197+
* Returns the maximum force merge rate in MB per second.
198+
* A value of Double.POSITIVE_INFINITY indicates no rate limiting.
199+
*/
200+
public double getMaxForceMergeMBPerSec() {
201+
return maxForceMergeMBPerSec;
202+
}
203+
204+
/**
205+
* Sets the maximum force merge rate in MB per second.
206+
* A value of Double.POSITIVE_INFINITY disables rate limiting.
207+
*/
208+
void setMaxForceMergeMBPerSec(double maxForceMergeMBPerSec) {
209+
this.maxForceMergeMBPerSec = maxForceMergeMBPerSec;
210+
}
211+
212+
/**
213+
* Updates the maximum force merge rate based on index settings, with fallback to cluster settings.
214+
* This method handles the case where an index-level setting is removed and should
215+
* fall back to the cluster-level setting.
216+
*/
217+
public void updateMaxForceMergeMBPerSec(IndexSettings indexSettings) {
218+
boolean hasIndexSetting = MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings());
219+
if (hasIndexSetting) {
220+
this.maxForceMergeMBPerSec = indexSettings.getValue(MAX_FORCE_MERGE_MB_PER_SEC_SETTING);
221+
} else {
222+
this.maxForceMergeMBPerSec = CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.get(indexSettings.getNodeSettings());
223+
}
224+
}
154225
}

server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.opensearch.common.logging.Loggers;
4040
import org.opensearch.common.metrics.CounterMetric;
4141
import org.opensearch.common.metrics.MeanMetric;
42-
import org.opensearch.common.settings.Settings;
4342
import org.opensearch.common.unit.TimeValue;
4443
import org.opensearch.common.util.concurrent.ConcurrentCollections;
4544
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
@@ -64,7 +63,7 @@
6463
class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
6564

6665
protected final Logger logger;
67-
private final Settings indexSettings;
66+
private final IndexSettings indexSettings;
6867
private final ShardId shardId;
6968

7069
private final MeanMetric totalMerges = new MeanMetric();
@@ -83,7 +82,7 @@ class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
8382
OpenSearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
8483
this.config = indexSettings.getMergeSchedulerConfig();
8584
this.shardId = shardId;
86-
this.indexSettings = indexSettings.getSettings();
85+
this.indexSettings = indexSettings;
8786
this.logger = Loggers.getLogger(getClass(), shardId);
8887
refreshConfig();
8988
}
@@ -192,7 +191,10 @@ protected boolean maybeStall(MergeSource mergeSource) {
192191
protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
193192
MergeThread thread = super.getMergeThread(mergeSource, merge);
194193
thread.setName(
195-
OpenSearchExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName())
194+
OpenSearchExecutors.threadName(
195+
indexSettings.getSettings(),
196+
"[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName()
197+
)
196198
);
197199
return thread;
198200
}
@@ -215,6 +217,9 @@ MergeStats stats() {
215217
}
216218

217219
void refreshConfig() {
220+
// Update the config with current index settings before using it
221+
config.updateMaxForceMergeMBPerSec(indexSettings);
222+
218223
if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
219224
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
220225
}
@@ -224,6 +229,26 @@ void refreshConfig() {
224229
} else if (config.isAutoThrottle() == false && isEnabled) {
225230
disableAutoIOThrottle();
226231
}
232+
applyMergeRateLimit();
233+
}
234+
235+
/**
236+
* Applies the merge rate limit based on the current configuration.
237+
* If the setting value is Double.POSITIVE_INFINITY, rate limiting is disabled.
238+
* Otherwise, auto-throttling is enabled as the available rate limiting mechanism.
239+
*/
240+
private void applyMergeRateLimit() {
241+
double maxForceMergeMBPerSec = config.getMaxForceMergeMBPerSec();
242+
if (maxForceMergeMBPerSec != getForceMergeMBPerSec()) {
243+
logger.info(
244+
"[{}][{}] updating force merge rate limit from [{}] to [{}] MB/sec",
245+
shardId.getIndexName(),
246+
shardId.id(),
247+
super.getForceMergeMBPerSec(),
248+
maxForceMergeMBPerSec
249+
);
250+
setForceMergeMBPerSec(maxForceMergeMBPerSec);
251+
}
227252
}
228253

229254
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
import org.opensearch.index.IndexService;
107107
import org.opensearch.index.IndexSettings;
108108
import org.opensearch.index.IngestionConsumerFactory;
109+
import org.opensearch.index.MergeSchedulerConfig;
109110
import org.opensearch.index.ReplicationStats;
110111
import org.opensearch.index.analysis.AnalysisRegistry;
111112
import org.opensearch.index.cache.request.ShardRequestCache;
@@ -591,6 +592,11 @@ protected void closeInternal() {
591592
this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings());
592593
clusterService.getClusterSettings()
593594
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate);
595+
clusterService.getClusterSettings()
596+
.addSettingsUpdateConsumer(
597+
MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
598+
this::onClusterLevelForceMergeMBPerSecUpdate
599+
);
594600
}
595601

596602
@InternalApi
@@ -694,6 +700,20 @@ private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) {
694700
}
695701
}
696702

703+
/**
704+
* Changes to the dynamic cluster setting {@code cluster.merge.scheduler.max_force_merge_mb_per_sec} need to be propagated. This
705+
* method gets called whenever the setting changes. We notify the change to all IndexService instances that are created on this node
706+
* so they can update their merge schedulers accordingly.
707+
*
708+
* @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
709+
*/
710+
private void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
711+
for (Map.Entry<String, IndexService> entry : indices.entrySet()) {
712+
IndexService indexService = entry.getValue();
713+
indexService.onClusterLevelForceMergeMBPerSecUpdate(maxForceMergeMBPerSec);
714+
}
715+
}
716+
697717
private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTranslogFactorySupplier(
698718
Supplier<RepositoriesService> repositoriesServiceSupplier,
699719
ThreadPool threadPool,

0 commit comments

Comments
 (0)