From 23f15a53657ca25332606f2bf29ebfe0b8db2ef9 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 6 Sep 2022 16:07:49 +0530 Subject: [PATCH] Data node changes for master task throttling (#4204) * Data node changes for master task throttling Signed-off-by: Dhwanil Patel * Using Retryable action for retries * Used RemoteAddress instead of new field for checking local Request --- .../opensearch/action/bulk/BackoffPolicy.java | 53 +++++++++ .../action/support/RetryableAction.java | 31 ++++-- .../TransportClusterManagerNodeAction.java | 45 ++++++-- .../action/bulk/BackoffPolicyTests.java | 18 ++++ ...ransportClusterManagerNodeActionTests.java | 102 ++++++++++++++++++ 5 files changed, 232 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java index 15200417d28ac..185e6b76ec18e 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java +++ b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java @@ -119,6 +119,17 @@ public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int max return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry); } + /** + * Provides exponential backoff between retries: the upper bound of the delay doubles after every retry until it is capped at Integer.MAX_VALUE. + * Uses the full jitter scheme: each delay is drawn at random between 1 millisecond and the current bound. + * + * @param baseDelay upper bound, in milliseconds, of the first delay; NOTE(review): not validated here, the iterator passes it through Math.toIntExact and nextInt, so confirm callers keep it within 1 and Integer.MAX_VALUE + * @return A backoff policy with exponential backoff with full jitter whose iterator never runs out of delays. + */ + public static BackoffPolicy exponentialFullJitterBackoff(long baseDelay) { + return new ExponentialFullJitterBackoff(baseDelay); + } + /** * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy. 
*/ @@ -270,6 +281,48 @@ public TimeValue next() { } } + private static class ExponentialFullJitterBackoff extends BackoffPolicy { /* Policy that hands out a new full jitter exponential delay sequence, starting from baseDelay, on every iterator() call. */ + private final long baseDelay; + + private ExponentialFullJitterBackoff(long baseDelay) { + this.baseDelay = baseDelay; + } + + @Override + public Iterator iterator() { + return new ExponentialFullJitterBackoffIterator(baseDelay); + } + } + + private static class ExponentialFullJitterBackoffIterator implements Iterator { + /** + * Current upper bound of the delay in milliseconds; doubled after every call to next() and capped at Integer.MAX_VALUE. + */ + private long currentDelay; + + private ExponentialFullJitterBackoffIterator(long baseDelay) { + this.currentDelay = baseDelay; + } + + /** + * There is no retry limit for this backoff. + * This iterator can always return another backoff delay. NOTE(review): the RetryableAction onFailure hunk in this patch calls next() with no visible hasNext() check, so bounded policies may need a guard there (confirm). + * + * @return true + */ + @Override + public boolean hasNext() { + return true; + } + + @Override + public TimeValue next() { /* Returns a random delay between 1 ms and the current bound, then doubles the bound (capped at Integer.MAX_VALUE). NOTE(review): Math.toIntExact throws ArithmeticException when the bound exceeds the int range, and nextInt presumably rejects a bound below 1, so a baseDelay outside 1..Integer.MAX_VALUE fails here rather than at construction; confirm callers validate it. */ + TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1); + currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE); + return delayToReturn; + } + } + /** * Concrete Constant Back Off Policy * diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java index 38b7e6ec2a8a0..e7a9d7545342b 100644 --- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java @@ -36,13 +36,14 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.common.Randomness; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayDeque; 
+import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -64,6 +65,7 @@ public abstract class RetryableAction { private final long startMillis; private final ActionListener finalListener; private final String executor; + private final BackoffPolicy backoffPolicy; private volatile Scheduler.ScheduledCancellable retryTask; @@ -74,7 +76,15 @@ public RetryableAction( TimeValue timeoutValue, ActionListener listener ) { - this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME); + this( + logger, + threadPool, + initialDelay, + timeoutValue, + listener, + BackoffPolicy.exponentialFullJitterBackoff(initialDelay.getMillis()), + ThreadPool.Names.SAME + ); } public RetryableAction( @@ -83,6 +93,7 @@ public RetryableAction( TimeValue initialDelay, TimeValue timeoutValue, ActionListener listener, + BackoffPolicy backoffPolicy, String executor ) { this.logger = logger; @@ -95,10 +106,11 @@ public RetryableAction( this.startMillis = threadPool.relativeTimeInMillis(); this.finalListener = listener; this.executor = executor; + this.backoffPolicy = backoffPolicy; } public void run() { - final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null); + final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null); final Runnable runnable = createRunnable(retryingListener); threadPool.executor(executor).execute(runnable); } @@ -146,12 +158,12 @@ private class RetryingListener implements ActionListener { private static final int MAX_EXCEPTIONS = 4; - private final long delayMillisBound; private ArrayDeque caughtExceptions; + private Iterator backoffDelayIterator; - private RetryingListener(long delayMillisBound, ArrayDeque caughtExceptions) { - this.delayMillisBound = delayMillisBound; + private RetryingListener(Iterator backoffDelayIterator, ArrayDeque caughtExceptions) { this.caughtExceptions = caughtExceptions; + this.backoffDelayIterator = backoffDelayIterator; } 
@Override @@ -175,12 +187,9 @@ public void onFailure(Exception e) { } else { addException(e); - final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE); - final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions); - final Runnable runnable = createRunnable(retryingListener); - final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1; + final TimeValue delay = backoffDelayIterator.next(); + final Runnable runnable = createRunnable(this); if (isDone.get() == false) { - final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); try { retryTask = threadPool.schedule(runnable, delay, executor); diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index a97f4ffe555b6..51bde108dff73 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -39,8 +39,10 @@ import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.RetryableAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterManagerNodeChangePredicate; @@ -50,6 +52,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNode; import 
org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; @@ -156,12 +159,10 @@ protected boolean localExecute(Request request) { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { - ClusterState state = clusterService.state(); - logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); } - new AsyncSingleAction(task, request, listener).doStart(state); + new AsyncSingleAction(task, request, listener).run(); } /** @@ -169,21 +170,53 @@ protected void doExecute(Task task, final Request request, ActionListener listener; + private ActionListener listener; private final Request request; private ClusterStateObserver observer; private final long startTime; private final Task task; + private static final int BASE_DELAY_MILLIS = 10; + private static final int MAX_DELAY_MILLIS = 5000; AsyncSingleAction(Task task, Request request, ActionListener listener) { + super( + logger, + threadPool, + TimeValue.timeValueMillis(BASE_DELAY_MILLIS), + request.clusterManagerNodeTimeout, + listener, + BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS), + ThreadPool.Names.SAME + ); this.task = task; this.request = request; - this.listener = listener; this.startTime = threadPool.relativeTimeInMillis(); } + @Override + public void tryAction(ActionListener retryListener) { + ClusterState state = clusterService.state(); + logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); + this.listener = retryListener; + doStart(state); + } + + @Override + public boolean shouldRetry(Exception e) { + // If remote address is null, 
i.e request is generated from same node and we would want to perform retry for it + // If remote address is not null, i.e. the request came from a remote node and was received on this cluster-manager node over the transport layer; + // in that case the throttling retry is left to the remote node and is not performed on this node, so false is returned. + if (request.remoteAddress() == null) { + if (e instanceof TransportException) { + return ((TransportException) e).unwrapCause() instanceof ClusterManagerThrottlingException; + } + return e instanceof ClusterManagerThrottlingException; + } + return false; + } + protected void doStart(ClusterState clusterState) { try { final DiscoveryNodes nodes = clusterState.nodes(); diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java index 0daf74acc9ae9..2f9ae9a154f46 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java @@ -98,4 +98,22 @@ public void testEqualJitterExponentialBackOffPolicy() { assertTrue(delay.getMillis() <= maxDelay); } } + + public void testExponentialBackOffPolicy() { + long baseDelay = 10; + int maxDelay = 10000; + long currentDelay = baseDelay; + BackoffPolicy policy = BackoffPolicy.exponentialFullJitterBackoff(baseDelay); + Iterator iterator = policy.iterator(); + + // Assert full jitter: every delay must not exceed the current bound, which doubles on each iteration. NOTE(review): next() adds 1 to the random value, so the lower-bound check below could be tightened to at least 1; maxDelay above is unused. + int numberOfRetries = randomInt(20); + + for (int i = 0; i < numberOfRetries; i++) { + TimeValue delay = iterator.next(); + assertTrue(delay.getMillis() >= 0); + assertTrue(delay.getMillis() <= currentDelay); + currentDelay = currentDelay * 2; + } + } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 1195ed2590b1e..c45bae224dbd6 100644 --- 
a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -52,6 +52,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -80,6 +81,9 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -606,4 +610,102 @@ public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); } + + public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException { /* The first masterOperation call throws a throttling exception; the test then waits at the barrier for a second (retried) call. */ + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean exception = new AtomicBoolean(true); + AtomicBoolean retried = new AtomicBoolean(false); + CyclicBarrier barrier = new CyclicBarrier(2); + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); /* presumably makes the local node the cluster manager; confirm against ClusterStateCreationUtils.state */ + + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + if (exception.getAndSet(false)) { + throw new 
ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test"); + } else { + try { + retried.set(true); + barrier.await(); + } catch (Exception e) { + throw new AssertionError(); /* NOTE(review): discards the caught exception; passing e as the cause would keep the failure diagnosable */ + } + } + } + }; + action.execute(request, listener); + + barrier.await(); /* blocks until the retried masterOperation call reaches the barrier */ + assertTrue(retried.get()); + assertFalse(exception.get()); + } + + public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException { /* The cluster manager is a remote node: the first captured request is failed with a throttling exception and a second request is expected after the retry delay. */ + Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60)); + DiscoveryNode masterNode = this.remoteNode; + setState( + clusterService, + // use a random base version so it can go down when simulating a restart. + ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[] { localNode, masterNode })) + .version(randomIntBetween(0, 10)) + ); + + PlainActionFuture listener = new PlainActionFuture<>(); + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool); + action.execute(request, listener); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; + assertTrue(capturedRequest.node.isMasterNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + transport.handleRemoteError( + capturedRequest.requestId, + new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test") + ); + + assertFalse(listener.isDone()); + + // waiting for retry to trigger; NOTE(review): a fixed sleep makes this timing-dependent, a polling assertion would be more robust + Thread.sleep(100); + + // Retry for above throttling exception + capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + capturedRequest = capturedRequests[0]; + Response response = new Response(); 
transport.handleResponse(capturedRequest.requestId, response); + + assertTrue(listener.isDone()); + listener.get(); + } + + public void testRetryForDifferentException() throws InterruptedException, BrokenBarrierException { /* A failure that is not a throttling exception must not be retried. */ + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean exception = new AtomicBoolean(true); + AtomicBoolean retried = new AtomicBoolean(false); + CyclicBarrier barrier = new CyclicBarrier(2); + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); + + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) + throws Exception { + if (exception.getAndSet(false)) { + throw new Exception("Different exception"); + } else { + // If called second time due to retry, throw exception + retried.set(true); + throw new AssertionError("Should not retry for other exception"); + } + } + }; + action.execute(request, listener); + + assertFalse(retried.get()); /* NOTE(review): checked immediately after execute(), so a retry scheduled for later would presumably not be observed; confirm. The barrier local and the declared checked exceptions also look unused in this test. */ + assertFalse(exception.get()); + } }