Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master node changes for master task throttling #3882

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
fb05a69
Master node changes for master task throttling
dhwanilpatel Jun 6, 2022
3ee75ef
Incorporated comments
dhwanilpatel Jul 19, 2022
6c99e1b
Simplified the logic of acquire/release for throttling
dhwanilpatel Aug 4, 2022
2fad544
Incorporated comments
dhwanilpatel Aug 12, 2022
bfdd946
Incoporated comments 08/17
dhwanilpatel Aug 17, 2022
41bec65
Merge branch 'feature/master-task-throttling' into throttling-master-…
dhwanilpatel Aug 24, 2022
a3fb105
[BUG] Running "opensearch-service.bat start" and "opensearch-service.…
burck1 Aug 24, 2022
0bf6b2f
Removing dead code in RecoveryTarget. (#4278)
mch2 Aug 24, 2022
5dd7947
Update the head ref to changelog verifier (#4296)
kotwanikunal Aug 24, 2022
1bfabed
Add 2.x version to CHANGELOG (#4297)
kotwanikunal Aug 25, 2022
e408f67
Fix TaskBatcher UT
dhwanilpatel Aug 26, 2022
a860470
Add JavaDoc for TaskBatcherListener
dhwanilpatel Aug 26, 2022
1dbb63a
Do not fail replica shard due to primary closure (#4133)
andrross Aug 26, 2022
c62cecb
Some dependency updates (#4308)
reta Aug 26, 2022
65f966e
Restore using the class ClusterInfoRequest and ClusterInfoRequestBuil…
Aug 26, 2022
7ea6e88
[BUG] Create logs directory before running OpenSearch on Windows (#4305)
burck1 Aug 29, 2022
cd961f3
Use RemoteSegmentStoreDirectory instead of RemoteDirectory (#4240)
sachinpkale Aug 29, 2022
7fe5830
ZIP publication groupId value is configurable (#4156)
lukas-vlcek Aug 29, 2022
f4e041e
[Segment Replication] Add timeout on Mockito.verify to reduce flakyne…
dreamer-89 Aug 29, 2022
eaefc71
Incorporated comments 08/30
dhwanilpatel Aug 30, 2022
beb09af
Adding @dreamer-89 to Opensearch maintainers. (#4342)
kartg Aug 30, 2022
4bccdbe
[CVE] Update snakeyaml dependency (#4341)
adnapibar Aug 30, 2022
82bda89
Fixed commit workflow for dependabot PR helper (#4331)
kotwanikunal Aug 30, 2022
48d6869
Add release notes for patch release 1.3.5 (#4343)
adnapibar Aug 30, 2022
f16ea9c
Add release notes for patch release 2.2.1 (#4344)
adnapibar Aug 30, 2022
4f65ef5
Add label configuration for dependabot PRs (#4348)
kotwanikunal Aug 31, 2022
d72861f
Support for HTTP/2 (server-side) (#3847)
reta Aug 31, 2022
c28221e
Fix token usage for changelog helper (#4351)
kotwanikunal Aug 31, 2022
100120a
Revert "Fix token usage for changelog helper (#4351)" (#4361)
kotwanikunal Aug 31, 2022
19d1a2b
Segment Replication - Implement segment replication event cancellatio…
mch2 Aug 31, 2022
4a6e937
Bug fixes for dependabot changelog verifier (#4364)
kotwanikunal Sep 1, 2022
689a2c4
Add changes for Create PIT and Delete PIT rest layer and rest high le…
bharath-techie Sep 1, 2022
bd11c69
Bump com.diffplug.spotless from 6.9.1 to 6.10.0 (#4319)
dependabot[bot] Sep 1, 2022
236f2f6
Update to Netty 4.1.80.Final (#4359)
reta Sep 1, 2022
5c3cc93
Bump xmlbeans from 5.1.0 to 5.1.1 in /plugins/ingest-attachment (#4354)
dependabot[bot] Sep 1, 2022
715da84
Fix randomized test failure NRTReplicationEngineTests.testUpdateSegme…
adnapibar Sep 1, 2022
70d911c
[AUTO] [main] Added bwc version 2.2.2. (#4383)
opensearch-trigger-bot[bot] Sep 2, 2022
53b6614
Renamed onProcess method of TaskBatcherListener
dhwanilpatel Sep 2, 2022
018912b
Minor comment incorporated
dhwanilpatel Sep 2, 2022
85ede3b
Merge branch 'main' into feature/master-task-throttling
dhwanilpatel Sep 2, 2022
1ccbeac
Merge branch 'feature/master-task-throttling' of https://github.com/o…
dhwanilpatel Sep 2, 2022
3c8b6d7
Merge branch 'feature/master-task-throttling' into throttling-master-…
dhwanilpatel Sep 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
Expand Down Expand Up @@ -1613,8 +1614,8 @@ private enum OpenSearchExceptionHandle {
* TODO: Change the version number of check as per version in which this change will be merged.
*/
MASTER_TASK_THROTTLED_EXCEPTION(
org.opensearch.cluster.service.MasterTaskThrottlingException.class,
org.opensearch.cluster.service.MasterTaskThrottlingException::new,
ClusterManagerThrottlingException.class,
ClusterManagerThrottlingException::new,
163,
Version.V_3_0_0
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ default String describeTasks(List<T> tasks) {
return String.join(", ", tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator);
}

public static final String DEFAULT_CLUSTER_MANAGER_THROTTLING_KEY = "";

default String getClusterManagerThrottlingKey() {
return DEFAULT_CLUSTER_MANAGER_THROTTLING_KEY;
}

/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
* This class does throttling on task submission to cluster manager node, it uses throttling key defined in various executors
* as key for throttling. Throttling will be performed over task executor's class level, different task types have different executors class.
*
* Set specific setting to for setting the threshold of throttling of particular task type.
* e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
* Set it to default value(-1) to disable the throttling for this task type.
*/
public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);

public static final Setting<Settings> THRESHOLD_SETTINGS = Setting.groupSetting(
"cluster_manager.throttling.thresholds.",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* To configure more task for throttling, override getClusterManagerThrottlingKey method with task name in task executor.
* Verify that throttled tasks would be retry.
*
* Added retry mechanism in TransportClusterManagerNodeAction so it would be retried for customer generated tasks.
*/
public static Set<String> CONFIGURED_TASK_FOR_THROTTLING = Collections.unmodifiableSet(
// TODO
// Add throttling key for other master task as well.
// Will be added as part of upcoming PRs,
// Right now just added "put-mapping" for ref only.
new HashSet<>(Arrays.asList("put-mapping"))
);

private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling
private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener;

private final ConcurrentMap<String, Long> tasksCount;
private final ConcurrentMap<String, Long> tasksThreshold;
private final Supplier<Version> minNodeVersionSupplier;

public ClusterManagerTaskThrottler(
final ClusterSettings clusterSettings,
final Supplier<Version> minNodeVersionSupplier,
final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener
) {
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
this.minNodeVersionSupplier = minNodeVersionSupplier;
this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment
}

void validateSetting(final Settings settings) {
/**
* TODO: Change the version number of check as per version in which this change will be merged.
*/
if (minNodeVersionSupplier.get().compareTo(Version.V_3_0_0) < 0) {
throw new IllegalArgumentException("All the nodes in cluster should be on version later than or equal to 3.0.0");
}
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
if (!CONFIGURED_TASK_FOR_THROTTLING.contains(key)) {
throw new IllegalArgumentException("Cluster manager task throttling is not configured for given task type: " + key);
}
int threshold = groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE);
if (threshold < MIN_THRESHOLD_VALUE) {
throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling");
}
}
}

void updateSetting(final Settings settings) {
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE));
}
}

void updateLimit(final String taskKey, final int limit) {
assert limit >= MIN_THRESHOLD_VALUE;
if (limit == MIN_THRESHOLD_VALUE) {
tasksThreshold.remove(taskKey);
} else {
tasksThreshold.put(taskKey, (long) limit);
}
}

Long getThrottlingLimit(final String taskKey) {
return tasksThreshold.get(taskKey);
}

@Override
public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
String clusterManagerThrottlingKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey).getClusterManagerThrottlingKey();
tasksCount.putIfAbsent(clusterManagerThrottlingKey, 0L);
tasksCount.computeIfPresent(clusterManagerThrottlingKey, (key, count) -> {
int size = tasks.size();
Long threshold = tasksThreshold.get(clusterManagerThrottlingKey);
if (threshold != null && (count + size > threshold)) {
clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey, size);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
clusterManagerThrottlingKey,
tasks.size(),
threshold
);
throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey);
}
return count + size;
});
}

@Override
public void onSubmitFailure(List<? extends TaskBatcher.BatchedTask> tasks) {
reduceTaskCount(tasks);
}

/**
* Tasks will be removed from the queue before processing, so here we will reduce the count of tasks.
*
* @param tasks list of tasks which will be executed.
*/
@Override
public void onBeginProcessing(List<? extends TaskBatcher.BatchedTask> tasks) {
reduceTaskCount(tasks);
}

@Override
public void onTimeout(List<? extends TaskBatcher.BatchedTask> tasks) {
reduceTaskCount(tasks);
}

private void reduceTaskCount(List<? extends TaskBatcher.BatchedTask> tasks) {
String masterTaskKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey).getClusterManagerThrottlingKey();
tasksCount.computeIfPresent(masterTaskKey, (key, count) -> count - tasks.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

/**
* Listener interface for master task throttling
*/
public interface ClusterManagerTaskThrottlerListener {
void onThrottle(String type, final int counts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
import java.io.IOException;

/**
* Exception raised from master node due to task throttling.
* Exception raised from cluster manager node due to task throttling.
*/
public class MasterTaskThrottlingException extends OpenSearchException {
public class ClusterManagerThrottlingException extends OpenSearchException {

public MasterTaskThrottlingException(String msg, Object... args) {
public ClusterManagerThrottlingException(String msg, Object... args) {
super(msg, args);
}

public MasterTaskThrottlingException(StreamInput in) throws IOException {
public ClusterManagerThrottlingException(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.metrics.CounterMetric;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Contains stats of Cluster Manager Task Throttling.
* It stores the total cumulative count of throttled tasks per task type.
*/
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener {

private Map<String, CounterMetric> throttledTasksCount = new ConcurrentHashMap<>();

private void incrementThrottlingCount(String type, final int counts) {
throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts);
}

public long getThrottlingCount(String type) {
return throttledTasksCount.get(type) == null ? 0 : throttledTasksCount.get(type).count();
}

public long getTotalThrottledTaskCount() {
CounterMetric totalCount = new CounterMetric();
throttledTasksCount.forEach((aClass, counterMetric) -> { totalCount.inc(counterMetric.count()); });
return totalCount.count();
}

@Override
public void onThrottle(String type, int counts) {
incrementThrottlingCount(type, counts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Assertions;
import org.opensearch.Version;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.AckedClusterStateTaskListener;
import org.opensearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class MasterService extends AbstractLifecycleComponent {

private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
Expand All @@ -137,6 +140,8 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
this::setSlowTaskLoggingThreshold
);

this.throttlingStats = new ClusterManagerThrottlingStats();
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats);
this.threadPool = threadPool;
}

Expand All @@ -157,7 +162,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = createThreadPoolExecutor();
taskBatcher = new Batcher(logger, threadPoolExecutor);
taskBatcher = new Batcher(logger, threadPoolExecutor, clusterManagerTaskThrottler);
}

protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
Expand All @@ -172,8 +177,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
@SuppressWarnings("unchecked")
class Batcher extends TaskBatcher {

Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) {
super(logger, threadExecutor);
Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) {
super(logger, threadExecutor, taskBatcherListener);
}

@Override
Expand Down Expand Up @@ -589,6 +594,20 @@ public List<PendingClusterTask> pendingTasks() {
}).collect(Collectors.toList());
}

/**
* Returns the number of throttled pending tasks.
*/
public long numberOfThrottledPendingTasks() {
return throttlingStats.getTotalThrottledTaskCount();
}

/**
* Returns the min version of nodes in cluster
*/
public Version getMinNodeVersion() {
return state().getNodes().getMinNodeVersion();
}

/**
* Returns the number of currently pending tasks.
*/
Expand Down
Loading