Skip to content

Commit eadf0dd

Browse files
committed
HADOOP-19354. S3AInputStream to be created by factory under S3AStore
Ability to create custom streams (type = custom), which reads class from "fs.s3a.input.stream.custom.factory". This is mainly for testing, especially CNFE and similar. Unit test TestStreamFactories for this. ObjectInputStreams save and export stream type to assist these tests too, as it enables assertions on the generated stream type. Simplified the logic related to the old prefetch enabled flag. If fs.s3a.prefetch.enabled is true, the prefetch stream is returned, the stream.type option is not used at all. Simpler logic, simpler docs, fewer support calls. Parameters supplied to ObjectInputStreamFactory.bind converted to a parameter object. Allows for more parameters to be added later if ever required. ObjectInputStreamFactory returns more requirements to the store/fs. For this reason StreamThreadOptions threadRequirements(); is renamed StreamFactoryRequirements factoryRequirements() VectorIO context changes * Returned in factoryRequirements() * existing configuration reading code moved into StreamIntegration.populateVectoredIOContext() * Streams which don't have custom vector IO, e.g. prefetching can return a minimum seek range of 0. This disables range merging on the default PositionedReadable implementation, so ensures that they will only get asked for data which will be read...leaving prefetch/cache code to know exactly what is needed. Other * Draft docs. * Stream capability declares stream type & is exported through FS too. (todo: test, document, add to bucket-info) * ConfigurationHelper.resolveEnum() supersedes Configuration.getEnum() with - case independence - fallback is a supplier<Enum> rather than a simple value. Change-Id: I2e59300af48042df8173de61d0b3d6139a0ae7fe
1 parent 9c8e753 commit eadf0dd

File tree

18 files changed

+794
-121
lines changed

18 files changed

+794
-121
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ConfigurationHelper.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.Locale;
2424
import java.util.Map;
25+
import java.util.function.Function;
2526
import java.util.stream.Collectors;
2627

2728
import org.apache.hadoop.classification.InterfaceAudience;
@@ -123,4 +124,39 @@ public static <E extends Enum<E>> Map<String, E> mapEnumNamesToValues(
123124
return mapping;
124125
}
125126

127+
/**
128+
* Look up an enum from the configuration option and map it to
129+
* a value in the supplied enum class.
130+
* If no value is supplied or there is no match for the supplied value,
131+
* the fallback function is invoked, passing in the trimmed and possibly
132+
* empty string of the value.
133+
* Extends {@link Configuration#getEnum(String, Enum)}
134+
* by adding case independence and a lambda expression for fallback,
135+
* rather than a default value.
136+
* @param conf configuration
137+
* @param name property name
138+
* @param enumClass enum class to resolve the value against
139+
* @param fallback fallback supplier
140+
* @return an enum value
141+
* @param <E> enumeration type.
142+
* @throws IllegalArgumentException If mapping is illegal for the type provided
143+
*/
144+
public static <E extends Enum<E>> E resolveEnum(
145+
Configuration conf,
146+
String name,
147+
Class<E> enumClass,
148+
Function<String, E> fallback) {
149+
150+
final String val = conf.getTrimmed(name, "");
151+
152+
// build a map of lower case string to enum values.
153+
final Map<String, E> mapping = mapEnumNamesToValues("", enumClass);
154+
final E mapped = mapping.get(val.toLowerCase(Locale.ROOT));
155+
if (mapped != null) {
156+
return mapped;
157+
} else {
158+
// fallback handles it
159+
return fallback.apply(val);
160+
}
161+
}
126162
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.hadoop.classification.InterfaceAudience;
2222
import org.apache.hadoop.classification.InterfaceStability;
2323
import org.apache.hadoop.fs.Options;
24-
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
2524
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
2625
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
2726

@@ -1594,30 +1593,48 @@ private Constants() {
15941593
StreamIntegration.CLASSIC;
15951594

15961595
/**
1597-
* The prefetching input stream: "prefetch".
1596+
* The prefetching input stream: {@value}.
15981597
*/
15991598
public static final String INPUT_STREAM_TYPE_PREFETCH = StreamIntegration.PREFETCH;
16001599

16011600
/**
1602-
* The analytics input stream: "analytics".
1601+
* The analytics input stream: {@value}.
16031602
*/
16041603
public static final String INPUT_STREAM_TYPE_ANALYTICS =
16051604
StreamIntegration.ANALYTICS;
16061605

16071606
/**
1608-
* The default input stream.
1609-
* Currently {@link #INPUT_STREAM_TYPE_CLASSIC}
1607+
* Request the default input stream,
1608+
* whatever it is for this release: {@value}.
16101609
*/
1611-
public static final String INPUT_STREAM_TYPE_DEFAULT = InputStreamType.DEFAULT_STREAM_TYPE.getName();
1610+
public static final String INPUT_STREAM_TYPE_DEFAULT = StreamIntegration.DEFAULT;
1611+
1612+
/**
1613+
* The custom input stream type: {@value}.
1614+
* If set, the classname is loaded from
1615+
* {@link #INPUT_STREAM_CUSTOM_FACTORY}.
1616+
* <p>
1617+
* This option is primarily for testing as it can
1618+
* be used to generate failures.
1619+
*/
1620+
public static final String INPUT_STREAM_TYPE_CUSTOM =
1621+
StreamIntegration.CUSTOM;
1622+
1623+
/**
1624+
* Classname of the factory to instantiate for custom streams: {@value}.
1625+
*/
1626+
public static final String INPUT_STREAM_CUSTOM_FACTORY = "fs.s3a.input.stream.custom.factory";
16121627

16131628
/**
16141629
* Controls whether the prefetching input stream is enabled.
16151630
*/
1631+
@Deprecated
16161632
public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";
16171633

16181634
/**
16191635
* Default option as to whether the prefetching input stream is enabled.
16201636
*/
1637+
@Deprecated
16211638
public static final boolean PREFETCH_ENABLED_DEFAULT = false;
16221639

16231640
// If the default values are used, each file opened for reading will consume

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@
148148
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
149149
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
150150
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
151-
import org.apache.hadoop.fs.s3a.impl.streams.StreamThreadOptions;
151+
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
152152
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
153153
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
154154
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -369,12 +369,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
369369
/** Vectored IO context. */
370370
private VectoredIOContext vectoredIOContext;
371371

372-
/**
373-
* Maximum number of active range read operation a single
374-
* input stream can have.
375-
*/
376-
private int vectoredActiveRangeReads;
377-
378372
private long readAhead;
379373
private ChangeDetectionPolicy changeDetectionPolicy;
380374
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -755,9 +749,6 @@ public void initialize(URI name, Configuration originalConf)
755749
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
756750
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
757751
inputPolicy);
758-
vectoredActiveRangeReads = intOption(conf,
759-
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
760-
vectoredIOContext = populateVectoredIOContext(conf);
761752
scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
762753
optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
763754
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
@@ -772,6 +763,11 @@ public void initialize(URI name, Configuration originalConf)
772763
// this is to aid mocking.
773764
s3Client = getStore().getOrCreateS3Client();
774765

766+
final StreamFactoryRequirements factoryRequirements =
767+
getStore().factoryRequirements();
768+
// get the vector IO context from the factory.
769+
vectoredIOContext = factoryRequirements.vectoredIOContext();
770+
775771
// thread pool init requires store to be created
776772
initThreadPools();
777773

@@ -843,23 +839,6 @@ protected S3AStore createS3AStore(final ClientManager clientManager,
843839
return st;
844840
}
845841

846-
/**
847-
* Populates the configurations related to vectored IO operation
848-
* in the context which has to passed down to input streams.
849-
* @param conf configuration object.
850-
* @return VectoredIOContext.
851-
*/
852-
private VectoredIOContext populateVectoredIOContext(Configuration conf) {
853-
final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
854-
DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
855-
final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
856-
DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
857-
return new VectoredIOContext()
858-
.setMinSeekForVectoredReads(minSeekVectored)
859-
.setMaxReadSizeForVectoredReads(maxReadSizeVectored)
860-
.build();
861-
}
862-
863842
/**
864843
* Test bucket existence in S3.
865844
* When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0,
@@ -950,9 +929,9 @@ private void initThreadPools() {
950929
TimeUnit.SECONDS,
951930
Duration.ZERO).getSeconds();
952931

953-
final StreamThreadOptions threadRequirements =
954-
getStore().threadRequirements();
955-
int numPrefetchThreads = threadRequirements.sharedThreads();
932+
final StreamFactoryRequirements factoryRequirements =
933+
getStore().factoryRequirements();
934+
int numPrefetchThreads = factoryRequirements.sharedThreads();
956935

957936
int activeTasksForBoundedThreadPool = maxThreads;
958937
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
@@ -970,7 +949,7 @@ private void initThreadPools() {
970949
unboundedThreadPool.allowCoreThreadTimeOut(true);
971950
executorCapacity = intOption(conf,
972951
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
973-
if (threadRequirements.createFuturePool()) {
952+
if (factoryRequirements.createFuturePool()) {
974953
// create a future pool.
975954
final S3AInputStreamStatistics s3AInputStreamStatistics =
976955
statisticsContext.newInputStreamStatistics();
@@ -1860,13 +1839,13 @@ private FSDataInputStream executeOpen(
18601839
LOG.debug("Opening '{}'", readContext);
18611840

18621841
// what does the stream need
1863-
final StreamThreadOptions requirements =
1864-
getStore().threadRequirements();
1842+
final StreamFactoryRequirements requirements =
1843+
getStore().factoryRequirements();
18651844

18661845
// calculate the permit count.
18671846
final int permitCount = requirements.streamThreads() +
18681847
(requirements.vectorSupported()
1869-
? vectoredActiveRangeReads
1848+
? requirements.vectoredIOContext().getVectoredActiveRangeReads()
18701849
: 0);
18711850
// create an executor which is a subset of the
18721851
// bounded thread pool.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,6 @@ public class S3AInputStream extends ObjectInputStream implements CanSetReadahead
146146

147147
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
148148

149-
/** Vectored IO context. */
150-
private final VectoredIOContext vectoredIOContext;
151-
152149
/**
153150
* This is the actual position within the object, used by
154151
* lazy seek to decide whether to seek on the next read or not.
@@ -195,7 +192,6 @@ public S3AInputStream(ObjectReadParameters parameters) {
195192
getObjectAttributes());
196193
setReadahead(context.getReadahead());
197194
this.asyncDrainThreshold = context.getAsyncDrainThreshold();
198-
this.vectoredIOContext = this.getContext().getVectoredIOContext();
199195
}
200196

201197
/**
@@ -764,6 +760,7 @@ public String toString() {
764760
synchronized (this) {
765761
final StringBuilder sb = new StringBuilder(
766762
"S3AInputStream{");
763+
sb.append(super.toString()).append(" ");
767764
sb.append(getUri());
768765
sb.append(" wrappedStream=")
769766
.append(isObjectStreamOpen() ? "open" : "closed");
@@ -776,7 +773,7 @@ public String toString() {
776773
sb.append(" remainingInCurrentRequest=")
777774
.append(remainingInCurrentRequest());
778775
sb.append(" ").append(changeTracker);
779-
sb.append(" ").append(vectoredIOContext);
776+
sb.append(" ").append(getVectoredIOContext());
780777
sb.append('\n').append(s);
781778
sb.append('}');
782779
return sb.toString();
@@ -826,22 +823,6 @@ public void readFully(long position, byte[] buffer, int offset, int length)
826823
}
827824
}
828825

829-
/**
830-
* {@inheritDoc}.
831-
*/
832-
@Override
833-
public int minSeekForVectorReads() {
834-
return vectoredIOContext.getMinSeekForVectorReads();
835-
}
836-
837-
/**
838-
* {@inheritDoc}.
839-
*/
840-
@Override
841-
public int maxReadSizeForVectorReads() {
842-
return vectoredIOContext.getMaxReadSizeForVectorReads();
843-
}
844-
845826
/**
846827
* {@inheritDoc}
847828
* Vectored read implementation for S3AInputStream.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ public class VectoredIOContext {
4040
*/
4141
private int maxReadSizeForVectorReads;
4242

43+
/**
44+
* Maximum number of active range read operation a single
45+
* input stream can have.
46+
*/
47+
private int vectoredActiveRangeReads;
48+
4349
/**
4450
* Default no arg constructor.
4551
*/
@@ -68,11 +74,22 @@ public int getMaxReadSizeForVectorReads() {
6874
return maxReadSizeForVectorReads;
6975
}
7076

77+
public int getVectoredActiveRangeReads() {
78+
return vectoredActiveRangeReads;
79+
}
80+
81+
public VectoredIOContext setVectoredActiveRangeReads(
82+
final int vectoredActiveRangeReads) {
83+
this.vectoredActiveRangeReads = vectoredActiveRangeReads;
84+
return this;
85+
}
86+
7187
@Override
7288
public String toString() {
7389
return "VectoredIOContext{" +
7490
"minSeekForVectorReads=" + minSeekForVectorReads +
7591
", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
92+
", vectoredActiveRangeReads=" + vectoredActiveRangeReads +
7693
'}';
7794
}
7895
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,12 @@
7373
import org.apache.hadoop.fs.s3a.UploadInfo;
7474
import org.apache.hadoop.fs.s3a.api.RequestFactory;
7575
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
76+
import org.apache.hadoop.fs.s3a.impl.streams.FactoryBindingParameters;
77+
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
7678
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
7779
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
7880
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
79-
import org.apache.hadoop.fs.s3a.impl.streams.StreamThreadOptions;
81+
import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements;
8082
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
8183
import org.apache.hadoop.fs.statistics.DurationTracker;
8284
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -111,7 +113,7 @@
111113
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE;
112114
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
113115
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
114-
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.createStreamFactory;
116+
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.factoryFromConfig;
115117
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
116118
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
117119
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
@@ -233,7 +235,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
233235

234236
// create and register the stream factory, which will
235237
// then follow the service lifecycle
236-
objectInputStreamFactory = createStreamFactory(conf);
238+
objectInputStreamFactory = factoryFromConfig(conf);
237239
addService(objectInputStreamFactory);
238240

239241
// init all child services, including the stream factory
@@ -269,7 +271,6 @@ public boolean hasPathCapability(final Path path, final String capability) {
269271
}
270272
}
271273

272-
273274
/**
274275
* Return the capabilities of input streams created
275276
* through the store.
@@ -955,7 +956,7 @@ private void finishStreamFactoryInit() {
955956
"Client Manager is in wrong state: %s", clientManager.getServiceState());
956957

957958
// finish initialization and pass down callbacks to self
958-
objectInputStreamFactory.bind(new FactoryCallbacks());
959+
objectInputStreamFactory.bind(new FactoryBindingParameters(new FactoryCallbacks()));
959960
}
960961

961962
@Override /* ObjectInputStreamFactory */
@@ -966,21 +967,25 @@ public ObjectInputStream readObject(ObjectReadParameters parameters)
966967
}
967968

968969
@Override /* ObjectInputStreamFactory */
969-
public StreamThreadOptions threadRequirements() {
970-
return objectInputStreamFactory.threadRequirements();
970+
public StreamFactoryRequirements factoryRequirements() {
971+
return objectInputStreamFactory.factoryRequirements();
971972
}
972973

973974
/**
974975
* This operation is not implemented, as
975976
* is this class which invokes it on the actual factory.
976-
* @param callbacks factory callbacks.
977-
* @throws UnsupportedOperationException always
977+
* @param factoryBindingParameters binding parameters (ignored).
* @throws UnsupportedOperationException always
978978
*/
979979
@Override /* ObjectInputStreamFactory */
980-
public void bind(final StreamFactoryCallbacks callbacks) {
980+
public void bind(final FactoryBindingParameters factoryBindingParameters) {
981981
throw new UnsupportedOperationException("Not supported");
982982
}
983983

984+
@Override /* ObjectInputStreamFactory */
985+
public InputStreamType streamType() {
986+
return objectInputStreamFactory.streamType();
987+
}
988+
984989
/**
985990
* Callbacks from {@link ObjectInputStreamFactory} instances.
986991
*/

0 commit comments

Comments
 (0)