Skip to content

Commit 906ae51

Browse files
committed
HADOOP-18402. S3A committer NPE in spark job abort (#4735)
JobID.toString() and TaskID.toString() to only be called when the IDs are not null. This doesn't surface in MapReduce, but Spark SQL can trigger in job abort, where it may invok abortJob() with an incomplete TaskContext. This patch MUST be applied to branches containing HADOOP-17833. "Improve Magic Committer Performance." Contributed by Steve Loughran.
1 parent eee59a8 commit 906ae51

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/AuditContextUpdater.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.hadoop.fs.audit.CommonAuditContext;
2323
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
2424
import org.apache.hadoop.mapreduce.JobContext;
25+
import org.apache.hadoop.mapreduce.JobID;
2526
import org.apache.hadoop.mapreduce.TaskAttemptContext;
2627
import org.apache.hadoop.mapreduce.TaskAttemptID;
2728

@@ -49,12 +50,17 @@ public final class AuditContextUpdater {
4950
* @param jobContext job/task context.
5051
*/
5152
public AuditContextUpdater(final JobContext jobContext) {
52-
this.jobId = jobContext.getJobID().toString();
53+
JobID contextJobID = jobContext.getJobID();
54+
this.jobId = contextJobID != null
55+
? contextJobID.toString()
56+
: null;
5357

5458
if (jobContext instanceof TaskAttemptContext) {
5559
// it's a task, extract info for auditing
5660
final TaskAttemptID tid = ((TaskAttemptContext) jobContext).getTaskAttemptID();
57-
this.taskAttemptId = tid.toString();
61+
this.taskAttemptId = tid != null
62+
? tid.toString()
63+
: null;
5864
} else {
5965
this.taskAttemptId = null;
6066
}
@@ -70,7 +76,11 @@ public AuditContextUpdater(String jobId) {
7076
*/
7177
public void updateCurrentAuditContext() {
7278
final CommonAuditContext auditCtx = currentAuditContext();
73-
auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
79+
if (jobId != null) {
80+
auditCtx.put(AuditConstants.PARAM_JOB_ID, jobId);
81+
} else {
82+
currentAuditContext().remove(AuditConstants.PARAM_JOB_ID);
83+
}
7484
if (taskAttemptId != null) {
7585
auditCtx.put(AuditConstants.PARAM_TASK_ATTEMPT_ID, taskAttemptId);
7686
} else {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
4141
import org.apache.hadoop.io.IOUtils;
4242
import org.apache.hadoop.mapreduce.JobContext;
43+
import org.apache.hadoop.mapreduce.JobID;
4344
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
4445
import org.apache.hadoop.util.JsonSerialization;
4546
import org.apache.hadoop.util.Preconditions;
@@ -156,7 +157,12 @@ public CommitContext(
156157
this.commitOperations = commitOperations;
157158
this.jobContext = jobContext;
158159
this.conf = jobContext.getConfiguration();
159-
this.jobId = jobContext.getJobID().toString();
160+
JobID contextJobID = jobContext.getJobID();
161+
// either the job ID or make one up as it will be
162+
// used for the filename of any reports.
163+
this.jobId = contextJobID != null
164+
? contextJobID.toString()
165+
: ("job-without-id-at-" + System.currentTimeMillis());
160166
this.collectIOStatistics = conf.getBoolean(
161167
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
162168
S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);

0 commit comments

Comments
 (0)