From 14acc5c715722d1a77812935452b997c9fb075a6 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Mon, 6 Jun 2022 16:48:11 +0530 Subject: [PATCH] Data node changes for master task throttling Signed-off-by: Dhwanil Patel --- .../ClusterManagerNodeRequest.java | 13 ++ .../MasterThrottlingRetryListener.java | 131 +++++++++++++ .../TransportClusterManagerNodeAction.java | 13 +- .../MasterThrottlingRetryListenerTests.java | 175 ++++++++++++++++++ ...ransportClusterManagerNodeActionTests.java | 80 ++++++++ 5 files changed, 408 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListener.java create mode 100644 server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java b/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java index 9d8a79cfed11d..d20a4d99cf767 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/ClusterManagerNodeRequest.java @@ -58,17 +58,21 @@ public abstract class ClusterManagerNodeRequest, Response extends ActionResponse> + implements + ActionListener { + + private static final Logger logger = LogManager.getLogger(MasterThrottlingRetryListener.class); + + /** + * Base delay in millis. + */ + private final int BASE_DELAY_MILLIS = 10; + + /** + * Maximum delay in millis. + */ + private final int MAX_DELAY_MILLIS = 5000; + + private long totalDelay; + private final Iterator backoffDelay; + private final ActionListener listener; + private final Request request; + private final Runnable runnable; + private final String actionName; + private final boolean localNodeRequest; + + private static ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + + public MasterThrottlingRetryListener(String actionName, Request request, Runnable runnable, ActionListener actionListener) { + this.actionName = actionName; + this.listener = actionListener; + this.request = request; + this.runnable = runnable; + this.backoffDelay = BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS).iterator(); + /** + This is to determine whether request is generated from local node or from remote node. + If it is local node's request we need to perform the retries on this node. + If it is remote node's request, we will not perform retries on this node and let remote node perform the retries. + + If request is from remote data node, then data node will set remoteRequest flag in {@link MasterNodeRequest} + and send request to master, using that on master node we can determine if the request was localRequest or remoteRequest. + */ + this.localNodeRequest = !(request.isRemoteRequest()); + } + + @Override + public void onResponse(Response response) { + listener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + + if (localNodeRequest && isThrottlingException(e)) { + logger.info("Retrying [{}] on throttling exception from master. Error: [{}]", actionName, getExceptionMessage(e)); + long delay = backoffDelay.next().getMillis(); + if (totalDelay + delay >= request.clusterManagerNodeTimeout.getMillis()) { + delay = request.clusterManagerNodeTimeout.getMillis() - totalDelay; + scheduler.schedule(new Runnable() { + @Override + public void run() { + listener.onFailure(new ProcessClusterEventTimeoutException(request.clusterManagerNodeTimeout, actionName)); + } + }, delay, TimeUnit.MILLISECONDS); + } else { + scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + totalDelay += delay; + } else { + listener.onFailure(e); + } + } + + /** + * For Testcase purposes. + * @param retrySceduler scheduler defined in test cases. + */ + public static void setThrottlingRetryScheduler(ScheduledThreadPoolExecutor retrySceduler) { + scheduler = retrySceduler; + } + + private boolean isThrottlingException(Exception e) { + if (e instanceof TransportException) { + return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException; + } + return e instanceof MasterTaskThrottlingException; + } + + private String getExceptionMessage(Exception e) { + if (e instanceof TransportException) { + return ((TransportException) e).unwrapCause().getMessage(); + } else { + return e.getMessage(); + } + } + + public static long getRetryingTasksCount() { + return scheduler.getActiveCount() + scheduler.getQueue().size(); + } +} diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 46c93a8d33c7b..4744107d28854 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -134,12 +134,10 @@ protected boolean localExecute(Request request) { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { - ClusterState state = clusterService.state(); - logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); } - new AsyncSingleAction(task, request, listener).doStart(state); + new AsyncSingleAction(task, request, listener).start(); } /** @@ -158,10 +156,16 @@ class AsyncSingleAction { AsyncSingleAction(Task task, Request request, ActionListener listener) { this.task = task; this.request = request; - this.listener = listener; + this.listener = new MasterThrottlingRetryListener(actionName, request, this::start, listener); this.startTime = threadPool.relativeTimeInMillis(); } + public void start() { + ClusterState state = clusterService.state(); + logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); + doStart(state); + } + protected void doStart(ClusterState clusterState) { try { final DiscoveryNodes nodes = clusterState.nodes(); @@ -210,6 +214,7 @@ protected void doStart(ClusterState clusterState) { } else { DiscoveryNode clusterManagerNode = nodes.getMasterNode(); final String actionName = getClusterManagerActionName(clusterManagerNode); + request.setRemoteRequest(true); transportService.sendRequest( clusterManagerNode, actionName, diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java new file mode 100644 index 0000000000000..dc46590d60951 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/MasterThrottlingRetryListenerTests.java @@ -0,0 +1,175 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.clustermanager; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; +import org.opensearch.cluster.service.MasterTaskThrottlingException; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.Scheduler; + +import java.time.Instant; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.AfterClass; +import org.junit.Before; + +/** + * Test class of {@link MasterThrottlingRetryListener} + */ +public class MasterThrottlingRetryListenerTests extends OpenSearchTestCase { + private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler); + } + + @AfterClass + public static void afterClass() { + Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS); + } + + public void testRetryForLocalRequest() throws BrokenBarrierException, InterruptedException { + TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicBoolean callBackExecuted = new AtomicBoolean(); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + callBackExecuted.set(true); + barrier.await(); + } catch (Exception e) { + new AssertionError(); + } + } + }; + + ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener); + + taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + barrier.await(); + assertTrue(callBackExecuted.get()); + } + + public void testRetryForRemoteRequest() throws BrokenBarrierException, InterruptedException { + TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request(); + request.setRemoteRequest(true); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean callBackExecuted = new AtomicBoolean(false); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + callBackExecuted.set(true); + } catch (Exception e) { + new AssertionError(); + } + } + }; + + ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener); + + taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + Thread.sleep(100); // some buffer time so callback can execute. + assertFalse(callBackExecuted.get()); + } + + public void testTimedOut() throws BrokenBarrierException, InterruptedException { + + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicBoolean onFailureExecuted = new AtomicBoolean(); + AtomicBoolean retryExecuted = new AtomicBoolean(); + AtomicBoolean firstExecute = new AtomicBoolean(true); + int timeOutSec = randomIntBetween(1, 5); + final Instant[] startTime = new Instant[1]; + final Instant[] endTime = new Instant[1]; + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(Object o) { + new AssertionError(); + } + + @Override + public void onFailure(Exception e) { + endTime[0] = Instant.now(); + try { + onFailureExecuted.set(true); + barrier.await(); + } catch (Exception exe) { + new AssertionError(); + } + assertEquals(ProcessClusterEventTimeoutException.class, e.getClass()); + } + }; + TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request() + .clusterManagerNodeTimeout(TimeValue.timeValueSeconds(timeOutSec)); + + class TestRetryClass { + ActionListener listener; + + TestRetryClass(ActionListener listener) { + this.listener = new MasterThrottlingRetryListener("test", request, this::execute, listener); + } + + public void execute() { + if (firstExecute.getAndSet(false)) { + startTime[0] = Instant.now(); + } + listener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + } + } + + TestRetryClass testRetryClass = new TestRetryClass(listener); + testRetryClass.execute(); + + barrier.await(); + assertEquals(timeOutSec, (endTime[0].toEpochMilli() - startTime[0].toEpochMilli()) / 1000); + assertTrue(onFailureExecuted.get()); + assertFalse(retryExecuted.get()); + } + + public void testRetryForDifferentException() { + + TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean callBackExecuted = new AtomicBoolean(); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + callBackExecuted.set(true); + } catch (Exception e) { + new AssertionError(); + } + } + }; + + ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener); + + taskRetryListener.onFailure(new Exception()); + assertFalse(callBackExecuted.get()); + + taskRetryListener.onFailure(new OpenSearchRejectedExecutionException("Different Exception")); + assertFalse(callBackExecuted.get()); + } +} diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index a5828edd65c16..b2e1b7c32b2f7 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -53,6 +53,7 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.service.MasterTaskThrottlingException; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.settings.Settings; @@ -66,6 +67,7 @@ import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.Scheduler; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.TransportService; import org.junit.After; @@ -80,6 +82,10 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -95,6 +101,7 @@ public class TransportClusterManagerNodeActionTests extends OpenSearchTestCase { private DiscoveryNode localNode; private DiscoveryNode remoteNode; private DiscoveryNode[] allNodes; + private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY); @BeforeClass public static void beforeClass() { @@ -132,6 +139,7 @@ public void setUp() throws Exception { Version.CURRENT ); allNodes = new DiscoveryNode[] { localNode, remoteNode }; + MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler); } @After @@ -144,6 +152,7 @@ public void tearDown() throws Exception { @AfterClass public static void afterClass() { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS); threadPool = null; } @@ -568,4 +577,75 @@ public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); } + + public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean exception = new AtomicBoolean(true); + AtomicBoolean retried = new AtomicBoolean(false); + CyclicBarrier barrier = new CyclicBarrier(2); + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); + + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + if (exception.getAndSet(false)) { + throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"); + } else { + try { + retried.set(true); + barrier.await(); + } catch (Exception e) { + throw new AssertionError(); + } + } + } + }; + action.execute(request, listener); + + barrier.await(); + assertTrue(retried.get()); + assertFalse(exception.get()); + } + + public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException { + Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60)); + DiscoveryNode masterNode = this.remoteNode; + setState( + clusterService, + // use a random base version so it can go down when simulating a restart. + ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[] { localNode, masterNode })) + .version(randomIntBetween(0, 10)) + ); + + PlainActionFuture listener = new PlainActionFuture<>(); + TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool); + action.execute(request, listener); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; + assertTrue(capturedRequest.node.isMasterNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + transport.handleRemoteError( + capturedRequest.requestId, + new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test") + ); + + assertFalse(listener.isDone()); + + // waiting for retry to trigger + Thread.sleep(100); + + // Retry for above throttling exception + capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + capturedRequest = capturedRequests[0]; + Response response = new Response(); + transport.handleResponse(capturedRequest.requestId, response); + + assertTrue(listener.isDone()); + listener.get(); + } }