diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index ff330e52dc01e..bd204b08a612f 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -188,8 +188,6 @@ **/ITestDynamoDBMetadataStoreScale.java **/ITestTerasort*.java - - **/ITest*CommitMRJob.java **/ITestS3GuardDDBRootOperations.java @@ -231,8 +229,6 @@ **/ITestTerasort*.java - - **/ITest*CommitMRJob.java **/ITestS3AContractRootDir.java **/ITestS3GuardDDBRootOperations.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 7ec447863c150..833edd4a6b022 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -677,7 +677,8 @@ protected int commitTaskInternal(final TaskAttemptContext context, // we will try to abort the ones that had already succeeded. int commitCount = taskOutput.size(); final Queue commits = new ConcurrentLinkedQueue<>(); - LOG.info("{}: uploading from staging directory to S3", getRole()); + LOG.info("{}: uploading from staging directory to S3 {}", getRole(), + attemptPath); LOG.info("{}: Saving pending data information to {}", getRole(), commitsAttemptPath); if (taskOutput.isEmpty()) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java index c5fb967863953..c41715bd497d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java @@ -34,7 +34,7 @@ private StagingCommitterConstants() { /** * The temporary path for staging data, if not explicitly set. * By using an unqualified path, this will be qualified to be relative - * to the users' home directory, so protectec from access for others. + * to the users' home directory, so protected from access for others. */ public static final String FILESYSTEM_TEMP_PATH = "tmp/staging"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 6023daa13875b..1cf3fb4a3f65f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import com.amazonaws.services.s3.AmazonS3; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +112,9 @@ protected Configuration createConfiguration() { MAGIC_COMMITTER_ENABLED, S3A_COMMITTER_FACTORY_KEY, FS_S3A_COMMITTER_NAME, - FS_S3A_COMMITTER_STAGING_CONFLICT_MODE); + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, + FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + FAST_UPLOAD_BUFFER); conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); @@ -209,6 +212,7 @@ public static String randomJobId() throws Exception { */ @Override public void teardown() throws Exception { + Thread.currentThread().setName("teardown"); LOG.info("AbstractCommitITest::teardown"); waitForConsistency(); // make sure there are no failures any more @@ -359,7 +363,7 @@ private String pathToPrefix(Path path) { * @throws IOException IO Failure */ protected SuccessData verifySuccessMarker(Path dir) throws IOException { - return validateSuccessFile(dir, "", getFileSystem(), "query"); + return validateSuccessFile(dir, "", getFileSystem(), "query", 0); } /** @@ -437,13 +441,15 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId, * @param committerName name of committer to match, or "" * @param fs filesystem * @param origin origin (e.g. "teragen" for messages) + * @param minimumFileCount minimum number of files to have been created * @return the success data * @throws IOException IO failure */ public static SuccessData validateSuccessFile(final Path outputPath, final String committerName, final S3AFileSystem fs, - final String origin) throws IOException { + final String origin, + final int minimumFileCount) throws IOException { SuccessData successData = loadSuccessFile(fs, outputPath, origin); String commitDetails = successData.toString(); LOG.info("Committer name " + committerName + "\n{}", @@ -456,6 +462,9 @@ public static SuccessData validateSuccessFile(final Path outputPath, assertEquals("Wrong committer in " + commitDetails, committerName, successData.getCommitter()); } + Assertions.assertThat(successData.getFilenames()) + .describedAs("Files committed") + .hasSizeGreaterThanOrEqualTo(minimumFileCount); return successData; } @@ -485,8 +494,9 @@ public static SuccessData loadSuccessFile(final S3AFileSystem fs, status.isFile()); assertTrue("0 byte success file " + success + " from " + origin - + "; a s3guard committer was not used", + + "; an S3A committer was not used", status.getLen() > 0); + LOG.info("Loading committer success file {}", success); return SuccessData.load(fs, success); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java deleted file mode 100644 index 1a518474bcb03..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import com.google.common.collect.Sets; -import org.assertj.core.api.Assertions; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3AUtils; -import org.apache.hadoop.fs.s3a.commit.files.SuccessData; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.DurationInfo; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; -import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; - -/** - * Test for an MR Job with all the different committers. - */ -public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest { - - private static final Logger LOG = - LoggerFactory.getLogger(AbstractITCommitMRJob.class); - - @Override - protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - disableFilesystemCaching(conf); - return conf; - } - - @Rule - public final TemporaryFolder temp = new TemporaryFolder(); - - @Test - public void testMRJob() throws Exception { - describe("Run a simple MR Job"); - - S3AFileSystem fs = getFileSystem(); - // final dest is in S3A - Path outputPath = path(getMethodName()); - // create and delete to force in a tombstone marker -see HADOOP-16207 - fs.mkdirs(outputPath); - fs.delete(outputPath, true); - - String commitUUID = UUID.randomUUID().toString(); - String suffix = isUniqueFilenames() ? ("-" + commitUUID) : ""; - int numFiles = getTestFileCount(); - List expectedFiles = new ArrayList<>(numFiles); - Set expectedKeys = Sets.newHashSet(); - for (int i = 0; i < numFiles; i += 1) { - File file = temp.newFile(i + ".text"); - try (FileOutputStream out = new FileOutputStream(file)) { - out.write(("file " + i).getBytes(StandardCharsets.UTF_8)); - } - String filename = String.format("part-m-%05d%s", i, suffix); - Path path = new Path(outputPath, filename); - expectedFiles.add(path.toString()); - expectedKeys.add("/" + fs.pathToKey(path)); - } - Collections.sort(expectedFiles); - - Job mrJob = createJob(); - JobConf jobConf = (JobConf) mrJob.getConfiguration(); - - mrJob.setOutputFormatClass(LoggingTextOutputFormat.class); - FileOutputFormat.setOutputPath(mrJob, outputPath); - - File mockResultsFile = temp.newFile("committer.bin"); - mockResultsFile.delete(); - String committerPath = "file:" + mockResultsFile; - jobConf.set("mock-results-file", committerPath); - jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); - jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "/staging"); - - mrJob.setInputFormatClass(TextInputFormat.class); - FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI())); - - mrJob.setMapperClass(MapClass.class); - mrJob.setNumReduceTasks(0); - - // an attempt to set up log4j properly, which clearly doesn't work - URL log4j = getClass().getClassLoader().getResource("log4j.properties"); - if (log4j != null && log4j.getProtocol().equals("file")) { - Path log4jPath = new Path(log4j.toURI()); - LOG.debug("Using log4j path {}", log4jPath); - mrJob.addFileToClassPath(log4jPath); - String sysprops = String.format("-Xmx256m -Dlog4j.configuration=%s", - log4j); - jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops); - jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops); - jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops); - } - - applyCustomConfigOptions(jobConf); - // fail fast if anything goes wrong - mrJob.setMaxMapAttempts(1); - - mrJob.submit(); - try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) { - boolean succeeded = mrJob.waitForCompletion(true); - assertTrue("MR job failed", succeeded); - } - - waitForConsistency(); - verifyPathExists(fs, - "MR job Output directory not found," - + " even though the job did not report a failure", - outputPath); - assertIsDirectory(outputPath); - FileStatus[] results = fs.listStatus(outputPath, - S3AUtils.HIDDEN_FILE_FILTER); - int fileCount = results.length; - List actualFiles = new ArrayList<>(fileCount); - assertTrue("No files in output directory", fileCount != 0); - LOG.info("Found {} files", fileCount); - for (FileStatus result : results) { - LOG.debug("result: {}", result); - actualFiles.add(result.getPath().toString()); - } - Collections.sort(actualFiles); - - SuccessData successData = validateSuccessFile(outputPath, committerName(), - fs, "MR job"); - List successFiles = successData.getFilenames(); - String commitData = successData.toString(); - assertFalse("No filenames in " + commitData, - successFiles.isEmpty()); - - Assertions.assertThat(actualFiles) - .describedAs("Committed files in the job output directory") - .containsExactlyInAnyOrderElementsOf(expectedFiles); - - Assertions.assertThat(successFiles) - .describedAs("List of committed files in %s", commitData) - .containsExactlyInAnyOrderElementsOf(expectedKeys); - - assertPathDoesNotExist("temporary dir", - new Path(outputPath, CommitConstants.TEMPORARY)); - customPostExecutionValidation(outputPath, successData); - } - - /** - * Test Mapper. - * This is executed in separate process, and must not make any assumptions - * about external state. - */ - public static class MapClass - extends Mapper { - - private int operations; - private String id = ""; - private LongWritable l = new LongWritable(); - private Text t = new Text(); - - @Override - protected void setup(Context context) - throws IOException, InterruptedException { - super.setup(context); - // force in Log4J logging - org.apache.log4j.BasicConfigurator.configure(); - boolean scaleMap = context.getConfiguration() - .getBoolean(KEY_SCALE_TESTS_ENABLED, false); - operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS; - id = context.getTaskAttemptID().toString(); - } - - @Override - protected void map(LongWritable key, Text value, Context context) - throws IOException, InterruptedException { - for (int i = 0; i < operations; i++) { - l.set(i); - t.set(String.format("%s:%05d", id, i)); - context.write(l, t); - } - } - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java index 2e8f1f090403c..783c62686bad7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java @@ -19,8 +19,12 @@ package org.apache.hadoop.fs.s3a.commit; import java.io.IOException; -import java.util.UUID; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,19 +37,22 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration; import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES; /** * Full integration test MR jobs. * - * This is all done on shared static mini YARN and HDFS clusters, set up before - * any of the tests methods run. + * This is all done on shared static mini YARN and (optionally) HDFS clusters, + * set up before any of the tests methods run. * * To isolate tests properly for parallel test runs, that static state * needs to be stored in the final classes implementing the tests, and @@ -61,38 +68,54 @@ * If two subclasses of this class are instantiated in the same JVM, in order, * they are guaranteed to be isolated. * - * History: this is a superclass extracted from - * {@link AbstractITCommitMRJob} while adding support for testing terasorting. - * */ public abstract class AbstractYarnClusterITest extends AbstractCommitITest { private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterITest.class); - private static final int TEST_FILE_COUNT = 2; - private static final int SCALE_TEST_FILE_COUNT = 50; + private static final int TEST_FILE_COUNT = 1; + private static final int SCALE_TEST_FILE_COUNT = 10; - public static final int SCALE_TEST_KEYS = 1000; + public static final int SCALE_TEST_KEYS = 100; public static final int BASE_TEST_KEYS = 10; + public static final int NO_OF_NODEMANAGERS = 2; + private boolean scaleTest; - private boolean uniqueFilenames = false; + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + + @AfterClass + public static void teardownClusters() throws IOException { + terminateCluster(clusterBinding); + clusterBinding = null; + } /** * This is the cluster binding which every subclass must create. */ protected static final class ClusterBinding { + private String clusterName; + private final MiniDFSClusterService hdfs; private final MiniMRYarnCluster yarn; public ClusterBinding( + final String clusterName, final MiniDFSClusterService hdfs, final MiniMRYarnCluster yarn) { - this.hdfs = checkNotNull(hdfs); + this.clusterName = clusterName; + this.hdfs = hdfs; this.yarn = checkNotNull(yarn); } @@ -100,6 +123,18 @@ public MiniDFSClusterService getHdfs() { return hdfs; } + /** + * Get the cluster FS, which will either be HDFS or the local FS. + * @return a filesystem. + * @throws IOException failure + */ + public FileSystem getClusterFS() throws IOException { + MiniDFSClusterService miniCluster = getHdfs(); + return miniCluster != null + ? miniCluster.getClusterFS() + : FileSystem.getLocal(yarn.getConfig()); + } + public MiniMRYarnCluster getYarn() { return yarn; } @@ -108,6 +143,10 @@ public Configuration getConf() { return getYarn().getConfig(); } + public String getClusterName() { + return clusterName; + } + public void terminate() { terminateService(getYarn()); terminateService(getHdfs()); @@ -115,74 +154,111 @@ public void terminate() { } /** - * Create the cluster binding. This must be done in - * class setup of the (final) subclass. - * The HDFS and YARN clusters share the same configuration, so + * Create the cluster binding. + * The configuration will be patched by propagating down options + * from the maven build (S3Guard binding etc) and turning off unwanted + * YARN features. + * + * If an HDFS cluster is requested, + * the HDFS and YARN clusters will share the same configuration, so * the HDFS cluster binding is implicitly propagated to YARN. + * If one is not requested, the local filesystem is used as the cluster FS. * @param conf configuration to start with. + * @param useHDFS should an HDFS cluster be instantiated. * @return the cluster binding. * @throws IOException failure. */ - protected static ClusterBinding createCluster(JobConf conf) - throws IOException { - + protected static ClusterBinding createCluster( + final JobConf conf, + final boolean useHDFS) throws IOException { + prepareTestConfiguration(conf); conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE); - - // create a unique cluster name. - String clusterName = "yarn-" + UUID.randomUUID(); - MiniDFSClusterService miniDFSClusterService = deployService(conf, - new MiniDFSClusterService()); + // minicluster tests overreact to not enough disk space. + conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false); + conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100); + // create a unique cluster name based on the current time in millis. + String timestamp = LocalDateTime.now().format( + DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS")); + String clusterName = "yarn-" + timestamp; + MiniDFSClusterService miniDFSClusterService = + useHDFS + ? deployService(conf, new MiniDFSClusterService()) + : null; MiniMRYarnCluster yarnCluster = deployService(conf, - new MiniMRYarnCluster(clusterName, 2)); - return new ClusterBinding(miniDFSClusterService, yarnCluster); + new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS)); + return new ClusterBinding(clusterName, miniDFSClusterService, yarnCluster); } - protected static void terminateCluster(ClusterBinding clusterBinding) { - if (clusterBinding != null) { - clusterBinding.terminate(); + /** + * Terminate the cluster if it is not null. + * @param cluster the cluster + */ + protected static void terminateCluster(ClusterBinding cluster) { + if (cluster != null) { + cluster.terminate(); } } /** - * Get the cluster binding for this subclass - * @return + * Get the cluster binding for this subclass. + * @return the cluster binding */ - protected abstract ClusterBinding getClusterBinding(); - - protected MiniDFSClusterService getHdfs() { - return getClusterBinding().getHdfs(); + protected ClusterBinding getClusterBinding() { + return clusterBinding; } - protected MiniMRYarnCluster getYarn() { return getClusterBinding().getYarn(); } - public FileSystem getLocalFS() { - return getHdfs().getLocalFS(); + /** + * Get the cluster filesystem -hdfs or local. + * @return the filesystem shared across the yarn nodes. + */ + protected FileSystem getClusterFS() throws IOException { + return getClusterBinding().getClusterFS(); } - protected FileSystem getDFS() { - return getHdfs().getClusterFS(); - } + + /** + * We stage work into a temporary directory rather than directly under + * the user's home directory, as that is often rejected by CI test + * runners. + */ + @Rule + public final TemporaryFolder stagingFilesDir = new TemporaryFolder(); /** * The name of the committer as returned by - * {@link AbstractS3ACommitter#getName()} and used for committer construction. + * {@link AbstractS3ACommitter#getName()} + * and used for committer construction. */ protected abstract String committerName(); + /** + * binding on demand rather than in a BeforeClass static method. + * Subclasses can override this to change the binding options. + * @return the cluster binding + */ + protected ClusterBinding demandCreateClusterBinding() throws Exception { + return createCluster(new JobConf(), false); + } + @Override public void setup() throws Exception { super.setup(); - assertNotNull("cluster is not bound", - getClusterBinding()); scaleTest = getTestPropertyBool( getConfiguration(), KEY_SCALE_TESTS_ENABLED, DEFAULT_SCALE_TESTS_ENABLED); + if (getClusterBinding() == null) { + clusterBinding = demandCreateClusterBinding(); + } + assertNotNull("cluster is not bound", + getClusterBinding()); + } @Override @@ -190,28 +266,46 @@ protected int getTestTimeoutMillis() { return SCALE_TEST_TIMEOUT_SECONDS * 1000; } - protected JobConf newJobConf() { - return new JobConf(getYarn().getConfig()); + /** + * Create a job configuration. + * This creates a new job conf from the yarn + * cluster configuration then calls + * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized. + * @return the new job configuration. + * @throws IOException failure + */ + protected JobConf newJobConf() throws IOException { + JobConf jobConf = new JobConf(getYarn().getConfig()); + jobConf.addResource(getConfiguration()); + applyCustomConfigOptions(jobConf); + return jobConf; } - protected Job createJob() throws IOException { - Configuration jobConf = getClusterBinding().getConf(); - jobConf.addResource(getConfiguration()); + protected Job createJob(Configuration jobConf) throws IOException { Job mrJob = Job.getInstance(jobConf, getMethodName()); patchConfigurationForCommitter(mrJob.getConfiguration()); return mrJob; } + /** + * Patch the (job) configuration for this committer. + * @param jobConf configuration to patch + * @return a configuration which will run this configuration. + */ protected Configuration patchConfigurationForCommitter( final Configuration jobConf) { jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, - uniqueFilenames); + isUniqueFilenames()); bindCommitter(jobConf, CommitConstants.S3A_COMMITTER_FACTORY, committerName()); // pass down the scale test flag - jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest); + jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, isScaleTest()); + // and fix the commit dir to the local FS across all workers. + String staging = stagingFilesDir.getRoot().getAbsolutePath(); + LOG.info("Staging temp dir is {}", staging); + jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, staging); return jobConf; } @@ -220,7 +314,7 @@ protected Configuration patchConfigurationForCommitter( * @return the number of mappers to create. */ public int getTestFileCount() { - return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; + return isScaleTest() ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; } /** @@ -258,6 +352,6 @@ public boolean isScaleTest() { } public boolean isUniqueFilenames() { - return uniqueFilenames; + return false; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java new file mode 100644 index 0000000000000..e3e44497d4e6a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.integration; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import org.assertj.core.api.Assertions; +import org.junit.FixMethodOrder; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.DurationInfo; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; +import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; +import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test an MR Job with all the different committers. + *

+ * This is a fairly complex parameterization: it is designed to + * avoid the overhead of starting a Yarn cluster for + * individual committer types, so speed up operations. + *

+ * It also implicitly guarantees that there is never more than one of these + * MR jobs active at a time, so avoids overloading the test machine with too + * many processes. + * How the binding works: + *

    + *
  1. + * Each parameterized suite is configured through its own + * {@link CommitterTestBinding} subclass. + *
  2. + *
  3. + * JUnit runs these test suites one parameterized binding at a time. + *
  4. + *
  5. + * The test suites are declared to be executed in ascending order, so + * that for a specific binding, the order is {@link #test_000()}, + * {@link #test_100()} {@link #test_200_execute()} and finally + * {@link #test_500()}. + *
  6. + *
  7. + * {@link #test_000()} calls {@link CommitterTestBinding#validate()} to + * as to validate the state of the committer. This is primarily to + * verify that the binding setup mechanism is working. + *
  8. + *
  9. + * {@link #test_100()} is relayed to + * {@link CommitterTestBinding#test_100()}, + * for any preflight tests. + *
  10. + *
  11. + * The {@link #test_200_execute()} test runs the MR job for that + * particular binding with standard reporting and verification of the + * outcome. + *
  12. + *
  13. + * {@link #test_500()} test is relayed to + * {@link CommitterTestBinding#test_500()}, for any post-MR-job tests. + *
+ * + * A new S3A FileSystem instance is created for each test_ method, so the + * pre-execute and post-execute validators cannot inspect the state of the + * FS as part of their tests. + * However, as the MR workers and AM all run in their own processes, there's + * generally no useful information about the job in the local S3AFileSystem + * instance. + */ +@RunWith(Parameterized.class) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ACommitterMRJob.class); + + /** + * Test array for parameterized test runs. + * + * @return the committer binding for this run. + */ + @Parameterized.Parameters(name = "{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {new DirectoryCommitterTestBinding()}, + {new PartitionCommitterTestBinding()}, + {new MagicCommitterTestBinding()}, + }); + } + + /** + * The committer binding for this instance. + */ + private final CommitterTestBinding committerTestBinding; + + /** + * Parameterized constructor. + * @param committerTestBinding binding for the test. + */ + public ITestS3ACommitterMRJob( + final CommitterTestBinding committerTestBinding) { + this.committerTestBinding = committerTestBinding; + } + + @Override + public void setup() throws Exception { + super.setup(); + // configure the test binding for this specific test case. + committerTestBinding.setup(getClusterBinding(), getFileSystem()); + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + return conf; + } + + @Rule + public final TemporaryFolder localFilesDir = new TemporaryFolder(); + + @Override + protected String committerName() { + return committerTestBinding.getCommitterName(); + } + + @Override + public boolean useInconsistentClient() { + return committerTestBinding.useInconsistentClient(); + } + + /** + * Verify that the committer binding is happy. + */ + @Test + public void test_000() throws Throwable { + committerTestBinding.validate(); + + } + @Test + public void test_100() throws Throwable { + committerTestBinding.test_100(); + } + + @Test + public void test_200_execute() throws Exception { + describe("Run an MR with committer %s", committerName()); + + S3AFileSystem fs = getFileSystem(); + // final dest is in S3A + // we can't use the method name as the template places square braces into + // that and URI creation fails. + + Path outputPath = path("ITestS3ACommitterMRJob-execute-"+ committerName()); + // create and delete to force in a tombstone marker -see HADOOP-16207 + fs.mkdirs(outputPath); + fs.delete(outputPath, true); + + String commitUUID = UUID.randomUUID().toString(); + String suffix = isUniqueFilenames() ? ("-" + commitUUID) : ""; + int numFiles = getTestFileCount(); + + // create all the input files on the local FS. + List expectedFiles = new ArrayList<>(numFiles); + Set expectedKeys = Sets.newHashSet(); + for (int i = 0; i < numFiles; i += 1) { + File file = localFilesDir.newFile(i + ".text"); + try (FileOutputStream out = new FileOutputStream(file)) { + out.write(("file " + i).getBytes(StandardCharsets.UTF_8)); + } + String filename = String.format("part-m-%05d%s", i, suffix); + Path path = new Path(outputPath, filename); + expectedFiles.add(path.toString()); + expectedKeys.add("/" + fs.pathToKey(path)); + } + Collections.sort(expectedFiles); + + Job mrJob = createJob(newJobConf()); + JobConf jobConf = (JobConf) mrJob.getConfiguration(); + + mrJob.setOutputFormatClass(LoggingTextOutputFormat.class); + FileOutputFormat.setOutputPath(mrJob, outputPath); + + File mockResultsFile = localFilesDir.newFile("committer.bin"); + mockResultsFile.delete(); + String committerPath = "file:" + mockResultsFile; + jobConf.set("mock-results-file", committerPath); + + // setting up staging options is harmless for other committers + jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); + + mrJob.setInputFormatClass(TextInputFormat.class); + FileInputFormat.addInputPath(mrJob, + new Path(localFilesDir.getRoot().toURI())); + + mrJob.setMapperClass(MapClass.class); + mrJob.setNumReduceTasks(0); + + // an attempt to set up log4j properly, which clearly doesn't work + URL log4j = getClass().getClassLoader().getResource("log4j.properties"); + if (log4j != null && "file".equals(log4j.getProtocol())) { + Path log4jPath = new Path(log4j.toURI()); + LOG.debug("Using log4j path {}", log4jPath); + mrJob.addFileToClassPath(log4jPath); + String sysprops = String.format("-Xmx128m -Dlog4j.configuration=%s", + log4j); + jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops); + jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops); + jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops); + } + + applyCustomConfigOptions(jobConf); + // fail fast if anything goes wrong + mrJob.setMaxMapAttempts(1); + + try (DurationInfo ignore = new DurationInfo(LOG, "Job Submit")) { + mrJob.submit(); + } + String jobID = mrJob.getJobID().toString(); + String logLocation = "logs under " + + getYarn().getTestWorkDir().getAbsolutePath(); + try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) { + mrJob.waitForCompletion(true); + } + JobStatus status = mrJob.getStatus(); + if (!mrJob.isSuccessful()) { + // failure of job. + // be as meaningful as possible. + String message = + String.format("Job %s failed in state %s with cause %s.\n" + + "Consult %s", + jobID, + status.getState(), + status.getFailureInfo(), + logLocation); + LOG.error(message); + fail(message); + } + + waitForConsistency(); + Path successPath = new Path(outputPath, _SUCCESS); + SuccessData successData = validateSuccessFile(outputPath, + committerName(), + fs, + "MR job " + jobID, + 1); + String commitData = successData.toString(); + + FileStatus[] results = fs.listStatus(outputPath, + S3AUtils.HIDDEN_FILE_FILTER); + int fileCount = results.length; + Assertions.assertThat(fileCount) + .describedAs("No files from job %s in output directory %s; see %s", + jobID, + outputPath, + logLocation) + .isNotEqualTo(0); + + List actualFiles = Arrays.stream(results) + .map(s -> s.getPath().toString()) + .sorted() + .collect(Collectors.toList()); + + Assertions.assertThat(actualFiles) + .describedAs("Files found in %s", outputPath) + .isEqualTo(expectedFiles); + + Assertions.assertThat(successData.getFilenames()) + .describedAs("Success files listed in %s:%s", + successPath, commitData) + .isNotEmpty() + .containsExactlyInAnyOrderElementsOf(expectedKeys); + + assertPathDoesNotExist("temporary dir should only be from" + + " classic file committers", + new Path(outputPath, CommitConstants.TEMPORARY)); + customPostExecutionValidation(outputPath, successData); + } + + @Override + protected void applyCustomConfigOptions(final JobConf jobConf) + throws IOException { + committerTestBinding.applyCustomConfigOptions(jobConf); + } + + @Override + protected void customPostExecutionValidation(final Path destPath, + final SuccessData successData) throws Exception { + committerTestBinding.validateResult(destPath, successData); + } + + /** + * This is the extra test which committer test bindings can add. + */ + @Test + public void test_500() throws Throwable { + committerTestBinding.test_500(); + } + + /** + * Test Mapper. + * This is executed in separate process, and must not make any assumptions + * about external state. + */ + public static class MapClass + extends Mapper { + + private int operations; + + private String id = ""; + + private LongWritable l = new LongWritable(); + + private Text t = new Text(); + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + // force in Log4J logging + org.apache.log4j.BasicConfigurator.configure(); + // and pick up scale test flag as passed down + boolean scaleMap = context.getConfiguration() + .getBoolean(KEY_SCALE_TESTS_ENABLED, false); + operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS; + id = context.getTaskAttemptID().toString(); + } + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + for (int i = 0; i < operations; i++) { + l.set(i); + t.set(String.format("%s:%05d", id, i)); + context.write(l, t); + } + } + } + + /** + * A binding class for committer tests. + * Subclasses of this will be instantiated and drive the parameterized + * test suite. + * + * These classes will be instantiated in a static array of the suite, and + * not bound to a cluster binding or filesystem. + * + * The per-method test {@link #setup()} method will call + * {@link #setup(ClusterBinding, S3AFileSystem)}, to link the instance + * to the specific test cluster and test filesystem in use + * in that test. + */ + private abstract static class CommitterTestBinding { + + /** + * Name. + */ + private final String committerName; + + /** + * Cluster binding. + */ + private ClusterBinding binding; + + /** + * The S3A filesystem. + */ + private S3AFileSystem remoteFS; + + /** + * Constructor. + * @param committerName name of the committer for messages. + */ + protected CommitterTestBinding(final String committerName) { + this.committerName = committerName; + } + + /** + * Set up the test binding: this is called during test setup. + * @param cluster the active test cluster. + * @param fs the S3A Filesystem used as a destination. + */ + private void setup( + ClusterBinding cluster, + S3AFileSystem fs) { + this.binding = cluster; + this.remoteFS = fs; + } + + protected String getCommitterName() { + return committerName; + } + + protected ClusterBinding getBinding() { + return binding; + } + + protected S3AFileSystem getRemoteFS() { + return remoteFS; + } + + protected FileSystem getClusterFS() throws IOException { + return getBinding().getClusterFS(); + } + + @Override + public String toString() { + return committerName; + } + + /** + * Override point to let implementations tune the MR Job conf. + * @param jobConf configuration + */ + protected void applyCustomConfigOptions(JobConf jobConf) + throws IOException { + } + + /** + * Should the inconsistent S3A client be used? + * @return true for inconsistent listing + */ + public abstract boolean useInconsistentClient(); + + /** + * Override point for any committer specific validation operations; + * called after the base assertions have all passed. + * @param destPath destination of work + * @param successData loaded success data + * @throws Exception failure + */ + protected void validateResult(Path destPath, + SuccessData successData) + throws Exception { + + } + + /** + * A test to run before the main {@link #test_200_execute()} test is + * invoked. + * @throws Throwable failure. + */ + void test_100() throws Throwable { + + } + + /** + * A test to run after the main {@link #test_200_execute()} test is + * invoked. + * @throws Throwable failure. + */ + void test_500() throws Throwable { + + } + + /** + * Validate the state of the binding. + * This is called in {@link #test_000()} so will + * fail independently of the other tests. + * @throws Throwable failure. + */ + public void validate() throws Throwable { + assertNotNull("Not bound to a cluster", binding); + assertNotNull("No cluster filesystem", getClusterFS()); + assertNotNull("No yarn cluster", binding.getYarn()); + } + } + + /** + * The directory staging committer. + */ + private static final class DirectoryCommitterTestBinding + extends CommitterTestBinding { + + private DirectoryCommitterTestBinding() { + super(DirectoryStagingCommitter.NAME); + } + + /** + * @return true for inconsistent listing + */ + public boolean useInconsistentClient() { + return true; + } + + /** + * Verify that staging commit dirs are made absolute under the user's + * home directory, so, in a secure cluster, private. + */ + @Override + void test_100() throws Throwable { + FileSystem fs = getClusterFS(); + Configuration conf = fs.getConf(); + String pri = "private"; + conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, pri); + Path dir = getMultipartUploadCommitsDirectory(conf, "uuid"); + Assertions.assertThat(dir.isAbsolute()) + .describedAs("non-absolute path") + .isTrue(); + String stagingTempDir = dir.toString().toLowerCase(Locale.ENGLISH); + String self = UserGroupInformation.getCurrentUser() + .getShortUserName().toLowerCase(Locale.ENGLISH); + Assertions.assertThat(stagingTempDir) + .describedAs("Staging committer temp path in cluster") + .contains(pri + "/" + self) + .endsWith("uuid/" + STAGING_UPLOADS); + } + } + + /** + * The partition committer test binding. + */ + private static final class PartitionCommitterTestBinding + extends CommitterTestBinding { + + private PartitionCommitterTestBinding() { + super(PartitionedStagingCommitter.NAME); + } + + /** + * @return true for inconsistent listing + */ + public boolean useInconsistentClient() { + return true; + } + } + + /** + * The magic committer test binding. + * This includes extra result validation. + */ + private static final class MagicCommitterTestBinding + extends CommitterTestBinding { + + private MagicCommitterTestBinding() { + super(MagicS3GuardCommitter.NAME); + } + + /** + * @return we need a consistent store. + */ + public boolean useInconsistentClient() { + return false; + } + + /** + * The result validation here is that there isn't a __magic directory + * any more. + * @param destPath destination of work + * @param successData loaded success data + * @throws Exception failure + */ + @Override + protected void validateResult(final Path destPath, + final SuccessData successData) + throws Exception { + Path magicDir = new Path(destPath, MAGIC); + + // if an FNFE isn't raised on getFileStatus, list out the directory + // tree + S3AFileSystem fs = getRemoteFS(); + // log the contents + lsR(fs, destPath, true); + intercept(FileNotFoundException.class, () -> { + final FileStatus st = fs.getFileStatus(magicDir); + StringBuilder result = new StringBuilder("Found magic dir which should" + + " have been deleted at ").append(st).append('\n'); + result.append("["); + applyLocatedFiles(fs.listFiles(magicDir, true), + (status) -> result.append(status.getPath()).append('\n')); + result.append("["); + return result.toString(); + }); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java deleted file mode 100644 index e403ab49b168e..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.magic; - -import java.io.FileNotFoundException; -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.files.SuccessData; -import org.apache.hadoop.mapred.JobConf; - -import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; -import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; -import static org.apache.hadoop.test.LambdaTestUtils.intercept; - -/** - * Full integration test for the Magic Committer. - * - * There's no need to disable the committer setting for the filesystem here, - * because the committers are being instantiated in their own processes; - * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are - * passed down to these processes. - */ -public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - /** - * Need consistency here. - * @return false - */ - @Override - public boolean useInconsistentClient() { - return false; - } - - @Override - protected String committerName() { - return MagicS3GuardCommitter.NAME; - } - - /** - * Turn on the magic commit support for the FS, else nothing will work. - * @param conf configuration - */ - @Override - protected void applyCustomConfigOptions(JobConf conf) { - conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); - } - - /** - * Check that the magic dir was cleaned up. - * {@inheritDoc} - */ - @Override - protected void customPostExecutionValidation(Path destPath, - SuccessData successData) throws Exception { - Path magicDir = new Path(destPath, MAGIC); - - // if an FNFE isn't raised on getFileStatus, list out the directory - // tree - S3AFileSystem fs = getFileSystem(); - // log the contents - lsR(fs, destPath, true); - intercept(FileNotFoundException.class, () -> { - final FileStatus st = fs.getFileStatus(magicDir); - StringBuilder result = new StringBuilder("Found magic dir which should" - + " have been deleted at ").append(st).append('\n'); - result.append("["); - applyLocatedFiles(fs.listFiles(magicDir, true), - (status) -> result.append(status.getPath()).append('\n')); - result.append("["); - return result.toString(); - }); - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java deleted file mode 100644 index 1e44086b1e125..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; -import org.apache.hadoop.mapred.JobConf; - -/** - * Full integration test for the directory committer. - */ -public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return DirectoryStagingCommitter.NAME; - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java deleted file mode 100644 index 6106974ce74ed..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; -import org.apache.hadoop.mapred.JobConf; - -/** - * Full integration test for the partition committer. - */ -public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return PartitionedStagingCommitter.NAME; - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java deleted file mode 100644 index 218c72ac50ea0..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.hamcrest.core.StringContains; -import org.hamcrest.core.StringEndsWith; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; - -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; -import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; -import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; - -/** - * Full integration test for the staging committer. - */ -public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return StagingCommitter.NAME; - } - - /** - * Verify that staging commit dirs are made absolute under the user's - * home directory, so, in a secure cluster, private. - */ - @Test - public void testStagingDirectory() throws Throwable { - FileSystem hdfs = getDFS(); - Configuration conf = hdfs.getConf(); - conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private"); - Path dir = getMultipartUploadCommitsDirectory(conf, "UUID"); - assertThat("Directory " + dir + " path is wrong", - dir.toString(), - StringEndsWith.endsWith("UUID/" - + STAGING_UPLOADS)); - assertTrue("path unqualified", dir.isAbsolute()); - String self = UserGroupInformation.getCurrentUser().getShortUserName(); - assertThat(dir.toString(), - StringContains.containsString("/user/" + self + "/private")); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java deleted file mode 100644 index 72488132faf76..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.test.LambdaTestUtils; - -/** - * This is a test to verify that the committer will fail if the destination - * directory exists, and that this happens in job setup. - */ -public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return StagingCommitter.NAME; - } - - /** - * create the destination directory and expect a failure. - * @param conf configuration - */ - @Override - protected void applyCustomConfigOptions(JobConf conf) throws IOException { - // This is the destination in the S3 FS - String outdir = conf.get(FileOutputFormat.OUTDIR); - S3AFileSystem fs = getFileSystem(); - Path outputPath = new Path(outdir); - fs.mkdirs(outputPath); - } - - @Override - public void testMRJob() throws Exception { - LambdaTestUtils.intercept(FileAlreadyExistsException.class, - "Output directory", - super::testMRJob); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java deleted file mode 100644 index cb9cdd0f33455..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.terasort; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; -import org.apache.hadoop.mapred.JobConf; - -/** - * Terasort with the directory committer. - */ -public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - clusterBinding.terminate(); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return DirectoryStagingCommitter.NAME; - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java deleted file mode 100644 index e1b4eac627a59..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.terasort; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; -import org.apache.hadoop.mapred.JobConf; - -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; - -/** - * Terasort with the magic committer. - */ -public final class ITestTerasortMagicCommitter - extends AbstractCommitTerasortIT { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - clusterBinding.terminate(); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - @Override - protected String committerName() { - return MagicS3GuardCommitter.NAME; - } - - /** - * Turn on the magic commit support for the FS, else nothing will work. - * @param conf configuration - */ - @Override - protected void applyCustomConfigOptions(JobConf conf) { - conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java similarity index 50% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java index 479b3c80f7c55..dc6c6d19db9ab 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java @@ -19,24 +19,33 @@ package org.apache.hadoop.fs.s3a.commit.terasort; import java.io.File; -import java.nio.charset.Charset; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; -import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.junit.Assume; import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runner.RunWith; import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.terasort.TeraGen; import org.apache.hadoop.examples.terasort.TeraSort; import org.apache.hadoop.examples.terasort.TeraSortConfigKeys; import org.apache.hadoop.examples.terasort.TeraValidate; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.StringUtils; @@ -44,45 +53,79 @@ import org.apache.hadoop.util.ToolRunner; import static java.util.Optional.empty; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; /** * Runs Terasort against S3A. * - * This is all done on a shared mini YARN and HDFS clusters, set up before - * any of the tests methods run. - * + * Parameterized by committer name, using a YARN cluster + * shared across all test runs. * The tests run in sequence, so each operation is isolated. - * This also means that the test paths deleted in test + * This also means that the test paths are deleted in test * teardown; shared variables must all be static. + * + * The test is a scale test; for each parameter it takes a few minutes to + * run the full suite. + * Before anyone calls that out as slow: try running the test with the file + * committer. */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) +@RunWith(Parameterized.class) @SuppressWarnings("StaticNonFinalField") -public abstract class AbstractCommitTerasortIT extends - AbstractYarnClusterITest { +public class ITestTerasortOnS3A extends AbstractYarnClusterITest { private static final Logger LOG = - LoggerFactory.getLogger(AbstractCommitTerasortIT.class); + LoggerFactory.getLogger(ITestTerasortOnS3A.class); - // all the durations are optional as they only get filled in when - // a test run successfully completes. Failed tests don't have numbers. - private static Optional terasortDuration = empty(); + public static final int EXPECTED_PARTITION_COUNT = 10; - private static Optional teragenStageDuration = empty(); + public static final int PARTITION_SAMPLE_SIZE = 1000; - private static Optional terasortStageDuration = empty(); + public static final int ROW_COUNT = 1000; - private static Optional teravalidateStageDuration = empty(); + /** + * Duration tracker created in the first of the test cases and closed + * in {@link #test_140_teracomplete()}. + */ + private static Optional terasortDuration = empty(); + /** + * Tracker of which stages are completed and how long they took. + */ + private static Map completedStages = new HashMap<>(); + + /** Name of the committer for this run. */ + private final String committerName; + + /** Base path for all the terasort input and output paths. */ private Path terasortPath; + /** Input (teragen) path. */ private Path sortInput; + /** Path where sorted data goes. */ private Path sortOutput; + /** Path for validated job's output. */ private Path sortValidate; + /** + * Test array for parameterized test runs. + * + * @return the committer binding for this run. + */ + @Parameterized.Parameters(name = "{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {DirectoryStagingCommitter.NAME}, + {MagicS3GuardCommitter.NAME}}); + } + + public ITestTerasortOnS3A(final String committerName) { + this.committerName = committerName; + } + /** * Not using special paths here. * @return false @@ -92,6 +135,11 @@ public boolean useInconsistentClient() { return false; } + @Override + protected String committerName() { + return committerName; + } + @Override public void setup() throws Exception { super.setup(); @@ -100,44 +148,88 @@ public void setup() throws Exception { } /** - * Set up for terasorting by initializing paths. - * The paths used must be unique across parallel runs. + * Set up the job conf with the options for terasort chosen by the scale + * options. + * @param conf configuration */ - private void prepareToTerasort() { + @Override + protected void applyCustomConfigOptions(JobConf conf) { // small sample size for faster runs - Configuration yarnConfig = getYarn().getConfig(); - yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000); - yarnConfig.setBoolean( - TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), - true); - yarnConfig.setBoolean( + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), + getSampleSizeForEachPartition()); + conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), + getExpectedPartitionCount()); + conf.setBoolean( TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false); - terasortPath = new Path("/terasort-" + getClass().getSimpleName()) + } + + private int getExpectedPartitionCount() { + return EXPECTED_PARTITION_COUNT; + } + + private int getSampleSizeForEachPartition() { + return PARTITION_SAMPLE_SIZE; + } + + protected int getRowCount() { + return ROW_COUNT; + } + + /** + * Set up the terasort by initializing paths variables + * The paths used must be unique across parameterized runs but + * common across all test cases in a single parameterized run. + */ + private void prepareToTerasort() { + // small sample size for faster runs + terasortPath = new Path("/terasort-" + committerName) .makeQualified(getFileSystem()); sortInput = new Path(terasortPath, "sortin"); sortOutput = new Path(terasortPath, "sortout"); sortValidate = new Path(terasortPath, "validate"); - if (!terasortDuration.isPresent()) { - terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort")); - } + + } + + /** + * Declare that a stage has completed. + * @param stage stage name/key in the map + * @param d duration. + */ + private static void completedStage(final String stage, + final DurationInfo d) { + completedStages.put(stage, d); } /** - * Execute a single stage in the terasort, - * @param stage Stage name for messages/assertions. + * Declare a stage which is required for this test case. + * @param stage stage name + */ + private static void requireStage(final String stage) { + Assume.assumeTrue( + "Required stage was not completed: " + stage, + completedStages.get(stage) != null); + } + + /** + * Execute a single stage in the terasort. + * Updates the completed stages map with the stage duration -if successful. + * @param stage Stage name for the stages map. * @param jobConf job conf - * @param dest destination directory -the _SUCCESS File will be expected here. + * @param dest destination directory -the _SUCCESS file will be expected here. * @param tool tool to run. * @param args args for the tool. + * @param minimumFileCount minimum number of files to have been created * @throws Exception any failure */ - private Optional executeStage( + private void executeStage( final String stage, final JobConf jobConf, final Path dest, final Tool tool, - final String[] args) throws Exception { + final String[] args, + final int minimumFileCount) throws Exception { int result; DurationInfo d = new DurationInfo(LOG, stage); try { @@ -145,22 +237,30 @@ private Optional executeStage( } finally { d.close(); } + dumpOutputTree(dest); assertEquals(stage + "(" + StringUtils.join(", ", args) + ")" + " failed", 0, result); - validateSuccessFile(dest, committerName(), getFileSystem(), stage); - return Optional.of(d); + validateSuccessFile(dest, committerName(), getFileSystem(), stage, + minimumFileCount); + completedStage(stage, d); } /** * Set up terasort by cleaning out the destination, and note the initial * time before any of the jobs are executed. + * + * This is executed first for each parameterized run. + * It is where all variables which need to be reset for each run need + * to be reset. */ @Test public void test_100_terasort_setup() throws Throwable { describe("Setting up for a terasort"); getFileSystem().delete(terasortPath, true); + completedStages = new HashMap<>(); + terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort")); } @Test @@ -170,42 +270,46 @@ public void test_110_teragen() throws Throwable { JobConf jobConf = newJobConf(); patchConfigurationForCommitter(jobConf); - teragenStageDuration = executeStage("Teragen", + executeStage("teragen", jobConf, sortInput, new TeraGen(), - new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()}); + new String[]{Integer.toString(getRowCount()), sortInput.toString()}, + 1); } + @Test public void test_120_terasort() throws Throwable { describe("Terasort from %s to %s", sortInput, sortOutput); + requireStage("teragen"); getFileSystem().delete(sortOutput, true); loadSuccessFile(getFileSystem(), sortInput, "previous teragen stage"); JobConf jobConf = newJobConf(); patchConfigurationForCommitter(jobConf); - // this job adds some data, so skip it. - jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); - terasortStageDuration = executeStage("TeraSort", + executeStage("terasort", jobConf, sortOutput, new TeraSort(), - new String[]{sortInput.toString(), sortOutput.toString()}); + new String[]{sortInput.toString(), sortOutput.toString()}, + 1); } @Test public void test_130_teravalidate() throws Throwable { describe("TeraValidate from %s to %s", sortOutput, sortValidate); + requireStage("terasort"); getFileSystem().delete(sortValidate, true); loadSuccessFile(getFileSystem(), sortOutput, "previous terasort stage"); JobConf jobConf = newJobConf(); patchConfigurationForCommitter(jobConf); - teravalidateStageDuration = executeStage("TeraValidate", + executeStage("teravalidate", jobConf, sortValidate, new TeraValidate(), - new String[]{sortOutput.toString(), sortValidate.toString()}); + new String[]{sortOutput.toString(), sortValidate.toString()}, + 1); } /** @@ -214,7 +318,10 @@ public void test_130_teravalidate() throws Throwable { */ @Test public void test_140_teracomplete() throws Throwable { - terasortDuration.get().close(); + terasortDuration.ifPresent(d -> { + d.close(); + completedStage("overall", d); + }); final StringBuilder results = new StringBuilder(); results.append("\"Operation\"\t\"Duration\"\n"); @@ -222,19 +329,20 @@ public void test_140_teracomplete() throws Throwable { // this is how you dynamically create a function in a method // for use afterwards. // Works because there's no IOEs being raised in this sequence. - BiConsumer> stage = - (s, od) -> - results.append(String.format("\"%s\"\t\"%s\"\n", - s, - od.map(DurationInfo::getDurationString).orElse(""))); - - stage.accept("Generate", teragenStageDuration); - stage.accept("Terasort", terasortStageDuration); - stage.accept("Validate", teravalidateStageDuration); - stage.accept("Completed", terasortDuration); + Consumer stage = (s) -> { + DurationInfo duration = completedStages.get(s); + results.append(String.format("\"%s\"\t\"%s\"\n", + s, + duration == null ? "" : duration)); + }; + + stage.accept("teragen"); + stage.accept("terasort"); + stage.accept("teravalidate"); + stage.accept("overall"); String text = results.toString(); File resultsFile = File.createTempFile("results", ".csv"); - FileUtils.write(resultsFile, text, Charset.forName("UTF-8")); + FileUtils.write(resultsFile, text, StandardCharsets.UTF_8); LOG.info("Results are in {}\n{}", resultsFile, text); } @@ -252,4 +360,18 @@ public void test_150_teracleanup() throws Throwable { public void test_200_directory_deletion() throws Throwable { getFileSystem().delete(terasortPath, true); } + + /** + * Dump the files under a path -but not fail if the path is not present., + * @param path path to dump + * @throws Exception any failure. + */ + protected void dumpOutputTree(Path path) throws Exception { + LOG.info("Files under output directory {}", path); + try { + lsR(getFileSystem(), path, true); + } catch (FileNotFoundException e) { + LOG.info("Output directory {} not found", path); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index f6162644e2535..6e20fbcda7efd 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -58,7 +58,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN # Log S3Guard classes #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG # if set to debug, this will log the PUT/DELETE operations on a store -#log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG +log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG # Log Committer classes #log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG