Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add master task throttling #553

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.master;

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.support.master.MasterThrottlingRetryListener;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportMessageListener;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class MasterTaskThrottlingIT extends OpenSearchIntegTestCase {

private static final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);

/*
* This integ test will test end-end master throttling feature for
* remote master.
*
* It will check the number of request coming to master node
* should be total number of requests + throttled requests from master.
* This will ensure the end-end feature is working as master is throwing
* Throttling exception and data node is performing retries on it.
*
*/
public void testThrottlingForRemoteMaster() throws Exception {
try {
internalCluster().beforeTest(random(), 0);
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
int throttlingLimit = randomIntBetween(1, 5);
createIndex("test");

ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
Settings settings = Settings.builder()
.put("master.throttling.thresholds.put-mapping.value", throttlingLimit)
.build();
settingsRequest.transientSettings(settings);
assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet());

TransportService masterTransportService = (internalCluster().getInstance(TransportService.class, masterNode));
AtomicInteger requestCountOnMaster = new AtomicInteger();
AtomicInteger throttledRequest = new AtomicInteger();
int totalRequest = randomIntBetween(throttlingLimit, 5 * throttlingLimit);
CountDownLatch latch = new CountDownLatch(totalRequest);

masterTransportService.addMessageListener(new TransportMessageListener() {
@Override
public void onRequestReceived(long requestId, String action) {
if (action.contains("mapping")) {
requestCountOnMaster.incrementAndGet();
}
}

@Override
public void onResponseSent(long requestId, String action, Exception error) {
if (action.contains("mapping")) {
throttledRequest.incrementAndGet();
assertEquals(MasterTaskThrottlingException.class, error.getClass());
}
}
});

ActionListener listener = new ActionListener() {
@Override
public void onResponse(Object o) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
latch.countDown();
throw new AssertionError(e);
}
};

Thread[] threads = new Thread[totalRequest];
for (int i = 0; i < totalRequest; i++) {
PutMappingRequest putMappingRequest = new PutMappingRequest("test")
.type("type")
.source("field" + i, "type=text");
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
internalCluster().client(dataNode).admin().indices().putMapping(putMappingRequest, listener);
}
});
}
for (int i = 0; i < totalRequest; i++) {
threads[i].run();
}
for (int i = 0; i < totalRequest; i++) {
threads[i].join();
}
latch.await();

assertEquals(totalRequest + throttledRequest.get(), requestCountOnMaster.get());
assertBusy(() -> {
assertEquals(clusterService().getMasterService().numberOfThrottledPendingTasks(), throttledRequest.get());
});
assertEquals(MasterThrottlingRetryListener.getRetryingTasksCount(), 0);
}
finally {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
Settings settings = Settings.builder()
.put("master.throttling.thresholds.put-mapping.value", (String) null)
.build();
settingsRequest.transientSettings(settings);
assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet());
}
}

/*
* This will test the throttling feature for single node.
*
* Here we will assert the client behaviour that client's request is not
* failed, i.e. Throttling exception is not passed to the client.
* Data node will internally do the retry and request should pass.
*
*/
public void testThrottlingForSingleNode() throws Exception {
try {
internalCluster().beforeTest(random(), 0);
String node = internalCluster().startNode();
int throttlingLimit = randomIntBetween(1, 5);
createIndex("test");

ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
Settings settings = Settings.builder()
.put("master.throttling.thresholds.put-mapping.value", throttlingLimit)
.build();
settingsRequest.transientSettings(settings);
assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet());

AtomicInteger successfulRequest = new AtomicInteger();
int totalRequest = randomIntBetween(throttlingLimit, 3 * throttlingLimit);
CountDownLatch latch = new CountDownLatch(totalRequest);

ActionListener listener = new ActionListener() {
@Override
public void onResponse(Object o) {
latch.countDown();
successfulRequest.incrementAndGet();
}

@Override
public void onFailure(Exception e) {
latch.countDown();
throw new AssertionError(e);
}
};

Thread[] threads = new Thread[totalRequest];
for (int i = 0; i < totalRequest; i++) {
PutMappingRequest putMappingRequest = new PutMappingRequest("test")
.type("type")
.source("field" + i, "type=text");
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
internalCluster().client(node).admin().indices().putMapping(putMappingRequest, listener);
}
});
}
for (int i = 0; i < totalRequest; i++) {
threads[i].run();
}
for (int i = 0; i < totalRequest; i++) {
threads[i].join();
}

latch.await();
assertEquals(totalRequest, successfulRequest.get());
assertEquals(MasterThrottlingRetryListener.getRetryingTasksCount(), 0);
}
finally {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
Settings settings = Settings.builder()
.put("master.throttling.thresholds.put-mapping.value", (String) null)
.build();
settingsRequest.transientSettings(settings);
assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet());
}
}

@BeforeClass
public static void initTestScheduler() {
MasterThrottlingRetryListener.setThrottlingRetryScheduler(scheduler);
}

@AfterClass
public static void terminateScheduler() {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,15 @@ private enum OpenSearchExceptionHandle {
org.opensearch.transport.NoSeedNodeLeftException.class,
org.opensearch.transport.NoSeedNodeLeftException::new,
160,
LegacyESVersion.V_7_10_0);
LegacyESVersion.V_7_10_0),
/**
* TODO: Update the version check below to the version in which this change is merged.
*/
MASTER_TASK_THROTTLED_EXCEPTION(
org.opensearch.cluster.service.MasterTaskThrottlingException.class,
org.opensearch.cluster.service.MasterTaskThrottlingException::new,
161,
LegacyESVersion.V_7_10_3);

final Class<? extends OpenSearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends OpenSearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus
this.allocationService = allocationService;
}

/**
 * Returns the constant master throttling key {@code "cluster-reroute-api"} for this task.
 *
 * NOTE(review): presumably this key selects the throttling threshold applied to
 * cluster reroute tasks (settings of the form
 * {@code master.throttling.thresholds.<key>.value}) — confirm against the throttler.
 */
@Override
public String getMasterThrottlingKey() {
return "cluster-reroute-api";
}

@Override
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
}

/**
 * Returns the constant master throttling key {@code "cluster-update-settings"} for this task.
 *
 * NOTE(review): presumably this key selects the throttling threshold applied to
 * cluster settings update tasks (settings of the form
 * {@code master.throttling.thresholds.<key>.value}) — confirm against the throttler.
 */
@Override
public String getMasterThrottlingKey() {
return "cluster-update-settings";
}

@Override
public void onAllNodesAcked(@Nullable Exception e) {
if (changed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}

/**
 * Returns the constant master throttling key {@code "auto-create"} for this task.
 *
 * NOTE(review): presumably this key selects the throttling threshold applied to
 * auto-create tasks (settings of the form
 * {@code master.throttling.thresholds.<key>.value}) — confirm against the throttler.
 */
@Override
public String getMasterThrottlingKey() {
return "auto-create";
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}

/**
 * Returns the constant master throttling key {@code "delete-dangling-index"} for this task.
 *
 * NOTE(review): presumably this key selects the throttling threshold applied to
 * dangling index deletion tasks (settings of the form
 * {@code master.throttling.thresholds.<key>.value}) — confirm against the throttler.
 */
@Override
public String getMasterThrottlingKey() {
return "delete-dangling-index";
}

@Override
public ClusterState execute(final ClusterState currentState) {
return deleteDanglingIndex(currentState, indexToDelete);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

/**
 * Returns the constant master throttling key {@code "remove-data-stream"} for this task.
 *
 * NOTE(review): presumably this key selects the throttling threshold applied to
 * data stream removal tasks (settings of the form
 * {@code master.throttling.thresholds.<key>.value}) — confirm against the throttler.
 */
@Override
public String getMasterThrottlingKey() {
return "remove-data-stream";
}

@Override
public ClusterState execute(ClusterState currentState) {
return removeDataStream(deleteIndexService, currentState, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
return rolloverResult.clusterState;
}

/**
 * Returns the constant master throttling key {@code "rollover-index"} for this task.
 *
 * NOTE(review): presumably this key selects the throttling threshold applied to
 * index rollover tasks (settings of the form
 * {@code master.throttling.thresholds.<key>.value}) — confirm against the throttler.
 */
@Override
public String getMasterThrottlingKey() {
return "rollover-index";
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.bulk;

import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;

import java.util.Iterator;
Expand Down Expand Up @@ -103,6 +104,19 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}

/**
 * Creates a backoff policy with exponential backoff and equal jitter that never runs out of retries.
 *
 * The delay doubles with every retry, starting from {@code baseDelay}, until it is capped at
 * {@code maxDelay}. Each returned delay is half of that capped value plus a random amount of up to
 * the same half, so the delays are randomly spread while a minimum delay is still guaranteed.
 * This equal jitter scheme is intended for retrying throttled requests.
 *
 * @param baseDelay base delay of the exponential backoff, in milliseconds
 * @param maxDelay upper bound, in milliseconds, for any delay returned by the policy
 * @return A backoff policy with exponential backoff and equal jitter which never returns a delay greater than the given max delay
 */
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelay) {
return new ExponentialEqualJitterBackoff(baseDelay, maxDelay);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -180,6 +194,61 @@ public TimeValue next() {
}
}

/**
 * Backoff policy that hands out {@link ExponentialEqualJitterBackoffIterator} instances
 * configured with a fixed base delay and maximum delay.
 */
private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
    /** Starting delay passed on to every iterator. */
    private final int baseDelay;
    /** Delay cap passed on to every iterator. */
    private final int maxDelay;

    private ExponentialEqualJitterBackoff(int baseDelay, int maxDelay) {
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    /**
     * @return a fresh iterator, so every caller starts again from the first retry
     */
    @Override
    public Iterator<TimeValue> iterator() {
        final Iterator<TimeValue> delays = new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelay);
        return delays;
    }
}

/**
 * Endless iterator of backoff delays that grow exponentially from {@code baseDelay},
 * are capped at {@code maxDelay}, and are randomised with equal jitter.
 */
private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
    /**
     * Maximum exponent used for the delay calculation. Avoids integer overflow issues.
     * Once this many retries have been attempted, the delay no longer grows: it stays at
     * the value computed for this exponent (still capped at max delay), with equal jitter.
     *
     * NOTE: If the value is greater than 30, there can be integer overflow
     * issues during delay calculation.
     **/
    private static final int MAX_RETRIES = 30;

    private final int maxDelay;
    private final int baseDelay;
    /** Number of delays handed out so far, saturating at {@link #MAX_RETRIES}. */
    private int retriesAttempted;

    private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelay) {
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    /**
     * There is no limit for this backoff; the iterator always has another delay.
     *
     * @return true
     */
    @Override
    public boolean hasNext() {
        return true;
    }

    /**
     * Computes the next delay: half of the capped exponential delay plus a random
     * amount between zero and that half (inclusive), in milliseconds.
     *
     * @return the next backoff delay
     */
    @Override
    public TimeValue next() {
        // The multiplication is done in long arithmetic; the result is capped at maxDelay before narrowing to int.
        int exponentialDelay = (int) Math.min((1L << retriesAttempted) * baseDelay, maxDelay);
        // Saturate the counter instead of incrementing forever, so it can never wrap around
        // to a negative value and corrupt the shift above.
        if (retriesAttempted < MAX_RETRIES) {
            retriesAttempted++;
        }
        int halfDelay = exponentialDelay / 2;
        return TimeValue.timeValueMillis(halfDelay + Randomness.get().nextInt(halfDelay + 1));
    }
}

private static final class ConstantBackoff extends BackoffPolicy {
private final TimeValue delay;

Expand Down
Loading