diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred index 3e52556a08f0b..e3f1f924edddd 100755 --- a/hadoop-mapreduce-project/bin/mapred +++ b/hadoop-mapreduce-project/bin/mapred @@ -37,6 +37,7 @@ function hadoop_usage hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload" hadoop_add_subcommand "version" client "print the version" hadoop_add_subcommand "minicluster" client "CLI MiniCluster" + hadoop_add_subcommand "successfile" client "Print a _SUCCESS manifest from the manifest and S3A committers" hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true } @@ -102,6 +103,9 @@ function mapredcmd_case version) HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo ;; + successfile) + HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter + ;; minicluster) hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*' hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/test"'/*' diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java index 8a1ae0fcc9810..54d3799cb3cf4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConfig.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -51,6 +54,9 @@ */ public final class ManifestCommitterConfig implements IOStatisticsSource { + private static final Logger LOG = LoggerFactory.getLogger( + ManifestCommitterConfig.class); + /** * Final destination of work. * This is unqualified. @@ -153,6 +159,12 @@ public final class ManifestCommitterConfig implements IOStatisticsSource { */ private final int writerQueueCapacity; + /** + * How many attempts to save a task manifest by save and rename + * before giving up. + */ + private final int saveManifestAttempts; + /** * Constructor. * @param outputPath destination path of the job. @@ -198,6 +210,14 @@ public final class ManifestCommitterConfig implements IOStatisticsSource { this.writerQueueCapacity = conf.getInt( OPT_WRITER_QUEUE_CAPACITY, DEFAULT_WRITER_QUEUE_CAPACITY); + int attempts = conf.getInt(OPT_MANIFEST_SAVE_ATTEMPTS, + OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT); + if (attempts < 1) { + LOG.warn("Invalid value for {}: {}", + OPT_MANIFEST_SAVE_ATTEMPTS, attempts); + attempts = 1; + } + this.saveManifestAttempts = attempts; // if constructed with a task attempt, build the task ID and path. if (context instanceof TaskAttemptContext) { @@ -332,6 +352,10 @@ public String getName() { return name; } + public int getSaveManifestAttempts() { + return saveManifestAttempts; + } + /** * Get writer queue capacity. * @return the queue capacity diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java index dc5ccb2e1df3a..8f359e45000f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java @@ -132,7 +132,9 @@ public final class ManifestCommitterConstants { * Should dir cleanup do parallel deletion of task attempt dirs * before trying to delete the toplevel dirs. * For GCS this may deliver speedup, while on ABFS it may avoid - * timeouts in certain deployments. + * timeouts in certain deployments, something + * {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST} + * can alleviate. * Value: {@value}. */ public static final String OPT_CLEANUP_PARALLEL_DELETE = @@ -143,6 +145,20 @@ public final class ManifestCommitterConstants { */ public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true; + /** + * Should parallel cleanup try to delete the base first? + * Best for azure as it skips the task attempt deletions unless + * the toplevel delete fails. + * Value: {@value}. + */ + public static final String OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST = + OPT_PREFIX + "cleanup.parallel.delete.base.first"; + + /** + * Default value of option {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}: {@value}. + */ + public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = false; + /** * Threads to use for IO. */ @@ -260,6 +276,19 @@ public final class ManifestCommitterConstants { */ public static final int DEFAULT_WRITER_QUEUE_CAPACITY = OPT_IO_PROCESSORS_DEFAULT; + /** + * How many attempts to save a task manifest by save and rename + * before giving up. + * Value: {@value}. + */ + public static final String OPT_MANIFEST_SAVE_ATTEMPTS = + OPT_PREFIX + "manifest.save.attempts"; + + /** + * Default value of {@link #OPT_MANIFEST_SAVE_ATTEMPTS}: {@value}. + */ + public static final int OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT = 5; + private ManifestCommitterConstants() { } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java index 243fd6087328d..2326259a08966 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java @@ -187,6 +187,12 @@ public final class ManifestCommitterStatisticNames { public static final String OP_SAVE_TASK_MANIFEST = "task_stage_save_task_manifest"; + /** + * Save a summary file: {@value}. + */ + public static final String OP_SAVE_SUMMARY_FILE = + "task_stage_save_summary_file"; + /** * Task abort: {@value}. */ @@ -259,6 +265,9 @@ public final class ManifestCommitterStatisticNames { public static final String OP_STAGE_TASK_SCAN_DIRECTORY = "task_stage_scan_directory"; + /** Delete a directory: {@value}. */ + public static final String OP_DELETE_DIR = "op_delete_dir"; + private ManifestCommitterStatisticNames() { } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java index c95ec7b11be05..f12f80c641268 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java @@ -36,7 +36,7 @@ */ public class ManifestPrinter extends Configured implements Tool { - private static final String USAGE = "ManifestPrinter "; + private static final String USAGE = "successfile "; /** * Output for printing. @@ -88,7 +88,7 @@ public ManifestSuccessData loadAndPrintManifest(FileSystem fs, Path path) return success; } - private void printManifest(ManifestSuccessData success) { + public void printManifest(ManifestSuccessData success) { field("succeeded", success.getSuccess()); field("created", success.getDate()); field("committer", success.getCommitter()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java index 15f9899f3551e..c90ea39d0c7fe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java @@ -73,6 +73,7 @@ private InternalConstants() { OP_CREATE_ONE_DIRECTORY, OP_DIRECTORY_SCAN, OP_DELETE, + OP_DELETE_DIR, OP_DELETE_FILE_UNDER_DESTINATION, OP_GET_FILE_STATUS, OP_IS_DIRECTORY, @@ -85,6 +86,7 @@ private InternalConstants() { OP_MSYNC, OP_PREPARE_DIR_ANCESTORS, OP_RENAME_FILE, + OP_SAVE_SUMMARY_FILE, OP_SAVE_TASK_MANIFEST, OBJECT_LIST_REQUEST, @@ -127,4 +129,11 @@ private InternalConstants() { /** Schemas of filesystems we know to not work with this committer. */ public static final Set UNSUPPORTED_FS_SCHEMAS = ImmutableSet.of("s3a", "wasb"); + + /** + * Interval in milliseconds between save retries. + * Value {@value} milliseconds. + */ + public static final int SAVE_SLEEP_INTERVAL = 500; + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java index b81fa9dd32add..03e3ce0f0ade0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java @@ -97,6 +97,35 @@ public boolean isFile(Path path) throws IOException { public abstract boolean delete(Path path, boolean recursive) throws IOException; + /** + * Forward to {@code delete(Path, true)} + * unless overridden. + *

+ * If it returns without an error: there is no file at + * the end of the path. + * @param path path + * @return outcome + * @throws IOException failure. + */ + public boolean deleteFile(Path path) + throws IOException { + return delete(path, false); + } + + /** + * Call {@code FileSystem#delete(Path, true)} or equivalent. + *

+ * If it returns without an error: there is nothing at + * the end of the path. + * @param path path + * @return outcome + * @throws IOException failure. + */ + public boolean deleteRecursive(Path path) + throws IOException { + return delete(path, true); + } + /** * Forward to {@link FileSystem#mkdirs(Path)}. * Usual "what does 'false' mean" ambiguity. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java index 9a0b972bc735b..ab3a6398de114 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java @@ -108,6 +108,11 @@ public boolean delete(Path path, boolean recursive) return fileSystem.delete(path, recursive); } + @Override + public boolean deleteRecursive(final Path path) throws IOException { + return fileSystem.delete(path, true); + } + @Override public boolean mkdirs(Path path) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java index c2b44c2a924fd..0ab7c08dc2386 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK; /** @@ -55,7 +56,11 @@ protected Path executeStage(final Boolean suppressExceptions) final Path dir = getTaskAttemptDir(); if (dir != null) { LOG.info("{}: Deleting task attempt directory {}", getName(), dir); - deleteDir(dir, suppressExceptions); + if (suppressExceptions) { + deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR); + } else { + deleteRecursive(dir, OP_DELETE_DIR); + } } return dir; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java index 161153c82faac..76bc0d7cd2799 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java @@ -21,7 +21,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,7 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; @@ -53,14 +56,18 @@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.SAVE_SLEEP_INTERVAL; /** * A Stage in Task/Job Commit. @@ -366,6 +373,7 @@ public final IOStatisticsStore getIOStatistics() { */ protected final void progress() { if (stageConfig.getProgressable() != null) { + LOG.trace("{}: Progressing", getName()); stageConfig.getProgressable().progress(); } } @@ -424,7 +432,7 @@ protected final boolean isFile( * @return status or null * @throws IOException IO Failure. */ - protected final boolean delete( + public final boolean delete( final Path path, final boolean recursive) throws IOException { @@ -440,14 +448,34 @@ protected final boolean delete( * @return status or null * @throws IOException IO Failure. */ - protected Boolean delete( + public Boolean delete( final Path path, final boolean recursive, final String statistic) throws IOException { - return trackDuration(getIOStatistics(), statistic, () -> { - return operations.delete(path, recursive); - }); + if (recursive) { + return deleteRecursive(path, statistic); + } else { + return deleteFile(path, statistic); + } + } + + /** + * Delete a file at a path. + *

+ * If it returns without an error: there is nothing at + * the end of the path. + * @param path path + * @param statistic statistic to update + * @return outcome. + * @throws IOException IO Failure. + */ + public boolean deleteFile( + final Path path, + final String statistic) + throws IOException { + return trackDuration(getIOStatistics(), statistic, () -> + operations.deleteFile(path)); } /** @@ -457,7 +485,7 @@ protected Boolean delete( * @return true if the directory was created/exists. * @throws IOException IO Failure. */ - protected final boolean mkdirs( + public final boolean mkdirs( final Path path, final boolean escalateFailure) throws IOException { @@ -494,7 +522,7 @@ protected final RemoteIterator listStatusIterator( * @return the manifest. * @throws IOException IO Failure. */ - protected final TaskManifest loadManifest( + public final TaskManifest loadManifest( final FileStatus status) throws IOException { LOG.trace("{}: loadManifest('{}')", getName(), status); @@ -582,19 +610,123 @@ protected final Path directoryMustExist( * Save a task manifest or summary. This will be done by * writing to a temp path and then renaming. * If the destination path exists: Delete it. + * This will retry so that a rename failure from abfs load or IO errors + * will not fail the task. * @param manifestData the manifest/success file * @param tempPath temp path for the initial save * @param finalPath final path for rename. - * @throws IOException failure to load/parse + * @return the manifest saved. + * @throws IOException failure to rename after retries. */ @SuppressWarnings("unchecked") - protected final void save(T manifestData, + protected final T save( + final T manifestData, final Path tempPath, final Path finalPath) throws IOException { - LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath); - trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () -> - operations.save(manifestData, tempPath, true)); - renameFile(tempPath, finalPath); + return saveManifest(() -> manifestData, tempPath, finalPath, OP_SAVE_TASK_MANIFEST); + } + + /** + * Generate and save a task manifest or summary file. + * This is be done by writing to a temp path and then renaming. + *

+ * If the destination path exists: Delete it before the rename. + *

+ * This will retry so that a rename failure from abfs load or IO errors + * such as delete or save failure will not fail the task. + *

+ * The {@code manifestSource} supplier is invoked to get the manifest data + * on every attempt. + * This permits statistics to be updated, including those of failures. + * @param manifestSource supplier the manifest/success file + * @param tempPath temp path for the initial save + * @param finalPath final path for rename. + * @param statistic statistic to use for timing + * @return the manifest saved. + * @throws IOException failure to save/delete/rename after retries. + */ + @SuppressWarnings("unchecked") + protected final T saveManifest( + final Supplier manifestSource, + final Path tempPath, + final Path finalPath, + String statistic) throws IOException { + + int retryCount = 0; + RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep( + getStageConfig().getManifestSaveAttempts(), + SAVE_SLEEP_INTERVAL, + TimeUnit.MILLISECONDS); + + boolean success = false; + T savedManifest = null; + // loop until returning a value or raising an exception + while (!success) { + try { + // get the latest manifest, which may include updated statistics + final T manifestData = requireNonNull(manifestSource.get()); + LOG.info("{}: save manifest to {} then rename as {}'); retry count={}", + getName(), tempPath, finalPath, retryCount); + trackDurationOfInvocation(getIOStatistics(), statistic, () -> { + + // delete temp path. + // even though this is written with overwrite=true, this extra recursive + // delete also handles a directory being there. + // this should not happen as no part of the commit protocol creates a directory + // -this is just a little bit of due diligence. + deleteRecursive(tempPath, OP_DELETE); + + // save the temp file. + operations.save(manifestData, tempPath, true); + // get the length and etag. + final FileStatus st = getFileStatus(tempPath); + + // commit rename of temporary file to the final path; deleting the destination first. + final CommitOutcome outcome = commitFile( + new FileEntry(tempPath, finalPath, st.getLen(), getEtag(st)), + true); + if (outcome.recovered) { + LOG.warn("Task manifest file {} committed using rename recovery", + manifestData); + } + + }); + // success: save the manifest and declare success + savedManifest = manifestData; + success = true; + } catch (IOException e) { + // failure. + // log then decide whether to sleep and retry or give up. + LOG.warn("{}: Failed to save and commit file {} renamed to {}; retry count={}", + getName(), tempPath, finalPath, retryCount, e); + // increment that count. + retryCount++; + RetryPolicy.RetryAction retryAction; + try { + retryAction = retryPolicy.shouldRetry(e, retryCount, 0, true); + } catch (Exception ex) { + // it's not clear why this probe can raise an exception; it is just + // caught and mapped to a fail. + LOG.debug("Failure in retry policy", ex); + retryAction = RetryPolicy.RetryAction.FAIL; + } + LOG.debug("{}: Retry action: {}", getName(), retryAction.action); + if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + // too many failures: escalate. + throw e; + } + // else, sleep + try { + LOG.info("{}: Sleeping for {} ms before retrying", + getName(), retryAction.delayMillis); + Thread.sleep(retryAction.delayMillis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + // success: return the manifest which was saved. + return savedManifest; } /** @@ -609,8 +741,10 @@ public String getEtag(FileStatus status) { } /** - * Rename a file from source to dest; if the underlying FS API call - * returned false that's escalated to an IOE. + * Rename a file from source to dest. + *

+ * The destination is always deleted through a call to + * {@link #maybeDeleteDest(boolean, Path)}. * @param source source file. * @param dest dest file * @throws IOException failure @@ -618,7 +752,6 @@ public String getEtag(FileStatus status) { */ protected final void renameFile(final Path source, final Path dest) throws IOException { - maybeDeleteDest(true, dest); executeRenamingOperation("renameFile", source, dest, OP_RENAME_FILE, () -> operations.renameFile(source, dest)); @@ -637,7 +770,7 @@ protected final void renameDir(final Path source, final Path dest) maybeDeleteDest(true, dest); executeRenamingOperation("renameDir", source, dest, - OP_RENAME_FILE, () -> + OP_RENAME_DIR, () -> operations.renameDir(source, dest) ); } @@ -669,13 +802,14 @@ protected final CommitOutcome commitFile(FileEntry entry, // note any delay which took place noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime()); } + return new CommitOutcome(result.recovered()); } else { // commit with a simple rename; failures will be escalated. executeRenamingOperation("renameFile", source, dest, OP_COMMIT_FILE_RENAME, () -> operations.renameFile(source, dest)); + return new CommitOutcome(false); } - return new CommitOutcome(); } /** @@ -696,12 +830,15 @@ protected boolean storeSupportsResilientCommit() { */ private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws IOException { - if (deleteDest && getFileStatusOrNull(dest) != null) { - - boolean deleted = delete(dest, true); - // log the outcome in case of emergency diagnostics traces - // being needed. - LOG.debug("{}: delete('{}') returned {}'", getName(), dest, deleted); + if (deleteDest) { + final FileStatus st = getFileStatusOrNull(dest); + if (st != null) { + if (st.isDirectory()) { + deleteRecursive(dest, OP_DELETE_DIR); + } else { + deleteFile(dest, OP_DELETE); + } + } } } @@ -792,6 +929,14 @@ private PathIOException escalateRenameFailure(String operation, */ public static final class CommitOutcome { + /** + * Dit the commit recover from a failure? + */ + public final boolean recovered; + + public CommitOutcome(final boolean recovered) { + this.recovered = recovered; + } } /** @@ -866,7 +1011,7 @@ protected final Path getTaskAttemptDir() { } /** - * Get the task attemptDir; raise an NPE + * Get the task attemptDir and raise an NPE * if it is null. * @return a non-null task attempt dir. */ @@ -915,26 +1060,35 @@ protected final TaskPool.Submitter getIOProcessors(int size) { } /** - * Delete a directory, possibly suppressing exceptions. + * Delete a directory (or a file). * @param dir directory. - * @param suppressExceptions should exceptions be suppressed? + * @param statistic statistic to use + * @return true if the path is no longer present. * @throws IOException exceptions raised in delete if not suppressed. - * @return any exception caught and suppressed */ - protected IOException deleteDir( + protected boolean deleteRecursive( final Path dir, - final Boolean suppressExceptions) + final String statistic) throws IOException { + return trackDuration(getIOStatistics(), statistic, () -> + operations.deleteRecursive(dir)); + } + + /** + * Delete a directory or file, catching exceptions. + * @param dir directory. + * @param statistic statistic to use + * @return any exception caught. + */ + protected IOException deleteRecursiveSuppressingExceptions( + final Path dir, + final String statistic) { try { - delete(dir, true); + deleteRecursive(dir, statistic); return null; } catch (IOException ex) { LOG.info("Error deleting {}: {}", dir, ex.toString()); - if (!suppressExceptions) { - throw ex; - } else { - return ex; - } + return ex; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java index 77b80aaf67fd6..054ec26fb00f5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java @@ -40,7 +40,10 @@ import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED; import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP; /** @@ -49,7 +52,7 @@ * Returns: the outcome of the overall operation * The result is detailed purely for the benefit of tests, which need * to make assertions about error handling and fallbacks. - * + *

* There's a few known issues with the azure and GCS stores which * this stage tries to address. * - Google GCS directory deletion is O(entries), so is slower for big jobs. @@ -57,19 +60,28 @@ * when not the store owner triggers a scan down the tree to verify the * caller has the permission to delete each subdir. * If this scan takes over 90s, the operation can time out. - * + *

* The main solution for both of these is that task attempts are * deleted in parallel, in different threads. * This will speed up GCS cleanup and reduce the risk of * abfs related timeouts. * Exceptions during cleanup can be suppressed, * so that these do not cause the job to fail. - * + *

+ * There is one weakness of this design: the number of delete operations + * is 1 + number of task attempts, which, on ABFS can generate excessive + * load. + * For this reason, there is an option to attempt to delete the base directory + * first; if this does not time out then, on Azure ADLS Gen2 storage, + * this is the most efficient cleanup. + * Only if that attempt fails for any reason then the parallel delete + * phase takes place. + *

* Also, some users want to be able to run multiple independent jobs * targeting the same output directory simultaneously. * If one job deletes the directory `__temporary` all the others * will fail. - * + *

* This can be addressed by disabling cleanup entirely. * */ @@ -128,7 +140,7 @@ protected Result executeStage( stageName = getStageName(args); // this is $dest/_temporary final Path baseDir = requireNonNull(getStageConfig().getOutputTempSubDir()); - LOG.debug("{}: Cleaup of directory {} with {}", getName(), baseDir, args); + LOG.debug("{}: Cleanup of directory {} with {}", getName(), baseDir, args); if (!args.enabled) { LOG.info("{}: Cleanup of {} disabled", getName(), baseDir); return new Result(Outcome.DISABLED, baseDir, @@ -142,64 +154,105 @@ protected Result executeStage( } Outcome outcome = null; - IOException exception; + IOException exception = null; + boolean baseDirDeleted = false; // to delete. LOG.info("{}: Deleting job directory {}", getName(), baseDir); + final long directoryCount = args.directoryCount; + if (directoryCount > 0) { + // log the expected directory count, which drives duration in GCS + // and may cause timeouts on azure if the count is too high for a + // timely permissions tree scan. + LOG.info("{}: Expected directory count: {}", getName(), directoryCount); + } + progress(); + // check and maybe execute parallel delete of task attempt dirs. if (args.deleteTaskAttemptDirsInParallel) { - // Attempt to do a parallel delete of task attempt dirs; - // don't overreact if a delete fails, but stop trying - // to delete the others, and fall back to deleting the - // job dir. - Path taskSubDir - = getStageConfig().getJobAttemptTaskSubDir(); - try (DurationInfo info = new DurationInfo(LOG, - "parallel deletion of task attempts in %s", - taskSubDir)) { - RemoteIterator dirs = - RemoteIterators.filteringRemoteIterator( - listStatusIterator(taskSubDir), - FileStatus::isDirectory); - TaskPool.foreach(dirs) - .executeWith(getIOProcessors()) - .stopOnFailure() - .suppressExceptions(false) - .run(this::rmTaskAttemptDir); - getIOStatistics().aggregate((retrieveIOStatistics(dirs))); - - if (getLastDeleteException() != null) { - // one of the task attempts failed. - throw getLastDeleteException(); + + + if (args.parallelDeleteAttemptBaseDeleteFirst) { + // attempt to delete the base dir first. + // This can reduce ABFS delete load but may time out + // (which the fallback to parallel delete will handle). + // on GCS it is slow. + try (DurationInfo info = new DurationInfo(LOG, true, + "Initial delete of %s", baseDir)) { + exception = deleteOneDir(baseDir); + if (exception == null) { + // success: record this as the outcome, + outcome = Outcome.DELETED; + // and flag that the the parallel delete should be skipped because the + // base directory is alredy deleted. + baseDirDeleted = true; + } else { + // failure: log and continue + LOG.warn("{}: Exception on initial attempt at deleting base dir {}" + + " with directory count {}. Falling back to parallel delete", + getName(), baseDir, directoryCount, exception); + } + } + } + if (!baseDirDeleted) { + // no base delete attempted or it failed. + // Attempt to do a parallel delete of task attempt dirs; + // don't overreact if a delete fails, but stop trying + // to delete the others, and fall back to deleting the + // job dir. + Path taskSubDir + = getStageConfig().getJobAttemptTaskSubDir(); + try (DurationInfo info = new DurationInfo(LOG, true, + "parallel deletion of task attempts in %s", + taskSubDir)) { + RemoteIterator dirs = + RemoteIterators.filteringRemoteIterator( + listStatusIterator(taskSubDir), + FileStatus::isDirectory); + TaskPool.foreach(dirs) + .executeWith(getIOProcessors()) + .stopOnFailure() + .suppressExceptions(false) + .run(this::rmTaskAttemptDir); + getIOStatistics().aggregate((retrieveIOStatistics(dirs))); + + if (getLastDeleteException() != null) { + // one of the task attempts failed. + throw getLastDeleteException(); + } else { + // success: record this as the outcome. + outcome = Outcome.PARALLEL_DELETE; + } + } catch (FileNotFoundException ex) { + // not a problem if there's no dir to list. + LOG.debug("{}: Task attempt dir {} not found", getName(), taskSubDir); + outcome = Outcome.DELETED; + } catch (IOException ex) { + // failure. Log and continue + LOG.info( + "{}: Exception while listing/deleting task attempts under {}; continuing", + getName(), + taskSubDir, ex); } - // success: record this as the outcome. - outcome = Outcome.PARALLEL_DELETE; - } catch (FileNotFoundException ex) { - // not a problem if there's no dir to list. - LOG.debug("{}: Task attempt dir {} not found", getName(), taskSubDir); - outcome = Outcome.DELETED; - } catch (IOException ex) { - // failure. Log and continue - LOG.info( - "{}: Exception while listing/deleting task attempts under {}; continuing", - getName(), - taskSubDir, ex); - // not overreacting here as the base delete will still get executing - outcome = Outcome.DELETED; } } - // Now the top-level deletion; exception gets saved - exception = deleteOneDir(baseDir); - if (exception != null) { - // failure, report and continue - // assume failure. - outcome = Outcome.FAILURE; - } else { - // if the outcome isn't already recorded as parallel delete, - // mark is a simple delete. - if (outcome == null) { - outcome = Outcome.DELETED; + // Now the top-level deletion if not already executed; exception gets saved + if (!baseDirDeleted) { + exception = deleteOneDir(baseDir); + if (exception != null) { + // failure, report and continue + LOG.warn("{}: Exception on final attempt at deleting base dir {}" + + " with directory count {}", + getName(), baseDir, directoryCount, exception); + // assume failure. + outcome = Outcome.FAILURE; + } else { + // if the outcome isn't already recorded as parallel delete, + // mark is a simple delete. + if (outcome == null) { + outcome = Outcome.DELETED; + } } } @@ -235,7 +288,7 @@ private void rmTaskAttemptDir(FileStatus status) throws IOException { } /** - * Delete a directory. + * Delete a directory suppressing exceptions. * The {@link #deleteFailureCount} counter. * is incremented on every failure. * @param dir directory @@ -246,21 +299,22 @@ private IOException deleteOneDir(final Path dir) throws IOException { deleteDirCount.incrementAndGet(); - IOException ex = deleteDir(dir, true); - if (ex != null) { - deleteFailure(ex); - } - return ex; + return noteAnyDeleteFailure( + deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR)); } /** - * Note a failure. + * Note a failure if the exception is not null. * @param ex exception + * @return the exception */ - private synchronized void deleteFailure(IOException ex) { - // excaption: add the count - deleteFailureCount.incrementAndGet(); - lastDeleteException = ex; + private synchronized IOException noteAnyDeleteFailure(IOException ex) { + if (ex != null) { + // exception: add the count + deleteFailureCount.incrementAndGet(); + lastDeleteException = ex; + } + return ex; } /** @@ -287,26 +341,47 @@ public static final class Arguments { /** Attempt parallel delete of task attempt dirs? */ private final boolean deleteTaskAttemptDirsInParallel; + /** + * Make an initial attempt to delete the base directory. + * This will reduce IO load on abfs. If it times out, the + * parallel delete will be the fallback. + */ + private final boolean parallelDeleteAttemptBaseDeleteFirst; + /** Ignore failures? */ private final boolean suppressExceptions; + /** + * Non-final count of directories. + * Default value, "0", means "unknown". + * This can be dynamically updated during job commit. + */ + private long directoryCount; + /** * Arguments to the stage. * @param statisticName stage name to report * @param enabled is the stage enabled? * @param deleteTaskAttemptDirsInParallel delete task attempt dirs in * parallel? + * @param parallelDeleteAttemptBaseDeleteFirst Make an initial attempt to + * delete the base directory in a parallel delete? * @param suppressExceptions suppress exceptions? + * @param directoryCount directories under job dir; 0 means unknown. */ public Arguments( final String statisticName, final boolean enabled, final boolean deleteTaskAttemptDirsInParallel, - final boolean suppressExceptions) { + final boolean parallelDeleteAttemptBaseDeleteFirst, + final boolean suppressExceptions, + long directoryCount) { this.statisticName = statisticName; this.enabled = enabled; this.deleteTaskAttemptDirsInParallel = deleteTaskAttemptDirsInParallel; this.suppressExceptions = suppressExceptions; + this.parallelDeleteAttemptBaseDeleteFirst = parallelDeleteAttemptBaseDeleteFirst; + this.directoryCount = directoryCount; } public String getStatisticName() { @@ -325,6 +400,18 @@ public boolean isSuppressExceptions() { return suppressExceptions; } + public boolean isParallelDeleteAttemptBaseDeleteFirst() { + return parallelDeleteAttemptBaseDeleteFirst; + } + + public long getDirectoryCount() { + return directoryCount; + } + + public void setDirectoryCount(final long directoryCount) { + this.directoryCount = directoryCount; + } + @Override public String toString() { return "Arguments{" + @@ -332,6 +419,7 @@ public String toString() { + ", enabled=" + enabled + ", deleteTaskAttemptDirsInParallel=" + deleteTaskAttemptDirsInParallel + + ", parallelDeleteAttemptBaseDeleteFirst=" + parallelDeleteAttemptBaseDeleteFirst + ", suppressExceptions=" + suppressExceptions + '}'; } @@ -343,8 +431,9 @@ public String toString() { public static final Arguments DISABLED = new Arguments(OP_STAGE_JOB_CLEANUP, false, false, - false - ); + false, + false, + 0); /** * Build an options argument from a configuration, using the @@ -364,12 +453,16 @@ public static Arguments cleanupStageOptionsFromConfig( boolean deleteTaskAttemptDirsInParallel = conf.getBoolean( OPT_CLEANUP_PARALLEL_DELETE, OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT); + boolean parallelDeleteAttemptBaseDeleteFirst = conf.getBoolean( + OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST, + OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT); return new Arguments( statisticName, enabled, deleteTaskAttemptDirsInParallel, - suppressExceptions - ); + parallelDeleteAttemptBaseDeleteFirst, + suppressExceptions, + 0); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java index 60fc6492ee621..8e01f7f40cba9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java @@ -37,6 +37,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS; @@ -161,7 +162,12 @@ protected CommitJobStage.Result executeStage( } // optional cleanup - new CleanupJobStage(stageConfig).apply(arguments.getCleanupArguments()); + final CleanupJobStage.Arguments cleanupArguments = arguments.getCleanupArguments(); + // determine the directory count + cleanupArguments.setDirectoryCount(iostats.counters() + .getOrDefault(COMMITTER_TASK_DIRECTORY_COUNT_MEAN, 0L)); + + new CleanupJobStage(stageConfig).apply(cleanupArguments); // and then, after everything else: optionally validate. if (arguments.isValidateOutput()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java index bf5ba27ab8ad5..6ac2dec06a146 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; @@ -69,19 +70,21 @@ protected CommitTaskStage.Result executeStage(final Void arguments) // the saving, but ... scanStage.addExecutionDurationToStatistics(getIOStatistics(), OP_STAGE_TASK_COMMIT); - // save a snapshot of the IO Statistics - final IOStatisticsSnapshot manifestStats = snapshotIOStatistics(); - manifestStats.aggregate(getIOStatistics()); - manifest.setIOStatistics(manifestStats); - - // Now save with rename - Path manifestPath = new SaveTaskManifestStage(getStageConfig()) - .apply(manifest); - return new CommitTaskStage.Result(manifestPath, manifest); + // Now save with retry, updating the statistics on every attempt. + Pair p = new SaveTaskManifestStage(getStageConfig()) + .apply(() -> { + /* save a snapshot of the IO Statistics */ + final IOStatisticsSnapshot manifestStats = snapshotIOStatistics(); + manifestStats.aggregate(getIOStatistics()); + manifest.setIOStatistics(manifestStats); + return manifest; + }); + return new CommitTaskStage.Result(p.getLeft(), p.getRight()); } /** - * Result of the stage. + * Result of the stage: the path the manifest was saved to + * and the manifest which was successfully saved. */ public static final class Result { /** The path the manifest was saved to. */ @@ -111,5 +114,9 @@ public TaskManifest getTaskManifest() { return taskManifest; } + @Override + public String toString() { + return "Result{path=" + path + '}'; + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java index 1618cf591a590..18dc35960eb31 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java @@ -105,7 +105,7 @@ protected Result executeStage( throws IOException { final List directories = createAllDirectories(manifestDirs); - LOG.debug("{}: Created {} directories", getName(), directories.size()); + LOG.info("{}: Created {} directories", getName(), directories.size()); return new Result(new HashSet<>(directories), dirMap); } @@ -163,8 +163,9 @@ private List createAllDirectories(final Collection manifestDirs) // Now the real work. final int createCount = leaves.size(); - LOG.info("Preparing {} directory/directories; {} parent dirs implicitly created", - createCount, parents.size()); + LOG.info("Preparing {} directory/directories; {} parent dirs implicitly created." + + " Files deleted: {}", + createCount, parents.size(), filesToDelete.size()); // now probe for and create the leaf dirs, which are those at the // bottom level @@ -232,7 +233,7 @@ private void deleteDirWithFile(Path dir) throws IOException { // report progress back progress(); LOG.info("{}: Deleting file {}", getName(), dir); - delete(dir, false, OP_DELETE); + deleteFile(dir, OP_DELETE); // note its final state addToDirectoryMap(dir, DirMapState.fileNowDeleted); } @@ -323,7 +324,7 @@ private DirMapState maybeCreateOneDirectory(DirEntry dirEntry) throws IOExceptio // is bad: delete a file LOG.info("{}: Deleting file where a directory should go: {}", getName(), st); - delete(path, false, OP_DELETE_FILE_UNDER_DESTINATION); + deleteFile(path, OP_DELETE_FILE_UNDER_DESTINATION); } else { // is good. LOG.warn("{}: Even though mkdirs({}) failed, there is now a directory there", diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveSuccessFileStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveSuccessFileStage.java index eb9c82f2ae739..96b94e609d673 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveSuccessFileStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveSuccessFileStage.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.TMP_SUFFIX; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_SUMMARY_FILE; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_SAVE_SUCCESS; @@ -72,7 +73,7 @@ protected Path executeStage(final ManifestSuccessData successData) LOG.debug("{}: Saving _SUCCESS file to {} via {}", successFile, getName(), successTempFile); - save(successData, successTempFile, successFile); + saveManifest(() -> successData, successTempFile, successFile, OP_SAVE_SUMMARY_FILE); return successFile; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java index fdaf0184cda20..179e7c22ef058 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SaveTaskManifestStage.java @@ -19,13 +19,16 @@ package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; import java.io.IOException; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_SAVE_MANIFEST; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestTempPathForTaskAttempt; @@ -38,16 +41,36 @@ * Uses both the task ID and task attempt ID to determine the temp filename; * Before the rename of (temp, final-path), any file at the final path * is deleted. + *

* This is so that when this stage is invoked in a task commit, its output * overwrites any of the first commit. * When it succeeds, therefore, unless there is any subsequent commit of * another task, the task manifest at the final path is from this * operation. - * - * Returns the path where the manifest was saved. + *

+ * If the save and rename fails, there are a limited number of retries, with no sleep + * interval. + * This is to briefly try recover from any transient rename() failure, including a + * race condition with any other task commit. + *

    + *
  1. If the previous task commit has already succeeded, this rename will overwrite it. + * Both task attempts will report success.
  2. + *
  3. If after, writing, another task attempt overwrites it, again, both + * task attempts will report success.
  4. + *
  5. If another task commits between the delete() and rename() operations, the retry will + * attempt to recover by repeating the manifest write, and then report success.
  6. + *
+ * This means that multiple task attempts may report success, but only one will have it actual + * manifest saved. + * The mapreduce and spark committers only schedule a second task commit attempt if the first + * task attempt's commit operation fails or fails to report success in the allocated time. + * The overwrite with retry loop is an attempt to ensure that the second attempt will report + * success, if a partitioned cluster means that the original TA commit is still in progress. + *

+ * Returns (the path where the manifest was saved, the manifest). */ public class SaveTaskManifestStage extends - AbstractJobOrTaskStage { + AbstractJobOrTaskStage, Pair> { private static final Logger LOG = LoggerFactory.getLogger( SaveTaskManifestStage.class); @@ -57,14 +80,16 @@ public SaveTaskManifestStage(final StageConfig stageConfig) { } /** - * Save the manifest to a temp file and rename to the final + * Generate and save a manifest to a temp file and rename to the final * manifest destination. - * @param manifest manifest + * The manifest is generated on each retried attempt. + * @param manifestSource supplier the manifest/success file + * * @return the path to the final entry * @throws IOException IO failure. */ @Override - protected Path executeStage(final TaskManifest manifest) + protected Pair executeStage(Supplier manifestSource) throws IOException { final Path manifestDir = getTaskManifestDir(); @@ -74,8 +99,9 @@ protected Path executeStage(final TaskManifest manifest) Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, getRequiredTaskAttemptId()); LOG.info("{}: Saving manifest file to {}", getName(), manifestFile); - save(manifest, manifestTempFile, manifestFile); - return manifestFile; + final TaskManifest manifest = + saveManifest(manifestSource, manifestTempFile, manifestFile, OP_SAVE_TASK_MANIFEST); + return Pair.of(manifestFile, manifest); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SetupJobStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SetupJobStage.java index 9b873252df2cb..6e17aae23d201 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SetupJobStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/SetupJobStage.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_SETUP; /** @@ -55,7 +56,7 @@ protected Path executeStage(final Boolean deleteMarker) throws IOException { createNewDirectory("Creating task manifest dir", getTaskManifestDir()); // delete any success marker if so instructed. if (deleteMarker) { - delete(getStageConfig().getJobSuccessMarkerPath(), false); + deleteFile(getStageConfig().getJobSuccessMarkerPath(), OP_DELETE); } return path; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java index b716d2f4b7f0c..55ff4f888881f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/StageConfig.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT; /** * Stage Config. @@ -172,6 +173,12 @@ public class StageConfig { */ private int successMarkerFileLimit = SUCCESS_MARKER_FILE_LIMIT; + /** + * How many attempts to save a manifest by save and rename + * before giving up: {@value}. + */ + private int manifestSaveAttempts = OPT_MANIFEST_SAVE_ATTEMPTS_DEFAULT; + public StageConfig() { } @@ -604,6 +611,21 @@ public int getSuccessMarkerFileLimit() { return successMarkerFileLimit; } + public int getManifestSaveAttempts() { + return manifestSaveAttempts; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public StageConfig withManifestSaveAttempts(final int value) { + checkOpen(); + manifestSaveAttempts = value; + return this; + } + /** * Enter the stage; calls back to * {@link #enterStageEventHandler} if non-null. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md index 6c2141820d878..859f293726bd3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/MapredCommands.md @@ -134,6 +134,11 @@ Usage: `mapred envvars` Display computed Hadoop environment variables. +# `successfile` + +Load and print a JSON `_SUCCESS` file from a [Manifest Committer](manifest_committer.html) or an S3A Committer, + + Administration Commands ----------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md index da199a48d14c0..0ac03080195d4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md @@ -15,14 +15,16 @@ # The Manifest Committer for Azure and Google Cloud Storage -This document how to use the _Manifest Committer_. + + +This documents how to use the _Manifest Committer_. The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world" queries, and performance and correctness on GCS. It also works with other filesystems, including HDFS. However, the design is optimized for object stores where -listing operatons are slow and expensive. +listing operations are slow and expensive. The architecture and implementation of the committer is covered in [Manifest Committer Architecture](manifest_committer_architecture.html). @@ -31,10 +33,16 @@ The architecture and implementation of the committer is covered in The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html). -It was added in March 2022, and should be considered unstable -in early releases. +It was added in March 2022. +As of April 2024, the problems which surfaced have been +* Memory use at scale. +* Directory deletion scalability. +* Resilience to task commit to rename failures. - +That is: the core algorithms is correct, but task commit +robustness was insufficient to some failure conditions. +And scale is always a challenge, even with components tested through +large TPC-DS test runs. ## Problem: @@ -70,10 +78,13 @@ This committer uses the extension point which came in for the S3A committers. Users can declare a new committer factory for abfs:// and gcs:// URLs. A suitably configured spark deployment will pick up the new committer. -Directory performance issues in job cleanup can be addressed by two options +Directory performance issues in job cleanup can be addressed by some options 1. The committer will parallelize deletion of task attempt directories before deleting the `_temporary` directory. -1. Cleanup can be disabled. . +2. An initial attempt to delete the `_temporary` directory before the parallel + attempt is made. +3. Exceptions can be supressed, so that cleanup failures do not fail the job +4. Cleanup can be disabled. The committer can be used with any filesystem client which has a "real" file rename() operation. @@ -112,8 +123,8 @@ These can be done in `core-site.xml`, if it is not defined in the `mapred-defaul ## Binding to the manifest committer in Spark. -In Apache Spark, the configuration can be done either with command line options (after the '--conf') or by using the `spark-defaults.conf` file. The following is an example of using `spark-defaults.conf` also including the configuration for Parquet with a subclass of the parquet -committer which uses the factory mechansim internally. +In Apache Spark, the configuration can be done either with command line options (after the `--conf`) or by using the `spark-defaults.conf` file. +The following is an example of using `spark-defaults.conf` also including the configuration for Parquet with a subclass of the parquet committer which uses the factory mechanism internally. ``` spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory @@ -184,6 +195,7 @@ Here are the main configuration options of the committer. | `mapreduce.manifest.committer.io.threads` | Thread count for parallel operations | `64` | | `mapreduce.manifest.committer.summary.report.directory` | directory to save reports. | `""` | | `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete temporary directories in parallel | `true` | +| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` | | `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` | | `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` | | `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Create a `_SUCCESS` marker file on successful completion. (and delete any existing one in job setup) | `true` | @@ -238,37 +250,6 @@ Caveats are made against the store. The rate throttling option `mapreduce.manifest.committer.io.rate` can help avoid this. - -### `mapreduce.manifest.committer.writer.queue.capacity` - -This is a secondary scale option. -It controls the size of the queue for storing lists of files to rename from -the manifests loaded from the target filesystem, manifests loaded -from a pool of worker threads, and the single thread which saves -the entries from each manifest to an intermediate file in the local filesystem. - -Once the queue is full, all manifest loading threads will block. - -```xml - - mapreduce.manifest.committer.writer.queue.capacity - 32 - -``` - -As the local filesystem is usually much faster to write to than any cloud store, -this queue size should not be a limit on manifest load performance. - -It can help limit the amount of memory consumed during manifest load during -job commit. -The maximum number of loaded manifests will be: - -``` -mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads -``` - - - ## Optional: deleting target files in Job Commit The classic `FileOutputCommitter` deletes files at the destination paths @@ -403,6 +384,153 @@ hadoop org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestP This works for the files saved at the base of an output directory, and any reports saved to a report directory. +Example from a run of the `ITestAbfsTerasort` MapReduce terasort. + +``` +bin/mapred successfile abfs://testing@ukwest.dfs.core.windows.net/terasort/_SUCCESS + +Manifest file: abfs://testing@ukwest.dfs.core.windows.net/terasort/_SUCCESS +succeeded: true +created: 2024-04-18T18:34:34.003+01:00[Europe/London] +committer: org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter +hostname: pi5 +jobId: job_1713461587013_0003 +jobIdSource: JobID +Diagnostics + mapreduce.manifest.committer.io.threads = 192 + principal = alice + stage = committer_commit_job + +Statistics: +counters=((commit_file_rename=1) +(committer_bytes_committed=21) +(committer_commit_job=1) +(committer_files_committed=1) +(committer_task_directory_depth=2) +(committer_task_file_count=2) +(committer_task_file_size=21) +(committer_task_manifest_file_size=37157) +(job_stage_cleanup=1) +(job_stage_create_target_dirs=1) +(job_stage_load_manifests=1) +(job_stage_optional_validate_output=1) +(job_stage_rename_files=1) +(job_stage_save_success_marker=1) +(job_stage_setup=1) +(op_create_directories=1) +(op_delete=3) +(op_delete_dir=1) +(op_get_file_status=9) +(op_get_file_status.failures=6) +(op_list_status=3) +(op_load_all_manifests=1) +(op_load_manifest=2) +(op_mkdirs=4) +(op_msync=1) +(op_rename=2) +(op_rename.failures=1) +(task_stage_commit=2) +(task_stage_save_task_manifest=1) +(task_stage_scan_directory=2) +(task_stage_setup=2)); + +gauges=(); + +minimums=((commit_file_rename.min=141) +(committer_commit_job.min=2306) +(committer_task_directory_count=0) +(committer_task_directory_depth=1) +(committer_task_file_count=0) +(committer_task_file_size=0) +(committer_task_manifest_file_size=18402) +(job_stage_cleanup.min=196) +(job_stage_create_target_dirs.min=2) +(job_stage_load_manifests.min=687) +(job_stage_optional_validate_output.min=66) +(job_stage_rename_files.min=161) +(job_stage_save_success_marker.min=653) +(job_stage_setup.min=571) +(op_create_directories.min=1) +(op_delete.min=57) +(op_delete_dir.min=129) +(op_get_file_status.failures.min=57) +(op_get_file_status.min=55) +(op_list_status.min=202) +(op_load_all_manifests.min=445) +(op_load_manifest.min=171) +(op_mkdirs.min=67) +(op_msync.min=0) +(op_rename.failures.min=266) +(op_rename.min=139) +(task_stage_commit.min=206) +(task_stage_save_task_manifest.min=651) +(task_stage_scan_directory.min=206) +(task_stage_setup.min=127)); + +maximums=((commit_file_rename.max=141) +(committer_commit_job.max=2306) +(committer_task_directory_count=0) +(committer_task_directory_depth=1) +(committer_task_file_count=1) +(committer_task_file_size=21) +(committer_task_manifest_file_size=18755) +(job_stage_cleanup.max=196) +(job_stage_create_target_dirs.max=2) +(job_stage_load_manifests.max=687) +(job_stage_optional_validate_output.max=66) +(job_stage_rename_files.max=161) +(job_stage_save_success_marker.max=653) +(job_stage_setup.max=571) +(op_create_directories.max=1) +(op_delete.max=113) +(op_delete_dir.max=129) +(op_get_file_status.failures.max=231) +(op_get_file_status.max=61) +(op_list_status.max=300) +(op_load_all_manifests.max=445) +(op_load_manifest.max=436) +(op_mkdirs.max=123) +(op_msync.max=0) +(op_rename.failures.max=266) +(op_rename.max=139) +(task_stage_commit.max=302) +(task_stage_save_task_manifest.max=651) +(task_stage_scan_directory.max=302) +(task_stage_setup.max=157)); + +means=((commit_file_rename.mean=(samples=1, sum=141, mean=141.0000)) +(committer_commit_job.mean=(samples=1, sum=2306, mean=2306.0000)) +(committer_task_directory_count=(samples=4, sum=0, mean=0.0000)) +(committer_task_directory_depth=(samples=2, sum=2, mean=1.0000)) +(committer_task_file_count=(samples=4, sum=2, mean=0.5000)) +(committer_task_file_size=(samples=2, sum=21, mean=10.5000)) +(committer_task_manifest_file_size=(samples=2, sum=37157, mean=18578.5000)) +(job_stage_cleanup.mean=(samples=1, sum=196, mean=196.0000)) +(job_stage_create_target_dirs.mean=(samples=1, sum=2, mean=2.0000)) +(job_stage_load_manifests.mean=(samples=1, sum=687, mean=687.0000)) +(job_stage_optional_validate_output.mean=(samples=1, sum=66, mean=66.0000)) +(job_stage_rename_files.mean=(samples=1, sum=161, mean=161.0000)) +(job_stage_save_success_marker.mean=(samples=1, sum=653, mean=653.0000)) +(job_stage_setup.mean=(samples=1, sum=571, mean=571.0000)) +(op_create_directories.mean=(samples=1, sum=1, mean=1.0000)) +(op_delete.mean=(samples=3, sum=240, mean=80.0000)) +(op_delete_dir.mean=(samples=1, sum=129, mean=129.0000)) +(op_get_file_status.failures.mean=(samples=6, sum=614, mean=102.3333)) +(op_get_file_status.mean=(samples=3, sum=175, mean=58.3333)) +(op_list_status.mean=(samples=3, sum=671, mean=223.6667)) +(op_load_all_manifests.mean=(samples=1, sum=445, mean=445.0000)) +(op_load_manifest.mean=(samples=2, sum=607, mean=303.5000)) +(op_mkdirs.mean=(samples=4, sum=361, mean=90.2500)) +(op_msync.mean=(samples=1, sum=0, mean=0.0000)) +(op_rename.failures.mean=(samples=1, sum=266, mean=266.0000)) +(op_rename.mean=(samples=1, sum=139, mean=139.0000)) +(task_stage_commit.mean=(samples=2, sum=508, mean=254.0000)) +(task_stage_save_task_manifest.mean=(samples=1, sum=651, mean=651.0000)) +(task_stage_scan_directory.mean=(samples=2, sum=508, mean=254.0000)) +(task_stage_setup.mean=(samples=2, sum=284, mean=142.0000))); + +``` + ## Collecting Job Summaries `mapreduce.manifest.committer.summary.report.directory` The committer can be configured to save the `_SUCCESS` summary files to a report directory, @@ -431,46 +559,62 @@ This allows for the statistics of jobs to be collected irrespective of their out saving the `_SUCCESS` marker is enabled, and without problems caused by a chain of queries overwriting the markers. +The `mapred successfile` operation can be used to print these reports. # Cleanup Job cleanup is convoluted as it is designed to address a number of issues which may surface in cloud storage. -* Slow performance for deletion of directories. -* Timeout when deleting very deep and wide directory trees. +* Slow performance for deletion of directories (GCS). +* Timeout when deleting very deep and wide directory trees (Azure). * General resilience to cleanup issues escalating to job failures. -| Option | Meaning | Default Value | -|--------|---------|---------------| -| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` | -| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` | -| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete task attempt directories in parallel | `true` | +| Option | Meaning | Default Value | +|-------------------------------------------------------------------|--------------------------------------------------------------------|---------------| +| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory | `false` | +| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` | +| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete task attempt directories in parallel | `true` | +| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` | The algorithm is: -``` -if `mapreduce.fileoutputcommitter.cleanup.skipped`: +```python +if "mapreduce.fileoutputcommitter.cleanup.skipped": return -if `mapreduce.manifest.committer.cleanup.parallel.delete`: - attempt parallel delete of task directories; catch any exception -if not `mapreduce.fileoutputcommitter.cleanup.skipped`: - delete(`_temporary`); catch any exception -if caught-exception and not `mapreduce.fileoutputcommitter.cleanup-failures.ignored`: - throw caught-exception +if "mapreduce.manifest.committer.cleanup.parallel.delete": + if "mapreduce.manifest.committer.cleanup.parallel.delete.base.first" : + if delete("_temporary"): + return + delete(list("$task-directories")) catch any exception +if not "mapreduce.fileoutputcommitter.cleanup.skipped": + delete("_temporary"); catch any exception +if caught-exception and not "mapreduce.fileoutputcommitter.cleanup-failures.ignored": + raise caught-exception ``` It's a bit complicated, but the goal is to perform a fast/scalable delete and throw a meaningful exception if that didn't work. -When working with ABFS and GCS, these settings should normally be left alone. -If somehow errors surface during cleanup, enabling the option to -ignore failures will ensure the job still completes. +For ABFS set `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` to `true` +which should normally result in less network IO and a faster cleanup. + +``` +spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true +``` + +For GCS, setting `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` +to `false` may speed up cleanup. + +If somehow errors surface during cleanup, ignoring failures will ensure the job +is still considered a success. +`mapreduce.fileoutputcommitter.cleanup-failures.ignored = true` + Disabling cleanup even avoids the overhead of cleanup, but requires a workflow or manual operation to clean up all -`_temporary` directories on a regular basis. - +`_temporary` directories on a regular basis: +`mapreduce.fileoutputcommitter.cleanup.skipped = true`. # Working with Azure ADLS Gen2 Storage @@ -504,9 +648,15 @@ The core set of Azure-optimized options becomes - spark.hadoop.fs.azure.io.rate.limit - 10000 + fs.azure.io.rate.limit + 1000 + + + + mapreduce.manifest.committer.cleanup.parallel.delete.base.first + true + ``` And optional settings for debugging/performance analysis @@ -514,7 +664,7 @@ And optional settings for debugging/performance analysis ```xml mapreduce.manifest.committer.summary.report.directory - abfs:// Path within same store/separate store + Path within same store/separate store Optional: path to where job summaries are saved ``` @@ -523,14 +673,15 @@ And optional settings for debugging/performance analysis ``` spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory -spark.hadoop.fs.azure.io.rate.limit 10000 +spark.hadoop.fs.azure.io.rate.limit 1000 +spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries) ``` -## Experimental: ABFS Rename Rate Limiting `fs.azure.io.rate.limit` +## ABFS Rename Rate Limiting `fs.azure.io.rate.limit` To avoid triggering store throttling and backoff delays, as well as other throttling-related failure conditions file renames during job commit @@ -544,13 +695,12 @@ may issue. Set the option to `0` remove all rate limiting. -The default value of this is set to 10000, which is the default IO capacity for -an ADLS storage account. +The default value of this is set to 1000. ```xml fs.azure.io.rate.limit - 10000 + 1000 maximum number of renames attempted per second ``` @@ -569,7 +719,7 @@ If server-side throttling took place, signs of this can be seen in * The store service's logs and their throttling status codes (usually 503 or 500). * The job statistic `commit_file_rename_recovered`. This statistic indicates that ADLS throttling manifested as failures in renames, failures which were recovered - from in the comitter. + from in the committer. If these are seen -or other applications running at the same time experience throttling/throttling-triggered problems, consider reducing the value of @@ -598,13 +748,14 @@ The Spark settings to switch to this committer are spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol - +spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first false spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries) ``` The store's directory delete operations are `O(files)` so the value of `mapreduce.manifest.committer.cleanup.parallel.delete` -SHOULD be left at the default of `true`. +SHOULD be left at the default of `true`, but +`mapreduce.manifest.committer.cleanup.parallel.delete.base.first` changed to `false` For mapreduce, declare the binding in `core-site.xml`or `mapred-site.xml` ```xml @@ -639,19 +790,33 @@ spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOut spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: URI of a directory for job summaries) ``` -# Advanced Topics - -## Advanced Configuration options +# Advanced Configuration options There are some advanced options which are intended for development and testing, rather than production use. -| Option | Meaning | Default Value | -|--------|----------------------------------------------|---------------| -| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` | -| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` | -| `mapreduce.manifest.committer.writer.queue.capacity` | Queue capacity for writing intermediate file | `32` | +| Option | Meaning | Default Value | +|-----------------------------------------------------------|-------------------------------------------------------------|---------------| +| `mapreduce.manifest.committer.manifest.save.attempts` | How many attempts should be made to commit a task manifest? | `5` | +| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` | +| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` | +| `mapreduce.manifest.committer.writer.queue.capacity` | Queue capacity for writing intermediate file | `32` | + +### `mapreduce.manifest.committer.manifest.save.attempts` + +The number of attempts which should be made to save a task attempt manifest, which is done by +1. Writing the file to a temporary file in the job attempt directory. +2. Deleting any existing task manifest +3. Renaming the temporary file to the final filename. +This may fail for unrecoverable reasons (permissions, permanent loss of network, service down,...) or it may be +a transient problem which may not reoccur if another attempt is made to write the data. + +The number of attempts to make is set by `mapreduce.manifest.committer.manifest.save.attempts`; +the sleep time increases with each attempt. + +Consider increasing the default value if task attempts fail to commit their work +and fail to recover from network problems. ### Validating output `mapreduce.manifest.committer.validate.output` @@ -691,6 +856,34 @@ There is no need to alter these values, except when writing new implementations something which is only needed if the store provides extra integration support for the committer. +### `mapreduce.manifest.committer.writer.queue.capacity` + +This is a secondary scale option. +It controls the size of the queue for storing lists of files to rename from +the manifests loaded from the target filesystem, manifests loaded +from a pool of worker threads, and the single thread which saves +the entries from each manifest to an intermediate file in the local filesystem. + +Once the queue is full, all manifest loading threads will block. + +```xml + + mapreduce.manifest.committer.writer.queue.capacity + 32 + +``` + +As the local filesystem is usually much faster to write to than any cloud store, +this queue size should not be a limit on manifest load performance. + +It can help limit the amount of memory consumed during manifest load during +job commit. +The maximum number of loaded manifests will be: + +``` +mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads +``` + ## Support for concurrent jobs to the same directory It *may* be possible to run multiple jobs targeting the same directory tree. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md index 55806fb6f5b45..a1d8cb5fc3da8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer_architecture.md @@ -19,6 +19,7 @@ This document describes the architecture and other implementation/correctness aspects of the [Manifest Committer](manifest_committer.html) The protocol and its correctness are covered in [Manifest Committer Protocol](manifest_committer_protocol.html). + The _Manifest_ committer is a committer for work which provides performance on ABFS for "real world" @@ -278,6 +279,11 @@ The manifest committer assumes that the amount of data being stored in memory is because there is no longer the need to store an etag for every block of every file being committed. +This assumption turned out not to hold for some jobs: +[MAPREDUCE-7435. ManifestCommitter OOM on azure job](https://issues.apache.org/jira/browse/MAPREDUCE-7435) + +The strategy here was to read in all manifests and stream their entries to a local file, as Hadoop +Writable objects -hence with lower marshalling overhead than JSON. #### Duplicate creation of directories in the dest dir diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java index 5b64d544bc551..57c0c39ed9b7f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SaveTaskManifestStage; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage; @@ -167,6 +168,12 @@ public abstract class AbstractManifestCommitterTest private static final int MAX_LEN = 64_000; + /** + * How many attempts to save manifests before giving up. + * Kept small to reduce sleep times and network delays. + */ + public static final int SAVE_ATTEMPTS = 4; + /** * Submitter for tasks; may be null. */ @@ -771,6 +778,9 @@ protected StageConfig createStageConfigForJob( /** * Create the stage config for job or task but don't finalize it. * Uses {@link #TASK_IDS} for job/task ID. + * The store operations is extracted from + * {@link #getStoreOperations()}, which is how fault injection + * can be set up. * @param jobAttemptNumber job attempt number * @param taskIndex task attempt index; -1 for job attempt only. * @param taskAttemptNumber task attempt number @@ -796,6 +806,7 @@ protected StageConfig createStageConfig( .withJobAttemptNumber(jobAttemptNumber) .withJobDirectories(attemptDirs) .withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId)) + .withManifestSaveAttempts(SAVE_ATTEMPTS) .withOperations(getStoreOperations()) .withProgressable(getProgressCounter()) .withSuccessMarkerFileLimit(100_000) @@ -924,7 +935,7 @@ protected TaskManifest executeOneTaskAttempt(final int task, } // save the manifest for this stage. - new SaveTaskManifestStage(taskStageConfig).apply(manifest); + new SaveTaskManifestStage(taskStageConfig).apply(() -> manifest); return manifest; } @@ -998,7 +1009,9 @@ protected void assertCleanupResult( * Create and execute a cleanup stage. * @param enabled is the stage enabled? * @param deleteTaskAttemptDirsInParallel delete task attempt dirs in - * parallel? + * parallel? + * @param attemptBaseDeleteFirst Make an initial attempt to + * delete the base directory * @param suppressExceptions suppress exceptions? * @param outcome expected outcome. * @param expectedDirsDeleted #of directories deleted. -1 for no checks @@ -1008,13 +1021,18 @@ protected void assertCleanupResult( protected CleanupJobStage.Result cleanup( final boolean enabled, final boolean deleteTaskAttemptDirsInParallel, + boolean attemptBaseDeleteFirst, final boolean suppressExceptions, final CleanupJobStage.Outcome outcome, final int expectedDirsDeleted) throws IOException { StageConfig stageConfig = getJobStageConfig(); CleanupJobStage.Result result = new CleanupJobStage(stageConfig) .apply(new CleanupJobStage.Arguments(OP_STAGE_JOB_CLEANUP, - enabled, deleteTaskAttemptDirsInParallel, suppressExceptions)); + enabled, + deleteTaskAttemptDirsInParallel, + attemptBaseDeleteFirst, + suppressExceptions, + 0)); assertCleanupResult(result, outcome, expectedDirsDeleted); return result; } @@ -1038,6 +1056,24 @@ protected String readText(final Path path) throws IOException { StandardCharsets.UTF_8); } + /** + * Make the store operations unreliable. + * If it already was then reset the failure options. + * @return the store operations + */ + protected UnreliableManifestStoreOperations makeStoreOperationsUnreliable() { + UnreliableManifestStoreOperations failures; + final ManifestStoreOperations wrappedOperations = getStoreOperations(); + if (wrappedOperations instanceof UnreliableManifestStoreOperations) { + failures = (UnreliableManifestStoreOperations) wrappedOperations; + failures.reset(); + } else { + failures = new UnreliableManifestStoreOperations(wrappedOperations); + setStoreOperations(failures); + } + return failures; + } + /** * Counter. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java index 3b52fe9875641..31620e55239ae 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterTestSupport.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.RecordWriter; @@ -314,6 +315,21 @@ static void assertDirEntryMatch( .isEqualTo(type); } + /** + * Assert that none of the named statistics have any failure counts, + * which may be from being null or 0. + * @param iostats statistics + * @param names base name of the statistics (i.e. without ".failures" suffix) + */ + public static void assertNoFailureStatistics(IOStatistics iostats, String... names) { + final Map counters = iostats.counters(); + for (String name : names) { + Assertions.assertThat(counters.get(name + ".failures")) + .describedAs("Failure count of %s", name) + .matches(f -> f == null || f == 0); + } + } + /** * Save a manifest to an entry file; returning the loaded manifest data. * Caller MUST clean up the temp file. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCleanupStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCleanupStage.java index 8d551c505209c..c8c766a43cff3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCleanupStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCleanupStage.java @@ -80,17 +80,25 @@ public void setup() throws Exception { @Test public void testCleanupInParallelHealthy() throws Throwable { describe("parallel cleanup of TA dirs."); - cleanup(true, true, false, + cleanup(true, true, false, false, CleanupJobStage.Outcome.PARALLEL_DELETE, PARALLEL_DELETE_COUNT); verifyJobDirsCleanedUp(); } + @Test + public void testCleanupInParallelHealthyBaseFirst() throws Throwable { + describe("parallel cleanup of TA dirs with base first: one operation"); + cleanup(true, true, true, false, + CleanupJobStage.Outcome.DELETED, ROOT_DELETE_COUNT); + verifyJobDirsCleanedUp(); + } + @Test public void testCleanupSingletonHealthy() throws Throwable { describe("Cleanup with a single delete. Not the default; would be best on HDFS"); - cleanup(true, false, false, + cleanup(true, false, false, false, CleanupJobStage.Outcome.DELETED, ROOT_DELETE_COUNT); verifyJobDirsCleanedUp(); } @@ -99,31 +107,69 @@ public void testCleanupSingletonHealthy() throws Throwable { public void testCleanupNoDir() throws Throwable { describe("parallel cleanup MUST not fail if there's no dir"); // first do the cleanup - cleanup(true, true, false, + cleanup(true, true, false, false, CleanupJobStage.Outcome.PARALLEL_DELETE, PARALLEL_DELETE_COUNT); // now expect cleanup by single delete still works // the delete count is 0 as pre check skips it - cleanup(true, false, false, + cleanup(true, false, false, false, + CleanupJobStage.Outcome.NOTHING_TO_CLEAN_UP, 0); + cleanup(true, true, true, false, CleanupJobStage.Outcome.NOTHING_TO_CLEAN_UP, 0); // if skipped, that happens first - cleanup(false, true, false, + cleanup(false, true, false, false, CleanupJobStage.Outcome.DISABLED, 0); } @Test public void testFailureInParallelDelete() throws Throwable { - describe("A parallel delete fails, but the base delete works"); + describe("A parallel delete fails, but the fallback base delete works"); // pick one of the manifests TaskManifest manifest = manifests.get(4); - Path taPath = new Path(manifest.getTaskAttemptDir()); - failures.addDeletePathToFail(taPath); - cleanup(true, true, false, + failures.addDeletePathToFail(new Path(manifest.getTaskAttemptDir())); + cleanup(true, true, false, false, CleanupJobStage.Outcome.DELETED, PARALLEL_DELETE_COUNT); } + @Test + public void testFailureInParallelBaseDelete() throws Throwable { + describe("A parallel delete fails in the base delete; the parallel stage works"); + + // base path will timeout on first delete; the parallel delete will take place + failures.addDeletePathToTimeOut(getJobStageConfig().getOutputTempSubDir()); + failures.setFailureLimit(1); + cleanup(true, true, false, false, + CleanupJobStage.Outcome.PARALLEL_DELETE, PARALLEL_DELETE_COUNT); + } + + @Test + public void testDoubleFailureInParallelBaseDelete() throws Throwable { + describe("A parallel delete fails with the base delete and a task attempt dir"); + + // base path will timeout on first delete; the parallel delete will take place + failures.addDeletePathToTimeOut(getJobStageConfig().getOutputTempSubDir()); + TaskManifest manifest = manifests.get(4); + failures.addDeletePathToFail(new Path(manifest.getTaskAttemptDir())); + failures.setFailureLimit(2); + cleanup(true, true, true, false, + CleanupJobStage.Outcome.DELETED, PARALLEL_DELETE_COUNT + 1); + } + + @Test + public void testTripleFailureInParallelBaseDelete() throws Throwable { + describe("All delete phases will fail"); + + // base path will timeout on first delete; the parallel delete will take place + failures.addDeletePathToTimeOut(getJobStageConfig().getOutputTempSubDir()); + TaskManifest manifest = manifests.get(4); + failures.addDeletePathToFail(new Path(manifest.getTaskAttemptDir())); + failures.setFailureLimit(4); + cleanup(true, true, true, true, + CleanupJobStage.Outcome.FAILURE, PARALLEL_DELETE_COUNT + 1); + } + /** * If there's no job task attempt subdir then the list of it will raise * and FNFE; this MUST be caught and the base delete executed. @@ -135,7 +181,7 @@ public void testParallelDeleteNoTaskAttemptDir() throws Throwable { StageConfig stageConfig = getJobStageConfig(); // TA dir doesn't exist, so listing will fail. failures.addPathNotFound(stageConfig.getJobAttemptTaskSubDir()); - cleanup(true, true, false, + cleanup(true, true, false, false, CleanupJobStage.Outcome.DELETED, ROOT_DELETE_COUNT); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.java index 4f4162d46cb9f..95de9a32eecd1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCommitTaskStage.java @@ -19,13 +19,21 @@ package org.apache.hadoop.mapreduce.lib.output.committer.manifest; import java.io.FileNotFoundException; +import java.net.SocketTimeoutException; import org.assertj.core.api.Assertions; import org.junit.Test; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitJobStage; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CommitTaskStage; @@ -33,14 +41,27 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupTaskStage; import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestPathForTask; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.manifestTempPathForTaskAttempt; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.E_TIMEOUT; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.generatedErrorMessage; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** - * Test committing a task. + * Test committing a task, with lots of fault injection to validate + * resilience to transient failures. */ public class TestCommitTaskStage extends AbstractManifestCommitterTest { + public static final String TASK1 = String.format("task_%03d", 1); + + public static final String TASK1_ATTEMPT1 = String.format("%s_%02d", + TASK1, 1); + @Override public void setup() throws Exception { super.setup(); @@ -51,6 +72,15 @@ public void setup() throws Exception { new SetupJobStage(stageConfig).apply(true); } + + /** + * Create a stage config for job 1 task1 attempt 1. + * @return a task stage configuration. + */ + private StageConfig createStageConfig() { + return createTaskStageConfig(JOB1, TASK1, TASK1_ATTEMPT1); + } + @Test public void testCommitMissingDirectory() throws Throwable { @@ -108,8 +138,9 @@ public void testCommitEmptyDirectory() throws Throwable { OP_STAGE_JOB_CLEANUP, true, true, - false - ))); + false, + false, + 0))); // review success file final Path successPath = outcome.getSuccessPath(); @@ -123,4 +154,283 @@ public void testCommitEmptyDirectory() throws Throwable { .isEmpty(); } + + @Test + public void testManifestSaveFailures() throws Throwable { + describe("Test recovery of manifest save/rename failures"); + + UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable(); + + StageConfig stageConfig = createStageConfig(); + + new SetupTaskStage(stageConfig).apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + // final manifest file is by task ID + Path manifestFile = manifestPathForTask(manifestDir, + stageConfig.getTaskId()); + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + // manifest save will fail but recover before the task gives up. + failures.addSaveToFail(manifestTempFile); + + // will fail because too many attempts failed. + failures.setFailureLimit(SAVE_ATTEMPTS + 1); + intercept(PathIOException.class, generatedErrorMessage("save"), () -> + new CommitTaskStage(stageConfig).apply(null)); + + // will succeed because the failure limit is set lower + failures.setFailureLimit(SAVE_ATTEMPTS - 1); + new CommitTaskStage(stageConfig).apply(null); + + describe("Testing timeouts on rename operations."); + // now do it for the renames, which will fail after the rename + failures.reset(); + failures.addTimeoutBeforeRename(manifestTempFile); + + // first verify that if too many attempts fail, the task will fail + failures.setFailureLimit(SAVE_ATTEMPTS + 1); + intercept(SocketTimeoutException.class, E_TIMEOUT, () -> + new CommitTaskStage(stageConfig).apply(null)); + + // reduce the limit and expect the stage to succeed. + failures.setFailureLimit(SAVE_ATTEMPTS - 1); + new CommitTaskStage(stageConfig).apply(null); + } + + /** + * Save with renaming failing before the rename; the source file + * will be present on the next attempt. + * The successfully saved manifest file is loaded and its statistics + * examined to verify that the failure count is updated. + */ + @Test + public void testManifestRenameEarlyTimeouts() throws Throwable { + describe("Testing timeouts on rename operations."); + + UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable(); + StageConfig stageConfig = createStageConfig(); + + new SetupTaskStage(stageConfig).apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + // final manifest file is by task ID + Path manifestFile = manifestPathForTask(manifestDir, + stageConfig.getTaskId()); + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + + // configure for which will fail after the rename + failures.addTimeoutBeforeRename(manifestTempFile); + + // first verify that if too many attempts fail, the task will fail + failures.setFailureLimit(SAVE_ATTEMPTS + 1); + intercept(SocketTimeoutException.class, E_TIMEOUT, () -> + new CommitTaskStage(stageConfig).apply(null)); + // and that the IO stats are updated + final IOStatisticsStore iostats = stageConfig.getIOStatistics(); + assertThatStatisticCounter(iostats, OP_SAVE_TASK_MANIFEST + ".failures") + .isEqualTo(SAVE_ATTEMPTS); + + // reduce the limit and expect the stage to succeed. + iostats.reset(); + failures.setFailureLimit(SAVE_ATTEMPTS); + final CommitTaskStage.Result r = new CommitTaskStage(stageConfig).apply(null); + + // load in the manifest + final TaskManifest loadedManifest = TaskManifest.load(getFileSystem(), r.getPath()); + final IOStatisticsSnapshot loadedIOStats = loadedManifest.getIOStatistics(); + LOG.info("Statistics of file successfully saved:\nD {}", + ioStatisticsToPrettyString(loadedIOStats)); + assertThatStatisticCounter(loadedIOStats, OP_SAVE_TASK_MANIFEST + ".failures") + .isEqualTo(SAVE_ATTEMPTS - 1); + } + + @Test + public void testManifestRenameLateTimeoutsFailure() throws Throwable { + describe("Testing timeouts on rename operations."); + + UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable(); + StageConfig stageConfig = createStageConfig(); + + new SetupTaskStage(stageConfig).apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + failures.addTimeoutAfterRename(manifestTempFile); + + // if too many attempts fail, the task will fail + failures.setFailureLimit(SAVE_ATTEMPTS + 1); + intercept(SocketTimeoutException.class, E_TIMEOUT, () -> + new CommitTaskStage(stageConfig).apply(null)); + + } + + @Test + public void testManifestRenameLateTimeoutsRecovery() throws Throwable { + describe("Testing recovery from late timeouts on rename operations."); + + UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable(); + StageConfig stageConfig = createStageConfig(); + + new SetupTaskStage(stageConfig).apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + failures.addTimeoutAfterRename(manifestTempFile); + + // reduce the limit and expect the stage to succeed. + failures.setFailureLimit(SAVE_ATTEMPTS); + stageConfig.getIOStatistics().reset(); + new CommitTaskStage(stageConfig).apply(null); + final CommitTaskStage.Result r = new CommitTaskStage(stageConfig).apply(null); + + // load in the manifest + final TaskManifest loadedManifest = TaskManifest.load(getFileSystem(), r.getPath()); + final IOStatisticsSnapshot loadedIOStats = loadedManifest.getIOStatistics(); + LOG.info("Statistics of file successfully saved:\n{}", + ioStatisticsToPrettyString(loadedIOStats)); + // the failure event is one less than the limit. + assertThatStatisticCounter(loadedIOStats, OP_SAVE_TASK_MANIFEST + ".failures") + .isEqualTo(SAVE_ATTEMPTS - 1); + } + + @Test + public void testFailureToDeleteManifestPath() throws Throwable { + describe("Testing failure in the delete call made before renaming the manifest"); + + UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable(); + StageConfig stageConfig = createStageConfig(); + + new SetupTaskStage(stageConfig).apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + // final manifest file is by task ID + Path manifestFile = manifestPathForTask(manifestDir, + stageConfig.getTaskId()); + // put a file in as there is a check for it before the delete + ContractTestUtils.touch(getFileSystem(), manifestFile); + /* and the delete shall fail */ + failures.addDeletePathToFail(manifestFile); + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + + // first verify that if too many attempts fail, the task will fail + failures.setFailureLimit(SAVE_ATTEMPTS + 1); + intercept(PathIOException.class, () -> + new CommitTaskStage(stageConfig).apply(null)); + + // reduce the limit and expect the stage to succeed. + failures.setFailureLimit(SAVE_ATTEMPTS - 1); + new CommitTaskStage(stageConfig).apply(null); + } + + + /** + * Failure of delete before saving the manifest to a temporary path. + */ + @Test + public void testFailureOfDeleteBeforeSavingTemporaryFile() throws Throwable { + describe("Testing failure in the delete call made before rename"); + + UnreliableManifestStoreOperations failures = makeStoreOperationsUnreliable(); + StageConfig stageConfig = createStageConfig(); + + new SetupTaskStage(stageConfig).apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + // delete will fail + failures.addDeletePathToFail(manifestTempFile); + + // first verify that if too many attempts fail, the task will fail + failures.setFailureLimit(SAVE_ATTEMPTS + 1); + intercept(PathIOException.class, () -> + new CommitTaskStage(stageConfig).apply(null)); + + // reduce the limit and expect the stage to succeed. + failures.setFailureLimit(SAVE_ATTEMPTS - 1); + new CommitTaskStage(stageConfig).apply(null); + + } + /** + * Rename target is a directory. + */ + @Test + public void testRenameTargetIsDir() throws Throwable { + describe("Rename target is a directory"); + + final ManifestStoreOperations operations = getStoreOperations(); + StageConfig stageConfig = createStageConfig(); + + final SetupTaskStage setup = new SetupTaskStage(stageConfig); + setup.apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + // final manifest file is by task ID + Path manifestFile = manifestPathForTask(manifestDir, + stageConfig.getTaskId()); + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + // add a directory where the manifest file is to go + setup.mkdirs(manifestFile, true); + ContractTestUtils.assertIsDirectory(getFileSystem(), manifestFile); + new CommitTaskStage(stageConfig).apply(null); + + // this must be a file. + final FileStatus st = operations.getFileStatus(manifestFile); + Assertions.assertThat(st) + .describedAs("File status of %s", manifestFile) + .matches(FileStatus::isFile, "is a file"); + + // and it must load. + final TaskManifest manifest = setup.loadManifest(st); + Assertions.assertThat(manifest) + .matches(m -> m.getTaskID().equals(TASK1)) + .matches(m -> m.getTaskAttemptID().equals(TASK1_ATTEMPT1)); + } + + /** + * Manifest temp file path is a directory. + */ + @Test + public void testManifestTempFileIsDir() throws Throwable { + describe("Manifest temp file path is a directory"); + + final ManifestStoreOperations operations = getStoreOperations(); + StageConfig stageConfig = createStageConfig(); + + final SetupTaskStage setup = new SetupTaskStage(stageConfig); + setup.apply("setup"); + + final Path manifestDir = stageConfig.getTaskManifestDir(); + // final manifest file is by task ID + Path manifestFile = manifestPathForTask(manifestDir, + stageConfig.getTaskId()); + Path manifestTempFile = manifestTempPathForTaskAttempt(manifestDir, + stageConfig.getTaskAttemptId()); + + // add a directory where the manifest file is to go + setup.mkdirs(manifestTempFile, true); + new CommitTaskStage(stageConfig).apply(null); + + final TaskManifest manifest = setup.loadManifest( + operations.getFileStatus(manifestFile)); + Assertions.assertThat(manifest) + .matches(m -> m.getTaskID().equals(TASK1)) + .matches(m -> m.getTaskAttemptID().equals(TASK1_ATTEMPT1)); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java index c471ef11a88d4..b2d3c3f84a6bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestCreateOutputDirectoriesStage.java @@ -247,7 +247,7 @@ public void testPrepareDirtyTree() throws Throwable { CreateOutputDirectoriesStage attempt2 = new CreateOutputDirectoriesStage( createStageConfigForJob(JOB1, destDir) - .withDeleteTargetPaths(true)); + .withDeleteTargetPaths(false)); // attempt will fail because one of the entries marked as // a file to delete is now a non-empty directory LOG.info("Executing failing attempt to create the directories"); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java index 4bc2ce9bcf648..152b2c86e0f9c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestJobThroughManifestCommitter.java @@ -598,7 +598,8 @@ public void test_0450_validationDetectsFailures() throws Throwable { public void test_0900_cleanupJob() throws Throwable { describe("Cleanup job"); CleanupJobStage.Arguments arguments = new CleanupJobStage.Arguments( - OP_STAGE_JOB_CLEANUP, true, true, false); + OP_STAGE_JOB_CLEANUP, true, true, + false, false, 0); // the first run will list the three task attempt dirs and delete each // one before the toplevel dir. CleanupJobStage.Result result = new CleanupJobStage( @@ -615,7 +616,7 @@ public void test_0900_cleanupJob() throws Throwable { * Needed to clean up the shared test root, as test case teardown * does not do it. */ - //@Test + @Test public void test_9999_cleanupTestDir() throws Throwable { if (shouldDeleteTestRootAtEndOfTestRun()) { deleteSharedTestRoot(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java index 4dd7fe2dbcea5..ce20e02457a89 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/TestLoadManifestsStage.java @@ -176,7 +176,7 @@ public void testSaveThenLoadManyManifests() throws Throwable { // and skipping the rename stage (which is going to fail), // go straight to cleanup new CleanupJobStage(stageConfig).apply( - new CleanupJobStage.Arguments("", true, true, false)); + new CleanupJobStage.Arguments("", true, true, false, false, 0)); heapinfo(heapInfo, "cleanup"); ManifestSuccessData success = createManifestOutcome(stageConfig, OP_STAGE_JOB_COMMIT); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java index 811fc704a2a33..61a6ce1421e38 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableManifestStoreOperations.java @@ -21,8 +21,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +49,7 @@ * This is for testing. It could be implemented via * Mockito 2 spy code but is not so that: * 1. It can be backported to Hadoop versions using Mockito 1.x. - * 2. It can be extended to use in production. This is why it is in - * the production module -to allow for downstream tests to adopt it. + * 2. It can be extended to use in production. * 3. You can actually debug what's going on. */ @InterfaceAudience.Private @@ -69,6 +70,12 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations { */ public static final String SIMULATED_FAILURE = "Simulated failure"; + /** + * Default failure limit. + * Set to a large enough value that most tests don't hit it. + */ + private static final int DEFAULT_FAILURE_LIMIT = Integer.MAX_VALUE; + /** * Underlying store operations to wrap. */ @@ -110,6 +117,16 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations { */ private final Set renameDestDirsToFail = new HashSet<>(); + /** + * Source paths of rename operations to time out before the rename request is issued. + */ + private final Set renamePathsToTimeoutBeforeRename = new HashSet<>(); + + /** + * Source paths of rename operations to time out after the rename request has succeeded. + */ + private final Set renamePathsToTimeoutAfterRename = new HashSet<>(); + /** * Path of save() to fail. */ @@ -125,6 +142,11 @@ public class UnreliableManifestStoreOperations extends ManifestStoreOperations { */ private boolean renameToFailWithException = true; + /** + * How many failures before an operation is passed through. + */ + private final AtomicInteger failureLimit = new AtomicInteger(DEFAULT_FAILURE_LIMIT); + /** * Constructor. * @param wrappedOperations operations to wrap. @@ -133,16 +155,19 @@ public UnreliableManifestStoreOperations(final ManifestStoreOperations wrappedOp this.wrappedOperations = wrappedOperations; } - /** * Reset everything. */ public void reset() { deletePathsToFail.clear(); deletePathsToTimeOut.clear(); + failureLimit.set(DEFAULT_FAILURE_LIMIT); pathNotFound.clear(); renameSourceFilesToFail.clear(); renameDestDirsToFail.clear(); + renamePathsToTimeoutBeforeRename.clear(); + renamePathsToTimeoutAfterRename.clear(); + saveToFail.clear(); timeoutSleepTimeMillis = 0; } @@ -219,6 +244,21 @@ public void addRenameDestDirsFail(Path path) { renameDestDirsToFail.add(requireNonNull(path)); } + /** + * Add a source path to timeout before the rename. + * @param path path to add. + */ + public void addTimeoutBeforeRename(Path path) { + renamePathsToTimeoutBeforeRename.add(requireNonNull(path)); + } + /** + * Add a source path to timeout after the rename. + * @param path path to add. + */ + public void addTimeoutAfterRename(Path path) { + renamePathsToTimeoutAfterRename.add(requireNonNull(path)); + } + /** * Add a path to the list of paths where save will fail. * @param path path to add. @@ -228,7 +268,16 @@ public void addSaveToFail(Path path) { } /** - * Raise an exception if the path is in the set of target paths. + * Set the failure limit. + * @param limit limit + */ + public void setFailureLimit(int limit) { + failureLimit.set(limit); + } + + /** + * Raise an exception if the path is in the set of target paths + * and the failure limit is not exceeded. * @param operation operation which failed. * @param path path to check * @param paths paths to probe for {@code path} being in. @@ -236,20 +285,56 @@ public void addSaveToFail(Path path) { */ private void maybeRaiseIOE(String operation, Path path, Set paths) throws IOException { + if (paths.contains(path) && decrementAndCheckFailureLimit()) { + // hand off to the inner check. + maybeRaiseIOENoFailureLimitCheck(operation, path, paths); + } + } + + /** + * Raise an exception if the path is in the set of target paths. + * No checks on failure count are performed. + * @param operation operation which failed. + * @param path path to check + * @param paths paths to probe for {@code path} being in. + * @throws IOException simulated failure + */ + private void maybeRaiseIOENoFailureLimitCheck(String operation, Path path, Set paths) + throws IOException { if (paths.contains(path)) { LOG.info("Simulating failure of {} with {}", operation, path); throw new PathIOException(path.toString(), - SIMULATED_FAILURE + " of " + operation); + generatedErrorMessage(operation)); } } + /** + * Given an operation, return the error message which is used for the simulated + * {@link PathIOException}. + * @param operation operation name + * @return error text + */ + public static String generatedErrorMessage(final String operation) { + return SIMULATED_FAILURE + " of " + operation; + } + + /** + * Check if the failure limit is exceeded. + * Call this after any other trigger checks, as it decrements the counter. + * + * @return true if the limit is not exceeded. + */ + private boolean decrementAndCheckFailureLimit() { + return failureLimit.decrementAndGet() > 0; + } + /** * Verify that a path is not on the file not found list. * @param path path * @throws FileNotFoundException if configured to fail. */ private void verifyExists(Path path) throws FileNotFoundException { - if (pathNotFound.contains(path)) { + if (pathNotFound.contains(path) && decrementAndCheckFailureLimit()) { throw new FileNotFoundException(path.toString()); } } @@ -260,11 +345,12 @@ private void verifyExists(Path path) throws FileNotFoundException { * @param operation operation which failed. * @param path path to check * @param paths paths to probe for {@code path} being in. - * @throws IOException simulated timeout + * @throws SocketTimeoutException simulated timeout + * @throws InterruptedIOException if the sleep is interrupted. */ private void maybeTimeout(String operation, Path path, Set paths) - throws IOException { - if (paths.contains(path)) { + throws SocketTimeoutException, InterruptedIOException { + if (paths.contains(path) && decrementAndCheckFailureLimit()) { LOG.info("Simulating timeout of {} with {}", operation, path); try { if (timeoutSleepTimeMillis > 0) { @@ -273,14 +359,16 @@ private void maybeTimeout(String operation, Path path, Set paths) } catch (InterruptedException e) { throw new InterruptedIOException(e.toString()); } - throw new PathIOException(path.toString(), - "ErrorCode=" + OPERATION_TIMED_OUT + throw new SocketTimeoutException( + path.toString() + ": " + operation + + " ErrorCode=" + OPERATION_TIMED_OUT + " ErrorMessage=" + E_TIMEOUT); } } @Override public FileStatus getFileStatus(final Path path) throws IOException { + maybeTimeout("getFileStatus()", path, pathNotFound); verifyExists(path); return wrappedOperations.getFileStatus(path); } @@ -304,17 +392,23 @@ public boolean mkdirs(final Path path) throws IOException { public boolean renameFile(final Path source, final Path dest) throws IOException { String op = "rename"; + maybeTimeout(op, source, renamePathsToTimeoutBeforeRename); if (renameToFailWithException) { maybeRaiseIOE(op, source, renameSourceFilesToFail); maybeRaiseIOE(op, dest.getParent(), renameDestDirsToFail); } else { - if (renameSourceFilesToFail.contains(source) - || renameDestDirsToFail.contains(dest.getParent())) { + // logic to determine whether rename should just return false. + if ((renameSourceFilesToFail.contains(source) + || renameDestDirsToFail.contains(dest.getParent()) + && decrementAndCheckFailureLimit())) { LOG.info("Failing rename({}, {})", source, dest); return false; } } - return wrappedOperations.renameFile(source, dest); + final boolean b = wrappedOperations.renameFile(source, dest); + // post rename timeout. + maybeTimeout(op, source, renamePathsToTimeoutAfterRename); + return b; } @Override @@ -358,13 +452,19 @@ public boolean storeSupportsResilientCommit() { @Override public CommitFileResult commitFile(final FileEntry entry) throws IOException { + final String op = "commitFile"; + final Path source = entry.getSourcePath(); + maybeTimeout(op, source, renamePathsToTimeoutBeforeRename); if (renameToFailWithException) { - maybeRaiseIOE("commitFile", - entry.getSourcePath(), renameSourceFilesToFail); - maybeRaiseIOE("commitFile", + maybeRaiseIOE(op, + source, renameSourceFilesToFail); + maybeRaiseIOE(op, entry.getDestPath().getParent(), renameDestDirsToFail); } - return wrappedOperations.commitFile(entry); + final CommitFileResult result = wrappedOperations.commitFile(entry); + // post rename timeout. + maybeTimeout(op, source, renamePathsToTimeoutAfterRename); + return result; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/log4j.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/log4j.properties index 81a3f6ad5d248..ba3ce740caf05 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/log4j.properties +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/log4j.properties @@ -17,3 +17,5 @@ log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n + +log4j.logger.org.apache.hadoop.mapreduce.lib.output.committer.manifest=DEBUG \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index f7f562e15824a..e229c799846a8 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -45,6 +45,8 @@ 7200 10 1000 + + 100 @@ -400,7 +402,8 @@ ${fs.azure.scale.test.timeout} ${fs.azure.scale.test.list.performance.threads} ${fs.azure.scale.test.list.performance.files} - + + ${http.maxConnections} **/azure/Test*.java **/azure/**/Test*.java @@ -431,6 +434,8 @@ ${fs.azure.scale.test.timeout} ${fs.azure.scale.test.list.performance.threads} ${fs.azure.scale.test.list.performance.files} + + ${http.maxConnections} **/azure/**/TestRollingWindowAverage*.java @@ -604,6 +609,8 @@ ${fs.azure.scale.test.enabled} ${fs.azure.scale.test.timeout} + + ${http.maxConnections} @@ -792,6 +799,8 @@ ${fs.azure.scale.test.timeout} ${fs.azure.scale.test.list.performance.threads} ${fs.azure.scale.test.list.performance.files} + + ${http.maxConnections} @@ -842,7 +851,8 @@ ${fs.azure.scale.test.timeout} ${fs.azure.scale.test.list.performance.threads} ${fs.azure.scale.test.list.performance.files} - + + ${http.maxConnections} **/ITestWasbAbfsCompatibility.java **/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java @@ -891,6 +901,8 @@ ${fs.azure.scale.test.timeout} ${fs.azure.scale.test.list.performance.threads} ${fs.azure.scale.test.list.performance.files} + + ${http.maxConnections} ${fs.azure.scale.test.timeout} false diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index ea7bf943a73d0..0af485bbe56b1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -159,7 +159,7 @@ public final class FileSystemConfigurations { /** * IO rate limit. Value: {@value} */ - public static final int RATE_LIMIT_DEFAULT = 10_000; + public static final int RATE_LIMIT_DEFAULT = 1_000; private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java index da2a650489077..92ba8a4024a2c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/AbfsCommitTestHelper.java @@ -23,6 +23,7 @@ import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS; /** @@ -51,9 +52,10 @@ static Configuration prepareTestConfiguration( final String size = Integer.toString(192); conf.setIfUnset(ManifestCommitterConstants.OPT_IO_PROCESSORS, size); conf.setIfUnset(ManifestCommitterConstants.OPT_WRITER_QUEUE_CAPACITY, size); - // no need for parallel delete here as we aren't at the scale where unified delete - // is going to time out - conf.setBooleanIfUnset(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, false); + // enable parallel delete but ask for base deletion first, + // which is now our recommended azure option + conf.setBoolean(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, true); + conf.setBoolean(OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST, true); return conf; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java index 4b21b838decc5..820938b2d68ef 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/commit/ITestAbfsTerasort.java @@ -39,6 +39,7 @@ import org.apache.hadoop.examples.terasort.TeraSortConfigKeys; import org.apache.hadoop.examples.terasort.TeraValidate; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; import org.apache.hadoop.mapred.JobConf; @@ -52,6 +53,9 @@ import static java.util.Optional.empty; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.assertNoFailureStatistics; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.loadSuccessFile; import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.validateSuccessFile; @@ -95,6 +99,11 @@ public class ITestAbfsTerasort extends AbstractAbfsClusterITest { protected static final IOStatisticsSnapshot JOB_IOSTATS = snapshotIOStatistics(); + /** + * Map of stage -> success file. + */ + private static final Map SUCCESS_FILES = new HashMap<>(); + /** Base path for all the terasort input and output paths. */ private Path terasortPath; @@ -188,9 +197,10 @@ private static void requireStage(final String stage) { * @param tool tool to run. * @param args args for the tool. * @param minimumFileCount minimum number of files to have been created + * @return the job success file. * @throws Exception any failure */ - private void executeStage( + private ManifestSuccessData executeStage( final String stage, final JobConf jobConf, final Path dest, @@ -213,9 +223,20 @@ private void executeStage( + " failed", 0, result); final ManifestSuccessData successFile = validateSuccessFile(getFileSystem(), dest, minimumFileCount, ""); - JOB_IOSTATS.aggregate(successFile.getIOStatistics()); - + final IOStatistics iostats = successFile.getIOStatistics(); + JOB_IOSTATS.aggregate(iostats); + SUCCESS_FILES.put(stage, successFile); completedStage(stage, d); + + // now assert there were no failures recorded in the IO statistics + // for critical functions. + // these include collected statistics from manifest save + // operations. + assertNoFailureStatistics(iostats, + stage, + OP_SAVE_TASK_MANIFEST, + OP_RENAME_FILE); + return successFile; } /** @@ -319,6 +340,7 @@ public void test_140_teracomplete() throws Throwable { File resultsFile = File.createTempFile("results", ".csv"); FileUtils.write(resultsFile, text, StandardCharsets.UTF_8); LOG.info("Results are in {}\n{}", resultsFile, text); + LOG.info("Report directory {}", getReportDir()); } /**