diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0daafbf4ce887..b7f8bc5722710 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -119,6 +119,7 @@ public abstract class TransportReplicationAction< protected final IndicesService indicesService; protected final TransportRequestOptions transportOptions; protected final String executor; + protected final boolean forceExecutionOnPrimary; // package private for testing protected final String transportReplicaAction; @@ -157,6 +158,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); + this.forceExecutionOnPrimary = forceExecutionOnPrimary; transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); @@ -905,7 +907,7 @@ void retryBecauseUnavailable(ShardId shardId, String message) { protected void acquirePrimaryOperationPermit(final IndexShard primary, final Request request, final ActionListener onAcquired) { - primary.acquirePrimaryOperationPermit(onAcquired, executor, request); + primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 5a2820fd6de00..cf3998bd0cca9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -60,7 +60,6 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { - private final boolean forceExecution; private final IndexingPressure indexingPressure; private final String executor; @@ -74,13 +73,12 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; - this.forceExecution = forceExecutionOnPrimary; this.indexingPressure = indexingPressure; } @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); } @Override @@ -97,7 +95,7 @@ protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal // If this primary request was received directly from the network, we must mark a new primary // operation. This happens if the write action skips the reroute step (ex: rsync) or during // primary delegation, after the primary relocation hand-off. - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); } } @@ -107,7 +105,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary); } protected long replicaOperationSize(ReplicaRequest request) { @@ -163,7 +161,7 @@ protected void doRun() { @Override public boolean isForceExecution() { - return forceExecution; + return forceExecutionOnPrimary; } }); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 07d84c78d1a16..a07b9faf87272 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2723,10 +2723,16 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { * isn't used */ public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false); + } + + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo, + boolean forceExecution) { verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo); + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution, + debugInfo); } /** diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 04845547c29ec..5c3e31b4414db 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -131,7 +131,7 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception { acquiredPermits.incrementAndGet(); callback.onResponse(acquiredPermits::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(true)); when(indexShard.getReplicationGroup()).thenReturn( new ReplicationGroup(shardRoutingTable, clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 25f27d2abfb3b..56bd370397ad2 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -126,6 +126,7 @@ import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -152,6 +153,7 @@ public static R resolveRequest(TransportRequest r private static ThreadPool threadPool; + private boolean forceExecute; private ClusterService clusterService; private TransportService transportService; private CapturingTransport transport; @@ -172,6 +174,7 @@ public static void beforeClass() { @Before public void setUp() throws Exception { super.setUp(); + forceExecute = randomBoolean(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = transport.createTransportService(clusterService.getSettings(), threadPool, @@ -839,7 +842,7 @@ public void testSeqNoIsSetOnPrimary() { //noinspection unchecked ((ActionListener)invocation.getArguments()[0]).onResponse(count::decrementAndGet); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject(), eq(forceExecute)); when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get()); final IndexService indexService = mock(IndexService.class); @@ -1272,7 +1275,7 @@ private class TestAction extends TransportReplicationAction()), - Request::new, Request::new, ThreadPool.Names.SAME); + Request::new, Request::new, ThreadPool.Names.SAME, false, forceExecute); } @Override @@ -1343,7 +1346,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); } return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(forceExecute)); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[3];