diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index aada612fceae9..c2cf3fb00fe96 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -135,6 +135,7 @@
**/TestS3AFastOutputStream.java
**/TestS3AFileSystemContract.java
**/TestS3AMiniYarnCluster.java
+ **/TestS3AEncryptionSSEC.java
**/Test*Root*.java
@@ -154,6 +155,7 @@
**/TestS3AFastOutputStream.java
**/TestS3AFileSystemContract.java
**/TestS3AMiniYarnCluster.java
+ **/TestS3AEncryptionSSEC.java
**/Test*Root*.java
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 71668db0e86c7..1c23545f8d07a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -142,12 +142,8 @@ private Constants() {
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";
- /**
- * The standard encryption algorithm AWS supports.
- * Different implementations may support others (or none).
- */
- public static final String SERVER_SIDE_ENCRYPTION_AES256 =
- "AES256";
+ public static final String SERVER_SIDE_ENCRYPTION_KEY =
+ "fs.s3a.server-side-encryption-key";
//override signature algorithm used for signing requests
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
new file mode 100644
index 0000000000000..bc8b07b65b184
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * This enum is to centralize the encryption methods and
+ * the value required in the configuration.
+ */
+public enum S3AEncryptionMethods {
+
+ SSE_S3("AES256"),
+ SSE_KMS("SSE-KMS"),
+ SSE_C("SSE-C");
+
+ private String method;
+
+ S3AEncryptionMethods(String method) {
+ this.method = method;
+ }
+
+ public String getMethod() {
+ return method;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 7a985c6f95cca..905830f98b4c1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -276,6 +276,7 @@ private MultiPartUpload initiateMultiPartUpload() throws IOException {
new InitiateMultipartUploadRequest(bucket,
key,
createDefaultMetadata());
+ fs.setSSEKMSOrCIfRequired(initiateMPURequest);
initiateMPURequest.setCannedACL(cannedACL);
try {
return new MultiPartUpload(
@@ -295,6 +296,7 @@ private void putObject() throws IOException {
fs.newPutObjectRequest(key,
om,
new ByteArrayInputStream(buffer.toByteArray()));
+ fs.setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setGeneralProgressListener(progressListener);
ListenableFuture putObjectResult =
executorService.submit(new Callable() {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9a3fcf250817d..9d5d3cc863653 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -48,12 +48,16 @@
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
@@ -126,6 +130,7 @@ public class S3AFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
+ private String serverSideEncryptionKey;
private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics;
private long readAhead;
@@ -290,6 +295,8 @@ public StorageStatistics provide() {
serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+ serverSideEncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
+
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
} catch (AmazonClientException e) {
@@ -621,9 +628,19 @@ public FSDataInputStream open(Path f, int bufferSize)
+ " because it is a directory");
}
- return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
- fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
- inputPolicy));
+ return new FSDataInputStream(
+ new S3AInputStream(
+ new S3ObjectAttributes(
+ bucket,
+ pathToKey(f),
+ serverSideEncryptionAlgorithm,
+ serverSideEncryptionKey),
+ fileStatus.getLen(),
+ s3,
+ statistics,
+ instrumentation,
+ readAhead,
+ inputPolicy));
}
/**
@@ -894,7 +911,10 @@ protected void incrementStatistic(Statistic statistic, long count) {
*/
protected ObjectMetadata getObjectMetadata(String key) {
incrementStatistic(OBJECT_METADATA_REQUESTS);
- ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+ GetObjectMetadataRequest request =
+ new GetObjectMetadataRequest(bucket, key);
+ setSSECIfRequired(request);
+ ObjectMetadata meta = s3.getObjectMetadata(request);
incrementReadOperations();
return meta;
}
@@ -974,6 +994,7 @@ public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
+ setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(metadata);
return putObjectRequest;
@@ -992,6 +1013,7 @@ PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
+ setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
return putObjectRequest;
}
@@ -1004,9 +1026,7 @@ PutObjectRequest newPutObjectRequest(String key,
*/
public ObjectMetadata newObjectMetadata() {
final ObjectMetadata om = new ObjectMetadata();
- if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
- om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
- }
+ setSSES3IfRequired(om);
return om;
}
@@ -1638,11 +1658,10 @@ private void copyFile(String srcKey, String dstKey, long size)
try {
ObjectMetadata srcom = getObjectMetadata(srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
- if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
- dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
- }
+ setSSES3IfRequired(dstom);
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+ setSSEKMSOrCIfRequired(copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);
@@ -1674,6 +1693,111 @@ public void progressChanged(ProgressEvent progressEvent) {
}
}
+ protected void setSSEKMSOrCIfRequired(InitiateMultipartUploadRequest req) {
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
+ if(S3AEncryptionMethods.SSE_KMS.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ //Use specified key
+ req.setSSEAwsKeyManagementParams(
+ new SSEAwsKeyManagementParams(serverSideEncryptionKey)
+ );
+ }else{
+ //Use default key
+ req.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams());
+ }
+ }else if(S3AEncryptionMethods.SSE_C.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ //at the moment, only supports copy using the same key
+ req.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
+ }
+ }
+ }
+ }
+
+
+ protected void setSSEKMSOrCIfRequired(CopyObjectRequest copyObjectRequest) {
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
+ if(S3AEncryptionMethods.SSE_KMS.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ //Use specified key
+ copyObjectRequest.setSSEAwsKeyManagementParams(
+ new SSEAwsKeyManagementParams(serverSideEncryptionKey)
+ );
+ }else{
+ //Use default key
+ copyObjectRequest.setSSEAwsKeyManagementParams(
+ new SSEAwsKeyManagementParams()
+ );
+ }
+ }else if(S3AEncryptionMethods.SSE_C.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ //at the moment, only supports copy using the same key
+ copyObjectRequest.setSourceSSECustomerKey(
+ new SSECustomerKey(serverSideEncryptionKey)
+ );
+ copyObjectRequest.setDestinationSSECustomerKey(
+ new SSECustomerKey(serverSideEncryptionKey)
+ );
+ }
+ }
+ }
+ }
+
+ protected void setSSECIfRequired(GetObjectMetadataRequest request) {
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
+ if(S3AEncryptionMethods.SSE_C.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ //at the moment, only supports copy using the same key
+ request.setSSECustomerKey(
+ new SSECustomerKey(serverSideEncryptionKey)
+ );
+ }
+ }
+ }
+ }
+
+ protected void setSSEKMSOrCIfRequired(PutObjectRequest request) {
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
+ if(S3AEncryptionMethods.SSE_KMS.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ request.setSSEAwsKeyManagementParams(
+ new SSEAwsKeyManagementParams(serverSideEncryptionKey)
+ );
+ }else{
+ request.setSSEAwsKeyManagementParams(
+ new SSEAwsKeyManagementParams()
+ );
+ }
+ }else if(S3AEncryptionMethods.SSE_C.getMethod()
+ .equals(serverSideEncryptionAlgorithm)) {
+ if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
+ request.setSSECustomerKey(
+ new SSECustomerKey(serverSideEncryptionKey)
+ );
+ }
+ }
+ }
+ }
+
+ private void setSSES3IfRequired(ObjectMetadata metadata) {
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
+ if(S3AEncryptionMethods.SSE_S3.getMethod()
+ .equals(serverSideEncryptionAlgorithm) ||
+ (!S3AEncryptionMethods.SSE_KMS.getMethod()
+ .equals(serverSideEncryptionAlgorithm) &&
+ !S3AEncryptionMethods.SSE_C.getMethod()
+ .equals(serverSideEncryptionAlgorithm))) {
+ metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+ }
+ }
+ }
+
/**
* Perform post-write actions.
* @param key key written to
@@ -1846,6 +1970,11 @@ public String toString() {
.append(serverSideEncryptionAlgorithm)
.append('\'');
}
+ if (serverSideEncryptionKey != null) {
+ sb.append(", serverSideEncryptionKey='")
+ .append(serverSideEncryptionKey)
+ .append('\'');
+ }
sb.append(", statistics {")
.append(statistics)
.append("}");
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index ccb97269aba0b..70b6469278362 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -22,6 +22,7 @@
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.SSECustomerKey;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -78,6 +79,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private final String uri;
public static final Logger LOG = S3AFileSystem.LOG;
private final S3AInstrumentation.InputStreamStatistics streamStatistics;
+ private String serverSideEncryptionAlgorithm;
+ private String serverSideEncryptionKey;
private final S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
@@ -98,24 +101,33 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
*/
private long contentRangeStart;
- public S3AInputStream(String bucket,
- String key,
+ public S3AInputStream(S3ObjectAttributes s3Attributes,
long contentLength,
AmazonS3Client client,
FileSystem.Statistics stats,
S3AInstrumentation instrumentation,
long readahead,
S3AInputPolicy inputPolicy) {
- Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
- Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
- Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
- this.bucket = bucket;
- this.key = key;
+ Preconditions.checkNotNull(s3Attributes);
+ Preconditions.checkArgument(
+ StringUtils.isNotEmpty(s3Attributes.getBucket()),
+ "No Bucket");
+ Preconditions.checkArgument(
+ StringUtils.isNotEmpty(s3Attributes.getKey()),
+ "No Key");
+ Preconditions.checkArgument(
+ contentLength >= 0 ,
+ "Negative content length");
+ this.bucket = s3Attributes.getBucket();
+ this.key = s3Attributes.getKey();
this.contentLength = contentLength;
this.client = client;
this.stats = stats;
this.uri = "s3a://" + this.bucket + "/" + this.key;
this.streamStatistics = instrumentation.newInputStreamStatistics();
+ this.serverSideEncryptionAlgorithm =
+ s3Attributes.getServerSideEncryptionAlgorithm();
+ this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
this.inputPolicy = inputPolicy;
setReadahead(readahead);
}
@@ -145,6 +157,7 @@ private synchronized void reopen(String reason, long targetPos, long length)
try {
GetObjectRequest request = new GetObjectRequest(bucket, key)
.withRange(targetPos, contentRangeFinish);
+ setSSECIfRequired(request);
wrappedStream = client.getObject(request).getObjectContent();
contentRangeStart = targetPos;
if (wrappedStream == null) {
@@ -158,6 +171,15 @@ private synchronized void reopen(String reason, long targetPos, long length)
this.pos = targetPos;
}
+ private void setSSECIfRequired(GetObjectRequest request) {
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm) &&
+ S3AEncryptionMethods.SSE_C.getMethod()
+ .equals(serverSideEncryptionAlgorithm) &&
+ StringUtils.isNotBlank(serverSideEncryptionKey)){
+ request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
+ }
+ }
+
@Override
public synchronized long getPos() throws IOException {
return (nextReadPos < 0) ? 0 : nextReadPos;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
new file mode 100644
index 0000000000000..276d8b368619f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * This class is only a holder for bucket, key, SSE Algorithm and SSE key
+ * attributes. It is only used in {@link S3AInputStream}
+ * as a way to reduce parameters being passed
+ * to the constructor of such class.
+ */
+public class S3ObjectAttributes {
+ private String bucket;
+ private String key;
+ private String serverSideEncryptionAlgorithm;
+ private String serverSideEncryptionKey;
+
+ public S3ObjectAttributes(
+ String bucket,
+ String key,
+ String serverSideEncryptionAlgorithm,
+ String serverSideEncryptionKey) {
+ this.bucket = bucket;
+ this.key = key;
+ this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+ this.serverSideEncryptionKey = serverSideEncryptionKey;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getServerSideEncryptionAlgorithm() {
+ return serverSideEncryptionAlgorithm;
+ }
+
+ public String getServerSideEncryptionKey() {
+ return serverSideEncryptionKey;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 9213a132b8ced..94dcf2e408d9c 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -254,7 +254,7 @@ monitoring and other features.
schemes.
* Supports authentication via: environment variables, Hadoop configuration
properties, the Hadoop key management store and IAM roles.
-* Supports S3 "Server Side Encryption" for both reading and writing.
+* Supports Server Side Encryption (SSE-S3, SSE-KMS, SSE-C) for both reading and writing.
* Supports proxies
* Test suites includes distcp and suites in downstream projects.
* Available since Hadoop 2.6; considered production ready in Hadoop 2.7.
@@ -572,10 +572,20 @@ this capability.
fs.s3a.server-side-encryption-algorithm
Specify a server-side encryption algorithm for s3a: file system.
- Unset by default, and the only other currently allowable value is AES256.
+ Unset by default. It supports the following values: 'AES256' (for SSE-S3), 'SSE-KMS'
+ and 'SSE-C'
+
+ fs.s3a.server-side-encryption-key
+ Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
+ has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
+ should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
+ you'll be using your default's S3 KMS key, otherwise you should set this property to
+ the specific KMS key id.
+
+
fs.s3a.buffer.dir
${hadoop.tmp.dir}/s3a
@@ -940,6 +950,10 @@ the DNS TTL of a JVM is "infinity".
To work with AWS better, set the DNS time-to-live of an application which
works with S3 to something lower. See [AWS documentation](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-jvm-ttl.html).
+*internal.S3V4AuthErrorRetryStrategy (S3V4AuthErrorRetryStrategy.java:buildRetryParams(117)) - Attempting to re-send the request to...*
+
+To avoid this warning, you should set the specific S3 endpoint by setting `fs.s3a.endpoint`
+
## Testing the S3 filesystem clients
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryption.java
index 43a26ff24955b..b79085f6b3f83 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryption.java
@@ -27,23 +27,24 @@
import java.io.IOException;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
/**
* Test whether or not encryption works by turning it on. Some checks
* are made for different file sizes as there have been reports that the
* file length may be rounded up to match word boundaries.
*/
-public class TestS3AEncryption extends AbstractS3ATestBase {
- private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256;
+public abstract class TestS3AEncryption extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
- AES256);
+ getSSEAlgorithm());
return conf;
}
@@ -98,7 +99,19 @@ private String createFilename(int len) {
*/
private void assertEncrypted(Path path) throws IOException {
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
- assertEquals(AES256, md.getSSEAlgorithm());
+ if(S3AEncryptionMethods.SSE_C.getMethod().equals(getSSEAlgorithm())){
+ assertEquals("AES256", md.getSSECustomerAlgorithm());
+ }else if(S3AEncryptionMethods.SSE_KMS.getMethod()
+ .equals(getSSEAlgorithm())){
+ assertEquals("aws:kms", md.getSSEAlgorithm());
+ assertTrue(md.getSSEAwsKmsKeyId().contains(
+ this.getConfiguration().getTrimmed(
+ Constants.SERVER_SIDE_ENCRYPTION_KEY)));
+ }else{
+ assertEquals("AES256", md.getSSEAlgorithm());
+ }
}
+ protected abstract String getSSEAlgorithm();
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionFastOutputStream.java
index 1fa8486b5df78..dd05dca860127 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionFastOutputStream.java
@@ -32,4 +32,9 @@ protected Configuration createConfiguration() {
conf.setBoolean(Constants.FAST_UPLOAD, true);
return conf;
}
+
+ @Override
+ protected String getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_S3.getMethod();
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSEC.java
new file mode 100644
index 0000000000000..1827f8f64ef6d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSEC.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Concrete class that extends {@link TestS3AEncryption}
+ * and tests SSE-C encryption.
+ */
+public class TestS3AEncryptionSSEC extends TestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ getSSEAlgorithm());
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+ "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+ return conf;
+ }
+
+ @Override
+ protected String getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_C.getMethod();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSEKMS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSEKMS.java
new file mode 100644
index 0000000000000..5a0643272e4f7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSEKMS.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Concrete class that extends {@link TestS3AEncryption}
+ * and tests SSE-KMS encryption.
+ */
+public class TestS3AEncryptionSSEKMS extends TestS3AEncryption {
+
+ @Override
+ protected String getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_KMS.getMethod();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSES3.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSES3.java
new file mode 100644
index 0000000000000..9b135b2612893
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AEncryptionSSES3.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Concrete class that extends {@link TestS3AEncryption}
+ * and tests SSE-S3 encryption.
+ */
+public class TestS3AEncryptionSSES3 extends TestS3AEncryption {
+
+ @Override
+ protected String getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_S3.getMethod();
+ }
+}