From 1eb22f165b975b5b423ae3e45ff53ca15cb43d6b Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 23 May 2024 11:02:32 +0200 Subject: [PATCH 1/5] [broker] EntryFilters fix NoClassDefFoundError due to closed classloader --- .../service/plugin/EntryFilterProvider.java | 3 ++- .../plugin/EntryFilterWithClassLoader.java | 24 +++++++++++++------ .../pulsar/common/nar/NarClassLoader.java | 16 +++++++++++++ 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index f93e561542eeb..53418744b5486 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -197,7 +197,8 @@ protected EntryFilter load(EntryFilterMetaData metadata) + " does not implement entry filter interface"); } EntryFilter pi = (EntryFilter) filter; - return new EntryFilterWithClassLoader(pi, ncl); + // the classloader is shared with the broker, the instance doesn't own it + return new EntryFilterWithClassLoader(pi, ncl, false); } catch (Throwable e) { if (e instanceof IOException) { throw (IOException) e; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index c5c5721087788..77e77265b87b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -23,6 +23,7 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.common.nar.NarClassLoader; @Slf4j @@ -30,15 +31,19 @@ public class EntryFilterWithClassLoader implements EntryFilter { private final EntryFilter entryFilter; private final NarClassLoader classLoader; + private final boolean classLoaderOwned; - public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader) { + public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classLoader, boolean classLoaderOwned) { this.entryFilter = entryFilter; this.classLoader = classLoader; + this.classLoaderOwned = classLoaderOwned; } @Override public FilterResult filterEntry(Entry entry, FilterContext context) { - return entryFilter.filterEntry(entry, context); + try (ClassLoaderSwitcher switcher = new ClassLoaderSwitcher(classLoader)) { + return entryFilter.filterEntry(entry, context); + } } @VisibleForTesting @@ -48,11 +53,16 @@ public EntryFilter getEntryFilter() { @Override public void close() { - entryFilter.close(); - try { - classLoader.close(); - } catch (IOException e) { - log.error("close EntryFilterWithClassLoader failed", e); + try (ClassLoaderSwitcher switcher = new ClassLoaderSwitcher(classLoader)) { + entryFilter.close(); + } + if (classLoaderOwned) { + log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName()); + try { + classLoader.close(); + } catch (IOException e) { + log.error("close EntryFilterWithClassLoader failed", e); + } } } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java index 9736d8b47ef71..44cfc2872ef6b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java @@ -40,6 +40,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -135,6 +136,7 @@ public class NarClassLoader extends URLClassLoader { * The NAR for which this ClassLoader is responsible. */ private final File narWorkingDirectory; + private final AtomicBoolean closed = new AtomicBoolean(); private static final String TMP_DIR_PREFIX = "pulsar-nar"; @@ -292,4 +294,18 @@ protected String findLibrary(final String libname) { public String toString() { return NarClassLoader.class.getName() + "[" + narWorkingDirectory.getPath() + "]"; } + + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (closed.get()) { + log.warn("Loading class {} from a closed classloader ({})", name, this); + } + return super.loadClass(name, resolve); + } + + @Override + public void close() throws IOException { + closed.set(true); + super.close(); + } } From 5c904d62c20dafb6a0f2cea566e4bf2ae18f686d Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 24 May 2024 11:16:57 +0200 Subject: [PATCH 2/5] Fix test --- .../broker/service/plugin/FilterEntryTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 7b3daddcd9da0..f7388ef9eb990 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -239,9 +239,9 @@ public void testFilter() throws Exception { hasFilterField.setAccessible(true); NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter1, narClassLoader); + EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); EntryFilter filter2 = new EntryFilter2Test(); - EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter2, narClassLoader); + EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgsRecordingInvocations(EntryFilterWithClassLoader.class, filter2, narClassLoader, false); field.set(dispatcher, List.of(loader1, loader2)); hasFilterField.set(dispatcher, true); @@ -371,9 +371,9 @@ public void testFilteredMsgCount(String topic) throws Throwable { hasFilterField.setAccessible(true); NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); - EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + EntryFilterWithClassLoader loader1 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); EntryFilter filter2 = new EntryFilter2Test(); - EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader); + EntryFilterWithClassLoader loader2 = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader, false); field.set(dispatcher, List.of(loader1, loader2)); hasFilterField.set(dispatcher, true); @@ -463,10 +463,10 @@ public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscriptio NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); EntryFilterWithClassLoader loader1 = - spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); EntryFilter filter2 = new EntryFilterTest(); EntryFilterWithClassLoader loader2 = - spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader); + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter2, narClassLoader, false); field.set(dispatcher, List.of(loader1, loader2)); hasFilterField.set(dispatcher, true); From 47d8b2c797d3056e9162da6a2148fd79835d0bf3 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 24 May 2024 11:56:54 +0200 Subject: [PATCH 3/5] fix ConsumerStatsTest --- .../java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 024d8582fa213..5b2998216e8e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -409,7 +409,7 @@ public void testAvgMessagesPerEntry() throws Exception { EntryFilter filter = new EntryFilterProducerTest(); EntryFilterWithClassLoader loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter, - narClassLoader); + narClassLoader, false); Pair> entryFilters = Pair.of("filter", List.of(loader)); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() From 863f39adb0c680beedd3c51e7cb11faa19d608b1 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 24 May 2024 14:17:39 +0200 Subject: [PATCH 4/5] Fix SubscriptionStatsTest --- .../org/apache/pulsar/broker/stats/SubscriptionStatsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index 3e71d8f211101..bc4cb73e5b6fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -208,7 +208,7 @@ public void testSubscriptionStats(final String topic, final String subName, bool NarClassLoader narClassLoader = mock(NarClassLoader.class); EntryFilter filter1 = new EntryFilterTest(); EntryFilterWithClassLoader loader1 = - spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader); + spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter1, narClassLoader, false); field.set(dispatcher, List.of(loader1)); hasFilterField.set(dispatcher, true); } From 4f5f0832cea7d20ba77aa4e636d74e6488c4c452 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 29 May 2024 13:06:55 +0200 Subject: [PATCH 5/5] Remove ClassLoaderSwitcher --- .../service/plugin/EntryFilterWithClassLoader.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java index 77e77265b87b5..aab46c62acdb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterWithClassLoader.java @@ -23,7 +23,6 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ClassLoaderSwitcher; import org.apache.pulsar.common.nar.NarClassLoader; @Slf4j @@ -41,8 +40,12 @@ public EntryFilterWithClassLoader(EntryFilter entryFilter, NarClassLoader classL @Override public FilterResult filterEntry(Entry entry, FilterContext context) { - try (ClassLoaderSwitcher switcher = new ClassLoaderSwitcher(classLoader)) { + ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(classLoader); return entryFilter.filterEntry(entry, context); + } finally { + Thread.currentThread().setContextClassLoader(currentClassLoader); } } @@ -53,8 +56,12 @@ public EntryFilter getEntryFilter() { @Override public void close() { - try (ClassLoaderSwitcher switcher = new ClassLoaderSwitcher(classLoader)) { + ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(classLoader); entryFilter.close(); + } finally { + Thread.currentThread().setContextClassLoader(currentClassLoader); } if (classLoaderOwned) { log.info("Closing classloader {} for EntryFilter {}", classLoader, entryFilter.getClass().getName());