Skip to content

Commit abed2fe

Browse files
committed
MAPREDUCE-7474. Task save failure stats in job stats + more
Statistics Collection and Printing * New statistic task_stage_save_summary_file to distinguish from other saving operations (job success/report file) * After a failure to save a task attempt, the iostats of the manifest are rebuilt so the stats on failures are updated. This will get into the final job _SUCCESS statistics so we can see if anything happened * Make the manifest print command something which can be invoked from the commandline: mapred successfile This is covered in the docs. The failure stats regeneration is nice; works by passing down a lambda-expression of the logic to (re)generate the manifest, and invoking this on every attempt. As this is where the stats are aggregated, it includes details on the previous failing attempts. Directory size for deletion * Optionally pass down directory count under job dir to cleanup stage * This is determined in job commit from aggregate statistics; unknown elsewhere (app abort etc.). * It is currently only logged; it may be possible to support an option of when to skip the initial serial delete, though it will depend on abfs login mechanism. Testing * More fault injection scenarios. * Ability to assert that iostats do not contain specific non-zero stats. This is used in ITestAbfsTerasort to assert no task save or rename failures. The stats before this change imply this did happen in a job commit; no other details, hence the new probe. * Log manifest committer at debug in mapred-core Note: if there's a retry process which means the operation can take minutes, the initial operation will block progress() callbacks so mapreduce jobs will fail. Spark is unaffected Change-Id: Id423267de89c7f31e4b1283f9c433b729ff0d87b
1 parent 3e5e1e6 commit abed2fe

File tree

19 files changed

+564
-115
lines changed

19 files changed

+564
-115
lines changed

hadoop-mapreduce-project/bin/mapred

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ function hadoop_usage
3737
hadoop_add_subcommand "frameworkuploader" admin "mapreduce framework upload"
3838
hadoop_add_subcommand "version" client "print the version"
3939
hadoop_add_subcommand "minicluster" client "CLI MiniCluster"
40+
hadoop_add_subcommand "successfile" client "Print a _SUCCESS manifest from the manifest and S3A committers"
4041
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
4142
}
4243

@@ -102,6 +103,9 @@ function mapredcmd_case
102103
version)
103104
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
104105
;;
106+
successfile)
107+
HADOOP_CLASSNAME=org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
108+
;;
105109
minicluster)
106110
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/timelineservice"'/*'
107111
hadoop_add_classpath "${HADOOP_YARN_HOME}/${YARN_DIR}/test"'/*'

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterStatisticNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,12 @@ public final class ManifestCommitterStatisticNames {
187187
public static final String OP_SAVE_TASK_MANIFEST =
188188
"task_stage_save_task_manifest";
189189

190+
/**
191+
* Save a summary file: {@value}.
192+
*/
193+
public static final String OP_SAVE_SUMMARY_FILE =
194+
"task_stage_save_summary_file";
195+
190196
/**
191197
* Task abort: {@value}.
192198
*/

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/ManifestPrinter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
*/
3737
public class ManifestPrinter extends Configured implements Tool {
3838

39-
private static final String USAGE = "ManifestPrinter <success-file>";
39+
private static final String USAGE = "successfile <success-file>";
4040

4141
/**
4242
* Output for printing.
@@ -88,7 +88,7 @@ public ManifestSuccessData loadAndPrintManifest(FileSystem fs, Path path)
8888
return success;
8989
}
9090

91-
private void printManifest(ManifestSuccessData success) {
91+
public void printManifest(ManifestSuccessData success) {
9292
field("succeeded", success.getSuccess());
9393
field("created", success.getDate());
9494
field("committer", success.getCommitter());

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/InternalConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ private InternalConstants() {
8686
OP_MSYNC,
8787
OP_PREPARE_DIR_ANCESTORS,
8888
OP_RENAME_FILE,
89+
OP_SAVE_SUMMARY_FILE,
8990
OP_SAVE_TASK_MANIFEST,
9091

9192
OBJECT_LIST_REQUEST,

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java

Lines changed: 71 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import java.time.Duration;
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.function.Supplier;
2628

2729
import org.slf4j.Logger;
2830
import org.slf4j.LoggerFactory;
@@ -62,6 +64,7 @@
6264
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_DELETE_DIR;
6365
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
6466
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC;
67+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_DIR;
6568
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
6669
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST;
6770
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker;
@@ -371,6 +374,7 @@ public final IOStatisticsStore getIOStatistics() {
371374
*/
372375
protected final void progress() {
373376
if (stageConfig.getProgressable() != null) {
377+
LOG.trace("{}: Progressing", getName());
374378
stageConfig.getProgressable().progress();
375379
}
376380
}
@@ -429,7 +433,7 @@ protected final boolean isFile(
429433
* @return status or null
430434
* @throws IOException IO Failure.
431435
*/
432-
protected final boolean delete(
436+
public final boolean delete(
433437
final Path path,
434438
final boolean recursive)
435439
throws IOException {
@@ -445,7 +449,7 @@ protected final boolean delete(
445449
* @return status or null
446450
* @throws IOException IO Failure.
447451
*/
448-
protected Boolean delete(
452+
public Boolean delete(
449453
final Path path,
450454
final boolean recursive,
451455
final String statistic)
@@ -467,7 +471,7 @@ protected Boolean delete(
467471
* @return outcome.
468472
* @throws IOException IO Failure.
469473
*/
470-
protected boolean deleteFile(
474+
public boolean deleteFile(
471475
final Path path,
472476
final String statistic)
473477
throws IOException {
@@ -482,7 +486,7 @@ protected boolean deleteFile(
482486
* @return true if the directory was created/exists.
483487
* @throws IOException IO Failure.
484488
*/
485-
protected final boolean mkdirs(
489+
public final boolean mkdirs(
486490
final Path path,
487491
final boolean escalateFailure)
488492
throws IOException {
@@ -519,7 +523,7 @@ protected final RemoteIterator<FileStatus> listStatusIterator(
519523
* @return the manifest.
520524
* @throws IOException IO Failure.
521525
*/
522-
protected final TaskManifest loadManifest(
526+
public final TaskManifest loadManifest(
523527
final FileStatus status)
524528
throws IOException {
525529
LOG.trace("{}: loadManifest('{}')", getName(), status);
@@ -612,45 +616,84 @@ protected final Path directoryMustExist(
612616
* @param manifestData the manifest/success file
613617
* @param tempPath temp path for the initial save
614618
* @param finalPath final path for rename.
619+
* @return the manifest saved.
615620
* @throws IOException failure to rename after retries.
616621
*/
617622
@SuppressWarnings("unchecked")
618-
protected final <T extends AbstractManifestData> void save(T manifestData,
623+
protected final <T extends AbstractManifestData> T save(
624+
final T manifestData,
619625
final Path tempPath,
620626
final Path finalPath) throws IOException {
627+
return saveManifest(() -> manifestData, tempPath, finalPath, OP_SAVE_TASK_MANIFEST);
628+
}
621629

622-
int retryCount = 0;
630+
/**
631+
* Generate and save a task manifest or summary file.
632+
* This is be done by writing to a temp path and then renaming.
633+
* <p>
634+
* If the destination path exists: Delete it before the rename.
635+
* <p>
636+
* This will retry so that a rename failure from abfs load or IO errors
637+
* such as delete or save failure will not fail the task.
638+
* <p>
639+
* The {@code manifestSource} supplier is invoked to get the manifest data
640+
* on every attempt.
641+
* This permits statistics to be updated, <i>including those of failures</i>.
642+
* @param manifestSource supplier the manifest/success file
643+
* @param tempPath temp path for the initial save
644+
* @param finalPath final path for rename.
645+
* @param statistic statistic to use for timing
646+
* @return the manifest saved.
647+
* @throws IOException failure to save/delete/rename after retries.
648+
*/
649+
@SuppressWarnings("unchecked")
650+
protected final <T extends AbstractManifestData> T saveManifest(
651+
final Supplier<T> manifestSource,
652+
final Path tempPath,
653+
final Path finalPath,
654+
String statistic) throws IOException {
655+
656+
AtomicInteger retryCount = new AtomicInteger(0);
623657
RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
624658
getStageConfig().getManifestSaveAttempts(),
625659
SAVE_SLEEP_INTERVAL,
626660
TimeUnit.MILLISECONDS);
627-
boolean success = false;
628-
while (!success) {
629-
try {
630-
LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
631-
getName(), tempPath, finalPath, retryCount);
632-
633-
// save the temp file, overwriting any which remains from an earlier attempt
634-
trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
635-
operations.save(manifestData, tempPath, true));
636-
637-
// delete the destination in case it exists either from a failed previous
638-
// attempt or from a concurrent task commit.
639-
deleteFile(finalPath, OP_DELETE);
640661

641-
// rename temp to final
642-
renameFile(tempPath, finalPath);
643-
// success flag is only set after the rename.
644-
success = true;
662+
// loop until returning a value or raising an exception
663+
while (true) {
664+
try {
665+
T manifestData = requireNonNull(manifestSource.get());
666+
trackDurationOfInvocation(getIOStatistics(), statistic, () -> {
667+
LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
668+
getName(), tempPath, finalPath, retryCount);
669+
670+
// delete temp path.
671+
// even though this is written with overwrite=true, this extra recursive
672+
// delete also handles a directory being there.
673+
deleteRecursive(tempPath, OP_DELETE);
674+
675+
// save the temp file, overwriting any which remains from an earlier attempt
676+
operations.save(manifestData, tempPath, true);
677+
678+
// delete the destination in case it exists either from a failed previous
679+
// attempt or from a concurrent task commit.
680+
delete(finalPath, true, OP_DELETE);
681+
682+
// rename temp to final
683+
renameFile(tempPath, finalPath);
684+
});
685+
// success: exit and return the final manifest data.
686+
return manifestData;
645687
} catch (IOException e) {
646688
// failure.
647689
// log then decide whether to sleep and retry or give up.
648690
LOG.warn("{}: Failed to save and commit file {} renamed to {}; retry count={}",
649691
getName(), tempPath, finalPath, retryCount, e);
650-
retryCount++;
692+
// increment that count.
693+
retryCount.incrementAndGet();
651694
RetryPolicy.RetryAction retryAction;
652695
try {
653-
retryAction = retryPolicy.shouldRetry(e, retryCount, 0, true);
696+
retryAction = retryPolicy.shouldRetry(e, retryCount.get(), 0, true);
654697
} catch (Exception ex) {
655698
// it's not clear why this probe can raise an exception; it is just
656699
// caught and mapped to a fail.
@@ -716,7 +759,7 @@ protected final void renameDir(final Path source, final Path dest)
716759

717760
maybeDeleteDest(true, dest);
718761
executeRenamingOperation("renameDir", source, dest,
719-
OP_RENAME_FILE, () ->
762+
OP_RENAME_DIR, () ->
720763
operations.renameDir(source, dest)
721764
);
722765
}
@@ -1019,8 +1062,7 @@ protected boolean deleteRecursive(
10191062
*/
10201063
protected IOException deleteRecursiveSuppressingExceptions(
10211064
final Path dir,
1022-
final String statistic)
1023-
throws IOException {
1065+
final String statistic) {
10241066
try {
10251067
deleteRecursive(dir, statistic);
10261068
return null;

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,18 @@ protected Result executeStage(
160160

161161
// to delete.
162162
LOG.info("{}: Deleting job directory {}", getName(), baseDir);
163+
final long directoryCount = args.directoryCount;
164+
if (directoryCount > 0) {
165+
// log the expected directory count, which drives duration in GCS
166+
// and may cause timeouts on azure if the count is too high for a
167+
// timely permissions tree scan.
168+
LOG.info("{}: Expected directory count: {}", getName(), directoryCount);
169+
}
163170

171+
progress();
172+
// check and maybe execute parallel delete of task attempt dirs.
164173
if (args.deleteTaskAttemptDirsInParallel) {
165174

166-
// parallel delete of task attempt dirs.
167175

168176
if (args.parallelDeleteAttemptBaseDeleteFirst) {
169177
// attempt to delete the base dir first.
@@ -180,9 +188,9 @@ protected Result executeStage(
180188
baseDirDeleted = true;
181189
} else {
182190
// failure: log and continue
183-
LOG.warn("{}: Exception on initial attempt at deleting base dir {}\n"
184-
+ "attempting parallel delete",
185-
getName(), baseDir, exception);
191+
LOG.warn("{}: Exception on initial attempt at deleting base dir {} and directory count {}"
192+
+ "\nFalling back to parallel delete",
193+
getName(), baseDir, directoryCount, exception);
186194
}
187195
}
188196
}
@@ -339,27 +347,37 @@ public static final class Arguments {
339347
/** Ignore failures? */
340348
private final boolean suppressExceptions;
341349

350+
/**
351+
* Non-final count of directories.
352+
* Default value, "0", means "unknown".
353+
* This can be dynamically updated during job commit.
354+
*/
355+
private long directoryCount;
356+
342357
/**
343358
* Arguments to the stage.
344359
* @param statisticName stage name to report
345360
* @param enabled is the stage enabled?
346361
* @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
347362
* parallel?
348363
* @param parallelDeleteAttemptBaseDeleteFirst Make an initial attempt to
349-
* delete the base directory in a parallel delete?
364+
* delete the base directory in a parallel delete?
350365
* @param suppressExceptions suppress exceptions?
366+
* @param directoryCount directories under job dir; 0 means unknown.
351367
*/
352368
public Arguments(
353369
final String statisticName,
354370
final boolean enabled,
355371
final boolean deleteTaskAttemptDirsInParallel,
356372
final boolean parallelDeleteAttemptBaseDeleteFirst,
357-
final boolean suppressExceptions) {
373+
final boolean suppressExceptions,
374+
long directoryCount) {
358375
this.statisticName = statisticName;
359376
this.enabled = enabled;
360377
this.deleteTaskAttemptDirsInParallel = deleteTaskAttemptDirsInParallel;
361378
this.suppressExceptions = suppressExceptions;
362379
this.parallelDeleteAttemptBaseDeleteFirst = parallelDeleteAttemptBaseDeleteFirst;
380+
this.directoryCount = directoryCount;
363381
}
364382

365383
public String getStatisticName() {
@@ -382,6 +400,14 @@ public boolean isParallelDeleteAttemptBaseDeleteFirst() {
382400
return parallelDeleteAttemptBaseDeleteFirst;
383401
}
384402

403+
public long getDirectoryCount() {
404+
return directoryCount;
405+
}
406+
407+
public void setDirectoryCount(final long directoryCount) {
408+
this.directoryCount = directoryCount;
409+
}
410+
385411
@Override
386412
public String toString() {
387413
return "Arguments{" +
@@ -401,8 +427,9 @@ public String toString() {
401427
public static final Arguments DISABLED = new Arguments(OP_STAGE_JOB_CLEANUP,
402428
false,
403429
false,
404-
false, false
405-
);
430+
false,
431+
false,
432+
0);
406433

407434
/**
408435
* Build an options argument from a configuration, using the
@@ -430,7 +457,8 @@ public static Arguments cleanupStageOptionsFromConfig(
430457
enabled,
431458
deleteTaskAttemptDirsInParallel,
432459
parallelDeleteAttemptBaseDeleteFirst,
433-
suppressExceptions);
460+
suppressExceptions,
461+
0);
434462
}
435463

436464
/**

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.apache.commons.lang3.StringUtils.isNotBlank;
3838
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
3939
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
40+
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN;
4041
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
4142
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS;
4243
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS;
@@ -161,7 +162,12 @@ protected CommitJobStage.Result executeStage(
161162
}
162163

163164
// optional cleanup
164-
new CleanupJobStage(stageConfig).apply(arguments.getCleanupArguments());
165+
final CleanupJobStage.Arguments cleanupArguments = arguments.getCleanupArguments();
166+
// determine the directory count
167+
cleanupArguments.setDirectoryCount(iostats.counters()
168+
.getOrDefault(COMMITTER_TASK_DIRECTORY_COUNT_MEAN, 0L));
169+
170+
new CleanupJobStage(stageConfig).apply(cleanupArguments);
165171

166172
// and then, after everything else: optionally validate.
167173
if (arguments.isValidateOutput()) {

0 commit comments

Comments
 (0)