diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java
index 827f3a12fbce4..39ce218dd801a 100644
--- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java
+++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java
@@ -33,7 +33,7 @@
*
* Set specific setting to for setting the threshold of throttling of particular task type.
* e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
- * Set it to default value(-1) to disable the throttling for this task type.
+ * Set it to default value(-1) to disable the throttling for this task type.
*/
public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
@@ -69,7 +69,7 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener;
- private final ConcurrentMap tasksCount;
+ final ConcurrentMap tasksCount;
private final ConcurrentMap tasksThreshold;
private final Supplier minNodeVersionSupplier;
@@ -209,30 +209,59 @@ Long getThrottlingLimit(final String taskKey) {
return tasksThreshold.get(taskKey);
}
+ private void failFastWhenThrottlingThresholdsAreAlreadyBreached(
+ final boolean throttlingEnabledWithThreshold,
+ final Long threshold,
+ final long existingTaskCount,
+ final int incomingTaskCount,
+ final String taskThrottlingKey
+ ) {
+ if (throttlingEnabledWithThreshold && shouldThrottle(threshold, existingTaskCount, incomingTaskCount)) {
+ throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey);
+ }
+ }
+
@Override
public void onBeginSubmit(List extends TaskBatcher.BatchedTask> tasks) {
- ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey)
+ final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey)
.getClusterManagerThrottlingKey();
- tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L);
- tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> {
- int size = tasks.size();
- if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
- Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey());
- if (threshold != null && shouldThrottle(threshold, count, size)) {
- clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size);
- logger.warn(
- "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
- clusterManagerThrottlingKey.getTaskThrottlingKey(),
- tasks.size(),
- threshold
- );
- throw new ClusterManagerThrottlingException(
- "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey()
- );
- }
- }
- return count + size;
- });
+ final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey();
+ final Long threshold = getThrottlingLimit(taskThrottlingKey);
+ final boolean isThrottlingEnabledWithThreshold = clusterManagerThrottlingKey.isThrottlingEnabled() && threshold != null;
+ int incomingTaskCount = tasks.size();
+
+ try {
+ tasksCount.putIfAbsent(taskThrottlingKey, 0L);
+ // Perform shallow check before acquiring lock to avoid blocking of network threads
+ // if throttling is ongoing for a specific task
+ failFastWhenThrottlingThresholdsAreAlreadyBreached(
+ isThrottlingEnabledWithThreshold,
+ threshold,
+ tasksCount.get(taskThrottlingKey),
+ incomingTaskCount,
+ taskThrottlingKey
+ );
+
+ tasksCount.computeIfPresent(taskThrottlingKey, (key, existingTaskCount) -> {
+ failFastWhenThrottlingThresholdsAreAlreadyBreached(
+ isThrottlingEnabledWithThreshold,
+ threshold,
+ existingTaskCount,
+ incomingTaskCount,
+ taskThrottlingKey
+ );
+ return existingTaskCount + incomingTaskCount;
+ });
+ } catch (final ClusterManagerThrottlingException e) {
+ clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, incomingTaskCount);
+ logger.trace(
+ "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
+ taskThrottlingKey,
+ incomingTaskCount,
+ threshold
+ );
+ throw e;
+ }
}
/**
diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java
index 04fa9fa45d5ea..7a835910c400f 100644
--- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java
+++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java
@@ -25,4 +25,10 @@ public ClusterManagerThrottlingException(String msg, Object... args) {
public ClusterManagerThrottlingException(StreamInput in) throws IOException {
super(in);
}
+
+ @Override
+ public Throwable fillInStackTrace() {
+ // This is on the hot path; stack traces are expensive to compute and not very useful for this exception, so don't fill it.
+ return this;
+ }
}
diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java
index e25a0e0b2c3bf..3bd9333dc4168 100644
--- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java
+++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java
@@ -30,6 +30,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.opensearch.test.ClusterServiceUtils.setState;
@@ -69,7 +71,7 @@ public static void afterClass() {
public void testDefaults() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
throttler.registerClusterManagerTask("create-index", true);
@@ -108,7 +110,7 @@ public void testValidateSettingsForDifferentVersion() {
}
}
- public void testValidateSettingsForTaskWihtoutRetryOnDataNode() {
+ public void testValidateSettingsForTaskWithoutRetryOnDataNode() {
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_5_0);
setState(
@@ -139,7 +141,7 @@ public void testUpdateSettingsForNullValue() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
@@ -173,7 +175,7 @@ public void testSettingsOnBootstrap() {
.put("cluster_manager.throttling.retry.max.delay", maxDelay + "s")
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
@@ -187,7 +189,7 @@ public void testSettingsOnBootstrap() {
public void testUpdateRetryDelaySetting() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
// verify defaults
@@ -217,7 +219,7 @@ public void testValidateSettingsForUnknownTask() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
// set some limit for update snapshot tasks
@@ -236,7 +238,7 @@ public void testUpdateThrottlingLimitForBasicSanity() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
@@ -263,7 +265,7 @@ public void testValidateSettingForLimit() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
@@ -274,7 +276,7 @@ public void testValidateSettingForLimit() {
public void testUpdateLimit() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, new ClusterManagerThrottlingStats());
throttler.registerClusterManagerTask("put-mapping", true);
@@ -309,7 +311,7 @@ public void testThrottlingForDisabledThrottlingTask() {
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false);
@@ -321,6 +323,9 @@ public void testThrottlingForDisabledThrottlingTask() {
// Asserting that there was not any throttling for it
assertEquals(0L, throttlingStats.getThrottlingCount(taskKey));
+
+ // Asserting value in tasksCount map to make sure it gets updated even when throttling is disabled
+ assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
}
public void testThrottlingForInitialStaticSettingAndVersionCheck() {
@@ -339,7 +344,7 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() {
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true);
@@ -367,7 +372,7 @@ public void testThrottling() {
String taskKey = "test";
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
- return clusterService.getMasterService().getMinNodeVersion();
+ return clusterService.getClusterManagerService().getMinNodeVersion();
}, throttlingStats);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true);
@@ -406,6 +411,164 @@ public void testThrottling() {
throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1));
}
+ public void testThrottlingWithLock() {
+ ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
+ String taskKey = "test";
+ ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
+ return clusterService.getClusterManagerService().getMinNodeVersion();
+ }, throttlingStats);
+ ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true);
+
+ throttler.updateLimit(taskKey, 5);
+
+ // adding 3 tasks
+ throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));
+
+ // adding 3 more tasks, these tasks should be throttled
+ // taskCount in Queue: 3 Threshold: 5
+ assertThrows(
+ ClusterManagerThrottlingException.class,
+ () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3))
+ );
+ assertEquals(3L, throttlingStats.getThrottlingCount(taskKey));
+
+ // remove one task
+ throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1));
+
+ // add 3 tasks should pass now.
+ // taskCount in Queue: 2 Threshold: 5
+ throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread threadToLock = null;
+ try {
+ // Taking lock on tasksCount will not impact throttling behaviour now.
+ threadToLock = new Thread(() -> {
+ throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return 10L;
+ });
+ });
+ threadToLock.start();
+
+ // adding one task will throttle
+ // taskCount in Queue: 5 Threshold: 5
+ final ClusterManagerThrottlingException exception = assertThrows(
+ ClusterManagerThrottlingException.class,
+ () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
+ );
+ assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
+ assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
+ assertEquals(4L, throttlingStats.getThrottlingCount(taskKey));
+ } finally {
+ if (threadToLock != null) {
+ latch.countDown();
+ // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
+ try {
+ threadToLock.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
+ }
+
+ public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() {
+ ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
+ String taskKey = "test";
+ ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> {
+ return clusterService.getClusterManagerService().getMinNodeVersion();
+ }, throttlingStats);
+ ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true);
+
+ throttler.updateLimit(taskKey, 5);
+
+ // adding 3 tasks
+ throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));
+
+ // adding 3 more tasks, these tasks should be throttled
+ // taskCount in Queue: 3 Threshold: 5
+ assertThrows(
+ ClusterManagerThrottlingException.class,
+ () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3))
+ );
+ assertEquals(3L, throttlingStats.getThrottlingCount(taskKey));
+
+ // remove one task
+ throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1));
+
+ // add 3 tasks should pass now.
+ // taskCount in Queue: 2 Threshold: 5
+ throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread threadToLock = null;
+ List submittingThreads = new ArrayList<>();
+
+ try {
+ // Taking lock on tasksCount will not impact throttling behaviour now.
+ threadToLock = new Thread(() -> {
+ throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return 10L;
+ });
+ });
+ threadToLock.start();
+
+ final CountDownLatch latch2 = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ Thread submittingThread = new Thread(() -> {
+ // adding one task will throttle
+ // taskCount in Queue: 5 Threshold: 5
+ final ClusterManagerThrottlingException exception = assertThrows(
+ ClusterManagerThrottlingException.class,
+ () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1))
+ );
+ assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage());
+ assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey));
+ latch2.countDown();
+ });
+ submittingThread.start();
+ submittingThreads.add(submittingThread);
+ }
+ try {
+ latch2.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertEquals(13L, throttlingStats.getThrottlingCount(taskKey));
+ } finally {
+ if (threadToLock != null) {
+ latch.countDown();
+ try {
+ // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread
+ threadToLock.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ for (Thread submittingThread : submittingThreads) {
+ try {
+ submittingThread.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey));
+ }
+
private List getMockUpdateTaskList(
String taskKey,
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey,