From f8064e7e178ad38a2e34c0ab08f7b706cecd828f Mon Sep 17 00:00:00 2001 From: Carlos Macasaet Date: Thu, 15 Oct 2020 00:03:08 -0700 Subject: [PATCH] Support parallelization in junit-vintage-engine This change updates the JUnit Vintage Test Engine to read the parallel configuration parameters, and if parallel execution is enabled, creates a thread pool for executing the test descriptors concurrently. The approach for configuring the thread pool emulates the logic used by the Jupiter Test Engine. Issue: #2229 --- .../vintage/engine/VintageTestEngine.java | 114 +++++++++++++- .../VintageTestEngineParallelismTests.java | 148 ++++++++++++++++++ .../samples/junit4/ConcurrencyTests.java | 46 ++++++ 3 files changed, 304 insertions(+), 4 deletions(-) create mode 100644 junit-vintage-engine/src/test/java/org/junit/vintage/engine/VintageTestEngineParallelismTests.java create mode 100644 junit-vintage-engine/src/testFixtures/java/org/junit/vintage/engine/samples/junit4/ConcurrencyTests.java diff --git a/junit-vintage-engine/src/main/java/org/junit/vintage/engine/VintageTestEngine.java b/junit-vintage-engine/src/main/java/org/junit/vintage/engine/VintageTestEngine.java index a4d8206ecaec..f472e2214097 100644 --- a/junit-vintage-engine/src/main/java/org/junit/vintage/engine/VintageTestEngine.java +++ b/junit-vintage-engine/src/main/java/org/junit/vintage/engine/VintageTestEngine.java @@ -14,15 +14,27 @@ import static org.junit.platform.engine.TestExecutionResult.successful; import static org.junit.vintage.engine.descriptor.VintageTestDescriptor.ENGINE_ID; +import java.util.Collection; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import org.apiguardian.api.API; +import org.junit.platform.commons.JUnitException; +import org.junit.platform.engine.ConfigurationParameters; import org.junit.platform.engine.EngineDiscoveryRequest; import org.junit.platform.engine.EngineExecutionListener; import org.junit.platform.engine.ExecutionRequest; import org.junit.platform.engine.TestDescriptor; import org.junit.platform.engine.TestEngine; import org.junit.platform.engine.UniqueId; +import org.junit.platform.engine.support.config.PrefixedConfigurationParameters; +import org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy; +import org.junit.platform.engine.support.hierarchical.ParallelExecutionConfiguration; +import org.junit.platform.engine.support.hierarchical.ParallelExecutionConfigurationStrategy; import org.junit.vintage.engine.descriptor.RunnerTestDescriptor; import org.junit.vintage.engine.descriptor.VintageEngineDescriptor; import org.junit.vintage.engine.discovery.VintageDiscoverer; @@ -36,6 +48,10 @@ @API(status = INTERNAL, since = "4.12") public final class VintageTestEngine implements TestEngine { + private static final String PARALLEL_CONFIG_PREFIX = "junit.jupiter.execution.parallel"; + private static final String PARALLEL_ENABLED_CONFIG = PARALLEL_CONFIG_PREFIX + ".enabled"; + private static final String PARALLEL_CONFIG = PARALLEL_CONFIG_PREFIX + ".config."; + @Override public String getId() { return ENGINE_ID; @@ -70,17 +86,107 @@ public void execute(ExecutionRequest request) { engineExecutionListener.executionStarted(engineDescriptor); RunnerExecutor runnerExecutor = new RunnerExecutor(engineExecutionListener, engineDescriptor.getTestSourceProvider()); - executeAllChildren(runnerExecutor, engineDescriptor); + + final ConfigurationParameters configurationParameters = request.getConfigurationParameters(); + final boolean parallelExecutionEnabled = configurationParameters.getBoolean(PARALLEL_ENABLED_CONFIG).orElse( + false); + + try (CloseableExecutor executor = parallelExecutionEnabled ? new ParallelExecutor(configurationParameters) + : new SerialExecutor()) { + executeAllChildren(executor, runnerExecutor, engineDescriptor); + } + catch (final InterruptedException e) { + throw new JUnitException("Error executing tests for engine " + getId() + ": " + e.getMessage(), e); + } + engineExecutionListener.executionFinished(engineDescriptor, successful()); } - private void executeAllChildren(RunnerExecutor runnerExecutor, TestDescriptor engineDescriptor) { + private void executeAllChildren(final Executor executor, RunnerExecutor runnerExecutor, + TestDescriptor engineDescriptor) throws InterruptedException { + final Collection children = engineDescriptor.getChildren(); + final CountDownLatch latch = new CountDownLatch(children.size()); // @formatter:off - engineDescriptor.getChildren() + children .stream() .map(RunnerTestDescriptor.class::cast) - .forEach(runnerExecutor::execute); + .map(descriptor -> (Runnable)() -> { + runnerExecutor.execute(descriptor); + latch.countDown(); + }) + .forEach(executor::execute); // @formatter:on + latch.await(); + } + + /** + * Wrapper for {@link Executor} to allow it to be used in a try-with-resources block. + */ + @API(status = INTERNAL, since = "5.8") + protected interface CloseableExecutor extends Executor, AutoCloseable { + default public void close() throws JUnitException { + }; + } + + /** + * {@link CloseableExecutor} that executes tasks synchronously. + * + * @since 5.8 + */ + @API(status = INTERNAL, since = "5.8") + protected class SerialExecutor implements CloseableExecutor { + public void execute(final Runnable command) { + command.run(); + } + + } + + /** + * {@link CloseableExecutor} backed by a {@link ForkJoinPool} that executes tasks asynchronously + * based on the settings in {@value #PARALLEL_CONFIG}. Clients *must* implement their own logic + * to wait for tasks to complete. + * + * @since 5.8 + */ + @API(status = INTERNAL, since = "5.8") + protected class ParallelExecutor implements CloseableExecutor { + + private final ForkJoinPool pool; + + /** + * @param parameters test execution configuration + */ + public ParallelExecutor(final ConfigurationParameters parameters) { + final ConfigurationParameters prefixedParameters = new PrefixedConfigurationParameters(parameters, + PARALLEL_CONFIG); + final String strategyName = prefixedParameters.get("strategy").orElse("dynamic"); + final ParallelExecutionConfigurationStrategy executionStrategy = DefaultParallelExecutionConfigurationStrategy.valueOf( + strategyName.toUpperCase()); + final ParallelExecutionConfiguration executionConfiguration = executionStrategy.createConfiguration( + prefixedParameters); + pool = new ForkJoinPool(executionConfiguration.getParallelism()); + } + + public void execute(final Runnable command) { + getPool().execute(command); + } + + public void close() throws JUnitException { + final ExecutorService executor = getPool(); + executor.shutdown(); + try { + executor.awaitTermination(1, TimeUnit.MINUTES); + } + catch (final InterruptedException ie) { + throw new JUnitException("Interrupted while waiting for forked tests to complete; " + ie.getMessage(), + ie); + } + } + + protected ForkJoinPool getPool() { + return pool; + } + } } diff --git a/junit-vintage-engine/src/test/java/org/junit/vintage/engine/VintageTestEngineParallelismTests.java b/junit-vintage-engine/src/test/java/org/junit/vintage/engine/VintageTestEngineParallelismTests.java new file mode 100644 index 000000000000..e6124ffe5fcd --- /dev/null +++ b/junit-vintage-engine/src/test/java/org/junit/vintage/engine/VintageTestEngineParallelismTests.java @@ -0,0 +1,148 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * All rights reserved. This program and the accompanying materials are + * made available under the terms of the Eclipse Public License v2.0 which + * accompanies this distribution and is available at + * + * https://www.eclipse.org/legal/epl-v20.html + */ + +package org.junit.vintage.engine; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.platform.engine.discovery.DiscoverySelectors.selectClass; + +import java.util.IntSummaryStatistics; +import java.util.concurrent.atomic.LongAdder; + +import org.junit.jupiter.api.Test; +import org.junit.platform.engine.EngineExecutionListener; +import org.junit.platform.engine.ExecutionRequest; +import org.junit.platform.engine.TestDescriptor; +import org.junit.platform.engine.TestEngine; +import org.junit.platform.engine.TestExecutionResult; +import org.junit.platform.engine.TestExecutionResult.Status; +import org.junit.platform.engine.UniqueId; +import org.junit.platform.launcher.LauncherDiscoveryRequest; +import org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder; +import org.junit.vintage.engine.samples.junit4.ConcurrencyTests.A; +import org.junit.vintage.engine.samples.junit4.ConcurrencyTests.B; + +/** + * Tests to ensure that vintage JUnit tests can run in parallel. + * At the moment, only concurrency of top-level containers is supported. + * + * @since 5.8 + */ +class VintageTestEngineParallelismTests { + + private static final String PARALLELISM = "junit.jupiter.execution.parallel.config.fixed.parallelism"; + private static final String PARALLEL_STRATEGY = "junit.jupiter.execution.parallel.config.strategy"; + private static final String DEFAULT_PARALLEL_MODE = "junit.jupiter.execution.parallel.mode.default"; + private static final String PARALLEL_EXECUTION_ENABLED = "junit.jupiter.execution.parallel.enabled"; + + /** + * Verify that if parallel execution is enabled, tests from separate classes will run concurrently. + */ + @Test + void verifyTestsRunConcurrently() { + // given + final LauncherDiscoveryRequest discoveryRequest = LauncherDiscoveryRequestBuilder.request().selectors( + selectClass(A.class), selectClass(B.class)).configurationParameter(PARALLEL_EXECUTION_ENABLED, + "true").configurationParameter(DEFAULT_PARALLEL_MODE, "concurrent").configurationParameter( + PARALLEL_STRATEGY, "fixed").configurationParameter(PARALLELISM, "2").build(); + final TestEngine engine = new VintageTestEngine(); + final TestDescriptor descriptor = engine.discover(discoveryRequest, UniqueId.forEngine(engine.getId())); + + final CountingListener listener = new CountingListener(); + final ExecutionRequest executionRequest = new ExecutionRequest(descriptor, listener, + discoveryRequest.getConfigurationParameters()); + + // when + engine.execute(executionRequest); + + // then + assertEquals(2, listener.getMaxConcurrentTests()); + assertEquals(2, listener.getMaxConcurrentClasses()); + } + + /** + * Verify that if parallel execution is disabled, tests will run sequentially. + */ + @Test + void verifyTestsRunSequentially() { + // given + final LauncherDiscoveryRequest discoveryRequest = LauncherDiscoveryRequestBuilder.request().selectors( + selectClass(A.class), selectClass(B.class)).configurationParameter(PARALLEL_EXECUTION_ENABLED, + "false").build(); + final TestEngine engine = new VintageTestEngine(); + final TestDescriptor descriptor = engine.discover(discoveryRequest, UniqueId.forEngine(engine.getId())); + + final CountingListener listener = new CountingListener(); + final ExecutionRequest executionRequest = new ExecutionRequest(descriptor, listener, + discoveryRequest.getConfigurationParameters()); + + // when + engine.execute(executionRequest); + + // then + assertEquals(1, listener.getMaxConcurrentTests()); + assertEquals(1, listener.getMaxConcurrentClasses()); + } + + /** + * {@link EngineExecutionListener} that exposes that maximum number of concurrent classes and tests. + * + * @since 5.8 + */ + protected class CountingListener implements EngineExecutionListener { + + private final IntSummaryStatistics classSummary = new IntSummaryStatistics(); + private final IntSummaryStatistics testSummary = new IntSummaryStatistics(); + private final LongAdder testAdder = new LongAdder(); + private final LongAdder classAdder = new LongAdder(); + + /** + * @return the largest number of tests that ran at the same time. + */ + public int getMaxConcurrentTests() { + return testSummary.getMax(); + } + + /** + * @return the largest number of classes that ran at the same time. + */ + public int getMaxConcurrentClasses() { + return classSummary.getMax(); + } + + public void executionStarted(final TestDescriptor testDescriptor) { + synchronized (this) { + final String displayName = testDescriptor.getDisplayName(); + if (testDescriptor.isTest()) { + testAdder.increment(); + testSummary.accept(testAdder.intValue()); + } + else if (displayName.equals("A") || displayName.equals("B")) { + classAdder.increment(); + classSummary.accept(classAdder.intValue()); + } + } + } + + public void executionFinished(final TestDescriptor testDescriptor, + final TestExecutionResult testExecutionResult) { + synchronized (this) { + final String displayName = testDescriptor.getDisplayName(); + assertEquals(Status.SUCCESSFUL, testExecutionResult.getStatus()); + if (testDescriptor.isTest()) { + testAdder.decrement(); + } + else if (displayName.equals("A") || displayName.equals("B")) { + classAdder.decrement(); + } + } + } + } +} diff --git a/junit-vintage-engine/src/testFixtures/java/org/junit/vintage/engine/samples/junit4/ConcurrencyTests.java b/junit-vintage-engine/src/testFixtures/java/org/junit/vintage/engine/samples/junit4/ConcurrencyTests.java new file mode 100644 index 000000000000..cbc7897941ec --- /dev/null +++ b/junit-vintage-engine/src/testFixtures/java/org/junit/vintage/engine/samples/junit4/ConcurrencyTests.java @@ -0,0 +1,46 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * All rights reserved. This program and the accompanying materials are + * made available under the terms of the Eclipse Public License v2.0 which + * accompanies this distribution and is available at + * + * https://www.eclipse.org/legal/epl-v20.html + */ + +package org.junit.vintage.engine.samples.junit4; + +import org.junit.Test; + +/** + * Vintage tests to reproduce various concurrency scenarios. + * + * @since 5.8 + */ +public class ConcurrencyTests { + + public static class A { + @Test + public final void test1() throws InterruptedException { + Thread.sleep(5); + } + + @Test + public final void test2() throws InterruptedException { + Thread.sleep(5); + } + } + + public static class B { + @Test + public final void test1() throws InterruptedException { + Thread.sleep(5); + } + + @Test + public final void test2() throws InterruptedException { + Thread.sleep(5); + } + } + +}