Skip to content

Commit

Permalink
Add version check during task submission for bwc for static threshold…
Browse files Browse the repository at this point in the history
… setting (#5633)

* Add version check during task submission for bwc for static threshold setting

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel authored Dec 27, 2022
1 parent 98ca4a7 commit ea1cc9d
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462))
- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524))
- Fix backward compatibility for static cluster manager throttling threshold setting ([#5633](https://github.com/opensearch-project/OpenSearch/pull/5633))
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,7 @@ private enum OpenSearchExceptionHandle {
ClusterManagerThrottlingException.class,
ClusterManagerThrottlingException::new,
165,
Version.V_2_4_0
Version.V_2_5_0
),
SNAPSHOT_IN_USE_DELETION_EXCEPTION(
SnapshotInUseDeletionException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -51,6 +52,11 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private final ConcurrentMap<String, Long> tasksThreshold;
private final Supplier<Version> minNodeVersionSupplier;

// Once all nodes are greater than or equal 2.5.0 version, then only it will start throttling.
// During upgrade as well, it will wait for all older version nodes to leave the cluster before starting throttling.
// This is needed specifically for static setting to enable throttling.
private AtomicBoolean startThrottling = new AtomicBoolean();

public ClusterManagerTaskThrottler(
final Settings settings,
final ClusterSettings clusterSettings,
Expand Down Expand Up @@ -168,7 +174,7 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
int size = tasks.size();
if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey());
if (threshold != null && (count + size > threshold)) {
if (threshold != null && shouldThrottle(threshold, count, size)) {
clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
Expand All @@ -185,6 +191,28 @@ public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
});
}

/**
* If throttling thresholds are set via static setting, it will update the threshold map.
* It may start throwing throttling exception to older nodes in cluster.
* Older version nodes will not be equipped to handle the throttling exception and
* this may result in unexpected behavior where internal tasks would start failing without any retries.
*
* For every task submission request, it will validate if nodes version is greater or equal to 2.5.0 and set the startThrottling flag.
* Once the startThrottling flag is set, it will not perform check for next set of tasks.
*/
private boolean shouldThrottle(Long threshold, Long count, int size) {
if (!startThrottling.get()) {
if (minNodeVersionSupplier.get().compareTo(Version.V_2_5_0) >= 0) {
startThrottling.compareAndSet(false, true);
logger.info("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0");
} else {
logger.info("Skipping cluster manager throttling as at least one node < 2.5.0 is present in cluster");
return false;
}
}
return count + size > threshold;
}

@Override
public void onSubmitFailure(List<? extends TaskBatcher.BatchedTask> tasks) {
reduceTaskCount(tasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,48 @@ public void testThrottlingForDisabledThrottlingTask() {
assertEquals(0L, throttlingStats.getThrottlingCount(taskKey));
}

public void testThrottlingForInitialStaticSettingAndVersionCheck() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0);
DiscoveryNode dataNode = getDataNode(Version.V_2_4_0);
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode })
);

// setting threshold in initial settings
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int put_mapping_threshold_value = randomIntBetween(1, 10);
Settings initialSettings = Settings.builder()
.put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value)
.build();
ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler(
initialSettings,
clusterSettings,
() -> { return clusterService.getMasterService().getMinNodeVersion(); },
throttlingStats
);
ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true);

// verifying adding more tasks then threshold passes
throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, put_mapping_threshold_value + 5));
assertEquals(0L, throttlingStats.getThrottlingCount("put-mapping"));

// Removing older version node from cluster
setState(
clusterService,
ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode })
);

// adding more tasks, these tasks should be throttled
// As queue already have more tasks than threshold from previous call.
assertThrows(
ClusterManagerThrottlingException.class,
() -> throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, 3))
);
assertEquals(3L, throttlingStats.getThrottlingCount("put-mapping"));
}

public void testThrottling() {
ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats();
String taskKey = "test";
Expand Down

0 comments on commit ea1cc9d

Please sign in to comment.