Make CcrRepository#restore non-Blocking (#48814) (#48823)
With the changes in #48110 there is no longer any need
to block a generic thread while waiting for the multi-file transfer
in `CcrRepository`.
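
In practical terms, the old code parked a generic-pool thread on a future until every file had been transferred, while the new code threads the caller's listener through the async chain and returns immediately. The following sketch uses plain JDK futures (not the Elasticsearch ActionListener API) to illustrate the difference:

import java.util.concurrent.CompletableFuture;

// Minimal sketch (plain JDK, not the Elasticsearch ActionListener API) of the
// before/after: the old restore path blocked a worker thread on a future until
// the multi-file transfer finished; the new path registers a completion
// callback instead, so no thread waits.
public class BlockingVsAsyncRestore {

    // Before: wait on the transfer, tying up the calling thread the whole time.
    static void restoreBlocking(CompletableFuture<Void> transferDone, Runnable onDone) {
        transferDone.join();          // thread idles here for the entire transfer
        onDone.run();
    }

    // After: attach the callback; completion is driven by the transfer itself.
    static CompletableFuture<Void> restoreAsync(CompletableFuture<Void> transferDone, Runnable onDone) {
        return transferDone.thenRun(onDone);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> transfer =
            CompletableFuture.runAsync(() -> { /* stream file chunks */ });
        restoreAsync(transfer, () -> System.out.println("restore complete"))
            .join();                  // demo only: keep the JVM alive until done
    }
}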
original-brownbear authored Nov 1, 2019
1 parent 6c290ec · commit e26d01e
Showing 1 changed file with 93 additions and 94 deletions.
@@ -38,6 +38,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
@@ -83,6 +84,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -301,11 +303,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener) {
// TODO: Instead of blocking in the restore logic and synchronously completing the listener we should just make below logic async
ActionListener.completeWith(listener, () -> {
final ShardId shardId = store.shardId();
final LinkedList<Closeable> toClose = new LinkedList<>();
final ActionListener<Void> restoreListener = ActionListener.runBefore(ActionListener.delegateResponse(listener,
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))),
() -> IOUtils.close(toClose));
try {
// TODO: Add timeouts to network calls / the restore process.
createEmptyStore(store);
ShardId shardId = store.shardId();

final Map<String, String> ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
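
The `restoreListener` in the hunk above is built from two listener combinators: `delegateResponse`, which rewrites any failure into an `IndexShardRestoreFailedException`, and `runBefore`, which runs a cleanup action before either outcome is delivered. A self-contained sketch of the same composition, using a hypothetical `Listener` type rather than the Elasticsearch one:

// Hypothetical stand-in for the ActionListener combinators used above: cleanup
// runs before the delegate hears about success OR failure, and failures are
// remapped to a single exception type on the way through.
interface Listener<T> {
    void onResponse(T value);
    void onFailure(Exception e);

    // mirrors ActionListener.runBefore: run cleanup, then delegate either outcome
    static <T> Listener<T> runBefore(Listener<T> delegate, Runnable cleanup) {
        return new Listener<T>() {
            public void onResponse(T value) { cleanup.run(); delegate.onResponse(value); }
            public void onFailure(Exception e) { cleanup.run(); delegate.onFailure(e); }
        };
    }

    // mirrors ActionListener.delegateResponse: pass responses through, remap failures
    static <T> Listener<T> wrapFailure(Listener<T> delegate,
                                       java.util.function.Function<Exception, Exception> remap) {
        return new Listener<T>() {
            public void onResponse(T value) { delegate.onResponse(value); }
            public void onFailure(Exception e) { delegate.onFailure(remap.apply(e)); }
        };
    }
}

Composed as `runBefore(wrapFailure(listener, ...), cleanup)`, this gives the same guarantee as the hunk above: however the restore ends, the registered resources are released before the caller is notified.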
@@ -348,21 +353,23 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh
},
CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()),
Ccr.CCR_THREAD_POOL_NAME);

toClose.add(() -> {
logger.trace(
"{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId);
renewable.cancel();
});
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) {
restoreSession.restoreFiles(store);
final RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState);
toClose.addFirst(restoreSession); // Some tests depend on closing session before cancelling retention lease renewal
restoreSession.restoreFiles(store, ActionListener.wrap(v -> {
logger.trace("[{}] completed CCR restore", shardId);
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex());
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
} finally {
logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId,
retentionLeaseId);
renewable.cancel();
}
return null;
});
restoreListener.onResponse(null);
}, restoreListener::onFailure));
} catch (Exception e) {
restoreListener.onFailure(e);
}
}

private void createEmptyStore(Store store) {
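
One detail worth noting in the hunk above: cleanup actions are registered in a `LinkedList<Closeable>`, and the restore session is added with `addFirst` so that it is closed before the retention-lease renewal is cancelled (the in-line comment notes that tests depend on this order). A minimal sketch of the ordering, using a plain loop in place of `IOUtils.close` (which additionally collects and rethrows exceptions):

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;

// Closeables run in list order, so addFirst() releases the most recently
// registered resource first. Minimal sketch; exception handling is omitted.
public class CloseOrder {
    public static void main(String[] args) throws IOException {
        LinkedList<Closeable> toClose = new LinkedList<>();
        toClose.add(() -> System.out.println("cancel retention lease renewal"));
        toClose.addFirst(() -> System.out.println("close restore session"));
        for (Closeable c : toClose) {  // stand-in for IOUtils.close(toClose)
            c.close();
        }
        // prints "close restore session" before "cancel retention lease renewal"
    }
}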
@@ -471,98 +478,90 @@ private static class RestoreSession extends FileRestoreContext implements Closea
this.throttleListener = throttleListener;
}

void restoreFiles(Store store) {
void restoreFiles(Store store, ActionListener<Void> listener) {
ArrayList<FileInfo> fileInfos = new ArrayList<>();
for (StoreFileMetaData fileMetaData : sourceMetaData) {
ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
}
SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
restore(snapshotFiles, store, future);
future.actionGet();
restore(snapshotFiles, store, listener);
}

@Override
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {

final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
long offset = 0;

@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}

@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> allFilesListener) {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) {

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}

private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}
}

@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
});
long offset = 0;

@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}

@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}

private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}

@Override
public void close() {
multiFileWriter.close();
}

@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
};
multiFileTransfer.start();
restoreFilesFuture.actionGet();
logger.trace("[{}] completed CCR restore", shardId);
return null;
});
throw e;
}

@Override
public void close() {
multiFileWriter.close();
}
};
multiFileTransfer.start();
}

@Override
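The chunking arithmetic in `nextChunkRequest` above is worth spelling out: each request asks for at most `chunkSize` bytes, the running offset advances by the bytes actually requested, and the chunk whose end reaches the file length is flagged as the last one. A standalone sketch of that logic, where `Chunk` is an illustrative stand-in for the `FileChunk` request type:

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the nextChunkRequest arithmetic above; Chunk is an
// illustrative stand-in, not the Elasticsearch FileChunk type.
public class ChunkPlanner {

    static final class Chunk {
        final long offset; final int length; final boolean lastChunk;
        Chunk(long offset, int length, boolean lastChunk) {
            this.offset = offset; this.length = length; this.lastChunk = lastChunk;
        }
        @Override public String toString() {
            return "offset=" + offset + " length=" + length + " last=" + lastChunk;
        }
    }

    static List<Chunk> plan(long fileLength, long chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            // at most chunkSize bytes, never past the end of the file
            int bytesRequested = Math.toIntExact(Math.min(chunkSize, fileLength - offset));
            offset += bytesRequested;
            chunks.add(new Chunk(offset - bytesRequested, bytesRequested, offset == fileLength));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // a 10-byte file in 4-byte chunks: [0,4) [4,8) [8,10), with the last flagged
        plan(10, 4).forEach(System.out::println);
    }
}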
