diff --git a/server/src/internalClusterTest/java/org/opensearch/master/MasterTaskThrottlingIT.java b/server/src/internalClusterTest/java/org/opensearch/master/MasterTaskThrottlingIT.java new file mode 100644 index 0000000000000..34cacfa1ae1d3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/master/MasterTaskThrottlingIT.java @@ -0,0 +1,216 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.master; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.opensearch.action.support.master.MasterThrottlingRetryListener; +import org.opensearch.cluster.service.MasterTaskThrottlingException; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.cluster.service.MasterTaskThrottler; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.TransportMessageListener; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) +public class MasterTaskThrottlingIT extends OpenSearchIntegTestCase { + + private static final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + + /* + * This integ test will test end-end master throttling feature for + * remote master. 
+ * + * It checks that the number of requests received by the master node + * equals the total number of requests + throttled requests from master. + * This ensures the end-to-end feature works: the master throws the + * throttling exception and the data node retries on it. + * + */ + public void testThrottlingForRemoteMaster() throws Exception { + try { + internalCluster().beforeTest(random(), 0); + String masterNode = internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + int throttlingLimit = randomIntBetween(1, 5); + createIndex("test"); + + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + Settings settings = Settings.builder() + .put("master.throttling.thresholds.put-mapping.value", throttlingLimit) + .build(); + settingsRequest.transientSettings(settings); + assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); + + TransportService masterTransportService = (internalCluster().getInstance(TransportService.class, masterNode)); + AtomicInteger requestCountOnMaster = new AtomicInteger(); + AtomicInteger throttledRequest = new AtomicInteger(); + int totalRequest = randomIntBetween(throttlingLimit, 5 * throttlingLimit); + CountDownLatch latch = new CountDownLatch(totalRequest); + + masterTransportService.addMessageListener(new TransportMessageListener() { + @Override + public void onRequestReceived(long requestId, String action) { + if (action.contains("mapping")) { + requestCountOnMaster.incrementAndGet(); + } + } + + @Override + public void onResponseSent(long requestId, String action, Exception error) { + if (action.contains("mapping")) { + throttledRequest.incrementAndGet(); + assertEquals(MasterTaskThrottlingException.class, error.getClass()); + } + } + }); + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(Object o) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { +
latch.countDown(); + throw new AssertionError(e); + } + }; + + Thread[] threads = new Thread[totalRequest]; + for (int i = 0; i < totalRequest; i++) { + PutMappingRequest putMappingRequest = new PutMappingRequest("test") + .type("type") + .source("field" + i, "type=text"); + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + internalCluster().client(dataNode).admin().indices().putMapping(putMappingRequest, listener); + } + }); + } + for (int i = 0; i < totalRequest; i++) { + threads[i].start(); // start() spawns the thread; run() would execute inline on the test thread and make join() a no-op + } + for (int i = 0; i < totalRequest; i++) { + threads[i].join(); + } + latch.await(); + + assertEquals(totalRequest + throttledRequest.get(), requestCountOnMaster.get()); + assertBusy(() -> { + assertEquals(clusterService().getMasterService().numberOfThrottledPendingTasks(), throttledRequest.get()); + }); + assertEquals(0, MasterThrottlingRetryListener.getRetryingTasksCount()); // (expected, actual) + } + finally { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + Settings settings = Settings.builder() + .put("master.throttling.thresholds.put-mapping.value", (String) null) + .build(); + settingsRequest.transientSettings(settings); + assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + } + + /* + * This will test the throttling feature for single node. + * + * Here we will assert the client behaviour that client's request is not + * failed, i.e. Throttling exception is not passed to the client. + * Data node will internally do the retry and request should pass. 
+ * + */ + public void testThrottlingForSingleNode() throws Exception { + try { + internalCluster().beforeTest(random(), 0); + String node = internalCluster().startNode(); + int throttlingLimit = randomIntBetween(1, 5); + createIndex("test"); + + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + Settings settings = Settings.builder() + .put("master.throttling.thresholds.put-mapping.value", throttlingLimit) + .build(); + settingsRequest.transientSettings(settings); + assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); + + AtomicInteger successfulRequest = new AtomicInteger(); + int totalRequest = randomIntBetween(throttlingLimit, 3 * throttlingLimit); + CountDownLatch latch = new CountDownLatch(totalRequest); + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(Object o) { + successfulRequest.incrementAndGet(); // count first: the waiter asserts on this value as soon as the latch opens + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + throw new AssertionError(e); + } + }; + + Thread[] threads = new Thread[totalRequest]; + for (int i = 0; i < totalRequest; i++) { + PutMappingRequest putMappingRequest = new PutMappingRequest("test") + .type("type") + .source("field" + i, "type=text"); + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + internalCluster().client(node).admin().indices().putMapping(putMappingRequest, listener); + } + }); + } + for (int i = 0; i < totalRequest; i++) { + threads[i].start(); // start() spawns the thread; run() would execute inline on the test thread and make join() a no-op + } + for (int i = 0; i < totalRequest; i++) { + threads[i].join(); + } + + latch.await(); + assertEquals(totalRequest, successfulRequest.get()); + assertEquals(0, MasterThrottlingRetryListener.getRetryingTasksCount()); // (expected, actual) + } + finally { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + Settings settings = Settings.builder() + .put("master.throttling.thresholds.put-mapping.value", (String) null) + .build(); +
settingsRequest.transientSettings(settings); + assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + } + + @BeforeClass + public static void initTestScheduler() { + MasterThrottlingRetryListener.setThrottlingRetryScheduler(scheduler); + } + + @AfterClass + public static void terminateScheduler() { + Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS); + } +} diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index a6a12d7ebb4f7..55dadd63adbb6 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -1594,6 +1594,15 @@ private enum OpenSearchExceptionHandle { org.opensearch.transport.NoSeedNodeLeftException::new, 160, LegacyESVersion.V_7_10_0 + ), + /** + * TODO: Change the version number of check as per version in which this change will be merged. + */ + MASTER_TASK_THROTTLED_EXCEPTION( + org.opensearch.cluster.service.MasterTaskThrottlingException.class, + org.opensearch.cluster.service.MasterTaskThrottlingException::new, + 161, + Version.V_1_3_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 406ff4bcd8e06..dc2eb270696b8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -241,6 +241,11 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus this.allocationService = allocationService; } + @Override + public String getMasterThrottlingKey() { + return "cluster-reroute-api"; + } + @Override protected ClusterRerouteResponse newResponse(boolean acknowledged) { return 
new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index af5da6f538d67..df559d8469fde 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -136,6 +136,11 @@ protected void masterOperation( private volatile boolean changed = false; + @Override + public String getMasterThrottlingKey() { + return "cluster-update-settings"; + } + @Override protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate()); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java index 6b510291f1ccb..ccc40e827f996 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java @@ -138,6 +138,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public String getMasterThrottlingKey() { + return "auto-create"; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata()); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java 
b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java index 495e8cb1fcac8..c982f968bafd3 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java @@ -155,6 +155,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { return new AcknowledgedResponse(acknowledged); } + @Override + public String getMasterThrottlingKey() { + return "delete-dangling-index"; + } + @Override public ClusterState execute(final ClusterState currentState) { return deleteDanglingIndex(currentState, indexToDelete); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 95ac25c47e842..e284620eab2d8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -208,6 +208,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "remove-data-stream"; + } + @Override public ClusterState execute(ClusterState currentState) { return removeDataStream(deleteIndexService, currentState, request); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java index 1a2f4be522e2b..b94f8827873f3 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -204,6 
+204,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return rolloverResult.clusterState; } + @Override + public String getMasterThrottlingKey() { + return "rollover-index"; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java index 0b22ee04141ed..ec5b57dd156a5 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java +++ b/server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java @@ -31,6 +31,7 @@ package org.opensearch.action.bulk; +import org.opensearch.common.Randomness; import org.opensearch.common.unit.TimeValue; import java.util.Iterator; @@ -105,6 +106,19 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries); } + /** + * It provides exponential backoff between retries until it reaches maxDelay. + * It uses equal jitter scheme as it is being used for throttled exceptions. + * It will make random distribution and also guarantees a minimum delay. + * + * @param baseDelay BaseDelay for exponential Backoff + * @param maxDelay MaxDelay that can be returned from backoff policy + * @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay + */ + public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelay) { + return new ExponentialEqualJitterBackoff(baseDelay, maxDelay); + } + /** * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy. 
*/ @@ -197,6 +211,65 @@ public TimeValue next() { } } + private static class ExponentialEqualJitterBackoff extends BackoffPolicy { + private final int maxDelay; + private final int baseDelay; + + private ExponentialEqualJitterBackoff(int baseDelay, int maxDelay) { + this.maxDelay = maxDelay; + this.baseDelay = baseDelay; + } + + @Override + public Iterator iterator() { + return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelay); + } + } + + private static class ExponentialEqualJitterBackoffIterator implements Iterator { + /** + * Maximum retry limit. Avoids integer overflow issues. + * Post Max Retries, max delay will be returned with Equal Jitter. + * + * NOTE: If the value is greater than 30, there can be integer overflow + * issues during delay calculation. + */ + private static final int MAX_RETRIES = 30; + + private final int maxDelay; + private final int baseDelay; + private int retriesAttempted; + + private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelay) { + this.baseDelay = baseDelay; + this.maxDelay = maxDelay; + } + + /** + * This backoff has no retry limit. + * The iterator always has a next back off delay. 
+ * + * @return true + */ + @Override + public boolean hasNext() { + return true; + } + + /** + * Computes min(2^retries * baseDelay, maxDelay) with retries capped at MAX_RETRIES, then returns + * half of that value plus a random amount in [0, half], both in milliseconds. + */ + @Override + public TimeValue next() { + int retries = Math.min(retriesAttempted, MAX_RETRIES); + int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelay); + retriesAttempted++; + return TimeValue.timeValueMillis((exponentialDelay / 2) + + Randomness.get().nextInt(exponentialDelay / 2 + 1)); + } + } + /** * Concrete Constant Back Off Policy * diff --git a/server/src/main/java/org/opensearch/action/support/master/MasterNodeRequest.java b/server/src/main/java/org/opensearch/action/support/master/MasterNodeRequest.java index 34a8c65dde491..6fe7538c69e0f 100644 --- a/server/src/main/java/org/opensearch/action/support/master/MasterNodeRequest.java +++ b/server/src/main/java/org/opensearch/action/support/master/MasterNodeRequest.java @@ -49,18 +49,21 @@ public abstract class MasterNodeRequest, Response extends ActionResponse> + implements ActionListener { + + private static final Logger logger = LogManager.getLogger(MasterThrottlingRetryListener.class); + + /** + * Base delay in millis. + */ + private static final int BASE_DELAY_MILLIS = 10; + + /** + * Maximum delay in millis. 
+ */ + private static final int MAX_DELAY_MILLIS = 5000; + + private long totalDelay; + private final Iterator backoffDelay; + private final ActionListener listener; + private final Request request; + private final Runnable runnable; + private final String actionName; + private final boolean localNodeRequest; + + // volatile so that a scheduler swapped in through setThrottlingRetryScheduler is visible to whichever thread runs onFailure + private static volatile ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + + public MasterThrottlingRetryListener( + String actionName, + Request request, + Runnable runnable, + ActionListener actionListener) { + this.actionName = actionName; + this.listener = actionListener; + this.request = request; + this.runnable = runnable; + this.backoffDelay = BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS).iterator(); + /* + This is to determine whether request is generated from local node or from remote node. + If it is local node's request we need to perform the retries on this node. + If it is remote node's request, we will not perform retries on this node and let remote node perform the retries. + + If request is from remote data node, then data node will set remoteRequest flag in {@link MasterNodeRequest} + and send request to master, using that on master node we can determine if the request was localRequest or remoteRequest. + */ + this.localNodeRequest = !(request.isRemoteRequest()); + } + + @Override + public void onResponse(Response response) { + listener.onResponse(response); + } + + /** + * If the request is a local one and the failure is (or wraps) a MasterTaskThrottlingException, schedules the + * retry runnable after the next backoff delay. Once the accumulated delay would reach the request's + * masterNodeTimeout, schedules a ProcessClusterEventTimeoutException for the wrapped listener instead. + * Any other failure is passed straight to the wrapped listener. + */ + @Override + public void onFailure(Exception e) { + + if (localNodeRequest && isThrottlingException(e)) { + logger.info("Retrying [{}] on throttling exception from master. 
Error: [{}]", + actionName, getExceptionMessage(e)); + long delay = backoffDelay.next().getMillis(); + if (totalDelay + delay >= request.masterNodeTimeout.getMillis()) { + delay = request.masterNodeTimeout.getMillis() - totalDelay; + scheduler.schedule(new Runnable() { + @Override + public void run() { + listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout, actionName)); + } + }, delay, TimeUnit.MILLISECONDS); + } else { + scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS); + } + totalDelay += delay; + } else { + listener.onFailure(e); + } + } + + /** + * For Testcase purposes. + * @param retrySceduler scheduler defined in test cases. + */ + public static void setThrottlingRetryScheduler(ScheduledThreadPoolExecutor retrySceduler) { + scheduler = retrySceduler; + } + + private boolean isThrottlingException(Exception e) { + if (e instanceof TransportException) { + return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException; + } + return e instanceof MasterTaskThrottlingException; + } + + private String getExceptionMessage(Exception e) { + if (e instanceof TransportException) { + return ((TransportException) e).unwrapCause().getMessage(); + } else { + return e.getMessage(); + } + } + + public static long getRetryingTasksCount() { + return scheduler.getActiveCount() + scheduler.getQueue().size(); + } +} diff --git a/server/src/main/java/org/opensearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/opensearch/action/support/master/TransportMasterNodeAction.java index 4e0a180fe0cd4..7dc43230854d8 100644 --- a/server/src/main/java/org/opensearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/master/TransportMasterNodeAction.java @@ -134,12 +134,10 @@ protected boolean localExecute(Request request) { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { - ClusterState state = 
clusterService.state(); - logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); } - new AsyncSingleAction(task, request, listener).doStart(state); + new AsyncSingleAction(task, request, listener).start(); } /** @@ -158,10 +156,16 @@ class AsyncSingleAction { AsyncSingleAction(Task task, Request request, ActionListener listener) { this.task = task; this.request = request; - this.listener = listener; + this.listener = new MasterThrottlingRetryListener(actionName, request, this::start, listener); this.startTime = threadPool.relativeTimeInMillis(); } + public void start() { + ClusterState state = clusterService.state(); + logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); + doStart(state); + } + protected void doStart(ClusterState clusterState) { try { final DiscoveryNodes nodes = clusterState.nodes(); @@ -210,6 +214,7 @@ protected void doStart(ClusterState clusterState) { } else { DiscoveryNode clusterManagerNode = nodes.getMasterNode(); final String actionName = getMasterActionName(clusterManagerNode); + request.setRemoteRequest(true); transportService.sendRequest( clusterManagerNode, actionName, diff --git a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java index 00e58d88d8798..3744711bc812f 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java @@ -78,6 +78,18 @@ default String describeTasks(List tasks) { return String.join(", ", tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator); } + /** Throttling key returned by executors that do not override {@link #getMasterThrottlingKey()}. */ + public static final String DEFAULT_MASTER_THROTTLING_KEY = ""; + + /** @deprecated misspelled alias of {@link #DEFAULT_MASTER_THROTTLING_KEY}; kept so existing references keep compiling. */ + @Deprecated + public static final String DEFAULT_MASTER_THROOTLING_KEY = DEFAULT_MASTER_THROTTLING_KEY; + + /** Returns the master task throttling key of this executor; {@link #DEFAULT_MASTER_THROTTLING_KEY} unless overridden. */ + default String getMasterThrottlingKey() { + return 
DEFAULT_MASTER_THROTTLING_KEY; + } + /** * Represents the result of a batched execution of cluster state update tasks * @param the type of the cluster state update task diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java index 412d4dba628cb..8e0b94d0604be 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -113,6 +113,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return clusterState; } + @Override + public String getMasterThrottlingKey() { + return "create-data-stream"; + } + @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 642b0f7b8d36f..e63fd0d76ee9b 100644 --- 
a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -321,6 +321,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public String getMasterThrottlingKey() { + return "create-index"; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { return applyCreateIndexRequest(currentState, request, false); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java index 66f5edf3da129..b81cf5436fa04 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java @@ -98,6 +98,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public String getMasterThrottlingKey() { + return "delete-index"; + } + @Override public ClusterState execute(final ClusterState currentState) { return deleteIndices(currentState, Sets.newHashSet(request.indices())); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java index 8d6939a57240c..aa5e71a9f3f08 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java @@ -106,6 +106,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public String getMasterThrottlingKey() { + return "index-aliases"; + } + 
@Override public ClusterState execute(ClusterState currentState) { return applyAliasActions(currentState, request.actions()); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexStateService.java index 4f1000e3407fd..2aa42f92e376c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexStateService.java @@ -197,7 +197,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina new ClusterStateUpdateTask(Priority.URGENT) { private final Map blockedIndices = new HashMap<>(); - @Override public ClusterState execute(final ClusterState currentState) { return addIndexClosedBlocks(concreteIndices, blockedIndices, currentState); @@ -221,6 +220,11 @@ public void clusterStateProcessed(final String source, final ClusterState oldSta new ClusterStateUpdateTask(Priority.URGENT) { private final List indices = new ArrayList<>(); + @Override + public String getMasterThrottlingKey() { + return "close-indices"; + } + @Override public ClusterState execute(final ClusterState currentState) throws Exception { Tuple> closingResult = closeRoutingTable( @@ -980,6 +984,12 @@ private void onlyOpenIndex( clusterService.submitStateUpdateTask( "open-indices " + indicesAsString, new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + + @Override + public String getMasterThrottlingKey() { + return "open-indices"; + } + @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java index 9c734fd7b3bdc..840df35bfcb0b 100644 --- 
a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java @@ -140,6 +140,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "remove-index-template"; + } + @Override public ClusterState execute(ClusterState currentState) { Set templateNames = new HashSet<>(); @@ -198,6 +203,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "create-component-template"; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { return addComponentTemplate(currentState, create, name, template); @@ -358,6 +368,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "remove-component-template"; + } + @Override public ClusterState execute(ClusterState currentState) { Set templateNames = new HashSet<>(); @@ -447,6 +462,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "create-index-template-v2"; + } + @Override public ClusterState execute(ClusterState currentState) throws Exception { return addIndexTemplateV2(currentState, create, name, template); @@ -764,6 +784,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "remove-index-template-v2"; + } + @Override public ClusterState execute(ClusterState currentState) { return innerRemoveIndexTemplateV2(currentState, name); @@ -868,6 +893,11 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } + @Override + public String getMasterThrottlingKey() { + return "create-index-template"; + } + @Override 
public ClusterState execute(ClusterState currentState) throws Exception { validateTemplate(request.settings, request.mappings, indicesService, xContentRegistry); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java index 7f67c45fc80e5..ef533ebc23f63 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java @@ -109,6 +109,11 @@ public ClusterTasksResult execute(ClusterState currentState, Listbuilder().successes(tasks).build(newClusterState); } + + @Override + public String getMasterThrottlingKey() { + return "refresh-mapping"; + } } /** @@ -246,6 +251,11 @@ public ClusterTasksResult execute( } } + @Override + public String getMasterThrottlingKey() { + return "put-mapping"; + } + private ClusterState applyRequest( ClusterState currentState, PutMappingClusterStateUpdateRequest request, diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 1390860271577..f565181fc7c65 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -157,6 +157,11 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } + @Override + public String getMasterThrottlingKey() { + return "update-settings"; + } + @Override public ClusterState execute(ClusterState currentState) { diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 0f9106f6de0c9..6ee9ff2596db1 100644 --- 
a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -45,6 +45,7 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; import org.opensearch.cluster.ClusterStateTaskListener; +import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.coordination.ClusterStatePublisher; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.metadata.Metadata; @@ -121,6 +122,7 @@ public class MasterService extends AbstractLifecycleComponent { private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor; private volatile Batcher taskBatcher; + protected final MasterTaskThrottler masterTaskThrottler; public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); @@ -131,6 +133,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP this::setSlowTaskLoggingThreshold ); + this.masterTaskThrottler = new MasterTaskThrottler(clusterSettings, this); this.threadPool = threadPool; } @@ -181,15 +184,41 @@ protected void onTimeout(List tasks, TimeValue timeout) { ) ) ); + String masterThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey).getMasterThrottlingKey(); + masterTaskThrottler.release(masterThrottlingKey, tasks.size()); } @Override protected void run(Object batchingKey, List tasks, String tasksSummary) { + // All the batched tasks will have same executor so once they are executed in batch + // we will release all batch's permit from master task throttling. 
+ String masterThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey).getMasterThrottlingKey(); + masterTaskThrottler.release(masterThrottlingKey, tasks.size()); ClusterStateTaskExecutor taskExecutor = (ClusterStateTaskExecutor) batchingKey; List updateTasks = (List) tasks; runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary)); } + @Override + public void submitTasks( + List tasks, @Nullable TimeValue timeout) throws OpenSearchRejectedExecutionException { + assert tasks.size() > 0; + String masterThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey).getMasterThrottlingKey(); + if(masterTaskThrottler.acquire(masterThrottlingKey, tasks.size())) { + try { + super.submitTasks(tasks, timeout); + } catch (Exception e) { + masterTaskThrottler.release(masterThrottlingKey, tasks.size()); + throw e; + } + } else { + logger.warn("Throwing Throttling Exception for [{}]. Trying to acquire [{}] permits, limit is set to [{}]", + tasks.get(0).getTask().getClass(), tasks.size(), masterTaskThrottler.getThrottlingLimit( + masterThrottlingKey)); + throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for " + tasks.get(0).getTask().getClass()); + } + } + class UpdateTask extends BatchedTask { final ClusterStateTaskListener listener; @@ -570,6 +599,13 @@ public List pendingTasks() { }).collect(Collectors.toList()); } + /** + * Returns the cumulative number of tasks rejected by master task throttling. + */ + public long numberOfThrottledPendingTasks() { + return masterTaskThrottler.getThrottlingStats().getTotalThrottledTaskCount(); + } + /** * Returns the number of currently pending tasks.
*/ diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/MasterTaskThrottler.java new file mode 100644 index 0000000000000..2a0228a7cf505 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/MasterTaskThrottler.java @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest; +import org.opensearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; +import org.opensearch.action.support.master.MasterNodeRequest; +import org.opensearch.cluster.ack.AckedRequest; +import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.metadata.MetadataUpdateSettingsService; +import org.opensearch.cluster.routing.DelayedAllocationService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.snapshots.UpdateIndexShardSnapshotStatusRequest; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class is extension of {@link Throttler} and does throttling of master tasks. 
+ * + * This class does throttling on task submission to master, it uses the throttling key of the task's executor as key for + * throttling. Throttling is performed per throttling key, which task executors provide by overriding getMasterThrottlingKey. + * + * Set the task specific setting to configure the throttling threshold of a particular task type. + * e.g : Set "master.throttling.thresholds.put-mapping.value" to set throttling limit of "put-mapping" tasks, + * Set it to default value(-1) to disable the throttling for this task type. + */ +public class MasterTaskThrottler extends Throttler { + private static final Logger logger = LogManager.getLogger(MasterTaskThrottler.class); + + public static final Setting THRESHOLD_SETTINGS = + Setting.groupSetting("master.throttling.thresholds.", Setting.Property.Dynamic, Setting.Property.NodeScope); + + /** + * To configure more tasks for throttling, override the getMasterThrottlingKey method with the task name in the task executor and add that name to this set. + * Verify that throttled tasks will be retried. + * + * Added retry mechanism in TransportMasterNodeAction so it would be retried for customer generated tasks.
+ */ + public static Set CONFIGURED_TASK_FOR_THROTTLING = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + "update-settings", "cluster-update-settings", + "create-index", "auto-create", + "delete-index", "delete-dangling-index", + "create-data-stream", "remove-data-stream", + "rollover-index", "index-aliases", + "put-mapping", "refresh-mapping", + "close-indices", "open-indices", + "create-index-template", "remove-index-template", + "create-component-template", "remove-component-template", + "create-index-template-v2", "remove-index-template-v2", + "put-pipeline", "delete-pipeline", + "create-persistent-task", "finish-persistent-task", "remove-persistent-task","update-task-state", + "put-script", "delete-script", + "put_repository", "delete_repository", + "create-snapshot","delete-snapshot","update-snapshot-state", + "restore_snapshot", + "cluster-reroute-api" + ))); + + private final int DEFAULT_THRESHOLD_VALUE = -1; // Disabled throttling + private final MasterThrottlingStats throttlingStats = new MasterThrottlingStats(); + private final MasterService masterService; + + public MasterTaskThrottler(final ClusterSettings clusterSettings, final MasterService masterService) { + super(true); + clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting); + this.masterService = masterService; + } + + public void validateSetting(final Settings settings) { + /** + * TODO: Change the version number of check as per version in which this change will be merged. 
+ */ + if(masterService.state().nodes().getMinNodeVersion().compareTo(Version.V_1_3_0) < 0) { + throw new IllegalArgumentException("All the nodes in cluster should be on version later than or equal to 1.3.0"); + } + Map groups = settings.getAsGroups(); + for(String key : groups.keySet()) { + if(!CONFIGURED_TASK_FOR_THROTTLING.contains(key)) { + throw new IllegalArgumentException("Master task throttling is not configured for given task type: " + key); + } + int threshold = groups.get(key).getAsInt("value", DEFAULT_THRESHOLD_VALUE); + if(threshold < DEFAULT_THRESHOLD_VALUE) { + throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling"); + } + } + } + + public void updateSetting(final Settings settings) { + Map groups = settings.getAsGroups(); + for(String key : groups.keySet()) { + updateLimit(key, groups.get(key).getAsInt("value", DEFAULT_THRESHOLD_VALUE)); + } + } + + @Override + public boolean acquire(final String type, final int permits) { + boolean ableToAcquire = super.acquire(type, permits); + if(!ableToAcquire) { + throttlingStats.incrementThrottlingCount(type, permits); + } + return ableToAcquire; + } + + public MasterThrottlingStats getThrottlingStats() { + return throttlingStats; + } + + protected void updateLimit(final String className, final int limit) { + assert limit >= DEFAULT_THRESHOLD_VALUE; + if(limit == DEFAULT_THRESHOLD_VALUE) { + super.removeThrottlingLimit(className); + } else { + super.updateThrottlingLimit(className, limit); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterTaskThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/MasterTaskThrottlingException.java new file mode 100644 index 0000000000000..167fc2001b69a --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/MasterTaskThrottlingException.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions 
made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * Exception raised from master node due to task throttling. + */ +public class MasterTaskThrottlingException extends OpenSearchException { + + public MasterTaskThrottlingException(String msg, Object... args) { + super(msg, args); + } + + public MasterTaskThrottlingException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterThrottlingStats.java b/server/src/main/java/org/opensearch/cluster/service/MasterThrottlingStats.java new file mode 100644 index 0000000000000..d8adc5469d6d8 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/MasterThrottlingStats.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.common.metrics.CounterMetric; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Contains stats of Master Task Throttling. + * It stores the total cumulative count of throttled tasks per task type. 
+ */ +public class MasterThrottlingStats { + + private Map throttledTasksCount = new ConcurrentHashMap<>(); + + public void incrementThrottlingCount(String type, final int permits) { + if(!throttledTasksCount.containsKey(type)) { + throttledTasksCount.put(type, new CounterMetric()); + } + throttledTasksCount.get(type).inc(permits); + } + + public long getThrottlingCount(Class type) { + return throttledTasksCount.get(type).count(); + } + + public long getTotalThrottledTaskCount() { + CounterMetric totalCount = new CounterMetric(); + throttledTasksCount.forEach((aClass, counterMetric) -> {totalCount.inc(counterMetric.count());}); + return totalCount.count(); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index e04c8617ecd33..ec6d7d82a6a52 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -75,6 +75,8 @@ public void submitTasks(List tasks, @Nullable TimeValue t final BatchedTask firstTask = tasks.get(0); assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; + assert tasks.stream().allMatch(t -> t.getTask().getClass() == firstTask.getTask().getClass()): + "tasks submitted in a batch should be of same class: " + tasks; // convert to an identity map to check for dups based on task identity final Map tasksIdentity = tasks.stream() .collect( diff --git a/server/src/main/java/org/opensearch/cluster/service/Throttler.java b/server/src/main/java/org/opensearch/cluster/service/Throttler.java new file mode 100644 index 0000000000000..61bd3ca0d682c --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/Throttler.java @@ -0,0 +1,117 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + 
* this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.common.AdjustableSemaphore; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Base class for Throttling logic. + * It provides throttling functionality over multiple keys. + * It provides the functionality of enabling/disabling throttling using the throttlingEnabled flag. + * + * @param <T> the type of key on which we want to do throttling. + */ +public class Throttler { + protected ConcurrentMap semaphores = new ConcurrentHashMap(); + + private boolean throttlingEnabled; + + public Throttler(final boolean throttlingEnabled) { + this.throttlingEnabled = throttlingEnabled; + } + + /** + * Method to acquire permits for a key type. + * If throttling is disabled, it will always return true, + * else it will return true if permits can be acquired within threshold limits. + * + * If threshold is not set for key then also it will return true. + * + * @param type Key for which we want to acquire permits. + * @param permits Number of permits to acquire. + * @return true if the permits were acquired or no throttling applies to the key, false otherwise. + */ + public boolean acquire(final T type, final int permits) { + assert permits > 0; + AdjustableSemaphore semaphore = semaphores.get(type); + if(throttlingEnabled && Objects.nonNull(semaphore)) { + return semaphore.tryAcquire(permits); + } + return true; + } + + /** + * Release the given permits for given type. + * + * @param type key for which we want to release permits. + * @param permits number of permits to release.
+ */ + public void release(final T type, final int permits) { + assert permits > 0; + AdjustableSemaphore semaphore = semaphores.get(type); + if(throttlingEnabled && Objects.nonNull(semaphore)) { + semaphore.release(permits); + assert semaphore.availablePermits() <= semaphore.getMaxPermits(); + } + } + + /** + * Update the throttling threshold for the given key. + * + * @param key Key for which we want to update limit. + * @param newLimit Updated limit. + */ + public synchronized void updateThrottlingLimit(final T key, final Integer newLimit) { + assert newLimit >= 0; + if(semaphores.containsKey(key)) { + semaphores.get(key).setMaxPermits(newLimit); + } else { + semaphores.put(key, new AdjustableSemaphore(newLimit)); + } + } + + /** + * Remove the threshold for given key. + * Throttler will no longer do throttling for given key. + * + * @param key Key for which we want to remove throttling. + */ + public synchronized void removeThrottlingLimit(final T key) { + assert semaphores.containsKey(key); + semaphores.remove(key); + } + + /** + * Set flag for enabling/disabling the throttling logic. + * Clear the state of previous semaphores with each update. + * + * @param throttlingEnabled flag representing enabled/disabled throttling.
+ */ + public synchronized void setThrottlingEnabled(final boolean throttlingEnabled) { + this.throttlingEnabled = throttlingEnabled; + for(T key: semaphores.keySet()) { + semaphores.put(key, new AdjustableSemaphore(semaphores.get(key).getMaxPermits())); + } + } + + public Integer getThrottlingLimit(final T key) { + if(semaphores.containsKey(key)) { + return semaphores.get(key).getMaxPermits(); + } + return null; + } + + public boolean isThrottlingEnabled() { + return throttlingEnabled; + } +} diff --git a/server/src/main/java/org/opensearch/common/AdjustableSemaphore.java b/server/src/main/java/org/opensearch/common/AdjustableSemaphore.java new file mode 100644 index 0000000000000..2d4385edf0bc8 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/AdjustableSemaphore.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import java.util.concurrent.Semaphore; + +/** + * AdjustableSemaphore is an extended Semaphore whose maxPermits can be changed. + */ +public class AdjustableSemaphore extends Semaphore { + private final Object maxPermitsMutex = new Object(); + private int maxPermits; + + public AdjustableSemaphore(int maxPermits) { + super(maxPermits); + this.maxPermits = maxPermits; + } + + /** + * Update the maxPermits of this semaphore, releasing or reducing permits by the difference. + */ + public void setMaxPermits(int permits) { + synchronized (maxPermitsMutex) { + final int diff = Math.subtractExact(permits, maxPermits); + if (diff > 0) { + // add permits + release(diff); + } else if (diff < 0) { + // remove permits + reducePermits(Math.negateExact(diff)); + } + maxPermits = permits; + } + } + + /** + * Returns maxPermits.
+ */ + public int getMaxPermits() { + return maxPermits; + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index be92bf1643aee..c752337493e17 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -82,6 +82,7 @@ import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.MasterService; +import org.opensearch.cluster.service.MasterTaskThrottler; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.network.NetworkService; @@ -568,7 +569,8 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureMemoryManager.THROUGHPUT_DEGRADATION_LIMITS, ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, - IndexingPressure.MAX_INDEXING_BYTES + IndexingPressure.MAX_INDEXING_BYTES, + MasterTaskThrottler.THRESHOLD_SETTINGS ) ) ); diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index b8256fe896da4..bb2a91fcdbca4 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -291,6 +291,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { return innerDelete(request, currentState); } + + @Override + public String getMasterThrottlingKey() { + return "delete-pipeline"; + } } ); } @@ -385,6 +390,11 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { return 
innerPut(request, currentState); } + + @Override + public String getMasterThrottlingKey() { + return "put-pipeline"; + } } ); } diff --git a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java index b76fc1b93915e..216f449aefa73 100644 --- a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java @@ -144,6 +144,11 @@ public ClusterState execute(ClusterState currentState) { return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment)); } + @Override + public String getMasterThrottlingKey() { + return "create-persistent-task"; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -203,6 +208,11 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public String getMasterThrottlingKey() { + return "finish-persistent-task"; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -234,6 +244,11 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public String getMasterThrottlingKey() { + return "remove-persistent-task"; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -277,6 +292,11 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public String getMasterThrottlingKey() { + return "update-task-state"; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 5b4e30f8495e8..694c8f07f1df1 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ 
b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -229,6 +229,11 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).metadata(mdBuilder).build(); } + @Override + public String getMasterThrottlingKey() { + return "put_repository"; + } + @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name()), e); @@ -290,6 +295,11 @@ public ClusterState execute(ClusterState currentState) { throw new RepositoryMissingException(request.name()); } + @Override + public String getMasterThrottlingKey() { + return "delete_repository"; + } + @Override public boolean mustAck(DiscoveryNode discoveryNode) { // repository was created on both cluster-manager and data nodes diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 6eaec491c8177..a10230a606e4e 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -439,6 +439,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public TimeValue timeout() { return updateTask.timeout(); } + + @Override + public String getMasterThrottlingKey() { + return updateTask.getMasterThrottlingKey(); + } }); }, onFailure)); } diff --git a/server/src/main/java/org/opensearch/script/ScriptService.java b/server/src/main/java/org/opensearch/script/ScriptService.java index 303fc5ccbcf88..0867b2348b9ab 100644 --- a/server/src/main/java/org/opensearch/script/ScriptService.java +++ b/server/src/main/java/org/opensearch/script/ScriptService.java @@ -604,6 +604,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return 
ClusterState.builder(currentState).metadata(mdb).build(); } + + @Override + public String getMasterThrottlingKey() { + return "put-script"; + } } ); } @@ -630,6 +635,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { return ClusterState.builder(currentState).metadata(mdb).build(); } + + @Override + public String getMasterThrottlingKey() { + return "delete-script"; + } } ); } diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index cc4b8e556a3c7..cca81477a44a5 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -297,6 +297,11 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi final String restoreUUID = UUIDs.randomBase64UUID(); RestoreInfo restoreInfo = null; + @Override + public String getMasterThrottlingKey() { + return "restore_snapshot"; + } + @Override public ClusterState execute(ClusterState currentState) { RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 106359c485f86..08ca48a5820d7 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -343,6 +343,11 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } + @Override + public String getMasterThrottlingKey() { + return "create-snapshot"; + } + @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); @@ -527,6 +532,11 @@ 
public void onFailure(String source, Exception e) { listener.onFailure(e); } + /* Key under which this task is counted by master task throttling. NOTE(review): the hunk at -343 of this file returns the same "create-snapshot" key -- confirm both tasks are meant to share one limit. */ @Override + public String getMasterThrottlingKey() { + return "create-snapshot"; + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { try { @@ -2273,6 +2283,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { .build(); } + /* Key under which this task is counted by master task throttling ("delete-snapshot"). */ @Override + public String getMasterThrottlingKey() { + return "delete-snapshot"; + } + @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -3233,182 +3248,80 @@ public boolean assertAllListenersResolved() { * * Package private to allow for tests. */ - static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> { - int changedCount = 0; - int startedCount = 0; - final List entries = new ArrayList<>(); - final String localNodeId = currentState.nodes().getLocalNodeId(); - // Tasks to check for updates for running snapshots. - final List unconsumedTasks = new ArrayList<>(tasks); - // Tasks that were used to complete an existing in-progress shard snapshot - final Set executedTasks = new HashSet<>(); - // Outer loop over all snapshot entries in the order they were created in - for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { - if (entry.state().completed()) { - // completed snapshots do not require any updates so we just add them to the new list and keep going - entries.add(entry); - continue; - } - ImmutableOpenMap.Builder shards = null; - ImmutableOpenMap.Builder clones = null; - Map indicesLookup = null; - // inner loop over all the shard updates that are potentially applicable to the current snapshot entry - for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext();) { - final ShardSnapshotUpdate updateSnapshotState = iterator.next(); - final Snapshot updatedSnapshot = updateSnapshotState.snapshot; - final String 
updatedRepository = updatedSnapshot.getRepository(); - if (entry.repository().equals(updatedRepository) == false) { - // the update applies to a different repository so it is irrelevant here + static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = new ClusterStateTaskExecutor() { + + /* Applies the batched shard snapshot updates in {@code tasks} to the entries of SnapshotsInProgress found in {@code currentState}. Every task is reported as a success; a new cluster state is built only when changedCount > 0, otherwise {@code currentState} is returned unchanged. */ @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + int changedCount = 0; + int startedCount = 0; + final List entries = new ArrayList<>(); + final String localNodeId = currentState.nodes().getLocalNodeId(); + // Tasks to check for updates for running snapshots. + final List unconsumedTasks = new ArrayList<>(tasks); + // Tasks that were used to complete an existing in-progress shard snapshot + final Set executedTasks = new HashSet<>(); + // Outer loop over all snapshot entries in the order they were created in + for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { + if (entry.state().completed()) { + // completed snapshots do not require any updates so we just add them to the new list and keep going + entries.add(entry); continue; } - if (updateSnapshotState.isClone()) { - // The update applied to a shard clone operation - final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId; - if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { - assert entry.isClone() : "Non-clone snapshot [" - + entry - + "] received update for clone [" - + updateSnapshotState - + "]"; - final ShardSnapshotStatus existing = entry.clones().get(finishedShardId); - if (existing == null) { - logger.warn( - "Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]", - updateSnapshotState, - entry - ); - assert false - : "This should never happen, cluster-manager will not submit a state update for a non-existing clone"; - continue; - } - if (existing.state().completed()) { - // No point in doing noop updates that 
might happen if data nodes resends shard status after a disconnect. - iterator.remove(); - continue; - } - logger.trace( - "[{}] Updating shard clone [{}] with status [{}]", - updatedSnapshot, - finishedShardId, - updateSnapshotState.updatedState.state() - ); - if (clones == null) { - clones = ImmutableOpenMap.builder(entry.clones()); - } - changedCount++; - clones.put(finishedShardId, updateSnapshotState.updatedState); - executedTasks.add(updateSnapshotState); - } else if (executedTasks.contains(updateSnapshotState)) { - // the update was already executed on the clone operation it applied to, now we check if it may be possible to - // start a shard snapshot or clone operation on the current entry - if (entry.isClone()) { - // current entry is a clone operation - final ShardSnapshotStatus existingStatus = entry.clones().get(finishedShardId); - if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { - continue; - } - if (clones == null) { - clones = ImmutableOpenMap.builder(entry.clones()); - } - final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; - logger.trace( - "Starting clone [{}] on [{}] with generation [{}]", - finishedShardId, - finishedStatus.nodeId(), - finishedStatus.generation() - ); - assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" - + finishedStatus.nodeId() - + "] but local node id is [" - + localNodeId + ImmutableOpenMap.Builder shards = null; + ImmutableOpenMap.Builder clones = null; + Map indicesLookup = null; + // inner loop over all the shard updates that are potentially applicable to the current snapshot entry + for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext();) { + final ShardSnapshotUpdate updateSnapshotState = iterator.next(); + final Snapshot updatedSnapshot = updateSnapshotState.snapshot; + final String updatedRepository = updatedSnapshot.getRepository(); + if (entry.repository().equals(updatedRepository) == false) { + // the update 
applies to a different repository so it is irrelevant here + continue; + } + if (updateSnapshotState.isClone()) { + // The update applied to a shard clone operation + final RepositoryShardId finishedShardId = updateSnapshotState.repoShardId; + if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { + assert entry.isClone() : "Non-clone snapshot [" + + entry + + "] received update for clone [" + + updateSnapshotState + "]"; - clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); - iterator.remove(); - } else { - // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id - final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName()); - if (indexMeta == null) { - // The index name that finished cloning does not exist in the cluster state so it isn't relevant to a - // normal snapshot + final ShardSnapshotStatus existing = entry.clones().get(finishedShardId); + if (existing == null) { + logger.warn( + "Received clone shard snapshot status update [{}] but this shard is not tracked in [{}]", + updateSnapshotState, + entry + ); + assert false + : "This should never happen, cluster-manager will not submit a state update for a non-existing clone"; continue; } - final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId()); - final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId); - if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + if (existing.state().completed()) { + // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. 
+ iterator.remove(); continue; } - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); - } - final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; logger.trace( - "Starting [{}] on [{}] with generation [{}]", + "[{}] Updating shard clone [{}] with status [{}]", + updatedSnapshot, finishedShardId, - finishedStatus.nodeId(), - finishedStatus.generation() - ); - // A clone was updated, so we must use the correct data node id for the reassignment as actual shard - // snapshot - final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone( - currentState, - updateSnapshotState.updatedState.generation(), - finishedRoutingShardId + updateSnapshotState.updatedState.state() ); - shards.put(finishedRoutingShardId, shardSnapshotStatus); - if (shardSnapshotStatus.isActive()) { - // only remove the update from the list of tasks that might hold a reusable shard if we actually - // started a snapshot and didn't just fail - iterator.remove(); - } - } - } - } else { - // a (non-clone) shard snapshot operation was updated - final ShardId finishedShardId = updateSnapshotState.shardId; - if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { - final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); - if (existing == null) { - logger.warn( - "Received shard snapshot status update [{}] but this shard is not tracked in [{}]", - updateSnapshotState, - entry - ); - assert false : "This should never happen, data nodes should only send updates for expected shards"; - continue; - } - if (existing.state().completed()) { - // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. 
- iterator.remove(); - continue; - } - logger.trace( - "[{}] Updating shard [{}] with status [{}]", - updatedSnapshot, - finishedShardId, - updateSnapshotState.updatedState.state() - ); - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); - } - shards.put(finishedShardId, updateSnapshotState.updatedState); - executedTasks.add(updateSnapshotState); - changedCount++; - } else if (executedTasks.contains(updateSnapshotState)) { - // We applied the update for a shard snapshot state to its snapshot entry, now check if we can update - // either a clone or a snapshot - if (entry.isClone()) { - // Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires - // a lookup for the index ids - if (indicesLookup == null) { - indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); + if (clones == null) { + clones = ImmutableOpenMap.builder(entry.clones()); } - // shard snapshot was completed, we check if we can start a clone operation for the same repo shard - final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName()); - // If the lookup finds the index id then at least the entry is concerned with the index id just updated - // so we check on a shard level - if (indexId != null) { - final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId()); - final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId); + changedCount++; + clones.put(finishedShardId, updateSnapshotState.updatedState); + executedTasks.add(updateSnapshotState); + } else if (executedTasks.contains(updateSnapshotState)) { + // the update was already executed on the clone operation it applied to, now we check if it may be possible to + // start a shard snapshot or clone operation on the current entry + if (entry.isClone()) { + // current entry is a clone operation + final ShardSnapshotStatus existingStatus = 
entry.clones().get(finishedShardId); if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { continue; } @@ -3422,58 +3335,169 @@ public boolean assertAllListenersResolved() { finishedStatus.nodeId(), finishedStatus.generation() ); - clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation())); + assert finishedStatus.nodeId().equals(localNodeId) : "Clone updated with node id [" + + finishedStatus.nodeId() + + "] but local node id is [" + + localNodeId + + "]"; + clones.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); iterator.remove(); - startedCount++; + } else { + // current entry is a snapshot operation so we must translate the repository shard id to a routing shard id + final IndexMetadata indexMeta = currentState.metadata().index(finishedShardId.indexName()); + if (indexMeta == null) { + // The index name that finished cloning does not exist in the cluster state so it isn't relevant to a + // normal snapshot + continue; + } + final ShardId finishedRoutingShardId = new ShardId(indexMeta.getIndex(), finishedShardId.shardId()); + final ShardSnapshotStatus existingStatus = entry.shards().get(finishedRoutingShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace( + "Starting [{}] on [{}] with generation [{}]", + finishedShardId, + finishedStatus.nodeId(), + finishedStatus.generation() + ); + // A clone was updated, so we must use the correct data node id for the reassignment as actual shard + // snapshot + final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone( + currentState, + updateSnapshotState.updatedState.generation(), + finishedRoutingShardId + ); + shards.put(finishedRoutingShardId, shardSnapshotStatus); + if 
(shardSnapshotStatus.isActive()) { + // only remove the update from the list of tasks that might hold a reusable shard if we actually + // started a snapshot and didn't just fail + iterator.remove(); + } } - } else { - // shard snapshot was completed, we check if we can start another snapshot - final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); - if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + } + } else { + // a (non-clone) shard snapshot operation was updated + final ShardId finishedShardId = updateSnapshotState.shardId; + if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { + final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); + if (existing == null) { + logger.warn( + "Received shard snapshot status update [{}] but this shard is not tracked in [{}]", + updateSnapshotState, + entry + ); + assert false : "This should never happen, data nodes should only send updates for expected shards"; continue; } - if (shards == null) { - shards = ImmutableOpenMap.builder(entry.shards()); + if (existing.state().completed()) { + // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. 
+ iterator.remove(); + continue; } - final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; logger.trace( - "Starting [{}] on [{}] with generation [{}]", + "[{}] Updating shard [{}] with status [{}]", + updatedSnapshot, finishedShardId, - finishedStatus.nodeId(), - finishedStatus.generation() + updateSnapshotState.updatedState.state() ); - shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); - iterator.remove(); + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + shards.put(finishedShardId, updateSnapshotState.updatedState); + executedTasks.add(updateSnapshotState); + changedCount++; + } else if (executedTasks.contains(updateSnapshotState)) { + // We applied the update for a shard snapshot state to its snapshot entry, now check if we can update + // either a clone or a snapshot + if (entry.isClone()) { + // Since we updated a normal snapshot we need to translate its shard ids to repository shard ids which requires + // a lookup for the index ids + if (indicesLookup == null) { + indicesLookup = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); + } + // shard snapshot was completed, we check if we can start a clone operation for the same repo shard + final IndexId indexId = indicesLookup.get(finishedShardId.getIndexName()); + // If the lookup finds the index id then at least the entry is concerned with the index id just updated + // so we check on a shard level + if (indexId != null) { + final RepositoryShardId repoShardId = new RepositoryShardId(indexId, finishedShardId.getId()); + final ShardSnapshotStatus existingStatus = entry.clones().get(repoShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (clones == null) { + clones = ImmutableOpenMap.builder(entry.clones()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + 
logger.trace( + "Starting clone [{}] on [{}] with generation [{}]", + finishedShardId, + finishedStatus.nodeId(), + finishedStatus.generation() + ); + clones.put(repoShardId, new ShardSnapshotStatus(localNodeId, finishedStatus.generation())); + iterator.remove(); + startedCount++; + } + } else { + // shard snapshot was completed, we check if we can start another snapshot + final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; + } + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState; + logger.trace( + "Starting [{}] on [{}] with generation [{}]", + finishedShardId, + finishedStatus.nodeId(), + finishedStatus.generation() + ); + shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); + iterator.remove(); + } } } } - } - final SnapshotsInProgress.Entry updatedEntry; - if (shards != null) { - assert clones == null : "Should not have updated clones when updating shard snapshots but saw " - + clones - + " as well as " - + shards; - updatedEntry = entry.withShardStates(shards.build()); - } else if (clones != null) { - updatedEntry = entry.withClones(clones.build()); - } else { - updatedEntry = entry; + final SnapshotsInProgress.Entry updatedEntry; + if (shards != null) { + assert clones == null : "Should not have updated clones when updating shard snapshots but saw " + + clones + + " as well as " + + shards; + updatedEntry = entry.withShardStates(shards.build()); + } else if (clones != null) { + updatedEntry = entry.withClones(clones.build()); + } else { + updatedEntry = entry; + } + entries.add(updatedEntry); + } + if (changedCount > 0) { + logger.trace( + "changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + "[{}] shard snapshots", + changedCount, + startedCount 
+ ); + return ClusterStateTaskExecutor.ClusterTasksResult.builder() + .successes(tasks) + .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build()); } - entries.add(updatedEntry); + return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(currentState); } - if (changedCount > 0) { - logger.trace( - "changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + "[{}] shard snapshots", - changedCount, - startedCount - ); - return ClusterStateTaskExecutor.ClusterTasksResult.builder() - .successes(tasks) - .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build()); + + /* Key under which shard snapshot state updates are counted by master task throttling ("update-snapshot-state"). */ @Override + public String getMasterThrottlingKey() { + return "update-snapshot-state"; } - return ClusterStateTaskExecutor.ClusterTasksResult.builder().successes(tasks).build(currentState); }; /** diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java index 1b7d848b626fe..c4348bbb53167 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java @@ -75,4 +75,27 @@ public void testWrapBackoffPolicy() { assertEquals(expectedRetries, retries.get()); } } + + public void testEqualJitterExponentialBackOffPolicy() { + int baseDelay = 10; + int maxDelay = 10000; + BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay); + Iterator iterator = policy.iterator(); + + // Assert equal jitter + int retriesTillMaxDelay = 10; + for(int i=0 ; i < retriesTillMaxDelay ; i++) { + TimeValue delay = iterator.next(); + assertTrue(delay.getMillis()>= baseDelay * (1L << i) / 2); + assertTrue(delay.getMillis()<= baseDelay * (1L << i)); + } + + // Now policy should return max delay for next retries. 
+ int retriesAfterMaxDelay = randomInt(10); + for (int i=0; i < retriesAfterMaxDelay; i++) { + TimeValue delay = iterator.next(); + assertTrue(delay.getMillis() >= maxDelay/2); + assertTrue(delay.getMillis() <= maxDelay); + } + } } diff --git a/server/src/test/java/org/opensearch/action/support/master/MasterThrottlingRetryListenerTests.java b/server/src/test/java/org/opensearch/action/support/master/MasterThrottlingRetryListenerTests.java new file mode 100644 index 0000000000000..e02a2f821a460 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/master/MasterThrottlingRetryListenerTests.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.master; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; +import org.opensearch.cluster.service.MasterTaskThrottlingException; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.Scheduler; + +import java.time.Instant; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.AfterClass; +import org.junit.Before; + +/** + * Test class of {@link MasterThrottlingRetryListener} + */ +public class MasterThrottlingRetryListenerTests extends OpenSearchTestCase { + private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + 
MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler); + } + + @AfterClass + public static void afterClass() { + Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS); + } + + /* A throttling failure on a request that is not flagged as remote must run the retry callback. */ public void testRetryForLocalRequest() throws BrokenBarrierException, InterruptedException { + TransportMasterNodeActionTests.Request request = new TransportMasterNodeActionTests.Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicBoolean callBackExecuted = new AtomicBoolean(); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + callBackExecuted.set(true); + barrier.await(); + } catch (Exception e) { + /* previously the error was constructed but never thrown */ throw new AssertionError(e); + } + } + }; + + ActionListener taskRetryListener = + new MasterThrottlingRetryListener("test", request, runnable, listener); + + taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + barrier.await(); + assertTrue(callBackExecuted.get()); + } + + /* A throttling failure on a request flagged via setRemoteRequest(true) must not run the retry callback. */ public void testRetryForRemoteRequest() throws BrokenBarrierException, InterruptedException { + TransportMasterNodeActionTests.Request request = new TransportMasterNodeActionTests.Request(); + request.setRemoteRequest(true); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean callBackExecuted = new AtomicBoolean(false); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + callBackExecuted.set(true); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }; + + ActionListener taskRetryListener = + new MasterThrottlingRetryListener("test", request, runnable, listener); + + taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + Thread.sleep(100); // some buffer time so callback can execute. 
+ assertFalse(callBackExecuted.get()); + } + + /* When every attempt is throttled, the wrapped listener must fail with ProcessClusterEventTimeoutException once masterNodeTimeout has elapsed. */ public void testTimedOut() throws BrokenBarrierException, InterruptedException { + + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicBoolean onFailureExecuted = new AtomicBoolean(); + /* NOTE(review): retryExecuted is never set, so the assertFalse on it below is vacuous. */ AtomicBoolean retryExecuted = new AtomicBoolean(); + AtomicBoolean firstExecute = new AtomicBoolean(true); + int timeOutSec = randomIntBetween(1, 5); + final Instant[] startTime = new Instant[1]; + final Instant[] endTime = new Instant[1]; + /* failure handed to the listener; written before barrier.await() and read by the test thread after it */ final Exception[] failure = new Exception[1]; + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(Object o) { + throw new AssertionError("onResponse must not be called when every attempt is throttled"); + } + + @Override + public void onFailure(Exception e) { + endTime[0] = Instant.now(); + failure[0] = e; + try { + onFailureExecuted.set(true); + barrier.await(); + } catch (Exception exe) { + throw new AssertionError(exe); + } + } + }; + TransportMasterNodeActionTests.Request request = + new TransportMasterNodeActionTests.Request().masterNodeTimeout(TimeValue.timeValueSeconds(timeOutSec)); + + class TestRetryClass { + ActionListener listener; + TestRetryClass(ActionListener listener) { + this.listener = new MasterThrottlingRetryListener("test", request, this::execute, listener); + } + public void execute() { + if(firstExecute.getAndSet(false)) { + startTime[0] = Instant.now(); + } + listener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + } + } + + TestRetryClass testRetryClass = new TestRetryClass(listener); + testRetryClass.execute(); + + barrier.await(); + /* asserted on the test thread so that a wrong exception type actually fails the test */ assertEquals(ProcessClusterEventTimeoutException.class, failure[0].getClass()); + assertEquals(timeOutSec, (endTime[0].toEpochMilli() - startTime[0].toEpochMilli())/1000); + assertTrue(onFailureExecuted.get()); + assertFalse(retryExecuted.get()); + } + + /* Failures other than MasterTaskThrottlingException must not run the retry callback. */ public void testRetryForDifferentException() { + + TransportMasterNodeActionTests.Request request = new TransportMasterNodeActionTests.Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean callBackExecuted = new 
AtomicBoolean(); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + callBackExecuted.set(true); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }; + + ActionListener taskRetryListener = + new MasterThrottlingRetryListener("test", request, runnable, listener); + + taskRetryListener.onFailure(new Exception()); + assertFalse(callBackExecuted.get()); + + taskRetryListener.onFailure(new OpenSearchRejectedExecutionException("Different Exception")); + assertFalse(callBackExecuted.get()); + } +} diff --git a/server/src/test/java/org/opensearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/master/TransportMasterNodeActionTests.java index 1dd44f3186657..8aa1a1b7e9d59 100644 --- a/server/src/test/java/org/opensearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/master/TransportMasterNodeActionTests.java @@ -53,6 +53,7 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.service.MasterTaskThrottlingException; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.settings.Settings; @@ -66,6 +67,7 @@ import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.Scheduler; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.TransportService; import org.junit.After; @@ -80,6 +82,10 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -95,6 +101,7 @@ public class TransportMasterNodeActionTests extends OpenSearchTestCase { private DiscoveryNode localNode; private DiscoveryNode remoteNode; private DiscoveryNode[] allNodes; + private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY); @BeforeClass public static void beforeClass() { @@ -132,6 +139,7 @@ public void setUp() throws Exception { Version.CURRENT ); allNodes = new DiscoveryNode[] { localNode, remoteNode }; + MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler); } @After @@ -144,6 +152,7 @@ public void tearDown() throws Exception { @AfterClass public static void afterClass() { ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS); threadPool = null; } @@ -157,7 +166,7 @@ void assertListenerThrows(String msg, ActionFuture listener, Class klass) } public static class Request extends MasterNodeRequest { - Request() {} + public Request() {} Request(StreamInput in) throws IOException { super(in); @@ -561,4 +570,73 @@ public void testDelegateToMasterOnNodeWithDeprecatedMasterRole() throws Executio assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); } + + /* With the local node as master, a MasterTaskThrottlingException thrown by the first masterOperation call must lead to a second (retried) call. */ public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean exception = new AtomicBoolean(true); + AtomicBoolean retried = new AtomicBoolean(false); + CyclicBarrier barrier = new CyclicBarrier(2); + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[]{localNode})); + + 
TransportMasterNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + if(exception.getAndSet(false)) { + throw new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"); + } else { + try{ + retried.set(true); + barrier.await(); + } catch (Exception e) { + /* keep the cause so a broken barrier or interrupt is diagnosable */ throw new AssertionError(e); + } + } + } + }; + action.execute(request, listener); + + barrier.await(); + assertTrue(retried.get()); + assertFalse(exception.get()); + } + + /* With a remote master, a throttling error returned for the forwarded request must cause the request to be sent again; the second response completes the listener. */ public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException { + Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(60)); + DiscoveryNode masterNode = this.remoteNode; + setState( + clusterService, + // use a random base version so it can go down when simulating a restart. + ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[]{localNode, masterNode})) + .version(randomIntBetween(0, 10)) + ); + + PlainActionFuture listener = new PlainActionFuture<>(); + TransportMasterNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool); + action.execute(request, listener); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; + assertTrue(capturedRequest.node.isMasterNode()); + assertThat(capturedRequest.request, equalTo(request)); + assertThat(capturedRequest.action, equalTo("internal:testAction")); + transport.handleRemoteError(capturedRequest.requestId, + new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test")); + + assertFalse(listener.isDone()); + + //waiting for retry to trigger + Thread.sleep(100); + + // Retry for 
above throttling exception + capturedRequests = transport.getCapturedRequestsAndClear(); + assertThat(capturedRequests.length, equalTo(1)); + capturedRequest = capturedRequests[0]; + Response response = new Response(); + transport.handleResponse(capturedRequest.requestId, response); + + assertTrue(listener.isDone()); + /* check the delivered response, as the sibling delegate-to-master test does */ assertThat(listener.get(), equalTo(response)); + } } diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 845a5ee91052d..1b527436afe1d 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -688,6 +688,268 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } + public void testThrottlingForTaskSubmission() throws InterruptedException { + int throttlingLimit = randomIntBetween(1, 10); + int taskId = 1; + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch latch = new CountDownLatch(1); + final String taskName = "test"; + class Task { + private final int id; + + Task(int id) { + this.id = id; + } + } + + class TaskExecutor implements ClusterStateTaskExecutor { + private AtomicInteger published = new AtomicInteger(); + + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + latch.countDown(); + barrier.await(); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public String getMasterThrottlingKey() { + return taskName; + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { + published.incrementAndGet(); + } + } + + MasterService masterService = createMasterService(true); + masterService.masterTaskThrottler.setThrottlingEnabled(true); + masterService.masterTaskThrottler.updateThrottlingLimit(taskName, throttlingLimit); + + final ClusterStateTaskListener 
listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + } + }; + + TaskExecutor executor = new TaskExecutor(); + // submit one task which will be execution, post that will submit throttlingLimit tasks. + try { + masterService.submitStateUpdateTask( + taskName, + new Task(taskId++), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener); + } catch (Exception e) { + throw new AssertionError(e); + } + // wait till task enter in execution. + latch.await(); + + for(int i = 0; i < throttlingLimit; i++) { + try { + masterService.submitStateUpdateTask( + taskName, + new Task(taskId++), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener); + } catch (Exception e) { + throw new AssertionError(e); + } + } + + // we have one task in execution and tasks in queue so next task should throttled. 
+ final AtomicReference assertionRef = new AtomicReference<>(); + try { + masterService.submitStateUpdateTask( + taskName, + new Task(taskId++), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor, + listener); + } catch (MasterTaskThrottlingException e ){ + assertionRef.set(e); + } + assertNotNull(assertionRef.get()); + masterService.close(); + } + + public void testThrottlingForMultipleTaskTypes() throws InterruptedException { + int throttlingLimitForTask1 = randomIntBetween(1, 5); + int throttlingLimitForTask2 = randomIntBetween(1, 5); + int throttlingLimitForTask3 = randomIntBetween(1, 5); + int numberOfTask1 = randomIntBetween(throttlingLimitForTask1, 10); + int numberOfTask2 = randomIntBetween(throttlingLimitForTask2, 10); + int numberOfTask3 = randomIntBetween(throttlingLimitForTask3, 10); + class Task { + } + class Task1 extends Task { + } + class Task2 extends Task { + } + class Task3 extends Task { + } + + class Task1Executor implements ClusterStateTaskExecutor { + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + Thread.sleep(randomInt(1000)); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } + + @Override + public String getMasterThrottlingKey() { + return "Task1"; + } + } + + class Task2Executor implements ClusterStateTaskExecutor { + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + Thread.sleep(randomInt(1000)); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } + + @Override + public String getMasterThrottlingKey() { + return "Task2"; + } + } + + class Task3Executor implements ClusterStateTaskExecutor { + @Override + public ClusterTasksResult execute(ClusterState 
currentState, List tasks) throws Exception { + Thread.sleep(randomInt(1000)); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + + @Override + public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { } + + @Override + public String getMasterThrottlingKey() { + return "Task3"; + } + } + + MasterService masterService = createMasterService(true); + masterService.masterTaskThrottler.setThrottlingEnabled(true); + // configuring limits for Task1 and Task3. All task submission of Task2 should pass. + masterService.masterTaskThrottler.updateThrottlingLimit("Task1", throttlingLimitForTask1); + masterService.masterTaskThrottler.updateThrottlingLimit("Task3", throttlingLimitForTask3); + final CountDownLatch latch = new CountDownLatch(numberOfTask1 + numberOfTask2 + numberOfTask3); + AtomicInteger throttledTask1 = new AtomicInteger(); + AtomicInteger throttledTask2 = new AtomicInteger(); + AtomicInteger throttledTask3 = new AtomicInteger(); + AtomicInteger succeededTask1 = new AtomicInteger(); + AtomicInteger succeededTask2 = new AtomicInteger(); + AtomicInteger timedOutTask3 = new AtomicInteger(); + + + final ClusterStateTaskListener listener = new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + // Task3's timeout should have called this. 
+ assertEquals("Task3", source); + timedOutTask3.incrementAndGet(); + latch.countDown(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if(source.equals("Task1")) { + succeededTask1.incrementAndGet(); + } else if(source.equals("Task2")) { + succeededTask2.incrementAndGet(); + } + latch.countDown(); + } + }; + Task1Executor executor1 = new Task1Executor(); + Task2Executor executor2 = new Task2Executor(); + Task3Executor executor3 = new Task3Executor(); + List threads = new ArrayList(); + for(int i = 0; i< numberOfTask1; i++) { + threads.add(new Thread(new Runnable() { + @Override + public void run() { + try { + masterService.submitStateUpdateTask( + "Task1", + new Task1(), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor1, + listener); + } catch (MasterTaskThrottlingException e) { + // Exception should be RejactedExecutionException. + throttledTask1.incrementAndGet(); + latch.countDown(); + } + } + })); + } + for(int i = 0; i< numberOfTask2; i++) { + threads.add(new Thread(new Runnable() { + @Override + public void run() { + try { + masterService.submitStateUpdateTask( + "Task2", + new Task2(), + ClusterStateTaskConfig.build(randomFrom(Priority.values())), + executor2, + listener); + } catch (MasterTaskThrottlingException e) { + throttledTask2.incrementAndGet(); + latch.countDown(); + } + } + })); + } + for(int i = 0; i< numberOfTask3; i++) { + threads.add(new Thread(new Runnable() { + @Override + public void run() { + try { + masterService.submitStateUpdateTask( + "Task3", + new Task3(), + ClusterStateTaskConfig.build(randomFrom(Priority.values()), new TimeValue(0)), + executor3, + listener); + } catch (MasterTaskThrottlingException e) { + throttledTask3.incrementAndGet(); + latch.countDown(); + } + } + })); + } + for(Thread thread : threads) { + thread.start(); + } + + // await for latch to clear + latch.await(); + + assertEquals(numberOfTask1, throttledTask1.get() + 
succeededTask1.get()); + assertEquals(numberOfTask2, succeededTask2.get()); + assertEquals(0, throttledTask2.get()); + assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get()); + masterService.close(); + } + public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException { assumeTrue("assertions must be enabled for this test to work", BaseFuture.class.desiredAssertionStatus()); final CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterTaskThrottlerTests.java new file mode 100644 index 0000000000000..ad174753037c6 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/service/MasterTaskThrottlerTests.java @@ -0,0 +1,200 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.cluster.service;
+
+import org.opensearch.Version;
+import org.opensearch.action.support.replication.ClusterStateCreationUtils;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodeRole;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.test.ClusterServiceUtils;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import static org.opensearch.test.ClusterServiceUtils.setState;
+
+/**
+ * Contains tests for {@link MasterTaskThrottler}
+ */
+public class MasterTaskThrottlerTests extends OpenSearchTestCase {
+
+    private static ThreadPool threadPool;
+    private ClusterService clusterService;
+    private DiscoveryNode localNode;
+    private DiscoveryNode[] allNodes;
+
+    @BeforeClass
+    public static void beforeClass() {
+        threadPool = new TestThreadPool("MasterTaskThrottlerTests");
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        clusterService = ClusterServiceUtils.createClusterService(threadPool);
+        localNode = new DiscoveryNode("local_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.MASTER_ROLE), Version.V_7_10_3);
+        allNodes = new DiscoveryNode[]{localNode};
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        clusterService.close();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
+        threadPool = null;
+    }
+
+    /**
+     * With no throttling settings applied, no configured task type has a throttling limit.
+     */
+    public void testDefaults() {
+        ClusterSettings clusterSettings =
+            new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        MasterTaskThrottler throttler = new MasterTaskThrottler(clusterSettings, clusterService.getMasterService());
+        for (String key : MasterTaskThrottler.CONFIGURED_TASK_FOR_THROTTLING) {
+            assertNull(throttler.getThrottlingLimit(key));
+        }
+    }
+
+    /**
+     * Expects {@code validateSetting} to throw {@link IllegalArgumentException} for a cluster whose
+     * data node runs an older version ({@code V_7_1_0}) than the master ({@code V_7_10_3}).
+     */
+    public void testValidateSettingsForDifferentVersion() {
+        DiscoveryNode masterNode = new DiscoveryNode("local_master_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.MASTER_ROLE), Version.V_7_10_3);
+        DiscoveryNode dataNode = new DiscoveryNode("local_data_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.V_7_1_0);
+        setState(clusterService, ClusterStateCreationUtils.state(masterNode, masterNode, new DiscoveryNode[]{masterNode, dataNode}));
+
+        ClusterSettings clusterSettings =
+            new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        MasterTaskThrottler throttler = new MasterTaskThrottler(clusterSettings, clusterService.getMasterService());
+
+        // set some limit for update snapshot tasks
+        int newLimit = randomIntBetween(1, 10);
+
+        Settings newSettings = Settings.builder()
+            .put("master.throttling.thresholds.update_snapshot.value", newLimit)
+            .build();
+
+        expectThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
+    }
+
+    /**
+     * Expects {@code validateSetting} to throw {@link IllegalArgumentException} for the task key "random-task".
+     */
+    public void testValidateSettingsForUnknownTask() {
+        DiscoveryNode masterNode = new DiscoveryNode("local_master_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.MASTER_ROLE), Version.V_7_10_3);
+        DiscoveryNode dataNode = new DiscoveryNode("local_data_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.V_7_10_3);
+        setState(clusterService, ClusterStateCreationUtils.state(masterNode, masterNode, new DiscoveryNode[]{masterNode, dataNode}));
+
+        ClusterSettings clusterSettings =
+            new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        MasterTaskThrottler throttler = new MasterTaskThrottler(clusterSettings, clusterService.getMasterService());
+
+        // set some limit for a task type named "random-task"
+        int newLimit = randomIntBetween(1, 10);
+
+        Settings newSettings = Settings.builder()
+            .put("master.throttling.thresholds.random-task.value", newLimit)
+            .build();
+
+        expectThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
+    }
+
+    /**
+     * Applies a create-index threshold through the cluster settings and checks that the throttler
+     * reports it, then applies -1 and checks that the limit is gone again.
+     */
+    public void testUpdateThrottlingLimitForHappyCase() {
+        DiscoveryNode masterNode = new DiscoveryNode("local_master_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.MASTER_ROLE), Version.V_7_10_3);
+        DiscoveryNode dataNode = new DiscoveryNode("local_data_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.V_7_10_3);
+        setState(clusterService, ClusterStateCreationUtils.state(masterNode, masterNode, new DiscoveryNode[]{masterNode, dataNode}));
+
+        ClusterSettings clusterSettings =
+            new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        MasterTaskThrottler throttler = new MasterTaskThrottler(clusterSettings, clusterService.getMasterService());
+        throttler.setThrottlingEnabled(true);
+
+        // set some limit for create-index tasks
+        int newLimit = randomIntBetween(1, 10);
+
+        Settings newSettings = Settings.builder()
+            .put("master.throttling.thresholds.create-index.value", newLimit)
+            .build();
+        clusterSettings.applySettings(newSettings);
+        assertEquals(newLimit, throttler.getThrottlingLimit("create-index").intValue());
+
+        // applying -1 is expected to remove the create-index limit again
+        newSettings = Settings.builder()
+            .put("master.throttling.thresholds.create-index.value", -1)
+            .build();
+        clusterSettings.applySettings(newSettings);
+        assertNull(throttler.getThrottlingLimit("create-index"));
+    }
+
+    /**
+     * Expects {@code validateSetting} to throw {@link IllegalArgumentException} for a create-index threshold of -5.
+     */
+    public void testValidateSettingForLimit() {
+        DiscoveryNode masterNode = new DiscoveryNode("local_master_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.MASTER_ROLE), Version.V_7_10_3);
+        DiscoveryNode dataNode = new DiscoveryNode("local_data_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
+            Collections.singleton(DiscoveryNodeRole.DATA_ROLE), Version.V_7_10_3);
+        setState(clusterService, ClusterStateCreationUtils.state(masterNode, masterNode, new DiscoveryNode[]{masterNode, dataNode}));
+
+        ClusterSettings clusterSettings =
+            new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        MasterTaskThrottler throttler = new MasterTaskThrottler(clusterSettings, clusterService.getMasterService());
+
+        // NOTE(review): the key ends in ".values" while the other tests use ".value"; confirm the
+        // exception is raised for the negative limit and not for the unexpected key suffix.
+        Settings newSettings = Settings.builder()
+            .put("master.throttling.thresholds.create-index.values", -5)
+            .build();
+        expectThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings));
+    }
+
+    /**
+     * Checks {@code updateLimit}: a positive value is reported as the limit and -1 removes it.
+     */
+    public void testUpdateLimit() {
+        ClusterSettings clusterSettings =
+            new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        MasterTaskThrottler throttler = new MasterTaskThrottler(clusterSettings, clusterService.getMasterService());
+        throttler.setThrottlingEnabled(true);
+
+        throttler.updateLimit("test", 5);
+        assertEquals(5, throttler.getThrottlingLimit("test").intValue());
+        throttler.updateLimit("test", -1);
+        assertNull(throttler.getThrottlingLimit("test"));
+    }
+}
diff --git a/server/src/test/java/org/opensearch/cluster/service/ThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ThrottlerTests.java
new file mode 100644
index 0000000000000..f3cf9215a0599
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/service/ThrottlerTests.java
@@ -0,0 +1,111 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.service;
+
+import org.opensearch.common.AdjustableSemaphore;
+import org.opensearch.test.OpenSearchTestCase;
+
+/**
+ * Contains tests of {@link Throttler}
+ */
+public class ThrottlerTests extends OpenSearchTestCase {
+
+    /** With throttling disabled, acquire succeeds even for more permits than the configured limit. */
+    public void testDisabledThrottling() {
+        Throttler throttler = new Throttler(true);
+        throttler.updateThrottlingLimit("testKey", 1);
+        throttler.setThrottlingEnabled(false);
+        boolean firstCall = throttler.acquire("testKey", 5);
+        assertTrue(firstCall);
+    }
+
+    /** Exercises acquire and release against a limit of 5, then checks that a limit of 6 admits one more permit. */
+    public void testThrottling() {
+        Throttler throttler = new Throttler(true);
+        throttler.updateThrottlingLimit("testKey", 5);
+        boolean firstCall = throttler.acquire("testKey", 1);
+        assertTrue(firstCall);
+
+        boolean secondCall = throttler.acquire("testKey", 2);
+        assertTrue(secondCall);
+
+        boolean thirdCall = throttler.acquire("testKey", 4);
+        assertFalse(thirdCall);
+
+        // will remove used value
+        throttler.release("testKey", 2);
+        boolean fourthCall = throttler.acquire("testKey", 4);
+        assertTrue(fourthCall);
+
+        boolean fifthCall = throttler.acquire("testKey", 1);
+        assertFalse(fifthCall);
+
+        // update limit and check
+        throttler.updateThrottlingLimit("testKey", 6);
+        boolean sixthCall = throttler.acquire("testKey", 1);
+        assertTrue(sixthCall);
+    }
+
+    /** Toggling throttling off and on keeps the configured limit and makes its permit available again. */
+    public void testAcquireWithFlippingOfThrottlingFlag() {
+        String testTask = "test";
+        Throttler throttler = new Throttler(true);
+        throttler.updateThrottlingLimit(testTask, 1);
+        assertTrue(throttler.acquire(testTask, 1));
+        assertFalse(throttler.acquire(testTask, 1));
+
+        throttler.setThrottlingEnabled(false);
+        throttler.setThrottlingEnabled(true);
+        assertEquals(1, throttler.getThrottlingLimit(testTask).intValue());
+        assertTrue(throttler.acquire(testTask, 1));
+        assertFalse(throttler.acquire(testTask, 1));
+    }
+
+    /** Without a configured limit, acquire succeeds regardless of the number of requested permits. */
+    public void testThrottlingWithoutLimit() {
+        Throttler throttler = new Throttler(true);
+        boolean firstCall = throttler.acquire("testKey", 1);
+        assertTrue(firstCall);
+
+        boolean secondCall = throttler.acquire("testKey", 2000);
+        assertTrue(secondCall);
+    }
+
+    /** After the limit is removed, a request that would have exceeded it is accepted. */
+    public void testRemoveThrottlingLimit() {
+        Throttler throttler = new Throttler(true);
+        throttler.updateThrottlingLimit("testKey", 5);
+
+        boolean firstCall = throttler.acquire("testKey", 1);
+        assertTrue(firstCall);
+
+        boolean secondCall = throttler.acquire("testKey", 5);
+        assertFalse(secondCall);
+
+        throttler.removeThrottlingLimit("testKey");
+        boolean thirdCall = throttler.acquire("testKey", 500);
+        assertTrue(thirdCall);
+    }
+
+    /** Raising and lowering the maximum of an {@link AdjustableSemaphore} is reflected in its available permits. */
+    public void testUpdateLimitForThrottlingSemaphore() {
+        int initialLimit = randomInt(10);
+        AdjustableSemaphore semaphore = new AdjustableSemaphore(initialLimit);
+        assertEquals(initialLimit, semaphore.availablePermits());
+
+        // Strictly greater than initialLimit (so at least 1), which keeps randomInt(newIncreasedLimit - 1) valid.
+        int newIncreasedLimit = randomIntBetween(initialLimit + 1, 20);
+        semaphore.setMaxPermits(newIncreasedLimit);
+        assertEquals(newIncreasedLimit, semaphore.availablePermits());
+
+        int newDecreasedLimit = randomInt(newIncreasedLimit - 1);
+        semaphore.setMaxPermits(newDecreasedLimit);
+        assertEquals(newDecreasedLimit, semaphore.availablePermits());
+    }
+}