Skip to content

Commit 2cb1244

Browse files
authored
Merge pull request apache#11 from passaro/HADOOP-18073-uploadPart
Upgrade uploadPart to SDK v2
2 parents de01956 + 8889e06 commit 2cb1244

File tree

12 files changed

+164
-218
lines changed

12 files changed

+164
-218
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,9 @@
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.concurrent.atomic.AtomicInteger;
3333

34-
import com.amazonaws.SdkBaseException;
3534
import com.amazonaws.event.ProgressEvent;
3635
import com.amazonaws.event.ProgressEventType;
3736
import com.amazonaws.event.ProgressListener;
38-
import com.amazonaws.services.s3.model.PartETag;
39-
import com.amazonaws.services.s3.model.UploadPartRequest;
4037

4138
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
4239
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
@@ -48,9 +45,13 @@
4845
import org.slf4j.Logger;
4946
import org.slf4j.LoggerFactory;
5047

48+
import software.amazon.awssdk.core.exception.SdkException;
49+
import software.amazon.awssdk.core.sync.RequestBody;
5150
import software.amazon.awssdk.services.s3.model.CompletedPart;
5251
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
5352
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
53+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
54+
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
5455

5556
import org.apache.hadoop.classification.InterfaceAudience;
5657
import org.apache.hadoop.classification.InterfaceStability;
@@ -848,20 +849,21 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
848849
final int currentPartNumber = partETagsFutures.size() + 1;
849850
final UploadPartRequest request;
850851
final S3ADataBlocks.BlockUploadData uploadData;
852+
final RequestBody requestBody;
851853
try {
852854
uploadData = block.startUpload();
853-
request = writeOperationHelper.newUploadPartRequest(
855+
requestBody = uploadData.hasFile()
856+
? RequestBody.fromFile(uploadData.getFile())
857+
: RequestBody.fromInputStream(uploadData.getUploadStream(), size);
858+
859+
request = writeOperationHelper.newUploadPartRequestBuilder(
854860
key,
855861
uploadId,
856862
currentPartNumber,
857-
size,
858-
uploadData.getUploadStream(),
859-
uploadData.getFile(),
860-
0L);
861-
request.setLastPart(isLast);
862-
} catch (SdkBaseException aws) {
863+
size).build();
864+
} catch (SdkException aws) {
863865
// catch and translate
864-
IOException e = translateException("upload", key, aws);
866+
IOException e = translateExceptionV2("upload", key, aws);
865867
// failure to start the upload.
866868
noteUploadFailure(e);
867869
throw e;
@@ -870,10 +872,14 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
870872
noteUploadFailure(e);
871873
throw e;
872874
}
873-
BlockUploadProgress callback =
874-
new BlockUploadProgress(
875-
block, progressListener, now());
876-
request.setGeneralProgressListener(callback);
875+
876+
// TODO: You cannot currently add progress listeners to requests not via the TM.
877+
// See also putObject
878+
// BlockUploadProgress callback =
879+
// new BlockUploadProgress(
880+
// block, progressListener, now());
881+
// request.setGeneralProgressListener(callback);
882+
877883
statistics.blockUploadQueued(block.dataSize());
878884
ListenableFuture<CompletedPart> partETagFuture =
879885
executorService.submit(() -> {
@@ -882,14 +888,15 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
882888
try {
883889
LOG.debug("Uploading part {} for id '{}'",
884890
currentPartNumber, uploadId);
885-
// TODO: This needs to updated during uploadPart work. Remove PartEtag import.
886-
PartETag partETag = writeOperationHelper.uploadPart(request)
887-
.getPartETag();
891+
UploadPartResponse response = writeOperationHelper
892+
.uploadPart(request, requestBody);
888893
LOG.debug("Completed upload of {} to part {}",
889-
block, partETag.getETag());
894+
block, response.eTag());
890895
LOG.debug("Stream statistics of {}", statistics);
891896
partsUploaded++;
892-
return CompletedPart.builder().eTag(partETag.getETag()).partNumber(currentPartNumber)
897+
return CompletedPart.builder()
898+
.eTag(response.eTag())
899+
.partNumber(currentPartNumber)
893900
.build();
894901
} catch (IOException e) {
895902
// save immediately.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,6 @@
5656
import com.amazonaws.services.s3.Headers;
5757
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
5858
import com.amazonaws.services.s3.model.SelectObjectContentResult;
59-
import com.amazonaws.services.s3.model.UploadPartRequest;
60-
import com.amazonaws.services.s3.model.UploadPartResult;
6159
import com.amazonaws.services.s3.transfer.TransferManager;
6260
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
6361

@@ -96,6 +94,8 @@
9694
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
9795
import software.amazon.awssdk.services.s3.model.S3Error;
9896
import software.amazon.awssdk.services.s3.model.StorageClass;
97+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
98+
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
9999
import software.amazon.awssdk.transfer.s3.CompletedCopy;
100100
import software.amazon.awssdk.transfer.s3.CompletedFileUpload;
101101
import software.amazon.awssdk.transfer.s3.Copy;
@@ -3036,23 +3036,24 @@ private long getPutRequestLength(PutObjectRequest putObjectRequest) {
30363036
/**
30373037
* Upload part of a multi-partition file.
30383038
* Increments the write and put counters.
3039-
* <i>Important: this call does not close any input stream in the request.</i>
3039+
* <i>Important: this call does not close any input stream in the body.</i>
30403040
*
30413041
* Retry Policy: none.
3042-
* @param request request
3042+
* @param request the upload part request.
3043+
* @param body the request body.
30433044
* @return the result of the operation.
3044-
* @throws AmazonClientException on problems
3045+
* @throws AwsServiceException on problems
30453046
*/
30463047
@Retries.OnceRaw
3047-
UploadPartResult uploadPart(UploadPartRequest request)
3048-
throws AmazonClientException {
3049-
long len = request.getPartSize();
3048+
UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body)
3049+
throws AwsServiceException {
3050+
long len = request.contentLength();
30503051
incrementPutStartStatistics(len);
30513052
try {
3052-
UploadPartResult uploadPartResult = s3.uploadPart(request);
3053+
UploadPartResponse uploadPartResponse = s3V2.uploadPart(request, body);
30533054
incrementPutCompletedStatistics(true, len);
3054-
return uploadPartResult;
3055-
} catch (AmazonClientException e) {
3055+
return uploadPartResponse;
3056+
} catch (AwsServiceException e) {
30563057
incrementPutCompletedStatistics(false, len);
30573058
throw e;
30583059
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,27 @@
1919
package org.apache.hadoop.fs.s3a;
2020

2121
import javax.annotation.Nullable;
22-
import java.io.File;
2322
import java.io.FileNotFoundException;
2423
import java.io.IOException;
25-
import java.io.InputStream;
2624
import java.util.List;
2725
import java.util.concurrent.atomic.AtomicInteger;
2826

2927
import com.amazonaws.services.s3.model.AmazonS3Exception;
3028
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
3129
import com.amazonaws.services.s3.model.SelectObjectContentResult;
32-
import com.amazonaws.services.s3.model.UploadPartRequest;
33-
import com.amazonaws.services.s3.model.UploadPartResult;
3430
import org.slf4j.Logger;
3531
import org.slf4j.LoggerFactory;
3632

33+
import software.amazon.awssdk.core.sync.RequestBody;
3734
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
3835
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
3936
import software.amazon.awssdk.services.s3.model.CompletedPart;
4037
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
4138
import software.amazon.awssdk.services.s3.model.MultipartUpload;
4239
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4340
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
41+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
42+
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
4443

4544
import org.apache.hadoop.classification.InterfaceAudience;
4645
import org.apache.hadoop.classification.InterfaceStability;
@@ -475,45 +474,31 @@ public void abortMultipartCommit(String destKey, String uploadId)
475474
}
476475

477476
/**
478-
* Create and initialize a part request of a multipart upload.
479-
* Exactly one of: {@code uploadStream} or {@code sourceFile}
480-
* must be specified.
481-
* A subset of the file may be posted, by providing the starting point
482-
* in {@code offset} and a length of block in {@code size} equal to
483-
* or less than the remaining bytes.
477+
* Create and initialize a part request builder of a multipart upload.
484478
* The part number must be less than 10000.
485479
* Retry policy is once-translated; to much effort
486480
* @param destKey destination key of ongoing operation
487481
* @param uploadId ID of ongoing upload
488482
* @param partNumber current part number of the upload
489483
* @param size amount of data
490-
* @param uploadStream source of data to upload
491-
* @param sourceFile optional source file.
492-
* @param offset offset in file to start reading.
493-
* @return the request.
484+
* @return the request builder.
494485
* @throws IllegalArgumentException if the parameters are invalid.
495486
* @throws PathIOException if the part number is out of range.
496487
*/
497488
@Override
498489
@Retries.OnceTranslated
499-
public UploadPartRequest newUploadPartRequest(
490+
public UploadPartRequest.Builder newUploadPartRequestBuilder(
500491
String destKey,
501492
String uploadId,
502493
int partNumber,
503-
int size,
504-
InputStream uploadStream,
505-
File sourceFile,
506-
Long offset) throws IOException {
494+
long size) throws IOException {
507495
return once("upload part request", destKey,
508496
withinAuditSpan(getAuditSpan(), () ->
509-
getRequestFactory().newUploadPartRequest(
497+
getRequestFactory().newUploadPartRequestBuilder(
510498
destKey,
511499
uploadId,
512500
partNumber,
513-
size,
514-
uploadStream,
515-
sourceFile,
516-
offset)));
501+
size)));
517502
}
518503

519504
/**
@@ -623,19 +608,20 @@ public CompleteMultipartUploadResponse commitUpload(
623608

624609
/**
625610
* Upload part of a multi-partition file.
626-
* @param request request
611+
* @param request the upload part request.
612+
* @param body the request body.
627613
* @return the result of the operation.
628614
* @throws IOException on problems
629615
*/
630616
@Retries.RetryTranslated
631-
public UploadPartResult uploadPart(UploadPartRequest request)
617+
public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body)
632618
throws IOException {
633-
return retry("upload part #" + request.getPartNumber()
634-
+ " upload ID " + request.getUploadId(),
635-
request.getKey(),
619+
return retry("upload part #" + request.partNumber()
620+
+ " upload ID " + request.uploadId(),
621+
request.key(),
636622
true,
637623
withinAuditSpan(getAuditSpan(),
638-
() -> owner.uploadPart(request)));
624+
() -> owner.uploadPart(request, body)));
639625
}
640626

641627
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,22 @@
2020

2121
import javax.annotation.Nullable;
2222
import java.io.Closeable;
23-
import java.io.File;
2423
import java.io.FileNotFoundException;
2524
import java.io.IOException;
26-
import java.io.InputStream;
2725
import java.util.List;
2826
import java.util.concurrent.atomic.AtomicInteger;
2927

3028
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
3129
import com.amazonaws.services.s3.model.SelectObjectContentResult;
32-
import com.amazonaws.services.s3.model.UploadPartRequest;
33-
import com.amazonaws.services.s3.model.UploadPartResult;
3430

31+
import software.amazon.awssdk.core.sync.RequestBody;
3532
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
3633
import software.amazon.awssdk.services.s3.model.CompletedPart;
3734
import software.amazon.awssdk.services.s3.model.MultipartUpload;
3835
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3936
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
37+
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
38+
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
4039

4140
import org.apache.hadoop.conf.Configuration;
4241
import org.apache.hadoop.fs.Path;
@@ -191,31 +190,20 @@ void abortMultipartCommit(String destKey, String uploadId)
191190
throws IOException;
192191

193192
/**
194-
* Create and initialize a part request of a multipart upload.
195-
* Exactly one of: {@code uploadStream} or {@code sourceFile}
196-
* must be specified.
197-
* A subset of the file may be posted, by providing the starting point
198-
* in {@code offset} and a length of block in {@code size} equal to
199-
* or less than the remaining bytes.
193+
* Create and initialize a part request builder of a multipart upload.
200194
* @param destKey destination key of ongoing operation
201195
* @param uploadId ID of ongoing upload
202196
* @param partNumber current part number of the upload
203197
* @param size amount of data
204-
* @param uploadStream source of data to upload
205-
* @param sourceFile optional source file.
206-
* @param offset offset in file to start reading.
207-
* @return the request.
198+
* @return the request builder.
208199
* @throws IllegalArgumentException if the parameters are invalid
209200
* @throws PathIOException if the part number is out of range.
210201
*/
211-
UploadPartRequest newUploadPartRequest(
202+
UploadPartRequest.Builder newUploadPartRequestBuilder(
212203
String destKey,
213204
String uploadId,
214205
int partNumber,
215-
int size,
216-
InputStream uploadStream,
217-
File sourceFile,
218-
Long offset) throws IOException;
206+
long size) throws IOException;
219207

220208
/**
221209
* PUT an object directly (i.e. not via the transfer manager).
@@ -281,12 +269,13 @@ CompleteMultipartUploadResponse commitUpload(
281269

282270
/**
283271
* Upload part of a multi-partition file.
284-
* @param request request
272+
* @param request the upload part request.
273+
* @param body the request body.
285274
* @return the result of the operation.
286275
* @throws IOException on problems
287276
*/
288277
@Retries.RetryTranslated
289-
UploadPartResult uploadPart(UploadPartRequest request)
278+
UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body)
290279
throws IOException;
291280

292281
/**

0 commit comments

Comments
 (0)