HADOOP-16207 Improved S3A MR tests.
Contributed by Steve Loughran.

Replaces the committer-specific terasort and MR test jobs with parameterization
of the (now single) tests, and uses file:// rather than hdfs:// as the cluster FS.

The parameterization ensures that only one of the committer-specific tests
runs at a time: overloading the test machines is less likely, so the
suites can be pulled back into the parallel phase.
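
As a hedged illustration (not code from this patch), a single MR/terasort test
suite could be parameterized over committer names roughly as follows; the class
and method names below are hypothetical:

// Illustrative sketch: one suite runs once per committer, so only one
// committer-specific MR job is active at a time within the suite.
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ITestExampleCommitterMRJob {

  @Parameterized.Parameters(name = "committer-{0}")
  public static Collection<Object[]> committers() {
    return Arrays.asList(new Object[][]{
        {"directory"},
        {"partitioned"},
        {"magic"},
    });
  }

  private final String committerName;

  public ITestExampleCommitterMRJob(String committerName) {
    this.committerName = committerName;
  }

  @Test
  public void testMRJobWithCommitter() throws Exception {
    // Run the (single) MR job with fs.s3a.committer.name set to committerName,
    // using file:// rather than hdfs:// as the cluster filesystem.
  }
}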

There is also more detailed validation of the stage outputs of the terasort;
if one test fails, the rest are skipped. This, together with the fact that job
output is stored under target/yarn-${timestamp}, should make failures
easier to debug.

Change-Id: Iefa370ba73c6419496e6e69dd6673d00f37ff095
steveloughran committed Oct 4, 2019
1 parent bca014b commit f44abc3
Showing 16 changed files with 987 additions and 904 deletions.
4 changes: 0 additions & 4 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -188,8 +188,6 @@
   <exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
   <!-- Terasort MR jobs spawn enough processes that they use up all RAM -->
   <exclude>**/ITestTerasort*.java</exclude>
-  <!-- MR jobs spawn enough processes that they use up all RAM -->
-  <exclude>**/ITest*CommitMRJob.java</exclude>
   <!-- operations across the metastore -->
   <exclude>**/ITestS3GuardDDBRootOperations.java</exclude>
 </excludes>
@@ -231,8 +229,6 @@
   <!-- the local FS. Running them sequentially guarantees isolation -->
   <!-- and that they don't conflict with the other MR jobs for RAM -->
   <include>**/ITestTerasort*.java</include>
-  <!-- MR jobs spawn enough processes that they use up all RAM -->
-  <include>**/ITest*CommitMRJob.java</include>
   <!-- operations across the metastore -->
   <include>**/ITestS3AContractRootDir.java</include>
   <include>**/ITestS3GuardDDBRootOperations.java</include>
@@ -677,7 +677,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
     // we will try to abort the ones that had already succeeded.
     int commitCount = taskOutput.size();
     final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
-    LOG.info("{}: uploading from staging directory to S3", getRole());
+    LOG.info("{}: uploading from staging directory to S3 {}", getRole(),
+        attemptPath);
     LOG.info("{}: Saving pending data information to {}",
         getRole(), commitsAttemptPath);
     if (taskOutput.isEmpty()) {
@@ -34,7 +34,7 @@ private StagingCommitterConstants() {
   /**
    * The temporary path for staging data, if not explicitly set.
    * By using an unqualified path, this will be qualified to be relative
-   * to the users' home directory, so protectec from access for others.
+   * to the users' home directory, so protected from access for others.
    */
   public static final String FILESYSTEM_TEMP_PATH = "tmp/staging";

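As an aside, and purely as an illustration of the comment above rather than
code from this patch, an unqualified path such as tmp/staging is resolved
against the working directory of the cluster filesystem, which defaults to the
user's home directory:

// Hypothetical snippet: a relative Path becomes <home-dir>/tmp/staging once
// qualified against the default filesystem
// (for example hdfs://nn/user/alice/tmp/staging).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StagingPathExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path staging = new Path("tmp/staging");      // unqualified, relative path
    Path qualified = fs.makeQualified(staging);  // resolved under the home directory
    System.out.println(qualified);
  }
}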
@@ -24,6 +24,7 @@
 import java.util.stream.Collectors;

 import com.amazonaws.services.s3.AmazonS3;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +112,9 @@ protected Configuration createConfiguration() {
         MAGIC_COMMITTER_ENABLED,
         S3A_COMMITTER_FACTORY_KEY,
         FS_S3A_COMMITTER_NAME,
-        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
+        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        FAST_UPLOAD_BUFFER);

     conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
     conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
@@ -209,6 +212,7 @@ public static String randomJobId() throws Exception {
    */
   @Override
   public void teardown() throws Exception {
+    Thread.currentThread().setName("teardown");
     LOG.info("AbstractCommitITest::teardown");
     waitForConsistency();
     // make sure there are no failures any more
@@ -359,7 +363,7 @@ private String pathToPrefix(Path path) {
    * @throws IOException IO Failure
    */
   protected SuccessData verifySuccessMarker(Path dir) throws IOException {
-    return validateSuccessFile(dir, "", getFileSystem(), "query");
+    return validateSuccessFile(dir, "", getFileSystem(), "query", 0);
   }

   /**
@@ -437,13 +441,15 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
    * @param committerName name of committer to match, or ""
    * @param fs filesystem
    * @param origin origin (e.g. "teragen" for messages)
+   * @param minimumFileCount minimum number of files to have been created
    * @return the success data
    * @throws IOException IO failure
    */
   public static SuccessData validateSuccessFile(final Path outputPath,
       final String committerName,
       final S3AFileSystem fs,
-      final String origin) throws IOException {
+      final String origin,
+      final int minimumFileCount) throws IOException {
     SuccessData successData = loadSuccessFile(fs, outputPath, origin);
     String commitDetails = successData.toString();
     LOG.info("Committer name " + committerName + "\n{}",
@@ -456,6 +462,9 @@ public static SuccessData validateSuccessFile(final Path outputPath,
       assertEquals("Wrong committer in " + commitDetails,
           committerName, successData.getCommitter());
     }
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Files committed")
+        .hasSizeGreaterThanOrEqualTo(minimumFileCount);
     return successData;
   }

@@ -485,8 +494,9 @@ public static SuccessData loadSuccessFile(final S3AFileSystem fs,
         status.isFile());
     assertTrue("0 byte success file "
         + success + " from " + origin
-        + "; a s3guard committer was not used",
+        + "; an S3A committer was not used",
         status.getLen() > 0);
+    LOG.info("Loading committer success file {}", success);
     return SuccessData.load(fs, success);
   }
 }
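
To illustrate the new signature, a hedged sketch of a call site inside a test
that extends this base class; the output path helper, origin string, and
expected file count are hypothetical, not taken from this patch:

// Hypothetical test method showing the extended validateSuccessFile: require
// the _SUCCESS marker to list at least one committed file.
@Test
public void testJobCommitsAtLeastOneFile() throws Exception {
  Path outputPath = path("example-output");  // assumed helper from the test base class
  // ... run the MR job writing to outputPath ...
  SuccessData success = validateSuccessFile(
      outputPath,
      "",                // committer name to match, or "" to skip that check
      getFileSystem(),   // the S3AFileSystem under test
      "example-job",     // origin used in assertion messages
      1);                // at least one file must have been committed
  Assertions.assertThat(success.getFilenames())
      .describedAs("committed files")
      .isNotEmpty();
}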


