From f44abc3e11676579bdea94fce045d081ae38e6c3 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 4 Oct 2019 14:11:22 +0100 Subject: [PATCH] HADOOP-16207 Improved S3A MR tests. Contributed by Steve Loughran. Replaces the committer-specific terasort and MR test jobs with parameterization of the (now single tests) and use of file:// over hdfs:// as the cluster FS. The parameterization ensures that only one of the specific committer tests run at a time -overloads of the test machines are less likely, and so the suites can be pulled back into the parallel phase. There's also more detailed validation of the stage outputs of the terasorting; if one test fails the rest are all skipped. This and the fact that job output is stored under target/yarn-${timestamp} means failures should be more debuggable. Change-Id: Iefa370ba73c6419496e6e69dd6673d00f37ff095 --- hadoop-tools/hadoop-aws/pom.xml | 4 - .../s3a/commit/staging/StagingCommitter.java | 3 +- .../staging/StagingCommitterConstants.java | 2 +- .../fs/s3a/commit/AbstractCommitITest.java | 18 +- .../fs/s3a/commit/AbstractITCommitMRJob.java | 223 ------ .../s3a/commit/AbstractYarnClusterITest.java | 196 ++++-- .../integration/ITestS3ACommitterMRJob.java | 644 ++++++++++++++++++ .../commit/magic/ITestMagicCommitMRJob.java | 120 ---- .../ITestDirectoryCommitMRJob.java | 61 -- .../ITestPartitionCommitMRJob.java | 62 -- .../integration/ITestStagingCommitMRJob.java | 94 --- .../ITestStagingCommitMRJobBadDest.java | 89 --- .../ITestTerasortDirectoryCommitter.java | 62 -- .../terasort/ITestTerasortMagicCommitter.java | 73 -- ...erasortIT.java => ITestTerasortOnS3A.java} | 238 +++++-- .../src/test/resources/log4j.properties | 2 +- 16 files changed, 987 insertions(+), 904 deletions(-) delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/{AbstractCommitTerasortIT.java => ITestTerasortOnS3A.java} (50%) diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index ff330e52dc01e..bd204b08a612f 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -188,8 +188,6 @@ **/ITestDynamoDBMetadataStoreScale.java **/ITestTerasort*.java - - **/ITest*CommitMRJob.java **/ITestS3GuardDDBRootOperations.java @@ -231,8 +229,6 @@ **/ITestTerasort*.java - - **/ITest*CommitMRJob.java **/ITestS3AContractRootDir.java **/ITestS3GuardDDBRootOperations.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 7ec447863c150..833edd4a6b022 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -677,7 +677,8 @@ protected int commitTaskInternal(final TaskAttemptContext context, // we will try to abort the ones that had already succeeded. int commitCount = taskOutput.size(); final Queue commits = new ConcurrentLinkedQueue<>(); - LOG.info("{}: uploading from staging directory to S3", getRole()); + LOG.info("{}: uploading from staging directory to S3 {}", getRole(), + attemptPath); LOG.info("{}: Saving pending data information to {}", getRole(), commitsAttemptPath); if (taskOutput.isEmpty()) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java index c5fb967863953..c41715bd497d8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java @@ -34,7 +34,7 @@ private StagingCommitterConstants() { /** * The temporary path for staging data, if not explicitly set. * By using an unqualified path, this will be qualified to be relative - * to the users' home directory, so protectec from access for others. + * to the users' home directory, so protected from access for others. */ public static final String FILESYSTEM_TEMP_PATH = "tmp/staging"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java index 6023daa13875b..1cf3fb4a3f65f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import com.amazonaws.services.s3.AmazonS3; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +112,9 @@ protected Configuration createConfiguration() { MAGIC_COMMITTER_ENABLED, S3A_COMMITTER_FACTORY_KEY, FS_S3A_COMMITTER_NAME, - FS_S3A_COMMITTER_STAGING_CONFLICT_MODE); + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, + FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + FAST_UPLOAD_BUFFER); conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE); @@ -209,6 +212,7 @@ public static String randomJobId() throws Exception { */ @Override public void teardown() throws Exception { + Thread.currentThread().setName("teardown"); LOG.info("AbstractCommitITest::teardown"); waitForConsistency(); // make sure there are no failures any more @@ -359,7 +363,7 @@ private String pathToPrefix(Path path) { * @throws IOException IO Failure */ protected SuccessData verifySuccessMarker(Path dir) throws IOException { - return validateSuccessFile(dir, "", getFileSystem(), "query"); + return validateSuccessFile(dir, "", getFileSystem(), "query", 0); } /** @@ -437,13 +441,15 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId, * @param committerName name of committer to match, or "" * @param fs filesystem * @param origin origin (e.g. "teragen" for messages) + * @param minimumFileCount minimum number of files to have been created * @return the success data * @throws IOException IO failure */ public static SuccessData validateSuccessFile(final Path outputPath, final String committerName, final S3AFileSystem fs, - final String origin) throws IOException { + final String origin, + final int minimumFileCount) throws IOException { SuccessData successData = loadSuccessFile(fs, outputPath, origin); String commitDetails = successData.toString(); LOG.info("Committer name " + committerName + "\n{}", @@ -456,6 +462,9 @@ public static SuccessData validateSuccessFile(final Path outputPath, assertEquals("Wrong committer in " + commitDetails, committerName, successData.getCommitter()); } + Assertions.assertThat(successData.getFilenames()) + .describedAs("Files committed") + .hasSizeGreaterThanOrEqualTo(minimumFileCount); return successData; } @@ -485,8 +494,9 @@ public static SuccessData loadSuccessFile(final S3AFileSystem fs, status.isFile()); assertTrue("0 byte success file " + success + " from " + origin - + "; a s3guard committer was not used", + + "; an S3A committer was not used", status.getLen() > 0); + LOG.info("Loading committer success file {}", success); return SuccessData.load(fs, success); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java deleted file mode 100644 index 1a518474bcb03..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import com.google.common.collect.Sets; -import org.assertj.core.api.Assertions; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3AUtils; -import org.apache.hadoop.fs.s3a.commit.files.SuccessData; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.DurationInfo; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; -import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; - -/** - * Test for an MR Job with all the different committers. - */ -public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest { - - private static final Logger LOG = - LoggerFactory.getLogger(AbstractITCommitMRJob.class); - - @Override - protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - disableFilesystemCaching(conf); - return conf; - } - - @Rule - public final TemporaryFolder temp = new TemporaryFolder(); - - @Test - public void testMRJob() throws Exception { - describe("Run a simple MR Job"); - - S3AFileSystem fs = getFileSystem(); - // final dest is in S3A - Path outputPath = path(getMethodName()); - // create and delete to force in a tombstone marker -see HADOOP-16207 - fs.mkdirs(outputPath); - fs.delete(outputPath, true); - - String commitUUID = UUID.randomUUID().toString(); - String suffix = isUniqueFilenames() ? ("-" + commitUUID) : ""; - int numFiles = getTestFileCount(); - List expectedFiles = new ArrayList<>(numFiles); - Set expectedKeys = Sets.newHashSet(); - for (int i = 0; i < numFiles; i += 1) { - File file = temp.newFile(i + ".text"); - try (FileOutputStream out = new FileOutputStream(file)) { - out.write(("file " + i).getBytes(StandardCharsets.UTF_8)); - } - String filename = String.format("part-m-%05d%s", i, suffix); - Path path = new Path(outputPath, filename); - expectedFiles.add(path.toString()); - expectedKeys.add("/" + fs.pathToKey(path)); - } - Collections.sort(expectedFiles); - - Job mrJob = createJob(); - JobConf jobConf = (JobConf) mrJob.getConfiguration(); - - mrJob.setOutputFormatClass(LoggingTextOutputFormat.class); - FileOutputFormat.setOutputPath(mrJob, outputPath); - - File mockResultsFile = temp.newFile("committer.bin"); - mockResultsFile.delete(); - String committerPath = "file:" + mockResultsFile; - jobConf.set("mock-results-file", committerPath); - jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); - jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "/staging"); - - mrJob.setInputFormatClass(TextInputFormat.class); - FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI())); - - mrJob.setMapperClass(MapClass.class); - mrJob.setNumReduceTasks(0); - - // an attempt to set up log4j properly, which clearly doesn't work - URL log4j = getClass().getClassLoader().getResource("log4j.properties"); - if (log4j != null && log4j.getProtocol().equals("file")) { - Path log4jPath = new Path(log4j.toURI()); - LOG.debug("Using log4j path {}", log4jPath); - mrJob.addFileToClassPath(log4jPath); - String sysprops = String.format("-Xmx256m -Dlog4j.configuration=%s", - log4j); - jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops); - jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops); - jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops); - } - - applyCustomConfigOptions(jobConf); - // fail fast if anything goes wrong - mrJob.setMaxMapAttempts(1); - - mrJob.submit(); - try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) { - boolean succeeded = mrJob.waitForCompletion(true); - assertTrue("MR job failed", succeeded); - } - - waitForConsistency(); - verifyPathExists(fs, - "MR job Output directory not found," - + " even though the job did not report a failure", - outputPath); - assertIsDirectory(outputPath); - FileStatus[] results = fs.listStatus(outputPath, - S3AUtils.HIDDEN_FILE_FILTER); - int fileCount = results.length; - List actualFiles = new ArrayList<>(fileCount); - assertTrue("No files in output directory", fileCount != 0); - LOG.info("Found {} files", fileCount); - for (FileStatus result : results) { - LOG.debug("result: {}", result); - actualFiles.add(result.getPath().toString()); - } - Collections.sort(actualFiles); - - SuccessData successData = validateSuccessFile(outputPath, committerName(), - fs, "MR job"); - List successFiles = successData.getFilenames(); - String commitData = successData.toString(); - assertFalse("No filenames in " + commitData, - successFiles.isEmpty()); - - Assertions.assertThat(actualFiles) - .describedAs("Committed files in the job output directory") - .containsExactlyInAnyOrderElementsOf(expectedFiles); - - Assertions.assertThat(successFiles) - .describedAs("List of committed files in %s", commitData) - .containsExactlyInAnyOrderElementsOf(expectedKeys); - - assertPathDoesNotExist("temporary dir", - new Path(outputPath, CommitConstants.TEMPORARY)); - customPostExecutionValidation(outputPath, successData); - } - - /** - * Test Mapper. - * This is executed in separate process, and must not make any assumptions - * about external state. - */ - public static class MapClass - extends Mapper { - - private int operations; - private String id = ""; - private LongWritable l = new LongWritable(); - private Text t = new Text(); - - @Override - protected void setup(Context context) - throws IOException, InterruptedException { - super.setup(context); - // force in Log4J logging - org.apache.log4j.BasicConfigurator.configure(); - boolean scaleMap = context.getConfiguration() - .getBoolean(KEY_SCALE_TESTS_ENABLED, false); - operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS; - id = context.getTaskAttemptID().toString(); - } - - @Override - protected void map(LongWritable key, Text value, Context context) - throws IOException, InterruptedException { - for (int i = 0; i < operations; i++) { - l.set(i); - t.set(String.format("%s:%05d", id, i)); - context.write(l, t); - } - } - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java index 2e8f1f090403c..783c62686bad7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java @@ -19,8 +19,12 @@ package org.apache.hadoop.fs.s3a.commit; import java.io.IOException; -import java.util.UUID; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,19 +37,22 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration; import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES; /** * Full integration test MR jobs. * - * This is all done on shared static mini YARN and HDFS clusters, set up before - * any of the tests methods run. + * This is all done on shared static mini YARN and (optionally) HDFS clusters, + * set up before any of the tests methods run. * * To isolate tests properly for parallel test runs, that static state * needs to be stored in the final classes implementing the tests, and @@ -61,38 +68,54 @@ * If two subclasses of this class are instantiated in the same JVM, in order, * they are guaranteed to be isolated. * - * History: this is a superclass extracted from - * {@link AbstractITCommitMRJob} while adding support for testing terasorting. - * */ public abstract class AbstractYarnClusterITest extends AbstractCommitITest { private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterITest.class); - private static final int TEST_FILE_COUNT = 2; - private static final int SCALE_TEST_FILE_COUNT = 50; + private static final int TEST_FILE_COUNT = 1; + private static final int SCALE_TEST_FILE_COUNT = 10; - public static final int SCALE_TEST_KEYS = 1000; + public static final int SCALE_TEST_KEYS = 100; public static final int BASE_TEST_KEYS = 10; + public static final int NO_OF_NODEMANAGERS = 2; + private boolean scaleTest; - private boolean uniqueFilenames = false; + /** + * The static cluster binding with the lifecycle of this test; served + * through instance-level methods for sharing across methods in the + * suite. + */ + @SuppressWarnings("StaticNonFinalField") + private static ClusterBinding clusterBinding; + + + @AfterClass + public static void teardownClusters() throws IOException { + terminateCluster(clusterBinding); + clusterBinding = null; + } /** * This is the cluster binding which every subclass must create. */ protected static final class ClusterBinding { + private String clusterName; + private final MiniDFSClusterService hdfs; private final MiniMRYarnCluster yarn; public ClusterBinding( + final String clusterName, final MiniDFSClusterService hdfs, final MiniMRYarnCluster yarn) { - this.hdfs = checkNotNull(hdfs); + this.clusterName = clusterName; + this.hdfs = hdfs; this.yarn = checkNotNull(yarn); } @@ -100,6 +123,18 @@ public MiniDFSClusterService getHdfs() { return hdfs; } + /** + * Get the cluster FS, which will either be HDFS or the local FS. + * @return a filesystem. + * @throws IOException failure + */ + public FileSystem getClusterFS() throws IOException { + MiniDFSClusterService miniCluster = getHdfs(); + return miniCluster != null + ? miniCluster.getClusterFS() + : FileSystem.getLocal(yarn.getConfig()); + } + public MiniMRYarnCluster getYarn() { return yarn; } @@ -108,6 +143,10 @@ public Configuration getConf() { return getYarn().getConfig(); } + public String getClusterName() { + return clusterName; + } + public void terminate() { terminateService(getYarn()); terminateService(getHdfs()); @@ -115,74 +154,111 @@ public void terminate() { } /** - * Create the cluster binding. This must be done in - * class setup of the (final) subclass. - * The HDFS and YARN clusters share the same configuration, so + * Create the cluster binding. + * The configuration will be patched by propagating down options + * from the maven build (S3Guard binding etc) and turning off unwanted + * YARN features. + * + * If an HDFS cluster is requested, + * the HDFS and YARN clusters will share the same configuration, so * the HDFS cluster binding is implicitly propagated to YARN. + * If one is not requested, the local filesystem is used as the cluster FS. * @param conf configuration to start with. + * @param useHDFS should an HDFS cluster be instantiated. * @return the cluster binding. * @throws IOException failure. */ - protected static ClusterBinding createCluster(JobConf conf) - throws IOException { - + protected static ClusterBinding createCluster( + final JobConf conf, + final boolean useHDFS) throws IOException { + prepareTestConfiguration(conf); conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false); conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE); - - // create a unique cluster name. - String clusterName = "yarn-" + UUID.randomUUID(); - MiniDFSClusterService miniDFSClusterService = deployService(conf, - new MiniDFSClusterService()); + // minicluster tests overreact to not enough disk space. + conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false); + conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100); + // create a unique cluster name based on the current time in millis. + String timestamp = LocalDateTime.now().format( + DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS")); + String clusterName = "yarn-" + timestamp; + MiniDFSClusterService miniDFSClusterService = + useHDFS + ? deployService(conf, new MiniDFSClusterService()) + : null; MiniMRYarnCluster yarnCluster = deployService(conf, - new MiniMRYarnCluster(clusterName, 2)); - return new ClusterBinding(miniDFSClusterService, yarnCluster); + new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS)); + return new ClusterBinding(clusterName, miniDFSClusterService, yarnCluster); } - protected static void terminateCluster(ClusterBinding clusterBinding) { - if (clusterBinding != null) { - clusterBinding.terminate(); + /** + * Terminate the cluster if it is not null. + * @param cluster the cluster + */ + protected static void terminateCluster(ClusterBinding cluster) { + if (cluster != null) { + cluster.terminate(); } } /** - * Get the cluster binding for this subclass - * @return + * Get the cluster binding for this subclass. + * @return the cluster binding */ - protected abstract ClusterBinding getClusterBinding(); - - protected MiniDFSClusterService getHdfs() { - return getClusterBinding().getHdfs(); + protected ClusterBinding getClusterBinding() { + return clusterBinding; } - protected MiniMRYarnCluster getYarn() { return getClusterBinding().getYarn(); } - public FileSystem getLocalFS() { - return getHdfs().getLocalFS(); + /** + * Get the cluster filesystem -hdfs or local. + * @return the filesystem shared across the yarn nodes. + */ + protected FileSystem getClusterFS() throws IOException { + return getClusterBinding().getClusterFS(); } - protected FileSystem getDFS() { - return getHdfs().getClusterFS(); - } + + /** + * We stage work into a temporary directory rather than directly under + * the user's home directory, as that is often rejected by CI test + * runners. + */ + @Rule + public final TemporaryFolder stagingFilesDir = new TemporaryFolder(); /** * The name of the committer as returned by - * {@link AbstractS3ACommitter#getName()} and used for committer construction. + * {@link AbstractS3ACommitter#getName()} + * and used for committer construction. */ protected abstract String committerName(); + /** + * binding on demand rather than in a BeforeClass static method. + * Subclasses can override this to change the binding options. + * @return the cluster binding + */ + protected ClusterBinding demandCreateClusterBinding() throws Exception { + return createCluster(new JobConf(), false); + } + @Override public void setup() throws Exception { super.setup(); - assertNotNull("cluster is not bound", - getClusterBinding()); scaleTest = getTestPropertyBool( getConfiguration(), KEY_SCALE_TESTS_ENABLED, DEFAULT_SCALE_TESTS_ENABLED); + if (getClusterBinding() == null) { + clusterBinding = demandCreateClusterBinding(); + } + assertNotNull("cluster is not bound", + getClusterBinding()); + } @Override @@ -190,28 +266,46 @@ protected int getTestTimeoutMillis() { return SCALE_TEST_TIMEOUT_SECONDS * 1000; } - protected JobConf newJobConf() { - return new JobConf(getYarn().getConfig()); + /** + * Create a job configuration. + * This creates a new job conf from the yarn + * cluster configuration then calls + * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized. + * @return the new job configuration. + * @throws IOException failure + */ + protected JobConf newJobConf() throws IOException { + JobConf jobConf = new JobConf(getYarn().getConfig()); + jobConf.addResource(getConfiguration()); + applyCustomConfigOptions(jobConf); + return jobConf; } - protected Job createJob() throws IOException { - Configuration jobConf = getClusterBinding().getConf(); - jobConf.addResource(getConfiguration()); + protected Job createJob(Configuration jobConf) throws IOException { Job mrJob = Job.getInstance(jobConf, getMethodName()); patchConfigurationForCommitter(mrJob.getConfiguration()); return mrJob; } + /** + * Patch the (job) configuration for this committer. + * @param jobConf configuration to patch + * @return a configuration which will run this configuration. + */ protected Configuration patchConfigurationForCommitter( final Configuration jobConf) { jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, - uniqueFilenames); + isUniqueFilenames()); bindCommitter(jobConf, CommitConstants.S3A_COMMITTER_FACTORY, committerName()); // pass down the scale test flag - jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest); + jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, isScaleTest()); + // and fix the commit dir to the local FS across all workers. + String staging = stagingFilesDir.getRoot().getAbsolutePath(); + LOG.info("Staging temp dir is {}", staging); + jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, staging); return jobConf; } @@ -220,7 +314,7 @@ protected Configuration patchConfigurationForCommitter( * @return the number of mappers to create. */ public int getTestFileCount() { - return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; + return isScaleTest() ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT; } /** @@ -258,6 +352,6 @@ public boolean isScaleTest() { } public boolean isUniqueFilenames() { - return uniqueFilenames; + return false; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java new file mode 100644 index 0000000000000..e3e44497d4e6a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.integration; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import org.assertj.core.api.Assertions; +import org.junit.FixMethodOrder; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat; +import org.apache.hadoop.fs.s3a.commit.files.SuccessData; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.DurationInfo; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; +import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; +import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; +import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test an MR Job with all the different committers. + *

+ * This is a fairly complex parameterization: it is designed to + * avoid the overhead of starting a Yarn cluster for + * individual committer types, so speed up operations. + *

+ * It also implicitly guarantees that there is never more than one of these + * MR jobs active at a time, so avoids overloading the test machine with too + * many processes. + * How the binding works: + *

    + *
  1. + * Each parameterized suite is configured through its own + * {@link CommitterTestBinding} subclass. + *
  2. + *
  3. + * JUnit runs these test suites one parameterized binding at a time. + *
  4. + *
  5. + * The test suites are declared to be executed in ascending order, so + * that for a specific binding, the order is {@link #test_000()}, + * {@link #test_100()} {@link #test_200_execute()} and finally + * {@link #test_500()}. + *
  6. + *
  7. + * {@link #test_000()} calls {@link CommitterTestBinding#validate()} to + * as to validate the state of the committer. This is primarily to + * verify that the binding setup mechanism is working. + *
  8. + *
  9. + * {@link #test_100()} is relayed to + * {@link CommitterTestBinding#test_100()}, + * for any preflight tests. + *
  10. + *
  11. + * The {@link #test_200_execute()} test runs the MR job for that + * particular binding with standard reporting and verification of the + * outcome. + *
  12. + *
  13. + * {@link #test_500()} test is relayed to + * {@link CommitterTestBinding#test_500()}, for any post-MR-job tests. + *
+ * + * A new S3A FileSystem instance is created for each test_ method, so the + * pre-execute and post-execute validators cannot inspect the state of the + * FS as part of their tests. + * However, as the MR workers and AM all run in their own processes, there's + * generally no useful information about the job in the local S3AFileSystem + * instance. + */ +@RunWith(Parameterized.class) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestS3ACommitterMRJob.class); + + /** + * Test array for parameterized test runs. + * + * @return the committer binding for this run. + */ + @Parameterized.Parameters(name = "{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {new DirectoryCommitterTestBinding()}, + {new PartitionCommitterTestBinding()}, + {new MagicCommitterTestBinding()}, + }); + } + + /** + * The committer binding for this instance. + */ + private final CommitterTestBinding committerTestBinding; + + /** + * Parameterized constructor. + * @param committerTestBinding binding for the test. + */ + public ITestS3ACommitterMRJob( + final CommitterTestBinding committerTestBinding) { + this.committerTestBinding = committerTestBinding; + } + + @Override + public void setup() throws Exception { + super.setup(); + // configure the test binding for this specific test case. + committerTestBinding.setup(getClusterBinding(), getFileSystem()); + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + return conf; + } + + @Rule + public final TemporaryFolder localFilesDir = new TemporaryFolder(); + + @Override + protected String committerName() { + return committerTestBinding.getCommitterName(); + } + + @Override + public boolean useInconsistentClient() { + return committerTestBinding.useInconsistentClient(); + } + + /** + * Verify that the committer binding is happy. + */ + @Test + public void test_000() throws Throwable { + committerTestBinding.validate(); + + } + @Test + public void test_100() throws Throwable { + committerTestBinding.test_100(); + } + + @Test + public void test_200_execute() throws Exception { + describe("Run an MR with committer %s", committerName()); + + S3AFileSystem fs = getFileSystem(); + // final dest is in S3A + // we can't use the method name as the template places square braces into + // that and URI creation fails. + + Path outputPath = path("ITestS3ACommitterMRJob-execute-"+ committerName()); + // create and delete to force in a tombstone marker -see HADOOP-16207 + fs.mkdirs(outputPath); + fs.delete(outputPath, true); + + String commitUUID = UUID.randomUUID().toString(); + String suffix = isUniqueFilenames() ? ("-" + commitUUID) : ""; + int numFiles = getTestFileCount(); + + // create all the input files on the local FS. + List expectedFiles = new ArrayList<>(numFiles); + Set expectedKeys = Sets.newHashSet(); + for (int i = 0; i < numFiles; i += 1) { + File file = localFilesDir.newFile(i + ".text"); + try (FileOutputStream out = new FileOutputStream(file)) { + out.write(("file " + i).getBytes(StandardCharsets.UTF_8)); + } + String filename = String.format("part-m-%05d%s", i, suffix); + Path path = new Path(outputPath, filename); + expectedFiles.add(path.toString()); + expectedKeys.add("/" + fs.pathToKey(path)); + } + Collections.sort(expectedFiles); + + Job mrJob = createJob(newJobConf()); + JobConf jobConf = (JobConf) mrJob.getConfiguration(); + + mrJob.setOutputFormatClass(LoggingTextOutputFormat.class); + FileOutputFormat.setOutputPath(mrJob, outputPath); + + File mockResultsFile = localFilesDir.newFile("committer.bin"); + mockResultsFile.delete(); + String committerPath = "file:" + mockResultsFile; + jobConf.set("mock-results-file", committerPath); + + // setting up staging options is harmless for other committers + jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID); + + mrJob.setInputFormatClass(TextInputFormat.class); + FileInputFormat.addInputPath(mrJob, + new Path(localFilesDir.getRoot().toURI())); + + mrJob.setMapperClass(MapClass.class); + mrJob.setNumReduceTasks(0); + + // an attempt to set up log4j properly, which clearly doesn't work + URL log4j = getClass().getClassLoader().getResource("log4j.properties"); + if (log4j != null && "file".equals(log4j.getProtocol())) { + Path log4jPath = new Path(log4j.toURI()); + LOG.debug("Using log4j path {}", log4jPath); + mrJob.addFileToClassPath(log4jPath); + String sysprops = String.format("-Xmx128m -Dlog4j.configuration=%s", + log4j); + jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops); + jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops); + jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops); + } + + applyCustomConfigOptions(jobConf); + // fail fast if anything goes wrong + mrJob.setMaxMapAttempts(1); + + try (DurationInfo ignore = new DurationInfo(LOG, "Job Submit")) { + mrJob.submit(); + } + String jobID = mrJob.getJobID().toString(); + String logLocation = "logs under " + + getYarn().getTestWorkDir().getAbsolutePath(); + try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) { + mrJob.waitForCompletion(true); + } + JobStatus status = mrJob.getStatus(); + if (!mrJob.isSuccessful()) { + // failure of job. + // be as meaningful as possible. + String message = + String.format("Job %s failed in state %s with cause %s.\n" + + "Consult %s", + jobID, + status.getState(), + status.getFailureInfo(), + logLocation); + LOG.error(message); + fail(message); + } + + waitForConsistency(); + Path successPath = new Path(outputPath, _SUCCESS); + SuccessData successData = validateSuccessFile(outputPath, + committerName(), + fs, + "MR job " + jobID, + 1); + String commitData = successData.toString(); + + FileStatus[] results = fs.listStatus(outputPath, + S3AUtils.HIDDEN_FILE_FILTER); + int fileCount = results.length; + Assertions.assertThat(fileCount) + .describedAs("No files from job %s in output directory %s; see %s", + jobID, + outputPath, + logLocation) + .isNotEqualTo(0); + + List actualFiles = Arrays.stream(results) + .map(s -> s.getPath().toString()) + .sorted() + .collect(Collectors.toList()); + + Assertions.assertThat(actualFiles) + .describedAs("Files found in %s", outputPath) + .isEqualTo(expectedFiles); + + Assertions.assertThat(successData.getFilenames()) + .describedAs("Success files listed in %s:%s", + successPath, commitData) + .isNotEmpty() + .containsExactlyInAnyOrderElementsOf(expectedKeys); + + assertPathDoesNotExist("temporary dir should only be from" + + " classic file committers", + new Path(outputPath, CommitConstants.TEMPORARY)); + customPostExecutionValidation(outputPath, successData); + } + + @Override + protected void applyCustomConfigOptions(final JobConf jobConf) + throws IOException { + committerTestBinding.applyCustomConfigOptions(jobConf); + } + + @Override + protected void customPostExecutionValidation(final Path destPath, + final SuccessData successData) throws Exception { + committerTestBinding.validateResult(destPath, successData); + } + + /** + * This is the extra test which committer test bindings can add. + */ + @Test + public void test_500() throws Throwable { + committerTestBinding.test_500(); + } + + /** + * Test Mapper. + * This is executed in separate process, and must not make any assumptions + * about external state. + */ + public static class MapClass + extends Mapper { + + private int operations; + + private String id = ""; + + private LongWritable l = new LongWritable(); + + private Text t = new Text(); + + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + super.setup(context); + // force in Log4J logging + org.apache.log4j.BasicConfigurator.configure(); + // and pick up scale test flag as passed down + boolean scaleMap = context.getConfiguration() + .getBoolean(KEY_SCALE_TESTS_ENABLED, false); + operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS; + id = context.getTaskAttemptID().toString(); + } + + @Override + protected void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + for (int i = 0; i < operations; i++) { + l.set(i); + t.set(String.format("%s:%05d", id, i)); + context.write(l, t); + } + } + } + + /** + * A binding class for committer tests. + * Subclasses of this will be instantiated and drive the parameterized + * test suite. + * + * These classes will be instantiated in a static array of the suite, and + * not bound to a cluster binding or filesystem. + * + * The per-method test {@link #setup()} method will call + * {@link #setup(ClusterBinding, S3AFileSystem)}, to link the instance + * to the specific test cluster and test filesystem in use + * in that test. + */ + private abstract static class CommitterTestBinding { + + /** + * Name. + */ + private final String committerName; + + /** + * Cluster binding. + */ + private ClusterBinding binding; + + /** + * The S3A filesystem. + */ + private S3AFileSystem remoteFS; + + /** + * Constructor. + * @param committerName name of the committer for messages. + */ + protected CommitterTestBinding(final String committerName) { + this.committerName = committerName; + } + + /** + * Set up the test binding: this is called during test setup. + * @param cluster the active test cluster. + * @param fs the S3A Filesystem used as a destination. + */ + private void setup( + ClusterBinding cluster, + S3AFileSystem fs) { + this.binding = cluster; + this.remoteFS = fs; + } + + protected String getCommitterName() { + return committerName; + } + + protected ClusterBinding getBinding() { + return binding; + } + + protected S3AFileSystem getRemoteFS() { + return remoteFS; + } + + protected FileSystem getClusterFS() throws IOException { + return getBinding().getClusterFS(); + } + + @Override + public String toString() { + return committerName; + } + + /** + * Override point to let implementations tune the MR Job conf. + * @param jobConf configuration + */ + protected void applyCustomConfigOptions(JobConf jobConf) + throws IOException { + } + + /** + * Should the inconsistent S3A client be used? + * @return true for inconsistent listing + */ + public abstract boolean useInconsistentClient(); + + /** + * Override point for any committer specific validation operations; + * called after the base assertions have all passed. + * @param destPath destination of work + * @param successData loaded success data + * @throws Exception failure + */ + protected void validateResult(Path destPath, + SuccessData successData) + throws Exception { + + } + + /** + * A test to run before the main {@link #test_200_execute()} test is + * invoked. + * @throws Throwable failure. + */ + void test_100() throws Throwable { + + } + + /** + * A test to run after the main {@link #test_200_execute()} test is + * invoked. + * @throws Throwable failure. + */ + void test_500() throws Throwable { + + } + + /** + * Validate the state of the binding. + * This is called in {@link #test_000()} so will + * fail independently of the other tests. + * @throws Throwable failure. + */ + public void validate() throws Throwable { + assertNotNull("Not bound to a cluster", binding); + assertNotNull("No cluster filesystem", getClusterFS()); + assertNotNull("No yarn cluster", binding.getYarn()); + } + } + + /** + * The directory staging committer. + */ + private static final class DirectoryCommitterTestBinding + extends CommitterTestBinding { + + private DirectoryCommitterTestBinding() { + super(DirectoryStagingCommitter.NAME); + } + + /** + * @return true for inconsistent listing + */ + public boolean useInconsistentClient() { + return true; + } + + /** + * Verify that staging commit dirs are made absolute under the user's + * home directory, so, in a secure cluster, private. + */ + @Override + void test_100() throws Throwable { + FileSystem fs = getClusterFS(); + Configuration conf = fs.getConf(); + String pri = "private"; + conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, pri); + Path dir = getMultipartUploadCommitsDirectory(conf, "uuid"); + Assertions.assertThat(dir.isAbsolute()) + .describedAs("non-absolute path") + .isTrue(); + String stagingTempDir = dir.toString().toLowerCase(Locale.ENGLISH); + String self = UserGroupInformation.getCurrentUser() + .getShortUserName().toLowerCase(Locale.ENGLISH); + Assertions.assertThat(stagingTempDir) + .describedAs("Staging committer temp path in cluster") + .contains(pri + "/" + self) + .endsWith("uuid/" + STAGING_UPLOADS); + } + } + + /** + * The partition committer test binding. + */ + private static final class PartitionCommitterTestBinding + extends CommitterTestBinding { + + private PartitionCommitterTestBinding() { + super(PartitionedStagingCommitter.NAME); + } + + /** + * @return true for inconsistent listing + */ + public boolean useInconsistentClient() { + return true; + } + } + + /** + * The magic committer test binding. + * This includes extra result validation. + */ + private static final class MagicCommitterTestBinding + extends CommitterTestBinding { + + private MagicCommitterTestBinding() { + super(MagicS3GuardCommitter.NAME); + } + + /** + * @return we need a consistent store. + */ + public boolean useInconsistentClient() { + return false; + } + + /** + * The result validation here is that there isn't a __magic directory + * any more. + * @param destPath destination of work + * @param successData loaded success data + * @throws Exception failure + */ + @Override + protected void validateResult(final Path destPath, + final SuccessData successData) + throws Exception { + Path magicDir = new Path(destPath, MAGIC); + + // if an FNFE isn't raised on getFileStatus, list out the directory + // tree + S3AFileSystem fs = getRemoteFS(); + // log the contents + lsR(fs, destPath, true); + intercept(FileNotFoundException.class, () -> { + final FileStatus st = fs.getFileStatus(magicDir); + StringBuilder result = new StringBuilder("Found magic dir which should" + + " have been deleted at ").append(st).append('\n'); + result.append("["); + applyLocatedFiles(fs.listFiles(magicDir, true), + (status) -> result.append(status.getPath()).append('\n')); + result.append("["); + return result.toString(); + }); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java deleted file mode 100644 index e403ab49b168e..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.magic; - -import java.io.FileNotFoundException; -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.files.SuccessData; -import org.apache.hadoop.mapred.JobConf; - -import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; -import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; -import static org.apache.hadoop.test.LambdaTestUtils.intercept; - -/** - * Full integration test for the Magic Committer. - * - * There's no need to disable the committer setting for the filesystem here, - * because the committers are being instantiated in their own processes; - * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are - * passed down to these processes. - */ -public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - /** - * Need consistency here. - * @return false - */ - @Override - public boolean useInconsistentClient() { - return false; - } - - @Override - protected String committerName() { - return MagicS3GuardCommitter.NAME; - } - - /** - * Turn on the magic commit support for the FS, else nothing will work. - * @param conf configuration - */ - @Override - protected void applyCustomConfigOptions(JobConf conf) { - conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); - } - - /** - * Check that the magic dir was cleaned up. - * {@inheritDoc} - */ - @Override - protected void customPostExecutionValidation(Path destPath, - SuccessData successData) throws Exception { - Path magicDir = new Path(destPath, MAGIC); - - // if an FNFE isn't raised on getFileStatus, list out the directory - // tree - S3AFileSystem fs = getFileSystem(); - // log the contents - lsR(fs, destPath, true); - intercept(FileNotFoundException.class, () -> { - final FileStatus st = fs.getFileStatus(magicDir); - StringBuilder result = new StringBuilder("Found magic dir which should" - + " have been deleted at ").append(st).append('\n'); - result.append("["); - applyLocatedFiles(fs.listFiles(magicDir, true), - (status) -> result.append(status.getPath()).append('\n')); - result.append("["); - return result.toString(); - }); - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java deleted file mode 100644 index 1e44086b1e125..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; -import org.apache.hadoop.mapred.JobConf; - -/** - * Full integration test for the directory committer. - */ -public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return DirectoryStagingCommitter.NAME; - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java deleted file mode 100644 index 6106974ce74ed..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; -import org.apache.hadoop.mapred.JobConf; - -/** - * Full integration test for the partition committer. - */ -public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return PartitionedStagingCommitter.NAME; - } -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java deleted file mode 100644 index 218c72ac50ea0..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.hamcrest.core.StringContains; -import org.hamcrest.core.StringEndsWith; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.UserGroupInformation; - -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH; -import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; -import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; - -/** - * Full integration test for the staging committer. - */ -public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return StagingCommitter.NAME; - } - - /** - * Verify that staging commit dirs are made absolute under the user's - * home directory, so, in a secure cluster, private. - */ - @Test - public void testStagingDirectory() throws Throwable { - FileSystem hdfs = getDFS(); - Configuration conf = hdfs.getConf(); - conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private"); - Path dir = getMultipartUploadCommitsDirectory(conf, "UUID"); - assertThat("Directory " + dir + " path is wrong", - dir.toString(), - StringEndsWith.endsWith("UUID/" - + STAGING_UPLOADS)); - assertTrue("path unqualified", dir.isAbsolute()); - String self = UserGroupInformation.getCurrentUser().getShortUserName(); - assertThat(dir.toString(), - StringContains.containsString("/user/" + self + "/private")); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java deleted file mode 100644 index 72488132faf76..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.staging.integration; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; -import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; -import org.apache.hadoop.mapred.FileAlreadyExistsException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.test.LambdaTestUtils; - -/** - * This is a test to verify that the committer will fail if the destination - * directory exists, and that this happens in job setup. - */ -public final class ITestStagingCommitMRJobBadDest extends AbstractITCommitMRJob { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - terminateCluster(clusterBinding); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return StagingCommitter.NAME; - } - - /** - * create the destination directory and expect a failure. - * @param conf configuration - */ - @Override - protected void applyCustomConfigOptions(JobConf conf) throws IOException { - // This is the destination in the S3 FS - String outdir = conf.get(FileOutputFormat.OUTDIR); - S3AFileSystem fs = getFileSystem(); - Path outputPath = new Path(outdir); - fs.mkdirs(outputPath); - } - - @Override - public void testMRJob() throws Exception { - LambdaTestUtils.intercept(FileAlreadyExistsException.class, - "Output directory", - super::testMRJob); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java deleted file mode 100644 index cb9cdd0f33455..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.terasort; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; -import org.apache.hadoop.mapred.JobConf; - -/** - * Terasort with the directory committer. - */ -public final class ITestTerasortDirectoryCommitter extends AbstractCommitTerasortIT { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - clusterBinding.terminate(); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - - @Override - protected String committerName() { - return DirectoryStagingCommitter.NAME; - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java deleted file mode 100644 index e1b4eac627a59..0000000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.commit.terasort; - -import java.io.IOException; - -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; -import org.apache.hadoop.mapred.JobConf; - -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; - -/** - * Terasort with the magic committer. - */ -public final class ITestTerasortMagicCommitter - extends AbstractCommitTerasortIT { - - /** - * The static cluster binding with the lifecycle of this test; served - * through instance-level methods for sharing across methods in the - * suite. - */ - @SuppressWarnings("StaticNonFinalField") - private static ClusterBinding clusterBinding; - - @BeforeClass - public static void setupClusters() throws IOException { - clusterBinding = createCluster(new JobConf()); - } - - @AfterClass - public static void teardownClusters() throws IOException { - clusterBinding.terminate(); - } - - @Override - public ClusterBinding getClusterBinding() { - return clusterBinding; - } - @Override - protected String committerName() { - return MagicS3GuardCommitter.NAME; - } - - /** - * Turn on the magic commit support for the FS, else nothing will work. - * @param conf configuration - */ - @Override - protected void applyCustomConfigOptions(JobConf conf) { - conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java similarity index 50% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java index 479b3c80f7c55..dc6c6d19db9ab 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java @@ -19,24 +19,33 @@ package org.apache.hadoop.fs.s3a.commit.terasort; import java.io.File; -import java.nio.charset.Charset; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; -import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.junit.Assume; import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runner.RunWith; import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.terasort.TeraGen; import org.apache.hadoop.examples.terasort.TeraSort; import org.apache.hadoop.examples.terasort.TeraSortConfigKeys; import org.apache.hadoop.examples.terasort.TeraValidate; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest; +import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.StringUtils; @@ -44,45 +53,79 @@ import org.apache.hadoop.util.ToolRunner; import static java.util.Optional.empty; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; /** * Runs Terasort against S3A. * - * This is all done on a shared mini YARN and HDFS clusters, set up before - * any of the tests methods run. - * + * Parameterized by committer name, using a YARN cluster + * shared across all test runs. * The tests run in sequence, so each operation is isolated. - * This also means that the test paths deleted in test + * This also means that the test paths are deleted in test * teardown; shared variables must all be static. + * + * The test is a scale test; for each parameter it takes a few minutes to + * run the full suite. + * Before anyone calls that out as slow: try running the test with the file + * committer. */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) +@RunWith(Parameterized.class) @SuppressWarnings("StaticNonFinalField") -public abstract class AbstractCommitTerasortIT extends - AbstractYarnClusterITest { +public class ITestTerasortOnS3A extends AbstractYarnClusterITest { private static final Logger LOG = - LoggerFactory.getLogger(AbstractCommitTerasortIT.class); + LoggerFactory.getLogger(ITestTerasortOnS3A.class); - // all the durations are optional as they only get filled in when - // a test run successfully completes. Failed tests don't have numbers. - private static Optional terasortDuration = empty(); + public static final int EXPECTED_PARTITION_COUNT = 10; - private static Optional teragenStageDuration = empty(); + public static final int PARTITION_SAMPLE_SIZE = 1000; - private static Optional terasortStageDuration = empty(); + public static final int ROW_COUNT = 1000; - private static Optional teravalidateStageDuration = empty(); + /** + * Duration tracker created in the first of the test cases and closed + * in {@link #test_140_teracomplete()}. + */ + private static Optional terasortDuration = empty(); + /** + * Tracker of which stages are completed and how long they took. + */ + private static Map completedStages = new HashMap<>(); + + /** Name of the committer for this run. */ + private final String committerName; + + /** Base path for all the terasort input and output paths. */ private Path terasortPath; + /** Input (teragen) path. */ private Path sortInput; + /** Path where sorted data goes. */ private Path sortOutput; + /** Path for validated job's output. */ private Path sortValidate; + /** + * Test array for parameterized test runs. + * + * @return the committer binding for this run. + */ + @Parameterized.Parameters(name = "{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {DirectoryStagingCommitter.NAME}, + {MagicS3GuardCommitter.NAME}}); + } + + public ITestTerasortOnS3A(final String committerName) { + this.committerName = committerName; + } + /** * Not using special paths here. * @return false @@ -92,6 +135,11 @@ public boolean useInconsistentClient() { return false; } + @Override + protected String committerName() { + return committerName; + } + @Override public void setup() throws Exception { super.setup(); @@ -100,44 +148,88 @@ public void setup() throws Exception { } /** - * Set up for terasorting by initializing paths. - * The paths used must be unique across parallel runs. + * Set up the job conf with the options for terasort chosen by the scale + * options. + * @param conf configuration */ - private void prepareToTerasort() { + @Override + protected void applyCustomConfigOptions(JobConf conf) { // small sample size for faster runs - Configuration yarnConfig = getYarn().getConfig(); - yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000); - yarnConfig.setBoolean( - TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), - true); - yarnConfig.setBoolean( + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), + getSampleSizeForEachPartition()); + conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), + getExpectedPartitionCount()); + conf.setBoolean( TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false); - terasortPath = new Path("/terasort-" + getClass().getSimpleName()) + } + + private int getExpectedPartitionCount() { + return EXPECTED_PARTITION_COUNT; + } + + private int getSampleSizeForEachPartition() { + return PARTITION_SAMPLE_SIZE; + } + + protected int getRowCount() { + return ROW_COUNT; + } + + /** + * Set up the terasort by initializing paths variables + * The paths used must be unique across parameterized runs but + * common across all test cases in a single parameterized run. + */ + private void prepareToTerasort() { + // small sample size for faster runs + terasortPath = new Path("/terasort-" + committerName) .makeQualified(getFileSystem()); sortInput = new Path(terasortPath, "sortin"); sortOutput = new Path(terasortPath, "sortout"); sortValidate = new Path(terasortPath, "validate"); - if (!terasortDuration.isPresent()) { - terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort")); - } + + } + + /** + * Declare that a stage has completed. + * @param stage stage name/key in the map + * @param d duration. + */ + private static void completedStage(final String stage, + final DurationInfo d) { + completedStages.put(stage, d); } /** - * Execute a single stage in the terasort, - * @param stage Stage name for messages/assertions. + * Declare a stage which is required for this test case. + * @param stage stage name + */ + private static void requireStage(final String stage) { + Assume.assumeTrue( + "Required stage was not completed: " + stage, + completedStages.get(stage) != null); + } + + /** + * Execute a single stage in the terasort. + * Updates the completed stages map with the stage duration -if successful. + * @param stage Stage name for the stages map. * @param jobConf job conf - * @param dest destination directory -the _SUCCESS File will be expected here. + * @param dest destination directory -the _SUCCESS file will be expected here. * @param tool tool to run. * @param args args for the tool. + * @param minimumFileCount minimum number of files to have been created * @throws Exception any failure */ - private Optional executeStage( + private void executeStage( final String stage, final JobConf jobConf, final Path dest, final Tool tool, - final String[] args) throws Exception { + final String[] args, + final int minimumFileCount) throws Exception { int result; DurationInfo d = new DurationInfo(LOG, stage); try { @@ -145,22 +237,30 @@ private Optional executeStage( } finally { d.close(); } + dumpOutputTree(dest); assertEquals(stage + "(" + StringUtils.join(", ", args) + ")" + " failed", 0, result); - validateSuccessFile(dest, committerName(), getFileSystem(), stage); - return Optional.of(d); + validateSuccessFile(dest, committerName(), getFileSystem(), stage, + minimumFileCount); + completedStage(stage, d); } /** * Set up terasort by cleaning out the destination, and note the initial * time before any of the jobs are executed. + * + * This is executed first for each parameterized run. + * It is where all variables which need to be reset for each run need + * to be reset. */ @Test public void test_100_terasort_setup() throws Throwable { describe("Setting up for a terasort"); getFileSystem().delete(terasortPath, true); + completedStages = new HashMap<>(); + terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort")); } @Test @@ -170,42 +270,46 @@ public void test_110_teragen() throws Throwable { JobConf jobConf = newJobConf(); patchConfigurationForCommitter(jobConf); - teragenStageDuration = executeStage("Teragen", + executeStage("teragen", jobConf, sortInput, new TeraGen(), - new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()}); + new String[]{Integer.toString(getRowCount()), sortInput.toString()}, + 1); } + @Test public void test_120_terasort() throws Throwable { describe("Terasort from %s to %s", sortInput, sortOutput); + requireStage("teragen"); getFileSystem().delete(sortOutput, true); loadSuccessFile(getFileSystem(), sortInput, "previous teragen stage"); JobConf jobConf = newJobConf(); patchConfigurationForCommitter(jobConf); - // this job adds some data, so skip it. - jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); - terasortStageDuration = executeStage("TeraSort", + executeStage("terasort", jobConf, sortOutput, new TeraSort(), - new String[]{sortInput.toString(), sortOutput.toString()}); + new String[]{sortInput.toString(), sortOutput.toString()}, + 1); } @Test public void test_130_teravalidate() throws Throwable { describe("TeraValidate from %s to %s", sortOutput, sortValidate); + requireStage("terasort"); getFileSystem().delete(sortValidate, true); loadSuccessFile(getFileSystem(), sortOutput, "previous terasort stage"); JobConf jobConf = newJobConf(); patchConfigurationForCommitter(jobConf); - teravalidateStageDuration = executeStage("TeraValidate", + executeStage("teravalidate", jobConf, sortValidate, new TeraValidate(), - new String[]{sortOutput.toString(), sortValidate.toString()}); + new String[]{sortOutput.toString(), sortValidate.toString()}, + 1); } /** @@ -214,7 +318,10 @@ public void test_130_teravalidate() throws Throwable { */ @Test public void test_140_teracomplete() throws Throwable { - terasortDuration.get().close(); + terasortDuration.ifPresent(d -> { + d.close(); + completedStage("overall", d); + }); final StringBuilder results = new StringBuilder(); results.append("\"Operation\"\t\"Duration\"\n"); @@ -222,19 +329,20 @@ public void test_140_teracomplete() throws Throwable { // this is how you dynamically create a function in a method // for use afterwards. // Works because there's no IOEs being raised in this sequence. - BiConsumer> stage = - (s, od) -> - results.append(String.format("\"%s\"\t\"%s\"\n", - s, - od.map(DurationInfo::getDurationString).orElse(""))); - - stage.accept("Generate", teragenStageDuration); - stage.accept("Terasort", terasortStageDuration); - stage.accept("Validate", teravalidateStageDuration); - stage.accept("Completed", terasortDuration); + Consumer stage = (s) -> { + DurationInfo duration = completedStages.get(s); + results.append(String.format("\"%s\"\t\"%s\"\n", + s, + duration == null ? "" : duration)); + }; + + stage.accept("teragen"); + stage.accept("terasort"); + stage.accept("teravalidate"); + stage.accept("overall"); String text = results.toString(); File resultsFile = File.createTempFile("results", ".csv"); - FileUtils.write(resultsFile, text, Charset.forName("UTF-8")); + FileUtils.write(resultsFile, text, StandardCharsets.UTF_8); LOG.info("Results are in {}\n{}", resultsFile, text); } @@ -252,4 +360,18 @@ public void test_150_teracleanup() throws Throwable { public void test_200_directory_deletion() throws Throwable { getFileSystem().delete(terasortPath, true); } + + /** + * Dump the files under a path -but not fail if the path is not present., + * @param path path to dump + * @throws Exception any failure. + */ + protected void dumpOutputTree(Path path) throws Exception { + LOG.info("Files under output directory {}", path); + try { + lsR(getFileSystem(), path, true); + } catch (FileNotFoundException e) { + LOG.info("Output directory {} not found", path); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index f6162644e2535..6e20fbcda7efd 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -58,7 +58,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN # Log S3Guard classes #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG # if set to debug, this will log the PUT/DELETE operations on a store -#log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG +log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG # Log Committer classes #log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG