Skip to content

Commit f7a331d

Browse files
HADOOP-15224. S3A: Add option to set checksum on S3 objects (#7396)
Add the property fs.s3a.create.checksum.algorithm that allow users to specify a checksum algorithm (CRC32, CRC32C, SHA1, or SHA256) to be used by the AWS SDK to generate the checksum for object integrity check. Contributed by Raphael Azzolini
1 parent 5c10e0d commit f7a331d

21 files changed

+951
-36
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,6 +1781,15 @@ private Constants() {
17811781
*/
17821782
public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;
17831783

1784+
/**
1785+
* Indicates the algorithm used to create the checksum for the object
1786+
* to be uploaded to S3. Unset by default. It supports the following values:
1787+
* 'CRC32', 'CRC32C', 'SHA1', and 'SHA256'
1788+
* value:{@value}
1789+
*/
1790+
public static final String CHECKSUM_ALGORITHM =
1791+
"fs.s3a.create.checksum.algorithm";
1792+
17841793
/**
17851794
* Are extensions classes, such as {@code fs.s3a.aws.credentials.provider},
17861795
* going to be loaded from the same classloader that loaded

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,10 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
10641064
return CompletedPart.builder()
10651065
.eTag(response.eTag())
10661066
.partNumber(currentPartNumber)
1067+
.checksumCRC32(response.checksumCRC32())
1068+
.checksumCRC32C(response.checksumCRC32C())
1069+
.checksumSHA1(response.checksumSHA1())
1070+
.checksumSHA256(response.checksumSHA256())
10671071
.build();
10681072
} catch (Exception e) {
10691073
final IOException ex = e instanceof IOException

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
118118
import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations;
119119
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
120+
import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
120121
import org.apache.hadoop.fs.s3a.impl.ClientManager;
121122
import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
122123
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
@@ -1327,6 +1328,7 @@ protected RequestFactory createRequestFactory() {
13271328
.withStorageClass(storageClass)
13281329
.withMultipartUploadEnabled(isMultipartUploadEnabled)
13291330
.withPartUploadTimeout(partUploadTimeout)
1331+
.withChecksumAlgorithm(ChecksumSupport.getChecksumAlgorithm(getConf()))
13301332
.build();
13311333
}
13321334

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
@InterfaceAudience.Private
7272
@InterfaceStability.Unstable
7373
public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommit>
74-
implements Iterable<String> {
74+
implements Iterable<UploadEtag> {
7575

7676
/**
7777
* Serialization ID: {@value}.
@@ -118,7 +118,7 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi
118118
private String text = "";
119119

120120
/** Ordered list of etags. */
121-
private List<String> etags;
121+
private List<UploadEtag> etags;
122122

123123
/**
124124
* Any custom extra data committer subclasses may choose to add.
@@ -222,7 +222,7 @@ public void bindCommitData(List<CompletedPart> parts) throws ValidationFailure {
222222
for (CompletedPart part : parts) {
223223
verify(part.partNumber() == counter,
224224
"Expected part number %s but got %s", counter, part.partNumber());
225-
etags.add(part.eTag());
225+
etags.add(UploadEtag.fromCompletedPart(part));
226226
counter++;
227227
}
228228
}
@@ -237,9 +237,10 @@ public void validate() throws ValidationFailure {
237237
verify(length >= 0, "Invalid length: " + length);
238238
destinationPath();
239239
verify(etags != null, "No etag list");
240-
validateCollectionClass(etags, String.class);
241-
for (String etag : etags) {
242-
verify(StringUtils.isNotEmpty(etag), "Empty etag");
240+
validateCollectionClass(etags, UploadEtag.class);
241+
for (UploadEtag etag : etags) {
242+
verify(etag != null && StringUtils.isNotEmpty(etag.getEtag()),
243+
"Empty etag");
243244
}
244245
if (extraData != null) {
245246
validateCollectionClass(extraData.keySet(), String.class);
@@ -313,7 +314,7 @@ public int getPartCount() {
313314
* @return an iterator.
314315
*/
315316
@Override
316-
public Iterator<String> iterator() {
317+
public Iterator<UploadEtag> iterator() {
317318
return etags.iterator();
318319
}
319320

@@ -442,11 +443,11 @@ public void setText(String text) {
442443
}
443444

444445
/** @return ordered list of etags. */
445-
public List<String> getEtags() {
446+
public List<UploadEtag> getEtags() {
446447
return etags;
447448
}
448449

449-
public void setEtags(List<String> etags) {
450+
public void setEtags(List<UploadEtag> etags) {
450451
this.etags = etags;
451452
}
452453

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.commit.files;
20+
21+
import java.io.Serializable;
22+
import java.util.StringJoiner;
23+
24+
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
25+
import software.amazon.awssdk.services.s3.model.CompletedPart;
26+
27+
/**
28+
* Stores ETag and checksum values from {@link CompletedPart} responses from S3.
29+
* These values need to be stored to be later passed to the
30+
* {@link software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest
31+
* CompleteMultipartUploadRequest}
32+
*/
33+
public class UploadEtag implements Serializable {
34+
35+
/**
36+
* Serialization ID: {@value}.
37+
*/
38+
private static final long serialVersionUID = 1L;
39+
40+
private String etag;
41+
private String checksumAlgorithm;
42+
private String checksum;
43+
44+
public UploadEtag() {
45+
}
46+
47+
public UploadEtag(String etag, String checksumAlgorithm, String checksum) {
48+
this.etag = etag;
49+
this.checksumAlgorithm = checksumAlgorithm;
50+
this.checksum = checksum;
51+
}
52+
53+
public String getEtag() {
54+
return etag;
55+
}
56+
57+
public void setEtag(String etag) {
58+
this.etag = etag;
59+
}
60+
61+
public String getChecksumAlgorithm() {
62+
return checksumAlgorithm;
63+
}
64+
65+
public void setChecksumAlgorithm(String checksumAlgorithm) {
66+
this.checksumAlgorithm = checksumAlgorithm;
67+
}
68+
69+
public String getChecksum() {
70+
return checksum;
71+
}
72+
73+
public void setChecksum(String checksum) {
74+
this.checksum = checksum;
75+
}
76+
77+
public static UploadEtag fromCompletedPart(CompletedPart completedPart) {
78+
UploadEtag uploadEtag = new UploadEtag();
79+
uploadEtag.setEtag(completedPart.eTag());
80+
if (completedPart.checksumCRC32() != null) {
81+
uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32.toString());
82+
uploadEtag.setChecksum(completedPart.checksumCRC32());
83+
}
84+
if (completedPart.checksumCRC32C() != null) {
85+
uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32_C.toString());
86+
uploadEtag.setChecksum(completedPart.checksumCRC32C());
87+
}
88+
if (completedPart.checksumSHA1() != null) {
89+
uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA1.toString());
90+
uploadEtag.setChecksum(completedPart.checksumSHA1());
91+
}
92+
if (completedPart.checksumSHA256() != null) {
93+
uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA256.toString());
94+
uploadEtag.setChecksum(completedPart.checksumSHA256());
95+
}
96+
return uploadEtag;
97+
}
98+
99+
public static CompletedPart toCompletedPart(UploadEtag uploadEtag, int partNumber) {
100+
final CompletedPart.Builder builder = CompletedPart.builder()
101+
.partNumber(partNumber)
102+
.eTag(uploadEtag.etag);
103+
if (uploadEtag.checksumAlgorithm == null) {
104+
return builder.build();
105+
}
106+
final ChecksumAlgorithm checksumAlgorithm = ChecksumAlgorithm.fromValue(
107+
uploadEtag.checksumAlgorithm);
108+
switch (checksumAlgorithm) {
109+
case CRC32:
110+
builder.checksumCRC32(uploadEtag.checksum);
111+
break;
112+
case CRC32_C:
113+
builder.checksumCRC32C(uploadEtag.checksum);
114+
break;
115+
case SHA1:
116+
builder.checksumSHA1(uploadEtag.checksum);
117+
break;
118+
case SHA256:
119+
builder.checksumSHA256(uploadEtag.checksum);
120+
break;
121+
default:
122+
// do nothing
123+
}
124+
return builder.build();
125+
}
126+
127+
@Override
128+
public String toString() {
129+
return new StringJoiner(", ", UploadEtag.class.getSimpleName() + "[", "]")
130+
.add("serialVersionUID='" + serialVersionUID + "'")
131+
.add("etag='" + etag + "'")
132+
.add("checksumAlgorithm='" + checksumAlgorithm + "'")
133+
.add("checksum='" + checksum + "'")
134+
.toString();
135+
}
136+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.hadoop.fs.s3a.WriteOperations;
5454
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
5555
import org.apache.hadoop.fs.s3a.commit.PathCommitException;
56+
import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
5657
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
5758
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
5859
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
@@ -165,9 +166,9 @@ public CommitOperations(S3AFileSystem fs,
165166
* @param tagIds list of tags
166167
* @return same list, now in numbered tuples
167168
*/
168-
public static List<CompletedPart> toPartEtags(List<String> tagIds) {
169+
public static List<CompletedPart> toPartEtags(List<UploadEtag> tagIds) {
169170
return IntStream.range(0, tagIds.size())
170-
.mapToObj(i -> CompletedPart.builder().partNumber(i + 1).eTag(tagIds.get(i)).build())
171+
.mapToObj(i -> UploadEtag.toCompletedPart(tagIds.get(i), i + 1))
171172
.collect(Collectors.toList());
172173
}
173174

@@ -655,6 +656,10 @@ private List<CompletedPart> uploadFileData(
655656
parts.add(CompletedPart.builder()
656657
.partNumber(partNumber)
657658
.eTag(response.eTag())
659+
.checksumCRC32(response.checksumCRC32())
660+
.checksumCRC32C(response.checksumCRC32C())
661+
.checksumSHA1(response.checksumSHA1())
662+
.checksumSHA256(response.checksumSHA256())
658663
.build());
659664
}
660665
return parts;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl;
20+
21+
import java.util.Set;
22+
23+
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
24+
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
25+
26+
import org.apache.commons.lang3.StringUtils;
27+
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.hadoop.util.ConfigurationHelper;
29+
30+
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
31+
32+
/**
33+
* Utility class to support operations on S3 object checksum.
34+
*/
35+
public final class ChecksumSupport {
36+
37+
private ChecksumSupport() {
38+
}
39+
40+
/**
41+
* Checksum algorithms that are supported by S3A.
42+
*/
43+
private static final Set<ChecksumAlgorithm> SUPPORTED_CHECKSUM_ALGORITHMS = ImmutableSet.of(
44+
ChecksumAlgorithm.CRC32,
45+
ChecksumAlgorithm.CRC32_C,
46+
ChecksumAlgorithm.SHA1,
47+
ChecksumAlgorithm.SHA256);
48+
49+
/**
50+
* Get the checksum algorithm to be used for data integrity check of the objects in S3.
51+
* This operation includes validating if the provided value is a supported checksum algorithm.
52+
* @param conf configuration to scan
53+
* @return the checksum algorithm to be passed on S3 requests
54+
* @throws IllegalArgumentException if the checksum algorithm is not known or not supported
55+
*/
56+
public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) {
57+
return ConfigurationHelper.resolveEnum(conf,
58+
CHECKSUM_ALGORITHM,
59+
ChecksumAlgorithm.class,
60+
configValue -> {
61+
if (StringUtils.isBlank(configValue)) {
62+
return null;
63+
}
64+
if (ChecksumAlgorithm.CRC32_C.toString().equalsIgnoreCase(configValue)) {
65+
// In case the configuration value is CRC32C, without underscore.
66+
return ChecksumAlgorithm.CRC32_C;
67+
}
68+
throw new IllegalArgumentException("Checksum algorithm is not supported: " + configValue);
69+
});
70+
}
71+
}

0 commit comments

Comments
 (0)