Skip to content

Commit 2f48928

Browse files
committed
Multipart Upload (Implementation)
Signed-off-by: Tanishq Ranjan <tqranjan@amazon.com>
1 parent b723974 commit 2f48928

File tree

1 file changed

+170
-0
lines changed

1 file changed

+170
-0
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
4141
import software.amazon.awssdk.services.s3.model.CommonPrefix;
4242
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
43+
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
4344
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
4445
import software.amazon.awssdk.services.s3.model.CompletedPart;
4546
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -599,6 +600,175 @@ void executeSingleUploadIfEtagMatches(
599600
}
600601
}
601602

603+
public void executeMultipartUploadIfEtagMatches(
604+
final S3BlobStore blobStore,
605+
final String blobName,
606+
final InputStream input,
607+
final long blobSize,
608+
final Map<String, String> metadata,
609+
final String eTag,
610+
final ActionListener<String> etagListener
611+
) throws IOException {
612+
613+
ensureMultiPartUploadSize(blobSize);
614+
615+
final long partSize = blobStore.bufferSizeInBytes();
616+
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
617+
if (multiparts.v1() > Integer.MAX_VALUE) {
618+
throw new IllegalArgumentException("Too many multipart upload parts; consider a larger buffer size.");
619+
}
620+
final int nbParts = multiparts.v1().intValue();
621+
final long lastPartSize = multiparts.v2();
622+
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
623+
624+
CreateMultipartUploadRequest.Builder createRequestBuilder = CreateMultipartUploadRequest.builder()
625+
.bucket(blobStore.bucket())
626+
.key(blobName)
627+
.storageClass(blobStore.getStorageClass())
628+
.acl(blobStore.getCannedACL())
629+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));
630+
631+
if (metadata != null && !metadata.isEmpty()) {
632+
createRequestBuilder.metadata(metadata);
633+
}
634+
if (blobStore.serverSideEncryption()) {
635+
createRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
636+
}
637+
638+
final CreateMultipartUploadRequest createMultipartUploadRequest = createRequestBuilder.build();
639+
final SetOnce<String> uploadId = new SetOnce<>();
640+
final String bucketName = blobStore.bucket();
641+
boolean success = false;
642+
643+
final InputStream requestInputStream = blobStore.isUploadRetryEnabled()
644+
? new BufferedInputStream(input, (int) (partSize + 1))
645+
: input;
646+
647+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
648+
// Initiate multipart upload
649+
uploadId.set(
650+
SocketAccess.doPrivileged(() -> clientReference.get().createMultipartUpload(createMultipartUploadRequest).uploadId())
651+
);
652+
if (Strings.isEmpty(uploadId.get())) {
653+
IOException exception = new IOException("Failed to initialize multipart upload for " + blobName);
654+
etagListener.onFailure(exception);
655+
throw exception;
656+
}
657+
658+
final List<CompletedPart> parts = new ArrayList<>(nbParts);
659+
long bytesCount = 0;
660+
661+
for (int i = 1; i <= nbParts; i++) {
662+
long currentPartSize = (i < nbParts) ? partSize : lastPartSize;
663+
final UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
664+
.bucket(bucketName)
665+
.key(blobName)
666+
.uploadId(uploadId.get())
667+
.partNumber(i)
668+
.contentLength(currentPartSize)
669+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
670+
.build();
671+
672+
bytesCount += currentPartSize;
673+
674+
final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
675+
() -> clientReference.get()
676+
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, currentPartSize))
677+
);
678+
679+
// Validate part-level ETag
680+
String partETag = uploadResponse.eTag();
681+
if (partETag == null) {
682+
IOException exception = new IOException(
683+
String.format(Locale.ROOT, "S3 part upload for [%s] part [%d] returned null ETag", blobName, i)
684+
);
685+
etagListener.onFailure(exception);
686+
throw exception;
687+
}
688+
689+
parts.add(CompletedPart.builder().partNumber(i).eTag(partETag).build());
690+
}
691+
692+
if (bytesCount != blobSize) {
693+
IOException exception = new IOException(
694+
String.format(Locale.ROOT, "Multipart upload for [%s] sent %d bytes; expected %d bytes", blobName, bytesCount, blobSize)
695+
);
696+
etagListener.onFailure(exception);
697+
throw exception;
698+
}
699+
700+
// Add ifMatch condition to the complete request
701+
CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
702+
.bucket(bucketName)
703+
.key(blobName)
704+
.uploadId(uploadId.get())
705+
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
706+
.ifMatch(eTag)
707+
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
708+
.build();
709+
710+
CompleteMultipartUploadResponse completeResponse = SocketAccess.doPrivileged(
711+
() -> clientReference.get().completeMultipartUpload(completeRequest)
712+
);
713+
714+
// Validate final ETag
715+
if (completeResponse.eTag() != null) {
716+
success = true;
717+
etagListener.onResponse(completeResponse.eTag());
718+
} else {
719+
IOException exception = new IOException(
720+
"S3 multipart upload for [" + blobName + "] returned null ETag, violating data integrity expectations"
721+
);
722+
etagListener.onFailure(exception);
723+
throw exception;
724+
}
725+
726+
} catch (S3Exception e) {
727+
if (e.statusCode() == 412) {
728+
etagListener.onFailure(new OpenSearchException("stale_primary_shard", e, "Precondition Failed : Etag Mismatch", blobName));
729+
throw new IOException("Unable to upload object [" + blobName + "] due to ETag mismatch", e);
730+
} else {
731+
IOException exception = new IOException(
732+
String.format(Locale.ROOT, "S3 error during multipart upload [%s]: %s", blobName, e.getMessage()),
733+
e
734+
);
735+
etagListener.onFailure(exception);
736+
throw exception;
737+
}
738+
} catch (SdkException e) {
739+
IOException exception = new IOException(String.format(Locale.ROOT, "S3 multipart upload failed for [%s]", blobName), e);
740+
etagListener.onFailure(exception);
741+
throw exception;
742+
} catch (Exception e) {
743+
// Catch-all for unexpected exceptions
744+
IOException exception = new IOException(
745+
String.format(Locale.ROOT, "Unexpected error during multipart upload [%s]: %s", blobName, e.getMessage()),
746+
e
747+
);
748+
etagListener.onFailure(exception);
749+
throw exception;
750+
} finally {
751+
if (!success && Strings.hasLength(uploadId.get())) {
752+
AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
753+
.bucket(bucketName)
754+
.key(blobName)
755+
.uploadId(uploadId.get())
756+
.build();
757+
try (AmazonS3Reference abortClient = blobStore.clientReference()) {
758+
SocketAccess.doPrivilegedVoid(() -> abortClient.get().abortMultipartUpload(abortRequest));
759+
} catch (Exception abortException) {
760+
logger.warn(
761+
"Failed to abort incomplete multipart upload [{}] with ID [{}]. "
762+
+ "This may result in orphaned S3 data and charges.",
763+
blobName,
764+
uploadId.get(),
765+
abortException
766+
);
767+
}
768+
}
769+
}
770+
}
771+
602772
/**
603773
* Uploads a blob using a single upload request
604774
*/

0 commit comments

Comments
 (0)