Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore (#7214) #7412

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -123,4 +124,39 @@ public static <E extends Enum<E>> Map<String, E> mapEnumNamesToValues(
return mapping;
}

/**
 * Look up a configuration option and resolve it, case-insensitively,
 * against the members of the supplied enum class.
 * If the option is unset, or its trimmed value matches no member,
 * the fallback function is invoked with the trimmed (and possibly
 * empty) string, and its result returned.
 * Extends {@link Configuration#getEnum(String, Enum)}
 * by adding case independence and a fallback lambda expression
 * rather than a default value.
 * @param conf configuration
 * @param name property name
 * @param enumClass enum class whose members are matched against
 * @param fallback function invoked when no member matches
 * @param <E> enumeration type.
 * @return an enum value
 * @throws IllegalArgumentException If mapping is illegal for the type provided
 */
public static <E extends Enum<E>> E resolveEnum(
    Configuration conf,
    String name,
    Class<E> enumClass,
    Function<String, E> fallback) {

  final String value = conf.getTrimmed(name, "");

  // probe a map of lower-cased member names with the lower-cased value;
  // Locale.ROOT keeps the case conversion locale-independent
  // (e.g. the Turkish dotless i).
  final E match = mapEnumNamesToValues("", enumClass)
      .get(value.toLowerCase(Locale.ROOT));

  // unmatched (including empty) values are delegated to the fallback.
  return match == null ? fallback.apply(value) : match;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.hadoop.util.ConfigurationHelper.ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE;
import static org.apache.hadoop.util.ConfigurationHelper.mapEnumNamesToValues;
import static org.apache.hadoop.util.ConfigurationHelper.parseEnumSet;
import static org.apache.hadoop.util.ConfigurationHelper.resolveEnum;

/**
* Test for {@link ConfigurationHelper}.
Expand All @@ -43,6 +44,12 @@ public class TestConfigurationHelper extends AbstractHadoopTestBase {
*/
private enum SimpleEnum { a, b, c, i }

/**
 * Upper case version of SimpleEnum.
 * "I" is included for case-conversion tests, as lower/upper casing of
 * "i" is special in Turkish locales (the dotless i).
 */
private enum UppercaseEnum { A, B, C, I }


/**
* Special case: an enum with no values.
Expand Down Expand Up @@ -171,4 +178,65 @@ public void testDuplicateValues() {
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
}

@Test
public void testResolveEnumGood() throws Throwable {
  // an exact match on name and case resolves directly.
  assertEnumResolution("c", SimpleEnum.c);
}

@Test
public void testResolveEnumTrimmed() throws Throwable {
  // strings are trimmed at each end before resolution.
  assertEnumResolution("\n i \n ", SimpleEnum.i);
}

@Test
public void testResolveEnumCaseConversion() throws Throwable {
  // resolution is case-independent: "C" maps to the lower-case member.
  assertEnumResolution("C", SimpleEnum.c);
}

@Test
public void testResolveEnumNoMatch() throws Throwable {
  // an unknown value is handed to the fallback, which returns null here.
  assertEnumResolution("other", null);
}

@Test
public void testResolveEnumEmpty() throws Throwable {
  // an unset/empty value also goes to the fallback.
  assertEnumResolution("", null);
}

@Test
public void testResolveEnumUpperCaseConversion() throws Throwable {
  // case-independent matching against an enum with upper-case members.
  assertUpperEnumResolution("C", UppercaseEnum.C);
}

@Test
public void testResolveLowerToUpperCaseConversion() throws Throwable {
  // lower-case "i" resolves to the upper-case member "I";
  // exercises the locale-sensitive casing corner case.
  assertUpperEnumResolution("i", UppercaseEnum.I);
}

/**
 * Assert that a configuration option set to {@code value}
 * resolves to the expected SimpleEnum member.
 * The fallback lambda returns null, so a failed resolution
 * surfaces as a null result.
 * @param value value to set
 * @param expected expected outcome, set to null for no resolution.
 */
private void assertEnumResolution(final String value, final SimpleEnum expected) {
  final SimpleEnum resolved = resolveEnum(
      confWithKey(value), "key", SimpleEnum.class, (v) -> null);
  Assertions.assertThat(resolved)
      .describedAs("Resolution of %s", value)
      .isEqualTo(expected);
}

/**
 * Equivalent of assertEnumResolution() for UppercaseEnum.
 * @param value value to set
 * @param expected expected outcome, set to null for no resolution.
 */
private void assertUpperEnumResolution(final String value, UppercaseEnum expected) {
  final UppercaseEnum resolved = resolveEnum(
      confWithKey(value), "key", UppercaseEnum.class, (v) -> null);
  Assertions.assertThat(resolved)
      .describedAs("Resolution of %s", value)
      .isEqualTo(expected);
}

}
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
</Match>
<!-- we are using completable futures, so ignore the Future which submit() returns -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
<Class name="org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>

Expand Down
35 changes: 24 additions & 11 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
<!-- marker retention policy -->
<fs.s3a.directory.marker.retention></fs.s3a.directory.marker.retention>

<!-- Is prefetch enabled? -->
<fs.s3a.prefetch.enabled>unset</fs.s3a.prefetch.enabled>
<!-- stream type to use in tests; passed down in fs.s3a.input.stream.type -->
<stream>classic</stream>
<!-- Job ID; allows for parallel jobs on same bucket -->
<!-- job.id is used to build the path for tests; default is 00.-->
<job.id>00</job.id>
Expand Down Expand Up @@ -130,8 +130,8 @@
<!-- Markers-->
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
<fs.s3a.directory.marker.audit>${fs.s3a.directory.marker.audit}</fs.s3a.directory.marker.audit>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
</systemPropertyVariables>
</configuration>
</plugin>
Expand Down Expand Up @@ -171,8 +171,8 @@
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>

<test.default.timeout>${test.integration.timeout}</test.default.timeout>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<!-- are root tests enabled. Set to false when running parallel jobs on same bucket -->
<fs.s3a.root.tests.enabled>${root.tests.enabled}</fs.s3a.root.tests.enabled>

Expand Down Expand Up @@ -225,8 +225,8 @@
<!-- Markers-->
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
<fs.s3a.directory.marker.audit>${fs.s3a.directory.marker.audit}</fs.s3a.directory.marker.audit>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<!-- are root tests enabled. Set to false when running parallel jobs on same bucket -->
<fs.s3a.root.tests.enabled>${root.tests.enabled}</fs.s3a.root.tests.enabled>
<test.unique.fork.id>job-${job.id}</test.unique.fork.id>
Expand Down Expand Up @@ -289,8 +289,8 @@
<!-- Markers-->
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
<fs.s3a.directory.marker.audit>${fs.s3a.directory.marker.audit}</fs.s3a.directory.marker.audit>
<!-- Prefetch -->
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
<!-- Stream Type -->
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
<test.unique.fork.id>job-${job.id}</test.unique.fork.id>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
Expand Down Expand Up @@ -362,7 +362,20 @@
</property>
</activation>
<properties>
<fs.s3a.prefetch.enabled>true</fs.s3a.prefetch.enabled>
<stream>prefetch</stream>
</properties>
</profile>

<!-- Switch to the analytics input stream-->
<profile>
<id>analytics</id>
<activation>
<property>
<name>analytics</name>
</property>
</activation>
<properties>
<stream>analytics</stream>
</properties>
</profile>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import java.time.Duration;
Expand Down Expand Up @@ -1563,14 +1564,60 @@ private Constants() {
*/
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";

/**
* Input stream type: {@value}.
*/
public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";

/**
* The classic input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_CLASSIC =
StreamIntegration.CLASSIC;

/**
* The prefetching input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_PREFETCH = StreamIntegration.PREFETCH;

/**
* The analytics input stream: {@value}.
*/
public static final String INPUT_STREAM_TYPE_ANALYTICS =
StreamIntegration.ANALYTICS;

/**
* Request the default input stream,
* whatever it is for this release: {@value}.
*/
public static final String INPUT_STREAM_TYPE_DEFAULT = StreamIntegration.DEFAULT;

/**
 * The custom input stream type: {@value}.
 * If set, the classname is loaded from
 * {@link #INPUT_STREAM_CUSTOM_FACTORY}.
 * <p>
 * This option is primarily for testing, as it can
 * be used to generate failures.
 */
public static final String INPUT_STREAM_TYPE_CUSTOM =
StreamIntegration.CUSTOM;

/**
* Classname of the factory to instantiate for custom streams: {@value}.
*/
public static final String INPUT_STREAM_CUSTOM_FACTORY = "fs.s3a.input.stream.custom.factory";

/**
 * Controls whether the prefetching input stream is enabled: {@value}.
 * @deprecated set {@link #INPUT_STREAM_TYPE} to
 * {@link #INPUT_STREAM_TYPE_PREFETCH} instead.
 */
@Deprecated
public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";

/**
 * Default option as to whether the prefetching input stream is enabled:
 * {@value}.
 * @deprecated superseded by {@link #INPUT_STREAM_TYPE}.
 */
@Deprecated
public static final boolean PREFETCH_ENABLED_DEFAULT = false;

// If the default values are used, each file opened for reading will consume
Expand Down
Loading