Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement interface changes for s3 plugin to read/write blob with obj… #13113

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -138,6 +139,13 @@ public boolean blobExists(String blobName) {
}
}

/**
 * Opens the named blob and returns its content stream together with its object metadata.
 *
 * @param blobName name of the blob; turned into the object key via {@code buildKey}
 * @return a {@link FetchBlobResult} built from the opened stream and the metadata that stream reports
 * @throws IOException if constructing the {@link S3RetryingInputStream} fails
 */
@ExperimentalApi
@Override
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
    final S3RetryingInputStream blobStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
    final Map<String, String> blobMetadata = blobStream.getMetadata();
    return new FetchBlobResult(blobStream, blobMetadata);
}

@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
Expand Down Expand Up @@ -169,12 +177,27 @@ public long readBlobPreferredLength() {
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null);
}

/**
* Write blob with its object metadata.
*/
@ExperimentalApi
@Override
public void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists,
@Nullable Map<String, String> metadata
) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
Expand All @@ -190,7 +213,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled()
blobStore.isUploadRetryEnabled(),
writeContext.getMetadata()
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
);
try {
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
Expand All @@ -203,7 +227,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
uploadRequest.getContentLength()
uploadRequest.getContentLength(),
uploadRequest.getMetadata()
);
completionListener.onResponse(null);
} catch (Exception ex) {
Expand Down Expand Up @@ -542,8 +567,13 @@ private String buildKey(String blobName) {
/**
* Uploads a blob using a single upload request
*/
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeSingleUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
Expand All @@ -560,6 +590,10 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

if (metadata != null && !metadata.isEmpty()) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
Expand All @@ -583,8 +617,13 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeMultipartUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = blobStore.bufferSizeInBytes();
Expand All @@ -609,6 +648,10 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (metadata != null && !metadata.isEmpty()) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
private boolean eof;
private Map<String, String> metadata;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
Expand Down Expand Up @@ -122,6 +124,7 @@ private void openStream() throws IOException {
getObjectResponseInputStream.response().contentLength()
);
this.currentStream = getObjectResponseInputStream;
this.metadata = getObjectResponseInputStream.response().metadata();
this.isStreamAborted.set(false);
} catch (final SdkException e) {
if (e instanceof S3Exception) {
Expand Down Expand Up @@ -265,4 +268,8 @@ boolean isEof() {
boolean isAborted() {
return isStreamAborted.get();
}

/**
 * Returns the metadata map that {@code openStream()} stored from the object response.
 *
 * @return the metadata most recently assigned by {@code openStream()}
 */
Map<String, String> getMetadata() {
    return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ private void uploadInParts(
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
Expand Down Expand Up @@ -327,6 +331,10 @@ private void uploadInOneChunk(
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));

if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) {
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.repositories.s3.async;

import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
import java.util.Map;

/**
* A model encapsulating all details for an upload to S3
Expand All @@ -24,8 +26,8 @@ public class UploadRequest {
private final CheckedConsumer<Boolean, IOException> uploadFinalizer;
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;

private boolean uploadRetryEnabled;
private final Map<String, String> metadata;

/**
* Construct a new UploadRequest object
Expand All @@ -37,6 +39,7 @@ public class UploadRequest {
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
* @param uploadRetryEnabled Whether upload retries are enabled for this request
* @param metadata Metadata of the file being uploaded
*/
public UploadRequest(
String bucket,
Expand All @@ -46,7 +49,8 @@ public UploadRequest(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
boolean uploadRetryEnabled
boolean uploadRetryEnabled,
@Nullable Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
Expand All @@ -56,6 +60,7 @@ public UploadRequest(
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
this.metadata = metadata;
}

public String getBucket() {
Expand Down Expand Up @@ -89,4 +94,11 @@ public Long getExpectedChecksum() {
public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

/**
* @return metadata of the blob to be uploaded
*/
public Map<String, String> getMetadata() {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
Expand All @@ -75,6 +76,7 @@

import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -659,6 +661,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
.writePriority(WritePriority.HIGH)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.metadata(new HashMap<>())
.build();

s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
Expand All @@ -668,7 +671,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t
} else {
assertNull(exceptionRef.get());
}
verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
verify(s3BlobContainer, times(1)).executeMultipartUpload(
any(S3BlobStore.class),
anyString(),
any(InputStream.class),
anyLong(),
anyMap()
);

if (expectException) {
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -240,7 +242,7 @@ public InputStream readBlob(String blobName, long position, long length) throws
};
}

public void testWriteBlobWithRetries() throws Exception {
public void writeBlobWithRetriesHelper(Map<String, String> metadata) throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);

Expand Down Expand Up @@ -280,11 +282,26 @@ public void testWriteBlobWithRetries() throws Exception {

final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
try (InputStream stream = new ByteArrayInputStream(bytes)) {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
if (metadata != null) {
blobContainer.writeBlobWithMetadata("write_blob_max_retries", stream, bytes.length, false, metadata);
} else {
blobContainer.writeBlob("write_blob_max_retries", stream, bytes.length, false);
}
}
assertThat(countDown.isCountedDown(), is(true));
}

public void testWriteBlobWithMetadataWithRetries() throws Exception {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, String> metadata = new HashMap<>();
metadata.put("key1", "value1");
metadata.put("key2", "value2");
writeBlobWithRetriesHelper(metadata);
}

/**
 * Runs the retrying-write scenario without metadata, so the helper takes its plain
 * {@code writeBlob} branch.
 */
public void testWriteBlobWithRetries() throws Exception {
    final Map<String, String> noMetadata = null;
    writeBlobWithRetriesHelper(noMetadata);
}

public void testWriteBlobByStreamsWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
Expand Down Expand Up @@ -368,7 +385,7 @@ private int calculateNumberOfParts(long contentLength, long partSize) {
return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
}

public void testWriteBlobWithReadTimeouts() {
public void writeBlobWithReadTimeoutsHelper(Map<String, String> metadata) {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);
Expand All @@ -386,7 +403,11 @@ public void testWriteBlobWithReadTimeouts() {

Exception exception = expectThrows(IOException.class, () -> {
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false);
if (metadata != null) {
blobContainer.writeBlobWithMetadata("write_blob_timeout", stream, bytes.length, false, metadata);
} else {
blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false);
}
}
});
assertThat(
Expand All @@ -401,7 +422,18 @@ public void testWriteBlobWithReadTimeouts() {
assertThat(exception.getCause().getCause().getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
}

public void testWriteLargeBlob() throws Exception {
/**
 * Runs the read-timeout write scenario with two sample metadata entries, so the helper
 * takes its {@code writeBlobWithMetadata} branch.
 */
public void testWriteBlobWithMetadataWithReadTimeouts() throws Exception {
    final Map<String, String> objectMetadata = new HashMap<>();
    objectMetadata.put("key1", "value1");
    objectMetadata.put("key2", "value2");
    writeBlobWithReadTimeoutsHelper(objectMetadata);
}

/**
 * Runs the read-timeout write scenario without metadata, so the helper takes its plain
 * {@code writeBlob} branch.
 */
public void testWriteBlobWithReadTimeouts() throws Exception {
    final Map<String, String> noMetadata = null;
    writeBlobWithReadTimeoutsHelper(noMetadata);
}

public void WriteLargeBlobHelper(Map<String, String> metadata) throws Exception {
final boolean useTimeout = rarely();
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
Expand Down Expand Up @@ -487,13 +519,28 @@ public void testWriteLargeBlob() throws Exception {
}
});

blobContainer.writeBlob("write_large_blob", new ZeroInputStream(blobSize), blobSize, false);
if (metadata != null) {
blobContainer.writeBlobWithMetadata("write_large_blob", new ZeroInputStream(blobSize), blobSize, false, metadata);
} else {
blobContainer.writeBlob("write_large_blob", new ZeroInputStream(blobSize), blobSize, false);
}

assertThat(countDownInitiate.isCountedDown(), is(true));
assertThat(countDownUploads.get(), equalTo(0));
assertThat(countDownComplete.isCountedDown(), is(true));
}

/**
 * Runs the large-blob write scenario with two sample metadata entries, so the helper
 * takes its {@code writeBlobWithMetadata} branch.
 */
public void testWriteLargeBlobWithMetadata() throws Exception {
    final Map<String, String> objectMetadata = new HashMap<>();
    objectMetadata.put("key1", "value1");
    objectMetadata.put("key2", "value2");
    WriteLargeBlobHelper(objectMetadata);
}

/**
 * Runs the large-blob write scenario without metadata, so the helper takes its plain
 * {@code writeBlob} branch.
 */
public void testWriteLargeBlob() throws Exception {
    final Map<String, String> noMetadata = null;
    WriteLargeBlobHelper(noMetadata);
}

/**
* Asserts that an InputStream is fully consumed, or aborted, when it is closed
*/
Expand Down
Loading
Loading