Skip to content

Commit e76699b

Browse files
HADOOP-18708: S3A: Support S3 Client Side Encryption(CSE) (#6884)
Add support for S3 client side encryption (CSE). CSE can configured in two modes: - CSE-KMS where keys are provided by AWS KMS - CSE-CUSTOM where custom keys are provided by implementing a custom keyring. CSE requires an encryption library: amazon-s3-encryption-client-java.jar This is _not_ included in the shaded bundle.jar and is released separately. The version used is currently 3.1.1 Contributed by Syed Shameerur Rahman.
1 parent fc388f7 commit e76699b

File tree

53 files changed

+2175
-231
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2175
-231
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
5757
* It is possible to have parts uploaded in any order (or in parallel).
5858
* @param uploadId Identifier from {@link #startUpload(Path)}.
5959
* @param partNumber Index of the part relative to others.
60+
* @param isLastPart is the part the last part of the upload?
6061
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
6162
* @param inputStream Data for this part. Implementations MUST close this
6263
* stream after reading in the data.
@@ -67,6 +68,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
6768
CompletableFuture<PartHandle> putPart(
6869
UploadHandle uploadId,
6970
int partNumber,
71+
boolean isLastPart,
7072
Path filePath,
7173
InputStream inputStream,
7274
long lengthInBytes)
@@ -77,7 +79,7 @@ CompletableFuture<PartHandle> putPart(
7779
* @param uploadId Identifier from {@link #startUpload(Path)}.
7880
* @param filePath Target path for upload (as {@link #startUpload(Path)}.
7981
* @param handles non-empty map of part number to part handle.
80-
* from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
82+
* from {@link #putPart(UploadHandle, int, boolean, Path, InputStream, long)}.
8183
* @return unique PathHandle identifier for the uploaded file.
8284
* @throws IOException IO failure
8385
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
101101

102102
/**
103103
* Check all the arguments to the
104-
* {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)}
104+
* {@link MultipartUploader#putPart(UploadHandle, int, boolean, Path, InputStream, long)}
105105
* operation.
106106
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
107107
* @param inputStream Data for this part. Implementations MUST close this

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public CompletableFuture<UploadHandle> startUpload(Path filePath)
111111

112112
@Override
113113
public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
114-
int partNumber, Path filePath,
114+
int partNumber, boolean isLastPart, Path filePath,
115115
InputStream inputStream,
116116
long lengthInBytes)
117117
throws IOException {

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ public void testSingleUpload() throws Exception {
249249
// was interpreted as an inconsistent write.
250250
MultipartUploader completer = uploader0;
251251
// and upload with uploader 1 to validate cross-uploader uploads
252-
PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
252+
PartHandle partHandle = putPart(file, uploadHandle, 1, true, payload);
253253
partHandles.put(1, partHandle);
254254
PathHandle fd = complete(completer, uploadHandle, file,
255255
partHandles);
@@ -317,12 +317,13 @@ protected PartHandle buildAndPutPart(
317317
final Path file,
318318
final UploadHandle uploadHandle,
319319
final int index,
320+
final boolean isLastPart,
320321
final MessageDigest origDigest) throws IOException {
321322
byte[] payload = generatePayload(index);
322323
if (origDigest != null) {
323324
origDigest.update(payload);
324325
}
325-
return putPart(file, uploadHandle, index, payload);
326+
return putPart(file, uploadHandle, index, isLastPart, payload);
326327
}
327328

328329
/**
@@ -331,13 +332,15 @@ protected PartHandle buildAndPutPart(
331332
* @param file destination
332333
* @param uploadHandle handle
333334
* @param index index of part
335+
* @param isLastPart is last part of the upload ?
334336
* @param payload byte array of payload
335337
* @return the part handle
336338
* @throws IOException IO failure.
337339
*/
338340
protected PartHandle putPart(final Path file,
339341
final UploadHandle uploadHandle,
340342
final int index,
343+
final boolean isLastPart,
341344
final byte[] payload) throws IOException {
342345
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
343346
PartHandle partHandle;
@@ -347,7 +350,7 @@ protected PartHandle putPart(final Path file,
347350
payload.length,
348351
file)) {
349352
partHandle = awaitFuture(getUploader(index)
350-
.putPart(uploadHandle, index, file,
353+
.putPart(uploadHandle, index, isLastPart, file,
351354
new ByteArrayInputStream(payload),
352355
payload.length));
353356
}
@@ -488,7 +491,7 @@ public void testMultipartUpload() throws Exception {
488491
MessageDigest origDigest = DigestUtils.getMd5Digest();
489492
int payloadCount = getTestPayloadCount();
490493
for (int i = 1; i <= payloadCount; ++i) {
491-
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
494+
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, i == payloadCount,
492495
origDigest);
493496
partHandles.put(i, partHandle);
494497
}
@@ -515,7 +518,7 @@ public void testMultipartUploadEmptyPart() throws Exception {
515518
origDigest.update(payload);
516519
InputStream is = new ByteArrayInputStream(payload);
517520
PartHandle partHandle = awaitFuture(
518-
uploader.putPart(uploadHandle, 1, file, is, payload.length));
521+
uploader.putPart(uploadHandle, 1, true, file, is, payload.length));
519522
partHandles.put(1, partHandle);
520523
completeUpload(file, uploadHandle, partHandles, origDigest, 0);
521524
}
@@ -530,7 +533,7 @@ public void testUploadEmptyBlock() throws Exception {
530533
Path file = methodPath();
531534
UploadHandle uploadHandle = startUpload(file);
532535
Map<Integer, PartHandle> partHandles = new HashMap<>();
533-
partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
536+
partHandles.put(1, putPart(file, uploadHandle, 1, true, new byte[0]));
534537
completeUpload(file, uploadHandle, partHandles, null, 0);
535538
}
536539

@@ -550,7 +553,8 @@ public void testMultipartUploadReverseOrder() throws Exception {
550553
origDigest.update(payload);
551554
}
552555
for (int i = payloadCount; i > 0; --i) {
553-
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
556+
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
557+
null));
554558
}
555559
completeUpload(file, uploadHandle, partHandles, origDigest,
556560
payloadCount * partSizeInBytes());
@@ -574,7 +578,8 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
574578
}
575579
Map<Integer, PartHandle> partHandles = new HashMap<>();
576580
for (int i = payloadCount; i > 0; i -= 2) {
577-
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
581+
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
582+
null));
578583
}
579584
completeUpload(file, uploadHandle, partHandles, origDigest,
580585
getTestPayloadCount() * partSizeInBytes());
@@ -591,7 +596,7 @@ public void testMultipartUploadAbort() throws Exception {
591596
UploadHandle uploadHandle = startUpload(file);
592597
Map<Integer, PartHandle> partHandles = new HashMap<>();
593598
for (int i = 12; i > 10; i--) {
594-
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
599+
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == 12, null));
595600
}
596601
abortUpload(uploadHandle, file);
597602

@@ -601,7 +606,7 @@ public void testMultipartUploadAbort() throws Exception {
601606

602607
intercept(IOException.class,
603608
() -> awaitFuture(
604-
uploader0.putPart(uploadHandle, 49, file, is, len)));
609+
uploader0.putPart(uploadHandle, 49, true, file, is, len)));
605610
intercept(IOException.class,
606611
() -> complete(uploader0, uploadHandle, file, partHandles));
607612

@@ -701,7 +706,8 @@ public void testPutPartEmptyUploadID() throws Exception {
701706
byte[] payload = generatePayload(1);
702707
InputStream is = new ByteArrayInputStream(payload);
703708
intercept(IllegalArgumentException.class,
704-
() -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length));
709+
() -> uploader0.putPart(emptyHandle, 1, true, dest, is,
710+
payload.length));
705711
}
706712

707713
/**
@@ -715,7 +721,7 @@ public void testCompleteEmptyUploadID() throws Exception {
715721
UploadHandle emptyHandle =
716722
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
717723
Map<Integer, PartHandle> partHandles = new HashMap<>();
718-
PartHandle partHandle = putPart(dest, realHandle, 1,
724+
PartHandle partHandle = putPart(dest, realHandle, 1, true,
719725
generatePayload(1, SMALL_FILE));
720726
partHandles.put(1, partHandle);
721727

@@ -743,7 +749,7 @@ public void testDirectoryInTheWay() throws Exception {
743749
UploadHandle uploadHandle = startUpload(file);
744750
Map<Integer, PartHandle> partHandles = new HashMap<>();
745751
int size = SMALL_FILE;
746-
PartHandle partHandle = putPart(file, uploadHandle, 1,
752+
PartHandle partHandle = putPart(file, uploadHandle, 1, true,
747753
generatePayload(1, size));
748754
partHandles.put(1, partHandle);
749755

@@ -802,10 +808,10 @@ public void testConcurrentUploads() throws Throwable {
802808
assertNotEquals("Upload handles match", upload1, upload2);
803809

804810
// put part 1
805-
partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
811+
partHandles1.put(partId1, putPart(file, upload1, partId1, false, payload1));
806812

807813
// put part2
808-
partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
814+
partHandles2.put(partId2, putPart(file, upload2, partId2, true, payload2));
809815

810816
// complete part u1. expect its size and digest to
811817
// be as expected.

hadoop-project/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@
205205
<surefire.fork.timeout>900</surefire.fork.timeout>
206206
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
207207
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
208+
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
208209
<aws.eventstream.version>1.0.1</aws.eventstream.version>
209210
<hsqldb.version>2.7.1</hsqldb.version>
210211
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
@@ -1180,6 +1181,17 @@
11801181
</exclusion>
11811182
</exclusions>
11821183
</dependency>
1184+
<dependency>
1185+
<groupId>software.amazon.encryption.s3</groupId>
1186+
<artifactId>amazon-s3-encryption-client-java</artifactId>
1187+
<version>${amazon-s3-encryption-client-java.version}</version>
1188+
<exclusions>
1189+
<exclusion>
1190+
<groupId>*</groupId>
1191+
<artifactId>*</artifactId>
1192+
</exclusion>
1193+
</exclusions>
1194+
</dependency>
11831195
<dependency>
11841196
<groupId>org.apache.mina</groupId>
11851197
<artifactId>mina-core</artifactId>

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,16 @@
467467
<bannedImport>org.apache.hadoop.mapred.**</bannedImport>
468468
</bannedImports>
469469
</restrictImports>
470+
<restrictImports>
471+
<includeTestCode>false</includeTestCode>
472+
<reason>Restrict encryption client imports to encryption client factory</reason>
473+
<exclusions>
474+
<exclusion>org.apache.hadoop.fs.s3a.impl.EncryptionS3ClientFactory</exclusion>
475+
</exclusions>
476+
<bannedImports>
477+
<bannedImport>software.amazon.encryption.s3.**</bannedImport>
478+
</bannedImports>
479+
</restrictImports>
470480
</rules>
471481
</configuration>
472482
</execution>
@@ -511,6 +521,11 @@
511521
<artifactId>bundle</artifactId>
512522
<scope>compile</scope>
513523
</dependency>
524+
<dependency>
525+
<groupId>software.amazon.encryption.s3</groupId>
526+
<artifactId>amazon-s3-encryption-client-java</artifactId>
527+
<scope>provided</scope>
528+
</dependency>
514529
<dependency>
515530
<groupId>org.assertj</groupId>
516531
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,35 @@ private Constants() {
783783
public static final String S3_ENCRYPTION_CONTEXT =
784784
"fs.s3a.encryption.context";
785785

786+
/**
787+
* Client side encryption (CSE-CUSTOM) with custom cryptographic material manager class name.
788+
* Custom keyring class name for CSE-KMS.
789+
* value:{@value}
790+
*/
791+
public static final String S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME =
792+
"fs.s3a.encryption.cse.custom.keyring.class.name";
793+
794+
/**
795+
* Config to provide backward compatibility with V1 encryption client.
796+
* Enabling this configuration will invoke the followings
797+
* 1. Unencrypted s3 objects will be read using unencrypted/base s3 client when CSE is enabled.
798+
* 2. Size of encrypted object will be fetched from object header if present or
799+
* calculated using ranged S3 GET calls.
800+
* value:{@value}
801+
*/
802+
public static final String S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED =
803+
"fs.s3a.encryption.cse.v1.compatibility.enabled";
804+
805+
/**
806+
* Default value : {@value}.
807+
*/
808+
public static final boolean S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT = false;
809+
810+
/**
811+
* S3 CSE-KMS KMS region config.
812+
*/
813+
public static final String S3_ENCRYPTION_CSE_KMS_REGION = "fs.s3a.encryption.cse.kms.region";
814+
786815
/**
787816
* List of custom Signers. The signer class will be loaded, and the signer
788817
* name will be associated with this signer class in the S3 SDK.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
4141
import software.amazon.awssdk.regions.Region;
4242
import software.amazon.awssdk.services.s3.S3AsyncClient;
43+
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
4344
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
4445
import software.amazon.awssdk.services.s3.S3Client;
4546
import software.amazon.awssdk.services.s3.S3Configuration;
@@ -152,11 +153,17 @@ public S3AsyncClient createS3AsyncClient(
152153
.thresholdInBytes(parameters.getMultiPartThreshold())
153154
.build();
154155

155-
return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
156-
.httpClientBuilder(httpClientBuilder)
157-
.multipartConfiguration(multipartConfiguration)
158-
.multipartEnabled(parameters.isMultipartCopy())
159-
.build();
156+
S3AsyncClientBuilder s3AsyncClientBuilder =
157+
configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
158+
.httpClientBuilder(httpClientBuilder);
159+
160+
// multipart upload pending with HADOOP-19326.
161+
if (!parameters.isClientSideEncryptionEnabled()) {
162+
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
163+
.multipartEnabled(parameters.isMultipartCopy());
164+
}
165+
166+
return s3AsyncClientBuilder.build();
160167
}
161168

162169
@Override
@@ -363,7 +370,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
363370
* @param conf config to build the URI from.
364371
* @return an endpoint uri
365372
*/
366-
private static URI getS3Endpoint(String endpoint, final Configuration conf) {
373+
protected static URI getS3Endpoint(String endpoint, final Configuration conf) {
367374

368375
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
369376

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
public class Listing extends AbstractStoreOperation {
7676

7777
private static final Logger LOG = S3AFileSystem.LOG;
78-
private final boolean isCSEEnabled;
7978

8079
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
8180
new AcceptAllButS3nDirs();
@@ -86,7 +85,6 @@ public Listing(ListingOperationCallbacks listingOperationCallbacks,
8685
StoreContext storeContext) {
8786
super(storeContext);
8887
this.listingOperationCallbacks = listingOperationCallbacks;
89-
this.isCSEEnabled = storeContext.isCSEEnabled();
9088
}
9189

9290
/**
@@ -446,14 +444,17 @@ private boolean requestNextBatch() throws IOException {
446444
* Build the next status batch from a listing.
447445
* @param objects the next object listing
448446
* @return true if this added any entries after filtering
447+
* @throws IOException IO problems. This can happen only when CSE is enabled.
449448
*/
450-
private boolean buildNextStatusBatch(S3ListResult objects) {
449+
private boolean buildNextStatusBatch(S3ListResult objects) throws IOException {
451450
// counters for debug logs
452451
int added = 0, ignored = 0;
453452
// list to fill in with results. Initial size will be list maximum.
454453
List<S3AFileStatus> stats = new ArrayList<>(
455454
objects.getS3Objects().size() +
456455
objects.getCommonPrefixes().size());
456+
String userName = getStoreContext().getUsername();
457+
long blockSize = listingOperationCallbacks.getDefaultBlockSize(null);
457458
// objects
458459
for (S3Object s3Object : objects.getS3Objects()) {
459460
String key = s3Object.key();
@@ -464,9 +465,9 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
464465
// Skip over keys that are ourselves and old S3N _$folder$ files
465466
if (acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) {
466467
S3AFileStatus status = createFileStatus(keyPath, s3Object,
467-
listingOperationCallbacks.getDefaultBlockSize(keyPath),
468-
getStoreContext().getUsername(),
469-
s3Object.eTag(), null, isCSEEnabled);
468+
blockSize, userName, s3Object.eTag(),
469+
null,
470+
listingOperationCallbacks.getObjectSize(s3Object));
470471
LOG.debug("Adding: {}", status);
471472
stats.add(status);
472473
added++;

0 commit comments

Comments
 (0)