Skip to content

Commit

Permalink
Add interface changes for async repository downloads
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Aug 18, 2023
1 parent 8e95a82 commit b505545
Show file tree
Hide file tree
Showing 11 changed files with 394 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Make SearchTemplateRequest implement IndicesRequest.Replaceable ([#9122](https://github.com/opensearch-project/OpenSearch/pull/9122))
- [BWC and API enforcement] Define the initial set of annotations, their meaning and relations between them ([#9223](https://github.com/opensearch-project/OpenSearch/pull/9223))
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- New API for supporting async downloads within VerifyingMultiStreamBlobContainer ([#9182](https://github.com/opensearch-project/OpenSearch/pull/9182))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

/**
 * Async download is not yet implemented for the S3 repository; callers must use the
 * synchronous read APIs instead.
 * NOTE(review): the exception is thrown synchronously to the caller rather than being
 * delivered through {@code listener.onFailure} — confirm callers expect this.
 */
@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
    throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads");
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -881,6 +882,19 @@ public void onFailure(Exception e) {}
}
}

/**
 * Verifies that {@code readBlobAsync} on the S3 container is explicitly unsupported
 * and surfaces the documented error message.
 */
public void testAsyncBlobDownload() {
    final S3BlobStore mockedBlobStore = mock(S3BlobStore.class);
    final BlobPath mockedBlobPath = mock(BlobPath.class);

    final UnsupportedOperationException thrown = expectThrows(UnsupportedOperationException.class, () -> {
        final S3BlobContainer container = new S3BlobContainer(mockedBlobPath, mockedBlobStore);
        container.readBlobAsync("test-blob", 0, 0, new PlainActionFuture<>());
    });

    assertEquals("S3 BlobContainer currently does not support async blob downloads", thrown.getMessage());
}

public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException {
testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

/**
 * Test-fixture implementation of async reads: performs the blocking {@code readBlob}
 * on a freshly spawned thread and routes the result (or any exception) to the listener.
 */
@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
    final Runnable readTask = () -> {
        try {
            listener.onResponse(readBlob(blobName, position, length));
        } catch (Exception e) {
            listener.onFailure(e);
        }
    };
    new Thread(readTask).start();
}

/** A file is a segment file unless it is a translog ({@code .tlog}) or checkpoint ({@code .ckp}) file. */
private boolean isSegmentFile(String filename) {
    final boolean isTranslogOrCheckpoint = filename.endsWith(".tlog") || filename.endsWith(".ckp");
    return isTranslogOrCheckpoint == false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.blobstore.stream.listener.FileCompletionListener;
import org.opensearch.common.blobstore.stream.listener.StreamCompletionListener;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand All @@ -31,4 +40,72 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
 * Creates an async callback of an {@link java.io.InputStream} for the specified blob within the container.
 * Since this method declares no checked exception, failures while requesting the input
 * stream are reported through {@code listener.onFailure} rather than thrown to the caller.
 * @param blobName The name of the blob to get an {@link InputStream} for.
 * @param position The position in the blob where the next byte will be read.
 * @param length An indication of the number of bytes to be read.
 * @param listener Async listener for {@link InputStream} object which serves the input streams and other metadata for the blob
 */
void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener);

/**
 * Fetches the checksum for the blob stored within the repository implementation.
 * @param blobName The name of the blob for which repository checksum is needed
 * @return checksum string of the blob
 * @throws IOException in case of any issues with the blob
 * @throws UnsupportedOperationException unless the implementation overrides this method
 */
default String blobChecksum(String blobName) throws IOException {
    // Include the blob name so the failure is diagnosable instead of a bare exception.
    throw new UnsupportedOperationException("checksum retrieval is not supported for blob [" + blobName + "]");
}

/**
 * Downloads the blob to the given path location using an async mechanism.
 * The blob is split into {@code numStreams} ranged reads which are written to temporary
 * part files and stitched into the final file by a {@link FileCompletionListener}.
 * @param blobName The name of the blob which is to be fetched from the container
 * @param segmentFileLocation The location where the blob needs to be downloaded
 * @param segmentCompletionListener Async listener invoked with the segment file name on
 *        completion, or with the first failure encountered (never thrown to the caller)
 */
default void asyncBlobDownload(String blobName, Path segmentFileLocation, ActionListener<String> segmentCompletionListener) {
    try {
        final long segmentSize = listBlobs().get(blobName).length();
        final long optimalStreamSize = readBlobPreferredLength();
        // Integer ceiling division avoids the precision loss of Math.ceil on large longs.
        // At least one stream is always issued so the completion listener still fires for
        // a zero-length blob (otherwise no stream would ever complete and the caller hangs).
        final int numStreams = Math.max(1, Math.toIntExact((segmentSize + optimalStreamSize - 1) / optimalStreamSize));

        final AtomicBoolean anyStreamFailed = new AtomicBoolean();
        final List<String> partFileNames = Collections.synchronizedList(new ArrayList<>());

        final String segmentFileName = segmentFileLocation.getFileName().toString();
        final Path segmentDirectory = segmentFileLocation.getParent();

        final FileCompletionListener fileCompletionListener = new FileCompletionListener(
            numStreams,
            segmentFileName,
            segmentDirectory,
            partFileNames,
            anyStreamFailed,
            segmentCompletionListener
        );

        for (int streamNumber = 0; streamNumber < numStreams; streamNumber++) {
            final String partFileName = UUID.randomUUID().toString();
            final long start = streamNumber * optimalStreamSize;
            // The last stream is clamped to the end of the blob.
            final long end = Math.min(segmentSize, (streamNumber + 1) * optimalStreamSize);
            final long length = end - start;
            partFileNames.add(partFileName);

            final StreamCompletionListener streamCompletionListener = new StreamCompletionListener(
                partFileName,
                segmentDirectory,
                anyStreamFailed,
                fileCompletionListener
            );
            readBlobAsync(blobName, start, length, streamCompletionListener);
        }
    } catch (Exception e) {
        // Any synchronous failure (missing blob metadata, listBlobs I/O error, oversized
        // stream count) is routed to the listener rather than thrown to the caller.
        segmentCompletionListener.onFailure(e);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.listener;

import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Tracks completion of the individual part-file downloads for a segment and, once all
 * {@code numStreams} parts have arrived, stitches them into the final segment file before
 * notifying the supplied segment listener.
 *
 * Thread-safety: {@code onResponse}/{@code onFailure} may be invoked concurrently from
 * multiple download threads; coordination relies on {@code downloadedPartFiles} and the
 * shared {@code anyStreamFailed} flag.
 */
public class FileCompletionListener implements ActionListener<String> {
    // Total number of part streams expected for this segment file.
    private final int numStreams;
    private final String segmentFileName;
    // Directory holding both the temporary part files and the final segment file.
    private final Path segmentDirectory;
    // Names of the temporary part files, in download order; used for assembly and cleanup.
    private final List<String> toDownloadPartFileNames;
    private final AtomicInteger downloadedPartFiles;
    // Shared with the per-stream listeners: set once, when the first stream fails.
    private final AtomicBoolean anyStreamFailed;
    private final ActionListener<String> segmentCompletionListener;

    public FileCompletionListener(
        int numStreams,
        String segmentFileName,
        Path segmentDirectory,
        List<String> toDownloadPartFileNames,
        AtomicBoolean anyStreamFailed,
        ActionListener<String> segmentCompletionListener
    ) {
        this.downloadedPartFiles = new AtomicInteger();
        this.numStreams = numStreams;
        this.segmentFileName = segmentFileName;
        this.segmentDirectory = segmentDirectory;
        this.anyStreamFailed = anyStreamFailed;
        this.toDownloadPartFileNames = toDownloadPartFileNames;
        this.segmentCompletionListener = segmentCompletionListener;
    }

    @Override
    public void onResponse(String streamFileName) {
        if (!anyStreamFailed.get() && downloadedPartFiles.incrementAndGet() == numStreams) {
            // Only notify success if assembly succeeded; on I/O failure
            // createCompleteSegmentFile() has already routed the error to onFailure(),
            // so proceeding here would deliver BOTH onFailure and onResponse.
            if (createCompleteSegmentFile()) {
                performChecksum();
                segmentCompletionListener.onResponse(segmentFileName);
            }
        }
    }

    @Override
    public void onFailure(Exception e) {
        // compareAndSet guarantees exactly one failure is propagated to the caller even
        // when several streams fail concurrently (a plain get()-then-set check races).
        // Setting the flag before notifying also stops in-flight streams from writing
        // further part files.
        if (anyStreamFailed.compareAndSet(false, true)) {
            segmentCompletionListener.onFailure(e);
        }
        cleanupTempFiles();
    }

    /**
     * Best-effort removal of the (possibly partial) segment file and any part files that
     * belong to this download. Unrelated files in the directory are never touched.
     */
    private void cleanupTempFiles() {
        try (Stream<Path> segmentDirectoryStream = Files.list(segmentDirectory)) {
            Set<String> tempFilesInDirectory = segmentDirectoryStream.filter(path -> !Files.isDirectory(path))
                .map(path -> path.getFileName().toString())
                .collect(Collectors.toSet());

            if (tempFilesInDirectory.contains(segmentFileName)) {
                Files.delete(segmentDirectory.resolve(segmentFileName));
            }

            // Restrict deletion to the part files created for this download.
            tempFilesInDirectory.retainAll(toDownloadPartFileNames);
            for (String tempFile : tempFilesInDirectory) {
                Files.delete(segmentDirectory.resolve(tempFile));
            }
        } catch (IOException ex) {
            // Cleanup is deliberately best-effort: failing to delete temp files must not
            // mask the original download failure that has already been reported.
        }
    }

    private void performChecksum() {
        // TODO: Add checksum logic
    }

    /**
     * Concatenates the downloaded part files, in order, into the final segment file and
     * deletes each part once copied.
     *
     * @return true if the segment file was assembled successfully; false if an I/O error
     *         occurred (in which case the failure has already been reported via onFailure)
     */
    private boolean createCompleteSegmentFile() {
        try {
            Path segmentFilePath = segmentDirectory.resolve(segmentFileName);
            try (OutputStream segmentFile = Files.newOutputStream(segmentFilePath)) {
                for (String partFileName : toDownloadPartFileNames) {
                    Path partFilePath = segmentDirectory.resolve(partFileName);
                    try (InputStream partFile = Files.newInputStream(partFilePath)) {
                        partFile.transferTo(segmentFile);
                    }
                    Files.delete(partFilePath);
                }
            }
            return true;
        } catch (IOException e) {
            onFailure(e);
            return false;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.listener;

import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Listener for a single ranged download stream: persists the received
 * {@link InputStream} to its part file and reports the part file name to the
 * file-level listener, or forwards any failure to it.
 */
public class StreamCompletionListener implements ActionListener<InputStream> {
    // Name of the temporary part file this stream writes to.
    private final String partFileName;
    // Directory in which all part files for the segment are created.
    private final Path segmentDirectory;
    // Shared flag: true once any sibling stream has failed.
    private final AtomicBoolean anyStreamFailed;
    private final ActionListener<String> fileCompletionListener;

    public StreamCompletionListener(
        String partFileName,
        Path segmentDirectory,
        AtomicBoolean anyStreamFailed,
        ActionListener<String> fileCompletionListener
    ) {
        this.partFileName = partFileName;
        this.segmentDirectory = segmentDirectory;
        this.anyStreamFailed = anyStreamFailed;
        this.fileCompletionListener = fileCompletionListener;
    }

    @Override
    public void onResponse(InputStream inputStream) {
        try (inputStream) {
            // Guard clause: if a sibling stream already failed there is no point in
            // writing this part; the stream is still closed by try-with-resources.
            if (anyStreamFailed.get()) {
                return;
            }
            final Path partFilePath = segmentDirectory.resolve(partFileName);
            try (OutputStream partFileOutput = Files.newOutputStream(partFilePath)) {
                inputStream.transferTo(partFileOutput);
            }
            fileCompletionListener.onResponse(partFileName);
        } catch (IOException e) {
            onFailure(e);
        }
    }

    @Override
    public void onFailure(Exception e) {
        fileCompletionListener.onFailure(e);
    }
}
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ public synchronized IndexShard createShard(
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
// TODO: Make it a part of the constructor
remoteStore.setShardPath(path);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Expand All @@ -487,6 +489,8 @@ public synchronized IndexShard createShard(
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
// TODO: Make it a part of the constructor
store.setShardPath(path);
eventListener.onStoreCreated(shardId);
indexShard = new IndexShard(
routing,
Expand Down
Loading

0 comments on commit b505545

Please sign in to comment.