Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async blob read and download support using multiple streams #9592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -211,6 +212,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new UnsupportedOperationException();
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -881,6 +882,17 @@ public void onFailure(Exception e) {}
}
}

public void testAsyncBlobDownload() {
final S3BlobStore blobStore = mock(S3BlobStore.class);
final BlobPath blobPath = mock(BlobPath.class);
final String blobName = "test-blob";

final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.readBlobAsync(blobName, new PlainActionFuture<>());
});
}

public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException {
testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
Expand All @@ -24,6 +25,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -114,6 +117,27 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
new Thread(() -> {
try {
long contentLength = listBlobs().get(blobName).length();
long partSize = contentLength / 10;
int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
List<InputStreamContainer> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
long offset = partNumber * partSize;
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(blobPartStream);
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
}
}).start();
}

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand All @@ -31,4 +36,25 @@
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container.
* @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched.
* @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
*/
@ExperimentalApi
void readBlobAsync(String blobName, ActionListener<ReadContext> listener);

/**
* Asynchronously downloads the blob to the specified location using an executor from the thread pool.
* @param blobName The name of the blob for which needs to be downloaded.
* @param fileLocation The path on local disk where the blob needs to be downloaded.
* @param threadPool The threadpool instance which will provide the executor for performing a multipart download.
* @param completionListener Listener which will be notified when the download is complete.
*/
@ExperimentalApi
default void asyncBlobDownload(String blobName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
ReadContextListener readContextListener = new ReadContextListener(blobName, fileLocation, threadPool, completionListener);
readBlobAsync(blobName, readContextListener);
}

Check warning on line 59 in server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java#L57-L59

Added lines #L57 - L59 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.io.InputStreamContainer;

import java.util.List;

/**
* ReadContext is used to encapsulate all data needed by <code>BlobContainer#readBlobAsync</code>
*/
@ExperimentalApi
public class ReadContext {
private final long blobSize;
private final List<InputStreamContainer> partStreams;
private final String blobChecksum;

public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String blobChecksum) {
this.blobSize = blobSize;
this.partStreams = partStreams;
this.blobChecksum = blobChecksum;
}

public String getBlobChecksum() {
return blobChecksum;

Check warning on line 32 in server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java#L32

Added line #L32 was not covered by tests
}

public int getNumberOfParts() {
return partStreams.size();
}

public long getBlobSize() {
return blobSize;

Check warning on line 40 in server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java#L40

Added line #L40 was not covered by tests
}

public List<InputStreamContainer> getPartStreams() {
return partStreams;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.action.ActionListener;

import java.util.concurrent.atomic.AtomicInteger;

/**
* FileCompletionListener listens for completion of fetch on all the streams for a file, where
* individual streams are handled using {@link FilePartWriter}. The {@link FilePartWriter}(s)
* hold a reference to the file completion listener to be notified.
*/
@InternalApi
class FileCompletionListener implements ActionListener<Integer> {

private final int numberOfParts;
private final String fileName;
private final AtomicInteger completedPartsCount;
private final ActionListener<String> completionListener;

public FileCompletionListener(int numberOfParts, String fileName, ActionListener<String> completionListener) {
this.completedPartsCount = new AtomicInteger();
this.numberOfParts = numberOfParts;
this.fileName = fileName;
this.completionListener = completionListener;
}

@Override
public void onResponse(Integer unused) {
if (completedPartsCount.incrementAndGet() == numberOfParts) {
completionListener.onResponse(fileName);
}
}

@Override
public void onFailure(Exception e) {
completionListener.onFailure(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.io.Channels;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* FilePartWriter transfers the provided stream into the specified file path using a {@link FileChannel}
* instance. It performs offset based writes to the file and notifies the {@link FileCompletionListener} on completion.
*/
@InternalApi
class FilePartWriter implements Runnable {

private final int partNumber;
private final InputStreamContainer blobPartStreamContainer;
private final Path fileLocation;
private final AtomicBoolean anyPartStreamFailed;
private final ActionListener<Integer> fileCompletionListener;
private static final Logger logger = LogManager.getLogger(FilePartWriter.class);

// 8 MB buffer for transfer
private static final int BUFFER_SIZE = 8 * 1024 * 2024;

public FilePartWriter(
int partNumber,
InputStreamContainer blobPartStreamContainer,
Path fileLocation,
AtomicBoolean anyPartStreamFailed,
ActionListener<Integer> fileCompletionListener
) {
this.partNumber = partNumber;
this.blobPartStreamContainer = blobPartStreamContainer;
this.fileLocation = fileLocation;
this.anyPartStreamFailed = anyPartStreamFailed;
this.fileCompletionListener = fileCompletionListener;
}

@Override
public void run() {
// Ensures no writes to the file if any stream fails.
if (anyPartStreamFailed.get() == false) {
try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
try (InputStream inputStream = blobPartStreamContainer.getInputStream()) {
long streamOffset = blobPartStreamContainer.getOffset();
final byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
Channels.writeToChannel(buffer, 0, bytesRead, outputFileChannel, streamOffset);
streamOffset += bytesRead;
}
Comment on lines +65 to +69
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming this isn't doing DIRECT_IO, it might thrash page cache for encrypted data transfers? Can we instead use FileChannel#transferFrom

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transferFrom does not support repositioning the channel beyond the file length. With multi threaded operation and a new file, we do not have the complete file at hand. We might have to resort using individual files and stitching logic in that case.

}
} catch (IOException e) {
processFailure(e);
return;
}
fileCompletionListener.onResponse(partNumber);
}
}

void processFailure(Exception e) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the JVM crashes before the clean up gets triggered? Can you please write an IT to ensure that this case doesn't lead to corruption?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call out. Should we write to a file with a different name (i.e. add something like a ".multipart" suffix) and rename to the actual filename upon completion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an issue here: #9784

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Bukhtawar and @andrross!

try {
Files.deleteIfExists(fileLocation);
} catch (IOException ex) {

Check warning on line 82 in server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java#L82

Added line #L82 was not covered by tests
// Die silently
logger.info("Failed to delete file {} on stream failure: {}", fileLocation, ex);

Check warning on line 84 in server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/FilePartWriter.java#L84

Added line #L84 was not covered by tests
}
if (anyPartStreamFailed.getAndSet(true) == false) {
fileCompletionListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* ReadContextListener orchestrates the async file fetch from the {@link org.opensearch.common.blobstore.BlobContainer}
* using a {@link ReadContext} callback. On response, it spawns off the download using multiple streams which are
* spread across a {@link ThreadPool} executor.
*/
@InternalApi
public class ReadContextListener implements ActionListener<ReadContext> {

private final String fileName;
private final Path fileLocation;
private final ThreadPool threadPool;
private final ActionListener<String> completionListener;
private static final Logger logger = LogManager.getLogger(ReadContextListener.class);

public ReadContextListener(String fileName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
this.fileName = fileName;
this.fileLocation = fileLocation;
this.threadPool = threadPool;
this.completionListener = completionListener;
}

@Override
public void onResponse(ReadContext readContext) {
logger.trace("Streams received for blob {}", fileName);
final int numParts = readContext.getNumberOfParts();
final AtomicBoolean anyPartStreamFailed = new AtomicBoolean();
FileCompletionListener fileCompletionListener = new FileCompletionListener(numParts, fileName, completionListener);

for (int partNumber = 0; partNumber < numParts; partNumber++) {
FilePartWriter filePartWriter = new FilePartWriter(
partNumber,
readContext.getPartStreams().get(partNumber),
fileLocation,
anyPartStreamFailed,
fileCompletionListener
);
threadPool.executor(ThreadPool.Names.GENERIC).submit(filePartWriter);
}
}

@Override
public void onFailure(Exception e) {
completionListener.onFailure(e);
}
}
Loading
Loading