Skip to content

Commit 9c8e753

Browse files
committed
HADOOP-19354. S3A: Create S3 InputStreams via a factory
S3 InputStreams are created by a factory class, with the choice of factory dynamically chosen by the option fs.s3a.input.stream.type Supported values: classic, prefetching, analytics. S3AStore * Manages the creation and service lifecycle of the chosen factory, as well as forwarding stream construction requests to the chosen factory. * Provides the callbacks needed by both the factories and input streams. * StreamCapabilities.hasCapability(), which is relayed to the active factory. This avoids the FS having to know what capabilities are available in the stream.
1 parent f38d707 commit 9c8e753

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2146
-706
lines changed

hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
</Match>
3131
<!-- we are using completable futures, so ignore the Future which submit() returns -->
3232
<Match>
33-
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
33+
<Class name="org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl" />
3434
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
3535
</Match>
3636

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.hadoop.classification.InterfaceAudience;
2222
import org.apache.hadoop.classification.InterfaceStability;
2323
import org.apache.hadoop.fs.Options;
24+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
25+
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
2426
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
2527

2628
import java.time.Duration;
@@ -1580,6 +1582,34 @@ private Constants() {
15801582
*/
15811583
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";
15821584

1585+
/**
1586+
* Input stream type: {@value}.
1587+
*/
1588+
public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";
1589+
1590+
/**
1591+
* The classic input stream: {@value}.
1592+
*/
1593+
public static final String INPUT_STREAM_TYPE_CLASSIC =
1594+
StreamIntegration.CLASSIC;
1595+
1596+
/**
1597+
* The prefetching input stream: "prefetch".
1598+
*/
1599+
public static final String INPUT_STREAM_TYPE_PREFETCH = StreamIntegration.PREFETCH;
1600+
1601+
/**
1602+
* The analytics input stream: "analytics".
1603+
*/
1604+
public static final String INPUT_STREAM_TYPE_ANALYTICS =
1605+
StreamIntegration.ANALYTICS;
1606+
1607+
/**
1608+
* The default input stream.
1609+
* Currently {@link #INPUT_STREAM_TYPE_CLASSIC}
1610+
*/
1611+
public static final String INPUT_STREAM_TYPE_DEFAULT = InputStreamType.DEFAULT_STREAM_TYPE.getName();
1612+
15831613
/**
15841614
* Controls whether the prefetching input stream is enabled.
15851615
*/

0 commit comments

Comments
 (0)