diff --git a/CHANGELOG.md b/CHANGELOG.md index e5ab51e9b162e..04c9428f3795e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Disallow compression level to be set for default and best_compression index codecs ([#8737]()https://github.com/opensearch-project/OpenSearch/pull/8737) - Prioritize replica shard movement during shard relocation ([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875)) - Introducing Default and Best Compression codecs as their algorithm name ([#9123]()https://github.com/opensearch-project/OpenSearch/pull/9123) +- New API for supporting async downloads within VerifyingMultiStreamBlobContainer ([#](https://github.com/opensearch-project/OpenSearch/pull/)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 1a644934245cb..3bf47dd3b7cf9 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -46,6 +46,7 @@ import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.AbstractBlobContainer; @@ -210,6 +211,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } } + @Override + public CompletableFuture asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException { + throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads"); + } + // package private for testing long getLargeBlobThresholdInBytes() { return blobStore.bufferSizeInBytes(); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index f88b3616d2f0a..519c4db0cc901 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -879,6 +879,19 @@ public void onFailure(Exception e) {} } } + public void testAsyncBlobDownload() { + final S3BlobStore blobStore = mock(S3BlobStore.class); + final BlobPath blobPath = mock(BlobPath.class); + final String blobName = "test-blob"; + + final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> { + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + blobContainer.asyncBlobDownload(blobName, false); + }); + + assertEquals("S3 BlobContainer currently does not support async blob downloads", e.getMessage()); + } + public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException { testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java index 8f2814eb7c4c4..1b969fbda213c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer; +import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.BlobPath; @@ -24,7 +25,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -114,6 +120,32 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } + @Override + public CompletableFuture asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException { + final int numStreams = forceSingleStream ? 1 : 5; + final ExecutorService executorService = Executors.newFixedThreadPool(numStreams); + + // Fetch blob metadata + final InputStream blobInputStream = readBlob(blobName); + final long blobSize = blobInputStream.available(); + blobInputStream.close(); + + // Create input streams for the blob + final List blobInputStreams = new ArrayList<>(); + long streamSize = (int) Math.ceil(blobSize * 1.0 / numStreams); + for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) { + long start = streamNumber * streamSize; + blobInputStreams.add(readBlob(blobName, start, streamSize)); + } + + CompletableFuture readContextFuture = CompletableFuture.supplyAsync( + () -> new ReadContext(blobInputStreams, null, numStreams, blobSize), + executorService + ); + executorService.shutdown(); + return readContextFuture; + } + private boolean isSegmentFile(String filename) { return !filename.endsWith(".tlog") && !filename.endsWith(".ckp"); } diff --git a/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java index 0dfcc5c50e4b1..22d48b51dd884 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java @@ -9,9 +9,11 @@ package org.opensearch.common.blobstore; import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow @@ -31,4 +33,17 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer { * @throws IOException if any of the input streams could not be read, or the target blob could not be written to */ void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException; + + /** + * Creates and populates a list of {@link java.io.InputStream} from the blob stored within the repository, returned + * within an async callback using the {@link ReadContext} object. + * Defaults to using multiple streams, when feasible, unless a single stream is forced using @param forceSingleStream. + * An {@link IOException} is thrown if requesting any of the input streams fails, or reading metadata for the + * requested blob fails + * @param blobName Name of the blob to be read using the async mechanism + * @param forceSingleStream Value to denote forced use of a single stream within the returned object + * @return A future of {@link ReadContext} object which serves the input streams and other metadata for the blob + * @throws IOException if any of the input streams could not be requested, or reading metadata for requested blob fails + */ + CompletableFuture asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException; } diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java new file mode 100644 index 0000000000000..116014411b6c6 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.read; + +import java.io.InputStream; +import java.util.List; + +/** + * ReadContext is used to encapsulate all data needed by VerifyingMultiStreamBlobContainer#asyncBlobDownload + * + * @opensearch.experimental + */ +public class ReadContext { + private final List blobInputStreams; + private final String blobChecksum; + private final int numStreams; + private final long blobSize; + + public ReadContext(List blobInputStreams, String blobChecksum, int numStreams, long blobSize) { + this.blobInputStreams = blobInputStreams; + this.blobChecksum = blobChecksum; + this.numStreams = numStreams; + this.blobSize = blobSize; + } + + public List getBlobInputStreams() { + return blobInputStreams; + } + + public String getBlobChecksum() { + return blobChecksum; + } + + public int getNumStreams() { + return numStreams; + } + + public long getBlobSize() { + return blobSize; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java new file mode 100644 index 0000000000000..a12b0b09fae62 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/read/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Abstractions for stream based file reads */ +package org.opensearch.common.blobstore.stream.read;