Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore from Individual Shard Snapshot Files in Parallel (#48110) #48686

Merged
merged 1 commit into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
Expand Down Expand Up @@ -1850,12 +1851,16 @@ public boolean recoverFromStore() {
return storeRecovery.recoverFromStore(this);
}

public boolean restoreFromRepository(Repository repository) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
/**
 * Restores this shard from the given snapshot repository and reports the outcome through
 * {@code listener} rather than through a return value.
 * <p>
 * Any {@link Exception} thrown while kicking off the restore is handed to
 * {@link ActionListener#onFailure(Exception)}. The sanity assertions live inside the same
 * {@code try} block, but a failing assertion raises an {@link AssertionError}, which is not an
 * {@link Exception} and therefore is not routed to the listener.
 *
 * @param repository the repository the shard is restored from
 * @param listener   receives the result of the restore, or the exception that prevented it
 */
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
    try {
        assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
        assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
            : "invalid recovery type: " + recoveryState.getRecoverySource();
        // The actual work is delegated; completion is signalled via the listener passed along.
        new StoreRecovery(shardId, logger).recoverFromRepository(this, repository, listener);
    } catch (Exception failure) {
        listener.onFailure(failure);
    }
}

/**
Expand Down Expand Up @@ -2540,17 +2545,15 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(() -> {
try {
final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
if (restoreFromRepository(repository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
threadPool.generic().execute(
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
restoreListener -> restoreFromRepository(
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
Expand Down
175 changes: 99 additions & 76 deletions server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState);
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener) {
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,10 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
* @param listener listener to invoke once done
*/
void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);

void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener);
/**
* Retrieve shard snapshot status for the stored snapshot
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,11 +1214,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
final ActionListener<Void> filesListener = ActionListener.delegateResponse(
new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> {
filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception
l.onFailure(e);
});
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(filesListener, () -> {
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
Expand All @@ -1242,19 +1238,42 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
ShardId shardId = store.shardId();
try {
final BlobContainer container = shardContainer(indexId, snapshotShardId);
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
RecoveryState recoveryState, ActionListener<Void> listener) {
final ShardId shardId = store.shardId();
final ActionListener<Void> restoreListener = ActionListener.delegateResponse(listener,
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e)));
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
ActionListener<Void> listener) {
if (filesToRecover.isEmpty()) {
listener.onResponse(null);
} else {
// Start as many workers as fit into the snapshot pool at once at the most
final int workers =
Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size());
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files = new LinkedBlockingQueue<>(filesToRecover);
final ActionListener<Void> allFilesListener =
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
// restore the files from the snapshot to the Lucene store
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(allFilesListener, () -> {
store.incRef();
try {
BlobStoreIndexShardSnapshot.FileInfo fileToRecover;
while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) {
restoreFile(fileToRecover, store);
}
} finally {
store.decRef();
}
}));
}
}
}

Expand Down Expand Up @@ -1294,10 +1313,16 @@ protected InputStream openSlice(long slice) throws IOException {
}
}
}
}.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}.restore(snapshotFiles, store, l);
}));
}

/**
 * Builds the listener shared by the workers that drain a queue of files.
 * <p>
 * Responses are passed to a {@link GroupedActionListener} created for {@code workers} participants
 * that wraps {@code listener}. On a failure the queue is emptied before the failure is passed on,
 * so workers polling the queue find nothing left to process.
 *
 * @param files    queue of files still waiting to be processed; cleared on the first failure seen
 * @param workers  number of workers that will each report to the returned listener
 * @param listener listener handed to the grouped listener as its delegate
 * @return listener to be given to every worker
 */
private static ActionListener<Void> fileQueueListener(BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files, int workers,
                                                      ActionListener<Collection<Void>> listener) {
    final GroupedActionListener<Void> grouped = new GroupedActionListener<>(listener, workers);
    return ActionListener.delegateResponse(grouped, (delegate, failure) -> {
        // Drop everything still queued so no further files are processed once something went wrong
        files.clear();
        delegate.onFailure(failure);
    });
}

private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -74,7 +75,7 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId
/**
* Performs restore operation
*/
public void restore(SnapshotFiles snapshotFiles, Store store) {
public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Void> listener) {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
Expand Down Expand Up @@ -150,36 +151,49 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
}
}

restoreFiles(filesToRecover, store);
restoreFiles(filesToRecover, store, ActionListener.wrap(
v -> {
store.incRef();
try {
afterRestore(snapshotFiles, store, restoredSegmentsFile);
listener.onResponse(null);
} finally {
store.decRef();
}
}, listener::onFailure));
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
} catch (Exception e) {
listener.onFailure(e);
} finally {
store.decRef();
}
}

// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile) {
// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}

/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}
// now go over the store and delete files that are in the store but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}
} catch (IOException e) {
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
}
} finally {
store.decRef();
} catch (IOException e) {
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
}
}

Expand All @@ -189,7 +203,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
* @param filesToRecover List of files to restore
* @param store Store to restore into
*/
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException;
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
ActionListener<Void> listener);

@SuppressWarnings("unchecked")
private static Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
* {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(Repository)}
* {@link IndexShard#restoreFromRepository}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link ShardRouting#recoverySource()} property.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2348,23 +2348,24 @@ public void testRestoreShard() throws IOException {

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
try {
RecoveryState recoveryState, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return null;
});
}
}));
}, future);
assertTrue(future.actionGet());
assertThat(target.getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
RecoveryState recoveryState, ActionListener<Void> listener) {

}

Expand Down
Loading