Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send file chunks asynchronously in peer recovery #39769

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.store.StoreFileMetaData;

/**
 * Immutable value holder for a single chunk of a file being transferred during
 * peer recovery: the file's metadata, the chunk's bytes, the chunk's offset
 * within the file, and whether it is the file's final chunk.
 */
final class FileChunk {
    final StoreFileMetaData md;
    final BytesReference content;
    final long position;
    final boolean lastChunk;

    /**
     * @param metaData     metadata of the file this chunk belongs to
     * @param chunkBytes   the bytes of this chunk
     * @param filePosition offset of this chunk within the file
     * @param isLastChunk  whether this is the final chunk of the file
     */
    FileChunk(StoreFileMetaData metaData, BytesReference chunkBytes, long filePosition, boolean isLastChunk) {
        this.md = metaData;
        this.content = chunkBytes;
        this.position = filePosition;
        this.lastChunk = isLastChunk;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,6 @@ public void renameAllTempFiles() throws IOException {
store.renameTempFilesSafe(tempFileNames);
}

// NOTE(review): this inner holder is the pre-change version; the diff above shows it
// being removed in favour of the new top-level FileChunk class added by this change.
static final class FileChunk {
// metadata of the file this chunk belongs to
final StoreFileMetaData md;
// raw bytes of this chunk
final BytesReference content;
// offset of this chunk within the file
final long position;
// true when this is the file's final chunk
final boolean lastChunk;
// plain value constructor; fields are assigned as-is, no validation
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}
}

private final class FileChunkWriter {
// chunks can be delivered out of order, we need to buffer chunks if there's a gap between them.
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,24 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
Expand All @@ -62,18 +64,21 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -102,6 +107,7 @@ public class RecoverySourceHandler {
private final int chunkSizeInBytes;
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentFileChunks;
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final CancellableThreads cancellableThreads = new CancellableThreads();

public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request,
Expand All @@ -124,7 +130,6 @@ public StartRecoveryRequest getRequest() {
* performs the recovery from the local engine to the target
*/
public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
final List<Closeable> resources = new CopyOnWriteArrayList<>();
final Closeable releaseResources = () -> IOUtils.close(resources);
final ActionListener<RecoveryResponse> wrappedListener = ActionListener.notifyOnce(listener);
try {
Expand Down Expand Up @@ -411,7 +416,11 @@ public SendFileResult phase1(final IndexCommit snapshot, final Supplier<Integer>
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(
phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.get()));
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps);
final PlainActionFuture<Void> sendFileChunksFuture = new PlainActionFuture<>();
final StoreFileMetaData[] filesSortedByLength = phase1Files.toArray(new StoreFileMetaData[0]);
ArrayUtil.timSort(filesSortedByLength, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
sendFiles(store, filesSortedByLength, translogOps, sendFileChunksFuture);
cancellableThreads.execute(sendFileChunksFuture::actionGet);
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
Expand Down Expand Up @@ -680,72 +689,157 @@ public String toString() {
'}';
}

void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) throws Exception {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
final byte[] buffer = new byte[chunkSizeInBytes];
for (final StoreFileMetaData md : files) {
if (error.get() != null) {
break;
void sendFiles(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps, ActionListener<Void> listener) {
final MultiFileSender multiFileSender = new MultiFileSender(store, files, translogOps);
resources.add(multiFileSender); // need to register to the resource list so we can clean up if the recovery gets cancelled.
final ActionListener<Void> wrappedListener = ActionListener.wrap(
r -> {
if (Assertions.ENABLED) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
long expectedChunks = Arrays.stream(files).mapToLong(f -> (f.length() + chunkSizeInBytes - 1) / chunkSizeInBytes).sum();
long sentChunks = multiFileSender.requestSeqIdTracker.getCheckpoint() + 1;
assert sentChunks == expectedChunks : "sent chunks=" + sentChunks + " != expected chunks=" + expectedChunks;
}
multiFileSender.close();
listener.onResponse(null);
},
e -> {
IOUtils.closeWhileHandlingException(multiFileSender);
listener.onFailure(e);
});
multiFileSender.sendFileChunks(ActionListener.notifyOnce(wrappedListener));
}

private final class MultiFileSender implements Closeable {
private final Store store;
private final List<StoreFileMetaData> files;
private final Supplier<Integer> translogOps;
private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
private final Deque<byte[]> recycledBuffers;
private boolean readerClosed; // ensure we don't reopen reader after the recovery was cancelled
private StoreFileMetaData md;
private InputStream currentInput;
private int position;
private final Object turnMutex = new Object();
private final AtomicBoolean turnAcquired = new AtomicBoolean();

MultiFileSender(Store store, StoreFileMetaData[] files, Supplier<Integer> translogOps) {
this.store = store;
this.files = new ArrayList<>(Arrays.asList(files));
this.translogOps = translogOps;
this.recycledBuffers = ConcurrentCollections.newDeque();
for (int i = 0; i < maxConcurrentFileChunks; i++) {
this.recycledBuffers.add(new byte[chunkSizeInBytes]);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
InputStream in = new InputStreamIndexInput(indexInput, md.length())) {
long position = 0;
int bytesRead;
while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
final BytesArray content = new BytesArray(buffer, 0, bytesRead);
final boolean lastChunk = position + content.length() == md.length();
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks));
cancellableThreads.checkForCancel();
if (error.get() != null) {
break;
}

void sendFileChunks(ActionListener<Void> listener) {
while (true) {
synchronized (turnMutex) {
if (turnAcquired.compareAndSet(false, true) == false) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
return;
}
final long requestFilePosition = position;
cancellableThreads.executeIO(() ->
recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.get(),
if (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getCheckpoint() >= maxConcurrentFileChunks) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
turnAcquired.set(false);
return;
}
}
try {
final byte[] buffer = recycledBuffers.removeFirst();
final FileChunk chunk = readChunk(buffer);
if (chunk == null) {
if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getCheckpoint()) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
listener.onResponse(null);
}
recycledBuffers.addFirst(buffer);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
turnAcquired.set(false);
return;
}
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
turnAcquired.set(false);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
cancellableThreads.execute(() ->
recoveryTarget.writeFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk, translogOps.get(),
ActionListener.wrap(
r -> requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId),
e -> {
error.compareAndSet(null, Tuple.tuple(md, e));
r -> {
recycledBuffers.addFirst(buffer);
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
sendFileChunks(listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make sure we always fork here somehow? I am a bit worried that we are ending up with a stack overflow? Like we can assert that we don't have sendFileChunks in the stacktrace for instance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #39988 for this.

},
e -> listener.onFailure(handleErrorOnSendFiles(chunk.md, e))
)));
position += content.length();
} catch (Exception e) {
listener.onFailure(e);
return;
}
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(md, e));
break;
}
}
// When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway.
// This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error.
if (error.get() == null) {
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));

synchronized FileChunk readChunk(final byte[] buffer) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we parallelize the reading on top of a single file and then synchronize all of it. This doesn't make sense to me. I think we should build the model on top of the file and chunk it ahead of time, i.e. if we want to read with N threads in parallel then chunk the file up into N pieces and send them all in parallel. That means we must write them in the correct places on the other side as well, but blocking on the read side here is not making much sense to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option is to have a multiplexer if you want to make use of the parallelism between sending and reading. We need some kind of threadpool and a task queue for that. Once I am done reading a chunk I put it on a queue and read the next chunk. Another worker can then pick it up and send it. If the queue fills up we add more threads until we saturate. Or we do reading and sending in the same thread but notify others that another chunk can be read. But there is so much blocking going on here I feel like we didn't make the right design decisions?

cancellableThreads.checkForCancel();
if (readerClosed) {
throw new IllegalStateException("chunk reader was closed");
}
try {
if (currentInput == null) {
if (files.isEmpty()) {
return null;
}
md = files.remove(0);
position = 0;
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
currentInput = new BufferedInputStream(new InputStreamIndexInput(indexInput, md.length()), chunkSizeInBytes) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void close() throws IOException {
indexInput.close(); //InputStreamIndexInput's close is noop
}
};
}
final int bytesRead = currentInput.read(buffer, 0, buffer.length);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final boolean lastChunk = position + bytesRead == md.length();
if (bytesRead == -1) {
IOUtils.close(currentInput, () -> currentInput = null);
return readChunk(buffer);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}
final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), position, lastChunk);
position += bytesRead;
return chunk;
} catch (Exception e) {
throw handleErrorOnSendFiles(md, e);
}
}
if (error.get() != null) {
handleErrorOnSendFiles(store, error.get().v1(), error.get().v2());

@Override
public synchronized void close() throws IOException {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
if (readerClosed == false) {
readerClosed = true;
IOUtils.close(currentInput);
}
}
}

private void handleErrorOnSendFiles(Store store, StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
failEngine(corruptIndexException);
throw corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException(
"File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
shardId, request.targetNode(), md), corruptIndexException);
throw exception;
Exception handleErrorOnSendFiles(StoreFileMetaData md, Exception e) {
try {
final IOException corruptIndexException;
if (md != null && (corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
try {
failEngine(corruptIndexException);
} catch (Exception inner) {
corruptIndexException.addSuppressed(inner);
}
return corruptIndexException;
} else { // corruption has happened on the way to replica
final RemoteTransportException remoteTransportException =
new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
remoteTransportException.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage("{} Remote file corruption on node {}, recovering {}. local checksum OK",
shardId, request.targetNode(), md), corruptIndexException);
return remoteTransportException;
}
}
} catch (Exception inner) {
e.addSuppressed(inner);
}
} else {
throw e;
return e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesR
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE));
ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}

}
Loading