diff --git a/build.gradle b/build.gradle index d365233465108..07dd84108f45f 100644 --- a/build.gradle +++ b/build.gradle @@ -114,6 +114,8 @@ ext { repo = file("$rootDir/.git").isDirectory() ? Grgit.open(currentDir: project.getRootDir()) : null commitId = determineCommitId() + + junitExtensionsAutodetectionEnabled = "junit.jupiter.extensions.autodetection.enabled" } allprojects { @@ -439,6 +441,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() + systemProperty(junitExtensionsAutodetectionEnabled, true) exclude testsToExclude @@ -467,6 +470,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() + systemProperty(junitExtensionsAutodetectionEnabled, true) exclude testsToExclude @@ -510,6 +514,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() + systemProperty(junitExtensionsAutodetectionEnabled, true) exclude testsToExclude diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java new file mode 100644 index 0000000000000..effa680a140cb --- /dev/null +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.junit; + +import kafka.utils.TestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ExtensionContext.Namespace; +import org.junit.jupiter.api.extension.ExtensionContext.Store; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import scala.Tuple2; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { + private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( + Arrays.asList( + "Attach Listener", "client-metrics-reaper", "executor-", "feature-zk-node-event-process-thread", + "ForkJoinPool", "JMX", "junit-", "metrics-meter-tick-thread", "pool-", "process reaper", "RMI", + "scala-" + ) + ); + private static final String THREADS_KEY = "threads"; + + @Override + public void beforeEach(ExtensionContext context) { + getStore(context).put(THREADS_KEY, Thread.getAllStackTraces().keySet()); + } + + @Override + @SuppressWarnings("unchecked") + public void afterEach(ExtensionContext context) { + Set initialThreads = getStore(context).remove(THREADS_KEY, Set.class); + Tuple2, Object> unexpectedThreads = TestUtils.computeUntilTrue( + () -> unexpectedThreads(initialThreads), + DEFAULT_MAX_WAIT_MS, + 100L, + Set::isEmpty + ); + + assertTrue(unexpectedThreads._1.isEmpty(), "Found unexpected threads after executing test: " + + unexpectedThreads._1.stream().map(Objects::toString).collect(Collectors.joining(", "))); + } + + private Set unexpectedThreads(Set initialThreads) { + Set threads = new HashSet<>(Thread.getAllStackTraces().keySet()); + return threads.stream() + .filter(t -> !initialThreads.contains(t)) + .filter(t -> { + for (String s: EXPECTED_THREAD_NAMES) { + if (t.getName().contains(s)) + return false; + } + return true; + }) + .collect(Collectors.toSet()); + } + + private Store getStore(ExtensionContext context) { + return context.getStore(Namespace.create(getClass(), context.getRequiredTestMethod())); + } +} diff --git a/core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000000..15f3eb7616d31 --- /dev/null +++ b/core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +kafka.test.junit.LeakTestingExtension \ No newline at end of file