From 14acc5c715722d1a77812935452b997c9fb075a6 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Mon, 6 Jun 2022 16:48:11 +0530
Subject: [PATCH 1/9] Data node changes for master task throttling

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
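
With cluster-manager task throttling, a node whose task is throttled retries the task itself,
while a request that was forwarded from another node is left for the originating node to retry.
A rough sketch of the hand-off added in this patch (the request variable is hypothetical;
setRemoteRequest/isRemoteRequest are the new accessors on ClusterManagerNodeRequest):

    // On the coordinating node, just before forwarding the request to the elected cluster-manager:
    request.setRemoteRequest(true);

    // On the node executing the action, retries are only scheduled for locally generated requests:
    boolean retryLocally = !request.isRemoteRequest();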
---
 .../ClusterManagerNodeRequest.java            |  13 ++
 .../MasterThrottlingRetryListener.java        | 131 +++++++++++++
 .../TransportClusterManagerNodeAction.java    |  13 +-
 .../MasterThrottlingRetryListenerTests.java   | 175 ++++++++++++++++++
 ...ransportClusterManagerNodeActionTests.java |  80 ++++++++
 5 files changed, 408 insertions(+), 4 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java
 create mode 100644 server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java
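
Reviewer note: MasterThrottlingRetryListener spaces its retries with
BackoffPolicy.exponentialEqualJitterBackoff(10, 5000). A self-contained sketch of the delay
sequence that policy is expected to produce (plain java.util.Random stands in for
Randomness.get(); the constants mirror BASE_DELAY_MILLIS and MAX_DELAY_MILLIS in the listener):

    import java.util.Random;

    final class EqualJitterBackoffSketch {
        private static final int BASE_DELAY_MILLIS = 10;
        private static final int MAX_DELAY_MILLIS = 5000;
        private static final int RETRIES_TILL_JITTER_INCREASE = 30;
        private static final Random RANDOM = new Random();

        // Delay for the n-th retry: exponential growth capped at MAX_DELAY_MILLIS,
        // then half of it fixed plus half of it random ("equal jitter").
        static long delayMillis(int retriesAttempted) {
            int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
            int exponentialDelay = (int) Math.min((1L << retries) * BASE_DELAY_MILLIS, MAX_DELAY_MILLIS);
            return (exponentialDelay / 2) + RANDOM.nextInt(exponentialDelay / 2 + 1);
        }

        public static void main(String[] args) {
            for (int i = 0; i < 5; i++) {
                System.out.println("retry " + i + " -> " + delayMillis(i) + " ms");
            }
        }
    }

Retries therefore start around 5-10 ms and settle between 2.5 s and 5 s once the exponential
bound reaches the 5 s cap; the listener separately stops retrying once the accumulated delay
would exceed clusterManagerNodeTimeout.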

diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java b/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java
index 9d8a79cfed11d..d20a4d99cf767 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java
@@ -58,17 +58,21 @@ public abstract class ClusterManagerNodeRequest<Request extends ClusterManagerNo
     @Deprecated
     protected TimeValue masterNodeTimeout = clusterManagerNodeTimeout;
 
+    protected boolean remoteRequest;
+
     protected ClusterManagerNodeRequest() {}
 
     protected ClusterManagerNodeRequest(StreamInput in) throws IOException {
         super(in);
         clusterManagerNodeTimeout = in.readTimeValue();
+        remoteRequest = in.readOptionalBoolean();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeTimeValue(clusterManagerNodeTimeout);
+        out.writeOptionalBoolean(remoteRequest);
     }
 
     /**
@@ -110,6 +114,11 @@ public final Request masterNodeTimeout(String timeout) {
         return clusterManagerNodeTimeout(timeout);
     }
 
+    public final Request setRemoteRequest(boolean remoteRequest) {
+        this.remoteRequest = remoteRequest;
+        return (Request) this;
+    }
+
     public final TimeValue clusterManagerNodeTimeout() {
         return this.clusterManagerNodeTimeout;
     }
@@ -119,4 +128,8 @@ public final TimeValue clusterManagerNodeTimeout() {
     public final TimeValue masterNodeTimeout() {
         return clusterManagerNodeTimeout();
     }
+
+    public final boolean isRemoteRequest() {
+        return this.remoteRequest;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java b/server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java
new file mode 100644
index 0000000000000..f287b6680498a
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java
@@ -0,0 +1,131 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.support.clustermanager;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.ActionResponse;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
+import org.opensearch.cluster.service.MasterTaskThrottlingException;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.threadpool.Scheduler;
+import org.opensearch.transport.TransportException;
+
+import java.util.Iterator;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ActionListener for retrying throttled master tasks.
+ * It schedules a retry when the master node returns a throttling exception and
+ * delegates the response to the wrapped listener once it receives a response from the master.
+ *
+ * It uses the ExponentialEqualJitterBackoff policy to determine the delay between retries.
+ */
+public class MasterThrottlingRetryListener<Request extends ClusterManagerNodeRequest<Request>, Response extends ActionResponse>
+    implements
+        ActionListener<Response> {
+
+    private static final Logger logger = LogManager.getLogger(MasterThrottlingRetryListener.class);
+
+    /**
+     * Base delay in millis.
+     */
+    private final int BASE_DELAY_MILLIS = 10;
+
+    /**
+     * Maximum delay in millis.
+     */
+    private final int MAX_DELAY_MILLIS = 5000;
+
+    private long totalDelay;
+    private final Iterator<TimeValue> backoffDelay;
+    private final ActionListener<Response> listener;
+    private final Request request;
+    private final Runnable runnable;
+    private final String actionName;
+    private final boolean localNodeRequest;
+
+    private static ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+
+    public MasterThrottlingRetryListener(String actionName, Request request, Runnable runnable, ActionListener<Response> actionListener) {
+        this.actionName = actionName;
+        this.listener = actionListener;
+        this.request = request;
+        this.runnable = runnable;
+        this.backoffDelay = BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS).iterator();
+        /**
+         This determines whether the request originated on the local node or on a remote node.
+         If it is the local node's request, the retries need to be performed on this node.
+         If it is a remote node's request, this node does not retry and lets the remote node perform the retries.
+
+         If the request comes from a remote data node, that data node sets the remoteRequest flag in {@link ClusterManagerNodeRequest}
+         before sending the request to the master, so the master node can tell whether the request was local or remote.
+         */
+        this.localNodeRequest = !(request.isRemoteRequest());
+    }
+
+    @Override
+    public void onResponse(Response response) {
+        listener.onResponse(response);
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+
+        if (localNodeRequest && isThrottlingException(e)) {
+            logger.info("Retrying [{}] on throttling exception from master. Error: [{}]", actionName, getExceptionMessage(e));
+            long delay = backoffDelay.next().getMillis();
+            if (totalDelay + delay >= request.clusterManagerNodeTimeout.getMillis()) {
+                delay = request.clusterManagerNodeTimeout.getMillis() - totalDelay;
+                scheduler.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        listener.onFailure(new ProcessClusterEventTimeoutException(request.clusterManagerNodeTimeout, actionName));
+                    }
+                }, delay, TimeUnit.MILLISECONDS);
+            } else {
+                scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+            }
+            totalDelay += delay;
+        } else {
+            listener.onFailure(e);
+        }
+    }
+
+    /**
+     * For test case purposes.
+     * @param retryScheduler scheduler defined in test cases.
+     */
+    public static void setThrottlingRetryScheduler(ScheduledThreadPoolExecutor retryScheduler) {
+        scheduler = retryScheduler;
+    }
+
+    private boolean isThrottlingException(Exception e) {
+        if (e instanceof TransportException) {
+            return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
+        }
+        return e instanceof MasterTaskThrottlingException;
+    }
+
+    private String getExceptionMessage(Exception e) {
+        if (e instanceof TransportException) {
+            return ((TransportException) e).unwrapCause().getMessage();
+        } else {
+            return e.getMessage();
+        }
+    }
+
+    public static long getRetryingTasksCount() {
+        return scheduler.getActiveCount() + scheduler.getQueue().size();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index 46c93a8d33c7b..4744107d28854 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -134,12 +134,10 @@ protected boolean localExecute(Request request) {
 
     @Override
     protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
-        ClusterState state = clusterService.state();
-        logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
         if (task != null) {
             request.setParentTask(clusterService.localNode().getId(), task.getId());
         }
-        new AsyncSingleAction(task, request, listener).doStart(state);
+        new AsyncSingleAction(task, request, listener).start();
     }
 
     /**
@@ -158,10 +156,16 @@ class AsyncSingleAction {
         AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
             this.task = task;
             this.request = request;
-            this.listener = listener;
+            this.listener = new MasterThrottlingRetryListener(actionName, request, this::start, listener);
             this.startTime = threadPool.relativeTimeInMillis();
         }
 
+        public void start() {
+            ClusterState state = clusterService.state();
+            logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
+            doStart(state);
+        }
+
         protected void doStart(ClusterState clusterState) {
             try {
                 final DiscoveryNodes nodes = clusterState.nodes();
@@ -210,6 +214,7 @@ protected void doStart(ClusterState clusterState) {
                     } else {
                         DiscoveryNode clusterManagerNode = nodes.getMasterNode();
                         final String actionName = getClusterManagerActionName(clusterManagerNode);
+                        request.setRemoteRequest(true);
                         transportService.sendRequest(
                             clusterManagerNode,
                             actionName,
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java
new file mode 100644
index 0000000000000..dc46590d60951
--- /dev/null
+++ b/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java
@@ -0,0 +1,175 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.support.clustermanager;
+
+import org.opensearch.action.ActionListener;
+import org.opensearch.action.support.PlainActionFuture;
+import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
+import org.opensearch.cluster.service.MasterTaskThrottlingException;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.Scheduler;
+
+import java.time.Instant;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+
+/**
+ * Test class of {@link MasterThrottlingRetryListener}
+ */
+public class MasterThrottlingRetryListenerTests extends OpenSearchTestCase {
+    private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY);
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler);
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS);
+    }
+
+    public void testRetryForLocalRequest() throws BrokenBarrierException, InterruptedException {
+        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
+        PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        AtomicBoolean callBackExecuted = new AtomicBoolean();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    callBackExecuted.set(true);
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
+            }
+        };
+
+        ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);
+
+        taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
+        barrier.await();
+        assertTrue(callBackExecuted.get());
+    }
+
+    public void testRetryForRemoteRequest() throws BrokenBarrierException, InterruptedException {
+        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
+        request.setRemoteRequest(true);
+        PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
+        AtomicBoolean callBackExecuted = new AtomicBoolean(false);
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    callBackExecuted.set(true);
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
+            }
+        };
+
+        ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);
+
+        taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
+        Thread.sleep(100); // small buffer to confirm the callback is not scheduled for remote requests.
+        assertFalse(callBackExecuted.get());
+    }
+
+    public void testTimedOut() throws BrokenBarrierException, InterruptedException {
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        AtomicBoolean onFailureExecuted = new AtomicBoolean();
+        AtomicBoolean retryExecuted = new AtomicBoolean();
+        AtomicBoolean firstExecute = new AtomicBoolean(true);
+        int timeOutSec = randomIntBetween(1, 5);
+        final Instant[] startTime = new Instant[1];
+        final Instant[] endTime = new Instant[1];
+
+        ActionListener listener = new ActionListener() {
+            @Override
+            public void onResponse(Object o) {
+                throw new AssertionError("onResponse should not be called");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                endTime[0] = Instant.now();
+                try {
+                    onFailureExecuted.set(true);
+                    barrier.await();
+                } catch (Exception exe) {
+                    throw new AssertionError(exe);
+                }
+                assertEquals(ProcessClusterEventTimeoutException.class, e.getClass());
+            }
+        };
+        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request()
+            .clusterManagerNodeTimeout(TimeValue.timeValueSeconds(timeOutSec));
+
+        class TestRetryClass {
+            ActionListener listener;
+
+            TestRetryClass(ActionListener listener) {
+                this.listener = new MasterThrottlingRetryListener("test", request, this::execute, listener);
+            }
+
+            public void execute() {
+                if (firstExecute.getAndSet(false)) {
+                    startTime[0] = Instant.now();
+                }
+                listener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
+            }
+        }
+
+        TestRetryClass testRetryClass = new TestRetryClass(listener);
+        testRetryClass.execute();
+
+        barrier.await();
+        assertEquals(timeOutSec, (endTime[0].toEpochMilli() - startTime[0].toEpochMilli()) / 1000);
+        assertTrue(onFailureExecuted.get());
+        assertFalse(retryExecuted.get());
+    }
+
+    public void testRetryForDifferentException() {
+
+        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
+        PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
+        AtomicBoolean callBackExecuted = new AtomicBoolean();
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    callBackExecuted.set(true);
+                } catch (Exception e) {
+                    throw new AssertionError(e);
+                }
+            }
+        };
+
+        ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);
+
+        taskRetryListener.onFailure(new Exception());
+        assertFalse(callBackExecuted.get());
+
+        taskRetryListener.onFailure(new OpenSearchRejectedExecutionException("Different Exception"));
+        assertFalse(callBackExecuted.get());
+    }
+}
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
index a5828edd65c16..b2e1b7c32b2f7 100644
--- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
+++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
@@ -53,6 +53,7 @@
 import org.opensearch.cluster.node.DiscoveryNodeRole;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
 import org.opensearch.common.settings.Settings;
@@ -66,6 +67,7 @@
 import org.opensearch.test.transport.CapturingTransport;
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.threadpool.Scheduler;
 import org.opensearch.transport.ConnectTransportException;
 import org.opensearch.transport.TransportService;
 import org.junit.After;
@@ -80,6 +82,10 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static org.opensearch.test.ClusterServiceUtils.createClusterService;
 import static org.opensearch.test.ClusterServiceUtils.setState;
@@ -95,6 +101,7 @@ public class TransportClusterManagerNodeActionTests extends OpenSearchTestCase {
     private DiscoveryNode localNode;
     private DiscoveryNode remoteNode;
     private DiscoveryNode[] allNodes;
+    private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY);
 
     @BeforeClass
     public static void beforeClass() {
@@ -132,6 +139,7 @@ public void setUp() throws Exception {
             Version.CURRENT
         );
         allNodes = new DiscoveryNode[] { localNode, remoteNode };
+        MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler);
     }
 
     @After
@@ -144,6 +152,7 @@ public void tearDown() throws Exception {
     @AfterClass
     public static void afterClass() {
         ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS);
         threadPool = null;
     }
 
@@ -568,4 +577,75 @@ public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws
         assertTrue(listener.isDone());
         assertThat(listener.get(), equalTo(response));
     }
+
+    public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException {
+        Request request = new Request();
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        AtomicBoolean exception = new AtomicBoolean(true);
+        AtomicBoolean retried = new AtomicBoolean(false);
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode }));
+
+        TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) {
+            @Override
+            protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
+                if (exception.getAndSet(false)) {
+                    throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test");
+                } else {
+                    try {
+                        retried.set(true);
+                        barrier.await();
+                    } catch (Exception e) {
+                        throw new AssertionError();
+                    }
+                }
+            }
+        };
+        action.execute(request, listener);
+
+        barrier.await();
+        assertTrue(retried.get());
+        assertFalse(exception.get());
+    }
+
+    public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException {
+        Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
+        DiscoveryNode masterNode = this.remoteNode;
+        setState(
+            clusterService,
+            // use a random base version so it can go down when simulating a restart.
+            ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[] { localNode, masterNode }))
+                .version(randomIntBetween(0, 10))
+        );
+
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool);
+        action.execute(request, listener);
+
+        CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
+        assertThat(capturedRequests.length, equalTo(1));
+        CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0];
+        assertTrue(capturedRequest.node.isMasterNode());
+        assertThat(capturedRequest.request, equalTo(request));
+        assertThat(capturedRequest.action, equalTo("internal:testAction"));
+        transport.handleRemoteError(
+            capturedRequest.requestId,
+            new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")
+        );
+
+        assertFalse(listener.isDone());
+
+        // waiting for retry to trigger
+        Thread.sleep(100);
+
+        // Retry for above throttling exception
+        capturedRequests = transport.getCapturedRequestsAndClear();
+        assertThat(capturedRequests.length, equalTo(1));
+        capturedRequest = capturedRequests[0];
+        Response response = new Response();
+        transport.handleResponse(capturedRequest.requestId, response);
+
+        assertTrue(listener.isDone());
+        listener.get();
+    }
 }

From 3b85f13d44b2f10244d79cbb8f13bb338eab3123 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Fri, 19 Aug 2022 18:55:13 +0530
Subject: [PATCH 2/9] Using RetryableAction for retries

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../action/support/RetryableAction.java       | 67 ++++++++++++++++---
 .../TransportClusterManagerNodeAction.java    | 27 ++++++--
 ...ransportClusterManagerNodeActionTests.java |  2 +-
 3 files changed, 82 insertions(+), 14 deletions(-)
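
Reviewer note: AsyncSingleAction now extends RetryableAction, implementing tryAction() as a single
attempt and shouldRetry() so that only locally generated requests failing with a throttling
exception are retried. A deliberately simplified, self-contained sketch of that control flow
(synchronous tryAction() and a plain ScheduledExecutorService, whereas the real class is
ActionListener-based and schedules retries on the node's ThreadPool):

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    abstract class RetryableActionSketch<T> {
        // Daemon scheduler so the sketch exits cleanly; the production code uses the node's ThreadPool.
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        private final Iterator<Long> delaysMillis; // backoff policy, e.g. an equal-jitter iterator

        RetryableActionSketch(Iterator<Long> delaysMillis) {
            this.delaysMillis = delaysMillis;
        }

        abstract T tryAction() throws Exception;   // one attempt against the cluster-manager

        abstract boolean shouldRetry(Exception e); // e.g. throttling exception on a local request

        CompletableFuture<T> run() {
            CompletableFuture<T> result = new CompletableFuture<>();
            attempt(result);
            return result;
        }

        private void attempt(CompletableFuture<T> result) {
            try {
                result.complete(tryAction());
            } catch (Exception e) {
                if (shouldRetry(e) && delaysMillis.hasNext()) {
                    // Schedule the next attempt after the next backoff delay.
                    scheduler.schedule(() -> attempt(result), delaysMillis.next(), TimeUnit.MILLISECONDS);
                } else {
                    result.completeExceptionally(e);
                }
            }
        }

        public static void main(String[] args) throws Exception {
            Iterator<Long> delays = List.of(10L, 20L, 40L).iterator();
            RetryableActionSketch<String> action = new RetryableActionSketch<String>(delays) {
                int attempts = 0;

                @Override
                String tryAction() throws Exception {
                    if (++attempts < 3) {
                        throw new IllegalStateException("throttled"); // stands in for MasterTaskThrottlingException
                    }
                    return "done after " + attempts + " attempts";
                }

                @Override
                boolean shouldRetry(Exception e) {
                    return e instanceof IllegalStateException;
                }
            };
            System.out.println(action.run().get());
        }
    }

The sketch returns a CompletableFuture purely for demonstration; the production code reports
success or the final failure through the wrapped ActionListener.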

diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
index 38b7e6ec2a8a0..f24a1b8f1bd44 100644
--- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java
+++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
@@ -36,6 +36,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.ActionRunnable;
+import org.opensearch.action.bulk.BackoffPolicy;
 import org.opensearch.common.Randomness;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
@@ -43,6 +44,7 @@
 import org.opensearch.threadpool.ThreadPool;
 
 import java.util.ArrayDeque;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -64,6 +66,7 @@ public abstract class RetryableAction<Response> {
     private final long startMillis;
     private final ActionListener<Response> finalListener;
     private final String executor;
+    private final BackoffPolicy backoffPolicy;
 
     private volatile Scheduler.ScheduledCancellable retryTask;
 
@@ -74,7 +77,7 @@ public RetryableAction(
         TimeValue timeoutValue,
         ActionListener<Response> listener
     ) {
-        this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
+        this(logger, threadPool, initialDelay, timeoutValue, listener, null, ThreadPool.Names.SAME);
     }
 
     public RetryableAction(
@@ -83,6 +86,7 @@ public RetryableAction(
         TimeValue initialDelay,
         TimeValue timeoutValue,
         ActionListener<Response> listener,
+        BackoffPolicy backoffPolicy,
         String executor
     ) {
         this.logger = logger;
@@ -95,10 +99,13 @@ public RetryableAction(
         this.startMillis = threadPool.relativeTimeInMillis();
         this.finalListener = listener;
         this.executor = executor;
+        this.backoffPolicy = backoffPolicy;
     }
 
     public void run() {
-        final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null);
+        final RetryingListener retryingListener = backoffPolicy == null
+            ? new DefaultRetryingListener(initialDelayMillis, null)
+            : new CustomRetryingListener(backoffPolicy.iterator(), null);
         final Runnable runnable = createRunnable(retryingListener);
         threadPool.executor(executor).execute(runnable);
     }
@@ -142,18 +149,59 @@ public void onRejection(Exception e) {
 
     public void onFinished() {}
 
-    private class RetryingListener implements ActionListener<Response> {
+    private class CustomRetryingListener extends RetryingListener {
 
-        private static final int MAX_EXCEPTIONS = 4;
+        private Iterator<TimeValue> backoffDelayIterator;
+        private ArrayDeque<Exception> caughtExceptions;
+
+        private CustomRetryingListener(Iterator<TimeValue> backoffDelayIterator, ArrayDeque<Exception> caughtExceptions) {
+            super(caughtExceptions);
+            this.backoffDelayIterator = backoffDelayIterator;
+            this.caughtExceptions = caughtExceptions;
+        }
+
+        public RetryingListener getRetryingListenerForNextRetry() {
+            return this;
+        }
+
+        public long getRetryDelay() {
+            return backoffDelayIterator.next().millis();
+        }
+
+    }
+
+    private class DefaultRetryingListener extends RetryingListener {
 
         private final long delayMillisBound;
         private ArrayDeque<Exception> caughtExceptions;
 
-        private RetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
+        private DefaultRetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
+            super(caughtExceptions);
             this.delayMillisBound = delayMillisBound;
             this.caughtExceptions = caughtExceptions;
         }
 
+        public RetryingListener getRetryingListenerForNextRetry() {
+            final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
+            final RetryingListener retryingListener = new DefaultRetryingListener(nextDelayMillisBound, caughtExceptions);
+            return retryingListener;
+        }
+
+        public long getRetryDelay() {
+            return Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
+        }
+    }
+
+    private abstract class RetryingListener implements ActionListener<Response> {
+
+        private static final int MAX_EXCEPTIONS = 4;
+
+        private ArrayDeque<Exception> caughtExceptions;
+
+        private RetryingListener(ArrayDeque<Exception> caughtExceptions) {
+            this.caughtExceptions = caughtExceptions;
+        }
+
         @Override
         public void onResponse(Response response) {
             if (isDone.compareAndSet(false, true)) {
@@ -162,6 +210,10 @@ public void onResponse(Response response) {
             }
         }
 
+        public abstract RetryingListener getRetryingListenerForNextRetry();
+
+        public abstract long getRetryDelay();
+
         @Override
         public void onFailure(Exception e) {
             if (shouldRetry(e)) {
@@ -175,10 +227,9 @@ public void onFailure(Exception e) {
                 } else {
                     addException(e);
 
-                    final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
-                    final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
+                    final long delayMillis = getRetryDelay();
+                    final RetryingListener retryingListener = getRetryingListenerForNextRetry();
                     final Runnable runnable = createRunnable(retryingListener);
-                    final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
                     if (isDone.get() == false) {
                         final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
                         logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index 4744107d28854..c873dd221dcd0 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -41,6 +41,7 @@
 import org.opensearch.action.ActionRunnable;
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.HandledTransportAction;
+import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateObserver;
 import org.opensearch.cluster.MasterNodeChangePredicate;
@@ -51,6 +52,7 @@
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.Writeable;
 import org.opensearch.common.unit.TimeValue;
@@ -137,7 +139,7 @@ protected void doExecute(Task task, final Request request, ActionListener<Respon
         if (task != null) {
             request.setParentTask(clusterService.localNode().getId(), task.getId());
         }
-        new AsyncSingleAction(task, request, listener).start();
+        new AsyncSingleAction(task, request, listener).run();
     }
 
     /**
@@ -145,27 +147,42 @@ protected void doExecute(Task task, final Request request, ActionListener<Respon
      *
      * @opensearch.internal
      */
-    class AsyncSingleAction {
+    class AsyncSingleAction extends RetryableAction {
 
-        private final ActionListener<Response> listener;
+        private ActionListener<Response> listener;
         private final Request request;
         private ClusterStateObserver observer;
         private final long startTime;
         private final Task task;
+        private boolean localRequest;
 
         AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
+            super(logger, threadPool, TimeValue.timeValueMillis(10), request.clusterManagerNodeTimeout, listener);
             this.task = task;
             this.request = request;
-            this.listener = new MasterThrottlingRetryListener(actionName, request, this::start, listener);
             this.startTime = threadPool.relativeTimeInMillis();
+            localRequest = !request.remoteRequest;
         }
 
-        public void start() {
+        @Override
+        public void tryAction(ActionListener retryListener) {
             ClusterState state = clusterService.state();
             logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
+            this.listener = retryListener;
             doStart(state);
         }
 
+        @Override
+        public boolean shouldRetry(Exception e) {
+            if (localRequest) {
+                if (e instanceof TransportException) {
+                    return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
+                }
+                return e instanceof MasterTaskThrottlingException;
+            }
+            return false;
+        }
+
         protected void doStart(ClusterState clusterState) {
             try {
                 final DiscoveryNodes nodes = clusterState.nodes();
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
index b2e1b7c32b2f7..f2c521d160ecd 100644
--- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
+++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
@@ -609,7 +609,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
     }
 
     public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException {
-        Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60));
+        Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60));
         DiscoveryNode masterNode = this.remoteNode;
         setState(
             clusterService,

From a9cd2d19f4fcba61ec485f5005400d5c2c8c6bce Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Wed, 24 Aug 2022 12:45:46 +0530
Subject: [PATCH 3/9] Incorporated comments 08/24

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../opensearch/action/bulk/BackoffPolicy.java |  73 --------
 .../action/support/RetryPolicy.java           | 145 +++++++++++++++
 .../action/support/RetryableAction.java       |  81 ++------
 .../MasterThrottlingRetryListener.java        | 131 -------------
 .../TransportClusterManagerNodeAction.java    |  13 +-
 .../action/bulk/BackoffPolicyTests.java       |  23 ---
 .../action/support/RetryPolicyTests.java      |  58 ++++++
 .../MasterThrottlingRetryListenerTests.java   | 175 ------------------
 ...ransportClusterManagerNodeActionTests.java |  54 +++++-
 9 files changed, 283 insertions(+), 470 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/action/support/RetryPolicy.java
 delete mode 100644 server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java
 create mode 100644 server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java
 delete mode 100644 server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java
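
Reviewer note: the equal-jitter backoff moves out of BackoffPolicy into the new RetryPolicy class,
and RetryableAction now always draws its delays from a RetryPolicy iterator. A usage sketch of the
two factories added below, assuming it is compiled against the server module (TimeValue is
org.opensearch.common.unit.TimeValue):

    import java.util.Iterator;

    import org.opensearch.action.support.RetryPolicy;
    import org.opensearch.common.unit.TimeValue;

    public class RetryPolicySketch {
        public static void main(String[] args) {
            // Throttling retries: 10 ms base delay, capped at 5 s, with equal jitter.
            RetryPolicy throttlingPolicy = RetryPolicy.exponentialEqualJitterBackoff(10, 5000);
            // Default RetryableAction behaviour: a random delay within a doubling bound.
            RetryPolicy defaultPolicy = RetryPolicy.exponentialBackoff(10);

            Iterator<TimeValue> delays = throttlingPolicy.iterator();
            for (int i = 0; i < 5; i++) {
                // Roughly 5-10 ms, 10-20 ms, 20-40 ms, ... and never above 5000 ms.
                System.out.println("throttling retry " + i + " -> " + delays.next());
            }
            System.out.println("first default delay -> " + defaultPolicy.iterator().next());
        }
    }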

diff --git a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
index 15200417d28ac..0b22ee04141ed 100644
--- a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
+++ b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
@@ -31,7 +31,6 @@
 
 package org.opensearch.action.bulk;
 
-import org.opensearch.common.Randomness;
 import org.opensearch.common.unit.TimeValue;
 
 import java.util.Iterator;
@@ -106,19 +105,6 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
         return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
     }
 
-    /**
-     *  It provides exponential backoff between retries until it reaches maxDelayForRetry.
-     *  It uses equal jitter scheme as it is being used for throttled exceptions.
-     *  It will make random distribution and also guarantees a minimum delay.
-     *
-     * @param baseDelay BaseDelay for exponential Backoff
-     * @param maxDelayForRetry MaxDelay that can be returned from backoff policy
-     * @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
-     */
-    public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
-        return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
-    }
-
     /**
      * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
      */
@@ -211,65 +197,6 @@ public TimeValue next() {
         }
     }
 
-    private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
-        private final int maxDelayForRetry;
-        private final int baseDelay;
-
-        private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
-            this.maxDelayForRetry = maxDelayForRetry;
-            this.baseDelay = baseDelay;
-        }
-
-        @Override
-        public Iterator<TimeValue> iterator() {
-            return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
-        }
-    }
-
-    private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
-        /**
-         * Retry limit to avoids integer overflow issues.
-         * Post this limit, max delay will be returned with Equal Jitter.
-         *
-         * NOTE: If the value is greater than 30, there can be integer overflow
-         * issues during delay calculation.
-         **/
-        private final int RETRIES_TILL_JITTER_INCREASE = 30;
-
-        /**
-         * Exponential increase in delay will happen till it reaches maxDelayForRetry.
-         * Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
-         * and not increase the delay.
-         */
-        private final int maxDelayForRetry;
-        private final int baseDelay;
-        private int retriesAttempted;
-
-        private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
-            this.baseDelay = baseDelay;
-            this.maxDelayForRetry = maxDelayForRetry;
-        }
-
-        /**
-         * There is not any limit for this BackOff.
-         * This Iterator will always return back off delay.
-         *
-         * @return true
-         */
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public TimeValue next() {
-            int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
-            int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
-            retriesAttempted++;
-            return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
-        }
-    }
-
     /**
      * Concrete Constant Back Off Policy
      *
diff --git a/server/src/main/java/org/opensearch/action/support/RetryPolicy.java b/server/src/main/java/org/opensearch/action/support/RetryPolicy.java
new file mode 100644
index 0000000000000..02790eb812283
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/support/RetryPolicy.java
@@ -0,0 +1,145 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.support;
+
+import org.opensearch.common.Randomness;
+import org.opensearch.common.unit.TimeValue;
+
+import java.util.Iterator;
+
+/**
+ * Various retry policies for performing retries.
+ */
+public abstract class RetryPolicy implements Iterable<TimeValue> {
+    /**
+     *  It provides exponential backoff between retries until the delay reaches maxDelayForRetry.
+     *  It uses an equal-jitter scheme, as used for throttled exceptions:
+     *  the delay is randomized while a minimum delay is still guaranteed.
+     *
+     * @param baseDelay base delay for the exponential backoff
+     * @param maxDelayForRetry maximum delay that the backoff policy can return
+     * @return A backoff policy with exponential backoff and equal jitter that never returns a delay greater than the given maximum
+     */
+    public static RetryPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
+        return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
+    }
+
+    /**
+     *  It provides exponential backoff between retries, with the delay bound capped at Integer.MAX_VALUE.
+     *  The actual delay is chosen at random within the current bound.
+     *
+     * @param baseDelay base delay for the exponential backoff
+     * @return A backoff policy whose delay bound doubles on each retry and which returns a random delay within that bound
+     */
+    public static RetryPolicy exponentialBackoff(long baseDelay) {
+        return new ExponentialBackoff(baseDelay);
+    }
+
+    private static class ExponentialEqualJitterBackoff extends RetryPolicy {
+        private final int maxDelayForRetry;
+        private final int baseDelay;
+
+        private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
+            this.maxDelayForRetry = maxDelayForRetry;
+            this.baseDelay = baseDelay;
+        }
+
+        @Override
+        public Iterator<TimeValue> iterator() {
+            return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
+        }
+    }
+
+    private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
+        /**
+         * Retry limit to avoid integer overflow issues.
+         * Past this limit, the maximum delay is returned with equal jitter.
+         *
+         * NOTE: If the value is greater than 30, there can be integer overflow
+         * issues during delay calculation.
+         **/
+        private final int RETRIES_TILL_JITTER_INCREASE = 30;
+
+        /**
+         * Exponential increase in delay will happen till it reaches maxDelayForRetry.
+         * Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
+         * and not increase the delay.
+         */
+        private final int maxDelayForRetry;
+        private final int baseDelay;
+        private int retriesAttempted;
+
+        private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
+            this.baseDelay = baseDelay;
+            this.maxDelayForRetry = maxDelayForRetry;
+        }
+
+        /**
+         * There is no limit on the number of retries for this backoff.
+         * This iterator always returns a backoff delay.
+         *
+         * @return true
+         */
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public TimeValue next() {
+            int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
+            int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
+            retriesAttempted++;
+            return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
+        }
+    }
+
+    private static class ExponentialBackoff extends RetryPolicy {
+        private final long baseDelay;
+
+        private ExponentialBackoff(long baseDelay) {
+            this.baseDelay = baseDelay;
+        }
+
+        @Override
+        public Iterator<TimeValue> iterator() {
+            return new ExponentialBackoffIterator(baseDelay);
+        }
+    }
+
+    private static class ExponentialBackoffIterator implements Iterator<TimeValue> {
+        /**
+         * Current delay in exponential backoff
+         */
+        private long currentDelay;
+
+        private ExponentialBackoffIterator(long baseDelay) {
+            this.currentDelay = baseDelay;
+        }
+
+        /**
+         * There is no limit on the number of retries for this backoff.
+         * This iterator always returns a backoff delay.
+         *
+         * @return true
+         */
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public TimeValue next() {
+            TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1);
+            currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE);
+            return delayToReturn;
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
index f24a1b8f1bd44..71dca9585ad38 100644
--- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java
+++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
@@ -36,8 +36,6 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.ActionRunnable;
-import org.opensearch.action.bulk.BackoffPolicy;
-import org.opensearch.common.Randomness;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
 import org.opensearch.threadpool.Scheduler;
@@ -66,7 +64,7 @@ public abstract class RetryableAction<Response> {
     private final long startMillis;
     private final ActionListener<Response> finalListener;
     private final String executor;
-    private final BackoffPolicy backoffPolicy;
+    private final RetryPolicy retryPolicy;
 
     private volatile Scheduler.ScheduledCancellable retryTask;
 
@@ -77,7 +75,15 @@ public RetryableAction(
         TimeValue timeoutValue,
         ActionListener<Response> listener
     ) {
-        this(logger, threadPool, initialDelay, timeoutValue, listener, null, ThreadPool.Names.SAME);
+        this(
+            logger,
+            threadPool,
+            initialDelay,
+            timeoutValue,
+            listener,
+            RetryPolicy.exponentialBackoff(initialDelay.getMillis()),
+            ThreadPool.Names.SAME
+        );
     }
 
     public RetryableAction(
@@ -86,7 +92,7 @@ public RetryableAction(
         TimeValue initialDelay,
         TimeValue timeoutValue,
         ActionListener<Response> listener,
-        BackoffPolicy backoffPolicy,
+        RetryPolicy retryPolicy,
         String executor
     ) {
         this.logger = logger;
@@ -99,13 +105,11 @@ public RetryableAction(
         this.startMillis = threadPool.relativeTimeInMillis();
         this.finalListener = listener;
         this.executor = executor;
-        this.backoffPolicy = backoffPolicy;
+        this.retryPolicy = retryPolicy;
     }
 
     public void run() {
-        final RetryingListener retryingListener = backoffPolicy == null
-            ? new DefaultRetryingListener(initialDelayMillis, null)
-            : new CustomRetryingListener(backoffPolicy.iterator(), null);
+        final RetryingListener retryingListener = new RetryingListener(retryPolicy.iterator(), null);
         final Runnable runnable = createRunnable(retryingListener);
         threadPool.executor(executor).execute(runnable);
     }
@@ -149,57 +153,16 @@ public void onRejection(Exception e) {
 
     public void onFinished() {}
 
-    private class CustomRetryingListener extends RetryingListener {
-
-        private Iterator<TimeValue> backoffDelayIterator;
-        private ArrayDeque<Exception> caughtExceptions;
-
-        private CustomRetryingListener(Iterator<TimeValue> backoffDelayIterator, ArrayDeque<Exception> caughtExceptions) {
-            super(caughtExceptions);
-            this.backoffDelayIterator = backoffDelayIterator;
-            this.caughtExceptions = caughtExceptions;
-        }
-
-        public RetryingListener getRetryingListenerForNextRetry() {
-            return this;
-        }
-
-        public long getRetryDelay() {
-            return backoffDelayIterator.next().millis();
-        }
-
-    }
-
-    private class DefaultRetryingListener extends RetryingListener {
-
-        private final long delayMillisBound;
-        private ArrayDeque<Exception> caughtExceptions;
-
-        private DefaultRetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
-            super(caughtExceptions);
-            this.delayMillisBound = delayMillisBound;
-            this.caughtExceptions = caughtExceptions;
-        }
-
-        public RetryingListener getRetryingListenerForNextRetry() {
-            final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
-            final RetryingListener retryingListener = new DefaultRetryingListener(nextDelayMillisBound, caughtExceptions);
-            return retryingListener;
-        }
-
-        public long getRetryDelay() {
-            return Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
-        }
-    }
-
-    private abstract class RetryingListener implements ActionListener<Response> {
+    private class RetryingListener implements ActionListener<Response> {
 
         private static final int MAX_EXCEPTIONS = 4;
 
         private ArrayDeque<Exception> caughtExceptions;
+        private Iterator<TimeValue> backoffDelayIterator;
 
-        private RetryingListener(ArrayDeque<Exception> caughtExceptions) {
+        private RetryingListener(Iterator<TimeValue> backoffDelayIterator, ArrayDeque<Exception> caughtExceptions) {
             this.caughtExceptions = caughtExceptions;
+            this.backoffDelayIterator = backoffDelayIterator;
         }
 
         @Override
@@ -210,10 +173,6 @@ public void onResponse(Response response) {
             }
         }
 
-        public abstract RetryingListener getRetryingListenerForNextRetry();
-
-        public abstract long getRetryDelay();
-
         @Override
         public void onFailure(Exception e) {
             if (shouldRetry(e)) {
@@ -227,11 +186,9 @@ public void onFailure(Exception e) {
                 } else {
                     addException(e);
 
-                    final long delayMillis = getRetryDelay();
-                    final RetryingListener retryingListener = getRetryingListenerForNextRetry();
-                    final Runnable runnable = createRunnable(retryingListener);
+                    final TimeValue delay = backoffDelayIterator.next();
+                    final Runnable runnable = createRunnable(this);
                     if (isDone.get() == false) {
-                        final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
                         logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
                         try {
                             retryTask = threadPool.schedule(runnable, delay, executor);
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java b/server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java
deleted file mode 100644
index f287b6680498a..0000000000000
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.action.support.clustermanager;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.ActionResponse;
-import org.opensearch.action.bulk.BackoffPolicy;
-import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
-import org.opensearch.cluster.service.MasterTaskThrottlingException;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.threadpool.Scheduler;
-import org.opensearch.transport.TransportException;
-
-import java.util.Iterator;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ActionListener for retrying throttled master tasks.
- * It schedules a retry when the master node returns a throttling exception and
- * delegates the response to the wrapped listener once it receives a response from the master.
- *
- * It uses the ExponentialEqualJitterBackoff policy to determine the delay between retries.
- */
-public class MasterThrottlingRetryListener<Request extends ClusterManagerNodeRequest<Request>, Response extends ActionResponse>
-    implements
-        ActionListener<Response> {
-
-    private static final Logger logger = LogManager.getLogger(MasterThrottlingRetryListener.class);
-
-    /**
-     * Base delay in millis.
-     */
-    private final int BASE_DELAY_MILLIS = 10;
-
-    /**
-     * Maximum delay in millis.
-     */
-    private final int MAX_DELAY_MILLIS = 5000;
-
-    private long totalDelay;
-    private final Iterator<TimeValue> backoffDelay;
-    private final ActionListener<Response> listener;
-    private final Request request;
-    private final Runnable runnable;
-    private final String actionName;
-    private final boolean localNodeRequest;
-
-    private static ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
-
-    public MasterThrottlingRetryListener(String actionName, Request request, Runnable runnable, ActionListener<Response> actionListener) {
-        this.actionName = actionName;
-        this.listener = actionListener;
-        this.request = request;
-        this.runnable = runnable;
-        this.backoffDelay = BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS).iterator();
-        /**
-         This determines whether the request originated on the local node or on a remote node.
-         If it is the local node's request, the retries need to be performed on this node.
-         If it is a remote node's request, this node does not retry and lets the remote node perform the retries.
-
-         If the request comes from a remote data node, that data node sets the remoteRequest flag in {@link ClusterManagerNodeRequest}
-         before sending the request to the master, so the master node can tell whether the request was local or remote.
-         */
-        this.localNodeRequest = !(request.isRemoteRequest());
-    }
-
-    @Override
-    public void onResponse(Response response) {
-        listener.onResponse(response);
-    }
-
-    @Override
-    public void onFailure(Exception e) {
-
-        if (localNodeRequest && isThrottlingException(e)) {
-            logger.info("Retrying [{}] on throttling exception from master. Error: [{}]", actionName, getExceptionMessage(e));
-            long delay = backoffDelay.next().getMillis();
-            if (totalDelay + delay >= request.clusterManagerNodeTimeout.getMillis()) {
-                delay = request.clusterManagerNodeTimeout.getMillis() - totalDelay;
-                scheduler.schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        listener.onFailure(new ProcessClusterEventTimeoutException(request.clusterManagerNodeTimeout, actionName));
-                    }
-                }, delay, TimeUnit.MILLISECONDS);
-            } else {
-                scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-            }
-            totalDelay += delay;
-        } else {
-            listener.onFailure(e);
-        }
-    }
-
-    /**
-     * For Testcase purposes.
-     * @param retrySceduler scheduler defined in test cases.
-     */
-    public static void setThrottlingRetryScheduler(ScheduledThreadPoolExecutor retrySceduler) {
-        scheduler = retrySceduler;
-    }
-
-    private boolean isThrottlingException(Exception e) {
-        if (e instanceof TransportException) {
-            return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
-        }
-        return e instanceof MasterTaskThrottlingException;
-    }
-
-    private String getExceptionMessage(Exception e) {
-        if (e instanceof TransportException) {
-            return ((TransportException) e).unwrapCause().getMessage();
-        } else {
-            return e.getMessage();
-        }
-    }
-
-    public static long getRetryingTasksCount() {
-        return scheduler.getActiveCount() + scheduler.getQueue().size();
-    }
-}
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index c873dd221dcd0..349ab11dccfef 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -41,6 +41,7 @@
 import org.opensearch.action.ActionRunnable;
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.HandledTransportAction;
+import org.opensearch.action.support.RetryPolicy;
 import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateObserver;
@@ -155,9 +156,19 @@ class AsyncSingleAction extends RetryableAction {
         private final long startTime;
         private final Task task;
         private boolean localRequest;
+        private static final int BASE_DELAY_MILLIS = 10;
+        private static final int MAX_DELAY_MILLIS = 5000;
 
         AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
-            super(logger, threadPool, TimeValue.timeValueMillis(10), request.clusterManagerNodeTimeout, listener);
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
+                request.clusterManagerNodeTimeout,
+                listener,
+                RetryPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
+                ThreadPool.Names.SAME
+            );
             this.task = task;
             this.request = request;
             this.startTime = threadPool.relativeTimeInMillis();
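
For reference, a minimal sketch of how a caller can extend RetryableAction with the jittered policy used above. This is an illustration under assumptions, not part of the patch: the class name, timeout, listener wiring, and the premise that tryAction and shouldRetry are the only hooks to override are all hypothetical.

    import org.apache.logging.log4j.Logger;
    import org.opensearch.action.ActionListener;
    import org.opensearch.action.support.RetryPolicy;
    import org.opensearch.action.support.RetryableAction;
    import org.opensearch.cluster.service.MasterTaskThrottlingException;
    import org.opensearch.common.unit.TimeValue;
    import org.opensearch.threadpool.ThreadPool;

    // Illustrative sketch only; names and values are assumptions.
    class ExampleThrottledTask extends RetryableAction<Void> {

        ExampleThrottledTask(Logger logger, ThreadPool threadPool, ActionListener<Void> listener) {
            super(
                logger,
                threadPool,
                TimeValue.timeValueMillis(10),                        // base delay, as in AsyncSingleAction
                TimeValue.timeValueSeconds(30),                       // overall timeout (assumed value)
                listener,
                RetryPolicy.exponentialEqualJitterBackoff(10, 5000),  // same 10 ms -> 5 s bounds
                ThreadPool.Names.SAME
            );
        }

        @Override
        public void tryAction(ActionListener<Void> retryListener) {
            // Submit the cluster-manager task here; failures flow into shouldRetry(...).
            retryListener.onResponse(null);
        }

        @Override
        public boolean shouldRetry(Exception e) {
            // Retry only when the cluster manager throttled the task.
            return e instanceof MasterTaskThrottlingException;
        }
    }
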
diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
index 0daf74acc9ae9..1b7d848b626fe 100644
--- a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
@@ -75,27 +75,4 @@ public void testWrapBackoffPolicy() {
             assertEquals(expectedRetries, retries.get());
         }
     }
-
-    public void testEqualJitterExponentialBackOffPolicy() {
-        int baseDelay = 10;
-        int maxDelay = 10000;
-        BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay);
-        Iterator<TimeValue> iterator = policy.iterator();
-
-        // Assert equal jitter
-        int retriesTillMaxDelay = 10;
-        for (int i = 0; i < retriesTillMaxDelay; i++) {
-            TimeValue delay = iterator.next();
-            assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2);
-            assertTrue(delay.getMillis() <= baseDelay * (1L << i));
-        }
-
-        // Now policy should return max delay for next retries.
-        int retriesAfterMaxDelay = randomInt(10);
-        for (int i = 0; i < retriesAfterMaxDelay; i++) {
-            TimeValue delay = iterator.next();
-            assertTrue(delay.getMillis() >= maxDelay / 2);
-            assertTrue(delay.getMillis() <= maxDelay);
-        }
-    }
 }
diff --git a/server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java b/server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java
new file mode 100644
index 0000000000000..107d09c44d60a
--- /dev/null
+++ b/server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java
@@ -0,0 +1,58 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.support;
+
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.Iterator;
+
+public class RetryPolicyTests extends OpenSearchTestCase {
+
+    public void testEqualJitterExponentialBackOffPolicy() {
+        int baseDelay = 10;
+        int maxDelay = 10000;
+        RetryPolicy policy = RetryPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay);
+        Iterator<TimeValue> iterator = policy.iterator();
+
+        // Assert equal jitter
+        int retriesTillMaxDelay = 10;
+        for (int i = 0; i < retriesTillMaxDelay; i++) {
+            TimeValue delay = iterator.next();
+            assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2);
+            assertTrue(delay.getMillis() <= baseDelay * (1L << i));
+        }
+
+        // Now policy should return max delay for next retries.
+        int retriesAfterMaxDelay = randomInt(10);
+        for (int i = 0; i < retriesAfterMaxDelay; i++) {
+            TimeValue delay = iterator.next();
+            assertTrue(delay.getMillis() >= maxDelay / 2);
+            assertTrue(delay.getMillis() <= maxDelay);
+        }
+    }
+
+    public void testExponentialBackOffPolicy() {
+        long baseDelay = 10;
+        int maxDelay = 10000;
+        long currentDelay = baseDelay;
+        RetryPolicy policy = RetryPolicy.exponentialBackoff(baseDelay);
+        Iterator<TimeValue> iterator = policy.iterator();
+
+        // Assert equal jitter
+        int numberOfRetries = randomInt(20);
+
+        for (int i = 0; i < numberOfRetries; i++) {
+            TimeValue delay = iterator.next();
+            assertTrue(delay.getMillis() >= 0);
+            assertTrue(delay.getMillis() <= currentDelay);
+            currentDelay = currentDelay * 2;
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java
deleted file mode 100644
index dc46590d60951..0000000000000
--- a/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.action.support.clustermanager;
-
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.support.PlainActionFuture;
-import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
-import org.opensearch.cluster.service.MasterTaskThrottlingException;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
-import org.opensearch.test.OpenSearchTestCase;
-import org.opensearch.threadpool.Scheduler;
-
-import java.time.Instant;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-
-/**
- * Test class of {@link MasterThrottlingRetryListener}
- */
-public class MasterThrottlingRetryListenerTests extends OpenSearchTestCase {
-    private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY);
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler);
-    }
-
-    @AfterClass
-    public static void afterClass() {
-        Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS);
-    }
-
-    public void testRetryForLocalRequest() throws BrokenBarrierException, InterruptedException {
-        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
-        PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
-        CyclicBarrier barrier = new CyclicBarrier(2);
-        AtomicBoolean callBackExecuted = new AtomicBoolean();
-        Runnable runnable = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    callBackExecuted.set(true);
-                    barrier.await();
-                } catch (Exception e) {
-                    new AssertionError();
-                }
-            }
-        };
-
-        ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);
-
-        taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
-        barrier.await();
-        assertTrue(callBackExecuted.get());
-    }
-
-    public void testRetryForRemoteRequest() throws BrokenBarrierException, InterruptedException {
-        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
-        request.setRemoteRequest(true);
-        PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
-        AtomicBoolean callBackExecuted = new AtomicBoolean(false);
-        Runnable runnable = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    callBackExecuted.set(true);
-                } catch (Exception e) {
-                    new AssertionError();
-                }
-            }
-        };
-
-        ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);
-
-        taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
-        Thread.sleep(100); // some buffer time so callback can execute.
-        assertFalse(callBackExecuted.get());
-    }
-
-    public void testTimedOut() throws BrokenBarrierException, InterruptedException {
-
-        CyclicBarrier barrier = new CyclicBarrier(2);
-        AtomicBoolean onFailureExecuted = new AtomicBoolean();
-        AtomicBoolean retryExecuted = new AtomicBoolean();
-        AtomicBoolean firstExecute = new AtomicBoolean(true);
-        int timeOutSec = randomIntBetween(1, 5);
-        final Instant[] startTime = new Instant[1];
-        final Instant[] endTime = new Instant[1];
-
-        ActionListener listener = new ActionListener() {
-            @Override
-            public void onResponse(Object o) {
-                new AssertionError();
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                endTime[0] = Instant.now();
-                try {
-                    onFailureExecuted.set(true);
-                    barrier.await();
-                } catch (Exception exe) {
-                    new AssertionError();
-                }
-                assertEquals(ProcessClusterEventTimeoutException.class, e.getClass());
-            }
-        };
-        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request()
-            .clusterManagerNodeTimeout(TimeValue.timeValueSeconds(timeOutSec));
-
-        class TestRetryClass {
-            ActionListener listener;
-
-            TestRetryClass(ActionListener listener) {
-                this.listener = new MasterThrottlingRetryListener("test", request, this::execute, listener);
-            }
-
-            public void execute() {
-                if (firstExecute.getAndSet(false)) {
-                    startTime[0] = Instant.now();
-                }
-                listener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
-            }
-        }
-
-        TestRetryClass testRetryClass = new TestRetryClass(listener);
-        testRetryClass.execute();
-
-        barrier.await();
-        assertEquals(timeOutSec, (endTime[0].toEpochMilli() - startTime[0].toEpochMilli()) / 1000);
-        assertTrue(onFailureExecuted.get());
-        assertFalse(retryExecuted.get());
-    }
-
-    public void testRetryForDifferentException() {
-
-        TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
-        PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
-        AtomicBoolean callBackExecuted = new AtomicBoolean();
-        Runnable runnable = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    callBackExecuted.set(true);
-                } catch (Exception e) {
-                    new AssertionError();
-                }
-            }
-        };
-
-        ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);
-
-        taskRetryListener.onFailure(new Exception());
-        assertFalse(callBackExecuted.get());
-
-        taskRetryListener.onFailure(new OpenSearchRejectedExecutionException("Different Exception"));
-        assertFalse(callBackExecuted.get());
-    }
-}
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
index f2c521d160ecd..ac999df6499f0 100644
--- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
+++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
@@ -67,7 +67,6 @@
 import org.opensearch.test.transport.CapturingTransport;
 import org.opensearch.threadpool.TestThreadPool;
 import org.opensearch.threadpool.ThreadPool;
-import org.opensearch.threadpool.Scheduler;
 import org.opensearch.transport.ConnectTransportException;
 import org.opensearch.transport.TransportService;
 import org.junit.After;
@@ -85,7 +84,6 @@
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static org.opensearch.test.ClusterServiceUtils.createClusterService;
 import static org.opensearch.test.ClusterServiceUtils.setState;
@@ -101,7 +99,6 @@ public class TransportClusterManagerNodeActionTests extends OpenSearchTestCase {
     private DiscoveryNode localNode;
     private DiscoveryNode remoteNode;
     private DiscoveryNode[] allNodes;
-    private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY);
 
     @BeforeClass
     public static void beforeClass() {
@@ -139,7 +136,6 @@ public void setUp() throws Exception {
             Version.CURRENT
         );
         allNodes = new DiscoveryNode[] { localNode, remoteNode };
-        MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler);
     }
 
     @After
@@ -152,7 +148,6 @@ public void tearDown() throws Exception {
     @AfterClass
     public static void afterClass() {
         ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
-        Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS);
         threadPool = null;
     }
 
@@ -648,4 +643,53 @@ public void testThrottlingRetryRemoteMaster() throws ExecutionException, Interru
         assertTrue(listener.isDone());
         listener.get();
     }
+
+    public void testRetryForDifferentException() throws InterruptedException, BrokenBarrierException {
+        Request request = new Request();
+        PlainActionFuture<Response> listener = new PlainActionFuture<>();
+        AtomicBoolean exception = new AtomicBoolean(true);
+        AtomicBoolean retried = new AtomicBoolean(false);
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode }));
+
+        TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) {
+            @Override
+            protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
+                throws Exception {
+                if (exception.getAndSet(false)) {
+                    throw new Exception("Different exception");
+                } else {
+                    // If called second time due to retry, throw exception
+                    retried.set(true);
+                    throw new AssertionError("Should not retry for other exception");
+                }
+            }
+        };
+        action.execute(request, listener);
+
+        assertFalse(retried.get());
+        assertFalse(exception.get());
+    }
+
+    public void testShouldRetry() {
+        AtomicBoolean exception = new AtomicBoolean(true);
+        AtomicBoolean retried = new AtomicBoolean(false);
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) {
+            @Override
+            protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
+                if (exception.getAndSet(false)) {
+                    throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test");
+                } else {
+                    try {
+                        retried.set(true);
+                        barrier.await();
+                    } catch (Exception e) {
+                        throw new AssertionError();
+                    }
+                }
+            }
+        };
+
+    }
 }

From 3ae89ea40a50683cbd1db1c18f61561fd6cc4c31 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Tue, 30 Aug 2022 19:59:26 +0530
Subject: [PATCH 4/9] Moved back backoff policy to BackOffPolicy class

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../opensearch/action/bulk/BackoffPolicy.java | 126 +++++++++++++++
 .../action/support/RetryPolicy.java           | 145 ------------------
 .../action/support/RetryableAction.java       |  11 +-
 .../TransportClusterManagerNodeAction.java    |   4 +-
 .../action/bulk/BackoffPolicyTests.java       |  41 +++++
 .../action/support/RetryPolicyTests.java      |  58 -------
 6 files changed, 175 insertions(+), 210 deletions(-)
 delete mode 100644 server/src/main/java/org/opensearch/action/support/RetryPolicy.java
 delete mode 100644 server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java

diff --git a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
index 0b22ee04141ed..ac4062b342e50 100644
--- a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
+++ b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
@@ -31,6 +31,7 @@
 
 package org.opensearch.action.bulk;
 
+import org.opensearch.common.Randomness;
 import org.opensearch.common.unit.TimeValue;
 
 import java.util.Iterator;
@@ -105,6 +106,30 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
         return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
     }
 
+    /**
+     *  It provides exponential backoff between retries until the delay reaches maxDelayForRetry.
+     *  It uses an equal jitter scheme, which is well suited to retrying throttled requests:
+     *  the delay is randomized while still guaranteeing a minimum of half the exponential bound.
+     *
+     * @param baseDelay BaseDelay for exponential Backoff
+     * @param maxDelayForRetry MaxDelay that can be returned from backoff policy
+     * @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
+     */
+    public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
+        return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
+    }
+
+    /**
+     *  It provides exponential backoff between retries, doubling the delay bound on each attempt up to Integer.MAX_VALUE.
+     *  Each delay is drawn at random from within the current bound.
+     *
+     * @param baseDelay BaseDelay for exponential Backoff
+     * @return A backoff policy with exponentially growing, randomly distributed delays and no fixed maximum delay
+     */
+    public static BackoffPolicy exponentialRandomBackoff(long baseDelay) {
+        return new ExponentialRandomBackoff(baseDelay);
+    }
+
     /**
      * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
      */
@@ -197,6 +222,107 @@ public TimeValue next() {
         }
     }
 
+    private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
+        private final int maxDelayForRetry;
+        private final int baseDelay;
+
+        private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
+            this.maxDelayForRetry = maxDelayForRetry;
+            this.baseDelay = baseDelay;
+        }
+
+        @Override
+        public Iterator<TimeValue> iterator() {
+            return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
+        }
+    }
+
+    private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
+        /**
+         * Retry limit to avoid integer overflow issues.
+         * Post this limit, max delay will be returned with Equal Jitter.
+         *
+         * NOTE: If the value is greater than 30, there can be integer overflow
+         * issues during delay calculation.
+         **/
+        private final int RETRIES_TILL_JITTER_INCREASE = 30;
+
+        /**
+         * Exponential increase in delay will happen till it reaches maxDelayForRetry.
+         * Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
+         * and not increase the delay.
+         */
+        private final int maxDelayForRetry;
+        private final int baseDelay;
+        private int retriesAttempted;
+
+        private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
+            this.baseDelay = baseDelay;
+            this.maxDelayForRetry = maxDelayForRetry;
+        }
+
+        /**
+         * There is no limit on the number of retries for this backoff.
+         * This iterator always returns a backoff delay.
+         *
+         * @return true
+         */
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public TimeValue next() {
+            int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
+            int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
+            retriesAttempted++;
+            return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
+        }
+    }
+
+    private static class ExponentialRandomBackoff extends BackoffPolicy {
+        private final long baseDelay;
+
+        private ExponentialRandomBackoff(long baseDelay) {
+            this.baseDelay = baseDelay;
+        }
+
+        @Override
+        public Iterator<TimeValue> iterator() {
+            return new ExponentialRandomBackoffIterator(baseDelay);
+        }
+    }
+
+    private static class ExponentialRandomBackoffIterator implements Iterator<TimeValue> {
+        /**
+         * Current delay in exponential backoff
+         */
+        private long currentDelay;
+
+        private ExponentialRandomBackoffIterator(long baseDelay) {
+            this.currentDelay = baseDelay;
+        }
+
+        /**
+         * There is no limit on the number of retries for this backoff.
+         * This iterator always returns a backoff delay.
+         *
+         * @return true
+         */
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public TimeValue next() {
+            TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1);
+            currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE);
+            return delayToReturn;
+        }
+    }
+
     /**
      * Concrete Constant Back Off Policy
      *
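
As a quick sanity check of the equal-jitter scheme added above: on attempt n the raw delay is min(baseDelay * 2^n, maxDelayForRetry), and the returned value is drawn uniformly from the upper half of that bound, i.e. [delay/2, delay]. A small self-contained sketch (the class name and iteration count are illustrative):

    import org.opensearch.action.bulk.BackoffPolicy;
    import org.opensearch.common.unit.TimeValue;

    import java.util.Iterator;

    public class EqualJitterBackoffDemo {
        public static void main(String[] args) {
            // 10 ms base delay, 5000 ms cap: the bounds used by the cluster-manager retries.
            Iterator<TimeValue> delays = BackoffPolicy.exponentialEqualJitterBackoff(10, 5000).iterator();
            for (int attempt = 0; attempt < 12; attempt++) {
                // Expected ranges: [5,10] ms, [10,20] ms, [20,40] ms, ... capped at [2500,5000] ms.
                System.out.println("attempt " + attempt + " -> " + delays.next());
            }
        }
    }
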
diff --git a/server/src/main/java/org/opensearch/action/support/RetryPolicy.java b/server/src/main/java/org/opensearch/action/support/RetryPolicy.java
deleted file mode 100644
index 02790eb812283..0000000000000
--- a/server/src/main/java/org/opensearch/action/support/RetryPolicy.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.action.support;
-
-import org.opensearch.common.Randomness;
-import org.opensearch.common.unit.TimeValue;
-
-import java.util.Iterator;
-
-/**
- * Various RetryPolicies for performing retry.
- */
-public abstract class RetryPolicy implements Iterable<TimeValue> {
-    /**
-     *  It provides exponential backoff between retries until it reaches maxDelayForRetry.
-     *  It uses equal jitter scheme as it is being used for throttled exceptions.
-     *  It will make random distribution and also guarantees a minimum delay.
-     *
-     * @param baseDelay BaseDelay for exponential Backoff
-     * @param maxDelayForRetry MaxDelay that can be returned from backoff policy
-     * @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
-     */
-    public static RetryPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
-        return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
-    }
-
-    /**
-     *  It provides exponential backoff between retries until it reaches Integer.MAX_VALUE.
-     *  It will make random distribution of delay.
-     *
-     * @param baseDelay BaseDelay for exponential Backoff
-     * @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
-     */
-    public static RetryPolicy exponentialBackoff(long baseDelay) {
-        return new ExponentialBackoff(baseDelay);
-    }
-
-    private static class ExponentialEqualJitterBackoff extends RetryPolicy {
-        private final int maxDelayForRetry;
-        private final int baseDelay;
-
-        private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
-            this.maxDelayForRetry = maxDelayForRetry;
-            this.baseDelay = baseDelay;
-        }
-
-        @Override
-        public Iterator<TimeValue> iterator() {
-            return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
-        }
-    }
-
-    private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
-        /**
-         * Retry limit to avoids integer overflow issues.
-         * Post this limit, max delay will be returned with Equal Jitter.
-         *
-         * NOTE: If the value is greater than 30, there can be integer overflow
-         * issues during delay calculation.
-         **/
-        private final int RETRIES_TILL_JITTER_INCREASE = 30;
-
-        /**
-         * Exponential increase in delay will happen till it reaches maxDelayForRetry.
-         * Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
-         * and not increase the delay.
-         */
-        private final int maxDelayForRetry;
-        private final int baseDelay;
-        private int retriesAttempted;
-
-        private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
-            this.baseDelay = baseDelay;
-            this.maxDelayForRetry = maxDelayForRetry;
-        }
-
-        /**
-         * There is not any limit for this BackOff.
-         * This Iterator will always return back off delay.
-         *
-         * @return true
-         */
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public TimeValue next() {
-            int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
-            int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
-            retriesAttempted++;
-            return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
-        }
-    }
-
-    private static class ExponentialBackoff extends RetryPolicy {
-        private final long baseDelay;
-
-        private ExponentialBackoff(long baseDelay) {
-            this.baseDelay = baseDelay;
-        }
-
-        @Override
-        public Iterator<TimeValue> iterator() {
-            return new ExponentialBackoffIterator(baseDelay);
-        }
-    }
-
-    private static class ExponentialBackoffIterator implements Iterator<TimeValue> {
-        /**
-         * Current delay in exponential backoff
-         */
-        private long currentDelay;
-
-        private ExponentialBackoffIterator(long baseDelay) {
-            this.currentDelay = baseDelay;
-        }
-
-        /**
-         * There is not any limit for this BackOff.
-         * This Iterator will always return back off delay.
-         *
-         * @return true
-         */
-        @Override
-        public boolean hasNext() {
-            return true;
-        }
-
-        @Override
-        public TimeValue next() {
-            TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1);
-            currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE);
-            return delayToReturn;
-        }
-    }
-
-}
diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
index 71dca9585ad38..3c4b2a21d87ad 100644
--- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java
+++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
@@ -36,6 +36,7 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.ActionRunnable;
+import org.opensearch.action.bulk.BackoffPolicy;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
 import org.opensearch.threadpool.Scheduler;
@@ -64,7 +65,7 @@ public abstract class RetryableAction<Response> {
     private final long startMillis;
     private final ActionListener<Response> finalListener;
     private final String executor;
-    private final RetryPolicy retryPolicy;
+    private final BackoffPolicy backoffPolicy;
 
     private volatile Scheduler.ScheduledCancellable retryTask;
 
@@ -81,7 +82,7 @@ public RetryableAction(
             initialDelay,
             timeoutValue,
             listener,
-            RetryPolicy.exponentialBackoff(initialDelay.getMillis()),
+            BackoffPolicy.exponentialRandomBackoff(initialDelay.getMillis()),
             ThreadPool.Names.SAME
         );
     }
@@ -92,7 +93,7 @@ public RetryableAction(
         TimeValue initialDelay,
         TimeValue timeoutValue,
         ActionListener<Response> listener,
-        RetryPolicy retryPolicy,
+        BackoffPolicy backoffPolicy,
         String executor
     ) {
         this.logger = logger;
@@ -105,11 +106,11 @@ public RetryableAction(
         this.startMillis = threadPool.relativeTimeInMillis();
         this.finalListener = listener;
         this.executor = executor;
-        this.retryPolicy = retryPolicy;
+        this.backoffPolicy = backoffPolicy;
     }
 
     public void run() {
-        final RetryingListener retryingListener = new RetryingListener(retryPolicy.iterator(), null);
+        final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null);
         final Runnable runnable = createRunnable(retryingListener);
         threadPool.executor(executor).execute(runnable);
     }
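
With this change the default constructor path keeps its previous behaviour by delegating to exponentialRandomBackoff, where the n-th delay is drawn uniformly from [1, initialDelay * 2^n] milliseconds. A small sketch of that default policy in isolation (the class name and iteration count are illustrative):

    import org.opensearch.action.bulk.BackoffPolicy;
    import org.opensearch.common.unit.TimeValue;

    import java.util.Iterator;

    public class RandomBackoffDemo {
        public static void main(String[] args) {
            // The bound starts at the base delay and doubles on every attempt;
            // each delay is drawn uniformly from [1, bound] milliseconds.
            Iterator<TimeValue> delays = BackoffPolicy.exponentialRandomBackoff(10).iterator();
            long bound = 10;
            for (int attempt = 0; attempt < 10; attempt++) {
                System.out.println("attempt " + attempt + " -> " + delays.next() + " (bound " + bound + " ms)");
                bound *= 2;
            }
        }
    }
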
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index 349ab11dccfef..96ac43f1ec180 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -39,9 +39,9 @@
 import org.opensearch.action.ActionListenerResponseHandler;
 import org.opensearch.action.ActionResponse;
 import org.opensearch.action.ActionRunnable;
+import org.opensearch.action.bulk.BackoffPolicy;
 import org.opensearch.action.support.ActionFilters;
 import org.opensearch.action.support.HandledTransportAction;
-import org.opensearch.action.support.RetryPolicy;
 import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateObserver;
@@ -166,7 +166,7 @@ class AsyncSingleAction extends RetryableAction {
                 TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
                 request.clusterManagerNodeTimeout,
                 listener,
-                RetryPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
+                BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
                 ThreadPool.Names.SAME
             );
             this.task = task;
diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
index 1b7d848b626fe..8bdaf7aae8c57 100644
--- a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
@@ -75,4 +75,45 @@ public void testWrapBackoffPolicy() {
             assertEquals(expectedRetries, retries.get());
         }
     }
+
+    public void testEqualJitterExponentialBackOffPolicy() {
+        int baseDelay = 10;
+        int maxDelay = 10000;
+        BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay);
+        Iterator<TimeValue> iterator = policy.iterator();
+
+        // Assert equal jitter
+        int retriesTillMaxDelay = 10;
+        for (int i = 0; i < retriesTillMaxDelay; i++) {
+            TimeValue delay = iterator.next();
+            assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2);
+            assertTrue(delay.getMillis() <= baseDelay * (1L << i));
+        }
+
+        // Now policy should return max delay for next retries.
+        int retriesAfterMaxDelay = randomInt(10);
+        for (int i = 0; i < retriesAfterMaxDelay; i++) {
+            TimeValue delay = iterator.next();
+            assertTrue(delay.getMillis() >= maxDelay / 2);
+            assertTrue(delay.getMillis() <= maxDelay);
+        }
+    }
+
+    public void testExponentialBackOffPolicy() {
+        long baseDelay = 10;
+        int maxDelay = 10000;
+        long currentDelay = baseDelay;
+        BackoffPolicy policy = BackoffPolicy.exponentialRandomBackoff(baseDelay);
+        Iterator<TimeValue> iterator = policy.iterator();
+
+        // Assert exponential growth of the random backoff delay
+        int numberOfRetries = randomInt(20);
+
+        for (int i = 0; i < numberOfRetries; i++) {
+            TimeValue delay = iterator.next();
+            assertTrue(delay.getMillis() >= 0);
+            assertTrue(delay.getMillis() <= currentDelay);
+            currentDelay = currentDelay * 2;
+        }
+    }
 }
diff --git a/server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java b/server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java
deleted file mode 100644
index 107d09c44d60a..0000000000000
--- a/server/src/test/java/org/opensearch/action/support/RetryPolicyTests.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.action.support;
-
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.test.OpenSearchTestCase;
-
-import java.util.Iterator;
-
-public class RetryPolicyTests extends OpenSearchTestCase {
-
-    public void testEqualJitterExponentialBackOffPolicy() {
-        int baseDelay = 10;
-        int maxDelay = 10000;
-        RetryPolicy policy = RetryPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay);
-        Iterator<TimeValue> iterator = policy.iterator();
-
-        // Assert equal jitter
-        int retriesTillMaxDelay = 10;
-        for (int i = 0; i < retriesTillMaxDelay; i++) {
-            TimeValue delay = iterator.next();
-            assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2);
-            assertTrue(delay.getMillis() <= baseDelay * (1L << i));
-        }
-
-        // Now policy should return max delay for next retries.
-        int retriesAfterMaxDelay = randomInt(10);
-        for (int i = 0; i < retriesAfterMaxDelay; i++) {
-            TimeValue delay = iterator.next();
-            assertTrue(delay.getMillis() >= maxDelay / 2);
-            assertTrue(delay.getMillis() <= maxDelay);
-        }
-    }
-
-    public void testExponentialBackOffPolicy() {
-        long baseDelay = 10;
-        int maxDelay = 10000;
-        long currentDelay = baseDelay;
-        RetryPolicy policy = RetryPolicy.exponentialBackoff(baseDelay);
-        Iterator<TimeValue> iterator = policy.iterator();
-
-        // Assert equal jitter
-        int numberOfRetries = randomInt(20);
-
-        for (int i = 0; i < numberOfRetries; i++) {
-            TimeValue delay = iterator.next();
-            assertTrue(delay.getMillis() >= 0);
-            assertTrue(delay.getMillis() <= currentDelay);
-            currentDelay = currentDelay * 2;
-        }
-    }
-}

From 45760f5792750c954ab6066353143460b6391406 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Thu, 1 Sep 2022 16:34:24 +0530
Subject: [PATCH 5/9] Used RemoteAddress instead of new field for checking
 localRequest

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../clustermanager/ClusterManagerNodeRequest.java   | 13 -------------
 .../TransportClusterManagerNodeAction.java          |  8 ++++----
 2 files changed, 4 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java b/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java
index d20a4d99cf767..9d8a79cfed11d 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java
@@ -58,21 +58,17 @@ public abstract class ClusterManagerNodeRequest<Request extends ClusterManagerNo
     @Deprecated
     protected TimeValue masterNodeTimeout = clusterManagerNodeTimeout;
 
-    protected boolean remoteRequest;
-
     protected ClusterManagerNodeRequest() {}
 
     protected ClusterManagerNodeRequest(StreamInput in) throws IOException {
         super(in);
         clusterManagerNodeTimeout = in.readTimeValue();
-        remoteRequest = in.readOptionalBoolean();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeTimeValue(clusterManagerNodeTimeout);
-        out.writeOptionalBoolean(remoteRequest);
     }
 
     /**
@@ -114,11 +110,6 @@ public final Request masterNodeTimeout(String timeout) {
         return clusterManagerNodeTimeout(timeout);
     }
 
-    public final Request setRemoteRequest(boolean remoteRequest) {
-        this.remoteRequest = remoteRequest;
-        return (Request) this;
-    }
-
     public final TimeValue clusterManagerNodeTimeout() {
         return this.clusterManagerNodeTimeout;
     }
@@ -128,8 +119,4 @@ public final TimeValue clusterManagerNodeTimeout() {
     public final TimeValue masterNodeTimeout() {
         return clusterManagerNodeTimeout();
     }
-
-    public final boolean isRemoteRequest() {
-        return this.remoteRequest;
-    }
 }
diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index 96ac43f1ec180..0a5457b891255 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -155,7 +155,6 @@ class AsyncSingleAction extends RetryableAction {
         private ClusterStateObserver observer;
         private final long startTime;
         private final Task task;
-        private boolean localRequest;
         private static final int BASE_DELAY_MILLIS = 10;
         private static final int MAX_DELAY_MILLIS = 5000;
 
@@ -172,7 +171,6 @@ class AsyncSingleAction extends RetryableAction {
             this.task = task;
             this.request = request;
             this.startTime = threadPool.relativeTimeInMillis();
-            localRequest = !request.remoteRequest;
         }
 
         @Override
@@ -185,7 +183,10 @@ public void tryAction(ActionListener retryListener) {
 
         @Override
         public boolean shouldRetry(Exception e) {
-            if (localRequest) {
+            // If the remote address is null, the request originated on this node, so we perform the retry here.
+            // If the remote address is not null, the request was sent by a remote node and received on this master node over the transport layer;
+            // in that case the throttling retry should be performed on the remote node only, not on this master node.
+            if (request.remoteAddress() == null) {
                 if (e instanceof TransportException) {
                     return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
                 }
@@ -242,7 +243,6 @@ protected void doStart(ClusterState clusterState) {
                     } else {
                         DiscoveryNode clusterManagerNode = nodes.getMasterNode();
                         final String actionName = getClusterManagerActionName(clusterManagerNode);
-                        request.setRemoteRequest(true);
                         transportService.sendRequest(
                             clusterManagerNode,
                             actionName,
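
Condensed into a standalone check, the retry gate introduced above amounts to the following. This is an illustrative helper, not code from the patch; the class and method names are assumptions, and the exception unwrapping mirrors the shouldRetry implementation.

    import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
    import org.opensearch.cluster.service.MasterTaskThrottlingException;
    import org.opensearch.transport.TransportException;

    final class ThrottlingRetryCheck {
        // Retry locally only when the request originated on this node; requests that
        // arrived over the transport layer are retried by the sending data node instead.
        static boolean shouldRetryLocally(ClusterManagerNodeRequest<?> request, Exception e) {
            if (request.remoteAddress() != null) {
                return false; // remote node owns the retry
            }
            Throwable cause = (e instanceof TransportException) ? ((TransportException) e).unwrapCause() : e;
            return cause instanceof MasterTaskThrottlingException;
        }
    }
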

From 4bf4e8b7eb47d25be4eca995510a17bd96a98023 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Thu, 1 Sep 2022 18:36:51 +0530
Subject: [PATCH 6/9] Add retryable action for refresh-mapping and shard actions
 which don't use TransportClusterManagerNodeAction

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../index/NodeMappingRefreshAction.java       | 75 ++++++++++++++++++-
 .../action/shard/ShardStateAction.java        | 58 ++++++++++++++
 .../snapshots/SnapshotResiliencyTests.java    |  2 +-
 3 files changed, 132 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java b/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java
index 5d2f9a5957772..71b1bdfeebf0f 100644
--- a/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java
+++ b/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java
@@ -34,18 +34,24 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.opensearch.action.ActionListener;
 import org.opensearch.action.IndicesRequest;
+import org.opensearch.action.bulk.BackoffPolicy;
 import org.opensearch.action.support.IndicesOptions;
+import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.MetadataMappingService;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.common.unit.TimeValue;
 import org.opensearch.tasks.Task;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.EmptyTransportResponseHandler;
 import org.opensearch.transport.TransportChannel;
+import org.opensearch.transport.TransportException;
 import org.opensearch.transport.TransportRequest;
 import org.opensearch.transport.TransportRequestHandler;
 import org.opensearch.transport.TransportResponse;
@@ -66,11 +72,17 @@ public class NodeMappingRefreshAction {
 
     private final TransportService transportService;
     private final MetadataMappingService metadataMappingService;
+    private final ThreadPool threadPool;
 
     @Inject
-    public NodeMappingRefreshAction(TransportService transportService, MetadataMappingService metadataMappingService) {
+    public NodeMappingRefreshAction(
+        TransportService transportService,
+        MetadataMappingService metadataMappingService,
+        ThreadPool threadPool
+    ) {
         this.transportService = transportService;
         this.metadataMappingService = metadataMappingService;
+        this.threadPool = threadPool;
         transportService.registerRequestHandler(
             ACTION_NAME,
             ThreadPool.Names.SAME,
@@ -80,11 +92,70 @@ public NodeMappingRefreshAction(TransportService transportService, MetadataMappi
     }
 
     public void nodeMappingRefresh(final DiscoveryNode clusterManagerNode, final NodeMappingRefreshRequest request) {
+        new NodeMappingRefreshClusterManagerAction(clusterManagerNode, request).run();
+    }
+
+    private void sendNodeMappingRefreshToClusterManager(
+        final DiscoveryNode clusterManagerNode,
+        final NodeMappingRefreshRequest request,
+        ActionListener listener
+    ) {
         if (clusterManagerNode == null) {
             logger.warn("can't send mapping refresh for [{}], no cluster-manager known.", request.index());
             return;
         }
-        transportService.sendRequest(clusterManagerNode, ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
+        transportService.sendRequest(clusterManagerNode, ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+            @Override
+            public void handleException(TransportException exp) {
+                listener.onFailure(exp);
+            }
+        });
+    }
+
+    /**
+     * RetryableAction for performing retries for cluster manager throttling.
+     */
+    private class NodeMappingRefreshClusterManagerAction extends RetryableAction {
+
+        private final DiscoveryNode clusterManagerNode;
+        private final NodeMappingRefreshRequest request;
+        private static final int BASE_DELAY_MILLIS = 10;
+        private static final int MAX_DELAY_MILLIS = 10;
+
+        private NodeMappingRefreshClusterManagerAction(DiscoveryNode clusterManagerNode, NodeMappingRefreshRequest request) {
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
+                TimeValue.timeValueMillis(Integer.MAX_VALUE), // Shard tasks are internal and don't have timeout
+                new ActionListener() {
+                    @Override
+                    public void onResponse(Object o) {}
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn("Mapping refresh for [{}] failed due to [{}]", request.index, e.getMessage());
+                    }
+                },
+                BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
+                ThreadPool.Names.SAME
+            );
+            this.clusterManagerNode = clusterManagerNode;
+            this.request = request;
+        }
+
+        @Override
+        public void tryAction(ActionListener listener) {
+            sendNodeMappingRefreshToClusterManager(clusterManagerNode, request, listener);
+        }
+
+        @Override
+        public boolean shouldRetry(Exception e) {
+            if (e instanceof TransportException) {
+                return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
+            }
+            return e instanceof MasterTaskThrottlingException;
+        }
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java
index 93ddd74322ce9..d8aabfe6a3818 100644
--- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java
+++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java
@@ -38,6 +38,8 @@
 import org.opensearch.OpenSearchException;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.ActionListener;
+import org.opensearch.action.bulk.BackoffPolicy;
+import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.ClusterChangedEvent;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateObserver;
@@ -55,6 +57,7 @@
 import org.opensearch.cluster.routing.allocation.FailedShard;
 import org.opensearch.cluster.routing.allocation.StaleShard;
 import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.Priority;
 import org.opensearch.common.inject.Inject;
@@ -180,6 +183,15 @@ private void sendShardAction(
         final ClusterState currentState,
         final TransportRequest request,
         final ActionListener<Void> listener
+    ) {
+        new ShardStateClusterManagerAction(currentState, actionName, request, listener).run();
+    }
+
+    private void sendShardActionToClusterManager(
+        final String actionName,
+        final ClusterState currentState,
+        final TransportRequest request,
+        final ActionListener<Void> listener
     ) {
         ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
         DiscoveryNode clusterManagerNode = currentState.nodes().getMasterNode();
@@ -222,6 +234,52 @@ public void handleException(TransportException exp) {
         }
     }
 
+    /**
+     * RetryableAction for performing retries for cluster manager throttling.
+     */
+    private class ShardStateClusterManagerAction extends RetryableAction {
+        private final String actionName;
+        private final ActionListener listener;
+        private final ClusterState currentState;
+        private final TransportRequest request;
+        private static final int BASE_DELAY_MILLIS = 10;
+        private static final int MAX_DELAY_MILLIS = 5000;
+
+        private ShardStateClusterManagerAction(
+            ClusterState currentState,
+            String actionName,
+            TransportRequest request,
+            ActionListener listener
+        ) {
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
+                TimeValue.timeValueMillis(Integer.MAX_VALUE), // Shard tasks are internal and don't have timeout
+                listener,
+                BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
+                ThreadPool.Names.SAME
+            );
+            this.actionName = actionName;
+            this.listener = listener;
+            this.currentState = currentState;
+            this.request = request;
+        }
+
+        @Override
+        public void tryAction(ActionListener listener) {
+            sendShardActionToClusterManager(actionName, currentState, request, listener);
+        }
+
+        @Override
+        public boolean shouldRetry(Exception e) {
+            if (e instanceof TransportException) {
+                return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
+            }
+            return e instanceof MasterTaskThrottlingException;
+        }
+    }
+
     private static Class[] CLUSTER_MANAGER_CHANNEL_EXCEPTIONS = new Class[] {
         NotMasterException.class,
         ConnectTransportException.class,
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index 9558e898f8832..1b59876bda575 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -1846,7 +1846,7 @@ public void onFailure(final Exception e) {
                     threadPool,
                     new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
                     shardStateAction,
-                    new NodeMappingRefreshAction(transportService, metadataMappingService),
+                    new NodeMappingRefreshAction(transportService, metadataMappingService, threadPool),
                     repositoriesService,
                     mock(SearchService.class),
                     new PeerRecoverySourceService(transportService, indicesService, recoverySettings),

From 0f66ede9c4e729b59405595abad016d0275b7b35 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Fri, 2 Sep 2022 11:10:26 +0530
Subject: [PATCH 7/9] Revert "Add retryable action for refresh-mapping and shard
 action which don't use TransportClusterManagerNodeAction"

This reverts commit 4bf4e8b7eb47d25be4eca995510a17bd96a98023.

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../index/NodeMappingRefreshAction.java       | 75 +------------------
 .../action/shard/ShardStateAction.java        | 58 --------------
 .../snapshots/SnapshotResiliencyTests.java    |  2 +-
 3 files changed, 3 insertions(+), 132 deletions(-)

diff --git a/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java b/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java
index 71b1bdfeebf0f..5d2f9a5957772 100644
--- a/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java
+++ b/server/src/main/java/org/opensearch/cluster/action/index/NodeMappingRefreshAction.java
@@ -34,24 +34,18 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.opensearch.action.ActionListener;
 import org.opensearch.action.IndicesRequest;
-import org.opensearch.action.bulk.BackoffPolicy;
 import org.opensearch.action.support.IndicesOptions;
-import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.MetadataMappingService;
 import org.opensearch.cluster.node.DiscoveryNode;
-import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
-import org.opensearch.common.unit.TimeValue;
 import org.opensearch.tasks.Task;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.EmptyTransportResponseHandler;
 import org.opensearch.transport.TransportChannel;
-import org.opensearch.transport.TransportException;
 import org.opensearch.transport.TransportRequest;
 import org.opensearch.transport.TransportRequestHandler;
 import org.opensearch.transport.TransportResponse;
@@ -72,17 +66,11 @@ public class NodeMappingRefreshAction {
 
     private final TransportService transportService;
     private final MetadataMappingService metadataMappingService;
-    private final ThreadPool threadPool;
 
     @Inject
-    public NodeMappingRefreshAction(
-        TransportService transportService,
-        MetadataMappingService metadataMappingService,
-        ThreadPool threadPool
-    ) {
+    public NodeMappingRefreshAction(TransportService transportService, MetadataMappingService metadataMappingService) {
         this.transportService = transportService;
         this.metadataMappingService = metadataMappingService;
-        this.threadPool = threadPool;
         transportService.registerRequestHandler(
             ACTION_NAME,
             ThreadPool.Names.SAME,
@@ -92,70 +80,11 @@ public NodeMappingRefreshAction(
     }
 
     public void nodeMappingRefresh(final DiscoveryNode clusterManagerNode, final NodeMappingRefreshRequest request) {
-        new NodeMappingRefreshClusterManagerAction(clusterManagerNode, request).run();
-    }
-
-    private void sendNodeMappingRefreshToClusterManager(
-        final DiscoveryNode clusterManagerNode,
-        final NodeMappingRefreshRequest request,
-        ActionListener listener
-    ) {
         if (clusterManagerNode == null) {
             logger.warn("can't send mapping refresh for [{}], no cluster-manager known.", request.index());
             return;
         }
-        transportService.sendRequest(clusterManagerNode, ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
-            @Override
-            public void handleException(TransportException exp) {
-                listener.onFailure(exp);
-            }
-        });
-    }
-
-    /**
-     * RetryableAction for performing retries for cluster manager throttling.
-     */
-    private class NodeMappingRefreshClusterManagerAction extends RetryableAction {
-
-        private final DiscoveryNode clusterManagerNode;
-        private final NodeMappingRefreshRequest request;
-        private static final int BASE_DELAY_MILLIS = 10;
-        private static final int MAX_DELAY_MILLIS = 10;
-
-        private NodeMappingRefreshClusterManagerAction(DiscoveryNode clusterManagerNode, NodeMappingRefreshRequest request) {
-            super(
-                logger,
-                threadPool,
-                TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
-                TimeValue.timeValueMillis(Integer.MAX_VALUE), // Shard tasks are internal and don't have timeout
-                new ActionListener() {
-                    @Override
-                    public void onResponse(Object o) {}
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.warn("Mapping refresh for [{}] failed due to [{}]", request.index, e.getMessage());
-                    }
-                },
-                BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
-                ThreadPool.Names.SAME
-            );
-            this.clusterManagerNode = clusterManagerNode;
-            this.request = request;
-        }
-
-        @Override
-        public void tryAction(ActionListener listener) {
-            sendNodeMappingRefreshToClusterManager(clusterManagerNode, request, listener);
-        }
-
-        @Override
-        public boolean shouldRetry(Exception e) {
-            if (e instanceof TransportException) {
-                return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
-            }
-            return e instanceof MasterTaskThrottlingException;
-        }
+        transportService.sendRequest(clusterManagerNode, ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java
index d8aabfe6a3818..93ddd74322ce9 100644
--- a/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java
+++ b/server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java
@@ -38,8 +38,6 @@
 import org.opensearch.OpenSearchException;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.action.ActionListener;
-import org.opensearch.action.bulk.BackoffPolicy;
-import org.opensearch.action.support.RetryableAction;
 import org.opensearch.cluster.ClusterChangedEvent;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.ClusterStateObserver;
@@ -57,7 +55,6 @@
 import org.opensearch.cluster.routing.allocation.FailedShard;
 import org.opensearch.cluster.routing.allocation.StaleShard;
 import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.Nullable;
 import org.opensearch.common.Priority;
 import org.opensearch.common.inject.Inject;
@@ -183,15 +180,6 @@ private void sendShardAction(
         final ClusterState currentState,
         final TransportRequest request,
         final ActionListener<Void> listener
-    ) {
-        new ShardStateClusterManagerAction(currentState, actionName, request, listener).run();
-    }
-
-    private void sendShardActionToClusterManager(
-        final String actionName,
-        final ClusterState currentState,
-        final TransportRequest request,
-        final ActionListener<Void> listener
     ) {
         ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
         DiscoveryNode clusterManagerNode = currentState.nodes().getMasterNode();
@@ -234,52 +222,6 @@ public void handleException(TransportException exp) {
         }
     }
 
-    /**
-     * RetryableAction for performing retries for cluster manager throttling.
-     */
-    private class ShardStateClusterManagerAction extends RetryableAction {
-        private final String actionName;
-        private final ActionListener listener;
-        private final ClusterState currentState;
-        private final TransportRequest request;
-        private static final int BASE_DELAY_MILLIS = 10;
-        private static final int MAX_DELAY_MILLIS = 5000;
-
-        private ShardStateClusterManagerAction(
-            ClusterState currentState,
-            String actionName,
-            TransportRequest request,
-            ActionListener listener
-        ) {
-            super(
-                logger,
-                threadPool,
-                TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
-                TimeValue.timeValueMillis(Integer.MAX_VALUE), // Shard tasks are internal and don't have timeout
-                listener,
-                BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
-                ThreadPool.Names.SAME
-            );
-            this.actionName = actionName;
-            this.listener = listener;
-            this.currentState = currentState;
-            this.request = request;
-        }
-
-        @Override
-        public void tryAction(ActionListener listener) {
-            sendShardActionToClusterManager(actionName, currentState, request, listener);
-        }
-
-        @Override
-        public boolean shouldRetry(Exception e) {
-            if (e instanceof TransportException) {
-                return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
-            }
-            return e instanceof MasterTaskThrottlingException;
-        }
-    }
-
     private static Class[] CLUSTER_MANAGER_CHANNEL_EXCEPTIONS = new Class[] {
         NotMasterException.class,
         ConnectTransportException.class,
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index 1b59876bda575..9558e898f8832 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -1846,7 +1846,7 @@ public void onFailure(final Exception e) {
                     threadPool,
                     new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
                     shardStateAction,
-                    new NodeMappingRefreshAction(transportService, metadataMappingService, threadPool),
+                    new NodeMappingRefreshAction(transportService, metadataMappingService),
                     repositoriesService,
                     mock(SearchService.class),
                     new PeerRecoverySourceService(transportService, indicesService, recoverySettings),

From 5a4eaca120e32836765a08cb2b142adb4cda41e6 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Fri, 2 Sep 2022 12:05:18 +0530
Subject: [PATCH 8/9] Incorporated Comments

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../opensearch/action/bulk/BackoffPolicy.java | 18 +++++++-------
 .../action/support/RetryableAction.java       |  2 +-
 .../action/bulk/BackoffPolicyTests.java       |  2 +-
 ...ransportClusterManagerNodeActionTests.java | 24 +------------------
 4 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
index ac4062b342e50..185e6b76ec18e 100644
--- a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
+++ b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
@@ -121,13 +121,13 @@ public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int max
 
     /**
      *  It provides exponential backoff between retries until it reaches Integer.MAX_VALUE.
-     *  It will make random distribution of delay.
+     *  It uses a full jitter scheme for random distribution.
      *
      * @param baseDelay BaseDelay for exponential Backoff
-     * @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
+     * @return A backoff policy with exponential backoff with full jitter.
      */
-    public static BackoffPolicy exponentialRandomBackoff(long baseDelay) {
-        return new ExponentialRandomBackoff(baseDelay);
+    public static BackoffPolicy exponentialFullJitterBackoff(long baseDelay) {
+        return new ExponentialFullJitterBackoff(baseDelay);
     }
 
     /**
@@ -281,26 +281,26 @@ public TimeValue next() {
         }
     }
 
-    private static class ExponentialRandomBackoff extends BackoffPolicy {
+    private static class ExponentialFullJitterBackoff extends BackoffPolicy {
         private final long baseDelay;
 
-        private ExponentialRandomBackoff(long baseDelay) {
+        private ExponentialFullJitterBackoff(long baseDelay) {
             this.baseDelay = baseDelay;
         }
 
         @Override
         public Iterator<TimeValue> iterator() {
-            return new ExponentialRandomBackoffIterator(baseDelay);
+            return new ExponentialFullJitterBackoffIterator(baseDelay);
         }
     }
 
-    private static class ExponentialRandomBackoffIterator implements Iterator<TimeValue> {
+    private static class ExponentialFullJitterBackoffIterator implements Iterator<TimeValue> {
         /**
          * Current delay in exponential backoff
          */
         private long currentDelay;
 
-        private ExponentialRandomBackoffIterator(long baseDelay) {
+        private ExponentialFullJitterBackoffIterator(long baseDelay) {
             this.currentDelay = baseDelay;
         }
 
diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
index 3c4b2a21d87ad..e7a9d7545342b 100644
--- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java
+++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java
@@ -82,7 +82,7 @@ public RetryableAction(
             initialDelay,
             timeoutValue,
             listener,
-            BackoffPolicy.exponentialRandomBackoff(initialDelay.getMillis()),
+            BackoffPolicy.exponentialFullJitterBackoff(initialDelay.getMillis()),
             ThreadPool.Names.SAME
         );
     }
diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
index 8bdaf7aae8c57..2f9ae9a154f46 100644
--- a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java
@@ -103,7 +103,7 @@ public void testExponentialBackOffPolicy() {
         long baseDelay = 10;
         int maxDelay = 10000;
         long currentDelay = baseDelay;
-        BackoffPolicy policy = BackoffPolicy.exponentialRandomBackoff(baseDelay);
+        BackoffPolicy policy = BackoffPolicy.exponentialFullJitterBackoff(baseDelay);
         Iterator<TimeValue> iterator = policy.iterator();
 
         // Assert equal jitter
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
index ac999df6499f0..ab878b819d46f 100644
--- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
+++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
@@ -657,7 +657,7 @@ public void testRetryForDifferentException() throws InterruptedException, Broken
             protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
                 throws Exception {
                 if (exception.getAndSet(false)) {
-                    throw new Exception("Different excetion");
+                    throw new Exception("Different exception");
                 } else {
                     // If called second time due to retry, throw exception
                     retried.set(true);
@@ -670,26 +670,4 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
         assertFalse(retried.get());
         assertFalse(exception.get());
     }
-
-    public void testShouldRetry() {
-        AtomicBoolean exception = new AtomicBoolean(true);
-        AtomicBoolean retried = new AtomicBoolean(false);
-        CyclicBarrier barrier = new CyclicBarrier(2);
-        TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) {
-            @Override
-            protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
-                if (exception.getAndSet(false)) {
-                    throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test");
-                } else {
-                    try {
-                        retried.set(true);
-                        barrier.await();
-                    } catch (Exception e) {
-                        throw new AssertionError();
-                    }
-                }
-            }
-        };
-
-    }
 }
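
For reference, the full-jitter scheme that the renamed exponentialFullJitterBackoff refers to is usually described as: draw each delay uniformly at random from [0, current window], then double the window for the next retry, capped at Integer.MAX_VALUE. Below is a minimal sketch of that scheme; the class and field names are illustrative only and are not taken from the OpenSearch sources.

    import java.util.Iterator;
    import java.util.concurrent.ThreadLocalRandom;

    // Hypothetical helper illustrating full-jitter delays; not the OpenSearch iterator.
    final class FullJitterDelays implements Iterator<Long> {
        private long currentDelayMillis;

        FullJitterDelays(long baseDelayMillis) {
            this.currentDelayMillis = baseDelayMillis;
        }

        @Override
        public boolean hasNext() {
            return true; // unbounded; the caller decides when to stop retrying
        }

        @Override
        public Long next() {
            // Uniformly random delay in [0, currentDelay], then grow the window.
            long jittered = ThreadLocalRandom.current().nextLong(currentDelayMillis + 1);
            currentDelayMillis = Math.min(currentDelayMillis * 2, Integer.MAX_VALUE);
            return jittered;
        }
    }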

From b3721e490b3e8b8813151ea016a8acdf7576c5f3 Mon Sep 17 00:00:00 2001
From: Dhwanil Patel <dhwanip@amazon.com>
Date: Fri, 2 Sep 2022 18:04:18 +0530
Subject: [PATCH 9/9] Changed throttling exception name due to merge from main

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
---
 .../clustermanager/TransportClusterManagerNodeAction.java   | 6 +++---
 .../TransportClusterManagerNodeActionTests.java             | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
index 4d8724e95f2d3..51bde108dff73 100644
--- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
+++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java
@@ -52,8 +52,8 @@
 import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterManagerThrottlingException;
 import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.Writeable;
 import org.opensearch.common.unit.TimeValue;
@@ -210,9 +210,9 @@ public boolean shouldRetry(Exception e) {
             // in that case we would want throttling retry to perform on remote node only not on this master node.
             if (request.remoteAddress() == null) {
                 if (e instanceof TransportException) {
-                    return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
+                    return ((TransportException) e).unwrapCause() instanceof ClusterManagerThrottlingException;
                 }
-                return e instanceof MasterTaskThrottlingException;
+                return e instanceof ClusterManagerThrottlingException;
             }
             return false;
         }
diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
index 2f2490147465d..c45bae224dbd6 100644
--- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
+++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java
@@ -52,8 +52,8 @@
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.node.DiscoveryNodeRole;
 import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterManagerThrottlingException;
 import org.opensearch.cluster.service.ClusterService;
-import org.opensearch.cluster.service.MasterTaskThrottlingException;
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
 import org.opensearch.common.settings.Settings;
@@ -623,7 +623,7 @@ public void testThrottlingRetryLocalMaster() throws InterruptedException, Broken
             @Override
             protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
                 if (exception.getAndSet(false)) {
-                    throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test");
+                    throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test");
                 } else {
                     try {
                         retried.set(true);
@@ -663,7 +663,7 @@ public void testThrottlingRetryRemoteMaster() throws ExecutionException, Interru
         assertThat(capturedRequest.action, equalTo("internal:testAction"));
         transport.handleRemoteError(
             capturedRequest.requestId,
-            new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")
+            new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test")
         );
 
         assertFalse(listener.isDone());