Skip to content

Commit 3e5e1e6

Browse files
committed
MAPREDUCE-7474. review feedback and test improvements
Simulating more failure conditions. Still more to explore there, in particular "what if delete of rename target fails" Change-Id: Idb84f9c17a195702e6a2345b095f41e72865dd5b
1 parent 9193085 commit 3e5e1e6

File tree

11 files changed

+195
-59
lines changed

11 files changed

+195
-59
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -146,7 +146,7 @@ public final class ManifestCommitterConstants {
146146
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;
147147

148148
/**
149-
* Should parallel cleanup try to delete teh base first?
149+
* Should parallel cleanup try to delete the base first?
150150
* Best for azure as it skips the task attempt deletions unless
151151
* the toplevel delete fails.
152152
* Value: {@value}.
@@ -157,7 +157,7 @@ public final class ManifestCommitterConstants {
157157
/**
158158
* Default value of option {@link #OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST}: {@value}.
159159
*/
160-
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = true;
160+
public static final boolean OPT_CLEANUP_PARALLEL_DELETE_BASE_FIRST_DEFAULT = false;
161161

162162
/**
163163
* Threads to use for IO.

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperations.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -113,16 +113,15 @@ public boolean deleteFile(Path path)
113113
}
114114

115115
/**
116-
* Acquire the delete capacity then call {@code FileSystem#delete(Path, true)}
117-
* or equivalent.
116+
* Call {@code FileSystem#delete(Path, true)} or equivalent.
118117
* <p>
119118
* If it returns without an error: there is nothing at
120119
* the end of the path.
121120
* @param path path
122121
* @return outcome
123122
* @throws IOException failure.
124123
*/
125-
public boolean rmdir(Path path)
124+
public boolean deleteRecursive(Path path)
126125
throws IOException {
127126
return delete(path, true);
128127
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/ManifestStoreOperationsThroughFileSystem.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,7 @@ public boolean delete(Path path, boolean recursive)
109109
}
110110

111111
@Override
112-
public boolean rmdir(final Path path) throws IOException {
112+
public boolean deleteRecursive(final Path path) throws IOException {
113113
return fileSystem.delete(path, true);
114114
}
115115

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbortTaskStage.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -57,9 +57,9 @@ protected Path executeStage(final Boolean suppressExceptions)
5757
if (dir != null) {
5858
LOG.info("{}: Deleting task attempt directory {}", getName(), dir);
5959
if (suppressExceptions) {
60-
deleteDirSuppressingExceptions(dir, OP_DELETE_DIR);
60+
deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR);
6161
} else {
62-
deleteDir(dir, OP_DELETE_DIR);
62+
deleteRecursive(dir, OP_DELETE_DIR);
6363
}
6464
}
6565
return dir;

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -451,7 +451,7 @@ protected Boolean delete(
451451
final String statistic)
452452
throws IOException {
453453
if (recursive) {
454-
return deleteDir(path, statistic);
454+
return deleteRecursive(path, statistic);
455455
} else {
456456
return deleteFile(path, statistic);
457457
}
@@ -630,8 +630,15 @@ protected final <T extends AbstractManifestData> void save(T manifestData,
630630
LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
631631
getName(), tempPath, finalPath, retryCount);
632632

633+
// save the temp file, overwriting any which remains from an earlier attempt
633634
trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
634635
operations.save(manifestData, tempPath, true));
636+
637+
// delete the destination in case it exists either from a failed previous
638+
// attempt or from a concurrent task commit.
639+
deleteFile(finalPath, OP_DELETE);
640+
641+
// rename temp to final
635642
renameFile(tempPath, finalPath);
636643
// success flag is only set after the rename.
637644
success = true;
@@ -650,6 +657,7 @@ protected final <T extends AbstractManifestData> void save(T manifestData,
650657
LOG.debug("Failure in retry policy", ex);
651658
retryAction = RetryPolicy.RetryAction.FAIL;
652659
}
660+
LOG.debug("{}: Retry action: {}", getName(), retryAction.action);
653661
if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
654662
// too many failures: escalate.
655663
throw e;
@@ -679,16 +687,17 @@ public String getEtag(FileStatus status) {
679687
}
680688

681689
/**
682-
* Rename a file from source to dest; if the underlying FS API call
683-
* returned false that's escalated to an IOE.
690+
* Rename a file from source to dest.
691+
* <p>
692+
* The destination is always deleted through a call to
693+
* {@link #maybeDeleteDest(boolean, Path)}.
684694
* @param source source file.
685695
* @param dest dest file
686696
* @throws IOException failure
687697
* @throws PathIOException if the rename() call returned false.
688698
*/
689699
protected final void renameFile(final Path source, final Path dest)
690700
throws IOException {
691-
maybeDeleteDest(true, dest);
692701
executeRenamingOperation("renameFile", source, dest,
693702
OP_RENAME_FILE, () ->
694703
operations.renameFile(source, dest));
@@ -770,7 +779,7 @@ private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws I
770779
final FileStatus st = getFileStatusOrNull(dest);
771780
if (st != null) {
772781
if (st.isDirectory()) {
773-
deleteDir(dest, OP_DELETE_DIR);
782+
deleteRecursive(dest, OP_DELETE_DIR);
774783
} else {
775784
deleteFile(dest, OP_DELETE);
776785
}
@@ -988,32 +997,32 @@ protected final TaskPool.Submitter getIOProcessors(int size) {
988997
}
989998

990999
/**
991-
* Delete a directory.
1000+
* Delete a directory (or a file)
9921001
* @param dir directory.
9931002
* @param statistic statistic to use
9941003
* @return true if the path is no longer present.
9951004
* @throws IOException exceptions raised in delete if not suppressed.
9961005
*/
997-
protected boolean deleteDir(
1006+
protected boolean deleteRecursive(
9981007
final Path dir,
9991008
final String statistic)
10001009
throws IOException {
10011010
return trackDuration(getIOStatistics(), statistic, () ->
1002-
operations.rmdir(dir));
1011+
operations.deleteRecursive(dir));
10031012
}
10041013

10051014
/**
1006-
* Delete a directory, suprressing exceptions.
1015+
* Delete a directory or file, catching exceptions.
10071016
* @param dir directory.
10081017
* @param statistic statistic to use
10091018
* @return any exception caught.
10101019
*/
1011-
protected IOException deleteDirSuppressingExceptions(
1020+
protected IOException deleteRecursiveSuppressingExceptions(
10121021
final Path dir,
10131022
final String statistic)
10141023
throws IOException {
10151024
try {
1016-
deleteDir(dir, statistic);
1025+
deleteRecursive(dir, statistic);
10171026
return null;
10181027
} catch (IOException ex) {
10191028
LOG.info("Error deleting {}: {}", dir, ex.toString());

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -174,9 +174,9 @@ protected Result executeStage(
174174
"Initial delete of %s", baseDir)) {
175175
exception = deleteOneDir(baseDir);
176176
if (exception == null) {
177-
// success: record this as the outcome, which
178-
// will skip the parallel delete.
177+
// success: record this as the outcome,
179178
outcome = Outcome.DELETED;
179+
// and will skip the parallel delete
180180
baseDirDeleted = true;
181181
} else {
182182
// failure: log and continue
@@ -276,7 +276,7 @@ private void rmTaskAttemptDir(FileStatus status) throws IOException {
276276
}
277277

278278
/**
279-
* Delete a directory.
279+
* Delete a directory suppressing exceptions.
280280
* The {@link #deleteFailureCount} counter.
281281
* is incremented on every failure.
282282
* @param dir directory
@@ -288,7 +288,7 @@ private IOException deleteOneDir(final Path dir)
288288

289289
deleteDirCount.incrementAndGet();
290290
return noteAnyDeleteFailure(
291-
deleteDirSuppressingExceptions(dir, OP_DELETE_DIR));
291+
deleteRecursiveSuppressingExceptions(dir, OP_DELETE_DIR));
292292
}
293293

294294
/**

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitTaskStage.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -111,5 +111,9 @@ public TaskManifest getTaskManifest() {
111111
return taskManifest;
112112
}
113113

114+
@Override
115+
public String toString() {
116+
return "Result{path=" + path + '}';
117+
}
114118
}
115119
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -190,7 +190,7 @@ Here are the main configuration options of the committer.
190190
| `mapreduce.manifest.committer.io.threads` | Thread count for parallel operations | `64` |
191191
| `mapreduce.manifest.committer.summary.report.directory` | directory to save reports. | `""` |
192192
| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete temporary directories in parallel | `true` |
193-
| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `true` |
193+
| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` |
194194
| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` |
195195
| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
196196
| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Create a `_SUCCESS` marker file on successful completion. (and delete any existing one in job setup) | `true` |
@@ -423,7 +423,7 @@ may surface in cloud storage.
423423
| `mapreduce.fileoutputcommitter.cleanup.skipped` | Skip cleanup of `_temporary` directory| `false` |
424424
| `mapreduce.fileoutputcommitter.cleanup-failures.ignored` | Ignore errors during cleanup | `false` |
425425
| `mapreduce.manifest.committer.cleanup.parallel.delete` | Delete task attempt directories in parallel | `true` |
426-
| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `true` |
426+
| `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` | Attempt to delete the base directory before parallel task attempts | `false` |
427427

428428
The algorithm is:
429429

@@ -444,7 +444,15 @@ if caught-exception and not "mapreduce.fileoutputcommitter.cleanup-failures.igno
444444
It's a bit complicated, but the goal is to perform a fast/scalable delete and
445445
throw a meaningful exception if that didn't work.
446446

447-
For ABFS the default settings should normally be left alone.
447+
For ABFS set `mapreduce.manifest.committer.cleanup.parallel.delete.base.first` to `true`
448+
which should normally result in less network IO and a faster cleanup.
449+
450+
```
451+
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true
452+
```
453+
454+
455+
448456

449457
For GCS, setting `mapreduce.manifest.committer.cleanup.parallel.delete.base.first`
450458
to `false` may speed up cleanup.
@@ -490,9 +498,15 @@ The core set of Azure-optimized options becomes
490498
</property>
491499

492500
<property>
493-
<name>spark.hadoop.fs.azure.io.rate.limit</name>
501+
<name>fs.azure.io.rate.limit</name>
494502
<value>1000</value>
495503
</property>
504+
505+
<property>
506+
<name>mapreduce.manifest.committer.cleanup.parallel.delete.base.first</name>
507+
<value>true</value>
508+
</property>
509+
496510
```
497511

498512
And optional settings for debugging/performance analysis
@@ -510,6 +524,7 @@ And optional settings for debugging/performance analysis
510524
```
511525
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
512526
spark.hadoop.fs.azure.io.rate.limit 1000
527+
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true
513528
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
514529
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
515530

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractManifestCommitterTest.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -167,6 +167,12 @@ public abstract class AbstractManifestCommitterTest
167167

168168
private static final int MAX_LEN = 64_000;
169169

170+
/**
171+
* How many attempts to save manifests before giving up.
172+
* Kept small to reduce sleep times and network delays.
173+
*/
174+
public static final int SAVE_ATTEMPTS = 3;
175+
170176
/**
171177
* Submitter for tasks; may be null.
172178
*/
@@ -799,6 +805,7 @@ protected StageConfig createStageConfig(
799805
.withJobAttemptNumber(jobAttemptNumber)
800806
.withJobDirectories(attemptDirs)
801807
.withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId))
808+
.withManifestSaveAttempts(SAVE_ATTEMPTS)
802809
.withOperations(getStoreOperations())
803810
.withProgressable(getProgressCounter())
804811
.withSuccessMarkerFileLimit(100_000)

0 commit comments

Comments (0)