Skip to content

Commit 2999d57

Browse files
committed
only enable multipart on async client if AAL && CSE are disabled.
This commit should not be part of the final merge, but is required till we upgrade the SDK.
1 parent 969bfc1 commit 2999d57

File tree

4 files changed

+36
-2
lines changed

4 files changed

+36
-2
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public S3AsyncClient createS3AsyncClient(
166166
.httpClientBuilder(httpClientBuilder);
167167

168168
// multipart upload pending with HADOOP-19326.
169-
if (!parameters.isClientSideEncryptionEnabled()) {
169+
if (!parameters.isClientSideEncryptionEnabled() && !parameters.isAnalyticsAcceleratorEnabled()) {
170170
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
171171
.multipartEnabled(parameters.isMultipartCopy());
172172
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,11 @@
146146
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
147147
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
148148
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
149+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
149150
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
150151
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
151152
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
153+
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
152154
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
153155
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
154156
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -433,6 +435,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
433435
*/
434436
private boolean isCSEEnabled;
435437

438+
/**
439+
* Is this S3A FS instance using analytics accelerator?
440+
*/
441+
private boolean isAnalyticsAccelaratorEnabled;
442+
436443
/**
437444
* Bucket AccessPoint.
438445
*/
@@ -605,6 +612,8 @@ public void initialize(URI name, Configuration originalConf)
605612
// If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
606613
isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());
607614

615+
isAnalyticsAccelaratorEnabled = StreamIntegration.determineInputStreamType(conf).equals(InputStreamType.Analytics);
616+
608617
// Create the appropriate fsHandler instance using a factory method
609618
fsHandler = createFileSystemHandler();
610619
fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
@@ -1121,6 +1130,7 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
11211130
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
11221131
.withClientSideEncryptionEnabled(isCSEEnabled)
11231132
.withClientSideEncryptionMaterials(cseMaterials)
1133+
.withAnalyticsAcceleratorEnabled(isAnalyticsAccelaratorEnabled)
11241134
.withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));
11251135

11261136
// this is where clients and the transfer manager are created on demand.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,11 @@ final class S3ClientCreationParameters {
202202
*/
203203
private boolean fipsEnabled;
204204

205+
/**
206+
* Is analytics accelerator enabled
207+
*/
208+
private boolean isAnalyticsAcceleratorEnabled;
209+
205210
/**
206211
* List of execution interceptors to include in the chain
207212
* of interceptors in the SDK.
@@ -457,6 +462,17 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean
457462
return this;
458463
}
459464

465+
/**
466+
* Set the analytics accelerator enabled flag.
467+
*
468+
* @param value new value
469+
* @return the builder
470+
*/
471+
public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) {
472+
this.isAnalyticsAcceleratorEnabled = value;
473+
return this;
474+
}
475+
460476
/**
461477
* Set the KMS client region.
462478
* This is required for CSE-KMS
@@ -477,6 +493,14 @@ public boolean isClientSideEncryptionEnabled() {
477493
return this.isCSEEnabled;
478494
}
479495

496+
/**
497+
* Get the analytics accelerator enabled flag.
498+
* @return analytics accelerator enabled flag.
499+
*/
500+
public boolean isAnalyticsAcceleratorEnabled() {
501+
return this.isAnalyticsAcceleratorEnabled;
502+
}
503+
480504
/**
481505
* Set the client side encryption materials.
482506
*

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public static ObjectInputStreamFactory factoryFromConfig(final Configuration con
135135
* @param conf configuration
136136
* @return a stream factory.
137137
*/
138-
static InputStreamType determineInputStreamType(final Configuration conf) {
138+
public static InputStreamType determineInputStreamType(final Configuration conf) {
139139
// work out the default stream; this includes looking at the
140140
// deprecated prefetch enabled key to see if it is set.
141141
if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {

0 commit comments

Comments
 (0)