Skip to content

Commit 3b31b5a

Browse files
committed
Address PR Comments
1 parent 838dc24 commit 3b31b5a

32 files changed

+654
-278
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
5757
* It is possible to have parts uploaded in any order (or in parallel).
5858
* @param uploadId Identifier from {@link #startUpload(Path)}.
5959
* @param partNumber Index of the part relative to others.
60+
* @param isLastPart is the part the last part of the upload?
6061
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
6162
* @param inputStream Data for this part. Implementations MUST close this
6263
* stream after reading in the data.
@@ -67,6 +68,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
6768
CompletableFuture<PartHandle> putPart(
6869
UploadHandle uploadId,
6970
int partNumber,
71+
boolean isLastPart,
7072
Path filePath,
7173
InputStream inputStream,
7274
long lengthInBytes)
@@ -77,7 +79,7 @@ CompletableFuture<PartHandle> putPart(
7779
* @param uploadId Identifier from {@link #startUpload(Path)}.
7880
* @param filePath Target path for upload (as {@link #startUpload(Path)}.
7981
* @param handles non-empty map of part number to part handle.
80-
* from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
82+
* from {@link #putPart(UploadHandle, int, boolean, Path, InputStream, long)}.
8183
* @return unique PathHandle identifier for the uploaded file.
8284
* @throws IOException IO failure
8385
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {
101101

102102
/**
103103
* Check all the arguments to the
104-
* {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)}
104+
* {@link MultipartUploader#putPart(UploadHandle, int, boolean, Path, InputStream, long)}
105105
* operation.
106106
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
107107
* @param inputStream Data for this part. Implementations MUST close this

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public CompletableFuture<UploadHandle> startUpload(Path filePath)
111111

112112
@Override
113113
public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
114-
int partNumber, Path filePath,
114+
int partNumber, boolean isLastPart, Path filePath,
115115
InputStream inputStream,
116116
long lengthInBytes)
117117
throws IOException {

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ public void testSingleUpload() throws Exception {
249249
// was interpreted as an inconsistent write.
250250
MultipartUploader completer = uploader0;
251251
// and upload with uploader 1 to validate cross-uploader uploads
252-
PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
252+
PartHandle partHandle = putPart(file, uploadHandle, 1, true, payload);
253253
partHandles.put(1, partHandle);
254254
PathHandle fd = complete(completer, uploadHandle, file,
255255
partHandles);
@@ -317,12 +317,13 @@ protected PartHandle buildAndPutPart(
317317
final Path file,
318318
final UploadHandle uploadHandle,
319319
final int index,
320+
final boolean isLastPart,
320321
final MessageDigest origDigest) throws IOException {
321322
byte[] payload = generatePayload(index);
322323
if (origDigest != null) {
323324
origDigest.update(payload);
324325
}
325-
return putPart(file, uploadHandle, index, payload);
326+
return putPart(file, uploadHandle, index, isLastPart, payload);
326327
}
327328

328329
/**
@@ -331,13 +332,15 @@ protected PartHandle buildAndPutPart(
331332
* @param file destination
332333
* @param uploadHandle handle
333334
* @param index index of part
335+
* @param isLastPart is last part of the upload ?
334336
* @param payload byte array of payload
335337
* @return the part handle
336338
* @throws IOException IO failure.
337339
*/
338340
protected PartHandle putPart(final Path file,
339341
final UploadHandle uploadHandle,
340342
final int index,
343+
final boolean isLastPart,
341344
final byte[] payload) throws IOException {
342345
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
343346
PartHandle partHandle;
@@ -347,7 +350,7 @@ protected PartHandle putPart(final Path file,
347350
payload.length,
348351
file)) {
349352
partHandle = awaitFuture(getUploader(index)
350-
.putPart(uploadHandle, index, file,
353+
.putPart(uploadHandle, index, isLastPart, file,
351354
new ByteArrayInputStream(payload),
352355
payload.length));
353356
}
@@ -488,7 +491,7 @@ public void testMultipartUpload() throws Exception {
488491
MessageDigest origDigest = DigestUtils.getMd5Digest();
489492
int payloadCount = getTestPayloadCount();
490493
for (int i = 1; i <= payloadCount; ++i) {
491-
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
494+
PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, i == payloadCount,
492495
origDigest);
493496
partHandles.put(i, partHandle);
494497
}
@@ -515,7 +518,7 @@ public void testMultipartUploadEmptyPart() throws Exception {
515518
origDigest.update(payload);
516519
InputStream is = new ByteArrayInputStream(payload);
517520
PartHandle partHandle = awaitFuture(
518-
uploader.putPart(uploadHandle, 1, file, is, payload.length));
521+
uploader.putPart(uploadHandle, 1, true, file, is, payload.length));
519522
partHandles.put(1, partHandle);
520523
completeUpload(file, uploadHandle, partHandles, origDigest, 0);
521524
}
@@ -530,7 +533,7 @@ public void testUploadEmptyBlock() throws Exception {
530533
Path file = methodPath();
531534
UploadHandle uploadHandle = startUpload(file);
532535
Map<Integer, PartHandle> partHandles = new HashMap<>();
533-
partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
536+
partHandles.put(1, putPart(file, uploadHandle, 1, true, new byte[0]));
534537
completeUpload(file, uploadHandle, partHandles, null, 0);
535538
}
536539

@@ -550,7 +553,8 @@ public void testMultipartUploadReverseOrder() throws Exception {
550553
origDigest.update(payload);
551554
}
552555
for (int i = payloadCount; i > 0; --i) {
553-
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
556+
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
557+
null));
554558
}
555559
completeUpload(file, uploadHandle, partHandles, origDigest,
556560
payloadCount * partSizeInBytes());
@@ -574,7 +578,8 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
574578
}
575579
Map<Integer, PartHandle> partHandles = new HashMap<>();
576580
for (int i = payloadCount; i > 0; i -= 2) {
577-
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
581+
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
582+
null));
578583
}
579584
completeUpload(file, uploadHandle, partHandles, origDigest,
580585
getTestPayloadCount() * partSizeInBytes());
@@ -591,7 +596,7 @@ public void testMultipartUploadAbort() throws Exception {
591596
UploadHandle uploadHandle = startUpload(file);
592597
Map<Integer, PartHandle> partHandles = new HashMap<>();
593598
for (int i = 12; i > 10; i--) {
594-
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
599+
partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == 12, null));
595600
}
596601
abortUpload(uploadHandle, file);
597602

@@ -601,7 +606,7 @@ public void testMultipartUploadAbort() throws Exception {
601606

602607
intercept(IOException.class,
603608
() -> awaitFuture(
604-
uploader0.putPart(uploadHandle, 49, file, is, len)));
609+
uploader0.putPart(uploadHandle, 49, true, file, is, len)));
605610
intercept(IOException.class,
606611
() -> complete(uploader0, uploadHandle, file, partHandles));
607612

@@ -701,7 +706,8 @@ public void testPutPartEmptyUploadID() throws Exception {
701706
byte[] payload = generatePayload(1);
702707
InputStream is = new ByteArrayInputStream(payload);
703708
intercept(IllegalArgumentException.class,
704-
() -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length));
709+
() -> uploader0.putPart(emptyHandle, 1, true, dest, is,
710+
payload.length));
705711
}
706712

707713
/**
@@ -715,7 +721,7 @@ public void testCompleteEmptyUploadID() throws Exception {
715721
UploadHandle emptyHandle =
716722
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
717723
Map<Integer, PartHandle> partHandles = new HashMap<>();
718-
PartHandle partHandle = putPart(dest, realHandle, 1,
724+
PartHandle partHandle = putPart(dest, realHandle, 1, true,
719725
generatePayload(1, SMALL_FILE));
720726
partHandles.put(1, partHandle);
721727

@@ -743,7 +749,7 @@ public void testDirectoryInTheWay() throws Exception {
743749
UploadHandle uploadHandle = startUpload(file);
744750
Map<Integer, PartHandle> partHandles = new HashMap<>();
745751
int size = SMALL_FILE;
746-
PartHandle partHandle = putPart(file, uploadHandle, 1,
752+
PartHandle partHandle = putPart(file, uploadHandle, 1, true,
747753
generatePayload(1, size));
748754
partHandles.put(1, partHandle);
749755

@@ -802,10 +808,10 @@ public void testConcurrentUploads() throws Throwable {
802808
assertNotEquals("Upload handles match", upload1, upload2);
803809

804810
// put part 1
805-
partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
811+
partHandles1.put(partId1, putPart(file, upload1, partId1, false, payload1));
806812

807813
// put part2
808-
partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
814+
partHandles2.put(partId2, putPart(file, upload2, partId2, true, payload2));
809815

810816
// complete part u1. expect its size and digest to
811817
// be as expected.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -737,14 +737,16 @@ private Constants() {
737737
"fs.s3a.encryption.key";
738738

739739
/**
740-
* Client side encryption (CSE-CUSTOM) with custom cryptographic material manager class name.
740+
* Custom keyring class name for CSE-KMS.
741+
* value:{@value}
741742
*/
742743
public static final String S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME =
743744
"fs.s3a.encryption.cse.custom.keyring.class.name";
744745

745746
/**
746-
* This config initializes unencrypted s3 client will be used to access unencrypted
747-
* s3 object. This is to provide backward compatibility.
747+
* Config to support reading unencrypted s3 objects when CSE is enabled.
748+
* This is to provide backward compatibility with V1/V2 client.
749+
* value:{@value}
748750
*/
749751
public static final String S3_ENCRYPTION_CSE_READ_UNENCRYPTED_OBJECTS =
750752
"fs.s3a.encryption.cse.read.unencrypted.objects";
@@ -759,7 +761,8 @@ private Constants() {
759761
* This is to provide backward compatability with objects encrypted with V1 client.
760762
* Unlike V2 and V3 client which always pads 16 bytes, V1 client pads bytes till the
761763
* object size reaches next multiple of 16.
762-
* * This is to provide backward compatibility.
764+
* This is to provide backward compatibility with V1 client.
765+
* value:{@value}
763766
*/
764767
public static final String S3_ENCRYPTION_CSE_OBJECT_SIZE_FROM_RANGED_GET_ENABLED =
765768
"fs.s3a.encryption.cse.object.size.ranged.get.enabled";
@@ -770,10 +773,11 @@ private Constants() {
770773
public static final boolean S3_ENCRYPTION_CSE_OBJECT_SIZE_FROM_RANGED_GET_ENABLED_DEFAULT = false;
771774

772775
/**
773-
* Config to control whether to skip file named with suffix
774-
* {@link #S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}. Encryption V1 client supports storing
775-
* encryption metadata in an instruction file which should be skipped while listing for the files.
776-
* This is to provide backward compatibility.
776+
* Config to skip file named with suffix
777+
* {@link #S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}. V1/V2 client supports storing encryption
778+
* metadata in an instruction file which should be skipped while listing for the files.
779+
* This is to provide backward compatibility with V1/V2 client.
780+
* value:{@value}
777781
*/
778782
public static final String S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE =
779783
"fs.s3a.encryption.cse.skip.instruction.file";

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121
import java.io.IOException;
2222
import java.net.URI;
2323

24-
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.s3a.impl.encryption.CSEMaterials;
26+
import org.apache.hadoop.util.Preconditions;
27+
import org.apache.hadoop.util.ReflectionUtils;
28+
import org.apache.hadoop.util.functional.LazyAtomicReference;
29+
2530
import software.amazon.awssdk.services.s3.S3AsyncClient;
2631
import software.amazon.awssdk.services.s3.S3Client;
2732
import software.amazon.encryption.s3.S3AsyncEncryptionClient;
@@ -30,12 +35,11 @@
3035
import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
3136
import software.amazon.encryption.s3.materials.Keyring;
3237

33-
import org.apache.hadoop.conf.Configuration;
34-
import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
35-
import org.apache.hadoop.util.ReflectionUtils;
36-
3738
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
3839

40+
/**
41+
* Factory class to create encrypted s3 client and encrypted async s3 client.
42+
*/
3943
public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
4044

4145
private static final String ENCRYPTION_CLIENT_CLASSNAME =
@@ -44,7 +48,11 @@ public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
4448
/**
4549
* Encryption client availability.
4650
*/
47-
private static final boolean ENCRYPTION_CLIENT_FOUND = checkForEncryptionClient();
51+
private static final LazyAtomicReference<Boolean> ENCRYPTION_CLIENT_AVAILABLE =
52+
LazyAtomicReference.lazyAtomicReferenceFromSupplier(
53+
EncryptionS3ClientFactory::checkForEncryptionClient
54+
);
55+
4856

4957
/**
5058
* S3Client to be wrapped by encryption client.
@@ -73,11 +81,13 @@ private static boolean checkForEncryptionClient() {
7381
* @return true if it was found in the classloader
7482
*/
7583
private static synchronized boolean isEncryptionClientAvailable() {
76-
return ENCRYPTION_CLIENT_FOUND;
84+
return ENCRYPTION_CLIENT_AVAILABLE.get();
7785
}
7886

7987
/**
80-
* Create encrypted s3 client.
88+
* Creates both synchronous and asynchronous encrypted s3 clients.
89+
* Synchronous client is wrapped by encryption client first and then
90+
* Asynchronous client is wrapped by encryption client.
8191
* @param uri S3A file system URI
8292
* @param parameters parameter object
8393
* @return encrypted s3 client
@@ -114,7 +124,16 @@ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters par
114124
return createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
115125
}
116126

127+
/**
128+
* Create encrypted s3 client.
129+
* @param cseMaterials
130+
* @return encrypted s3 client
131+
*/
117132
private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
133+
Preconditions.checkArgument(s3AsyncClient !=null,
134+
"S3 async client not initialized");
135+
Preconditions.checkArgument(s3Client !=null,
136+
"S3 client not initialized");
118137
S3EncryptionClient.Builder s3EncryptionClientBuilder =
119138
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
120139
// this is required for doing S3 ranged GET calls
@@ -141,7 +160,14 @@ private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
141160
return s3EncryptionClientBuilder.build();
142161
}
143162

163+
/**
164+
* Create async encrypted s3 client.
165+
* @param cseMaterials
166+
* @return encrypted async s3 client
167+
*/
144168
private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials cseMaterials) {
169+
Preconditions.checkArgument(s3AsyncClient !=null,
170+
"S3 async client not initialized");
145171
S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
146172
S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
147173
// this is required for doing S3 ranged GET calls
@@ -186,10 +212,8 @@ private Keyring getKeyringProvider(String className,
186212
}
187213

188214
private Class<? extends Keyring> getCustomKeyringProviderClass(String className) {
189-
if (Strings.isNullOrEmpty(className)) {
190-
throw new IllegalArgumentException(
191-
"Custom Keyring class name is null or empty");
192-
}
215+
Preconditions.checkArgument(className !=null && !className.isEmpty(),
216+
"Custom Keyring class name is null or empty");
193217
try {
194218
return Class.forName(className).asSubclass(Keyring.class);
195219
} catch (ClassNotFoundException e) {

0 commit comments

Comments
 (0)