From 3eaa3ab3efb44878b38b8753ef9b3b1c05b077ad Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 2 Jan 2024 02:19:55 +0500 Subject: [PATCH 1/9] KAFKA-16072: JUnit 5 extension to detect thread leak Added LeakTestingExtension based on TestUtils#verifyNoUnexpectedThreads --- build.gradle | 3 ++ .../test/junit/LeakTestingExtension.java | 29 +++++++++++++++++++ .../org.junit.jupiter.api.extension.Extension | 16 ++++++++++ 3 files changed, 48 insertions(+) create mode 100644 core/src/test/java/kafka/test/junit/LeakTestingExtension.java create mode 100644 core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension diff --git a/build.gradle b/build.gradle index f7abbf4f0b2ef..c2cedd726c397 100644 --- a/build.gradle +++ b/build.gradle @@ -438,6 +438,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() + systemProperty("junit.jupiter.extensions.autodetection.enabled", true) exclude testsToExclude @@ -466,6 +467,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() + systemProperty("junit.jupiter.extensions.autodetection.enabled", true) exclude testsToExclude @@ -509,6 +511,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() + systemProperty("junit.jupiter.extensions.autodetection.enabled", true) exclude testsToExclude diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java new file mode 100644 index 0000000000000..c3cf9b464710f --- /dev/null +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test.junit; + +import kafka.utils.TestUtils; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class LeakTestingExtension implements AfterEachCallback { + @Override + public void afterEach(ExtensionContext extensionContext) { + TestUtils.verifyNoUnexpectedThreads("@AfterEach"); + } +} diff --git a/core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000000..15f3eb7616d31 --- /dev/null +++ b/core/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +kafka.test.junit.LeakTestingExtension \ No newline at end of file From cfd1716e2c871026f391261aa47a82d841f74ea1 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 2 Jan 2024 17:04:50 +0500 Subject: [PATCH 2/9] KAFKA-16072: JUnit 5 extension to detect thread leak refactoring --- build.gradle | 8 +-- .../test/junit/LeakTestingExtension.java | 54 ++++++++++++++++++- 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/build.gradle b/build.gradle index c2cedd726c397..d01a243357286 100644 --- a/build.gradle +++ b/build.gradle @@ -112,6 +112,8 @@ ext { repo = file("$rootDir/.git").isDirectory() ? Grgit.open(currentDir: project.getRootDir()) : null commitId = determineCommitId() + + junitExtensionsAutodetectionEnabled = "junit.jupiter.extensions.autodetection.enabled" } allprojects { @@ -438,7 +440,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() - systemProperty("junit.jupiter.extensions.autodetection.enabled", true) + systemProperty(junitExtensionsAutodetectionEnabled, true) exclude testsToExclude @@ -467,7 +469,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() - systemProperty("junit.jupiter.extensions.autodetection.enabled", true) + systemProperty(junitExtensionsAutodetectionEnabled, true) exclude testsToExclude @@ -511,7 +513,7 @@ subprojects { displayGranularity = 0 } logTestStdout.rehydrate(delegate, owner, this)() - systemProperty("junit.jupiter.extensions.autodetection.enabled", true) + systemProperty(junitExtensionsAutodetectionEnabled, true) exclude testsToExclude diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index c3cf9b464710f..b3bf9a528f5e0 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -19,11 +19,61 @@ import kafka.utils.TestUtils; import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; -public class LeakTestingExtension implements AfterEachCallback { +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import scala.Tuple2; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { + private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( + Arrays.asList("junit-", "ForkJoinPool", "metrics-meter-tick-thread", "pool-", "scala-") + ); + private Set initialThreads; + + @Override + public void beforeEach(ExtensionContext extensionContext) { + initialThreads = Thread.getAllStackTraces().keySet(); + } + @Override public void afterEach(ExtensionContext extensionContext) { - TestUtils.verifyNoUnexpectedThreads("@AfterEach"); + Tuple2, Object> unexpectedThreads = TestUtils.computeUntilTrue( + this::unexpectedThreads, + DEFAULT_MAX_WAIT_MS, + 100L, + Set::isEmpty + ); + + assertTrue(unexpectedThreads._1.isEmpty(), "Found unexpected threads after executing test: " + + unexpectedThreads._1.stream().map(Objects::toString).collect(Collectors.joining(", "))); + } + + private Set unexpectedThreads() { + Set finalThreads = Thread.getAllStackTraces().keySet(); + + if (initialThreads.size() != finalThreads.size()) { + Set leakedThreads = new HashSet<>(finalThreads); + leakedThreads.removeAll(initialThreads); + return leakedThreads.stream() + .filter(t -> { + for (String s: EXPECTED_THREAD_NAMES) { + if (t.getName().contains(s)) + return false; + } + return true; + }) + .collect(Collectors.toSet()); + } + + return Collections.emptySet(); } } From 6387125326859d5be9108cc6f6fdf9316cf0638c Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 3 Jan 2024 15:56:36 +0500 Subject: [PATCH 3/9] KAFKA-16072: JUnit 5 extension to detect thread leak add expected thread names --- core/src/test/java/kafka/test/junit/LeakTestingExtension.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index b3bf9a528f5e0..56477cdfc1593 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -35,7 +35,8 @@ public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( - Arrays.asList("junit-", "ForkJoinPool", "metrics-meter-tick-thread", "pool-", "scala-") + Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-", + "kafka-scheduler-","metrics-meter-tick-thread", "ReplicaFetcherThread", "scala-", "pool-") ); private Set initialThreads; From 5c707ad697118227e3a156c17ca0b681082f2372 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 3 Jan 2024 20:57:07 +0500 Subject: [PATCH 4/9] KAFKA-16072: JUnit 5 extension to detect thread leak add expected thread names --- core/src/test/java/kafka/test/junit/LeakTestingExtension.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index 56477cdfc1593..928a818ab4d23 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -36,7 +36,7 @@ public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-", - "kafka-scheduler-","metrics-meter-tick-thread", "ReplicaFetcherThread", "scala-", "pool-") + "kafka-scheduler-", "metrics-meter-tick-thread", "ReplicaFetcherThread", "scala-", "pool-") ); private Set initialThreads; From 7035b90952e98fc0682eb60b5b268997c5267801 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 3 Jan 2024 22:49:20 +0500 Subject: [PATCH 5/9] KAFKA-16072: JUnit 5 extension to detect thread leak used ExtensionContext.Store --- .../test/junit/LeakTestingExtension.java | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index 928a818ab4d23..0007c6489192b 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -21,6 +21,8 @@ import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ExtensionContext.Namespace; +import org.junit.jupiter.api.extension.ExtensionContext.Store; import java.util.Arrays; import java.util.Collections; @@ -36,19 +38,22 @@ public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-", - "kafka-scheduler-", "metrics-meter-tick-thread", "ReplicaFetcherThread", "scala-", "pool-") + "ExpirationReaper", "kafka-scheduler-", "metrics-meter-tick-thread", "ReplicaFetcherThread", + "scala-", "pool-") ); - private Set initialThreads; + private static final String THREADS_KEY = "threads"; @Override - public void beforeEach(ExtensionContext extensionContext) { - initialThreads = Thread.getAllStackTraces().keySet(); + public void beforeEach(ExtensionContext context) { + getStore(context).put(THREADS_KEY, Thread.getAllStackTraces().keySet()); } @Override - public void afterEach(ExtensionContext extensionContext) { + @SuppressWarnings("unchecked") + public void afterEach(ExtensionContext context) { + Set initialThreads = getStore(context).remove(THREADS_KEY, Set.class); Tuple2, Object> unexpectedThreads = TestUtils.computeUntilTrue( - this::unexpectedThreads, + () -> unexpectedThreads(initialThreads), DEFAULT_MAX_WAIT_MS, 100L, Set::isEmpty @@ -58,7 +63,7 @@ public void afterEach(ExtensionContext extensionContext) { unexpectedThreads._1.stream().map(Objects::toString).collect(Collectors.joining(", "))); } - private Set unexpectedThreads() { + private Set unexpectedThreads(Set initialThreads) { Set finalThreads = Thread.getAllStackTraces().keySet(); if (initialThreads.size() != finalThreads.size()) { @@ -77,4 +82,8 @@ private Set unexpectedThreads() { return Collections.emptySet(); } + + private Store getStore(ExtensionContext context) { + return context.getStore(Namespace.create(getClass(), context.getRequiredTestMethod())); + } } From 9c4a43184ef8740b42ee49d75f7aab1f7cfcb2be Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Thu, 4 Jan 2024 04:00:59 +0500 Subject: [PATCH 6/9] KAFKA-16072: JUnit 5 extension to detect thread leak update EXPECTED_THREAD_NAMES --- core/src/test/java/kafka/test/junit/LeakTestingExtension.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index 0007c6489192b..8ea72b860f0e2 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -38,8 +38,7 @@ public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-", - "ExpirationReaper", "kafka-scheduler-", "metrics-meter-tick-thread", "ReplicaFetcherThread", - "scala-", "pool-") + "metrics-meter-tick-thread", "scala-", "pool-") ); private static final String THREADS_KEY = "threads"; From 7460dad3be887ea719f518ea40ae9333e83602a4 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Fri, 5 Jan 2024 22:19:30 +0500 Subject: [PATCH 7/9] KAFKA-16072: JUnit 5 extension to detect thread leak refactoring --- .../test/junit/LeakTestingExtension.java | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index 8ea72b860f0e2..3e2f8b185887d 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -25,7 +25,6 @@ import org.junit.jupiter.api.extension.ExtensionContext.Store; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Set; @@ -63,23 +62,17 @@ public void afterEach(ExtensionContext context) { } private Set unexpectedThreads(Set initialThreads) { - Set finalThreads = Thread.getAllStackTraces().keySet(); - - if (initialThreads.size() != finalThreads.size()) { - Set leakedThreads = new HashSet<>(finalThreads); - leakedThreads.removeAll(initialThreads); - return leakedThreads.stream() - .filter(t -> { - for (String s: EXPECTED_THREAD_NAMES) { - if (t.getName().contains(s)) - return false; - } - return true; - }) - .collect(Collectors.toSet()); - } - - return Collections.emptySet(); + Set threads = new HashSet<>(Thread.getAllStackTraces().keySet()); + return threads.stream() + .filter(t -> !initialThreads.contains(t)) + .filter(t -> { + for (String s: EXPECTED_THREAD_NAMES) { + if (t.getName().contains(s)) + return false; + } + return true; + }) + .collect(Collectors.toSet()); } private Store getStore(ExtensionContext context) { From d26d948f61796d28113cf47c90024360b6e87580 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Sun, 7 Jan 2024 15:08:20 +0500 Subject: [PATCH 8/9] KAFKA-16072: JUnit 5 extension to detect thread leak updated EXPECTED_THREAD_NAMES --- core/src/test/java/kafka/test/junit/LeakTestingExtension.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index 3e2f8b185887d..cd7bc970de8b8 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -36,8 +36,8 @@ public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( - Arrays.asList("junit-", "JMX", "feature-zk-node-event-process-thread", "ForkJoinPool", "executor-", - "metrics-meter-tick-thread", "scala-", "pool-") + Arrays.asList("Attach Listener", "executor-", "feature-zk-node-event-process-thread", "ForkJoinPool", "JMX", + "junit-", "metrics-meter-tick-thread", "pool-", "process reaper", "RMI", "scala-") ); private static final String THREADS_KEY = "threads"; From 93b5985250cdcd33421086f087d16f185ecd750e Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Sun, 11 Feb 2024 02:01:33 +0500 Subject: [PATCH 9/9] KAFKA-16072: JUnit 5 extension to detect thread leak add expected thread names --- .../test/java/kafka/test/junit/LeakTestingExtension.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java index cd7bc970de8b8..effa680a140cb 100644 --- a/core/src/test/java/kafka/test/junit/LeakTestingExtension.java +++ b/core/src/test/java/kafka/test/junit/LeakTestingExtension.java @@ -36,8 +36,11 @@ public class LeakTestingExtension implements BeforeEachCallback, AfterEachCallback { private static final Set EXPECTED_THREAD_NAMES = new HashSet<>( - Arrays.asList("Attach Listener", "executor-", "feature-zk-node-event-process-thread", "ForkJoinPool", "JMX", - "junit-", "metrics-meter-tick-thread", "pool-", "process reaper", "RMI", "scala-") + Arrays.asList( + "Attach Listener", "client-metrics-reaper", "executor-", "feature-zk-node-event-process-thread", + "ForkJoinPool", "JMX", "junit-", "metrics-meter-tick-thread", "pool-", "process reaper", "RMI", + "scala-" + ) ); private static final String THREADS_KEY = "threads";