Data node changes for master task throttling #4204

File: RetryableAction.java
@@ -36,13 +36,15 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/**
@@ -64,6 +66,7 @@ public abstract class RetryableAction<Response> {
private final long startMillis;
private final ActionListener<Response> finalListener;
private final String executor;
private final BackoffPolicy backoffPolicy;

private volatile Scheduler.ScheduledCancellable retryTask;

@@ -74,7 +77,7 @@ public RetryableAction(
TimeValue timeoutValue,
ActionListener<Response> listener
) {
this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
this(logger, threadPool, initialDelay, timeoutValue, listener, null, ThreadPool.Names.SAME);
}

public RetryableAction(
@@ -83,6 +86,7 @@ public RetryableAction(
TimeValue initialDelay,
TimeValue timeoutValue,
ActionListener<Response> listener,
BackoffPolicy backoffPolicy,
String executor
) {
this.logger = logger;
@@ -95,10 +99,13 @@ public RetryableAction(
this.startMillis = threadPool.relativeTimeInMillis();
this.finalListener = listener;
this.executor = executor;
this.backoffPolicy = backoffPolicy;
}

public void run() {
final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null);
final RetryingListener retryingListener = backoffPolicy == null
? new DefaultRetryingListener(initialDelayMillis, null)
: new CustomRetryingListener(backoffPolicy.iterator(), null);
final Runnable runnable = createRunnable(retryingListener);
threadPool.executor(executor).execute(runnable);
}
@@ -142,18 +149,59 @@ public void onRejection(Exception e) {

public void onFinished() {}

private class RetryingListener implements ActionListener<Response> {
private class CustomRetryingListener extends RetryingListener {

private static final int MAX_EXCEPTIONS = 4;
private Iterator<TimeValue> backoffDelayIterator;
private ArrayDeque<Exception> caughtExceptions;

private CustomRetryingListener(Iterator<TimeValue> backoffDelayIterator, ArrayDeque<Exception> caughtExceptions) {
super(caughtExceptions);
this.backoffDelayIterator = backoffDelayIterator;
this.caughtExceptions = caughtExceptions;
}

public RetryingListener getRetryingListenerForNextRetry() {
return this;
}

public long getRetryDelay() {
return backoffDelayIterator.next().millis();
}

}

private class DefaultRetryingListener extends RetryingListener {

private final long delayMillisBound;
private ArrayDeque<Exception> caughtExceptions;

private RetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
private DefaultRetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
super(caughtExceptions);
this.delayMillisBound = delayMillisBound;
this.caughtExceptions = caughtExceptions;
}

public RetryingListener getRetryingListenerForNextRetry() {
final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
final RetryingListener retryingListener = new DefaultRetryingListener(nextDelayMillisBound, caughtExceptions);
return retryingListener;
}

public long getRetryDelay() {
return Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
}
}

private abstract class RetryingListener implements ActionListener<Response> {

private static final int MAX_EXCEPTIONS = 4;

private ArrayDeque<Exception> caughtExceptions;

private RetryingListener(ArrayDeque<Exception> caughtExceptions) {
this.caughtExceptions = caughtExceptions;
}

@Override
public void onResponse(Response response) {
if (isDone.compareAndSet(false, true)) {
@@ -162,6 +210,10 @@ public void onResponse(Response response) {
}
}

public abstract RetryingListener getRetryingListenerForNextRetry();

public abstract long getRetryDelay();

@Override
public void onFailure(Exception e) {
if (shouldRetry(e)) {
@@ -175,10 +227,9 @@ public void onFailure(Exception e) {
} else {
addException(e);

final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
final long delayMillis = getRetryDelay();
final RetryingListener retryingListener = getRetryingListenerForNextRetry();
final Runnable runnable = createRunnable(retryingListener);
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
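The hunks above add a constructor overload that accepts a BackoffPolicy and make run() pick a CustomRetryingListener whenever a policy is supplied. A minimal usage sketch of the new overload follows; the anonymous action body, delay values, and the performAttempt() helper are illustrative assumptions, not code from this PR (logger, threadPool, request, and listener are assumed to be in scope):

    RetryableAction<ClusterStateResponse> retryableAction = new RetryableAction<ClusterStateResponse>(
        logger,
        threadPool,
        TimeValue.timeValueMillis(10),                         // initial delay, used only when no policy is given
        request.clusterManagerNodeTimeout,                     // overall timeout across all attempts
        listener,                                              // final listener, completed once retries stop
        BackoffPolicy.exponentialEqualJitterBackoff(10, 5000), // custom delays: 10 ms base, 5 s cap
        ThreadPool.Names.SAME
    ) {
        @Override
        public void tryAction(ActionListener<ClusterStateResponse> retryListener) {
            // Placeholder: perform one attempt and complete retryListener with its outcome.
            performAttempt(retryListener); // hypothetical helper
        }

        @Override
        public boolean shouldRetry(Exception e) {
            return e instanceof MasterTaskThrottlingException; // retry only on throttling
        }
    };
    retryableAction.run();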
File: ClusterManagerNodeRequest.java
@@ -58,17 +58,21 @@ public abstract class ClusterManagerNodeRequest<Request extends ClusterManagerNo
@Deprecated
protected TimeValue masterNodeTimeout = clusterManagerNodeTimeout;

protected boolean remoteRequest;

protected ClusterManagerNodeRequest() {}

protected ClusterManagerNodeRequest(StreamInput in) throws IOException {
super(in);
clusterManagerNodeTimeout = in.readTimeValue();
remoteRequest = in.readOptionalBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeTimeValue(clusterManagerNodeTimeout);
out.writeOptionalBoolean(remoteRequest);
}

/**
@@ -110,6 +114,11 @@ public final Request masterNodeTimeout(String timeout) {
return clusterManagerNodeTimeout(timeout);
}

public final Request setRemoteRequest(boolean remoteRequest) {
this.remoteRequest = remoteRequest;
Member: I am still unclear why we need this flag. The action knows where it should be executed; the retry listener should just help run the same action after the scheduled delay on the same threadpool.

Contributor Author: We need this distinction for requests arriving at the master node because the same code block (in TransportClusterManagerNodeAction) executes for both cases (local/remote master), and our retry logic sits on top of it.

Using this flag we determine whether the request was generated on the local node or on a remote node. If it is the local node's request, we need to perform the retries on this node. If it is a remote node's request, we will not perform retries on this node and let the remote node perform them.

If the request comes from a remote data node, the data node sets the remoteRequest flag in {@link MasterNodeRequest} and sends the request to the master; using that flag, the master node can determine whether the request was local or remote. (A usage sketch follows at the end of this file's diff.)

return (Request) this;
}

public final TimeValue clusterManagerNodeTimeout() {
return this.clusterManagerNodeTimeout;
}
@@ -119,4 +128,8 @@ public final TimeValue clusterManagerNodeTimeout() {
public final TimeValue masterNodeTimeout() {
return clusterManagerNodeTimeout();
}

public final boolean isRemoteRequest() {
return this.remoteRequest;
}
}
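Putting the review thread above together with the new accessors, a small sketch of how the flag is intended to flow; transportService, clusterManagerNode, actionName, and handler are assumed to be in scope and are only illustrative:

    // On the data node, before forwarding to the elected master:
    request.setRemoteRequest(true);
    transportService.sendRequest(clusterManagerNode, actionName, request, handler);

    // On the node that executes TransportClusterManagerNodeAction:
    if (request.isRemoteRequest() == false) {
        // the request originated here, so this node performs the throttling retries
    } else {
        // the request came from a data node; let the failure flow back so that node retries
    }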
File: MasterThrottlingRetryListener.java (new file)
@@ -0,0 +1,131 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.clustermanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.transport.TransportException;

import java.util.Iterator;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* ActionListener for retrying throttled master tasks.
* It schedules a retry on a throttling exception from the master node and
* delegates the response if it receives a response from the master.
*
* It uses the ExponentialEqualJitterBackoff policy to determine the delay between retries.
*/
public class MasterThrottlingRetryListener<Request extends ClusterManagerNodeRequest<Request>, Response extends ActionResponse>
implements
ActionListener<Response> {

private static final Logger logger = LogManager.getLogger(MasterThrottlingRetryListener.class);

/**
* Base delay in millis.
*/
private final int BASE_DELAY_MILLIS = 10;

/**
* Maximum delay in millis.
*/
private final int MAX_DELAY_MILLIS = 5000;

private long totalDelay;
private final Iterator<TimeValue> backoffDelay;
private final ActionListener<Response> listener;
private final Request request;
private final Runnable runnable;
private final String actionName;
private final boolean localNodeRequest;

private static ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);

public MasterThrottlingRetryListener(String actionName, Request request, Runnable runnable, ActionListener<Response> actionListener) {
this.actionName = actionName;
this.listener = actionListener;
this.request = request;
this.runnable = runnable;
this.backoffDelay = BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS).iterator();
/**
This determines whether the request was generated on the local node or on a remote node.
If it is the local node's request, we need to perform the retries on this node.
If it is a remote node's request, we will not perform retries on this node and let the remote node perform them.

If the request is from a remote data node, the data node sets the remoteRequest flag in {@link MasterNodeRequest}
and sends the request to the master; using that flag, the master node can determine whether the request was local or remote.
*/
this.localNodeRequest = !(request.isRemoteRequest());
}

@Override
public void onResponse(Response response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {

if (localNodeRequest && isThrottlingException(e)) {
logger.info("Retrying [{}] on throttling exception from master. Error: [{}]", actionName, getExceptionMessage(e));
long delay = backoffDelay.next().getMillis();
if (totalDelay + delay >= request.clusterManagerNodeTimeout.getMillis()) {
delay = request.clusterManagerNodeTimeout.getMillis() - totalDelay;
scheduler.schedule(new Runnable() {
@Override
public void run() {
listener.onFailure(new ProcessClusterEventTimeoutException(request.clusterManagerNodeTimeout, actionName));
}
}, delay, TimeUnit.MILLISECONDS);
} else {
scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
totalDelay += delay;
} else {
listener.onFailure(e);
}
}

/**
* For test-case purposes.
* @param retryScheduler scheduler defined in test cases.
*/
public static void setThrottlingRetryScheduler(ScheduledThreadPoolExecutor retryScheduler) {
scheduler = retryScheduler;
}

private boolean isThrottlingException(Exception e) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
}
return e instanceof MasterTaskThrottlingException;
}

private String getExceptionMessage(Exception e) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause().getMessage();
} else {
return e.getMessage();
}
}

public static long getRetryingTasksCount() {
return scheduler.getActiveCount() + scheduler.getQueue().size();
}
}
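The wiring of this listener is not part of the hunks shown here, so the following is only a rough sketch under assumed names (submit(), sendToClusterManager(), and the health request/response types are illustrative). The caller's listener is wrapped once, and the Runnable re-sends the same request with the same wrapped listener so the backoff iterator and totalDelay carry across attempts:

    void submit(ClusterHealthRequest request, ActionListener<ClusterHealthResponse> original) {
        // Holder so the retry Runnable can re-send with the same wrapped listener.
        AtomicReference<ActionListener<ClusterHealthResponse>> wrapped = new AtomicReference<>();
        Runnable resend = () -> sendToClusterManager(request, wrapped.get()); // placeholder transport call
        wrapped.set(new MasterThrottlingRetryListener<>(
            "cluster:monitor/health",   // action name, used for logging and the timeout exception
            request,
            resend,                     // re-sends the same request after the backoff delay
            original));                 // completed on success or on a non-throttling failure
        resend.run();                   // first attempt
    }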
File: TransportClusterManagerNodeAction.java
@@ -41,6 +41,7 @@
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.MasterNodeChangePredicate;
@@ -51,6 +52,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
@@ -134,32 +136,51 @@ protected boolean localExecute(Request request) {

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
}
new AsyncSingleAction(task, request, listener).doStart(state);
new AsyncSingleAction(task, request, listener).run();
}

/**
* Asynchronous single action
*
* @opensearch.internal
*/
class AsyncSingleAction {
class AsyncSingleAction extends RetryableAction {

private final ActionListener<Response> listener;
private ActionListener<Response> listener;
private final Request request;
private ClusterStateObserver observer;
private final long startTime;
private final Task task;
private boolean localRequest;

AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
super(logger, threadPool, TimeValue.timeValueMillis(10), request.clusterManagerNodeTimeout, listener);
this.task = task;
this.request = request;
this.listener = listener;
this.startTime = threadPool.relativeTimeInMillis();
localRequest = !request.remoteRequest;
}

@Override
public void tryAction(ActionListener retryListener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
this.listener = retryListener;
doStart(state);
}

@Override
public boolean shouldRetry(Exception e) {
if (localRequest) {
Collaborator: How is the remote node retrying MasterTaskThrottlingException now?

Contributor Author: When a request is made to a remote master node, we set the remoteRequest flag on it and send it to the master. For a throttling exception, the master will not retry (based on this check) and lets the exception flow back to the data node, which performs the retry. Since the same code block runs for both remote and local master, we need this segregation.

Member: As RetryableAction triggers the final listener after all the retries have been exhausted, is that why there is a need to differentiate local vs remote calls? Can we differentiate by checking the transport request's source node instead of changing the request object?

Contributor Author: Sure, made changes to rely on the remoteAddress of the request, which will be null for a local request; for a remote request it will hold the remote node's transport address. Removed the new field from the request. (A sketch of that check follows at the end of this diff.)

if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
}
return e instanceof MasterTaskThrottlingException;
}
return false;
}

protected void doStart(ClusterState clusterState) {
@@ -210,6 +231,7 @@ protected void doStart(ClusterState clusterState) {
} else {
DiscoveryNode clusterManagerNode = nodes.getMasterNode();
final String actionName = getClusterManagerActionName(clusterManagerNode);
request.setRemoteRequest(true);
transportService.sendRequest(
clusterManagerNode,
actionName,
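The last review exchange says the remoteRequest flag was later replaced by a check on the request's remote address; that follow-up is not part of the hunks shown here, so this is only a hedged sketch of what such a check could look like inside shouldRetry:

    // A request that originated on this node was never received over the wire, so its
    // remote address is null; a forwarded request carries the sender's transport address.
    boolean localRequest = request.remoteAddress() == null;
    if (localRequest) {
        if (e instanceof TransportException) {
            return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
        }
        return e instanceof MasterTaskThrottlingException;
    }
    return false; // forwarded request: let the originating data node perform the retries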