diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index 90eeeb569b..2f1be9c000 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.ClusterStorageCapacityExceededException; import org.apache.tez.common.Preconditions; import com.google.common.collect.Multimap; import org.apache.commons.lang.exception.ExceptionUtils; @@ -90,7 +91,7 @@ public class TezTaskRunner2 { // TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context. private volatile Throwable firstException; private volatile EventMetaData exceptionSourceInfo; - private volatile TaskFailureType firstTaskFailureType; + volatile TaskFailureType firstTaskFailureType; private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false); private volatile boolean oobSignalErrorInProgress = false; @@ -204,7 +205,7 @@ public TaskRunner2Result run() { synchronized (this) { if (isRunningState()) { trySettingEndReason(EndReason.TASK_ERROR); - registerFirstException(TaskFailureType.NON_FATAL, e, null); + registerFirstException(getTaskFailureType(e), e, null); LOG.warn("Exception from RunnerCallable", e); } } @@ -305,7 +306,7 @@ void processCallableResult(TaskRunner2CallableResult executionResult) { if (isRunningState()) { if (executionResult.error != null) { trySettingEndReason(EndReason.TASK_ERROR); - registerFirstException(TaskFailureType.NON_FATAL, executionResult.error, null); + registerFirstException(getTaskFailureType(executionResult.error), executionResult.error, null); } else { trySettingEndReason(EndReason.SUCCESS); taskComplete.set(true); @@ -584,4 +585,13 @@ private void logAborting(String abortReason) { LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(), abortReason); } + + private TaskFailureType getTaskFailureType(Throwable e) { + boolean hasClusterStorageCapacityExceededException = + ExceptionUtils.indexOfType(e, ClusterStorageCapacityExceededException.class) != -1; + if (hasClusterStorageCapacityExceededException) { + return TaskFailureType.FATAL; + } + return TaskFailureType.NON_FATAL; + } } \ No newline at end of file diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java index 810c3ac82b..aeaec53124 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java @@ -38,6 +38,9 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.fs.ClusterStorageCapacityExceededException; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; import org.apache.tez.common.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -77,6 +80,7 @@ import org.apache.tez.runtime.common.resources.ScalingAllocator; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor; +import org.apache.tez.runtime.task.TaskRunner2Callable.TaskRunner2CallableResult; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -653,6 +657,40 @@ public void testKilledAfterComplete() throws IOException, InterruptedException, } } + @Test + public void testClusterStorageCapacityFatalError() throws IOException { + // Try having a ClusterStorageCapacityExceededException, which is nested within several exceptions. + TezTaskRunner2ForTest taskRunner = createTaskRunnerForTest(); + TaskRunner2CallableResult executionResult = new TaskRunner2CallableResult(new Exception( + new IllegalArgumentException(new ClusterStorageCapacityExceededException("cluster capacity blown")))); + taskRunner.processCallableResult(executionResult); + + assertEquals(TaskFailureType.FATAL, taskRunner.getFirstTaskFailureType()); + + // Try having a child class of ClusterStorageCapacityExceededException, which is nested within several exceptions. + taskRunner = createTaskRunnerForTest(); + executionResult = new TaskRunner2CallableResult( + new Exception(new IllegalArgumentException(new NSQuotaExceededException("Namespace quota blown")))); + taskRunner.processCallableResult(executionResult); + + assertEquals(TaskFailureType.FATAL, taskRunner.getFirstTaskFailureType()); + + // Try having a ClusterStorageCapacityExceededException as the first exception (non-nested) + taskRunner = createTaskRunnerForTest(); + executionResult = + new TaskRunner2CallableResult(new ClusterStorageCapacityExceededException("cluster capacity blown")); + taskRunner.processCallableResult(executionResult); + + assertEquals(TaskFailureType.FATAL, taskRunner.getFirstTaskFailureType()); + + // Try having some other exception, for that it should be NON_FATAL + taskRunner = createTaskRunnerForTest(); + executionResult = new TaskRunner2CallableResult(new Exception(new IllegalArgumentException("Generic Exception"))); + taskRunner.processCallableResult(executionResult); + + assertEquals(TaskFailureType.NON_FATAL, taskRunner.getFirstTaskFailureType()); + } + private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) { Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) || @@ -747,6 +785,11 @@ private TezTaskRunner2 createTaskRunner(ApplicationId appId, processorConf, false, updateSysCounters); } + private TezTaskRunner2ForTest createTaskRunnerForTest() throws IOException { + return (TezTaskRunner2ForTest) createTaskRunner(ApplicationId.newInstance(10000, 1), null, null, null, + TestProcessor.class.getName(), TestProcessor.CONF_EMPTY, true, false); + } + private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId, TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical, TaskReporter taskReporter, @@ -827,6 +870,9 @@ executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(), sharedExecutor); } + public TaskFailureType getFirstTaskFailureType() { + return firstTaskFailureType; + } @Override @VisibleForTesting