Skip to content

Commit

Permalink
Add support to dynamically resize threadpools size
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Oct 8, 2024
1 parent 96082f7 commit 8d75efd
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- Add support to dynamically resize threadpools size. ([#16236](https://github.com/opensearch-project/OpenSearch/pull/16236))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,35 @@ public void testMissingUnits() {
}
}

public void testThreadPoolSettings() {
String key1 = "cluster.thread_pool.snapshot.max";
Settings transientSettings = Settings.builder().put(key1, "-1").build();

String key2 = "cluster.thread_pool.snapshot.max";
Settings persistentSettings = Settings.builder().put(key2, "5").build();

try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.setPersistentSettings(persistentSettings)
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value");
}

transientSettings = Settings.builder().put(key1, "1").build();
persistentSettings = Settings.builder().put(key2, "5").build();
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(transientSettings)
.setPersistentSettings(persistentSettings)
.get();
}

public void testLoggerLevelUpdate() {
assertAcked(prepareCreate("test"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,10 @@ public void apply(Settings value, Settings current, Settings previous) {
// Settings to be used for limiting rest requests
ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING,

// Thread pool Settings
ThreadPool.CLUSTER_THREAD_POOL_SIZE_SETTING
)
)
);
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ protected Node(
additionalSettingsFilter,
settingsUpgraders
);
threadPool.setClusterSettings(settingsModule.getClusterSettings());
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
Expand Down
63 changes: 63 additions & 0 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.SizeValue;
Expand Down Expand Up @@ -211,6 +212,8 @@ public static ThreadPoolType fromType(String type) {

private final ScheduledThreadPoolExecutor scheduler;

private ClusterSettings clusterSettings = null;

public Collection<ExecutorBuilder> builders() {
return Collections.unmodifiableCollection(builders.values());
}
Expand All @@ -222,10 +225,70 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting("cluster.thread_pool.", (tpSettings) -> {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
int max = tpGroup.getAsInt("max", 1);
int core = tpGroup.getAsInt("core", 1);
int size = tpGroup.getAsInt("size", 1);
if (max <= 0 || core <= 0 || size <= 0) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
}
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope

);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
this(settings, null, customBuilders);
}

public void setThreadPool(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", o.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", o.getCorePoolSize());
if (core > max) {
// Can we do better than silently ignoring this as this can't be caught in static validation ?
logger.error("Thread pool {} core {} is higher than maximum value {}. Ignoring it", tpName, core, max);
continue;
}
// Below check makes sure we adhere to the constraint that cores <= max at all the time.
if (core < o.getCorePoolSize()) {
o.setCorePoolSize(core);
o.setMaximumPoolSize(max);
} else {
o.setMaximumPoolSize(max);
o.setCorePoolSize(core);
}
} else {
int size = tpGroup.getAsInt("size", o.getMaximumPoolSize());
if (size < o.getCorePoolSize()) {
o.setCorePoolSize(size);
o.setMaximumPoolSize(size);
} else {
o.setMaximumPoolSize(size);
o.setCorePoolSize(size);
}
}
}
}

public void setClusterSettings(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool);
}

public ThreadPool(
final Settings settings,
final AtomicReference<RunnableTaskExecutionListener> runnableTaskListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -152,4 +153,51 @@ public void testInheritContextOnSchedule() throws InterruptedException {
terminate(threadPool);
}
}

public void testThreadPoolResize() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
// increase it
Settings commonSettings = Settings.builder().put("snapshot.max", "10").put("snapshot.core", "2").put("get.size", "100").build();
threadPool.setThreadPool(commonSettings);
ExecutorService executorService = threadPool.executor("snapshot");
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(10, executor.getMaximumPoolSize());
assertEquals(2, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(100, executor.getMaximumPoolSize());
assertEquals(100, executor.getCorePoolSize());

// decrease it
commonSettings = Settings.builder().put("snapshot.max", "2").put("snapshot.core", "1").put("get.size", "90").build();
threadPool.setThreadPool(commonSettings);
executorService = threadPool.executor("snapshot");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(1, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(90, executor.getMaximumPoolSize());
assertEquals(90, executor.getCorePoolSize());
} finally {
terminate(threadPool);
}
}

public void testThreadPoolResizeFail() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
Settings commonSettings = Settings.builder().put("snapshot.max", "50").put("snapshot.core", "100").build();
threadPool.setThreadPool(commonSettings);
ExecutorService executorService = threadPool.executor("snapshot");
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService;
assertNotEquals(50, executor.getMaximumPoolSize());
assertNotEquals(100, executor.getCorePoolSize());
} finally {
terminate(threadPool);
}
}
}

0 comments on commit 8d75efd

Please sign in to comment.