Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-3821: Ability to fail fast tasks that write too much to local disk. #314

Merged
merged 4 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2304,4 +2304,12 @@ static Set<String> getPropertySet() {
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";

/**
 * Limits the amount of data that can be written to LocalFileSystem by a Task.
 * Expressed in bytes; a negative value (the default, -1) disables the check.
 * When the limit is exceeded, the task is failed fast instead of filling up
 * the node's local disk.
 */
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty(type = "long")
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
// Default of -1 means "no limit": checkTaskLimits() only enforces values >= 0.
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
Expand All @@ -26,6 +27,8 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
Expand All @@ -35,6 +38,11 @@
import org.apache.tez.runtime.metrics.TaskCounterUpdater;

import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT;

public abstract class RuntimeTask {

Expand All @@ -54,6 +62,9 @@ public abstract class RuntimeTask {
private final TaskStatistics statistics;
private final AtomicBoolean progressNotified = new AtomicBoolean(false);

private final long lfsBytesWriteLimit;
private static final Logger LOG = LoggerFactory.getLogger(RuntimeTask.class);

protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
TezUmbilical tezUmbilical, String pid, boolean setupSysCounterUpdater) {
this.taskSpec = taskSpec;
Expand All @@ -71,6 +82,8 @@ protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
} else {
this.counterUpdater = null;
}
this.lfsBytesWriteLimit =
tezConf.getLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT);
}

protected enum State {
Expand Down Expand Up @@ -182,4 +195,36 @@ protected void setTaskDone() {
protected final boolean isUpdatingSystemCounters() {
return counterUpdater != null;
}

/**
 * Check whether the task has exceeded any configured limits.
 * Currently enforces {@code tez.task.local-fs.write-limit.bytes}: when the
 * global LocalFileSystem "bytesWritten" storage statistic exceeds the
 * configured limit, the task is asked to fail fast.
 *
 * <p>If the statistics cannot be read, the check is skipped (best effort)
 * rather than failing the task.
 *
 * @throws TaskLimitException in case the limit is exceeded.
 */
public void checkTaskLimits() throws TaskLimitException {
  // check the limit for writing to local file system; negative limit disables the check
  if (lfsBytesWriteLimit >= 0) {
    Long lfsBytesWritten = null;
    try {
      LocalFileSystem localFS = FileSystem.getLocal(tezConf);
      // The per-scheme statistics may be absent if nothing was written yet via LocalFileSystem.
      StorageStatistics stats = FileSystem.getGlobalStorageStatistics().get(localFS.getScheme());
      if (stats != null) {
        lfsBytesWritten = stats.getLong("bytesWritten");
      }
    } catch (IOException e) {
      // Best effort: log the cause and skip the check for this round.
      LOG.warn("Could not get LocalFileSystem bytesWritten counter", e);
    }
    if (lfsBytesWritten != null && lfsBytesWritten > lfsBytesWriteLimit) {
      throw new TaskLimitException(
          "Too much write to local file system." + " current value is " + lfsBytesWritten + " the limit is "
              + lfsBytesWriteLimit);
    }
  }
}

/**
 * Exception thrown when the task exceeds some configured limits
 * (e.g. the local filesystem write limit). Extends {@link IOException}
 * so existing umbilical/IO error paths can propagate it.
 */
public static class TaskLimitException extends IOException {
  // IOException is Serializable; declare an explicit version id.
  private static final long serialVersionUID = 1L;

  /**
   * @param str human-readable description of the exceeded limit,
   *            including the current value and the configured limit.
   */
  public TaskLimitException(String str) {
    super(str);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.RuntimeTask.TaskLimitException;
import org.apache.tez.runtime.api.*;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
Expand Down Expand Up @@ -262,6 +263,13 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
sendCounters = true;
prevCounterSendHeartbeatNum = nonOobHeartbeatCounter.get();
}
try {
task.checkTaskLimits();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this in heartbeat can significantly increase its runtime. Even a small increase of a few milliseconds can have an impact on cluster usage. You may want to run this every 10 seconds here, or in some other way. This should ideally handle sorter spills and merges, in which case it could also address another ticket that was created along similar lines.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Rajesh, there was some logic with HEAP_MEMORY_USAGE_UPDATE_INTERVAL; I used similar logic here to make sure the check happens only every 10 seconds.

} catch (TaskLimitException tle) {
LOG.error("Task limit exceeded", tle);
askedToDie.set(true);
return new ResponseWrapper(true, 1);
}
updateEvent = new TezEvent(getStatusUpdateEvent(sendCounters), updateEventMetadata);
events.add(updateEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tez.runtime.task;

import static org.apache.tez.dag.api.TezConfiguration.TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand All @@ -27,7 +28,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -36,12 +41,21 @@

import com.google.common.collect.Lists;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.RuntimeTask.TaskLimitException;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
Expand All @@ -55,6 +69,9 @@
@SuppressWarnings("rawtypes")
public class TestTaskReporter {

private static final File TEST_DIR =
new File(System.getProperty("test.build.data"), TestTaskReporter.class.getName()).getAbsoluteFile();

@Test(timeout = 10000)
public void testContinuousHeartbeatsOnMaxEvents() throws Exception {

Expand Down Expand Up @@ -218,6 +235,38 @@ public void testStatusUpdateAfterInitializationAndCounterFlag() {

}

/**
 * Verifies that checkTaskLimits() honors tez.task.local-fs.write-limit.bytes:
 * no exception with the default (disabled) limit, and a TaskLimitException
 * once local filesystem writes exceed a small configured limit.
 */
@Test(timeout = 10000)
public void testLocalFileSystemBytesWrittenLimit() throws IOException {
  TaskSpec mockSpec = mock(TaskSpec.class);
  when(mockSpec.getInputs()).thenReturn(Collections.singletonList(mock(InputSpec.class)));
  when(mockSpec.getOutputs()).thenReturn(Collections.singletonList(mock(OutputSpec.class)));
  TezConfiguration tezConf = new TezConfiguration();
  LogicalIOProcessorRuntimeTask lio1 =
      new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
          Runtime.getRuntime().maxMemory(), true, null, null);

  LocalFileSystem localFS = FileSystem.getLocal(tezConf);
  FileSystem.clearStatistics();
  // Unique file per run so repeated/concurrent executions do not collide.
  Path tmpPath =
      new Path(TEST_DIR + "/testLocalFileSystemBytesWrittenLimit" + new Random(System.currentTimeMillis()).nextInt());
  try {
    try (FSDataOutputStream out = localFS.create(tmpPath, true)) {
      out.write(new byte[1024]);
    }
    // Check limits with default shouldn't throw exception.
    lio1.checkTaskLimits();

    // Re-create the task with a 10-byte limit; the 1024 bytes already written must trip it.
    tezConf.setLong(TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES, 10);
    lio1 = new LogicalIOProcessorRuntimeTask(mockSpec, 0, tezConf, null, null, null, null, null, null, "", null,
        Runtime.getRuntime().maxMemory(), true, null, null);

    try {
      lio1.checkTaskLimits();
      Assert.fail("Expected to throw TaskLimitException");
    } catch (TaskLimitException taskLimitException) {
      Assert.assertTrue(taskLimitException.getMessage().contains("Too much write to local file system"));
    }
  } finally {
    // Clean up the scratch file so repeated runs start from a clean state.
    localFS.delete(tmpPath, false);
  }
}

private List<TezEvent> createEvents(int numEvents) {
List<TezEvent> list = Lists.newArrayListWithCapacity(numEvents);
for (int i = 0; i < numEvents; i++) {
Expand Down
Loading