diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8546b0e8f7154..dfe0cec46c984 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -42,6 +42,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -1816,12 +1817,16 @@ public boolean recoverFromStore() { return storeRecovery.recoverFromStore(this); } - public boolean restoreFromRepository(Repository repository) { - assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + - recoveryState.getRecoverySource(); - StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - return storeRecovery.recoverFromRepository(this, repository); + public void restoreFromRepository(Repository repository, ActionListener listener) { + try { + assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + + recoveryState.getRecoverySource(); + StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); + storeRecovery.recoverFromRepository(this, repository, listener); + } catch (Exception e) { + listener.onFailure(e); + } } /** @@ -2504,17 +2509,15 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService case SNAPSHOT: markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource(); - threadPool.generic().execute(() -> { - try { - final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository()); - if (restoreFromRepository(repository)) { - recoveryListener.onRecoveryDone(recoveryState); - } - } catch (Exception e) { - recoveryListener.onRecoveryFailure(recoveryState, - new RecoveryFailedException(recoveryState, null, e), true); - } - }); + threadPool.generic().execute( + ActionRunnable.wrap(ActionListener.wrap(r -> { + if (r) { + recoveryListener.onRecoveryDone(recoveryState); + } + }, + e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), + restoreListener -> restoreFromRepository( + repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener))); break; case LOCAL_SHARDS: final IndexMetaData indexMetaData = indexSettings().getIndexMetaData(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index aafa8c1cf4527..4d4d3260046ae 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -30,6 +30,8 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -88,10 +90,16 @@ boolean recoverFromStore(final IndexShard indexShard) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE : "expected store recovery type but was: " + recoveryType; - return executeRecovery(indexShard, () -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + final ActionListener recoveryListener = recoveryListener(indexShard, future); + try { logger.debug("starting recovery from store ..."); internalRecoverFromStore(indexShard); - }); + recoveryListener.onResponse(true); + } catch (Exception e) { + recoveryListener.onFailure(e); + } + return future.actionGet(); } return false; } @@ -117,14 +125,15 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate Sort indexSort = indexShard.getIndexSort(); final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); - return executeRecovery(indexShard, () -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + ActionListener.completeWith(recoveryListener(indexShard, future), () -> { logger.debug("starting recovery from local shards {}", shards); try { final Directory directory = indexShard.store().directory(); // don't close this directory!! final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new); final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong(); final long maxUnsafeAutoIdTimestamp = - shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); + shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp, indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested); internalRecoverFromStore(indexShard); @@ -132,11 +141,13 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate // copied segments - we will also see them in stats etc. indexShard.getEngine().forceMerge(false, -1, false, false, false); + return true; } catch (IOException ex) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex); } - }); + assert future.isDone(); + return future.actionGet(); } return false; } @@ -262,21 +273,22 @@ public void readBytes(byte[] b, int offset, int len) throws IOException { * previously created index snapshot into an existing initializing shard. * @param indexShard the index shard instance to recovery the snapshot from * @param repository the repository holding the physical files the shard should be recovered from - * @return true if the shard has been recovered successfully, false if the recovery - * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. + * @param listener resolves to true if the shard has been recovered successfully, false if the recovery + * has been ignored due to a concurrent modification of if the clusters state has changed due to async updates. */ - boolean recoverFromRepository(final IndexShard indexShard, Repository repository) { - if (canRecover(indexShard)) { - RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); - assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; - SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); - return executeRecovery(indexShard, () -> { - logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); - restore(indexShard, repository, recoverySource); - }); + void recoverFromRepository(final IndexShard indexShard, Repository repository, ActionListener listener) { + try { + if (canRecover(indexShard)) { + RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); + assert recoveryType == RecoverySource.Type.SNAPSHOT : "expected snapshot recovery type: " + recoveryType; + SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) indexShard.recoveryState().getRecoverySource(); + restore(indexShard, repository, recoverySource, recoveryListener(indexShard, listener)); + } else { + listener.onResponse(false); + } + } catch (Exception e) { + listener.onFailure(e); } - return false; - } private boolean canRecover(IndexShard indexShard) { @@ -290,59 +302,62 @@ private boolean canRecover(IndexShard indexShard) { return true; } - /** - * Recovers the state of the shard from the store. - */ - private boolean executeRecovery(final IndexShard indexShard, Runnable recoveryRunnable) throws IndexShardRecoveryException { - try { - recoveryRunnable.run(); - // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway - // to call post recovery. - final IndexShardState shardState = indexShard.state(); - final RecoveryState recoveryState = indexShard.recoveryState(); - assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : - "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]"; - - if (logger.isTraceEnabled()) { - RecoveryState.Index index = recoveryState.getIndex(); - StringBuilder sb = new StringBuilder(); - sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [") + private ActionListener recoveryListener(IndexShard indexShard, ActionListener listener) { + return ActionListener.wrap(res -> { + if (res) { + // Check that the gateway didn't leave the shard in init or recovering stage. it is up to the gateway + // to call post recovery. + final IndexShardState shardState = indexShard.state(); + final RecoveryState recoveryState = indexShard.recoveryState(); + assert shardState != IndexShardState.CREATED && shardState != IndexShardState.RECOVERING : + "recovery process of " + shardId + " didn't get to post_recovery. shardState [" + shardState + "]"; + + if (logger.isTraceEnabled()) { + RecoveryState.Index index = recoveryState.getIndex(); + StringBuilder sb = new StringBuilder(); + sb.append(" index : files [").append(index.totalFileCount()).append("] with total_size [") .append(new ByteSizeValue(index.totalBytes())).append("], took[") .append(TimeValue.timeValueMillis(index.time())).append("]\n"); - sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [") + sb.append(" : recovered_files [").append(index.recoveredFileCount()).append("] with total_size [") .append(new ByteSizeValue(index.recoveredBytes())).append("]\n"); - sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [") + sb.append(" : reusing_files [").append(index.reusedFileCount()).append("] with total_size [") .append(new ByteSizeValue(index.reusedBytes())).append("]\n"); - sb.append(" verify_index : took [") - .append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [") - .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n"); - sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) + sb.append(" verify_index : took [") + .append(TimeValue.timeValueMillis(recoveryState.getVerifyIndex().time())).append("], check_index [") + .append(timeValueMillis(recoveryState.getVerifyIndex().checkIndexTime())).append("]\n"); + sb.append(" translog : number_of_operations [").append(recoveryState.getTranslog().recoveredOperations()) .append("], took [").append(TimeValue.timeValueMillis(recoveryState.getTranslog().time())).append("]"); - logger.trace("recovery completed from [shard_store], took [{}]\n{}", - timeValueMillis(recoveryState.getTimer().time()), sb); - } else if (logger.isDebugEnabled()) { - logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time())); - } - return true; - } catch (IndexShardRecoveryException e) { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - return false; - } - if ((e.getCause() instanceof IndexShardClosedException) || (e.getCause() instanceof IndexShardNotStartedException)) { - // got closed on us, just ignore this recovery - return false; + logger.trace("recovery completed from [shard_store], took [{}]\n{}", + timeValueMillis(recoveryState.getTimer().time()), sb); + } else if (logger.isDebugEnabled()) { + logger.debug("recovery completed from [shard_store], took [{}]", timeValueMillis(recoveryState.getTimer().time())); + } } - throw e; - } catch (IndexShardClosedException | IndexShardNotStartedException e) { - } catch (Exception e) { - if (indexShard.state() == IndexShardState.CLOSED) { - // got closed on us, just ignore this recovery - return false; + listener.onResponse(res); + }, ex -> { + if (ex instanceof IndexShardRecoveryException) { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + listener.onResponse(false); + return; + } + if ((ex.getCause() instanceof IndexShardClosedException) || (ex.getCause() instanceof IndexShardNotStartedException)) { + // got closed on us, just ignore this recovery + listener.onResponse(false); + return; + } + listener.onFailure(ex); + } else if (ex instanceof IndexShardClosedException || ex instanceof IndexShardNotStartedException) { + listener.onResponse(false); + } else { + if (indexShard.state() == IndexShardState.CLOSED) { + // got closed on us, just ignore this recovery + listener.onResponse(false); + } else { + listener.onFailure(new IndexShardRecoveryException(shardId, "failed recovery", ex)); + } } - throw new IndexShardRecoveryException(shardId, "failed recovery", e); - } - return false; + }); } /** @@ -436,14 +451,30 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState /** * Restores shard from {@link SnapshotRecoverySource} associated with this shard in routing table */ - private void restore(final IndexShard indexShard, final Repository repository, final SnapshotRecoverySource restoreSource) { + private void restore(IndexShard indexShard, Repository repository, SnapshotRecoverySource restoreSource, + ActionListener listener) { + logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource()); final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog(); if (restoreSource == null) { - throw new IndexShardRestoreFailedException(shardId, "empty restore source"); + listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source")); + return; } if (logger.isTraceEnabled()) { logger.trace("[{}] restoring shard [{}]", restoreSource.snapshot(), shardId); } + final ActionListener restoreListener = ActionListener.wrap( + v -> { + final Store store = indexShard.store(); + bootstrap(indexShard, store); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + writeEmptyRetentionLeasesFile(indexShard); + indexShard.openEngineAndRecoverFromTranslog(); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); + indexShard.finalizeRecovery(); + indexShard.postRecovery("restore done"); + listener.onResponse(true); + }, e -> listener.onFailure(new IndexShardRestoreFailedException(shardId, "restore failed", e)) + ); try { translogState.totalOperations(0); translogState.totalOperationsOnStart(0); @@ -456,17 +487,9 @@ private void restore(final IndexShard indexShard, final Repository repository, f final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); assert indexShard.getEngineOrNull() == null; repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId, - indexShard.recoveryState()); - final Store store = indexShard.store(); - bootstrap(indexShard, store); - assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; - writeEmptyRetentionLeasesFile(indexShard); - indexShard.openEngineAndRecoverFromTranslog(); - indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); - indexShard.finalizeRecovery(); - indexShard.postRecovery("restore done"); + indexShard.recoveryState(), restoreListener); } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "restore failed", e); + restoreListener.onFailure(e); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 59092cc573bf2..f156966b2e67f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -123,8 +123,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener); } @Override - public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState); + public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, + ActionListener listener) { + in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 31687fbe0f8ec..57bb49038e432 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -211,9 +211,10 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshot * @param indexId id of the index in the repository from which the restore is occurring * @param snapshotShardId shard id (in the snapshot) * @param recoveryState recovery state + * @param listener listener to invoke once done */ - void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState); - + void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, + ActionListener listener); /** * Retrieve shard snapshot status for the stored snapshot * diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9c1c24a35fbe4..35cfe4cdfdc0d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1195,11 +1195,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); // Start as many workers as fit into the snapshot pool at once at the most final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount); - final ActionListener filesListener = ActionListener.delegateResponse( - new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> { - filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception - l.onFailure(e); - }); + final ActionListener filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener); for (int i = 0; i < workers; ++i) { executor.execute(ActionRunnable.run(filesListener, () -> { BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); @@ -1223,19 +1219,42 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState) { - ShardId shardId = store.shardId(); - try { - final BlobContainer container = shardContainer(indexId, snapshotShardId); - BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + RecoveryState recoveryState, ActionListener listener) { + final ShardId shardId = store.shardId(); + final ActionListener restoreListener = ActionListener.delegateResponse(listener, + (l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final BlobContainer container = shardContainer(indexId, snapshotShardId); + executor.execute(ActionRunnable.wrap(restoreListener, l -> { + final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); + final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @Override - protected void restoreFiles(List filesToRecover, Store store) throws IOException { - // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); + protected void restoreFiles(List filesToRecover, Store store, + ActionListener listener) { + if (filesToRecover.isEmpty()) { + listener.onResponse(null); + } else { + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = + Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size()); + final BlockingQueue files = new LinkedBlockingQueue<>(filesToRecover); + final ActionListener allFilesListener = + fileQueueListener(files, workers, ActionListener.map(listener, v -> null)); + // restore the files from the snapshot to the Lucene store + for (int i = 0; i < workers; ++i) { + executor.execute(ActionRunnable.run(allFilesListener, () -> { + store.incRef(); + try { + BlobStoreIndexShardSnapshot.FileInfo fileToRecover; + while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) { + restoreFile(fileToRecover, store); + } + } finally { + store.decRef(); + } + })); + } } } @@ -1275,10 +1294,16 @@ protected InputStream openSlice(long slice) throws IOException { } } } - }.restore(snapshotFiles, store); - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } + }.restore(snapshotFiles, store, l); + })); + } + + private static ActionListener fileQueueListener(BlockingQueue files, int workers, + ActionListener> listener) { + return ActionListener.delegateResponse(new GroupedActionListener<>(listener, workers), (l, e) -> { + files.clear(); // Stop uploading the remaining files if we run into any exception + l.onFailure(e); + }); } private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 914d87202c682..885e7e0ac2ce1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.ShardId; @@ -74,7 +75,7 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId /** * Performs restore operation */ - public void restore(SnapshotFiles snapshotFiles, Store store) { + public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener listener) { store.incRef(); try { logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); @@ -150,36 +151,49 @@ public void restore(SnapshotFiles snapshotFiles, Store store) { } } - restoreFiles(filesToRecover, store); + restoreFiles(filesToRecover, store, ActionListener.wrap( + v -> { + store.incRef(); + try { + afterRestore(snapshotFiles, store, restoredSegmentsFile); + listener.onResponse(null); + } finally { + store.decRef(); + } + }, listener::onFailure)); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } + } catch (Exception e) { + listener.onFailure(e); + } finally { + store.decRef(); + } + } - // read the snapshot data persisted - try { - Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } + private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile) { + // read the snapshot data persisted + try { + Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); + } catch (IOException e) { + throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); + } - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { - continue; //skip write.lock, checksum files and files that exist in the snapshot - } - try { - store.deleteQuiet("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile); - } + /// now, go over and clean files that are in the store, but were not in the snapshot + try { + for (String storeFile : store.directory().listAll()) { + if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { + continue; //skip write.lock, checksum files and files that exist in the snapshot + } + try { + store.deleteQuiet("restore", storeFile); + store.directory().deleteFile(storeFile); + } catch (IOException e) { + logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile); } - } catch (IOException e) { - logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId); } - } finally { - store.decRef(); + } catch (IOException e) { + logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId); } } @@ -189,7 +203,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store) { * @param filesToRecover List of files to restore * @param store Store to restore into */ - protected abstract void restoreFiles(List filesToRecover, Store store) throws IOException; + protected abstract void restoreFiles(List filesToRecover, Store store, + ActionListener listener); @SuppressWarnings("unchecked") private static Iterable concat(Store.RecoveryDiff diff) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 5f3bc91a9978e..b7722cfaf732e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -107,7 +107,7 @@ * {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method. *

* Individual shards are getting restored as part of normal recovery process in - * {@link IndexShard#restoreFromRepository(Repository)} )} + * {@link IndexShard#restoreFromRepository} )} * method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking * at the {@link ShardRouting#recoverySource()} property. *

diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index da6c1807f0e34..d818bd49aa083 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2338,11 +2338,12 @@ public void testRestoreShard() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + final PlainActionFuture future = PlainActionFuture.newFuture(); + target.restoreFromRepository(new RestoreOnlyRepository("test") { @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState) { - try { + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { if (file.equals("write.lock") || file.startsWith("extra")) { @@ -2350,11 +2351,11 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh } targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); } - } catch (Exception ex) { - throw new RuntimeException(ex); - } + return null; + }); } - })); + }, future); + assertTrue(future.actionGet()); assertThat(target.getLocalCheckpoint(), equalTo(2L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 63e5aa820d137..22f1dd815893f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -205,7 +205,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState) { + RecoveryState recoveryState, ActionListener listener) { + } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 5694629325557..5d5dc1d23e639 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -117,7 +117,9 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, "")); routing = ShardRoutingHelper.initialize(routing, localNode.getId(), 0); RecoveryState state = new RecoveryState(routing, localNode, null); - runGeneric(threadPool, () -> repository.restoreShard(store, snapshotId, indexId, shardId, state)); + final PlainActionFuture futureA = PlainActionFuture.newFuture(); + runGeneric(threadPool, () -> repository.restoreShard(store, snapshotId, indexId, shardId, state, futureA)); + futureA.actionGet(); assertTrue(state.getIndex().recoveredBytes() > 0); assertEquals(0, state.getIndex().reusedFileCount()); assertEquals(indexCommit.getFileNames().size(), state.getIndex().recoveredFileCount()); @@ -138,14 +140,16 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { // roll back to the first snap and then incrementally restore RecoveryState firstState = new RecoveryState(routing, localNode, null); - runGeneric(threadPool, () -> - repository.restoreShard(store, snapshotId, indexId, shardId, firstState)); + final PlainActionFuture futureB = PlainActionFuture.newFuture(); + runGeneric(threadPool, () -> repository.restoreShard(store, snapshotId, indexId, shardId, firstState, futureB)); + futureB.actionGet(); assertEquals("should reuse everything except of .liv and .si", commitFileNames.size()-2, firstState.getIndex().reusedFileCount()); RecoveryState secondState = new RecoveryState(routing, localNode, null); - runGeneric(threadPool, () -> - repository.restoreShard(store, incSnapshotId, indexId, shardId, secondState)); + final PlainActionFuture futureC = PlainActionFuture.newFuture(); + runGeneric(threadPool, () -> repository.restoreShard(store, incSnapshotId, indexId, shardId, secondState, futureC)); + futureC.actionGet(); assertEquals(secondState.getIndex().reusedFileCount(), commitFileNames.size()-2); assertEquals(secondState.getIndex().recoveredFileCount(), 2); List recoveredFiles = diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 98e2f88292d56..8d5cfc7471b59 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -805,11 +805,14 @@ protected void recoverShardFromSnapshot(final IndexShard shard, new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, version, index); final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, recoverySource); shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null)); + final PlainActionFuture future = PlainActionFuture.newFuture(); repository.restoreShard(shard.store(), snapshot.getSnapshotId(), indexId, shard.shardId(), - shard.recoveryState()); + shard.recoveryState(), + future); + future.actionGet(); } /** diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index b6048376687b2..1534d5870d0a8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -291,26 +291,29 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } @Override - public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - // TODO: Add timeouts to network calls / the restore process. - createEmptyStore(store); - ShardId shardId = store.shardId(); - - final Map ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); - final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); - final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); - final Index leaderIndex = new Index(leaderIndexName, leaderUUID); - final ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); - - final Client remoteClient = getRemoteClusterClient(); - - final String retentionLeaseId = + public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, + ActionListener listener) { + // TODO: Instead of blocking in the restore logic and synchronously completing the listener we should just make below logic async + ActionListener.completeWith(listener, () -> { + // TODO: Add timeouts to network calls / the restore process. + createEmptyStore(store); + ShardId shardId = store.shardId(); + + final Map ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); + final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + final Index leaderIndex = new Index(leaderIndexName, leaderUUID); + final ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); + + final Client remoteClient = getRemoteClusterClient(); + + final String retentionLeaseId = retentionLeaseId(localClusterName, shardId.getIndex(), remoteClusterAlias, leaderIndex); - acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient); + acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient); - // schedule renewals to run during the restore - final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( + // schedule renewals to run during the restore + final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( () -> { logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId); final ThreadContext threadContext = threadPool.getThreadContext(); @@ -318,38 +321,40 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh // we have to execute under the system context so that if security is enabled the renewal is authorized threadContext.markAsSystemContext(); CcrRetentionLeases.asyncRenewRetentionLease( - leaderShardId, - retentionLeaseId, - RETAIN_ALL, - remoteClient, - ActionListener.wrap( - r -> {}, - e -> { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - assert cause instanceof ElasticsearchSecurityException == false : cause; - if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) { - logger.warn(new ParameterizedMessage( - "{} background renewal of retention lease [{}] failed during restore", shardId, - retentionLeaseId), cause); - } - })); + leaderShardId, + retentionLeaseId, + RETAIN_ALL, + remoteClient, + ActionListener.wrap( + r -> {}, + e -> { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + assert cause instanceof ElasticsearchSecurityException == false : cause; + if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) { + logger.warn(new ParameterizedMessage( + "{} background renewal of retention lease [{}] failed during restore", shardId, + retentionLeaseId), cause); + } + })); } }, CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()), Ccr.CCR_THREAD_POOL_NAME); - // TODO: There should be some local timeout. And if the remote cluster returns an unknown session - // response, we should be able to retry by creating a new session. - try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) { - restoreSession.restoreFiles(store); - updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex()); - } catch (Exception e) { - throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); - } finally { - logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, - retentionLeaseId); - renewable.cancel(); - } + // TODO: There should be some local timeout. And if the remote cluster returns an unknown session + // response, we should be able to retry by creating a new session. + try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) { + restoreSession.restoreFiles(store); + updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex()); + } catch (Exception e) { + throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e); + } finally { + logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, + retentionLeaseId); + renewable.cancel(); + } + return null; + }); } private void createEmptyStore(Store store) { @@ -465,86 +470,92 @@ void restoreFiles(Store store) { fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize)); } SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos); - restore(snapshotFiles, store); + final PlainActionFuture future = PlainActionFuture.newFuture(); + restore(snapshotFiles, store, future); + future.actionGet(); } @Override - protected void restoreFiles(List filesToRecover, Store store) { - logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); - final PlainActionFuture restoreFilesFuture = new PlainActionFuture<>(); - final List mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList()); - final MultiFileTransfer multiFileTransfer = new MultiFileTransfer<>( - logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) { - - final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {}); - long offset = 0; - - @Override - protected void onNewFile(StoreFileMetaData md) { - offset = 0; - } - - @Override - protected FileChunk nextChunkRequest(StoreFileMetaData md) { - final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset)); - offset += bytesRequested; - return new FileChunk(md, bytesRequested, offset == md.length()); - } - - @Override - protected void executeChunkRequest(FileChunk request, ActionListener listener) { - final ActionListener threadedListener - = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap( + protected void restoreFiles(List filesToRecover, Store store, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); + final PlainActionFuture restoreFilesFuture = new PlainActionFuture<>(); + final List mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList()); + final MultiFileTransfer multiFileTransfer = new MultiFileTransfer<>( + logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) { + + final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { + }); + long offset = 0; + + @Override + protected void onNewFile(StoreFileMetaData md) { + offset = 0; + } + + @Override + protected FileChunk nextChunkRequest(StoreFileMetaData md) { + final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset)); + offset += bytesRequested; + return new FileChunk(md, bytesRequested, offset == md.length()); + } + + @Override + protected void executeChunkRequest(FileChunk request, ActionListener listener) { + final ActionListener threadedListener + = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap( r -> { writeFileChunk(request.md, r); listener.onResponse(null); }, listener::onFailure), false); - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, - new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested), - ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(), - ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME)); - } - - private void writeFileChunk(StoreFileMetaData md, - GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception { - final int actualChunkSize = r.getChunk().length(); - logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", - shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize); - final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); - throttleListener.accept(nanosPaused); - multiFileWriter.incRef(); - try (Releasable ignored = multiFileWriter::decRef) { - final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length(); - multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk); - } catch (Exception e) { - handleError(md, e); - throw e; + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, + new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested), + ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(), + ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME)); + } + + private void writeFileChunk(StoreFileMetaData md, + GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception { + final int actualChunkSize = r.getChunk().length(); + logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", + shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize); + final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); + throttleListener.accept(nanosPaused); + multiFileWriter.incRef(); + try (Releasable ignored = multiFileWriter::decRef) { + final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length(); + multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk); + } catch (Exception e) { + handleError(md, e); + throw e; + } } - } - - @Override - protected void handleError(StoreFileMetaData md, Exception e) throws Exception { - final IOException corruptIndexException; - if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { - try { - store.markStoreCorrupted(corruptIndexException); - } catch (IOException ioe) { - logger.warn("store cannot be marked as corrupted", e); + + @Override + protected void handleError(StoreFileMetaData md, Exception e) throws Exception { + final IOException corruptIndexException; + if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) { + try { + store.markStoreCorrupted(corruptIndexException); + } catch (IOException ioe) { + logger.warn("store cannot be marked as corrupted", e); + } + throw corruptIndexException; } - throw corruptIndexException; + throw e; } - throw e; - } - - @Override - public void close() { - multiFileWriter.close(); - } - }; - multiFileTransfer.start(); - restoreFilesFuture.actionGet(); - logger.trace("[{}] completed CCR restore", shardId); + + @Override + public void close() { + multiFileWriter.close(); + } + }; + multiFileTransfer.start(); + restoreFilesFuture.actionGet(); + logger.trace("[{}] completed CCR restore", shardId); + return null; + }); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index fcfc62fb8a194..3fdf16a67946f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -451,11 +451,12 @@ protected synchronized void recoverPrimary(IndexShard primary) { ShardRouting routing = ShardRoutingHelper.newWithRestoreSource(primary.routingEntry(), new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); primary.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null)); + final PlainActionFuture future = PlainActionFuture.newFuture(); primary.restoreFromRepository(new RestoreOnlyRepository(index.getName()) { @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState) { - try { + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { IndexShard leader = leaderGroup.getPrimary(); Lucene.cleanLuceneIndex(primary.store().directory()); try (Engine.IndexCommitRef sourceCommit = leader.acquireSafeIndexCommit()) { @@ -465,11 +466,15 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); } } - } catch (Exception ex) { - throw new AssertionError(ex); - } + return null; + }); } - }); + }, future); + try { + future.actionGet(); + } catch (Exception ex) { + throw new AssertionError(ex); + } } }; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index 232d48833107e..2f468a57236b6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; @@ -125,11 +126,12 @@ public void testRestoreShard() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("store", new RecoveryState(routing, localNode, null)); - assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") { + final PlainActionFuture future = PlainActionFuture.newFuture(); + target.restoreFromRepository(new RestoreOnlyRepository("test") { @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, - RecoveryState recoveryState) { - try { + RecoveryState recoveryState, ActionListener listener) { + ActionListener.completeWith(listener, () -> { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { if (file.equals("write.lock") || file.startsWith("extra")) { @@ -137,11 +139,11 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh } targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); } - } catch (Exception ex) { - throw new RuntimeException(ex); - } + return null; + }); } - })); + }, future); + assertTrue(future.actionGet()); assertThat(target.getLocalCheckpoint(), equalTo(0L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L)); assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index c4ccf6bfd832b..cbe7ab21de523 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -233,8 +233,11 @@ public void testRestoreMinmal() throws IOException { restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY); DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT); restoredShard.markAsRecovering("test from snap", new RecoveryState(restoredShard.routingEntry(), discoveryNode, null)); - runAsSnapshot(shard.getThreadPool(), () -> - assertTrue(restoredShard.restoreFromRepository(repository))); + runAsSnapshot(shard.getThreadPool(), () -> { + final PlainActionFuture future = PlainActionFuture.newFuture(); + restoredShard.restoreFromRepository(repository, future); + assertTrue(future.actionGet()); + }); assertEquals(restoredShard.recoveryState().getStage(), RecoveryState.Stage.DONE); assertEquals(restoredShard.recoveryState().getTranslog().recoveredOperations(), 0); assertEquals(IndexShardState.POST_RECOVERY, restoredShard.state());