Commit
resolve comments
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
Sandeep Kumawat committed Apr 9, 2024
1 parent ca99c35 commit b528cfa
Showing 6 changed files with 19 additions and 19 deletions.
@@ -177,15 +177,7 @@ public long readBlobPreferredLength() {
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, null);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, null);
}
return null;
});
writeBlobWithMetadata(blobName, inputStream, null, blobSize, failIfAlreadyExists);
}

/**
@@ -196,7 +188,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
public void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
@Nullable Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
@@ -611,7 +603,7 @@ void executeSingleUpload(
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

if (metadata != null) {
if (metadata != null && !metadata.isEmpty()) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
@@ -668,7 +660,7 @@ void executeMultipartUpload(
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (metadata != null) {
if (metadata != null && !metadata.isEmpty()) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

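To illustrate the effect of this change, here is a minimal standalone sketch (assuming the AWS SDK for Java v2 used by this plugin; the bucket, key, and metadata values are illustrative) of attaching user metadata to a PutObjectRequest only when a non-null, non-empty map is supplied, which is the guard executeSingleUpload and executeMultipartUpload now apply, and which the async upload path further down mirrors:

import java.util.Map;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

class MetadataGuardSketch {
    static PutObjectRequest buildRequest(String bucket, String key, Map<String, String> metadata) {
        PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucket).key(key);
        // Attach user-defined metadata (sent as x-amz-meta-* headers) only when it is
        // non-null and non-empty, mirroring the null/empty check added above.
        if (metadata != null && !metadata.isEmpty()) {
            builder = builder.metadata(metadata);
        }
        return builder.build();
    }
}

With this guard in place, the plain writeBlob path, which now delegates to writeBlobWithMetadata with a null metadata argument, produces the same request as before.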
@@ -269,7 +269,7 @@ boolean isAborted() {
return isStreamAborted.get();
}

public Map<String, String> getMetadata() {
Map<String, String> getMetadata() {
return this.metadata;

Codecov check warning (codecov/patch): added line #L273 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RetryingInputStream.java was not covered by tests.
}
}
@@ -129,9 +129,12 @@ private void uploadInParts(

CreateMultipartUploadRequest.Builder createMultipartUploadRequestBuilder = CreateMultipartUploadRequest.builder()
.bucket(uploadRequest.getBucket())
.metadata(uploadRequest.getMetadata())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) {
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());

Codecov check warning (codecov/patch): added line #L136 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java was not covered by tests.
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
@@ -325,10 +328,13 @@ private void uploadInOneChunk(
) {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(uploadRequest.getBucket())
.metadata(uploadRequest.getMetadata())
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));

if (uploadRequest.getMetadata() != null && !uploadRequest.getMetadata().isEmpty()) {
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());

Codecov check warning (codecov/patch): added line #L336 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java was not covered by tests.
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
@@ -9,6 +9,7 @@
package org.opensearch.repositories.s3.async;

import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
@@ -49,7 +50,7 @@ public UploadRequest(
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
boolean uploadRetryEnabled,
Map<String, String> metadata
@Nullable Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
@@ -32,6 +32,7 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;

@@ -166,7 +167,7 @@ default long readBlobPreferredLength() {
default void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
@Nullable Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
@@ -219,7 +220,7 @@ default void writeBlobWithMetadata(
default void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
@Nullable Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
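A minimal usage sketch of the metadata-aware write path (the container instance and metadata key/value here are hypothetical, and only the signature shown above is assumed):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.opensearch.common.blobstore.BlobContainer;

class WriteWithMetadataSketch {
    static void writeExample(BlobContainer container) throws IOException {
        byte[] bytes = "example payload".getBytes(StandardCharsets.UTF_8);
        Map<String, String> metadata = Map.of("snapshot-id", "demo"); // illustrative key/value
        try (InputStream in = new ByteArrayInputStream(bytes)) {
            // Metadata is @Nullable: with the S3 repository, a null or empty map simply
            // writes the blob without attaching any user-defined metadata.
            container.writeBlobWithMetadata("example-blob", in, metadata, bytes.length, true);
        }
    }
}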
@@ -52,7 +52,7 @@ private WriteContext(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
@Nullable Long expectedChecksum,
Map<String, String> metadata
@Nullable Map<String, String> metadata
) {
this.fileName = fileName;
this.streamContextSupplier = streamContextSupplier;
