Restore from Individual Shard Snapshot Files in Parallel #48110

Merged (49 commits, Oct 30, 2019)
Changes from all commits

Commits (49)

3639a6f
step 1
original-brownbear Oct 11, 2019
acacf4f
step 2
original-brownbear Oct 11, 2019
0c4a128
async restore
original-brownbear Oct 12, 2019
a4beb48
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 12, 2019
1fd86af
bck
original-brownbear Oct 12, 2019
5fb8169
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 13, 2019
eb6f102
another step
original-brownbear Oct 13, 2019
9deb3cd
another step
original-brownbear Oct 14, 2019
711c43a
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 14, 2019
af00465
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 14, 2019
b4d6d88
async
original-brownbear Oct 14, 2019
19c8f35
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 14, 2019
3d16462
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 15, 2019
13bf860
parallel
original-brownbear Oct 15, 2019
17cf524
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 16, 2019
eaf9761
Merge branch 'master' of https://github.com/elastic/elasticsearch int…
original-brownbear Oct 16, 2019
51e7c34
bck
original-brownbear Oct 16, 2019
4b28df3
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 16, 2019
22b2306
Simplify Shard Snapshot Upload Code
original-brownbear Oct 16, 2019
852efe3
fix mistake
original-brownbear Oct 16, 2019
77b6ab2
simpler
original-brownbear Oct 16, 2019
f5e18c7
fix test
original-brownbear Oct 16, 2019
cea3e85
smarter snapshot info
original-brownbear Oct 16, 2019
087aeeb
cleaner
original-brownbear Oct 16, 2019
3bb0683
shorter
original-brownbear Oct 16, 2019
ea7f6ce
more randomness
original-brownbear Oct 16, 2019
6ce0808
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 16, 2019
4613489
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 17, 2019
1cf0f13
cleanup abstraction levels
original-brownbear Oct 17, 2019
7e4406e
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 21, 2019
9707a4b
Merge remote-tracking branch 'origin/better-concurrency-snapshot-uplo…
original-brownbear Oct 21, 2019
c48183f
nicer
original-brownbear Oct 21, 2019
f273373
nicer
original-brownbear Oct 21, 2019
fc4694b
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 21, 2019
a21ad9b
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 21, 2019
94feeca
nicer + todo
original-brownbear Oct 21, 2019
8f8f747
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 22, 2019
253e198
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 23, 2019
889f334
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 23, 2019
0971b46
add todo
original-brownbear Oct 23, 2019
80bf095
fix
original-brownbear Oct 23, 2019
6082460
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 23, 2019
3ef5686
nicer
original-brownbear Oct 23, 2019
86db9c7
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 24, 2019
a8989cc
fixes
original-brownbear Oct 24, 2019
b781b5b
shorter diff
original-brownbear Oct 24, 2019
c880b39
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 29, 2019
215609e
CR: comments
original-brownbear Oct 29, 2019
6630105
Merge remote-tracking branch 'elastic/master' into async-restore
original-brownbear Oct 30, 2019
37 changes: 20 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -42,6 +42,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
@@ -1816,12 +1817,16 @@ public boolean recoverFromStore() {
return storeRecovery.recoverFromStore(this);
}

public boolean restoreFromRepository(Repository repository) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromRepository(this, repository);
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
try {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRepository(this, repository, listener);
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
@@ -2504,17 +2509,15 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(() -> {
try {
final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
if (restoreFromRepository(repository)) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
threadPool.generic().execute(
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
restoreListener -> restoreFromRepository(
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
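
The change above turns IndexShard#restoreFromRepository from a blocking call returning a boolean into an asynchronous one that reports through an ActionListener<Boolean>. As a minimal sketch of how a caller that still needs a blocking result can bridge the new signature (the same PlainActionFuture idiom the updated IndexShardTests uses further down; the helper class name here is hypothetical):

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.repositories.Repository;

// Hypothetical helper, not part of this PR: bridges the listener-based
// restoreFromRepository back to a blocking call via PlainActionFuture.
final class BlockingRestoreHelper {

    static boolean restoreBlocking(IndexShard shard, Repository repository) {
        final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
        shard.restoreFromRepository(repository, future); // kicks off the async restore
        return future.actionGet();                       // blocks until completion and rethrows any failure
    }

    private BlockingRestoreHelper() {}
}
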
175 changes: 99 additions & 76 deletions server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Large diffs are not rendered by default.

@@ -123,8 +123,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState);
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener) {
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener);
}

@Override
@@ -211,9 +211,10 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
* @param listener listener to invoke once done
*/
void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);

void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener);
/**
* Retrieve shard snapshot status for the stored snapshot
*
@@ -1195,11 +1195,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
final ActionListener<Void> filesListener = ActionListener.delegateResponse(
new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> {
filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception
l.onFailure(e);
});
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(filesListener, () -> {
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
@@ -1223,19 +1219,42 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
ShardId shardId = store.shardId();
try {
final BlobContainer container = shardContainer(indexId, snapshotShardId);
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
RecoveryState recoveryState, ActionListener<Void> listener) {
final ShardId shardId = store.shardId();
final ActionListener<Void> restoreListener = ActionListener.delegateResponse(listener,
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e)));
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
ActionListener<Void> listener) {
if (filesToRecover.isEmpty()) {
listener.onResponse(null);
} else {
// Start as many workers as fit into the snapshot pool at once at the most
final int workers =
Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size());
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files = new LinkedBlockingQueue<>(filesToRecover);
final ActionListener<Void> allFilesListener =
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
// restore the files from the snapshot to the Lucene store
for (int i = 0; i < workers; ++i) {
executor.execute(ActionRunnable.run(allFilesListener, () -> {
store.incRef();
try {
BlobStoreIndexShardSnapshot.FileInfo fileToRecover;
while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) {

Contributor:
This is not very fair to other actions running on the node. It essentially blocks the snapshot threadpool until all files have been restored for the shard. Imagine a concurrent snapshot (in the future); we should avoid blocking threadpools with very long-running actions.

Member Author:
Fair point, but to some degree blocking the threadpool here is by design, so that we restore shard-by-shard as efficiently as possible. Maybe it would be better to move this action onto the generic pool in a follow-up and allow configuring the restore parallelism independently of the snapshot parallelism?

Member Author:
I guess the question when it comes to parallel operations is more general anyway. Doesn't it boil down to what trade-offs we want to make about the point in time at which index commits are created?
If we don't run these long-running operations per shard, then we'll have a better chance of quickly having a thread available for taking index commits, but the individual commits will live for longer. If we absolutely don't care about quickly taking index commits, I think the current approach is best. If we strongly care, we could move taking index commits to the generic pool (but even if we did that, we'd have to bound their number ...).

-> Until we decide on a strategy for scheduling the steps of parallel snapshot creation, this seems like the best approach since it's the fastest end-to-end, isn't it?

Contributor:
ok

restoreFile(fileToRecover, store);
}
} finally {
store.decRef();
}
}));
}
}
}

Expand Down Expand Up @@ -1275,10 +1294,16 @@ protected InputStream openSlice(long slice) throws IOException {
}
}
}
}.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}.restore(snapshotFiles, store, l);

Member:
I understand the changes around FileRestoreContext, but this adds more complexity to something that is already stretched to fit both CCR and regular restore needs. I think we should look at simplifying FileRestoreContext before adding async behavior to it. Or maybe you already investigated this?

Member Author (original-brownbear, Oct 28, 2019):
I found some significant simplifications in #48173 when investigating this for this PR, but that was pretty much all the simplification I could find (it does remove incorrectly shared code between the blobstore and CCR repositories). Actually, with that change the separation between CCR and blobstore code should be "correct" in theory, as all the code in the base class is now used across both implementations.

I think I'd "simply" make the CCR logic properly async in a follow-up, as that would simplify it overall as well, but I figured that for now, keeping the CCR code as-is and blocking around the changed API was the way to go, so as not to mix things into this PR.

}));
}

private static ActionListener<Void> fileQueueListener(BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files, int workers,
ActionListener<Collection<Void>> listener) {
return ActionListener.delegateResponse(new GroupedActionListener<>(listener, workers), (l, e) -> {
files.clear(); // Stop uploading the remaining files if we run into any exception

Member:
For extra safety, can we call onFailure() in a finally block?

try {
   files.clear(); // Stop uploading the remaining files if we run into any exception
} finally {
   l.onFailure(e);
}

Member Author:
It shouldn't be necessary, I think. If we run into an exception/throwable here, the node is probably done for anyway? (The only way I see us getting here is some OOM; otherwise clear() should never throw on a healthy JVM.)

l.onFailure(e);
});
}

private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {
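
For readers following the review thread above about occupying the snapshot pool: the restore path now uses the same bounded-worker pattern as the snapshot path, i.e. start at most "workers" tasks and let each one drain a shared queue of files, clearing the queue on the first failure so the remaining workers stop early. A simplified, self-contained sketch of that pattern using plain JDK concurrency types (illustrative only; it deliberately leaves out the Elasticsearch ActionListener/GroupedActionListener plumbing shown in the diff):

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative stand-in for the bounded-worker/queue-drain pattern used by
// snapshotShard and restoreShard above, without the ActionListener plumbing.
final class QueueDrainSketch {

    static void restoreAll(List<String> files, int maxWorkers, ExecutorService executor)
            throws InterruptedException, ExecutionException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(files);
        final int workers = Math.min(maxWorkers, files.size());
        final CompletableFuture<?>[] workerFutures = new CompletableFuture<?>[workers];
        for (int i = 0; i < workers; i++) {
            workerFutures[i] = CompletableFuture.runAsync(() -> {
                String file;
                // each worker keeps pulling files until the shared queue is drained
                while ((file = queue.poll()) != null) {
                    try {
                        restoreOneFile(file);
                    } catch (RuntimeException e) {
                        queue.clear(); // stop the remaining workers, like fileQueueListener does
                        throw e;
                    }
                }
            }, executor);
        }
        CompletableFuture.allOf(workerFutures).get(); // waits for all workers and rethrows the first failure
    }

    private static void restoreOneFile(String file) {
        // stand-in for the real per-file restore work
        System.out.println("restoring " + file);
    }

    private QueueDrainSketch() {}
}
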
@@ -21,6 +21,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
@@ -74,7 +75,7 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId
/**
* Performs restore operation
*/
public void restore(SnapshotFiles snapshotFiles, Store store) {
public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Void> listener) {
store.incRef();
try {
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
@@ -150,36 +151,49 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
}
}

restoreFiles(filesToRecover, store);
restoreFiles(filesToRecover, store, ActionListener.wrap(
v -> {
store.incRef();
try {
afterRestore(snapshotFiles, store, restoredSegmentsFile);
listener.onResponse(null);
} finally {
store.decRef();
}
}, listener::onFailure));
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
} catch (Exception e) {
listener.onFailure(e);
} finally {
store.decRef();
}
}

// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile) {
// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}

/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
continue; //skip write.lock, checksum files and files that exist in the snapshot
}
try {
store.deleteQuiet("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
}
} catch (IOException e) {
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
}
} finally {
store.decRef();
} catch (IOException e) {
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
}
}

@@ -189,7 +203,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
* @param filesToRecover List of files to restore
* @param store Store to restore into
*/
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException;
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
ActionListener<Void> listener);

@SuppressWarnings("unchecked")
private static Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
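
Because restoreFiles is now listener-based, an implementation whose copy logic is still synchronous (as the review thread above notes is the case for CCR for now) can complete the listener inline. A rough fragment-level sketch of that adaptation, reusing the ActionListener.completeWith idiom that the updated IndexShardTests stub uses below; copyFileFromSnapshot is a hypothetical placeholder for the subclass's existing blocking per-file copy:

@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
                            ActionListener<Void> listener) {
    ActionListener.completeWith(listener, () -> {
        for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesToRecover) {
            copyFileFromSnapshot(fileInfo, store); // hypothetical synchronous per-file copy
        }
        return null; // ActionListener<Void> completes with null on success
    });
}
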
@@ -107,7 +107,7 @@
* {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method.
* <p>
* Individual shards are getting restored as part of normal recovery process in
* {@link IndexShard#restoreFromRepository(Repository)} )}
* {@link IndexShard#restoreFromRepository} )}
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
* at the {@link ShardRouting#recoverySource()} property.
* <p>
@@ -2338,23 +2338,24 @@ public void testRestoreShard() throws IOException {

DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRepository(new RestoreOnlyRepository("test") {
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
try {
RecoveryState recoveryState, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
cleanLuceneIndex(targetStore.directory());
for (String file : sourceStore.directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra")) {
continue;
}
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
return null;
});
}
}));
}, future);
assertTrue(future.actionGet());
assertThat(target.getLocalCheckpoint(), equalTo(2L));
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));
@@ -205,7 +205,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s

@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
RecoveryState recoveryState, ActionListener<Void> listener) {

}

@Override