Skip to content

Commit

Permalink
Rebased older PR(opensearch-project#553) with latest mainline
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed May 26, 2022
1 parent eb847ae commit 6e9edd1
Show file tree
Hide file tree
Showing 43 changed files with 2,109 additions and 209 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.master;

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.support.master.MasterThrottlingRetryListener;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.cluster.service.MasterTaskThrottler;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportMessageListener;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class MasterTaskThrottlingIT extends OpenSearchIntegTestCase {

    /** Cluster setting that holds the throttling threshold for put-mapping tasks. */
    private static final String PUT_MAPPING_THRESHOLD_SETTING = "master.throttling.thresholds.put-mapping.value";

    /** Scheduler handed to {@link MasterThrottlingRetryListener} for the lifetime of this suite. */
    private static final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);

    @BeforeClass
    public static void initTestScheduler() {
        MasterThrottlingRetryListener.setThrottlingRetryScheduler(scheduler);
    }

    @AfterClass
    public static void terminateScheduler() {
        Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
    }

    /**
     * Tests the end-to-end master throttling feature against a remote master.
     *
     * The number of put-mapping requests received by the master node must equal
     * the total number of client requests plus the number of requests the master
     * throttled. That shows the master is throwing the throttling exception and
     * the data node is retrying the throttled requests.
     */
    public void testThrottlingForRemoteMaster() throws Exception {
        try {
            internalCluster().beforeTest(random(), 0);
            String masterNode = internalCluster().startMasterOnlyNode();
            String dataNode = internalCluster().startDataOnlyNode();
            int throttlingLimit = randomIntBetween(1, 5);
            createIndex("test");
            setPutMappingThrottlingLimit(throttlingLimit);

            TransportService masterTransportService = internalCluster().getInstance(TransportService.class, masterNode);
            AtomicInteger requestCountOnMaster = new AtomicInteger();
            AtomicInteger throttledRequest = new AtomicInteger();
            AtomicInteger successfulRequest = new AtomicInteger();
            int totalRequest = randomIntBetween(throttlingLimit, 5 * throttlingLimit);
            CountDownLatch latch = new CountDownLatch(totalRequest);

            masterTransportService.addMessageListener(new TransportMessageListener() {
                @Override
                public void onRequestReceived(long requestId, String action) {
                    if (action.contains("mapping")) {
                        requestCountOnMaster.incrementAndGet();
                    }
                }

                @Override
                public void onResponseSent(long requestId, String action, Exception error) {
                    if (action.contains("mapping")) {
                        throttledRequest.incrementAndGet();
                        assertEquals(MasterTaskThrottlingException.class, error.getClass());
                    }
                }
            });

            sendPutMappingRequests(dataNode, totalRequest, latch, successfulRequest);
            latch.await();

            // A failure raised on a transport thread may never reach the test thread,
            // so verify on this thread that every request completed successfully.
            assertEquals(totalRequest, successfulRequest.get());
            assertEquals(totalRequest + throttledRequest.get(), requestCountOnMaster.get());
            assertBusy(
                () -> assertEquals(throttledRequest.get(), clusterService().getMasterService().numberOfThrottledPendingTasks())
            );
            assertEquals(0, MasterThrottlingRetryListener.getRetryingTasksCount());
        } finally {
            resetPutMappingThrottlingLimit();
        }
    }

    /**
     * Tests the throttling feature on a single node.
     *
     * Asserts the client-visible behaviour: no request fails, i.e. the throttling
     * exception is never passed to the client. The node retries internally and
     * every request eventually succeeds.
     */
    public void testThrottlingForSingleNode() throws Exception {
        try {
            internalCluster().beforeTest(random(), 0);
            String node = internalCluster().startNode();
            int throttlingLimit = randomIntBetween(1, 5);
            createIndex("test");
            setPutMappingThrottlingLimit(throttlingLimit);

            AtomicInteger successfulRequest = new AtomicInteger();
            int totalRequest = randomIntBetween(throttlingLimit, 3 * throttlingLimit);
            CountDownLatch latch = new CountDownLatch(totalRequest);

            sendPutMappingRequests(node, totalRequest, latch, successfulRequest);
            latch.await();

            assertEquals(totalRequest, successfulRequest.get());
            assertEquals(0, MasterThrottlingRetryListener.getRetryingTasksCount());
        } finally {
            resetPutMappingThrottlingLimit();
        }
    }

    /**
     * Fires {@code totalRequest} put-mapping requests at index "test" through the
     * client of {@code nodeName}, one request per thread, and waits until every
     * thread has submitted its request. Completion of the requests themselves is
     * signalled through {@code latch}.
     */
    private void sendPutMappingRequests(String nodeName, int totalRequest, CountDownLatch latch, AtomicInteger successCounter)
        throws InterruptedException {
        Thread[] threads = new Thread[totalRequest];
        for (int i = 0; i < totalRequest; i++) {
            PutMappingRequest putMappingRequest = new PutMappingRequest("test").type("type").source("field" + i, "type=text");
            threads[i] = new Thread(
                () -> internalCluster().client(nodeName)
                    .admin()
                    .indices()
                    .putMapping(putMappingRequest, countingListener(latch, successCounter))
            );
        }
        // start(), not run(): run() would execute each request on the calling thread
        // and leave nothing for join() to wait on.
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    /**
     * Builds a listener that counts {@code latch} down on every completion and
     * records successes in {@code successCounter}. The counter is incremented
     * before the latch is released so that a thread woken by the latch always
     * observes the final count.
     */
    private static <T> ActionListener<T> countingListener(CountDownLatch latch, AtomicInteger successCounter) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                successCounter.incrementAndGet();
                latch.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                latch.countDown();
                throw new AssertionError(e);
            }
        };
    }

    /** Sets the put-mapping throttling threshold as a transient cluster setting. */
    private void setPutMappingThrottlingLimit(int throttlingLimit) {
        applyTransientSettings(Settings.builder().put(PUT_MAPPING_THRESHOLD_SETTING, throttlingLimit).build());
    }

    /** Removes the transient put-mapping throttling threshold again. */
    private void resetPutMappingThrottlingLimit() {
        applyTransientSettings(Settings.builder().put(PUT_MAPPING_THRESHOLD_SETTING, (String) null).build());
    }

    /** Applies {@code settings} as transient cluster settings and asserts the update was acknowledged. */
    private void applyTransientSettings(Settings settings) {
        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
        settingsRequest.transientSettings(settings);
        assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet());
    }
}
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,15 @@ private enum OpenSearchExceptionHandle {
org.opensearch.transport.NoSeedNodeLeftException::new,
160,
LegacyESVersion.V_7_10_0
),
/**
* TODO: Update the version used in this check to the version in which this change is actually released.
*/
MASTER_TASK_THROTTLED_EXCEPTION(
org.opensearch.cluster.service.MasterTaskThrottlingException.class,
org.opensearch.cluster.service.MasterTaskThrottlingException::new,
161,
Version.V_1_3_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus
this.allocationService = allocationService;
}

/**
 * Returns the master-throttling key of this task, the constant {@code "cluster-reroute-api"}.
 * NOTE(review): presumably the throttler uses this key to look up the task's threshold setting; confirm.
 */
@Override
public String getMasterThrottlingKey() {
return "cluster-reroute-api";
}

@Override
protected ClusterRerouteResponse newResponse(boolean acknowledged) {
return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ protected void masterOperation(

private volatile boolean changed = false;

/**
 * Returns the master-throttling key of this task, the constant {@code "cluster-update-settings"}.
 * NOTE(review): presumably the throttler uses this key to look up the task's threshold setting; confirm.
 */
@Override
public String getMasterThrottlingKey() {
return "cluster-update-settings";
}

@Override
protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
}

/**
 * Returns the master-throttling key of this task, the constant {@code "auto-create"}.
 * NOTE(review): presumably the throttler uses this key to look up the task's threshold setting; confirm.
 */
@Override
public String getMasterThrottlingKey() {
return "auto-create";
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {
return new AcknowledgedResponse(acknowledged);
}

/**
 * Returns the master-throttling key of this task, the constant {@code "delete-dangling-index"}.
 * NOTE(review): presumably the throttler uses this key to look up the task's threshold setting; confirm.
 */
@Override
public String getMasterThrottlingKey() {
return "delete-dangling-index";
}

@Override
public ClusterState execute(final ClusterState currentState) {
return deleteDanglingIndex(currentState, indexToDelete);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

/**
 * Returns the master-throttling key of this task, the constant {@code "remove-data-stream"}.
 * NOTE(review): presumably the throttler uses this key to look up the task's threshold setting; confirm.
 */
@Override
public String getMasterThrottlingKey() {
return "remove-data-stream";
}

@Override
public ClusterState execute(ClusterState currentState) {
return removeDataStream(deleteIndexService, currentState, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
return rolloverResult.clusterState;
}

/**
 * Returns the master-throttling key of this task, the constant {@code "rollover-index"}.
 * NOTE(review): presumably the throttler uses this key to look up the task's threshold setting; confirm.
 */
@Override
public String getMasterThrottlingKey() {
return "rollover-index";
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
Expand Down
69 changes: 69 additions & 0 deletions server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.bulk;

import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;

import java.util.Iterator;
Expand Down Expand Up @@ -105,6 +106,19 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}

/**
 * Creates a backoff policy whose delay grows exponentially from {@code baseDelay}
 * until it is capped at {@code maxDelay}, with "equal jitter" applied: every
 * returned delay is half of the current exponential delay plus a random amount of
 * up to that same half. This spreads retries out while still guaranteeing a
 * minimum delay, which suits retries of throttled requests.
 * <p>
 * The policy never runs out of retries; its iterator always has a next delay.
 *
 * @param baseDelay delay in milliseconds that the exponential growth starts from; must not be negative
 * @param maxDelay  upper bound in milliseconds for any delay returned by the policy; must not be negative
 * @return a backoff policy with exponential backoff and equal jitter that never returns a delay above {@code maxDelay}
 * @throws IllegalArgumentException if {@code baseDelay} or {@code maxDelay} is negative
 */
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelay) {
    // Fail fast: a negative delay would otherwise only surface later, inside the
    // iterator, as an unrelated-looking "bound must be positive" error from the
    // random number generator.
    if (baseDelay < 0) {
        throw new IllegalArgumentException("baseDelay must not be negative but was [" + baseDelay + "]");
    }
    if (maxDelay < 0) {
        throw new IllegalArgumentException("maxDelay must not be negative but was [" + maxDelay + "]");
    }
    return new ExponentialEqualJitterBackoff(baseDelay, maxDelay);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -197,6 +211,61 @@ public TimeValue next() {
}
}

/**
 * Backoff policy that hands out {@link ExponentialEqualJitterBackoffIterator}s
 * configured with a fixed base delay and maximum delay (both in milliseconds).
 */
private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
    private final int baseDelay;
    private final int maxDelay;

    private ExponentialEqualJitterBackoff(int baseDelay, int maxDelay) {
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    /** Returns a fresh iterator; each iterator tracks its own retry count. */
    @Override
    public Iterator<TimeValue> iterator() {
        final Iterator<TimeValue> delays = new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelay);
        return delays;
    }
}

/**
 * Endless iterator of backoff delays that grow exponentially from a base delay up
 * to a maximum delay, with equal jitter applied to every returned value.
 */
private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
    /**
     * Largest exponent used when computing the exponential delay. Once this many
     * delays have been handed out, the exponent stays fixed; the delay is still
     * capped at the maximum delay.
     *
     * NOTE: The delay is computed as {@code (1L << retries) * baseDelay}. With an
     * exponent of at most 30 and an {@code int} base delay the product always
     * fits in a {@code long}; larger exponents risk overflow.
     */
    private static final int MAX_RETRIES = 30;

    private final int maxDelay;
    private final int baseDelay;
    private int retriesAttempted;

    private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelay) {
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    /**
     * This backoff has no retry limit, so a next delay is always available.
     *
     * @return always {@code true}
     */
    @Override
    public boolean hasNext() {
        return true;
    }

    /**
     * Computes the next delay: the exponential delay for the current attempt,
     * capped at the maximum delay, split into a guaranteed half and a random
     * share of the other half.
     *
     * @return the next delay in milliseconds, between half of the capped exponential delay and all of it
     */
    @Override
    public TimeValue next() {
        int retries = Math.min(retriesAttempted, MAX_RETRIES);
        int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelay);
        // Saturate the counter instead of incrementing forever: beyond MAX_RETRIES
        // its value no longer affects the delay, and an unbounded counter would
        // eventually wrap to a negative shift amount.
        if (retriesAttempted < MAX_RETRIES) {
            retriesAttempted++;
        }
        int guaranteedDelay = exponentialDelay / 2;
        return TimeValue.timeValueMillis(guaranteedDelay + Randomness.get().nextInt(guaranteedDelay + 1));
    }
}

/**
* Concrete Constant Back Off Policy
*
Expand Down
Loading

0 comments on commit 6e9edd1

Please sign in to comment.