Skip to content

Commit

Permalink
Add async read support for S3 plugin (#9694)
Browse files Browse the repository at this point in the history
* Add async read support for S3 plugin

Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
(cherry picked from commit 03ddc8a)
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>

* Move functionality to S3BlobContainer

Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>

---------

Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal authored Sep 15, 2023
1 parent d34b352 commit 70a582f
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.repositories.s3;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand All @@ -44,10 +46,15 @@
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
Expand All @@ -63,6 +70,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
Expand All @@ -75,11 +83,13 @@
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -212,9 +222,45 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

@ExperimentalApi
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
throw new UnsupportedOperationException();
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
final S3AsyncClient s3AsyncClient = amazonS3Reference.get().client();
final String bucketName = blobStore.bucket();

final GetObjectAttributesResponse blobMetadata = getBlobMetadata(s3AsyncClient, bucketName, blobName).get();

final long blobSize = blobMetadata.objectSize();
final int numberOfParts = blobMetadata.objectParts().totalPartsCount();
final String blobChecksum = blobMetadata.checksum().checksumCRC32();

final List<InputStreamContainer> blobPartStreams = new ArrayList<>();
final List<CompletableFuture<InputStreamContainer>> blobPartInputStreamFutures = new ArrayList<>();
// S3 multipart files use 1 to n indexing
for (int partNumber = 1; partNumber <= numberOfParts; partNumber++) {
blobPartInputStreamFutures.add(getBlobPartInputStreamContainer(s3AsyncClient, bucketName, blobName, partNumber));
}

CompletableFuture.allOf(blobPartInputStreamFutures.toArray(CompletableFuture[]::new)).whenComplete((unused, throwable) -> {
if (throwable == null) {
listener.onResponse(
new ReadContext(
blobSize,
blobPartInputStreamFutures.stream().map(CompletableFuture::join).collect(Collectors.toList()),
blobChecksum
)
);
} else {
Exception ex = throwable.getCause() instanceof Exception
? (Exception) throwable.getCause()
: new Exception(throwable.getCause());
listener.onFailure(ex);
}
});
} catch (Exception ex) {
listener.onFailure(SdkException.create("Error occurred while fetching blob parts from the repository", ex));
}
}

public boolean remoteIntegrityCheckSupported() {
Expand Down Expand Up @@ -633,4 +679,65 @@ static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long par
return Tuple.tuple(parts + 1, remaining);
}
}

/**
* Fetches a part of the blob from the S3 bucket and transforms it to an {@link InputStreamContainer}, which holds
* the stream and its related metadata.
* @param s3AsyncClient Async client to be utilized to fetch the object part
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the parts will be fetched
* @param partNumber Part number for the blob to be retrieved
* @return A future of {@link InputStreamContainer} containing the stream and stream metadata.
*/
CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
S3AsyncClient s3AsyncClient,
String bucketName,
String blobName,
int partNumber
) {
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobName)
.partNumber(partNumber);

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(S3BlobContainer::transformResponseToInputStreamContainer)
);
}

/**
* Transforms the stream response object from S3 into an {@link InputStreamContainer}
* @param streamResponse Response stream object from S3
* @return {@link InputStreamContainer} containing the stream and stream metadata
*/
// Package-Private for testing.
static InputStreamContainer transformResponseToInputStreamContainer(ResponseInputStream<GetObjectResponse> streamResponse) {
final GetObjectResponse getObjectResponse = streamResponse.response();
final String contentRange = getObjectResponse.contentRange();
final Long contentLength = getObjectResponse.contentLength();
if (contentRange == null || contentLength == null) {
throw SdkException.builder().message("Failed to fetch required metadata for blob part").build();
}
final Long offset = HttpRangeUtils.getStartOffsetFromRangeHeader(getObjectResponse.contentRange());
return new InputStreamContainer(streamResponse, getObjectResponse.contentLength(), offset);
}

/**
* Retrieves the metadata like checksum, object size and parts for the provided blob within the S3 bucket.
* @param s3AsyncClient Async client to be utilized to fetch the metadata
* @param bucketName Name of the S3 bucket
* @param blobName Identifier of the blob for which the metadata will be fetched
* @return A future containing the metadata within {@link GetObjectAttributesResponse}
*/
CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3AsyncClient, String bucketName, String blobName) {
// Fetch blob metadata - part info, size, checksum
final GetObjectAttributesRequest getObjectAttributesRequest = GetObjectAttributesRequest.builder()
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.build();

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,29 @@

package org.opensearch.repositories.s3.utils;

import software.amazon.awssdk.core.exception.SdkException;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class HttpRangeUtils {
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes\\s+(\\d+)-\\d+[/\\d*]+$");

/**
* Parses the content range header string value to calculate the start (offset) of the HTTP response.
* Tests against the RFC9110 specification of content range string.
* Sample values: "bytes 0-10/200", "bytes 0-10/*"
* <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-content-range">Details here</a>
* @param headerValue Header content range string value from the HTTP response
* @return Start (Offset) value of the HTTP response
*/
public static Long getStartOffsetFromRangeHeader(String headerValue) {
Matcher matcher = RANGE_PATTERN.matcher(headerValue);
if (!matcher.find()) {
throw SdkException.create("Regex match for Content-Range header {" + headerValue + "} failed", new RuntimeException());
}
return Long.parseLong(matcher.group(1));
}

/**
* Provides a byte range string per <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-byte-ranges">RFC 9110</a>
Expand Down
Loading

0 comments on commit 70a582f

Please sign in to comment.