Skip to content

Commit

Permalink
Favour FutureTask for concurrency support
Browse files Browse the repository at this point in the history
TestNG currently has a custom implementation of 
ThreadPoolExecutor named GraphThreadPoolExecutor.

This class was created to facilitate concurrency
in a DAG.

Now that we are on JDK11, we can very well move
over to leveraging FutureTask based implementations
and thus decouple ourselves from the Executor 
and just focus on orchestrating the next node retrieval
for execution.

Since this is experimental, we are currently providing
a JVM based switch that can fall back to the old 
Behaviour in case of any issues.

JVM argument to use “-Dtestng.favor.custom.thread-pool.executor=true”
  • Loading branch information
krmahadevan committed Dec 18, 2023
1 parent 3cb01b4 commit 34c7ba1
Show file tree
Hide file tree
Showing 12 changed files with 464 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@ public final class RuntimeBehavior {
public static final String IGNORE_CALLBACK_INVOCATION_SKIPS = "testng.ignore.callback.skip";
public static final String SYMMETRIC_LISTENER_EXECUTION = "testng.listener.execution.symmetric";
public static final String PREFERENTIAL_LISTENERS = "testng.preferential.listeners.package";
public static final String FAVOR_CUSTOM_THREAD_POOL_EXECUTOR =
"testng.favor.custom.thread-pool.executor";

private RuntimeBehavior() {}

public static boolean ignoreCallbackInvocationSkips() {
return Boolean.getBoolean(IGNORE_CALLBACK_INVOCATION_SKIPS);
}

/**
* @return - <code>true</code> if TestNG is to be using its custom implementation of {@link
* java.util.concurrent.ThreadPoolExecutor} for running concurrent tests. Defaults to <code>
* false</code>
*/
public static boolean favourCustomThreadPoolExecutor() {
return Boolean.getBoolean(FAVOR_CUSTOM_THREAD_POOL_EXECUTOR);
}

/**
* @return - A comma separated list of packages that represent special listeners which users will
* expect to be executed after executing the regular listeners. Here special listeners can be
Expand Down
83 changes: 83 additions & 0 deletions testng-core/src/main/java/org/testng/SuiteTaskExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.testng;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.testng.internal.IConfiguration;
import org.testng.internal.RuntimeBehavior;
import org.testng.internal.Utils;
import org.testng.internal.thread.TestNGThreadFactory;
import org.testng.internal.thread.graph.GraphOrchestrator;
import org.testng.log4testng.Logger;
import org.testng.thread.IExecutorFactory;
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;

class SuiteTaskExecutor {
private final BlockingQueue<Runnable> queue;
private final IDynamicGraph<ISuite> graph;
private final IThreadWorkerFactory<ISuite> factory;
private final IConfiguration configuration;

private final int threadPoolSize;

private ExecutorService service;

private static final Logger LOGGER = Logger.getLogger(SuiteTaskExecutor.class);

public SuiteTaskExecutor(
IConfiguration configuration,
IThreadWorkerFactory<ISuite> factory,
BlockingQueue<Runnable> queue,
IDynamicGraph<ISuite> graph,
int threadPoolSize) {
this.configuration = configuration;
this.factory = factory;
this.queue = queue;
this.graph = graph;
this.threadPoolSize = threadPoolSize;
}

public void execute() {
String name = "suites-";
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
IExecutorFactory execFactory = configuration.getExecutorFactory();
ITestNGThreadPoolExecutor executor =
execFactory.newSuiteExecutor(
name,
graph,
factory,
threadPoolSize,
threadPoolSize,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
queue,
null);
executor.run();
service = executor;
} else {
service =
new ThreadPoolExecutor(
threadPoolSize,
threadPoolSize,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
queue,
new TestNGThreadFactory(name));
GraphOrchestrator<ISuite> executor = new GraphOrchestrator<>(service, factory, graph, null);
executor.run();
}
}

public void awaitCompletion() {
Utils.log("TestNG", 2, "Starting executor for all suites");
try {
boolean ignored = service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
service.shutdownNow();
} catch (InterruptedException handled) {
Thread.currentThread().interrupt();
LOGGER.error(handled.getMessage(), handled);
}
}
}
40 changes: 15 additions & 25 deletions testng-core/src/main/java/org/testng/TestNG.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.testng.SuiteRunner.TestListenersContainer;
import org.testng.annotations.ITestAnnotation;
import org.testng.collections.Lists;
Expand Down Expand Up @@ -60,7 +59,6 @@
import org.testng.reporters.XMLReporter;
import org.testng.reporters.jq.Main;
import org.testng.thread.IExecutorFactory;
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;
import org.testng.util.Strings;
import org.testng.xml.IPostProcessor;
Expand Down Expand Up @@ -835,6 +833,8 @@ public void setVerbose(int verbose) {
m_verbose = verbose;
}

/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
@Deprecated
public void setExecutorFactoryClass(String clazzName) {
this.m_executorFactory = createExecutorFactoryInstanceUsing(clazzName);
}
Expand All @@ -853,10 +853,14 @@ private IExecutorFactory createExecutorFactoryInstanceUsing(String clazzName) {
clazzName + " does not implement " + IExecutorFactory.class.getName());
}

/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
@Deprecated
public void setExecutorFactory(IExecutorFactory factory) {
this.m_executorFactory = factory;
}

/** This method stands deprecated as of TestNG <code>v7.9.0</code>. */
@Deprecated
public IExecutorFactory getExecutorFactory() {
if (this.m_executorFactory == null) {
this.m_executorFactory = createExecutorFactoryInstanceUsing(DEFAULT_THREADPOOL_FACTORY);
Expand Down Expand Up @@ -1227,29 +1231,15 @@ public List<ISuite> runSuitesLocally() {
IThreadWorkerFactory<ISuite> factory =
new SuiteWorkerFactory(
suiteRunnerMap, 0 /* verbose hasn't been set yet */, getDefaultSuiteName());
ITestNGThreadPoolExecutor pooledExecutor =
this.getExecutorFactory()
.newSuiteExecutor(
"suites",
suiteGraph,
factory,
m_suiteThreadPoolSize,
m_suiteThreadPoolSize,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
null);

Utils.log("TestNG", 2, "Starting executor for all suites");
// Run all suites in parallel
pooledExecutor.run();
try {
pooledExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
pooledExecutor.shutdownNow();
} catch (InterruptedException handled) {
Thread.currentThread().interrupt();
error("Error waiting for concurrent executors to finish " + handled.getMessage());
}
SuiteTaskExecutor taskExecutor =
new SuiteTaskExecutor(
this.m_configuration,
factory,
new LinkedBlockingQueue<>(),
suiteGraph,
m_suiteThreadPoolSize);
taskExecutor.execute();
taskExecutor.awaitCompletion();

//
// Generate the suites report
Expand Down
39 changes: 5 additions & 34 deletions testng-core/src/main/java/org/testng/TestRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -61,7 +60,6 @@
import org.testng.internal.objects.IObjectDispenser;
import org.testng.junit.IJUnitTestRunner;
import org.testng.log4testng.Logger;
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;
import org.testng.thread.IWorker;
import org.testng.util.Strings;
Expand Down Expand Up @@ -761,8 +759,6 @@ private static BlockingQueue<Runnable> newQueue(boolean needPrioritySort) {
private void privateRun(XmlTest xmlTest) {
boolean parallel = xmlTest.getParallel().isParallel();

// parallel
int threadCount = parallel ? xmlTest.getThreadCount() : 1;
// Make sure we create a graph based on the intercepted methods, otherwise an interceptor
// removing methods would cause the graph never to terminate (because it would expect
// termination from methods that never get invoked).
Expand Down Expand Up @@ -798,36 +794,11 @@ private void privateRun(XmlTest xmlTest) {
if (graph.getNodeCount() <= 0) {
return;
}
ITestNGThreadPoolExecutor executor =
this.m_configuration
.getExecutorFactory()
.newTestMethodExecutor(
"test=" + xmlTest.getName(),
graph,
this,
threadCount,
threadCount,
0,
TimeUnit.MILLISECONDS,
newQueue(needPrioritySort),
methodComparator);
executor.run();
try {
long timeOut = m_xmlTest.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS);
Utils.log(
"TestRunner",
2,
"Starting executor for test "
+ m_xmlTest.getName()
+ " with time out:"
+ timeOut
+ " milliseconds.");
executor.awaitTermination(timeOut, TimeUnit.MILLISECONDS);
executor.shutdownNow();
} catch (InterruptedException handled) {
LOGGER.error(handled.getMessage(), handled);
Thread.currentThread().interrupt();
}
TestTaskExecutor taskExecutor =
new TestTaskExecutor(
m_configuration, xmlTest, this, newQueue(needPrioritySort), graph, methodComparator);
taskExecutor.execute();
taskExecutor.awaitCompletion();
return;
}
List<ITestNGMethod> freeNodes = graph.getFreeNodes();
Expand Down
95 changes: 95 additions & 0 deletions testng-core/src/main/java/org/testng/TestTaskExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.testng;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.testng.internal.IConfiguration;
import org.testng.internal.RuntimeBehavior;
import org.testng.internal.Utils;
import org.testng.internal.thread.TestNGThreadFactory;
import org.testng.internal.thread.graph.GraphOrchestrator;
import org.testng.log4testng.Logger;
import org.testng.thread.IExecutorFactory;
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;
import org.testng.xml.XmlTest;

class TestTaskExecutor {
private final BlockingQueue<Runnable> queue;
private final Comparator<ITestNGMethod> comparator;
private final IDynamicGraph<ITestNGMethod> graph;
private final XmlTest xmlTest;
private final IThreadWorkerFactory<ITestNGMethod> factory;
private final IConfiguration configuration;
private final long timeOut;

private ExecutorService service;

private static final Logger LOGGER = Logger.getLogger(TestTaskExecutor.class);

public TestTaskExecutor(
IConfiguration configuration,
XmlTest xmlTest,
IThreadWorkerFactory<ITestNGMethod> factory,
BlockingQueue<Runnable> queue,
IDynamicGraph<ITestNGMethod> graph,
Comparator<ITestNGMethod> comparator) {
this.configuration = configuration;
this.xmlTest = xmlTest;
this.factory = factory;
this.queue = queue;
this.graph = graph;
this.comparator = comparator;
this.timeOut = xmlTest.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS);
}

public void execute() {
String name = "test-" + xmlTest.getName();
int threadCount = xmlTest.getThreadCount();
threadCount = Math.max(threadCount, 1);
if (RuntimeBehavior.favourCustomThreadPoolExecutor()) {
IExecutorFactory execFactory = configuration.getExecutorFactory();
ITestNGThreadPoolExecutor executor =
execFactory.newTestMethodExecutor(
name,
graph,
factory,
threadCount,
threadCount,
0,
TimeUnit.MILLISECONDS,
queue,
comparator);
executor.run();
service = executor;
} else {
service =
new ThreadPoolExecutor(
threadCount,
threadCount,
0,
TimeUnit.MILLISECONDS,
queue,
new TestNGThreadFactory(name));
GraphOrchestrator<ITestNGMethod> executor =
new GraphOrchestrator<>(service, factory, graph, comparator);
executor.run();
}
}

public void awaitCompletion() {
String msg =
String.format(
"Starting executor test %d with time out: %d milliseconds.", timeOut, timeOut);
Utils.log("TestTaskExecutor", 2, msg);
try {
boolean ignored = service.awaitTermination(timeOut, TimeUnit.MILLISECONDS);
service.shutdownNow();
} catch (InterruptedException handled) {
LOGGER.error(handled.getMessage(), handled);
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
import org.testng.thread.ITestNGThreadPoolExecutor;
import org.testng.thread.IThreadWorkerFactory;

/**
* @deprecated - This implementation stands deprecated as of TestNG <code>v7.9.0</code>. There are
* no alternatives for this implementation.
*/
@Deprecated
public class DefaultThreadPoolExecutorFactory implements IExecutorFactory {

@Override
Expand Down
Loading

0 comments on commit 34c7ba1

Please sign in to comment.