diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 014a494ad8603..fdbdf37a26ab3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -837,4 +837,10 @@ private Constants() { public static final String AWS_SERVICE_IDENTIFIER_S3 = "S3"; public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB"; public static final String AWS_SERVICE_IDENTIFIER_STS = "STS"; + + /** + * How long to wait for the thread pool to terminate when cleaning up. + * Value: {@value} seconds. + */ + public static final int THREAD_POOL_SHUTDOWN_DELAY_SECONDS = 30; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 9431884eb18a3..26f16a7b23271 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -154,6 +154,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -3062,6 +3063,12 @@ public void close() throws IOException { transfers.shutdownNow(true); transfers = null; } + HadoopExecutors.shutdown(boundedThreadPool, LOG, + THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + boundedThreadPool = null; + HadoopExecutors.shutdown(unboundedThreadPool, LOG, + THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + unboundedThreadPool = null; S3AUtils.closeAll(LOG, metadataStore, instrumentation); metadataStore = null; instrumentation = null; @@ -4064,7 +4071,7 @@ public List listMultipartUploads(String prefix) */ @Retries.OnceRaw void abortMultipartUpload(String destKey, String uploadId) { - LOG.info("Aborting multipart upload {} to {}", uploadId, destKey); + LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); getAmazonS3Client().abortMultipartUpload( new AbortMultipartUploadRequest(getBucket(), destKey, @@ -4084,7 +4091,7 @@ void abortMultipartUpload(MultipartUpload upload) { uploadId = upload.getUploadId(); if (LOG.isInfoEnabled()) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - LOG.info("Aborting multipart upload {} to {} initiated by {} on {}", + LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}", uploadId, destKey, upload.getInitiator(), df.format(upload.getInitiated())); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index e9ed972c6a16e..15f73901cb873 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -611,11 +611,14 @@ public void getMetrics(MetricsCollector collector, boolean all) { public void close() { synchronized (metricsSystemLock) { + // it is critical to close each quantile, as they start a scheduled + // task in a shared 
thread pool. putLatencyQuantile.stop(); throttleRateQuantile.stop(); metricsSystem.unregisterSource(metricsSourceName); int activeSources = --metricsSourceActiveCounter; if (activeSources == 0) { + LOG.debug("Shutting down metrics publisher"); metricsSystem.publishMetricsNow(); metricsSystem.shutdown(); metricsSystem = null; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index a49ab52b1ffd2..e82fbda63dd0c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -18,18 +18,18 @@ package org.apache.hadoop.fs.s3a.commit; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import com.amazonaws.services.s3.model.MultipartUpload; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +49,9 @@ import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS; import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; @@ -66,11 +68,28 @@ * to handle the creation of a committer when the destination is unknown. * * Requiring an output directory simplifies coding and testing. + * + * The original implementation loaded all .pendingset files + * before attempting any commit/abort operations. + * While straightforward and guaranteeing that no changes were made to the + * destination until all files had successfully been loaded -it didn't scale; + * the list grew until it exceeded heap size. + * + * The second iteration builds up an {@link ActiveCommit} class with the + * list of .pendingset files to load and then commit; that can be done + * incrementally and in parallel. + * As a side effect of this change, unless/until changed, + * the commit/abort/revert of all files uploaded by a single task will be + * serialized. This may slow down these operations if there are many files + * created by a few tasks, and the HTTP connection pool in the S3A + * committer was large enough for more all the parallel POST requests. */ public abstract class AbstractS3ACommitter extends PathOutputCommitter { private static final Logger LOG = LoggerFactory.getLogger(AbstractS3ACommitter.class); + public static final String THREAD_PREFIX = "s3a-committer-pool-"; + /** * Thread pool for task execution. 
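   * <p>
   * Rough lifecycle sketch (method names are from this class; the exact
   * call sites vary by operation, and the pool may be null when the
   * configured thread count is 0):
   * <pre>
   *   ExecutorService pool = buildThreadPool(context);
   *   try {
   *     // submit per-file commit/abort work, e.g. via Tasks.foreach()
   *   } finally {
   *     destroyThreadPool();
   *   }
   * </pre>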
*/ @@ -349,16 +368,11 @@ public void recoverTask(TaskAttemptContext taskContext) throws IOException { * @throws IOException IO failure */ protected void maybeCreateSuccessMarkerFromCommits(JobContext context, - List pending) throws IOException { + ActiveCommit pending) throws IOException { List filenames = new ArrayList<>(pending.size()); - for (SinglePendingCommit commit : pending) { - String key = commit.getDestinationKey(); - if (!key.startsWith("/")) { - // fix up so that FS.makeQualified() sets up the path OK - key = "/" + key; - } - filenames.add(key); - } + // The list of committed objects in pending is size limited in + // ActiveCommit.uploadCommitted. + filenames.addAll(pending.committedObjects); maybeCreateSuccessMarker(context, filenames); } @@ -390,22 +404,25 @@ protected void maybeCreateSuccessMarker(JobContext context, } /** - * Base job setup deletes the success marker. - * TODO: Do we need this? + * Base job setup (optionally) deletes the success marker and + * always creates the destination directory. + * When objects are committed that dest dir marker will inevitably + * be deleted; creating it now ensures there is something at the end + * while the job is in progress -and if nothing is created, that + * it is still there. * @param context context * @throws IOException IO failure */ -/* @Override public void setupJob(JobContext context) throws IOException { - if (createJobMarker) { - try (DurationInfo d = new DurationInfo("Deleting _SUCCESS marker")) { + try (DurationInfo d = new DurationInfo(LOG, "preparing destination")) { + if (createJobMarker){ commitOperations.deleteSuccessMarker(getOutputPath()); } + getDestFS().mkdirs(getOutputPath()); } } -*/ @Override public void setupTask(TaskAttemptContext context) throws IOException { @@ -430,28 +447,152 @@ protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context) } /** - * Commit a list of pending uploads. + * Commit all the pending uploads. + * Each file listed in the ActiveCommit instance is queued for processing + * in a separate thread; its contents are loaded and then (sequentially) + * committed. + * On a failure or abort of a single file's commit, all its uploads are + * aborted. + * The revert operation lists the files already committed and deletes them. * @param context job context - * @param pending list of pending uploads + * @param pending pending uploads * @throws IOException on any failure */ - protected void commitPendingUploads(JobContext context, - List pending) throws IOException { + protected void commitPendingUploads( + final JobContext context, + final ActiveCommit pending) throws IOException { if (pending.isEmpty()) { LOG.warn("{}: No pending uploads to commit", getRole()); } - LOG.debug("{}: committing the output of {} task(s)", - getRole(), pending.size()); - try(CommitOperations.CommitContext commitContext + try (DurationInfo ignored = new DurationInfo(LOG, + "committing the output of %s task(s)", pending.size()); + CommitOperations.CommitContext commitContext = initiateCommitOperation()) { - Tasks.foreach(pending) + + Tasks.foreach(pending.getSourceFiles()) .stopOnFailure() + .suppressExceptions(false) .executeWith(buildThreadPool(context)) + .abortWith(path -> + loadAndAbort(commitContext, pending, path, true, false)) + .revertWith(path -> + loadAndRevert(commitContext, pending, path)) + .run(path -> + loadAndCommit(commitContext, pending, path)); + } + } + + /** + * Run a precommit check that all files are loadable. 
+ * This check avoids the situation where the inability to read + * a file only surfaces partway through the job commit, so + * results in the destination being tainted. + * @param context job context + * @param pending the pending operations + * @throws IOException any failure + */ + protected void precommitCheckPendingFiles( + final JobContext context, + final ActiveCommit pending) throws IOException { + + FileSystem sourceFS = pending.getSourceFS(); + try (DurationInfo ignored = + new DurationInfo(LOG, "Preflight Load of pending files")) { + + Tasks.foreach(pending.getSourceFiles()) + .stopOnFailure() + .suppressExceptions(false) + .executeWith(buildThreadPool(context)) + .run(path -> PendingSet.load(sourceFS, path)); + } + } + + /** + * Load a pendingset file and commit all of its contents. + * @param commitContext context to commit through + * @param activeCommit commit state + * @param path path to load + * @throws IOException failure + */ + private void loadAndCommit( + final CommitOperations.CommitContext commitContext, + final ActiveCommit activeCommit, + final Path path) throws IOException { + + try (DurationInfo ignored = + new DurationInfo(LOG, false, "Committing %s", path)) { + PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path); + Tasks.foreach(pendingSet.getCommits()) + .stopOnFailure() + .suppressExceptions(false) + .executeWith(singleCommitThreadPool()) .onFailure((commit, exception) -> commitContext.abortSingleCommit(commit)) .abortWith(commitContext::abortSingleCommit) .revertWith(commitContext::revertCommit) - .run(commitContext::commitOrFail); + .run(commit -> { + commitContext.commitOrFail(commit); + activeCommit.uploadCommitted( + commit.getDestinationKey(), commit.getLength()); + }); + } + } + + /** + * Load a pendingset file and revert all of its contents. + * @param commitContext context to commit through + * @param activeCommit commit state + * @param path path to load + * @throws IOException failure + */ + private void loadAndRevert( + final CommitOperations.CommitContext commitContext, + final ActiveCommit activeCommit, + final Path path) throws IOException { + + try (DurationInfo ignored = + new DurationInfo(LOG, false, "Committing %s", path)) { + PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), path); + Tasks.foreach(pendingSet.getCommits()) + .suppressExceptions(true) + .run(commitContext::revertCommit); + } + } + + /** + * Load a pendingset file and abort all of its contents. + * @param commitContext context to commit through + * @param activeCommit commit state + * @param path path to load + * @param deleteRemoteFiles should remote files be deleted? + * @throws IOException failure + */ + private void loadAndAbort( + final CommitOperations.CommitContext commitContext, + final ActiveCommit activeCommit, + final Path path, + final boolean suppressExceptions, + final boolean deleteRemoteFiles) throws IOException { + + try (DurationInfo ignored = + new DurationInfo(LOG, false, "Aborting %s", path)) { + PendingSet pendingSet = PendingSet.load(activeCommit.getSourceFS(), + path); + FileSystem fs = getDestFS(); + Tasks.foreach(pendingSet.getCommits()) + .executeWith(singleCommitThreadPool()) + .suppressExceptions(suppressExceptions) + .run(commit -> { + try { + commitContext.abortSingleCommit(commit); + } catch (FileNotFoundException e) { + // Commit ID was not known; file may exist. + // delete it if instructed to do so. 
+ if (deleteRemoteFiles) { + fs.delete(commit.destinationPath(), false); + } + } + }); } } @@ -465,44 +606,15 @@ protected CommitOperations.CommitContext initiateCommitOperation() return getCommitOperations().initiateCommitOperation(getOutputPath()); } - /** - * Try to read every pendingset file and build a list of them/ - * In the case of a failure to read the file, exceptions are held until all - * reads have been attempted. - * @param context job context - * @param suppressExceptions whether to suppress exceptions. - * @param fs job attempt fs - * @param pendingCommitFiles list of files found in the listing scan - * @return the list of commits - * @throws IOException on a failure when suppressExceptions is false. - */ - protected List loadPendingsetFiles( - JobContext context, - boolean suppressExceptions, - FileSystem fs, - Iterable pendingCommitFiles) throws IOException { - - final List pending = Collections.synchronizedList( - Lists.newArrayList()); - Tasks.foreach(pendingCommitFiles) - .suppressExceptions(suppressExceptions) - .executeWith(buildThreadPool(context)) - .run(pendingCommitFile -> - pending.addAll( - PendingSet.load(fs, pendingCommitFile.getPath()).getCommits()) - ); - return pending; - } - /** * Internal Job commit operation: where the S3 requests are made * (potentially in parallel). * @param context job context - * @param pending pending request + * @param pending pending commits * @throws IOException any failure */ protected void commitJobInternal(JobContext context, - List pending) + ActiveCommit pending) throws IOException { commitPendingUploads(context, pending); @@ -523,6 +635,9 @@ public void abortJob(JobContext context, JobStatus.State state) * This must clean up operations; it is called when a commit fails, as * well as in an {@link #abortJob(JobContext, JobStatus.State)} call. * The base implementation calls {@link #cleanup(JobContext, boolean)} + * so cleans up the filesystems and destroys the thread pool. + * Subclasses must always invoke this superclass method after their + * own operations. * @param context job context * @param suppressExceptions should exceptions be suppressed? * @throws IOException any IO problem raised when suppressExceptions is false. @@ -536,13 +651,15 @@ protected void abortJobInternal(JobContext context, /** * Abort all pending uploads to the destination directory during * job cleanup operations. + * Note: this instantiates the thread pool if required -so + * {@link #destroyThreadPool()} must be called after this. * @param suppressExceptions should exceptions be suppressed * @throws IOException IO problem */ protected void abortPendingUploadsInCleanup( boolean suppressExceptions) throws IOException { Path dest = getOutputPath(); - try (DurationInfo d = + try (DurationInfo ignored = new DurationInfo(LOG, "Aborting all pending commits under %s", dest); CommitOperations.CommitContext commitContext @@ -565,13 +682,18 @@ protected void abortPendingUploadsInCleanup( } /** - * Subclass-specific pre commit actions. + * Subclass-specific pre-Job-commit actions. + * The staging committers all load the pending files to verify that + * they can be loaded. + * The Magic committer does not, because of the overhead of reading files + * from S3 makes it too expensive. 
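+   * <p>
+   * A minimal override, mirroring what the staging committer in this patch
+   * does (sketch only; exception handling elided):
+   * <pre>
+   *   public void preCommitJob(JobContext context, ActiveCommit pending)
+   *       throws IOException {
+   *     precommitCheckPendingFiles(context, pending);
+   *   }
+   * </pre>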
* @param context job context * @param pending the pending operations * @throws IOException any failure */ - protected void preCommitJob(JobContext context, - List pending) throws IOException { + @VisibleForTesting + public void preCommitJob(JobContext context, + ActiveCommit pending) throws IOException { } /** @@ -584,7 +706,7 @@ protected void preCommitJob(JobContext context, *

* Commit internal: do the final commit sequence. *

- * The final commit action is to build the {@code __SUCCESS} file entry. + * The final commit action is to build the {@code _SUCCESS} file entry. *

* @param context job context * @throws IOException any failure @@ -594,7 +716,7 @@ public void commitJob(JobContext context) throws IOException { String id = jobIdString(context); try (DurationInfo d = new DurationInfo(LOG, "%s: commitJob(%s)", getRole(), id)) { - List pending + ActiveCommit pending = listPendingUploadsToCommit(context); preCommitJob(context, pending); commitJobInternal(context, pending); @@ -629,12 +751,13 @@ protected void jobCompleted(boolean success) { * @return a list of pending uploads. * @throws IOException Any IO failure */ - protected abstract List listPendingUploadsToCommit( + protected abstract ActiveCommit listPendingUploadsToCommit( JobContext context) throws IOException; /** - * Cleanup the job context, including aborting anything pending. + * Cleanup the job context, including aborting anything pending + * and destroying the thread pool. * @param context job context * @param suppressExceptions should exceptions be suppressed? * @throws IOException any failure if exceptions were not suppressed. @@ -645,6 +768,7 @@ protected void cleanup(JobContext context, "Cleanup job %s", jobIdString(context))) { abortPendingUploadsInCleanup(suppressExceptions); } finally { + destroyThreadPool(); cleanupStagingDirs(); } } @@ -715,7 +839,7 @@ protected String getRole() { /** * Returns an {@link ExecutorService} for parallel tasks. The number of - * threads in the thread-pool is set by s3.multipart.committer.num-threads. + * threads in the thread-pool is set by fs.s3a.committer.threads. * If num-threads is 0, this will return null; * * @param context the JobContext for this commit @@ -730,10 +854,10 @@ protected final synchronized ExecutorService buildThreadPool( DEFAULT_COMMITTER_THREADS); LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads); if (numThreads > 0) { - threadPool = Executors.newFixedThreadPool(numThreads, + threadPool = HadoopExecutors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("s3-committer-pool-%d") + .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d") .build()); } else { return null; @@ -742,6 +866,40 @@ protected final synchronized ExecutorService buildThreadPool( return threadPool; } + /** + * Destroy any thread pools; wait for that to finish, + * but don't overreact if it doesn't finish in time. + */ + protected synchronized void destroyThreadPool() { + if (threadPool != null) { + LOG.debug("Destroying thread pool"); + HadoopExecutors.shutdown(threadPool, LOG, + THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + threadPool = null; + } + } + + /** + * Get the thread pool for executing the single file commit/revert + * within the commit of all uploads of a single task. + * This is currently null; it is here to allow the Tasks class to + * provide the logic for execute/revert. + * Why not use the existing thread pool? Too much fear of deadlocking, + * and tasks are being committed in parallel anyway. + * @return null. always. + */ + protected final synchronized ExecutorService singleCommitThreadPool() { + return null; + } + + /** + * Does this committer have a thread pool? + * @return true if a thread pool exists. + */ + public synchronized boolean hasThreadPool() { + return threadPool != null; + } + /** * Delete the task attempt path without raising any errors. * @param context task context @@ -755,6 +913,8 @@ protected void deleteTaskAttemptPathQuietly(TaskAttemptContext context) { /** * Abort all pending uploads in the list. 
+ * This operation is used by the magic committer as part of its + * rollback after a failure during task commit. * @param context job context * @param pending pending uploads * @param suppressExceptions should exceptions be suppressed @@ -779,4 +939,172 @@ protected void abortPendingUploads(JobContext context, } } + /** + * Abort all pending uploads in the list. + * @param context job context + * @param pending pending uploads + * @param suppressExceptions should exceptions be suppressed? + * @param deleteRemoteFiles should remote files be deleted? + * @throws IOException any exception raised + */ + protected void abortPendingUploads( + final JobContext context, + final ActiveCommit pending, + final boolean suppressExceptions, + final boolean deleteRemoteFiles) throws IOException { + + if (pending.isEmpty()) { + LOG.info("{}: no pending commits to abort", getRole()); + } else { + try (DurationInfo d = new DurationInfo(LOG, + "Aborting %s uploads", pending.size()); + CommitOperations.CommitContext commitContext + = initiateCommitOperation()) { + Tasks.foreach(pending.getSourceFiles()) + .executeWith(buildThreadPool(context)) + .suppressExceptions(suppressExceptions) + .run(path -> + loadAndAbort(commitContext, + pending, + path, + suppressExceptions, + deleteRemoteFiles)); + } + } + } + + /** + * State of the active commit operation. + * + * It contains a list of all pendingset files to load as the source + * of outstanding commits to complete/abort, + * and tracks the files uploaded. + * + * To avoid running out of heap by loading all the source files + * simultaneously: + *
    + *
+   * <ol>
+   *   <li>
+   *     The list of files to load is passed round but
+   *     the contents are only loaded on demand.
+   *   </li>
+   *   <li>
+   *     The number of written files tracked for logging in
+   *     the _SUCCESS file are limited to a small amount -enough
+   *     for testing only.
+   *   </li>
+   * </ol>
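+   * <p>
+   * A rough sketch of the resulting incremental pattern (helper names are
+   * those of this class; error handling omitted):
+   * <pre>
+   *   for (Path pendingsetFile : activeCommit.getSourceFiles()) {
+   *     PendingSet pendingSet =
+   *         PendingSet.load(activeCommit.getSourceFS(), pendingsetFile);
+   *     // commit or abort each upload in the set, then drop the loaded data
+   *   }
+   * </pre>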
+ */ + public static class ActiveCommit { + + private static final AbstractS3ACommitter.ActiveCommit EMPTY + = new ActiveCommit(null, new ArrayList<>()); + + /** All pendingset files to iterate through. */ + private final List sourceFiles; + + /** + * Filesystem for the source files. + */ + private final FileSystem sourceFS; + + /** + * List of committed objects; only built up until the commit limit is + * reached. + */ + private final List committedObjects = new ArrayList<>(); + + /** + * The total number of committed objects. + */ + private int committedObjectCount; + + /** + * Total number of bytes committed. + */ + private long committedBytes; + + /** + * Construct from a source FS and list of files. + * @param sourceFS filesystem containing the list of pending files + * @param sourceFiles .pendingset files to load and commit. + */ + public ActiveCommit( + final FileSystem sourceFS, + final List sourceFiles) { + this.sourceFiles = sourceFiles; + this.sourceFS = sourceFS; + } + + /** + * Create an active commit of the given pending files. + * @param pendingFS source filesystem. + * @param statuses list of file status or subclass to use. + * @return the commit + */ + public static ActiveCommit fromStatusList( + final FileSystem pendingFS, + final List statuses) { + return new ActiveCommit(pendingFS, + statuses.stream() + .map(FileStatus::getPath) + .collect(Collectors.toList())); + } + + /** + * Get the empty entry. + * @return an active commit with no pending files. + */ + public static ActiveCommit empty() { + return EMPTY; + } + + public List getSourceFiles() { + return sourceFiles; + } + + public FileSystem getSourceFS() { + return sourceFS; + } + + /** + * Note that a file was committed. + * Increase the counter of files and total size. + * If there is room in the committedFiles list, the file + * will be added to the list and so end up in the _SUCCESS file. + * @param key key of the committed object. + * @param size size in bytes. + */ + public synchronized void uploadCommitted(String key, long size) { + if (committedObjects.size() < SUCCESS_MARKER_FILE_LIMIT) { + committedObjects.add( + key.startsWith("/") ? 
key : ("/" + key)); + } + committedObjectCount++; + committedBytes += size; + } + + public synchronized List getCommittedObjects() { + return committedObjects; + } + + public synchronized int getCommittedFileCount() { + return committedObjectCount; + } + + public synchronized long getCommittedBytes() { + return committedBytes; + } + + public int size() { + return sourceFiles.size(); + } + + public boolean isEmpty() { + return sourceFiles.isEmpty(); + } + + public void add(Path path) { + sourceFiles.add(path); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java index b3bcca1b97ae5..6e7a99f50ef93 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java @@ -51,7 +51,7 @@ public PathOutputCommitter createOutputCommitter(Path outputPath, throw new PathCommitException(outputPath, "Filesystem not supported by this committer"); } - LOG.info("Using Commmitter {} for {}", + LOG.info("Using Committer {} for {}", outputCommitter, outputPath); return outputCommitter; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index c9b0337bcb26a..3e28a5d2cf96f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -255,4 +255,10 @@ private CommitConstants() { public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS = "fs.s3a.committer.staging.abort.pending.uploads"; + /** + * The limit to the number of committed objects tracked during + * job commits and saved to the _SUCCESS file. + */ + public static final int SUCCESS_MARKER_FILE_LIMIT = 100; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java index cf84cb32eb8b7..e0273fa11a584 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java @@ -52,6 +52,12 @@ * Applications reading this data should use/check the {@link #name} field * to differentiate from any other JSON-based manifest and to identify * changes in the output format. + * + * Note: to deal with scale issues, the S3A committers do not include any + * more than the number of objects listed in + * {@link org.apache.hadoop.fs.s3a.commit.CommitConstants#SUCCESS_MARKER_FILE_LIMIT}. + * This is intended to suffice for basic integration tests. + * Larger tests should examine the generated files themselves. 
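+ * <p>
+ * A hedged example of how a test might consume the truncated listing
+ * (assuming the load/getFilenames helpers of this class):
+ * <pre>
+ *   SuccessData success = SuccessData.load(fs, new Path(outputDir, "_SUCCESS"));
+ *   // at most SUCCESS_MARKER_FILE_LIMIT entries are listed here
+ *   success.getFilenames().forEach(LOG::info);
+ * </pre>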
*/ @SuppressWarnings("unused") @InterfaceAudience.Private diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 969286e4d8eb0..99121730644b1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -109,11 +109,11 @@ public void setupJob(JobContext context) throws IOException { * @return a list of pending commits. * @throws IOException Any IO failure */ - protected List listPendingUploadsToCommit( + protected ActiveCommit listPendingUploadsToCommit( JobContext context) throws IOException { FileSystem fs = getDestFS(); - return loadPendingsetFiles(context, false, fs, + return ActiveCommit.fromStatusList(fs, listAndFilter(fs, getJobAttemptPath(context), false, CommitOperations.PENDINGSET_FILTER)); } @@ -174,6 +174,7 @@ public void commitTask(TaskAttemptContext context) throws IOException { } finally { // delete the task attempt so there's no possibility of a second attempt deleteTaskAttemptPathQuietly(context); + destroyThreadPool(); } getCommitOperations().taskCompleted(true); } @@ -181,7 +182,7 @@ public void commitTask(TaskAttemptContext context) throws IOException { /** * Inner routine for committing a task. * The list of pending commits is loaded and then saved to the job attempt - * dir. + * dir in a single pendingset file. * Failure to load any file or save the final file triggers an abort of * all known pending commits. * @param context context @@ -250,6 +251,7 @@ public void abortTask(TaskAttemptContext context) throws IOException { deleteQuietly( attemptPath.getFileSystem(context.getConfiguration()), attemptPath, true); + destroyThreadPool(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java index 32642c9e98fad..1a5a63c940f47 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java @@ -20,7 +20,6 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathExistsException; import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; -import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -66,7 +64,6 @@ public String getName() { @Override public void setupJob(JobContext context) throws IOException { - super.setupJob(context); Path outputPath = getOutputPath(); FileSystem fs = getDestFS(); ConflictResolution conflictResolution = getConflictResolutionMode( @@ -91,10 +88,10 @@ public void setupJob(JobContext context) throws IOException { } } catch (FileNotFoundException ignored) { // there is no destination path, hence, no conflict. 
- // make the parent directory, which also triggers a recursive directory - // creation operation - fs.mkdirs(outputPath); } + // make the parent directory, which also triggers a recursive directory + // creation operation + super.setupJob(context); } /** @@ -106,8 +103,12 @@ public void setupJob(JobContext context) throws IOException { * @throws IOException any failure */ @Override - protected void preCommitJob(JobContext context, - List pending) throws IOException { + public void preCommitJob( + final JobContext context, + final ActiveCommit pending) throws IOException { + + // see if the files can be loaded. + super.preCommitJob(context, pending); Path outputPath = getOutputPath(); FileSystem fs = getDestFS(); Configuration fsConf = fs.getConf(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java index b51bcb5f9c338..20aca3cf49ae0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java @@ -20,10 +20,11 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,11 +33,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.Tasks; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.DurationInfo; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.COMMITTER_NAME_PARTITIONED; /** * Partitioned committer. @@ -52,6 +56,9 @@ *
  • REPLACE: delete the destination partition in the job commit * (i.e. after and only if all tasks have succeeded).
  • * + * To determine the paths, the precommit process actually has to read + * in all source files, independently of the final commit phase. + * This is inefficient, though some parallelization here helps. */ public class PartitionedStagingCommitter extends StagingCommitter { @@ -107,6 +114,7 @@ protected int commitTaskInternal(TaskAttemptContext context, } /** + * All * Job-side conflict resolution. * The partition path conflict resolution actions are: *
      @@ -119,13 +127,15 @@ protected int commitTaskInternal(TaskAttemptContext context, * @throws IOException any failure */ @Override - protected void preCommitJob(JobContext context, - List pending) throws IOException { + public void preCommitJob( + final JobContext context, + final ActiveCommit pending) throws IOException { FileSystem fs = getDestFS(); // enforce conflict resolution Configuration fsConf = fs.getConf(); + boolean shouldPrecheckPendingFiles = true; switch (getConflictResolutionMode(context, fsConf)) { case FAIL: // FAIL checking is done on the task side, so this does nothing @@ -134,21 +144,84 @@ protected void preCommitJob(JobContext context, // no check is needed because the output may exist for appending break; case REPLACE: - Set partitions = pending.stream() - .map(SinglePendingCommit::destinationPath) - .map(Path::getParent) - .collect(Collectors.toCollection(Sets::newLinkedHashSet)); - for (Path partitionPath : partitions) { - LOG.debug("{}: removing partition path to be replaced: " + - getRole(), partitionPath); - fs.delete(partitionPath, true); - } + // identify and replace the destination partitions + replacePartitions(context, pending); + // and so there is no need to do another check. + shouldPrecheckPendingFiles = false; break; default: throw new PathCommitException("", getRole() + ": unknown conflict resolution mode: " + getConflictResolutionMode(context, fsConf)); } + if (shouldPrecheckPendingFiles) { + precommitCheckPendingFiles(context, pending); + } + } + + /** + * Identify all partitions which need to be replaced and then delete them. + * The original implementation relied on all the pending commits to be + * loaded so could simply enumerate them. + * This iteration does not do that; it has to reload all the files + * to build the set, after which it initiates the delete process. + * This is done in parallel. + *
+   * <pre>
+   *   Set&lt;Path&gt; partitions = pending.stream()
+   *     .map(Path::getParent)
+   *     .collect(Collectors.toCollection(Sets::newLinkedHashSet));
+   *   for (Path partitionPath : partitions) {
+   *     LOG.debug("{}: removing partition path to be replaced: " +
+   *     getRole(), partitionPath);
+   *     fs.delete(partitionPath, true);
+   *   }
+   * </pre>
      + * + * @param context job context + * @param pending the pending operations + * @throws IOException any failure + */ + private void replacePartitions( + final JobContext context, + final ActiveCommit pending) throws IOException { + + Map partitions = new ConcurrentHashMap<>(); + FileSystem sourceFS = pending.getSourceFS(); + ExecutorService pool = buildThreadPool(context); + try (DurationInfo ignored = + new DurationInfo(LOG, "Replacing partitions")) { + + // the parent directories are saved to a concurrent hash map. + // for a marginal optimisation, the previous parent is tracked, so + // if a task writes many files to the same dir, the synchronized map + // is updated only once. + Tasks.foreach(pending.getSourceFiles()) + .stopOnFailure() + .suppressExceptions(false) + .executeWith(pool) + .run(path -> { + PendingSet pendingSet = PendingSet.load(sourceFS, path); + Path lastParent = null; + for (SinglePendingCommit commit : pendingSet.getCommits()) { + Path parent = commit.destinationPath().getParent(); + if (parent != null && !parent.equals(lastParent)) { + partitions.put(parent, ""); + lastParent = parent; + } + } + }); + } + // now do the deletes + FileSystem fs = getDestFS(); + Tasks.foreach(partitions.keySet()) + .stopOnFailure() + .suppressExceptions(false) + .executeWith(pool) + .run(partitionPath -> { + LOG.debug("{}: removing partition path to be replaced: " + + getRole(), partitionPath); + fs.delete(partitionPath, true); + }); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 833edd4a6b022..6cc9e488523fb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.Queue; @@ -457,6 +456,7 @@ public void setupJob(JobContext context) throws IOException { context.getConfiguration().set( InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid); wrappedCommitter.setupJob(context); + super.setupJob(context); } /** @@ -466,7 +466,7 @@ public void setupJob(JobContext context) throws IOException { * @throws IOException Any IO failure */ @Override - protected List listPendingUploadsToCommit( + protected ActiveCommit listPendingUploadsToCommit( JobContext context) throws IOException { return listPendingUploads(context, false); @@ -480,7 +480,7 @@ protected List listPendingUploadsToCommit( * then this may not match the actual set of pending operations * @throws IOException shouldn't be raised, but retained for the compiler */ - protected List listPendingUploadsToAbort( + protected ActiveCommit listPendingUploadsToAbort( JobContext context) throws IOException { return listPendingUploads(context, true); } @@ -493,13 +493,14 @@ protected List listPendingUploadsToAbort( * then this may not match the actual set of pending operations * @throws IOException Any IO failure which wasn't swallowed. 
*/ - protected List listPendingUploads( + protected ActiveCommit listPendingUploads( JobContext context, boolean suppressExceptions) throws IOException { - try { - Path wrappedJobAttemptPath = wrappedCommitter.getJobAttemptPath(context); + try (DurationInfo ignored = new DurationInfo(LOG, + "Listing pending uploads")) { + Path wrappedJobAttemptPath = getJobAttemptPath(context); final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem( context.getConfiguration()); - return loadPendingsetFiles(context, suppressExceptions, attemptFS, + return ActiveCommit.fromStatusList(attemptFS, listAndFilter(attemptFS, wrappedJobAttemptPath, false, HIDDEN_FILE_FILTER)); @@ -512,7 +513,7 @@ protected List listPendingUploads( maybeIgnore(suppressExceptions, "Listing pending uploads", e); } // reached iff an IOE was caught and swallowed - return new ArrayList<>(0); + return ActiveCommit.empty(); } @Override @@ -558,8 +559,8 @@ protected void abortJobInternal(JobContext context, boolean failed = false; try (DurationInfo d = new DurationInfo(LOG, "%s: aborting job in state %s ", r, jobIdString(context))) { - List pending = listPendingUploadsToAbort(context); - abortPendingUploads(context, pending, suppressExceptions); + ActiveCommit pending = listPendingUploadsToAbort(context); + abortPendingUploads(context, pending, suppressExceptions, true); } catch (FileNotFoundException e) { // nothing to list LOG.debug("No job directory to read uploads from"); @@ -571,6 +572,7 @@ protected void abortJobInternal(JobContext context, } } + /** * Delete the working paths of a job. *
        @@ -646,6 +648,8 @@ public void commitTask(TaskAttemptContext context) throws IOException { getRole(), context.getTaskAttemptID(), e); getCommitOperations().taskCompleted(false); throw e; + } finally { + destroyThreadPool(); } getCommitOperations().taskCompleted(true); } @@ -694,6 +698,7 @@ protected int commitTaskInternal(final TaskAttemptContext context, try { Tasks.foreach(taskOutput) .stopOnFailure() + .suppressExceptions(false) .executeWith(buildThreadPool(context)) .run(stat -> { Path path = stat.getPath(); @@ -779,6 +784,8 @@ public void abortTask(TaskAttemptContext context) throws IOException { LOG.error("{}: exception when aborting task {}", getRole(), context.getTaskAttemptID(), e); throw e; + } finally { + destroyThreadPool(); } } @@ -901,4 +908,20 @@ public static String getConfictModeOption(JobContext context, defVal).toUpperCase(Locale.ENGLISH); } + /** + * Pre-commit actions for a job. + * Loads all the pending files to verify they can be loaded + * and parsed. + * @param context job context + * @param pending pending commits + * @throws IOException any failure + */ + @Override + public void preCommitJob( + final JobContext context, + final ActiveCommit pending) throws IOException { + + // see if the files can be loaded. + precommitCheckPendingFiles(context, pending); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java index 6e81452e26035..9a80a897dae8f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java @@ -19,17 +19,21 @@ package org.apache.hadoop.fs.s3a; import java.io.IOException; +import java.util.Set; +import org.assertj.core.api.Assertions; +import org.junit.AfterClass; import org.junit.Test; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getCurrentThreadNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.listInitialThreadsForLifecycleChecks; import static org.apache.hadoop.test.LambdaTestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED; /** - * Tests of the S3A FileSystem which is closed; just make sure - * that that basic file Ops fail meaningfully. + * Tests of the S3A FileSystem which is closed. 
*/ public class ITestS3AClosedFS extends AbstractS3ATestBase { @@ -47,6 +51,16 @@ public void teardown() { // no op, as the FS is closed } + private static final Set THREAD_SET + = listInitialThreadsForLifecycleChecks();; + + @AfterClass + public static void checkForThreadLeakage() { + Assertions.assertThat(getCurrentThreadNames()) + .describedAs("The threads at the end of the test run") + .isSubsetOf(THREAD_SET); + } + @Test public void testClosedGetFileStatus() throws Exception { intercept(IOException.class, E_FS_CLOSED, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 1889c05431066..bd38b402aadcc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -63,7 +63,10 @@ import java.text.SimpleDateFormat; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; @@ -598,7 +601,7 @@ public static Configuration prepareTestConfiguration(final Configuration conf) { String tmpDir = conf.get(HADOOP_TMP_DIR, "target/build/test"); if (testUniqueForkId != null) { // patch temp dir for the specific branch - tmpDir = tmpDir + File.pathSeparatorChar + testUniqueForkId; + tmpDir = tmpDir + File.separator + testUniqueForkId; conf.set(HADOOP_TMP_DIR, tmpDir); } conf.set(BUFFER_DIR, tmpDir); @@ -1346,4 +1349,41 @@ public static S3AFileStatus awaitFileStatus(S3AFileSystem fs, STABILIZATION_TIME, PROBE_INTERVAL_MILLIS, () -> fs.getFileStatus(testFilePath)); } + + /** + * This creates a set containing all current threads and some well-known + * thread names whose existence should not fail test runs. + * They are generally static cleaner threads created by various classes + * on instantiation. + * @return a set of threads to use in later assertions. + */ + public static Set listInitialThreadsForLifecycleChecks() { + Set threadSet = getCurrentThreadNames(); + // static filesystem statistics cleaner + threadSet.add( + "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner"); + // AWS progress callbacks + threadSet.add("java-sdk-progress-listener-callback-thread"); + // another AWS thread + threadSet.add("java-sdk-http-connection-reaper"); + // java.lang.UNIXProcess. maybe if chmod is called? + threadSet.add("process reaper"); + // once a quantile has been scheduled, the mutable quantile thread pool + // is initialized; it has a minimum thread size of 1. + threadSet.add("MutableQuantiles-0"); + // IDE? + threadSet.add("Attach Listener"); + return threadSet; + } + + /** + * Get a set containing the names of all active threads. + * @return the current set of threads. 
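+   * <p>
+   * Example assertion, as used in the closed-FS test above (sketch):
+   * <pre>
+   *   Assertions.assertThat(getCurrentThreadNames())
+   *       .isSubsetOf(listInitialThreadsForLifecycleChecks());
+   * </pre>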
+ */ + public static Set getCurrentThreadNames() { + return Thread.getAllStackTraces().keySet() + .stream() + .map(Thread::getName) + .collect(Collectors.toCollection(TreeSet::new)); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 822e3617f1b0c..cacd54d12e90c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -25,7 +25,10 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.assertj.core.api.Assertions; +import org.junit.AfterClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,6 +181,20 @@ public void teardown() throws Exception { super.teardown(); } + /** + * This only looks for leakage of committer thread pools, + * and not any other leaked threads, such as those from S3A FS instances. + */ + @AfterClass + public static void checkForThreadLeakage() { + List committerThreads = getCurrentThreadNames().stream() + .filter(n -> n.startsWith(AbstractS3ACommitter.THREAD_PREFIX)) + .collect(Collectors.toList()); + Assertions.assertThat(committerThreads) + .describedAs("Outstanding committer threads") + .isEmpty(); + } + /** * Add the specified job to the current list of jobs to abort in teardown. * @param jobData job data. @@ -518,6 +535,7 @@ protected void commit(AbstractS3ACommitter committer, describe("\ncommitting job"); committer.commitJob(jContext); describe("commit complete\n"); + verifyCommitterHasNoThreads(committer); } } @@ -574,7 +592,7 @@ public void testRecoveryAndCleanup() throws Exception { // Commit the task. This will promote data and metadata to where // job commits will pick it up on commit or abort. - committer.commitTask(tContext); + commitTask(committer, tContext); assertTaskAttemptPathDoesNotExist(committer, tContext); Configuration conf2 = jobData.job.getConfiguration(); @@ -600,6 +618,7 @@ public void testRecoveryAndCleanup() throws Exception { committer2.abortJob(jContext2, JobStatus.State.KILLED); // now, state of system may still have pending data assertNoMultipartUploadsPending(outDir); + verifyCommitterHasNoThreads(committer2); } protected void assertTaskAttemptPathDoesNotExist( @@ -742,7 +761,7 @@ public void testCommitLifecycle() throws Exception { describe("2. Committing task"); assertTrue("No files to commit were found by " + committer, committer.needsTaskCommit(tContext)); - committer.commitTask(tContext); + commitTask(committer, tContext); // this is only task commit; there MUST be no part- files in the dest dir waitForConsistency(); @@ -758,7 +777,7 @@ public void testCommitLifecycle() throws Exception { describe("3. Committing job"); assertMultipartUploadsPending(outDir); - committer.commitJob(jContext); + commitJob(committer, jContext); // validate output describe("4. Validating content"); @@ -809,7 +828,7 @@ public void testCommitterWithFailure() throws Exception { // now fail job expectSimulatedFailureOnJobCommit(jContext, committer); - committer.commitJob(jContext); + commitJob(committer, jContext); // but the data got there, due to the order of operations. 
validateContent(outDir, shouldExpectSuccessMarker()); @@ -1011,6 +1030,7 @@ public void testAbortTaskThenJob() throws Exception { committer.abortJob(jobData.jContext, JobStatus.State.FAILED); assertJobAbortCleanedUp(jobData); + verifyCommitterHasNoThreads(committer); } /** @@ -1064,6 +1084,7 @@ public void testFailAbort() throws Exception { // try again; expect abort to be idempotent. committer.abortJob(jContext, JobStatus.State.FAILED); assertNoMultipartUploadsPending(outDir); + verifyCommitterHasNoThreads(committer); } public void assertPart0000DoesNotExist(Path dir) throws Exception { @@ -1223,8 +1244,8 @@ public void testOutputFormatIntegration() throws Throwable { validateTaskAttemptPathAfterWrite(dest); assertTrue("Committer does not have data to commit " + committer, committer.needsTaskCommit(tContext)); - committer.commitTask(tContext); - committer.commitJob(jContext); + commitTask(committer, tContext); + commitJob(committer, jContext); // validate output verifySuccessMarker(outDir); } @@ -1257,6 +1278,7 @@ public void testAMWorkflow() throws Throwable { AbstractS3ACommitter committer2 = (AbstractS3ACommitter) outputFormat.getOutputCommitter(newAttempt); committer2.abortTask(tContext); + verifyCommitterHasNoThreads(committer2); assertNoMultipartUploadsPending(getOutDir()); } @@ -1306,19 +1328,19 @@ public void testParallelJobsToAdjacentPaths() throws Throwable { // at this point, job1 and job2 both have uncommitted tasks // commit tasks in order task 2, task 1. - committer2.commitTask(tContext2); - committer1.commitTask(tContext1); + commitTask(committer2, tContext2); + commitTask(committer1, tContext1); assertMultipartUploadsPending(job1Dest); assertMultipartUploadsPending(job2Dest); // commit jobs in order job 1, job 2 - committer1.commitJob(jContext1); + commitJob(committer1, jContext1); assertNoMultipartUploadsPending(job1Dest); getPart0000(job1Dest); assertMultipartUploadsPending(job2Dest); - committer2.commitJob(jContext2); + commitJob(committer2, jContext2); getPart0000(job2Dest); assertNoMultipartUploadsPending(job2Dest); } finally { @@ -1379,4 +1401,36 @@ protected void validateTaskAttemptWorkingDirectory( TaskAttemptContext context) throws IOException { } + /** + * Commit a task then validate the state of the committer afterwards. + * @param committer committer + * @param tContext task context + * @throws IOException IO failure + */ + protected void commitTask(final AbstractS3ACommitter committer, + final TaskAttemptContext tContext) throws IOException { + committer.commitTask(tContext); + verifyCommitterHasNoThreads(committer); + } + + /** + * Commit a job then validate the state of the committer afterwards. + * @param committer committer + * @param jContext job context + * @throws IOException IO failure + */ + protected void commitJob(final AbstractS3ACommitter committer, + final JobContext jContext) throws IOException { + committer.commitJob(jContext); + verifyCommitterHasNoThreads(committer); + } + + /** + * Verify that the committer does not have a thread pool. + * @param committer committer to validate. 
+ */ + protected void verifyCommitterHasNoThreads(AbstractS3ACommitter committer) { + assertFalse("Committer has an active thread pool", + committer.hasThreadPool()); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java index 5e6fb82362cdc..4ee39f1bfa08e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java @@ -73,7 +73,7 @@ public class TestTasks extends HadoopTestBase { * more checks on single thread than parallel ops. * @return a list of parameter tuples. */ - @Parameterized.Parameters + @Parameterized.Parameters(name = "threads={0}") public static Collection params() { return Arrays.asList(new Object[][]{ {0}, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java index e3e44497d4e6a..1045a2929c097 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -632,10 +632,10 @@ protected void validateResult(final Path destPath, final FileStatus st = fs.getFileStatus(magicDir); StringBuilder result = new StringBuilder("Found magic dir which should" + " have been deleted at ").append(st).append('\n'); - result.append("["); + result.append(" ["); applyLocatedFiles(fs.listFiles(magicDir, true), - (status) -> result.append(status.getPath()).append('\n')); - result.append("["); + (status) -> result.append(" ").append(status.getPath()).append('\n')); + result.append("]"); return result.toString(); }); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java index fd585d05b2e72..f368bf25c77c7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java @@ -251,6 +251,12 @@ public static void verifyExistenceChecked(FileSystem mockS3, Path path) verify(mockS3).getFileStatus(path); } + /** + * Verify that mkdirs was invoked once. + * @param mockS3 mock + * @param path path to check + * @throws IOException from the mkdirs signature. 
+ */ public static void verifyMkdirsInvoked(FileSystem mockS3, Path path) throws IOException { verify(mockS3).mkdirs(path); @@ -320,12 +326,7 @@ public abstract static class JobCommitterTest @Before public void setupJob() throws Exception { - this.jobConf = new JobConf(); - jobConf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, - UUID.randomUUID().toString()); - jobConf.setBoolean( - CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, - false); + this.jobConf = createJobConf(); this.job = new JobContextImpl(jobConf, JOB_ID); this.results = new StagingTestBase.ClientResults(); @@ -338,6 +339,16 @@ public void setupJob() throws Exception { wrapperFS.setAmazonS3Client(mockClient); } + protected JobConf createJobConf() { + JobConf conf = new JobConf(); + conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, + UUID.randomUUID().toString()); + conf.setBoolean( + CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + false); + return conf; + } + public S3AFileSystem getMockS3A() { return mockFS; } @@ -461,6 +472,11 @@ public List getDeletes() { return deletes; } + public List getDeletePaths() { + return deletes.stream().map(DeleteObjectRequest::getKey).collect( + Collectors.toList()); + } + public void resetDeletes() { deletes.clear(); } @@ -478,6 +494,14 @@ public void resetRequests() { requests.clear(); } + public void addUpload(String id, String key) { + activeUploads.put(id, key); + } + + public void addUploads(Map uploadMap) { + activeUploads.putAll(uploadMap); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( @@ -648,8 +672,9 @@ public static AmazonS3 newMockS3Client(final ClientResults results, } CompleteMultipartUploadRequest req = getArgumentAt(invocation, 0, CompleteMultipartUploadRequest.class); + String uploadId = req.getUploadId(); + removeUpload(results, uploadId); results.commits.add(req); - results.activeUploads.remove(req.getUploadId()); return newResult(req); } @@ -669,14 +694,7 @@ public static AmazonS3 newMockS3Client(final ClientResults results, AbortMultipartUploadRequest req = getArgumentAt(invocation, 0, AbortMultipartUploadRequest.class); String id = req.getUploadId(); - String p = results.activeUploads.remove(id); - if (p == null) { - // upload doesn't exist - AmazonS3Exception ex = new AmazonS3Exception( - "not found " + id); - ex.setStatusCode(404); - throw ex; - } + removeUpload(results, id); results.aborts.add(req); return null; } @@ -729,6 +747,24 @@ public static AmazonS3 newMockS3Client(final ClientResults results, return mockClient; } + /** + * Remove an upload from the upload map. + * @param results result set + * @param uploadId The upload ID to remove + * @throws AmazonS3Exception with error code 404 if the id is unknown. 
+ */ + protected static void removeUpload(final ClientResults results, + final String uploadId) { + String removed = results.activeUploads.remove(uploadId); + if (removed == null) { + // upload doesn't exist + AmazonS3Exception ex = new AmazonS3Exception( + "not found " + uploadId); + ex.setStatusCode(404); + throw ex; + } + } + private static CompleteMultipartUploadResult newResult( CompleteMultipartUploadRequest req) { return new CompleteMultipartUploadResult(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java new file mode 100644 index 0000000000000..6d93e5fa788ff --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestDirectoryCommitterScale.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.amazonaws.services.s3.model.PartETag; +import com.google.common.collect.Maps; +import org.assertj.core.api.Assertions; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.DurationInfo; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.PENDINGSET_SUFFIX; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.BUCKET; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPath; +import static 
org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.outputPathUri;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.pathIsDirectory;
+
+/**
+ * Scale test of the directory committer: if there are many, many files,
+ * does job commit overload?
+ * This is a mock test, so as to avoid the overhead of going near S3;
+ * it does use a lot of local filesystem files, though, so as to better
+ * simulate a real large-scale deployment.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestDirectoryCommitterScale
+ extends StagingTestBase.JobCommitterTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDirectoryCommitterScale.class);
+
+ public static final int TASKS = 500;
+
+ public static final int FILES_PER_TASK = 10;
+
+ public static final int TOTAL_COMMIT_COUNT = FILES_PER_TASK * TASKS;
+
+ public static final int BLOCKS_PER_TASK = 1000;
+
+ private static File stagingDir;
+
+ private static LocalFileSystem localFS;
+
+ private static Path stagingPath;
+
+ private static Map<String, String> activeUploads =
+ Maps.newHashMap();
+
+ @Override
+ DirectoryCommitterForTesting newJobCommitter() throws Exception {
+ return new DirectoryCommitterForTesting(outputPath,
+ createTaskAttemptForJob());
+ }
+
+ @BeforeClass
+ public static void setupStaging() throws Exception {
+ stagingDir = File.createTempFile("staging", "");
+ stagingDir.delete();
+ stagingDir.mkdir();
+ stagingPath = new Path(stagingDir.toURI());
+ localFS = FileSystem.getLocal(new Configuration());
+ }
+
+
+ @AfterClass
+ public static void teardownStaging() throws IOException {
+ try {
+ if (stagingDir != null) {
+ FileUtils.deleteDirectory(stagingDir);
+ }
+ } catch (IOException ignored) {
+
+ }
+ }
+
+ @Override
+ protected JobConf createJobConf() {
+ JobConf conf = super.createJobConf();
+ conf.setInt(
+ CommitConstants.FS_S3A_COMMITTER_THREADS,
+ 100);
+ return conf;
+ }
+
+ protected Configuration getJobConf() {
+ return getJob().getConfiguration();
+ }
+
+ @Test
+ public void test_010_createTaskFiles() throws Exception {
+ try (DurationInfo ignored =
+ new DurationInfo(LOG, "Creating %d test files in %s",
+ TOTAL_COMMIT_COUNT, stagingDir)) {
+ createTasks();
+ }
+ }
+
+ /**
+ * Create the mock uploads of the tasks and save
+ * to .pendingset files.
+ * @throws IOException failure.
+ */
+ private void createTasks() throws IOException {
+ // create a stub multipart commit containing multiple files.
+
+ // step 1: a list of tags.
+ // this is the md5sum of hadoop 3.2.1.tar + String tag = "9062dcf18ffaee254821303bbd11c72b"; + List etags = IntStream.rangeClosed(1, BLOCKS_PER_TASK + 1) + .mapToObj(i -> new PartETag(i, tag)) + .collect(Collectors.toList()); + SinglePendingCommit base = new SinglePendingCommit(); + base.setBucket(BUCKET); + base.setJobId("0000"); + base.setLength(914688000); + base.bindCommitData(etags); + // these get overwritten + base.setDestinationKey("/base"); + base.setUploadId("uploadId"); + base.setUri(outputPathUri.toString()); + + SinglePendingCommit[] singles = new SinglePendingCommit[FILES_PER_TASK]; + byte[] bytes = base.toBytes(); + for (int i = 0; i < FILES_PER_TASK; i++) { + singles[i] = SinglePendingCommit.serializer().fromBytes(bytes); + } + // now create the files, using this as the template + + int uploadCount = 0; + for (int task = 0; task < TASKS; task++) { + PendingSet pending = new PendingSet(); + String taskId = String.format("task-%04d", task); + + for (int i = 0; i < FILES_PER_TASK; i++) { + String uploadId = String.format("%05d-task-%04d-file-%02d", + uploadCount, task, i); + // longer paths to take up more space. + Path p = new Path(outputPath, + "datasets/examples/testdirectoryscale/" + + "year=2019/month=09/day=26/hour=20/second=53" + + uploadId); + URI dest = p.toUri(); + SinglePendingCommit commit = singles[i]; + String key = dest.getPath(); + commit.setDestinationKey(key); + commit.setUri(dest.toString()); + commit.touch(Instant.now().toEpochMilli()); + commit.setTaskId(taskId); + commit.setUploadId(uploadId); + pending.add(commit); + activeUploads.put(uploadId, key); + } + Path path = new Path(stagingPath, + String.format("task-%04d." + PENDINGSET_SUFFIX, task)); + pending.save(localFS, path, true); + } + } + + + @Test + public void test_020_loadFilesToAttempt() throws Exception { + DirectoryStagingCommitter committer = newJobCommitter(); + + Configuration jobConf = getJobConf(); + jobConf.set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); + FileSystem mockS3 = getMockS3A(); + pathIsDirectory(mockS3, outputPath); + try (DurationInfo ignored = + new DurationInfo(LOG, "listing pending uploads")) { + AbstractS3ACommitter.ActiveCommit activeCommit + = committer.listPendingUploadsToCommit(getJob()); + Assertions.assertThat(activeCommit.getSourceFiles()) + .describedAs("Source files of %s", activeCommit) + .hasSize(TASKS); + } + } + + @Test + public void test_030_commitFiles() throws Exception { + DirectoryCommitterForTesting committer = newJobCommitter(); + StagingTestBase.ClientResults results = getMockResults(); + results.addUploads(activeUploads); + Configuration jobConf = getJobConf(); + jobConf.set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); + S3AFileSystem mockS3 = getMockS3A(); + pathIsDirectory(mockS3, outputPath); + + try (DurationInfo ignored = + new DurationInfo(LOG, "Committing Job")) { + committer.commitJob(getJob()); + } + + Assertions.assertThat(results.getCommits()) + .describedAs("commit count") + .hasSize(TOTAL_COMMIT_COUNT); + AbstractS3ACommitter.ActiveCommit activeCommit = committer.activeCommit; + Assertions.assertThat(activeCommit.getCommittedObjects()) + .describedAs("committed objects in active commit") + .hasSize(Math.min(TOTAL_COMMIT_COUNT, + CommitConstants.SUCCESS_MARKER_FILE_LIMIT)); + Assertions.assertThat(activeCommit.getCommittedFileCount()) + .describedAs("committed objects in active commit") + .isEqualTo(TOTAL_COMMIT_COUNT); + + } + + @Test + public void test_040_abortFiles() throws Exception { + 
DirectoryStagingCommitter committer = newJobCommitter(); + getMockResults().addUploads(activeUploads); + Configuration jobConf = getJobConf(); + jobConf.set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); + FileSystem mockS3 = getMockS3A(); + pathIsDirectory(mockS3, outputPath); + + committer.abortJob(getJob(), JobStatus.State.FAILED); + } + + + /** + * Committer overridden for better testing. + */ + private static final class DirectoryCommitterForTesting extends + DirectoryStagingCommitter { + private ActiveCommit activeCommit; + + private DirectoryCommitterForTesting(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + @Override + protected void initOutput(Path out) throws IOException { + super.initOutput(out); + setOutputPath(out); + } + + /** + * Returns the mock FS without checking FS type. + * @param out output path + * @param config job/task config + * @return a filesystem. + * @throws IOException failure to get the FS + */ + @Override + protected FileSystem getDestinationFS(Path out, Configuration config) + throws IOException { + return out.getFileSystem(config); + } + + @Override + public Path getJobAttemptPath(JobContext context) { + return stagingPath; + } + + @Override + protected void commitJobInternal(final JobContext context, + final ActiveCommit pending) + throws IOException { + activeCommit = pending; + super.commitJobInternal(context, pending); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java index 8939296719ab6..15ea75476a93d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java @@ -35,6 +35,7 @@ import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.google.common.collect.Sets; +import org.assertj.core.api.Assertions; import org.hamcrest.core.StringStartsWith; import org.junit.After; import org.junit.Before; @@ -535,33 +536,31 @@ public void testJobCommitFailure() throws Exception { return jobCommitter.toString(); }); - assertEquals("Should have succeeded to commit some uploads", - 5, results.getCommits().size()); - - assertEquals("Should have deleted the files that succeeded", - 5, results.getDeletes().size()); Set commits = results.getCommits() .stream() - .map((commit) -> commit.getBucketName() + commit.getKey()) + .map(commit -> + "s3a://" + commit.getBucketName() + "/" + commit.getKey()) .collect(Collectors.toSet()); Set deletes = results.getDeletes() .stream() - .map((delete) -> delete.getBucketName() + delete.getKey()) + .map(delete -> + "s3a://" + delete.getBucketName() + "/" + delete.getKey()) .collect(Collectors.toSet()); - assertEquals("Committed and deleted objects should match", - commits, deletes); - - assertEquals("Mismatch in aborted upload count", - 7, results.getAborts().size()); + Assertions.assertThat(commits) + .describedAs("Committed objects compared to deleted paths %s", results) + .containsExactlyInAnyOrderElementsOf(deletes); + Assertions.assertThat(results.getAborts()) + .describedAs("aborted count in %s", results) + .hasSize(7); Set uploadIds = getCommittedIds(results.getCommits()); 
uploadIds.addAll(getAbortedIds(results.getAborts())); - - assertEquals("Should have committed/deleted or aborted all uploads", - uploads, uploadIds); + Assertions.assertThat(uploadIds) + .describedAs("Combined commit/delete and aborted upload IDs") + .containsExactlyInAnyOrderElementsOf(uploads); assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java index 994ecef3a83ec..98075b827a7c2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; @@ -84,17 +85,18 @@ protected void verifyFailureConflictOutcome() throws Exception { () -> committer.setupJob(getJob())); // but there are no checks in job commit (HADOOP-15469) - committer.commitJob(getJob()); + // this is done by calling the preCommit method directly, + committer.preCommitJob(getJob(), AbstractS3ACommitter.ActiveCommit.empty()); - reset((FileSystem) getMockS3A()); + reset(getMockS3A()); pathDoesNotExist(getMockS3A(), outputPath); committer.setupJob(getJob()); verifyExistenceChecked(getMockS3A(), outputPath); verifyMkdirsInvoked(getMockS3A(), outputPath); - verifyNoMoreInteractions((FileSystem) getMockS3A()); + verifyNoMoreInteractions(getMockS3A()); - reset((FileSystem) getMockS3A()); + reset(getMockS3A()); pathDoesNotExist(getMockS3A(), outputPath); committer.commitJob(getJob()); verifyCompletion(getMockS3A()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java index e7410e33fba94..872097ff6f032 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java @@ -18,20 +18,21 @@ package org.apache.hadoop.fs.s3a.commit.staging; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.UUID; -import com.google.common.collect.Lists; import org.junit.Test; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.MockS3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -59,37 +60,59 @@ PartitionedStagingCommitter newJobCommitter() throws IOException { /** * Subclass of the Partitioned Staging 
committer used in the test cases.
 */
- private static final class PartitionedStagingCommitterForTesting
+ private final class PartitionedStagingCommitterForTesting
 extends PartitionedCommitterForTesting {
- private boolean aborted = false;
+ private boolean aborted;
 private PartitionedStagingCommitterForTesting(TaskAttemptContext context)
 throws IOException {
 super(StagingTestBase.outputPath, context);
 }
+ /**
+ * Generate pending uploads to commit.
+ * This is quite complex, as the mock pending uploads need to be saved
+ * to a filesystem for the next stage of the commit process.
+ * To simulate multiple commits, more than one .pendingset file is created.
+ * @param context job context
+ * @return an active commit containing a list of paths to valid pending set
+ * files.
+ * @throws IOException IO failure
+ */
 @Override
- protected List<SinglePendingCommit> listPendingUploadsToCommit(
+ protected ActiveCommit listPendingUploadsToCommit(
 JobContext context) throws IOException {
- List<SinglePendingCommit> pending = Lists.newArrayList();
+ LocalFileSystem localFS = FileSystem.getLocal(getConf());
+ ActiveCommit activeCommit = new ActiveCommit(localFS,
+ new ArrayList<>(0));
+ // need to create some pending entries.
 for (String dateint : Arrays.asList("20161115", "20161116")) {
+ PendingSet pendingSet = new PendingSet();
 for (String hour : Arrays.asList("13", "14")) {
+ String uploadId = UUID.randomUUID().toString();
 String key = OUTPUT_PREFIX + "/dateint=" + dateint + "/hour=" + hour +
- "/" + UUID.randomUUID().toString() + ".parquet";
+ "/" + uploadId + ".parquet";
 SinglePendingCommit commit = new SinglePendingCommit();
 commit.setBucket(BUCKET);
 commit.setDestinationKey(key);
 commit.setUri("s3a://" + BUCKET + "/" + key);
- commit.setUploadId(UUID.randomUUID().toString());
+ commit.setUploadId(uploadId);
 ArrayList<String> etags = new ArrayList<>();
 etags.add("tag1");
 commit.setEtags(etags);
- pending.add(commit);
+ pendingSet.add(commit);
+ // register the upload so commit operations are not rejected
+ getMockResults().addUpload(uploadId, key);
 }
+ File file = File.createTempFile("staging", ".pendingset");
+ file.deleteOnExit();
+ Path path = new Path(file.toURI());
+ pendingSet.save(localFS, path, true);
+ activeCommit.add(path);
 }
- return pending;
+ return activeCommit;
 }
 @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
index 8116b79eb7fe7..4b568263ba71b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
@@ -27,6 +27,7 @@
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -114,18 +115,7 @@ public void testFail() throws Exception {
 reset(mockS3);
 committer.commitTask(getTAC());
- Set<String> files = Sets.newHashSet();
- for (InitiateMultipartUploadRequest request :
- getMockResults().getRequests().values()) {
- assertEquals(BUCKET, request.getBucketName());
- files.add(request.getKey());
- }
- assertEquals("Should have the right number of uploads",
- relativeFiles.size(), files.size());
-
- Set<String> expected = buildExpectedList(committer);
-
assertEquals("Should have correct paths", expected, files); + verifyFilesCreated(committer); } @Test @@ -146,18 +136,29 @@ public void testAppend() throws Exception { pathExists(mockS3, new Path(outputPath, relativeFiles.get(2)).getParent()); committer.commitTask(getTAC()); + verifyFilesCreated(committer); + } + + /** + * Verify that the files created matches that expected. + * @param committer committer + */ + protected void verifyFilesCreated( + final PartitionedStagingCommitter committer) { Set files = Sets.newHashSet(); for (InitiateMultipartUploadRequest request : getMockResults().getRequests().values()) { assertEquals(BUCKET, request.getBucketName()); files.add(request.getKey()); } - assertEquals("Should have the right number of uploads", - relativeFiles.size(), files.size()); + Assertions.assertThat(files) + .describedAs("Should have the right number of uploads") + .hasSize(relativeFiles.size()); Set expected = buildExpectedList(committer); - - assertEquals("Should have correct paths", expected, files); + Assertions.assertThat(files) + .describedAs("Should have correct paths") + .containsExactlyInAnyOrderElementsOf(expected); } @Test @@ -180,18 +181,7 @@ public void testReplace() throws Exception { pathExists(mockS3, new Path(outputPath, relativeFiles.get(3)).getParent()); committer.commitTask(getTAC()); - Set files = Sets.newHashSet(); - for (InitiateMultipartUploadRequest request : - getMockResults().getRequests().values()) { - assertEquals(BUCKET, request.getBucketName()); - files.add(request.getKey()); - } - assertEquals("Should have the right number of uploads", - relativeFiles.size(), files.size()); - - Set expected = buildExpectedList(committer); - - assertEquals("Should have correct paths", expected, files); + verifyFilesCreated(committer); } public Set buildExpectedList(StagingCommitter committer) {