Skip to content

Commit

Permalink
Master node changes for master task throttling (#3882)
Browse files Browse the repository at this point in the history
* Master node changes for master task throttling

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel authored Sep 2, 2022
1 parent c470262 commit e231136
Show file tree
Hide file tree
Showing 14 changed files with 937 additions and 43 deletions.
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.ParseField;
Expand Down Expand Up @@ -1613,8 +1614,8 @@ private enum OpenSearchExceptionHandle {
* TODO: Update the version used in this check to the release in which this change is actually merged.
*/
MASTER_TASK_THROTTLED_EXCEPTION(
org.opensearch.cluster.service.MasterTaskThrottlingException.class,
org.opensearch.cluster.service.MasterTaskThrottlingException::new,
ClusterManagerThrottlingException.class,
ClusterManagerThrottlingException::new,
163,
Version.V_3_0_0
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ default String describeTasks(List<T> tasks) {
return String.join(", ", tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator);
}

/**
 * Throttling key returned by executors that do not override
 * {@link #getClusterManagerThrottlingKey()}.
 */
String DEFAULT_CLUSTER_MANAGER_THROTTLING_KEY = "";

/**
 * Returns the key under which tasks of this executor are grouped for cluster manager task throttling.
 * The default implementation returns {@link #DEFAULT_CLUSTER_MANAGER_THROTTLING_KEY}; executors
 * override this method to return their own key.
 *
 * @return the throttling key for this executor
 */
default String getClusterManagerThrottlingKey() {
    final String throttlingKey = DEFAULT_CLUSTER_MANAGER_THROTTLING_KEY;
    return throttlingKey;
}

/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Throttles task submission on the cluster manager node. The throttling key exposed by each task executor
 * (see {@code ClusterStateTaskExecutor#getClusterManagerThrottlingKey()}) identifies the task type, and a
 * separate pending-task counter and limit is kept per key.
 *
 * Limits are configured through the dynamic group setting {@code cluster_manager.throttling.thresholds.*}.
 * e.g. set "cluster_manager.throttling.thresholds.put-mapping.value" to limit the number of queued
 * "put-mapping" tasks. Setting the value to -1 (the default), or removing the setting, disables throttling
 * for that task type.
 */
public class ClusterManagerTaskThrottler implements TaskBatcherListener {
    private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);

    public static final Setting<Settings> THRESHOLD_SETTINGS = Setting.groupSetting(
        "cluster_manager.throttling.thresholds.",
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    /**
     * Task keys for which a throttling limit may be configured.
     *
     * To make another task type throttleable, override getClusterManagerThrottlingKey in its task executor
     * and add the key here. Before doing so, verify that throttled tasks of that type are retried by the caller.
     *
     * A retry mechanism was added in TransportClusterManagerNodeAction so that customer generated tasks
     * are retried.
     *
     * NOTE(review): this field is public and non-final; it is left that way because code outside this
     * class may reassign it. Confirm before tightening.
     */
    public static Set<String> CONFIGURED_TASK_FOR_THROTTLING = Collections.unmodifiableSet(
        // TODO
        // Add throttling key for other master task as well.
        // Will be added as part of upcoming PRs,
        // Right now just added "put-mapping" for ref only.
        new HashSet<>(Arrays.asList("put-mapping"))
    );

    /** Sentinel limit meaning "throttling disabled"; also the lowest value accepted by validation. */
    private static final int MIN_THRESHOLD_VALUE = -1;

    private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener;

    /** Number of submitted-but-not-yet-processed tasks, per throttling key. */
    private final ConcurrentMap<String, Long> tasksCount;
    /** Configured limit per throttling key; a missing entry means throttling is disabled for that key. */
    private final ConcurrentMap<String, Long> tasksThreshold;
    private final Supplier<Version> minNodeVersionSupplier;

    /**
     * Creates the throttler and registers it for dynamic updates of {@link #THRESHOLD_SETTINGS}.
     *
     * @param clusterSettings                     cluster settings on which the update consumer is registered
     * @param minNodeVersionSupplier              supplies the minimum node version in the cluster, used by validation
     * @param clusterManagerTaskThrottlerListener notified whenever a submission is throttled
     */
    public ClusterManagerTaskThrottler(
        final ClusterSettings clusterSettings,
        final Supplier<Version> minNodeVersionSupplier,
        final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener
    ) {
        clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
        this.minNodeVersionSupplier = minNodeVersionSupplier;
        this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener;
        // Presized generously; the original intent was to keep different task keys from contending with each other.
        tasksCount = new ConcurrentHashMap<>(128);
        tasksThreshold = new ConcurrentHashMap<>(128);
    }

    /**
     * Validates a proposed value of {@link #THRESHOLD_SETTINGS}.
     *
     * @param settings the proposed group settings, keyed by task type
     * @throws IllegalArgumentException if any node is older than the required version, if a task type is not
     *                                  listed in {@link #CONFIGURED_TASK_FOR_THROTTLING}, or if a limit is below -1
     */
    void validateSetting(final Settings settings) {
        /**
         * TODO: Change the version number of check as per version in which this change will be merged.
         */
        if (minNodeVersionSupplier.get().compareTo(Version.V_3_0_0) < 0) {
            throw new IllegalArgumentException("All the nodes in cluster should be on version later than or equal to 3.0.0");
        }
        for (Map.Entry<String, Settings> group : settings.getAsGroups().entrySet()) {
            final String key = group.getKey();
            if (!CONFIGURED_TASK_FOR_THROTTLING.contains(key)) {
                throw new IllegalArgumentException("Cluster manager task throttling is not configured for given task type: " + key);
            }
            final int threshold = group.getValue().getAsInt("value", MIN_THRESHOLD_VALUE);
            if (threshold < MIN_THRESHOLD_VALUE) {
                throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling");
            }
        }
    }

    /**
     * Applies a new value of {@link #THRESHOLD_SETTINGS}.
     *
     * Limits of task types that are no longer present in {@code settings} are dropped, so that removing or
     * resetting a threshold disables throttling for that type instead of leaving the previous limit active.
     *
     * @param settings the new group settings, keyed by task type
     */
    void updateSetting(final Settings settings) {
        final Map<String, Settings> groups = settings.getAsGroups();
        // Forget limits whose setting was removed; without this a stale limit would keep throttling.
        tasksThreshold.keySet().retainAll(groups.keySet());
        for (Map.Entry<String, Settings> group : groups.entrySet()) {
            updateLimit(group.getKey(), group.getValue().getAsInt("value", MIN_THRESHOLD_VALUE));
        }
    }

    /**
     * Sets or clears the limit of a single task type.
     *
     * @param taskKey throttling key of the task type
     * @param limit   new limit; -1 removes the limit and thereby disables throttling for the key
     */
    void updateLimit(final String taskKey, final int limit) {
        assert limit >= MIN_THRESHOLD_VALUE;
        if (limit == MIN_THRESHOLD_VALUE) {
            tasksThreshold.remove(taskKey);
        } else {
            tasksThreshold.put(taskKey, (long) limit);
        }
    }

    /**
     * @param taskKey throttling key of the task type
     * @return the configured limit, or {@code null} when throttling is disabled for the key
     */
    Long getThrottlingLimit(final String taskKey) {
        return tasksThreshold.get(taskKey);
    }

    /**
     * Accounts for a batch of tasks that is about to be queued and rejects it when the batch would push the
     * pending count of its task type above the configured limit.
     *
     * @param tasks the tasks being submitted; the throttling key is taken from the first task's executor
     * @throws ClusterManagerThrottlingException if the limit for the task type would be exceeded; the pending
     *                                           count is left unchanged in that case
     */
    @Override
    public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
        final String clusterManagerThrottlingKey = throttlingKeyOf(tasks);
        final int size = tasks.size();
        // A single compute() makes the check-and-increment atomic per key. When the remapping function throws,
        // ConcurrentHashMap leaves the current mapping untouched, so a rejected batch is never counted.
        tasksCount.compute(clusterManagerThrottlingKey, (key, count) -> {
            final long pending = count == null ? 0L : count;
            final Long threshold = tasksThreshold.get(clusterManagerThrottlingKey);
            if (threshold != null && (pending + size > threshold)) {
                clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey, size);
                logger.warn(
                    "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
                    clusterManagerThrottlingKey,
                    size,
                    threshold
                );
                throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey);
            }
            return pending + size;
        });
    }

    /**
     * Reverts the accounting done in {@link #onBeginSubmit(List)} for tasks whose submission failed.
     *
     * @param tasks the tasks that could not be submitted
     */
    @Override
    public void onSubmitFailure(List<? extends TaskBatcher.BatchedTask> tasks) {
        reduceTaskCount(tasks);
    }

    /**
     * Tasks will be removed from the queue before processing, so here we will reduce the count of tasks.
     *
     * @param tasks list of tasks which will be executed.
     */
    @Override
    public void onBeginProcessing(List<? extends TaskBatcher.BatchedTask> tasks) {
        reduceTaskCount(tasks);
    }

    /**
     * Reduces the pending count for tasks that timed out.
     *
     * @param tasks the tasks that timed out
     */
    @Override
    public void onTimeout(List<? extends TaskBatcher.BatchedTask> tasks) {
        reduceTaskCount(tasks);
    }

    /** Subtracts the batch size from the pending count of the batch's task type, if that type is tracked. */
    private void reduceTaskCount(List<? extends TaskBatcher.BatchedTask> tasks) {
        final String clusterManagerThrottlingKey = throttlingKeyOf(tasks);
        final int size = tasks.size();
        tasksCount.computeIfPresent(clusterManagerThrottlingKey, (key, count) -> count - size);
    }

    /** Reads the throttling key from the executor that serves as batching key of the first task in the batch. */
    @SuppressWarnings("unchecked") // the batching key is treated as a ClusterStateTaskExecutor, as in the original casts
    private static String throttlingKeyOf(List<? extends TaskBatcher.BatchedTask> tasks) {
        return ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey).getClusterManagerThrottlingKey();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

/**
 * Callback that is notified when cluster manager task submissions are throttled.
 */
public interface ClusterManagerTaskThrottlerListener {

    /**
     * Invoked when a batch of tasks is rejected by throttling.
     *
     * @param type   throttling key of the task type that was throttled
     * @param counts number of tasks in the rejected batch
     */
    void onThrottle(String type, int counts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
import java.io.IOException;

/**
* Exception raised from master node due to task throttling.
* Exception raised from cluster manager node due to task throttling.
*/
public class MasterTaskThrottlingException extends OpenSearchException {
public class ClusterManagerThrottlingException extends OpenSearchException {

public MasterTaskThrottlingException(String msg, Object... args) {
public ClusterManagerThrottlingException(String msg, Object... args) {
super(msg, args);
}

public MasterTaskThrottlingException(StreamInput in) throws IOException {
public ClusterManagerThrottlingException(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.metrics.CounterMetric;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Contains stats of Cluster Manager Task Throttling.
 * It stores the total cumulative count of throttled tasks per task type.
 */
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener {

    /** Cumulative number of throttled tasks, keyed by task type. Entries are only ever added. */
    private final Map<String, CounterMetric> throttledTasksCount = new ConcurrentHashMap<>();

    /** Adds {@code counts} to the counter of {@code type}, creating the counter on first use. */
    private void incrementThrottlingCount(String type, final int counts) {
        throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts);
    }

    /**
     * @param type task type to look up
     * @return cumulative number of throttled tasks of the given type, or 0 if none were recorded
     */
    public long getThrottlingCount(String type) {
        // Single lookup: avoids reading the map twice for one answer.
        final CounterMetric counter = throttledTasksCount.get(type);
        return counter == null ? 0 : counter.count();
    }

    /**
     * @return cumulative number of throttled tasks summed over all task types
     */
    public long getTotalThrottledTaskCount() {
        long total = 0;
        for (CounterMetric counter : throttledTasksCount.values()) {
            total += counter.count();
        }
        return total;
    }

    /**
     * Records a throttled batch.
     *
     * @param type   task type that was throttled
     * @param counts number of tasks in the throttled batch
     */
    @Override
    public void onThrottle(String type, int counts) {
        incrementThrottlingCount(type, counts);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Assertions;
import org.opensearch.Version;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.AckedClusterStateTaskListener;
import org.opensearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class MasterService extends AbstractLifecycleComponent {

private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;
protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler;
private final ClusterManagerThrottlingStats throttlingStats;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
Expand All @@ -137,6 +140,8 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
this::setSlowTaskLoggingThreshold
);

this.throttlingStats = new ClusterManagerThrottlingStats();
this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats);
this.threadPool = threadPool;
}

Expand All @@ -157,7 +162,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = createThreadPoolExecutor();
taskBatcher = new Batcher(logger, threadPoolExecutor);
taskBatcher = new Batcher(logger, threadPoolExecutor, clusterManagerTaskThrottler);
}

protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
Expand All @@ -172,8 +177,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
@SuppressWarnings("unchecked")
class Batcher extends TaskBatcher {

Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) {
super(logger, threadExecutor);
Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) {
super(logger, threadExecutor, taskBatcherListener);
}

@Override
Expand Down Expand Up @@ -589,6 +594,20 @@ public List<PendingClusterTask> pendingTasks() {
}).collect(Collectors.toList());
}

/**
 * Returns the total throttled task count reported by the throttling stats,
 * i.e. the value of {@code throttlingStats.getTotalThrottledTaskCount()}.
 */
public long numberOfThrottledPendingTasks() {
    final long throttledTotal = throttlingStats.getTotalThrottledTaskCount();
    return throttledTotal;
}

/**
 * Returns the minimum node version among the nodes of the current cluster state.
 */
public Version getMinNodeVersion() {
    final Version minNodeVersion = state().getNodes().getMinNodeVersion();
    return minNodeVersion;
}

/**
* Returns the number of currently pending tasks.
*/
Expand Down
Loading

0 comments on commit e231136

Please sign in to comment.