Skip to content

Commit

Permalink
Add interface changes for async repository downloads
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Aug 8, 2023
1 parent 6eb87b5 commit 393c7a8
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Disallow compression level to be set for default and best_compression index codecs ([#8737]()https://github.com/opensearch-project/OpenSearch/pull/8737)
- Prioritize replica shard movement during shard relocation ([#8875](https://github.com/opensearch-project/OpenSearch/pull/8875))
- Introducing Default and Best Compression codecs as their algorithm name ([#9123]()https://github.com/opensearch-project/OpenSearch/pull/9123)
- New API for supporting async downloads within VerifyingMultiStreamBlobContainer ([#](https://github.com/opensearch-project/OpenSearch/pull/))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down Expand Up @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -210,6 +211,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@Override
public CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException {
throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads");
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,19 @@ public void onFailure(Exception e) {}
}
}

public void testAsyncBlobDownload() {
final S3BlobStore blobStore = mock(S3BlobStore.class);
final BlobPath blobPath = mock(BlobPath.class);
final String blobName = "test-blob";

final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.asyncBlobDownload(blobName, false);
});

assertEquals("S3 BlobContainer currently does not support async blob downloads", e.getMessage());
}

public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException {
testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -24,7 +25,12 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -114,6 +120,32 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

@Override
public CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException {
final int numStreams = forceSingleStream ? 1 : 5;
final ExecutorService executorService = Executors.newFixedThreadPool(numStreams);

// Fetch blob metadata
final InputStream blobInputStream = readBlob(blobName);
final long blobSize = blobInputStream.available();
blobInputStream.close();

// Create input streams for the blob
final List<InputStream> blobInputStreams = new ArrayList<>();
long streamSize = (int) Math.ceil(blobSize * 1.0 / numStreams);
for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
long start = streamNumber * streamSize;
blobInputStreams.add(readBlob(blobName, start, streamSize));
}

CompletableFuture<ReadContext> readContextFuture = CompletableFuture.supplyAsync(
() -> new ReadContext(blobInputStreams, null, numStreams, blobSize),
executorService
);
executorService.shutdown();
return readContextFuture;
}

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.common.blobstore;

import org.opensearch.action.ActionListener;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand All @@ -31,4 +33,17 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Creates and populates a list of {@link java.io.InputStream} from the blob stored within the repository, returned
* within an async callback using the {@link ReadContext} object.
* Defaults to using multiple streams, when feasible, unless a single stream is forced using @param forceSingleStream.
* An {@link IOException} is thrown if requesting any of the input streams fails, or reading metadata for the
* requested blob fails
* @param blobName Name of the blob to be read using the async mechanism
* @param forceSingleStream Value to denote forced use of a single stream within the returned object
* @return A future of {@link ReadContext} object which serves the input streams and other metadata for the blob
* @throws IOException if any of the input streams could not be requested, or reading metadata for requested blob fails
*/
CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import java.io.InputStream;
import java.util.List;

/**
* ReadContext is used to encapsulate all data needed by <code>VerifyingMultiStreamBlobContainer#asyncBlobDownload</code>
*
* @opensearch.experimental
*/
public class ReadContext {
private final List<InputStream> blobInputStreams;
private final String blobChecksum;
private final int numStreams;
private final long blobSize;

public ReadContext(List<InputStream> blobInputStreams, String blobChecksum, int numStreams, long blobSize) {
this.blobInputStreams = blobInputStreams;
this.blobChecksum = blobChecksum;
this.numStreams = numStreams;
this.blobSize = blobSize;
}

public List<InputStream> getBlobInputStreams() {
return blobInputStreams;
}

public String getBlobChecksum() {
return blobChecksum;
}

public int getNumStreams() {
return numStreams;
}

public long getBlobSize() {
return blobSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Abstractions for stream based file reads */
package org.opensearch.common.blobstore.stream.read;

0 comments on commit 393c7a8

Please sign in to comment.