ForkingTaskRunner: Set ActiveProcessorCount for tasks. #12592

Merged on Jun 15, 2022 (6 commits). The diff below shows changes from 2 commits.

@@ -37,6 +37,7 @@
import com.google.common.io.ByteStreams;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -67,6 +68,7 @@
import org.apache.druid.server.metrics.WorkerTaskCountStatsProvider;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

@@ -75,6 +77,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -105,6 +108,7 @@ public class ForkingTaskRunner
private final StartupLoggingConfig startupLoggingConfig;
private final WorkerConfig workerConfig;

private volatile int numProcessorsPerTask = -1;
private volatile boolean stopping = false;

private static final AtomicLong LAST_REPORTED_FAILED_TASK_COUNT = new AtomicLong();
@@ -214,6 +218,13 @@ public TaskStatus call()
command.add("-cp");
command.add(taskClasspath);

if (numProcessorsPerTask < 1) {
Contributor:
Nit: Is this really needed? Won't TaskRunner.run() always be called after the lifecycle start method?

Contributor Author:
It wouldn't be needed if start and run are called in the proper order. It's here as a safety check in case they aren't. (Maybe it would help debug tests that are doing things incorrectly.)

// numProcessorsPerTask is set by start()
throw new ISE("Not started");
}

command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask));

Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
Iterables.addAll(command, config.getJavaOptsArray());
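
The guard and flag placement in this hunk can be sketched in isolation. This is a minimal illustration, not the PR's code: `ChildJvmCommandSketch`, its `start(int)` parameter, and `buildCommand` are hypothetical names, `IllegalStateException` stands in for Druid's `ISE`, and `String.format` with a fixed locale stands in for `StringUtils.format`. It shows the sentinel value `-1` being rejected until `start()` has run, and the `-XX:ActiveProcessorCount` flag being added before any user-supplied Java options, as in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the command-building part of a forking task runner.
class ChildJvmCommandSketch {
  // -1 is a sentinel meaning "start() has not been called yet".
  private volatile int numProcessorsPerTask = -1;

  // Assumption: the per-task processor count is passed in directly here;
  // in the PR it is computed from the worker capacity.
  void start(int processorsPerTask) {
    numProcessorsPerTask = processorsPerTask;
  }

  List<String> buildCommand(String classpath, List<String> userJavaOpts) {
    // Safety check: fail loudly if the runner is used before it is started.
    if (numProcessorsPerTask < 1) {
      throw new IllegalStateException("Not started");
    }

    List<String> command = new ArrayList<>();
    command.add("java");
    command.add("-cp");
    command.add(classpath);
    // The computed flag goes first, so user-supplied options follow it on the command line.
    command.add(String.format(Locale.ENGLISH, "-XX:ActiveProcessorCount=%d", numProcessorsPerTask));
    command.addAll(userJavaOpts);
    return command;
  }
}
```

Because the flag precedes the configured Java options, an operator-supplied `-XX:ActiveProcessorCount` would appear later on the command line. On HotSpot a later duplicate of the same option typically takes precedence, so this ordering would presumably allow an override, although the discussion shown here does not say so.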

@@ -635,7 +646,7 @@ public Optional<ScalingStats> getScalingStats()
@Override
public void start()
Contributor:
Probably needs to be annotated with @LifecycleStart. I wonder if just annotating the interface method TaskRunner.start() would work so that the implementations don't need to worry about this.

Contributor Author:
Yeah, I think you're right. I think it makes sense to add the annotations in the impls, because not all impls necessarily need lifecycle management. I added it here.

{
- // No state setup required
+ setNumProcessorsPerTask();
}

@Override
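
On the question raised above of annotating only the interface method: in plain Java reflection, a method annotation on an interface method is not visible on the overriding method in an implementing class. The sketch below demonstrates that with a hypothetical `@StartHook` annotation standing in for `@LifecycleStart`; all type names are invented for illustration, and it does not reproduce how Druid's lifecycle handling actually scans for annotated methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical runtime-visible marker annotation (stand-in for a lifecycle annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface StartHook {}

interface SketchRunner {
  @StartHook
  void start();
}

// Overrides start() without repeating the annotation.
class PlainSketchRunner implements SketchRunner {
  @Override
  public void start() {}
}

// Overrides start() and repeats the annotation on the implementation.
class AnnotatedSketchRunner implements SketchRunner {
  @Override
  @StartHook
  public void start() {}
}

class StartHookScan {
  // Looks up the public start() method on the given type and checks for the annotation.
  static boolean hasStartHook(Class<?> type) {
    try {
      Method start = type.getMethod("start");
      return start.isAnnotationPresent(StartHook.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No start() method on " + type, e);
    }
  }
}
```

Here `StartHookScan.hasStartHook(PlainSketchRunner.class)` is false while the check on `AnnotatedSketchRunner.class` is true, so a scanner that inspects only the concrete class's methods would need the annotation on each implementation.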
@@ -788,6 +799,20 @@ public Long getWorkerSuccessfulTaskCount()
return successfulTaskCount - lastReportedSuccessfulTaskCount;
}

  @VisibleForTesting
  void setNumProcessorsPerTask()
  {
    // Divide number of available processors by the number of tasks.
    // This prevents various automatically-sized thread pools from being unreasonably large (we don't want each
    // task to size its pools as if it is the only thing on the entire machine).

    final int availableProcessors = JvmUtils.getRuntimeInfo().getAvailableProcessors();
    numProcessorsPerTask = Math.max(
        1,
        IntMath.divide(availableProcessors, workerConfig.getCapacity(), RoundingMode.CEILING)
    );
  }

protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final Task task;
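
The arithmetic in `setNumProcessorsPerTask()` can be checked with a few concrete values. The sketch below is an illustration under stated assumptions: `ProcessorsPerTaskSketch` is a hypothetical class, plain integer arithmetic stands in for Guava's `IntMath.divide(..., RoundingMode.CEILING)` and is only valid for non-negative processor counts, and the explicit capacity check is an addition that the PR's code does not contain.

```java
// Hypothetical helper mirroring the ceiling-division rule from the diff.
class ProcessorsPerTaskSketch {
  static int processorsPerTask(int availableProcessors, int workerCapacity) {
    // Assumption: reject non-positive capacity explicitly instead of dividing by zero.
    if (workerCapacity < 1) {
      throw new IllegalArgumentException("workerCapacity must be at least 1");
    }
    // Ceiling division for non-negative availableProcessors.
    int ceiling = (availableProcessors + workerCapacity - 1) / workerCapacity;
    // Never hand a task fewer than one processor.
    return Math.max(1, ceiling);
  }

  public static void main(String[] args) {
    System.out.println(processorsPerTask(16, 4)); // prints 4
    System.out.println(processorsPerTask(16, 3)); // prints 6
    System.out.println(processorsPerTask(2, 8));  // prints 1
  }
}
```

Rounding up means the per-task counts can add up to more than the machine has (16 processors across 3 task slots gives 6 each, 18 in total), so the rule errs toward mild oversubscription rather than leaving processors unused.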
@@ -242,6 +242,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
}
};

forkingTaskRunner.setNumProcessorsPerTask();
final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertEquals(
@@ -312,6 +313,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
}
};

forkingTaskRunner.setNumProcessorsPerTask();
final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.SUCCESS, status.getStatusCode());
Assert.assertNull(status.getErrorMsg());
@@ -373,6 +375,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
}
};

forkingTaskRunner.setNumProcessorsPerTask();
Contributor:
Could we call forkingTaskRunner.start() here instead, to avoid exposing setNumProcessorsPerTask() as a @VisibleForTesting method?

Contributor Author:
I thought about that, but then thought it was best to be as minimal as possible for futureproofing. If we later add more functionality to start(), we wouldn't necessarily want that to execute in these tests.

final TaskStatus status = forkingTaskRunner.run(task).get();
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertEquals("task failure test", status.getErrorMsg());
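
The trade-off discussed in the thread above (calling `start()` in tests versus a narrow test-visible hook) can be sketched as follows. `TestHookSketch` and its `extraStartupRan` flag are hypothetical; the flag represents whatever additional startup work might be added to `start()` later, which is the scenario the author wants the tests to stay insulated from.

```java
// Hypothetical runner showing a narrow, package-private hook next to a broader start().
class TestHookSketch {
  private final int availableProcessors;
  private final int capacity;
  private volatile int numProcessorsPerTask = -1;
  // Stand-in for startup work that might be added to start() in the future.
  private volatile boolean extraStartupRan = false;

  TestHookSketch(int availableProcessors, int capacity) {
    this.availableProcessors = availableProcessors;
    this.capacity = capacity;
  }

  public void start() {
    setNumProcessorsPerTask();
    extraStartupRan = true;
  }

  // Package-private so tests in the same package can call it without running all of start().
  void setNumProcessorsPerTask() {
    numProcessorsPerTask = Math.max(1, (availableProcessors + capacity - 1) / capacity);
  }

  int getNumProcessorsPerTask() {
    return numProcessorsPerTask;
  }

  boolean hasExtraStartupRun() {
    return extraStartupRan;
  }
}
```

A test that calls only `setNumProcessorsPerTask()` gets the processor count initialized without triggering the rest of `start()`, at the cost of widening the method's visibility.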
@@ -441,6 +444,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
}
};

forkingTaskRunner.setNumProcessorsPerTask();
forkingTaskRunner.run(task).get();
Assert.assertTrue(xmxJavaOptsArrayIndex.get() > xmxJavaOptsIndex.get());
Assert.assertTrue(xmxJavaOptsIndex.get() >= 0);
@@ -509,6 +513,7 @@ int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File lo
}
};

forkingTaskRunner.setNumProcessorsPerTask();
ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> forkingTaskRunner.run(task).get());
Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
+ " in context of task: " + task.getId() + " must be an array of strings.")