Skip to content

Commit de01956

Browse files
authored
Merge pull request apache#8 from ahmarsuhail/HADOOP-18073-sdk-upgrade-mpu-operations
HADOOP-18073: SDK upgrade — multipart upload (MPU) operations
2 parents 6a03f91 + 4c83040 commit de01956

40 files changed

+651
-518
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
/**
 * Enum to map AWS SDK V1 canned ACL names to their SDK V2 string values.
 * Each constant's name matches the V1 {@code CannedAccessControlList}
 * constant; its {@link #toString()} value is the corresponding V2
 * {@code ObjectCannedACL} enum name.
 */
public enum AWSCannedACL {
  Private("PRIVATE"),
  PublicRead("PUBLIC_READ"),
  PublicReadWrite("PUBLIC_READ_WRITE"),
  AuthenticatedRead("AUTHENTICATED_READ"),
  AwsExecRead("AWS_EXEC_READ"),
  BucketOwnerRead("BUCKET_OWNER_READ"),
  BucketOwnerFullControl("BUCKET_OWNER_FULL_CONTROL");

  /** SDK V2 canned ACL value for this constant. */
  private final String value;

  AWSCannedACL(String value) {
    this.value = value;
  }

  /**
   * The SDK V2 canned ACL string for this constant.
   * @return the V2 ACL name, e.g. {@code "PUBLIC_READ"}.
   */
  @Override
  public String toString() {
    return this.value;
  }
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.util.List;
22+
23+
import software.amazon.awssdk.services.s3.model.S3Error;
24+
25+
import org.apache.hadoop.classification.InterfaceAudience;
26+
import org.apache.hadoop.classification.InterfaceStability;
27+
28+
/**
29+
* Exception raised in {@link S3AFileSystem#deleteObjects} when
30+
* one or more of the keys could not be deleted.
31+
*/
32+
@InterfaceAudience.Public
33+
@InterfaceStability.Unstable
34+
public class MultiObjectDeleteException extends RuntimeException {
35+
36+
private final List<S3Error> errors;
37+
38+
public MultiObjectDeleteException(List<S3Error> errors) {
39+
super(errors.toString());
40+
this.errors = errors;
41+
}
42+
43+
public List<S3Error> errors() { return errors; }
44+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
import java.util.NoSuchElementException;
2424
import javax.annotation.Nullable;
2525

26-
import com.amazonaws.services.s3.AmazonS3;
27-
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
28-
import com.amazonaws.services.s3.model.MultipartUpload;
29-
import com.amazonaws.services.s3.model.MultipartUploadListing;
3026
import org.slf4j.Logger;
3127
import org.slf4j.LoggerFactory;
3228

29+
import software.amazon.awssdk.services.s3.S3Client;
30+
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
31+
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
32+
import software.amazon.awssdk.services.s3.model.MultipartUpload;
33+
3334
import org.apache.hadoop.fs.RemoteIterator;
3435
import org.apache.hadoop.fs.s3a.api.RequestFactory;
3536
import org.apache.hadoop.fs.s3a.impl.StoreContext;
@@ -43,7 +44,7 @@
4344
* MultipartUtils upload-specific functions for use by S3AFileSystem and Hadoop
4445
* CLI.
4546
* The Audit span active when
46-
* {@link #listMultipartUploads(StoreContext, AmazonS3, String, int)}
47+
* {@link #listMultipartUploads(StoreContext, S3Client, String, int)}
4748
* was invoked is retained for all subsequent operations.
4849
*/
4950
public final class MultipartUtils {
@@ -67,7 +68,7 @@ private MultipartUtils() { }
6768
*/
6869
static MultipartUtils.UploadIterator listMultipartUploads(
6970
final StoreContext storeContext,
70-
AmazonS3 s3,
71+
S3Client s3,
7172
@Nullable String prefix,
7273
int maxKeys)
7374
throws IOException {
@@ -84,14 +85,14 @@ static MultipartUtils.UploadIterator listMultipartUploads(
8485
* at the time the iterator was constructed.
8586
*/
8687
static class ListingIterator implements
87-
RemoteIterator<MultipartUploadListing> {
88+
RemoteIterator<ListMultipartUploadsResponse> {
8889

8990
private final String prefix;
9091

9192
private final RequestFactory requestFactory;
9293

9394
private final int maxKeys;
94-
private final AmazonS3 s3;
95+
private final S3Client s3;
9596
private final Invoker invoker;
9697

9798
private final AuditSpan auditSpan;
@@ -101,7 +102,7 @@ static class ListingIterator implements
101102
/**
102103
* Most recent listing results.
103104
*/
104-
private MultipartUploadListing listing;
105+
private ListMultipartUploadsResponse listing;
105106

106107
/**
107108
* Indicator that this is the first listing.
@@ -114,7 +115,7 @@ static class ListingIterator implements
114115
private int listCount = 0;
115116

116117
ListingIterator(final StoreContext storeContext,
117-
AmazonS3 s3,
118+
S3Client s3,
118119
@Nullable String prefix,
119120
int maxKeys) throws IOException {
120121
this.storeContext = storeContext;
@@ -153,7 +154,7 @@ public boolean hasNext() throws IOException {
153154
*/
154155
@Override
155156
@Retries.RetryTranslated
156-
public MultipartUploadListing next() throws IOException {
157+
public ListMultipartUploadsResponse next() throws IOException {
157158
if (firstListing) {
158159
firstListing = false;
159160
} else {
@@ -171,32 +172,34 @@ public MultipartUploadListing next() throws IOException {
171172
public String toString() {
172173
return "Upload iterator: prefix " + prefix
173174
+ "; list count " + listCount
174-
+ "; upload count " + listing.getMultipartUploads().size()
175+
+ "; upload count " + listing.uploads().size()
175176
+ "; isTruncated=" + listing.isTruncated();
176177
}
177178

178179
@Retries.RetryTranslated
179180
private void requestNextBatch() throws IOException {
180181
try (AuditSpan span = auditSpan.activate()) {
181-
ListMultipartUploadsRequest req = requestFactory
182-
.newListMultipartUploadsRequest(prefix);
182+
ListMultipartUploadsRequest.Builder requestBuilder = requestFactory
183+
.newListMultipartUploadsRequestBuilder(prefix);
183184
if (!firstListing) {
184-
req.setKeyMarker(listing.getNextKeyMarker());
185-
req.setUploadIdMarker(listing.getNextUploadIdMarker());
185+
requestBuilder.keyMarker(listing.nextKeyMarker());
186+
requestBuilder.uploadIdMarker(listing.nextUploadIdMarker());
186187
}
187-
req.setMaxUploads(maxKeys);
188+
requestBuilder.maxUploads(maxKeys);
189+
190+
ListMultipartUploadsRequest request = requestBuilder.build();
188191

189192
LOG.debug("[{}], Requesting next {} uploads prefix {}, " +
190193
"next key {}, next upload id {}", listCount, maxKeys, prefix,
191-
req.getKeyMarker(), req.getUploadIdMarker());
194+
request.keyMarker(), request.uploadIdMarker());
192195
listCount++;
193196

194197
listing = invoker.retry("listMultipartUploads", prefix, true,
195198
trackDurationOfOperation(storeContext.getInstrumentation(),
196199
MULTIPART_UPLOAD_LIST.getSymbol(),
197-
() -> s3.listMultipartUploads(req)));
200+
() -> s3.listMultipartUploads(requestBuilder.build())));
198201
LOG.debug("Listing found {} upload(s)",
199-
listing.getMultipartUploads().size());
202+
listing.uploads().size());
200203
LOG.debug("New listing state: {}", this);
201204
}
202205
}
@@ -216,14 +219,14 @@ public static class UploadIterator
216219
*/
217220
private ListingIterator lister;
218221
/** Current listing: the last upload listing we fetched. */
219-
private MultipartUploadListing listing;
222+
private ListMultipartUploadsResponse listing;
220223
/** Iterator over the current listing. */
221224
private ListIterator<MultipartUpload> batchIterator;
222225

223226
@Retries.RetryTranslated
224227
public UploadIterator(
225228
final StoreContext storeContext,
226-
AmazonS3 s3,
229+
S3Client s3,
227230
int maxKeys,
228231
@Nullable String prefix)
229232
throws IOException {
@@ -249,7 +252,7 @@ public MultipartUpload next() throws IOException {
249252
private boolean requestNextBatch() throws IOException {
250253
if (lister.hasNext()) {
251254
listing = lister.next();
252-
batchIterator = listing.getMultipartUploads().listIterator();
255+
batchIterator = listing.uploads().listIterator();
253256
return batchIterator.hasNext();
254257
}
255258
return false;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.slf4j.Logger;
4949
import org.slf4j.LoggerFactory;
5050

51+
import software.amazon.awssdk.services.s3.model.CompletedPart;
5152
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
5253
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
5354

@@ -419,7 +420,7 @@ public void close() throws IOException {
419420
uploadCurrentBlock(true);
420421
}
421422
// wait for the partial uploads to finish
422-
final List<PartETag> partETags =
423+
final List<CompletedPart> partETags =
423424
multiPartUpload.waitForAllPartUploads();
424425
bytes = bytesSubmitted;
425426

@@ -742,7 +743,7 @@ protected IOStatisticsAggregator getThreadIOStatistics() {
742743
*/
743744
private class MultiPartUpload {
744745
private final String uploadId;
745-
private final List<ListenableFuture<PartETag>> partETagsFutures;
746+
private final List<ListenableFuture<CompletedPart>> partETagsFutures;
746747
private int partsSubmitted;
747748
private int partsUploaded;
748749
private long bytesSubmitted;
@@ -874,20 +875,22 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
874875
block, progressListener, now());
875876
request.setGeneralProgressListener(callback);
876877
statistics.blockUploadQueued(block.dataSize());
877-
ListenableFuture<PartETag> partETagFuture =
878+
ListenableFuture<CompletedPart> partETagFuture =
878879
executorService.submit(() -> {
879880
// this is the queued upload operation
880881
// do the upload
881882
try {
882883
LOG.debug("Uploading part {} for id '{}'",
883884
currentPartNumber, uploadId);
885+
// TODO: This needs to be updated during the uploadPart work. Remove the PartETag import.
884886
PartETag partETag = writeOperationHelper.uploadPart(request)
885887
.getPartETag();
886888
LOG.debug("Completed upload of {} to part {}",
887889
block, partETag.getETag());
888890
LOG.debug("Stream statistics of {}", statistics);
889891
partsUploaded++;
890-
return partETag;
892+
return CompletedPart.builder().eTag(partETag.getETag()).partNumber(currentPartNumber)
893+
.build();
891894
} catch (IOException e) {
892895
// save immediately.
893896
noteUploadFailure(e);
@@ -905,7 +908,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
905908
* @return list of results
906909
* @throws IOException IO Problems
907910
*/
908-
private List<PartETag> waitForAllPartUploads() throws IOException {
911+
private List<CompletedPart> waitForAllPartUploads() throws IOException {
909912
LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
910913
try {
911914
return Futures.allAsList(partETagsFutures).get();
@@ -929,7 +932,7 @@ private List<PartETag> waitForAllPartUploads() throws IOException {
929932
*/
930933
private void cancelAllActiveFutures() {
931934
LOG.debug("Cancelling futures");
932-
for (ListenableFuture<PartETag> future : partETagsFutures) {
935+
for (ListenableFuture<CompletedPart> future : partETagsFutures) {
933936
future.cancel(true);
934937
}
935938
}
@@ -941,7 +944,7 @@ private void cancelAllActiveFutures() {
941944
* @param partETags list of partial uploads
942945
* @throws IOException on any problem
943946
*/
944-
private void complete(List<PartETag> partETags)
947+
private void complete(List<CompletedPart> partETags)
945948
throws IOException {
946949
maybeRethrowUploadFailure();
947950
AtomicInteger errorCount = new AtomicInteger(0);

0 commit comments

Comments
 (0)