Skip to content

Commit a8da9b2

Browse files
committed
Integrates AAL into S3A
1 parent 1a81c3b commit a8da9b2

File tree

42 files changed

+889
-14
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+889
-14
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ public final class StreamStatisticNames {
9797
*/
9898
public static final String STREAM_READ_OPENED = "stream_read_opened";
9999

100+
/**
101+
* Total count of times an analytics input stream was opened.
102+
*
103+
* Value: {@value}.
104+
*/
105+
public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened";
106+
100107
/**
101108
* Count of exceptions raised during input stream reads.
102109
* Value: {@value}.

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ public void testMultipartUpload() throws Exception {
507507
@Test
508508
public void testMultipartUploadEmptyPart() throws Exception {
509509
FileSystem fs = getFileSystem();
510-
Path file = path("testMultipartUpload");
510+
Path file = path("testMultipartUploadEmptyPart");
511511
try (MultipartUploader uploader =
512512
fs.createMultipartUploader(file).build()) {
513513
UploadHandle uploadHandle = uploader.startUpload(file).get();

hadoop-project/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@
206206
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
207207
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
208208
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
209+
<amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version>
209210
<aws.eventstream.version>1.0.1</aws.eventstream.version>
210211
<hsqldb.version>2.7.1</hsqldb.version>
211212
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
@@ -1113,6 +1114,11 @@
11131114
</exclusion>
11141115
</exclusions>
11151116
</dependency>
1117+
<dependency>
1118+
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
1119+
<artifactId>analyticsaccelerator-s3</artifactId>
1120+
<version>${amazon-s3-analyticsaccelerator-s3.version}</version>
1121+
</dependency>
11161122
<dependency>
11171123
<groupId>org.apache.mina</groupId>
11181124
<artifactId>mina-core</artifactId>

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,11 @@
484484
<artifactId>amazon-s3-encryption-client-java</artifactId>
485485
<scope>provided</scope>
486486
</dependency>
487+
<dependency>
488+
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
489+
<artifactId>analyticsaccelerator-s3</artifactId>
490+
<scope>compile</scope>
491+
</dependency>
487492
<dependency>
488493
<groupId>org.assertj</groupId>
489494
<artifactId>assertj-core</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1827,4 +1827,13 @@ private Constants() {
18271827
* Value: {@value}.
18281828
*/
18291829
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
1830+
1831+
1832+
/**
1833+
* Prefix to configure Analytics Accelerator Library.
1834+
* Value: {@value}.
1835+
*/
1836+
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
1837+
"fs.s3a.analytics.accelerator";
1838+
18301839
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public S3AsyncClient createS3AsyncClient(
166166
.httpClientBuilder(httpClientBuilder);
167167

168168
// multipart upload pending with HADOOP-19326.
169-
if (!parameters.isClientSideEncryptionEnabled()) {
169+
if (!parameters.isClientSideEncryptionEnabled() && !parameters.isAnalyticsAcceleratorEnabled()) {
170170
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
171171
.multipartEnabled(parameters.isMultipartCopy());
172172
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,11 @@
147147
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
148148
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
149149
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
150+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
150151
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
151152
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
152153
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
154+
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
153155
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
154156
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
155157
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
440442
*/
441443
private boolean isCSEEnabled;
442444

445+
/**
446+
* Is this S3A FS instance using analytics accelerator?
447+
*/
448+
private boolean isAnalyticsAccelaratorEnabled;
449+
443450
/**
444451
* Bucket AccessPoint.
445452
*/
@@ -629,6 +636,8 @@ public void initialize(URI name, Configuration originalConf)
629636
// If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
630637
isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());
631638

639+
isAnalyticsAccelaratorEnabled = StreamIntegration.determineInputStreamType(conf).equals(InputStreamType.Analytics);
640+
632641
// Create the appropriate fsHandler instance using a factory method
633642
fsHandler = createFileSystemHandler();
634643
fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
@@ -1156,6 +1165,7 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
11561165
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
11571166
.withClientSideEncryptionEnabled(isCSEEnabled)
11581167
.withClientSideEncryptionMaterials(cseMaterials)
1168+
.withAnalyticsAcceleratorEnabled(isAnalyticsAccelaratorEnabled)
11591169
.withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));
11601170

11611171
// this is where clients and the transfer manager are created on demand.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.hadoop.classification.InterfaceStability;
2929
import org.apache.hadoop.fs.FileSystem;
3030
import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
31+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
3132
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
3233
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
3334
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -840,6 +841,7 @@ private final class InputStreamStatistics
840841
private final AtomicLong closed;
841842
private final AtomicLong forwardSeekOperations;
842843
private final AtomicLong openOperations;
844+
private final AtomicLong analyticsStreamOpenOperations;
843845
private final AtomicLong readExceptions;
844846
private final AtomicLong readsIncomplete;
845847
private final AtomicLong readOperations;
@@ -888,7 +890,8 @@ private InputStreamStatistics(
888890
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
889891
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
890892
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
891-
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
893+
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
894+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
892895
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
893896
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
894897
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -927,6 +930,9 @@ private InputStreamStatistics(
927930
StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS);
928931
openOperations = st.getCounterReference(
929932
StreamStatisticNames.STREAM_READ_OPENED);
933+
analyticsStreamOpenOperations = st.getCounterReference(
934+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED
935+
);
930936
readExceptions = st.getCounterReference(
931937
StreamStatisticNames.STREAM_READ_EXCEPTIONS);
932938
readsIncomplete = st.getCounterReference(
@@ -1030,6 +1036,17 @@ public long streamOpened() {
10301036
return openOperations.getAndIncrement();
10311037
}
10321038

1039+
@Override
1040+
public long streamOpened(InputStreamType type) {
1041+
long count = openOperations.getAndIncrement();
1042+
1043+
if (type == InputStreamType.Analytics) {
1044+
count = analyticsStreamOpenOperations.getAndIncrement();
1045+
}
1046+
1047+
return count;
1048+
}
1049+
10331050
/**
10341051
* {@inheritDoc}.
10351052
* If the connection was aborted, increment {@link #aborted}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,11 @@ final class S3ClientCreationParameters {
202202
*/
203203
private boolean fipsEnabled;
204204

205+
/**
206+
* Is analytics accelerator enabled?
207+
*/
208+
private boolean isAnalyticsAcceleratorEnabled;
209+
205210
/**
206211
* List of execution interceptors to include in the chain
207212
* of interceptors in the SDK.
@@ -457,6 +462,17 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean
457462
return this;
458463
}
459464

465+
/**
466+
* Set the analytics accelerator enabled flag.
467+
*
468+
* @param value new value
469+
* @return the builder
470+
*/
471+
public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) {
472+
this.isAnalyticsAcceleratorEnabled = value;
473+
return this;
474+
}
475+
460476
/**
461477
* Set the KMS client region.
462478
* This is required for CSE-KMS
@@ -477,6 +493,14 @@ public boolean isClientSideEncryptionEnabled() {
477493
return this.isCSEEnabled;
478494
}
479495

496+
/**
497+
* Get the analytics accelerator enabled flag.
498+
* @return analytics accelerator enabled flag.
499+
*/
500+
public boolean isAnalyticsAcceleratorEnabled() {
501+
return this.isAnalyticsAcceleratorEnabled;
502+
}
503+
480504
/**
481505
* Set the client side encryption materials.
482506
*

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,10 @@ public enum Statistic {
322322
TYPE_COUNTER),
323323

324324
/* Stream Reads */
325+
STREAM_READ_ANALYTICS_OPENED(
326+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
327+
"Total count of times an analytics input stream to object store data was opened",
328+
TYPE_COUNTER),
325329
STREAM_READ_BYTES(
326330
StreamStatisticNames.STREAM_READ_BYTES,
327331
"Bytes read from an input stream in read() calls",

0 commit comments

Comments
 (0)