Skip to content

Commit

Permalink
TEZ-4110: Make Tez fail fast when DFS quota is exceeded. (#313)
Browse files Browse the repository at this point in the history
* TEZ-4110: Make Tez fail fast when DFS quota is exceeded.

* Add Test.

* Fix Typo.
  • Loading branch information
ayushtkn authored Oct 16, 2023
1 parent 5bba1ff commit 4bc87e2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.exception.ExceptionUtils;
Expand Down Expand Up @@ -90,7 +91,7 @@ public class TezTaskRunner2 {
// TaskRunnerCallable, a failure to heartbeat, or a signalFatalError on the context.
private volatile Throwable firstException;
private volatile EventMetaData exceptionSourceInfo;
private volatile TaskFailureType firstTaskFailureType;
volatile TaskFailureType firstTaskFailureType;
private final AtomicBoolean errorReporterToAm = new AtomicBoolean(false);

private volatile boolean oobSignalErrorInProgress = false;
Expand Down Expand Up @@ -204,7 +205,7 @@ public TaskRunner2Result run() {
synchronized (this) {
if (isRunningState()) {
trySettingEndReason(EndReason.TASK_ERROR);
registerFirstException(TaskFailureType.NON_FATAL, e, null);
registerFirstException(getTaskFailureType(e), e, null);
LOG.warn("Exception from RunnerCallable", e);
}
}
Expand Down Expand Up @@ -305,7 +306,7 @@ void processCallableResult(TaskRunner2CallableResult executionResult) {
if (isRunningState()) {
if (executionResult.error != null) {
trySettingEndReason(EndReason.TASK_ERROR);
registerFirstException(TaskFailureType.NON_FATAL, executionResult.error, null);
registerFirstException(getTaskFailureType(executionResult.error), executionResult.error, null);
} else {
trySettingEndReason(EndReason.SUCCESS);
taskComplete.set(true);
Expand Down Expand Up @@ -584,4 +585,13 @@ private void logAborting(String abortReason) {
LOG.info("Attempting to abort {} due to an invocation of {}", task.getTaskAttemptID(),
abortReason);
}

private TaskFailureType getTaskFailureType(Throwable e) {
boolean hasClusterStorageCapacityExceededException =
ExceptionUtils.indexOfType(e, ClusterStorageCapacityExceededException.class) != -1;
if (hasClusterStorageCapacityExceededException) {
return TaskFailureType.FATAL;
}
return TaskFailureType.NON_FATAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.fs.ClusterStorageCapacityExceededException;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.tez.common.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -77,6 +80,7 @@
import org.apache.tez.runtime.common.resources.ScalingAllocator;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
import org.apache.tez.runtime.task.TaskRunner2Callable.TaskRunner2CallableResult;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -653,6 +657,40 @@ public void testKilledAfterComplete() throws IOException, InterruptedException,
}
}

@Test
public void testClusterStorageCapacityFatalError() throws IOException {
// Try having a ClusterStorageCapacityExceededException, which is nested within several exceptions.
TezTaskRunner2ForTest taskRunner = createTaskRunnerForTest();
TaskRunner2CallableResult executionResult = new TaskRunner2CallableResult(new Exception(
new IllegalArgumentException(new ClusterStorageCapacityExceededException("cluster capacity blown"))));
taskRunner.processCallableResult(executionResult);

assertEquals(TaskFailureType.FATAL, taskRunner.getFirstTaskFailureType());

// Try having a child class of ClusterStorageCapacityExceededException, which is nested within several exceptions.
taskRunner = createTaskRunnerForTest();
executionResult = new TaskRunner2CallableResult(
new Exception(new IllegalArgumentException(new NSQuotaExceededException("Namespace quota blown"))));
taskRunner.processCallableResult(executionResult);

assertEquals(TaskFailureType.FATAL, taskRunner.getFirstTaskFailureType());

// Try having a ClusterStorageCapacityExceededException as the first exception (non-nested)
taskRunner = createTaskRunnerForTest();
executionResult =
new TaskRunner2CallableResult(new ClusterStorageCapacityExceededException("cluster capacity blown"));
taskRunner.processCallableResult(executionResult);

assertEquals(TaskFailureType.FATAL, taskRunner.getFirstTaskFailureType());

// Try having some other exception, for that it should be NON_FATAL
taskRunner = createTaskRunnerForTest();
executionResult = new TaskRunner2CallableResult(new Exception(new IllegalArgumentException("Generic Exception")));
taskRunner.processCallableResult(executionResult);

assertEquals(TaskFailureType.NON_FATAL, taskRunner.getFirstTaskFailureType());
}

private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {

Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
Expand Down Expand Up @@ -747,6 +785,11 @@ private TezTaskRunner2 createTaskRunner(ApplicationId appId,
processorConf, false, updateSysCounters);
}

private TezTaskRunner2ForTest createTaskRunnerForTest() throws IOException {
return (TezTaskRunner2ForTest) createTaskRunner(ApplicationId.newInstance(10000, 1), null, null, null,
TestProcessor.class.getName(), TestProcessor.CONF_EMPTY, true, false);
}

private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
Expand Down Expand Up @@ -827,6 +870,9 @@ executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(),
sharedExecutor);
}

public TaskFailureType getFirstTaskFailureType() {
return firstTaskFailureType;
}

@Override
@VisibleForTesting
Expand Down

0 comments on commit 4bc87e2

Please sign in to comment.