diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java
index 07bc519346f7c..34d90035ad6cc 100644
--- a/server/src/main/java/org/elasticsearch/action/ActionListener.java
+++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java
@@ -21,6 +21,7 @@
 
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.CheckedSupplier;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -180,4 +181,24 @@ protected void innerOnFailure(Exception e) {
             }
         };
     }
+
+    /**
+     * Completes the given listener with the outcome of the provided supplier: {@link #onResponse} receives the supplied
+     * value, or {@link #onFailure} receives the exception if the supplier throws. The supplier is evaluated before the
+     * listener is notified, so an exception thrown by {@link #onResponse} itself propagates to the caller instead of being
+     * reported to the same listener a second time. Mainly used to complete a listener with a block of synchronous code.
+     *
+     * @param listener the listener to complete
+     * @param supplier the synchronous computation that produces the response
+     */
+    static <Response> void completeWith(ActionListener<Response> listener, CheckedSupplier<Response, ? extends Exception> supplier) {
+        final Response response;
+        try {
+            response = supplier.get();
+        } catch (Exception e) {
+            listener.onFailure(e);
+            return;
+        }
+        listener.onResponse(response);
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java
index 432cef6ad3029..19a0618e1c5a4 100644
--- a/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java
+++ b/server/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java
@@ -37,10 +37,16 @@ public class ActionListenerResponseHandler<Response extends TransportResponse> i
 
     private final ActionListener<? super Response> listener;
     private final Writeable.Reader<Response> reader;
+    private final String executor;
 
-    public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
+    public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader, String executor) {
         this.listener = Objects.requireNonNull(listener);
         this.reader = Objects.requireNonNull(reader);
+        this.executor = Objects.requireNonNull(executor);
+    }
+
+    public ActionListenerResponseHandler(ActionListener<? super Response> listener, Writeable.Reader<Response> reader) {
+        this(listener, reader, ThreadPool.Names.SAME);
     }
 
     @Override
@@ -55,7 +61,7 @@ public void handleException(TransportException e) {
 
     @Override
     public String executor() {
-        return ThreadPool.Names.SAME;
+        return executor;
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 8edaf0ef093ab..8579caecc8add 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -445,11 +445,12 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery
 
         @Override
         public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
-            try (RecoveryRef recoveryRef =
-                     onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
-                recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
+            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
+                final ActionListener<TransportResponse> listener =
+                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.FINALIZE, request);
+                recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
+                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
             }
-            channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }
 
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index bdda5e8d8d4f8..b1585251934dc 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.StepListener;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.StopWatch;
@@ -71,6 +72,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
 
@@ -137,6 +139,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                 IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
                 throw e;
             });
+            final Consumer<Exception> onFailure = e ->
+                IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
+
             runUnderPrimaryPermit(() -> {
                 final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
                 ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
@@ -235,16 +240,21 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
                 throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
             }
 
-            finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint);
-            final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
-            assert resources.isEmpty() : "not every resource is released [" + resources + "]";
-            IOUtils.close(resources);
-            wrappedListener.onResponse(
-                new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
+            final StepListener<Void> finalizeStep = new StepListener<>();
+            finalizeRecovery(sendSnapshotResult.targetLocalCheckpoint, finalizeStep);
+            finalizeStep.whenComplete(r -> {
+                assert resources.isEmpty() : "not every resource is released [" + resources + "]";
+                final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
+                final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,
                     sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,
-                    sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, prepareEngineTime.millis(),
-                    sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis())
-            );
+                    sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,
+                    prepareEngineTime.millis(), sendSnapshotResult.totalOperations, sendSnapshotResult.tookTime.millis());
+                try {
+                    wrappedListener.onResponse(response);
+                } finally {
+                    IOUtils.close(resources);
+                }
+            }, onFailure);
         } catch (Exception e) {
             IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
         }
@@ -585,10 +595,15 @@ SendSnapshotResult phase2(long startingSeqNo, long requiredSeqNoRangeStart, long
         return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
     }
 
-    /*
-     * finalizes the recovery process
-     */
-    public void finalizeRecovery(final long targetLocalCheckpoint) throws IOException {
+    /**
+     * Finalizes the recovery process. Errors detected while this method is still running synchronously (for example a
+     * closed shard) are thrown to the caller; once the finalize request has been handed to the recovery target, completion
+     * or failure is reported through the given listener.
+     *
+     * @param targetLocalCheckpoint the local checkpoint of the target, used to mark its allocation id as in sync
+     * @param listener              notified with {@code null} when finalization is done, or with the failure
+     */
+    void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
         if (shard.state() == IndexShardState.CLOSED) {
             throw new IndexShardClosedException(request.shardId());
         }
@@ -604,21 +619,26 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
         runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
             shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
         final long globalCheckpoint = shard.getGlobalCheckpoint();
-        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
-        runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
-            shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
-
-        if (request.isPrimaryRelocation()) {
-            logger.trace("performing relocation hand-off");
-            // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
-            cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
-            /*
-             * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
-             * target are failed (see {@link IndexShard#updateRoutingEntry}).
-             */
-        }
-        stopWatch.stop();
-        logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
+        final StepListener<Void> finalizeListener = new StepListener<>();
+        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
+        finalizeListener.whenComplete(r -> {
+            runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
+                shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
+
+            if (request.isPrimaryRelocation()) {
+                logger.trace("performing relocation hand-off");
+                // TODO: make relocated async
+                // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
+                cancellableThreads.execute(() -> shard.relocated(recoveryTarget::handoffPrimaryContext));
+                /*
+                 * if the recovery process fails after disabling primary mode on the source shard, both relocation source and
+                 * target are failed (see {@link IndexShard#updateRoutingEntry}).
+                 */
+            }
+            stopWatch.stop();
+            logger.trace("finalizing recovery took [{}]", stopWatch.totalTime());
+            listener.onResponse(null);
+        }, listener::onFailure);
     }
 
     static final class SendSnapshotResult {
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 54a42bcdc928a..73ad4c17594c1 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -372,12 +372,15 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
     }
 
     @Override
-    public void finalizeRecovery(final long globalCheckpoint) throws IOException {
-        final IndexShard indexShard = indexShard();
-        indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
-        // Persist the global checkpoint.
-        indexShard.sync();
-        indexShard.finalizeRecovery();
+    public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            final IndexShard indexShard = indexShard();
+            indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
+            // Persist the global checkpoint.
+            indexShard.sync();
+            indexShard.finalizeRecovery();
+            return null;
+        });
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
index c958665b04497..18e57866c68cf 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
@@ -42,8 +42,9 @@ public interface RecoveryTargetHandler {
      * updates the global checkpoint.
      *
      * @param globalCheckpoint the global checkpoint on the recovery source
+     * @param listener         the listener which will be notified when this method is completed
      */
-    void finalizeRecovery(long globalCheckpoint) throws IOException;
+    void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
 
     /**
      * Blockingly waits for cluster state with at least clusterStateVersion to be available
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
index 560d679bbe7fe..53eb3e342face 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.EmptyTransportResponseHandler;
 import org.elasticsearch.transport.TransportFuture;
 import org.elasticsearch.transport.TransportRequestOptions;
@@ -85,11 +86,12 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
     }
 
     @Override
-    public void finalizeRecovery(final long globalCheckpoint) {
+    public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
             new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
             TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
-            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
+                in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
     }
 
     @Override
diff --git a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java
index 1d4d83457b20e..cd3735b4843e6 100644
--- a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java
@@ -18,16 +18,20 @@
  */
 package org.elasticsearch.action;
 
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.test.ESTestCase;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 
 public class ActionListenerTests extends ESTestCase {
 
@@ -201,4 +205,31 @@ public void onFailure(Exception e) {
             assertThat(onFailureTimes.get(), equalTo(1));
         }
     }
+
+    public void testCompleteWith() {
+        PlainActionFuture<Integer> onResponseListener = new PlainActionFuture<>();
+        ActionListener.completeWith(onResponseListener, () -> 100);
+        assertThat(onResponseListener.isDone(), equalTo(true));
+        assertThat(onResponseListener.actionGet(), equalTo(100));
+
+        PlainActionFuture<Integer> onFailureListener = new PlainActionFuture<>();
+        ActionListener.completeWith(onFailureListener, () -> { throw new IOException("not found"); });
+        assertThat(onFailureListener.isDone(), equalTo(true));
+        assertThat(expectThrows(ExecutionException.class, onFailureListener::get).getCause(), instanceOf(IOException.class));
+
+        AtomicInteger onFailureCalls = new AtomicInteger();
+        ActionListener<Integer> throwingListener = new ActionListener<Integer>() {
+            @Override
+            public void onResponse(Integer response) {
+                throw new IllegalStateException("simulated");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                onFailureCalls.incrementAndGet();
+            }
+        };
+        expectThrows(IllegalStateException.class, () -> ActionListener.completeWith(throwingListener, () -> 100));
+        assertThat(onFailureCalls.get(), equalTo(0));
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 6f38822092aea..0f591f6db54f2 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -24,6 +24,7 @@
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.bulk.BulkShardRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -847,13 +848,13 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
         }
 
         @Override
-        public void finalizeRecovery(long globalCheckpoint) throws IOException {
+        public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
             if (hasBlocked() == false) {
                 // it maybe that not ops have been transferred, block now
                 blockIfNeeded(RecoveryState.Stage.TRANSLOG);
             }
             blockIfNeeded(RecoveryState.Stage.FINALIZE);
-            super.finalizeRecovery(globalCheckpoint);
+            super.finalizeRecovery(globalCheckpoint, listener);
         }
 
     }
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 4745904a55467..5c7dd089534a2 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -2524,9 +2524,8 @@ public long indexTranslogOperations(List<Translog.Operation> operations, int tot
                 }
 
                 @Override
-                public void finalizeRecovery(long globalCheckpoint) throws IOException {
-                    super.finalizeRecovery(globalCheckpoint);
-                    assertListenerCalled.accept(replica);
+                public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
+                    super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
                 }
             }, false, true);
 
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
index 7b9e8fe05dad7..29eb1466056e8 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
@@ -686,7 +686,7 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra
         }
 
         @Override
-        public void finalizeRecovery(long globalCheckpoint) {
+        public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
         }
 
         @Override