Skip to content

Commit

Permalink
Data node changes for master task throttling (#4204)
Browse files Browse the repository at this point in the history
* Data node changes for master task throttling

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>

* Using Retryable action for retries
* Used RemoteAddress instead of new field for checking local Request
  • Loading branch information
dhwanilpatel authored Sep 6, 2022
1 parent e231136 commit 23f15a5
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 17 deletions.
53 changes: 53 additions & 0 deletions server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int max
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

/**
* It provides exponential backoff between retries until it reaches Integer.MAX_VALUE.
* It uses full jitter scheme for random distribution.
*
* @param baseDelay BaseDelay for exponential Backoff
* @return A backoff policy with exponential backoff with full jitter.
*/
public static BackoffPolicy exponentialFullJitterBackoff(long baseDelay) {
return new ExponentialFullJitterBackoff(baseDelay);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -270,6 +281,48 @@ public TimeValue next() {
}
}

private static class ExponentialFullJitterBackoff extends BackoffPolicy {
private final long baseDelay;

private ExponentialFullJitterBackoff(long baseDelay) {
this.baseDelay = baseDelay;
}

@Override
public Iterator<TimeValue> iterator() {
return new ExponentialFullJitterBackoffIterator(baseDelay);
}
}

private static class ExponentialFullJitterBackoffIterator implements Iterator<TimeValue> {
/**
* Current delay in exponential backoff
*/
private long currentDelay;

private ExponentialFullJitterBackoffIterator(long baseDelay) {
this.currentDelay = baseDelay;
}

/**
* There is not any limit for this BackOff.
* This Iterator will always return back off delay.
*
* @return true
*/
@Override
public boolean hasNext() {
return true;
}

@Override
public TimeValue next() {
TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1);
currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE);
return delayToReturn;
}
}

/**
* Concrete Constant Back Off Policy
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.Randomness;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -64,6 +65,7 @@ public abstract class RetryableAction<Response> {
private final long startMillis;
private final ActionListener<Response> finalListener;
private final String executor;
private final BackoffPolicy backoffPolicy;

private volatile Scheduler.ScheduledCancellable retryTask;

Expand All @@ -74,7 +76,15 @@ public RetryableAction(
TimeValue timeoutValue,
ActionListener<Response> listener
) {
this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
this(
logger,
threadPool,
initialDelay,
timeoutValue,
listener,
BackoffPolicy.exponentialFullJitterBackoff(initialDelay.getMillis()),
ThreadPool.Names.SAME
);
}

public RetryableAction(
Expand All @@ -83,6 +93,7 @@ public RetryableAction(
TimeValue initialDelay,
TimeValue timeoutValue,
ActionListener<Response> listener,
BackoffPolicy backoffPolicy,
String executor
) {
this.logger = logger;
Expand All @@ -95,10 +106,11 @@ public RetryableAction(
this.startMillis = threadPool.relativeTimeInMillis();
this.finalListener = listener;
this.executor = executor;
this.backoffPolicy = backoffPolicy;
}

public void run() {
final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null);
final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null);
final Runnable runnable = createRunnable(retryingListener);
threadPool.executor(executor).execute(runnable);
}
Expand Down Expand Up @@ -146,12 +158,12 @@ private class RetryingListener implements ActionListener<Response> {

private static final int MAX_EXCEPTIONS = 4;

private final long delayMillisBound;
private ArrayDeque<Exception> caughtExceptions;
private Iterator<TimeValue> backoffDelayIterator;

private RetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
this.delayMillisBound = delayMillisBound;
private RetryingListener(Iterator<TimeValue> backoffDelayIterator, ArrayDeque<Exception> caughtExceptions) {
this.caughtExceptions = caughtExceptions;
this.backoffDelayIterator = backoffDelayIterator;
}

@Override
Expand All @@ -175,12 +187,9 @@ public void onFailure(Exception e) {
} else {
addException(e);

final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
final Runnable runnable = createRunnable(retryingListener);
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
final TimeValue delay = backoffDelayIterator.next();
final Runnable runnable = createRunnable(this);
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
try {
retryTask = threadPool.schedule(runnable, delay, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterManagerNodeChangePredicate;
Expand All @@ -50,6 +52,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -156,34 +159,64 @@ protected boolean localExecute(Request request) {

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
}
new AsyncSingleAction(task, request, listener).doStart(state);
new AsyncSingleAction(task, request, listener).run();
}

/**
* Asynchronous single action
*
* @opensearch.internal
*/
class AsyncSingleAction {
class AsyncSingleAction extends RetryableAction {

private final ActionListener<Response> listener;
private ActionListener<Response> listener;
private final Request request;
private ClusterStateObserver observer;
private final long startTime;
private final Task task;
private static final int BASE_DELAY_MILLIS = 10;
private static final int MAX_DELAY_MILLIS = 5000;

AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
request.clusterManagerNodeTimeout,
listener,
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
ThreadPool.Names.SAME
);
this.task = task;
this.request = request;
this.listener = listener;
this.startTime = threadPool.relativeTimeInMillis();
}

@Override
public void tryAction(ActionListener retryListener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
this.listener = retryListener;
doStart(state);
}

@Override
public boolean shouldRetry(Exception e) {
// If remote address is null, i.e request is generated from same node and we would want to perform retry for it
// If remote address is not null, i.e request is generated from remote node and received on this master node on transport layer
// in that case we would want throttling retry to perform on remote node only not on this master node.
if (request.remoteAddress() == null) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause() instanceof ClusterManagerThrottlingException;
}
return e instanceof ClusterManagerThrottlingException;
}
return false;
}

protected void doStart(ClusterState clusterState) {
try {
final DiscoveryNodes nodes = clusterState.nodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,22 @@ public void testEqualJitterExponentialBackOffPolicy() {
assertTrue(delay.getMillis() <= maxDelay);
}
}

public void testExponentialBackOffPolicy() {
long baseDelay = 10;
int maxDelay = 10000;
long currentDelay = baseDelay;
BackoffPolicy policy = BackoffPolicy.exponentialFullJitterBackoff(baseDelay);
Iterator<TimeValue> iterator = policy.iterator();

// Assert equal jitter
int numberOfRetries = randomInt(20);

for (int i = 0; i < numberOfRetries; i++) {
TimeValue delay = iterator.next();
assertTrue(delay.getMillis() >= 0);
assertTrue(delay.getMillis() <= currentDelay);
currentDelay = currentDelay * 2;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -80,6 +81,9 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.opensearch.test.ClusterServiceUtils.setState;
Expand Down Expand Up @@ -606,4 +610,102 @@ public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
}

public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
AtomicBoolean exception = new AtomicBoolean(true);
AtomicBoolean retried = new AtomicBoolean(false);
CyclicBarrier barrier = new CyclicBarrier(2);
setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode }));

TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) {
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
if (exception.getAndSet(false)) {
throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test");
} else {
try {
retried.set(true);
barrier.await();
} catch (Exception e) {
throw new AssertionError();
}
}
}
};
action.execute(request, listener);

barrier.await();
assertTrue(retried.get());
assertFalse(exception.get());
}

public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException {
Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60));
DiscoveryNode masterNode = this.remoteNode;
setState(
clusterService,
// use a random base version so it can go down when simulating a restart.
ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[] { localNode, masterNode }))
.version(randomIntBetween(0, 10))
);

PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool);
action.execute(request, listener);

CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0];
assertTrue(capturedRequest.node.isMasterNode());
assertThat(capturedRequest.request, equalTo(request));
assertThat(capturedRequest.action, equalTo("internal:testAction"));
transport.handleRemoteError(
capturedRequest.requestId,
new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test")
);

assertFalse(listener.isDone());

// waiting for retry to trigger
Thread.sleep(100);

// Retry for above throttling exception
capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1));
capturedRequest = capturedRequests[0];
Response response = new Response();
transport.handleResponse(capturedRequest.requestId, response);

assertTrue(listener.isDone());
listener.get();
}

public void testRetryForDifferentException() throws InterruptedException, BrokenBarrierException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
AtomicBoolean exception = new AtomicBoolean(true);
AtomicBoolean retried = new AtomicBoolean(false);
CyclicBarrier barrier = new CyclicBarrier(2);
setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode }));

TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) {
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
throws Exception {
if (exception.getAndSet(false)) {
throw new Exception("Different exception");
} else {
// If called second time due to retry, throw exception
retried.set(true);
throw new AssertionError("Should not retry for other exception");
}
}
};
action.execute(request, listener);

assertFalse(retried.get());
assertFalse(exception.get());
}
}

0 comments on commit 23f15a5

Please sign in to comment.