From 085a2f532c304304f9cac4d3bfcce30c4312f7a2 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Mon, 9 Sep 2024 19:04:34 +0530 Subject: [PATCH] [Backport 2.x] ClusterManagerTaskThrottler Improvements (#15647) * ClusterManagerTaskThrottler Improvements (#15508) Add shallow check in ClusterManagerTaskThrottler to fail fast before computeIfPresent to avoid lock when queue is full Signed-off-by: Sumit Bansal (cherry picked from commit 17b5f98c5e28c5b56fe88d39513c4ce119600b9c) --- .../service/ClusterManagerTaskThrottler.java | 75 ++++--- .../ClusterManagerThrottlingException.java | 6 + .../ClusterManagerTaskThrottlerTests.java | 187 ++++++++++++++++-- 3 files changed, 233 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index 827f3a12fbce4..39ce218dd801a 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -33,7 +33,7 @@ *

* Set specific setting to for setting the threshold of throttling of particular task type. * e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks, - * Set it to default value(-1) to disable the throttling for this task type. + * Set it to default value(-1) to disable the throttling for this task type. */ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class); @@ -69,7 +69,7 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener; - private final ConcurrentMap tasksCount; + final ConcurrentMap tasksCount; private final ConcurrentMap tasksThreshold; private final Supplier minNodeVersionSupplier; @@ -209,30 +209,59 @@ Long getThrottlingLimit(final String taskKey) { return tasksThreshold.get(taskKey); } + private void failFastWhenThrottlingThresholdsAreAlreadyBreached( + final boolean throttlingEnabledWithThreshold, + final Long threshold, + final long existingTaskCount, + final int incomingTaskCount, + final String taskThrottlingKey + ) { + if (throttlingEnabledWithThreshold && shouldThrottle(threshold, existingTaskCount, incomingTaskCount)) { + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); + } + } + @Override public void onBeginSubmit(List tasks) { - ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) + final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) .getClusterManagerThrottlingKey(); - tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L); - tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> { - int size = tasks.size(); - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { - Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); - if (threshold != null && shouldThrottle(threshold, count, size)) { - clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); - logger.warn( - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", - clusterManagerThrottlingKey.getTaskThrottlingKey(), - tasks.size(), - threshold - ); - throw new ClusterManagerThrottlingException( - "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey() - ); - } - } - return count + size; - }); + final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); + final Long threshold = getThrottlingLimit(taskThrottlingKey); + final boolean isThrottlingEnabledWithThreshold = clusterManagerThrottlingKey.isThrottlingEnabled() && threshold != null; + int incomingTaskCount = tasks.size(); + + try { + tasksCount.putIfAbsent(taskThrottlingKey, 0L); + // Perform shallow check before acquiring lock to avoid blocking of network threads + // if throttling is ongoing for a specific task + failFastWhenThrottlingThresholdsAreAlreadyBreached( + isThrottlingEnabledWithThreshold, + threshold, + tasksCount.get(taskThrottlingKey), + incomingTaskCount, + taskThrottlingKey + ); + + tasksCount.computeIfPresent(taskThrottlingKey, (key, existingTaskCount) -> { + failFastWhenThrottlingThresholdsAreAlreadyBreached( + isThrottlingEnabledWithThreshold, + threshold, + existingTaskCount, + incomingTaskCount, + taskThrottlingKey + ); + return existingTaskCount + incomingTaskCount; + }); + } catch (final ClusterManagerThrottlingException e) { + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, incomingTaskCount); + logger.trace( + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", + taskThrottlingKey, + incomingTaskCount, + threshold + ); + throw e; + } } /** diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java index 04fa9fa45d5ea..7a835910c400f 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java @@ -25,4 +25,10 @@ public ClusterManagerThrottlingException(String msg, Object... args) { public ClusterManagerThrottlingException(StreamInput in) throws IOException { super(in); } + + @Override + public Throwable fillInStackTrace() { + // This is on the hot path; stack traces are expensive to compute and not very useful for this exception, so don't fill it. + return this; + } } diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index e25a0e0b2c3bf..3bd9333dc4168 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -30,6 +30,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -69,7 +71,7 @@ public static void afterClass() { public void testDefaults() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); throttler.registerClusterManagerTask("create-index", true); @@ -108,7 +110,7 @@ public void testValidateSettingsForDifferentVersion() { } } - public void testValidateSettingsForTaskWihtoutRetryOnDataNode() { + public void testValidateSettingsForTaskWithoutRetryOnDataNode() { DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0); DiscoveryNode dataNode = getDataNode(Version.V_2_5_0); setState( @@ -139,7 +141,7 @@ public void testUpdateSettingsForNullValue() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -173,7 +175,7 @@ public void testSettingsOnBootstrap() { .put("cluster_manager.throttling.retry.max.delay", maxDelay + "s") .build(); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -187,7 +189,7 @@ public void testSettingsOnBootstrap() { public void testUpdateRetryDelaySetting() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); // verify defaults @@ -217,7 +219,7 @@ public void testValidateSettingsForUnknownTask() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); // set some limit for update snapshot tasks @@ -236,7 +238,7 @@ public void testUpdateThrottlingLimitForBasicSanity() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -263,7 +265,7 @@ public void testValidateSettingForLimit() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -274,7 +276,7 @@ public void testValidateSettingForLimit() { public void testUpdateLimit() { ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, new ClusterManagerThrottlingStats()); throttler.registerClusterManagerTask("put-mapping", true); @@ -309,7 +311,7 @@ public void testThrottlingForDisabledThrottlingTask() { String taskKey = "test"; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false); @@ -321,6 +323,9 @@ public void testThrottlingForDisabledThrottlingTask() { // Asserting that there was not any throttling for it assertEquals(0L, throttlingStats.getThrottlingCount(taskKey)); + + // Asserting value in tasksCount map to make sure it gets updated even when throttling is disabled + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); } public void testThrottlingForInitialStaticSettingAndVersionCheck() { @@ -339,7 +344,7 @@ public void testThrottlingForInitialStaticSettingAndVersionCheck() { .put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value) .build(); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(initialSettings, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true); @@ -367,7 +372,7 @@ public void testThrottling() { String taskKey = "test"; ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { - return clusterService.getMasterService().getMinNodeVersion(); + return clusterService.getClusterManagerService().getMinNodeVersion(); }, throttlingStats); ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); @@ -406,6 +411,164 @@ public void testThrottling() { throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)); } + public void testThrottlingWithLock() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { + return clusterService.getClusterManagerService().getMinNodeVersion(); + }, throttlingStats); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + + throttler.updateLimit(taskKey, 5); + + // adding 3 tasks + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding 3 more tasks, these tasks should be throttled + // taskCount in Queue: 3 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + + // remove one task + throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + + // add 3 tasks should pass now. + // taskCount in Queue: 2 Threshold: 5 + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + final CountDownLatch latch = new CountDownLatch(1); + Thread threadToLock = null; + try { + // Taking lock on tasksCount will not impact throttling behaviour now. + threadToLock = new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }); + threadToLock.start(); + + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + final ClusterManagerThrottlingException exception = assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); + } finally { + if (threadToLock != null) { + latch.countDown(); + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread + try { + threadToLock.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + } + + public void testThrottlingWithMultipleOnBeginSubmitsThreadsWithLock() { + ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); + String taskKey = "test"; + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(Settings.EMPTY, clusterSettings, () -> { + return clusterService.getClusterManagerService().getMinNodeVersion(); + }, throttlingStats); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); + + throttler.updateLimit(taskKey, 5); + + // adding 3 tasks + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + // adding 3 more tasks, these tasks should be throttled + // taskCount in Queue: 3 Threshold: 5 + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); + + // remove one task + throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); + + // add 3 tasks should pass now. + // taskCount in Queue: 2 Threshold: 5 + throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); + + final CountDownLatch latch = new CountDownLatch(1); + Thread threadToLock = null; + List submittingThreads = new ArrayList<>(); + + try { + // Taking lock on tasksCount will not impact throttling behaviour now. + threadToLock = new Thread(() -> { + throttler.tasksCount.computeIfPresent(taskKey, (key, count) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10L; + }); + }); + threadToLock.start(); + + final CountDownLatch latch2 = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + Thread submittingThread = new Thread(() -> { + // adding one task will throttle + // taskCount in Queue: 5 Threshold: 5 + final ClusterManagerThrottlingException exception = assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) + ); + assertEquals("Throttling Exception : Limit exceeded for test", exception.getMessage()); + assertEquals(Optional.of(5L).get(), throttler.tasksCount.get(taskKey)); + latch2.countDown(); + }); + submittingThread.start(); + submittingThreads.add(submittingThread); + } + try { + latch2.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(13L, throttlingStats.getThrottlingCount(taskKey)); + } finally { + if (threadToLock != null) { + latch.countDown(); + try { + // Wait to complete and then assert on new tasksCount that got modified by threadToLock Thread + threadToLock.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + for (Thread submittingThread : submittingThreads) { + try { + submittingThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + assertEquals(Optional.of(10L).get(), throttler.tasksCount.get(taskKey)); + } + private List getMockUpdateTaskList( String taskKey, ClusterManagerTaskThrottler.ThrottlingKey throttlingKey,