Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interface changes for async repository downloads #9182

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- New API for supporting async downloads within VerifyingMultiStreamBlobContainer ([#9182](https://github.com/opensearch-project/OpenSearch/pull/9182))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
Expand All @@ -69,11 +72,13 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -211,6 +216,47 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throws IOException ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An "async" method should not be doing any I/O on the calling thread, right? Any IOException should be communicated via the callback.

try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
final String bucketName = blobStore.bucket();

// Fetch object part metadata
GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.build();

GetObjectAttributesResponse blobMetadata = s3AsyncClient.getObjectAttributes(getObjectAttributesRequest).join();

final long blobSize = blobMetadata.objectSize();
final int numParts = blobMetadata.objectParts().totalPartsCount();

final List<CompletableFuture<InputStreamContainer>> blobInputStreamsFuture = new ArrayList<>();
final List<InputStreamContainer> blobPartStreams = new ArrayList<>();

for (int partNumber = 0; partNumber < numParts; partNumber++) {
int finalPartNumber = partNumber;
blobInputStreamsFuture.add(
blobStore.getAsyncTransferManager()
.getPartInputStream(s3AsyncClient, bucketName, blobName, partNumber)
// TODO: Error handling
.whenComplete((data, error) -> blobPartStreams.add(finalPartNumber, data))
);
}

CompletableFuture.allOf(blobInputStreamsFuture.toArray(CompletableFuture[]::new)).whenComplete((data, error) -> {
if (error != null) {
listener.onFailure(new IOException(error));
} else {
listener.onResponse(new ReadContext(blobSize, blobPartStreams, null));
}
});
}
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

package org.opensearch.repositories.s3.async;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand All @@ -20,6 +22,8 @@
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CompletableFutureUtils;
Expand All @@ -31,11 +35,13 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.exception.CorruptFileException;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.io.CheckedContainer;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -353,4 +359,27 @@ private void deleteUploadedObject(S3AsyncClient s3AsyncClient, UploadRequest upl
return null;
});
}

public CompletableFuture<InputStreamContainer> getPartInputStream(
S3AsyncClient s3AsyncClient,
String bucketName,
String blobName,
int partNumber
) {
GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobName)
.partNumber(partNumber);

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(this::transformResponseToInputStreamContainer)
);
}

private InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) {
GetObjectResponse getObjectResponse = streamResponse.response();
final Tuple<Long, Long> s3ResponseRange = HttpRangeUtils.fromHttpRangeHeader(getObjectResponse.contentRange());
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), s3ResponseRange.v1());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,26 @@

package org.opensearch.repositories.s3.utils;

import software.amazon.awssdk.core.exception.SdkException;

import org.opensearch.common.collect.Tuple;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class HttpRangeUtils {

private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");

// TODO: Find an alternative for parsing the header
public static Tuple<Long, Long> fromHttpRangeHeader(String headerValue) {
Matcher matcher = RANGE_PATTERN.matcher(headerValue);
if (!matcher.find()) {
throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException());
}
return new Tuple<>(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)));
}

/**
* Provides a byte range string per <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-byte-ranges">RFC 9110</a>
* @param start start position (inclusive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
Expand All @@ -24,6 +25,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -114,6 +117,27 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp

}

@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
new Thread(() -> {
try {
long contentLength = listBlobs().get(blobName).length();
long partSize = contentLength / 10;
int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
List<InputStreamContainer> blobPartStreams = new ArrayList<>();
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
long offset = partNumber * partSize;
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(blobPartStream);
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
}
}).start();
}

private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;

/**
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand All @@ -31,4 +36,27 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
*/
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;

/**
* Creates an async callback of an {@link ReadContext} for the specified blob within the container.
* An {@link IOException} is thrown if requesting the blob fetch fails.
* @param blobName The name of the blob to get an {@link InputStream} for.
* @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
*/
void readBlobAsync(String blobName, ActionListener<ReadContext> listener);

default void asyncBlobDownload(
String blobName,
Path segmentFileLocation,
ThreadPool threadPool,
ActionListener<String> segmentCompletionListener
) {
ReadContextListener readContextListener = new ReadContextListener(
blobName,
segmentFileLocation,
threadPool,
segmentCompletionListener
);
readBlobAsync(blobName, readContextListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read;

import org.opensearch.common.io.InputStreamContainer;

import java.util.List;

/**
* ReadContext is used to encapsulate all data needed by <code>BlobContainer#readBlobAsync</code>
*
* @opensearch.internal
*/
public class ReadContext {
private final long blobSize;
private final List<InputStreamContainer> partStreams;
private final String blobChecksum;

public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String blobChecksum) {
this.blobSize = blobSize;
this.partStreams = partStreams;
this.blobChecksum = blobChecksum;
}

public String getBlobChecksum() {
return blobChecksum;
}

public int getNumberOfParts() {
return partStreams.size();
}

public long getBlobSize() {
return blobSize;
}

public List<InputStreamContainer> getPartStreams() {
return partStreams;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore.stream.read.listener;

import org.opensearch.core.action.ActionListener;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class FileCompletionListener implements ActionListener<Integer> {
private final int numStreams;
private final String segmentFileName;
private final Set<Integer> completedParts;
private final ActionListener<String> segmentCompletionListener;

public FileCompletionListener(int numStreams, String segmentFileName, ActionListener<String> segmentCompletionListener) {
this.completedParts = Collections.synchronizedSet(new HashSet<>());
this.numStreams = numStreams;
this.segmentFileName = segmentFileName;
this.segmentCompletionListener = segmentCompletionListener;
}

@Override
public void onResponse(Integer partNumber) {
completedParts.add(partNumber);
if (completedParts.size() == numStreams) {
segmentCompletionListener.onResponse(segmentFileName);
}
}

@Override
public void onFailure(Exception e) {
segmentCompletionListener.onFailure(e);
}
}
Loading
Loading