Send file chunks asynchronously in peer recovery #39769

Closed · wants to merge 7 commits

Changes from 6 commits
@@ -33,18 +33,20 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
@@ -63,17 +65,20 @@
import org.elasticsearch.transport.RemoteTransportException;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
@@ -102,6 +107,7 @@ public class RecoverySourceHandler {
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentFileChunks;
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final CancellableThreads cancellableThreads = new CancellableThreads();

public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
@@ -124,7 +130,6 @@ public StartRecoveryRequest getRequest() {
* performs the recovery from the local engine to the target
*/
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
final List<Closeable> resources = new CopyOnWriteArrayList<>();
final Closeable releaseResources = () -> IOUtils.close(resources);
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
try {
@@ -411,7 +416,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
final PlainActionFuture<Void> sendFileChunksFuture = new PlainActionFuture<>();
final StoreFileMetaData[] filesSortedByLength = phase1Files.toArray(new StoreFileMetaData[0]);
ArrayUtil.timSort(filesSortedByLength, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
sendFiles(store, filesSortedByLength, translogOps, sendFileChunksFuture);
cancellableThreads.execute(sendFileChunksFuture::actionGet);
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
@@ -680,72 +689,181 @@ public String toString() {
'}';
}

void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
final byte[] buffer = new byte[chunkSizeInBytes];
for (final StoreFileMetaData md : files) {
if (error.get() != null) {
break;
}
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
long position = 0;
int bytesRead;
while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
final BytesArray content = new BytesArray(buffer, 0, bytesRead);
final boolean lastChunk = position + content.length() == md.length();
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
cancellableThreads.checkForCancel();
if (error.get() != null) {
void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps, ActionListener<Void> listener) {
final MultiFileSender multiFileSender = new MultiFileSender(store, files, translogOps);
resources.add(multiFileSender); // need to register to the resource list so we can clean up if the recovery gets cancelled.
final ActionListener<Void> wrappedListener = ActionListener.wrap(
r -> {
multiFileSender.close();
listener.onResponse(null);
},
e -> {
IOUtils.closeWhileHandlingException(multiFileSender);
listener.onFailure(e);
});
multiFileSender.sendFileChunks(ActionListener.notifyOnce(wrappedListener));
}

private final class MultiFileSender implements Closeable {
private final Store store;
private final List<StoreFileMetaData> files;
private final Supplier<Integer> translogOps;
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
private final Deque<byte[]> recycledBuffers;
private boolean closed; // ensure we don't reopen files if the recovery is cancelled
private StoreFileMetaData md;
private InputStream currentInput;
private int position;
private final Semaphore semaphore = new Semaphore(1);

MultiFileSender(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) {
this.store = store;
this.files = new ArrayList<>(Arrays.asList(files));
this.translogOps = translogOps;
this.recycledBuffers = ConcurrentCollections.newDeque();
}

/**
* File chunks are read sequentially by at most one thread. Other threads which are triggered by file-chunk responses
* will abort without waiting if another thread is reading files already. This is controlled via {@code semaphore}.
*
* This implementation can send up to {@code maxConcurrentFileChunks} consecutive file-chunk requests without waiting
* for the replies from the recovery target to reduce the recovery time in secure/compressed/high latency communication.
* We assign a seqId to every file-chunk request and stop reading/sending file chunks if the gap between max_seq_no
* and local_checkpoint is greater than {@code maxConcurrentFileChunks}.
*/
void sendFileChunks(ActionListener<Void> listener) {
while (true) {
cancellableThreads.checkForCancel();
synchronized (this) {
if (semaphore.tryAcquire() == false) {
break;
}
final long requestFilePosition = position;
cancellableThreads.executeIO(() ->
recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
// don't send more, the number of unreplied chunks is already greater than maxConcurrentFileChunks,
if (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getCheckpoint() >= maxConcurrentFileChunks) {
semaphore.release();
break;
}
}
try {
final byte[] reusedBuffer = recycledBuffers.pollFirst();
final byte[] buffer = reusedBuffer != null ? reusedBuffer : new byte[chunkSizeInBytes];
final FileChunk chunk = readChunk(buffer);
semaphore.release(); // other thread can read and send chunks
if (chunk == null) {
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getCheckpoint()) {
listener.onResponse(null);
}
break;
}
cancellableThreads.execute(() ->
recoveryTarget.writeFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk, translogOps.get(),
ActionListener.wrap(
r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
e -> {
error.compareAndSet(null, Tuple.tuple(md, e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
r -> {
recycledBuffers.addFirst(buffer);
requestSeqIdTracker.markSeqNoAsCompleted(chunk.seqId);
sendFileChunks(listener);
Contributor:

can we make sure we always fork here somehow? I am a bit worried that we are ending up with a stack overflow? Like we can assert that we don't have sendFileChunks in the stacktrace for instance.

Member Author:
I opened #39988 for this.
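
A minimal, self-contained sketch of the assertion the reviewer suggests; the method name and its placement are illustrative assumptions, not part of this PR:

// Illustrative only: trip an assertion if sendFileChunks re-enters itself on the
// same stack, which would mean the response callback did not fork to another
// thread and could eventually overflow the stack.
private static boolean assertNotRecursive() {
    final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
    int calls = 0;
    for (StackTraceElement frame : frames) {
        if ("sendFileChunks".equals(frame.getMethodName())) {
            calls++;
        }
    }
    assert calls <= 1 : "sendFileChunks is re-entered on the same stack (" + calls + " frames)";
    return true;
}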

},
e -> listener.onFailure(handleErrorOnSendFiles(chunk.md, e))
)));
position += content.length();
} catch (Exception e) {
listener.onFailure(e);
return;
}
}
}

FileChunk readChunk(final byte[] buffer) throws Exception {
try {
synchronized (this) {
if (closed) {
throw new IllegalStateException("chunk reader was closed");
}
if (currentInput == null) {
if (files.isEmpty()) {
return null;
}
md = files.remove(0);
position = 0;
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
currentInput = new InputStreamIndexInput(indexInput, md.length()) {
@Override
public void close() throws IOException {
indexInput.close(); //InputStreamIndexInput's close is noop
}
};
}
}
final int bytesRead = currentInput.read(buffer);
if (bytesRead == -1) {
throw new EOFException("position [" + position + "] md [" + md + "]");
}
final boolean lastChunk;
final long chunkPosition;
synchronized (this) {
chunkPosition = this.position;
this.position += bytesRead;
lastChunk = this.position == md.length();
if (lastChunk) {
IOUtils.close(currentInput, () -> currentInput = null);
}
}
final long seqId = requestSeqIdTracker.generateSeqNo();
return new FileChunk(seqId, md, new BytesArray(buffer, 0, bytesRead), chunkPosition, lastChunk);
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(md, e));
break;
throw handleErrorOnSendFiles(md, e);
}
}
// When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
// This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
if (error.get() == null) {
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));

@Override
public synchronized void close() throws IOException {
if (closed == false) {
closed = true;
IOUtils.close(currentInput);
}
}
if (error.get() != null) {
handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());

Exception handleErrorOnSendFiles(StoreFileMetaData md, Exception e) {
try {
final IOException corruptIndexException;
if (md != null && (corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
try {
failEngine(corruptIndexException);
} catch (Exception inner) {
corruptIndexException.addSuppressed(inner);
}
return corruptIndexException;
} else { // corruption has happened on the way to replica
final RemoteTransportException remoteTransportException =
new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
remoteTransportException.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
shardId, request.targetNode(), md), corruptIndexException);
return remoteTransportException;
}
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
return e;
}
}

private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
failEngine(corruptIndexException);
throw corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException(
"File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
shardId, request.targetNode(), md), corruptIndexException);
throw exception;
}
} else {
throw e;
private static final class FileChunk {
final long seqId;
final StoreFileMetaData md;
final BytesReference content;
final long position;
final boolean lastChunk;

FileChunk(long seqId, StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
this.seqId = seqId;
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}
}

@@ -187,7 +187,7 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE));
ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

}
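
The extra ThreadPool.Names.GENERIC argument makes the file-chunk response handler run on a generic worker thread instead of the transport thread, since the response now drives the next disk read. A minimal, self-contained illustration of that hand-off; the class and method names are assumptions for illustration, not Elasticsearch API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ForkingResponseHandler<T> {
    private final ExecutorService genericPool = Executors.newCachedThreadPool(); // stands in for the GENERIC pool
    private final Consumer<T> onResponse;

    ForkingResponseHandler(Consumer<T> onResponse) {
        this.onResponse = onResponse;
    }

    // Called on the network thread: immediately hand the continuation (which may
    // read the next file chunk from disk) to a worker thread and return.
    void handleResponse(T response) {
        genericPool.submit(() -> onResponse.accept(response));
    }
}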