Skip to content

Commit 9f3babb

Browse files
committed
Added test and changed nit naming
Signed-off-by: Harshita Kaushik <harycash@amazon.com>
1 parent b3f5de3 commit 9f3babb

File tree

6 files changed

+78
-16
lines changed

6 files changed

+78
-16
lines changed

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,7 @@ public void onDefaultMaxMergeAtOnceChanged(int newDefaultMaxMergeAtOnce) {
12411241
*
12421242
* @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
12431243
*/
1244-
public void onMaxForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
1244+
public void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
12451245
if (!MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings())) {
12461246
indexSettings.getMergeSchedulerConfig().setMaxForceMergeMBPerSec(maxForceMergeMBPerSec);
12471247
}

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1180,7 +1180,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11801180
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
11811181
scopedSettings.addSettingsUpdateConsumer(
11821182
MergeSchedulerConfig.MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
1183-
mergeSchedulerConfig::setMaxForceMergeMBPerSec
1183+
value -> mergeSchedulerConfig.updateMaxForceMergeMBPerSec(this)
11841184
);
11851185
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
11861186
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);

server/src/main/java/org/opensearch/index/MergeSchedulerConfig.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,7 @@ public final class MergeSchedulerConfig {
140140
int maxMerge = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
141141
setMaxThreadAndMergeCount(maxThread, maxMerge);
142142
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
143-
144-
// Index setting takes precedence over cluster setting
145-
if (MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings())) {
146-
this.maxForceMergeMBPerSec = indexSettings.getValue(MAX_FORCE_MERGE_MB_PER_SEC_SETTING);
147-
} else {
148-
this.maxForceMergeMBPerSec = CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.get(indexSettings.getNodeSettings());
149-
}
143+
updateMaxForceMergeMBPerSec(indexSettings);
150144
}
151145

152146
/**
@@ -214,4 +208,18 @@ public double getMaxForceMergeMBPerSec() {
214208
void setMaxForceMergeMBPerSec(double maxForceMergeMBPerSec) {
215209
this.maxForceMergeMBPerSec = maxForceMergeMBPerSec;
216210
}
211+
212+
/**
213+
* Updates the maximum force merge rate based on index settings, with fallback to cluster settings.
214+
* This method handles the case where an index-level setting is removed and should
215+
* fall back to the cluster-level setting.
216+
*/
217+
public void updateMaxForceMergeMBPerSec(IndexSettings indexSettings) {
218+
boolean hasIndexSetting = MAX_FORCE_MERGE_MB_PER_SEC_SETTING.exists(indexSettings.getSettings());
219+
if (hasIndexSetting) {
220+
this.maxForceMergeMBPerSec = indexSettings.getValue(MAX_FORCE_MERGE_MB_PER_SEC_SETTING);
221+
} else {
222+
this.maxForceMergeMBPerSec = CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.get(indexSettings.getNodeSettings());
223+
}
224+
}
217225
}

server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.opensearch.common.logging.Loggers;
4040
import org.opensearch.common.metrics.CounterMetric;
4141
import org.opensearch.common.metrics.MeanMetric;
42-
import org.opensearch.common.settings.Settings;
4342
import org.opensearch.common.unit.TimeValue;
4443
import org.opensearch.common.util.concurrent.ConcurrentCollections;
4544
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
@@ -64,7 +63,7 @@
6463
class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
6564

6665
protected final Logger logger;
67-
private final Settings indexSettings;
66+
private final IndexSettings indexSettings;
6867
private final ShardId shardId;
6968

7069
private final MeanMetric totalMerges = new MeanMetric();
@@ -83,7 +82,7 @@ class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
8382
OpenSearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
8483
this.config = indexSettings.getMergeSchedulerConfig();
8584
this.shardId = shardId;
86-
this.indexSettings = indexSettings.getSettings();
85+
this.indexSettings = indexSettings;
8786
this.logger = Loggers.getLogger(getClass(), shardId);
8887
refreshConfig();
8988
}
@@ -192,7 +191,10 @@ protected boolean maybeStall(MergeSource mergeSource) {
192191
protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMerge merge) throws IOException {
193192
MergeThread thread = super.getMergeThread(mergeSource, merge);
194193
thread.setName(
195-
OpenSearchExecutors.threadName(indexSettings, "[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName())
194+
OpenSearchExecutors.threadName(
195+
indexSettings.getSettings(),
196+
"[" + shardId.getIndexName() + "][" + shardId.id() + "]: " + thread.getName()
197+
)
196198
);
197199
return thread;
198200
}
@@ -215,6 +217,9 @@ MergeStats stats() {
215217
}
216218

217219
void refreshConfig() {
220+
// Update the config with current index settings before using it
221+
config.updateMaxForceMergeMBPerSec(indexSettings);
222+
218223
if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
219224
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
220225
}

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ protected void closeInternal() {
595595
clusterService.getClusterSettings()
596596
.addSettingsUpdateConsumer(
597597
MergeSchedulerConfig.CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING,
598-
this::onMaxForceMergeMBPerSecUpdate
598+
this::onClusterLevelForceMergeMBPerSecUpdate
599599
);
600600
}
601601

@@ -707,10 +707,10 @@ private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) {
707707
*
708708
* @param maxForceMergeMBPerSec the updated cluster max force merge MB per second rate.
709709
*/
710-
private void onMaxForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
710+
private void onClusterLevelForceMergeMBPerSecUpdate(double maxForceMergeMBPerSec) {
711711
for (Map.Entry<String, IndexService> entry : indices.entrySet()) {
712712
IndexService indexService = entry.getValue();
713-
indexService.onMaxForceMergeMBPerSecUpdate(maxForceMergeMBPerSec);
713+
indexService.onClusterLevelForceMergeMBPerSecUpdate(maxForceMergeMBPerSec);
714714
}
715715
}
716716

server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,4 +166,53 @@ public void testDynamicRateLimitUpdates() throws Exception {
166166
Loggers.setLevel(logger, (Level) null);
167167
}
168168
}
169+
170+
/**
171+
* Test that when index-level setting is removed, scheduler falls back to cluster-level setting
172+
*/
173+
public void testFallbackToClusterSettingWhenIndexSettingRemoved() throws Exception {
174+
MockAppender mockAppender = new MockAppender("testFallbackToClusterSettingWhenIndexSettingRemoved");
175+
mockAppender.start();
176+
final Logger logger = LogManager.getLogger(OpenSearchConcurrentMergeScheduler.class);
177+
Loggers.addAppender(logger, mockAppender);
178+
Loggers.setLevel(logger, Level.INFO);
179+
180+
try {
181+
Settings nodeSettings = Settings.builder().put(CLUSTER_MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "50.0").build();
182+
183+
// Start with index-level setting that overrides cluster setting
184+
Settings.Builder builder = Settings.builder()
185+
.put(SETTING_VERSION_CREATED, Version.CURRENT)
186+
.put(SETTING_NUMBER_OF_SHARDS, "1")
187+
.put(SETTING_NUMBER_OF_REPLICAS, "0")
188+
.put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "25.0");
189+
190+
IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), nodeSettings);
191+
ShardId shardId = new ShardId("test_index", "test_uuid", 0);
192+
193+
OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings);
194+
195+
// Should initially use index-level setting
196+
assertThat(scheduler.getForceMergeMBPerSec(), equalTo(25.0));
197+
198+
// Remove the index-level setting by NOT setting MAX_FORCE_MERGE_MB_PER_SEC_SETTING - should trigger fallback to cluster setting
199+
mockAppender.reset();
200+
Settings.Builder newBuilder = Settings.builder()
201+
.put(SETTING_VERSION_CREATED, Version.CURRENT)
202+
.put(SETTING_NUMBER_OF_SHARDS, "1")
203+
.put(SETTING_NUMBER_OF_REPLICAS, "0");
204+
205+
indexSettings.updateIndexMetadata(newIndexMeta("test_index", newBuilder.build()));
206+
scheduler.refreshConfig();
207+
208+
// Should now use cluster-level setting
209+
assertThat(scheduler.getForceMergeMBPerSec(), equalTo(50.0));
210+
assertTrue("Should log rate limit update when falling back to cluster setting", mockAppender.sawRateLimitUpdate);
211+
212+
} finally {
213+
Loggers.removeAppender(logger, mockAppender);
214+
mockAppender.stop();
215+
Loggers.setLevel(logger, (Level) null);
216+
}
217+
}
169218
}

0 commit comments

Comments
 (0)