Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to dynamically resize threadpools size #16236

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- Add support to dynamically resize threadpools size. ([#16236](https://github.com/opensearch-project/OpenSearch/pull/16236))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,100 @@ public void testMissingUnits() {
}
}

public void testThreadPoolSettings() {
// wrong threadpool
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.wrong.max", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertTrue(ex.getCause().getMessage().contains("illegal thread_pool name : "));
}

// Scaling threadpool - negative value
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value");
}

// Scaling threadpool - Other than max and core
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.wrong", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [core, max]");
}

// Scaling threadpool - core > max
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put("cluster.thread_pool.snapshot.core", "2").put("cluster.thread_pool.snapshot.max", "1").build()
)
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "core threadpool size cannot be greater than max");
}
gbbafna marked this conversation as resolved.
Show resolved Hide resolved

// Scaling threadpool - happy case
ClusterUpdateSettingsResponse clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "1").build())
.setPersistentSettings(Settings.builder().put("cluster.thread_pool.snapshot.max", "5").build())
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
.get();
assertTrue(clusterUpdateSettingsResponse.isAcknowledged());

// Fixed threadpool - Other than size
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.wrong", "-1").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal thread_pool config : [wrong] should only have [size]");
}

// Fixed threadpool - 0 value
try {
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.size", "0").build())
.get();
fail("bogus value");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getCause().getMessage(), "illegal value for [cluster.thread_pool.get], has to be positive value");
}

// Fixed threadpool - happy case
clusterUpdateSettingsResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.thread_pool.get.size", "1").build())
.setPersistentSettings(Settings.builder().put("cluster.thread_pool.get.size", "1").build())
.get();
assertTrue(clusterUpdateSettingsResponse.isAcknowledged());
}

public void testLoggerLevelUpdate() {
assertAcked(prepareCreate("test"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,10 @@ public void apply(Settings value, Settings current, Settings previous) {
// Settings to be used for limiting rest requests
ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING,
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING
ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING,

// Thread pool Settings
ThreadPool.CLUSTER_THREAD_POOL_SIZE_SETTING
)
)
);
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ protected Node(
additionalSettingsFilter,
settingsUpgraders
);
threadPool.registerClusterSettingsListeners(settingsModule.getClusterSettings());
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
Expand Down
101 changes: 101 additions & 0 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.SizeValue;
Expand All @@ -58,11 +59,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -122,6 +126,9 @@ public static class Names {
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
}

static Set<String> scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core"));
static Set<String> fixedThreadPoolKeys = new HashSet<>(Arrays.asList("size"));

/**
* The threadpool type.
*
Expand Down Expand Up @@ -222,6 +229,12 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting(
"cluster.thread_pool.",
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
this(settings, null, customBuilders);
}
Expand Down Expand Up @@ -403,6 +416,94 @@ public Info info(String name) {
return holder.info;
}

public void registerClusterSettingsListeners(ClusterSettings clusterSettings) {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool, this::validateSetting);
}

/*
Scaling threadpool can provide only max and core
Fixed/ResizableQueue can provide only size

For example valid settings would be for scaling and fixed thead pool
cluster.threadpool.snapshot.max : "5",
cluster.threadpool.snapshot.core : "5",
cluster.threadpool.get.size : "2",
*/
private void validateSetting(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
if (THREAD_POOL_TYPES.containsKey(tpName) == false) {
throw new IllegalArgumentException("illegal thread_pool name : " + tpName);
}
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor threadPoolExecutor = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
if (scalingThreadPoolKeys.containsAll(tpGroup.keySet()) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have " + scalingThreadPoolKeys
);
}
int max = tpGroup.getAsInt("max", threadPoolExecutor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", threadPoolExecutor.getCorePoolSize());
if (core < 1 || max < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
} else if (core > max) {
throw new IllegalArgumentException("core threadpool size cannot be greater than max");
}
} else {
if (fixedThreadPoolKeys.containsAll(tpGroup.keySet()) == false) {
throw new IllegalArgumentException(
"illegal thread_pool config : " + tpGroup.keySet() + " should only have " + fixedThreadPoolKeys
);
}
int size = tpGroup.getAsInt("size", threadPoolExecutor.getMaximumPoolSize());
if (size < 1) {
throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
}
}
}
}

public void setThreadPool(Settings tpSettings) {
Map<String, Settings> tpGroups = tpSettings.getAsGroups();
for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
String tpName = entry.getKey();
Settings tpGroup = entry.getValue();
ExecutorHolder holder = executors.get(tpName);
assert holder.executor instanceof OpenSearchThreadPoolExecutor;
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) holder.executor;
if (holder.info.type == ThreadPoolType.SCALING) {
int max = tpGroup.getAsInt("max", executor.getMaximumPoolSize());
int core = tpGroup.getAsInt("core", executor.getCorePoolSize());
/*
If we are decreasing, core pool size has to be decreased first.
If we are increasing ,max pool size has to be increased first
This ensures that core pool is always smaller than max pool size .
Other wise IllegalArgumentException will be thrown from ThreadPoolExecutor
*/
if (core < executor.getCorePoolSize()) {
executor.setCorePoolSize(core);
executor.setMaximumPoolSize(max);
} else {
executor.setMaximumPoolSize(max);
executor.setCorePoolSize(core);
}
} else {
int size = tpGroup.getAsInt("size", executor.getMaximumPoolSize());
if (size < executor.getCorePoolSize()) {
executor.setCorePoolSize(size);
executor.setMaximumPoolSize(size);
} else {
executor.setMaximumPoolSize(size);
executor.setCorePoolSize(size);
}
}
}
}

public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -152,4 +153,47 @@ public void testInheritContextOnSchedule() throws InterruptedException {
terminate(threadPool);
}
}

public void testThreadPoolResize() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
// increase it
Settings commonSettings = Settings.builder().put("snapshot.max", "10").put("snapshot.core", "2").put("get.size", "100").build();
threadPool.setThreadPool(commonSettings);
ExecutorService executorService = threadPool.executor("snapshot");
OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(10, executor.getMaximumPoolSize());
assertEquals(2, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(100, executor.getMaximumPoolSize());
assertEquals(100, executor.getCorePoolSize());

// decrease it
commonSettings = Settings.builder().put("snapshot.max", "2").put("snapshot.core", "1").put("get.size", "90").build();
threadPool.setThreadPool(commonSettings);
executorService = threadPool.executor("snapshot");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(1, executor.getCorePoolSize());

executorService = threadPool.executor("get");
executor = (OpenSearchThreadPoolExecutor) executorService;
assertEquals(90, executor.getMaximumPoolSize());
assertEquals(90, executor.getCorePoolSize());
} finally {
terminate(threadPool);
}
}

public void testThreadPoolResizeFail() {
TestThreadPool threadPool = new TestThreadPool("test");
try {
Settings commonSettings = Settings.builder().put("snapshot.max", "50").put("snapshot.core", "100").build();
assertThrows(IllegalArgumentException.class, () -> threadPool.setThreadPool(commonSettings));
} finally {
terminate(threadPool);
}
}
}
Loading