Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master node changes for master task throttling #3882

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
fb05a69
Master node changes for master task throttling
dhwanilpatel Jun 6, 2022
3ee75ef
Incorporated comments
dhwanilpatel Jul 19, 2022
6c99e1b
Simplified the logic of acquire/release for throttling
dhwanilpatel Aug 4, 2022
2fad544
Incorporated comments
dhwanilpatel Aug 12, 2022
bfdd946
Incoporated comments 08/17
dhwanilpatel Aug 17, 2022
41bec65
Merge branch 'feature/master-task-throttling' into throttling-master-…
dhwanilpatel Aug 24, 2022
a3fb105
[BUG] Running "opensearch-service.bat start" and "opensearch-service.…
burck1 Aug 24, 2022
0bf6b2f
Removing dead code in RecoveryTarget. (#4278)
mch2 Aug 24, 2022
5dd7947
Update the head ref to changelog verifier (#4296)
kotwanikunal Aug 24, 2022
1bfabed
Add 2.x version to CHANGELOG (#4297)
kotwanikunal Aug 25, 2022
e408f67
Fix TaskBatcher UT
dhwanilpatel Aug 26, 2022
a860470
Add JavaDoc for TaskBatcherListener
dhwanilpatel Aug 26, 2022
1dbb63a
Do not fail replica shard due to primary closure (#4133)
andrross Aug 26, 2022
c62cecb
Some dependency updates (#4308)
reta Aug 26, 2022
65f966e
Restore using the class ClusterInfoRequest and ClusterInfoRequestBuil…
Aug 26, 2022
7ea6e88
[BUG] Create logs directory before running OpenSearch on Windows (#4305)
burck1 Aug 29, 2022
cd961f3
Use RemoteSegmentStoreDirectory instead of RemoteDirectory (#4240)
sachinpkale Aug 29, 2022
7fe5830
ZIP publication groupId value is configurable (#4156)
lukas-vlcek Aug 29, 2022
f4e041e
[Segment Replication] Add timeout on Mockito.verify to reduce flakyne…
dreamer-89 Aug 29, 2022
eaefc71
Incorporated comments 08/30
dhwanilpatel Aug 30, 2022
beb09af
Adding @dreamer-89 to Opensearch maintainers. (#4342)
kartg Aug 30, 2022
4bccdbe
[CVE] Update snakeyaml dependency (#4341)
adnapibar Aug 30, 2022
82bda89
Fixed commit workflow for dependabot PR helper (#4331)
kotwanikunal Aug 30, 2022
48d6869
Add release notes for patch release 1.3.5 (#4343)
adnapibar Aug 30, 2022
f16ea9c
Add release notes for patch release 2.2.1 (#4344)
adnapibar Aug 30, 2022
4f65ef5
Add label configuration for dependabot PRs (#4348)
kotwanikunal Aug 31, 2022
d72861f
Support for HTTP/2 (server-side) (#3847)
reta Aug 31, 2022
c28221e
Fix token usage for changelog helper (#4351)
kotwanikunal Aug 31, 2022
100120a
Revert "Fix token usage for changelog helper (#4351)" (#4361)
kotwanikunal Aug 31, 2022
19d1a2b
Segment Replication - Implement segment replication event cancellatio…
mch2 Aug 31, 2022
4a6e937
Bug fixes for dependabot changelog verifier (#4364)
kotwanikunal Sep 1, 2022
689a2c4
Add changes for Create PIT and Delete PIT rest layer and rest high le…
bharath-techie Sep 1, 2022
bd11c69
Bump com.diffplug.spotless from 6.9.1 to 6.10.0 (#4319)
dependabot[bot] Sep 1, 2022
236f2f6
Update to Netty 4.1.80.Final (#4359)
reta Sep 1, 2022
5c3cc93
Bump xmlbeans from 5.1.0 to 5.1.1 in /plugins/ingest-attachment (#4354)
dependabot[bot] Sep 1, 2022
715da84
Fix randomized test failure NRTReplicationEngineTests.testUpdateSegme…
adnapibar Sep 1, 2022
70d911c
[AUTO] [main] Added bwc version 2.2.2. (#4383)
opensearch-trigger-bot[bot] Sep 2, 2022
53b6614
Renamed onProcess method of TaskBatcherListener
dhwanilpatel Sep 2, 2022
018912b
Minor comment incorporated
dhwanilpatel Sep 2, 2022
85ede3b
Merge branch 'main' into feature/master-task-throttling
dhwanilpatel Sep 2, 2022
1ccbeac
Merge branch 'feature/master-task-throttling' of https://github.com/o…
dhwanilpatel Sep 2, 2022
3c8b6d7
Merge branch 'feature/master-task-throttling' into throttling-master-…
dhwanilpatel Sep 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ default String describeTasks(List<T> tasks) {
return String.join(", ", tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator);
}

public static final String DEFAULT_MASTER_THROTTLING_KEY = "";

default String getMasterThrottlingKey() {
return DEFAULT_MASTER_THROTTLING_KEY;
}

/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ public class MasterService extends AbstractLifecycleComponent {

private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor;
private volatile Batcher taskBatcher;
/**
* Throttler which performs throttling based on pending task count per task type.
* It acquire permits before task goes into queue and release the permit when task is removed from the queue.
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
*
* It acquire permits at one place, while submitting the tasks.
* It releases from three places.
* 1. If submit task is failed due to some exception (Since we already acquire permit and submit is failing so we need to release it).
* 2. Release before execution ( Task is removed from queue and went into execution).
* 3. Task is timed out in queue(Task is timed out in queue, so master will throw exception to data nodes. Releasing as task is removed from queue)
*/
protected final MasterTaskThrottler masterTaskThrottler;
private final MasterThrottlingStats throttlingStats;

public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
Expand All @@ -131,6 +143,8 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP
this::setSlowTaskLoggingThreshold
);

this.throttlingStats = new MasterThrottlingStats();
this.masterTaskThrottler = new MasterTaskThrottler(clusterSettings, this, throttlingStats);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
this.threadPool = threadPool;
}

Expand All @@ -151,7 +165,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting");
threadPoolExecutor = createThreadPoolExecutor();
taskBatcher = new Batcher(logger, threadPoolExecutor);
taskBatcher = new Batcher(logger, threadPoolExecutor, masterTaskThrottler);
}

protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
Expand All @@ -166,8 +180,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
@SuppressWarnings("unchecked")
class Batcher extends TaskBatcher {

Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) {
super(logger, threadExecutor);
Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) {
super(logger, threadExecutor, taskBatcherListener);
}

@Override
Expand Down Expand Up @@ -570,6 +584,13 @@ public List<PendingClusterTask> pendingTasks() {
}).collect(Collectors.toList());
}

/**
* Returns the number of throttled pending tasks.
*/
public long numberOfThrottledPendingTasks() {
return throttlingStats.getTotalThrottledTaskCount();
}

/**
* Returns the number of currently pending tasks.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* This class is extension of {@link Throttler} and does throttling of master tasks.
*
* This class does throttling on task submission to master, it uses class name of request of tasks as key for
* throttling. Throttling will be performed over task executor's class level, different task types have different executors class.
*
* Set specific setting to for setting the threshold of throttling of particular task type.
* e.g : Set "master.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
* Set it to default value(-1) to disable the throttling for this task type.
*/
public class MasterTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(MasterTaskThrottler.class);

public static final Setting<Settings> THRESHOLD_SETTINGS = Setting.groupSetting(
"master.throttling.thresholds.",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* To configure more task for throttling, override getMasterThrottlingKey method with task name in task executor.
* Verify that throttled tasks would be retry.
*
* Added retry mechanism in TransportMasterNodeAction so it would be retried for customer generated tasks.
*/
public static Set<String> CONFIGURED_TASK_FOR_THROTTLING = Collections.unmodifiableSet(
// TODO
// Add throttling key for other master task as well.
// Will be added as part of upcoming PRs,
// Right now just added "put-mapping" for ref only.
new HashSet<>(Arrays.asList("put-mapping"))
);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved

private final int DEFAULT_THRESHOLD_VALUE = -1; // Disabled throttling
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
private final MasterService masterService;
private final MasterTaskThrottlerListener masterTaskThrottlerListener;

private final ConcurrentMap<String, Long> tasksCount;
private final ConcurrentMap<String, Long> tasksThreshold;

public MasterTaskThrottler(
final ClusterSettings clusterSettings,
final MasterService masterService,
final MasterTaskThrottlerListener masterTaskThrottlerListener
) {
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting);
this.masterService = masterService;
this.masterTaskThrottlerListener = masterTaskThrottlerListener;
tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will lend in different segment
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will lend in different segment
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}

public void validateSetting(final Settings settings) {
/**
* TODO: Change the version number of check as per version in which this change will be merged.
*/
if (masterService.state().nodes().getMinNodeVersion().compareTo(Version.V_3_0_0) < 0) {
throw new IllegalArgumentException("All the nodes in cluster should be on version later than or equal to 3.0.0");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there are restriction on having all node above 3.0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to make it to the version to which PR is going to be merged, as of now kept it as Current Version(i.e 3.0).
I have added TODO for changing the version as well.

Once we raise final PR against master branch will change the version in it and take care of TODO.

}
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
if (!CONFIGURED_TASK_FOR_THROTTLING.contains(key)) {
throw new IllegalArgumentException("Master task throttling is not configured for given task type: " + key);
}
int threshold = groups.get(key).getAsInt("value", DEFAULT_THRESHOLD_VALUE);
if (threshold < DEFAULT_THRESHOLD_VALUE) {
throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling");
}
}
}

public void updateSetting(final Settings settings) {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Settings> groups = settings.getAsGroups();
for (String key : groups.keySet()) {
updateLimit(key, groups.get(key).getAsInt("value", DEFAULT_THRESHOLD_VALUE));
}
}

protected void updateLimit(final String taskKey, final int limit) {
assert limit >= DEFAULT_THRESHOLD_VALUE;
if (limit == DEFAULT_THRESHOLD_VALUE) {
tasksThreshold.remove(taskKey);
} else {
tasksThreshold.put(taskKey, (long)limit);
}
}

@Override
public void beforeSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
String masterTaskKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey).getMasterThrottlingKey();
tasksCount.putIfAbsent(masterTaskKey, 0L);
tasksCount.computeIfPresent(masterTaskKey, (key, count) -> {
long size = tasks.size();
Long threshold = tasksThreshold.get(masterTaskKey);
if(threshold != null && (count + size > threshold)) {
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
masterTaskKey,
tasks.size(),
threshold
);
throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for " + masterTaskKey);
}
return count + size;
});
}

@Override
public void beforeExecute(List<? extends TaskBatcher.BatchedTask> tasks) {
String masterTaskKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey).getMasterThrottlingKey();
tasksCount.computeIfPresent(masterTaskKey, (key, count) -> count - tasks.size());
}

@Override
public void beforeTimeout(List<? extends TaskBatcher.BatchedTask> tasks) {
String masterTaskKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey).getMasterThrottlingKey();
tasksCount.computeIfPresent(masterTaskKey, (key, count) -> count - tasks.size());
}
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

/**
* Listener interface for master task throttling
*/
public interface MasterTaskThrottlerListener {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
void onThrottle(String type, final int permits);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.metrics.CounterMetric;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Contains stats of Master Task Throttling.
* It stores the total cumulative count of throttled tasks per task type.
*/
public class MasterThrottlingStats implements MasterTaskThrottlerListener {

private Map<String, CounterMetric> throttledTasksCount = new ConcurrentHashMap<>();

private void incrementThrottlingCount(String type, final int permits) {
if (!throttledTasksCount.containsKey(type)) {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric());
}
throttledTasksCount.get(type).inc(permits);
}

public long getThrottlingCount(String type) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use an API to get throttled metrics along with the task type. It would be really helpful to tune this up as an end user . However, we can do it once the stats API is present as well .

return throttledTasksCount.get(type).count();
}

public long getTotalThrottledTaskCount() {
CounterMetric totalCount = new CounterMetric();
throttledTasksCount.forEach((aClass, counterMetric) -> { totalCount.inc(counterMetric.count()); });
return totalCount.count();
}

@Override
public void onThrottle(String type, int permits) {
incrementThrottlingCount(type, permits);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ public abstract class TaskBatcher {
private final PrioritizedOpenSearchThreadPoolExecutor threadExecutor;
// package visible for tests
final Map<Object, LinkedHashSet<BatchedTask>> tasksPerBatchingKey = new HashMap<>();
private final TaskBatcherListener taskBatcherListener;
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved

public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) {
public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) {
this.logger = logger;
this.threadExecutor = threadExecutor;
this.taskBatcherListener = taskBatcherListener;
}

public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws OpenSearchRejectedExecutionException {
Expand All @@ -75,6 +77,11 @@ public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue t
final BatchedTask firstTask = tasks.get(0);
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey)
: "tasks submitted in a batch should share the same batching key: " + tasks;
assert tasks.stream().allMatch(t -> t.getTask().getClass() == firstTask.getTask().getClass())
: "tasks submitted in a batch should be of same class: " + tasks;

taskBatcherListener.beforeSubmit(tasks);

dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
// convert to an identity map to check for dups based on task identity
final Map<Object, BatchedTask> tasksIdentity = tasks.stream()
.collect(
Expand Down Expand Up @@ -136,6 +143,7 @@ private void onTimeoutInternal(List<? extends BatchedTask> tasks, TimeValue time
}
}
}
taskBatcherListener.beforeTimeout(toRemove);
onTimeout(toRemove, timeout);
}
}
Expand Down Expand Up @@ -173,6 +181,7 @@ void runIfNotProcessed(BatchedTask updateTask) {
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

taskBatcherListener.beforeExecute(toExecute);
run(updateTask.batchingKey, toExecute, tasksSummary);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterService;
import org.opensearch.cluster.service.MasterTaskThrottler;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.network.NetworkService;
Expand Down Expand Up @@ -569,7 +570,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS,
ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT,
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES
IndexingPressure.MAX_INDEXING_BYTES,
MasterTaskThrottler.THRESHOLD_SETTINGS
)
)
);
Expand Down
Loading