From acbca5afe0234b9696029aa28915c2badb16557e Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 10:15:27 +1100 Subject: [PATCH 01/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. --- .../eclipse/jetty/util/ConcurrentPool.java | 86 ++++++++++++++++--- 1 file changed, 74 insertions(+), 12 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index b7feb87e6dcd..79e489053560 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.util; import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -55,7 +56,7 @@ public class ConcurrentPool

implements Pool

, Dumpable private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPool.class); - private final List> entries = new CopyOnWriteArrayList<>(); + private final List> entries = new CopyOnWriteArrayList<>(); private final int maxSize; private final StrategyType strategyType; /* @@ -66,7 +67,7 @@ public class ConcurrentPool

implements Pool

, Dumpable * When an entry can't be found in the cache, the global list is iterated * with the configured strategy so the cache has no visible effect besides performance. */ - private final ThreadLocal> cache; + private final ThreadLocal> cache; // TODO does this make sense anymore? private final AutoLock lock = new AutoLock(); private final AtomicInteger nextIndex; private final ToIntFunction

maxMultiplex; @@ -117,7 +118,10 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToI public int getTerminatedCount() { - return (int)entries.stream().filter(Entry::isTerminated).count(); + return (int)entries.stream() + .map(Holder::getEntry) + .filter(Objects::nonNull) + .filter(Entry::isTerminated).count(); } /** @@ -155,7 +159,7 @@ public Entry

reserve() } ConcurrentEntry

entry = new ConcurrentEntry<>(this); - entries.add(entry); + entries.add(entry.getHolder()); if (LOG.isDebugEnabled()) LOG.debug("returning reserved entry {} for {}", entry, this); return entry; @@ -189,12 +193,26 @@ public Entry

acquire() { try { - ConcurrentEntry

entry = (ConcurrentEntry

)entries.get(index); - if (entry != null && entry.tryAcquire()) + Holder

holder = entries.get(index); + if (holder != null) { - if (LOG.isDebugEnabled()) - LOG.debug("returning entry {} for {}", entry, this); - return entry; + + ConcurrentEntry

entry = (ConcurrentEntry

)holder.getEntry(); + if (entry == null) + { + if (LOG.isDebugEnabled()) + LOG.warn("LEAK!!!"); + entries.remove(index); + continue; + } + + if (entry.tryAcquire()) + { + if (LOG.isDebugEnabled()) + LOG.debug("returning entry {} for {}", entry, this); + holder.free(); + return entry; + } } } catch (IndexOutOfBoundsException e) @@ -247,7 +265,8 @@ private boolean remove(Entry

entry) // No need to lock, no race with reserve() // and the race with terminate() is harmless. - boolean evicted = entries.remove(entry); + Holder

holder = ((ConcurrentEntry

)entry).getHolder(); + boolean evicted = holder != null && entries.remove(holder); if (LOG.isDebugEnabled()) LOG.debug("evicted {} {} for {}", evicted, entry, this); @@ -272,7 +291,10 @@ public Collection> terminate() // Field this.terminated must be modified with the lock held // because the list of entries is modified, see reserve(). terminated = true; - copy = List.copyOf(entries); + copy = entries.stream() + .map(Holder::getEntry) + .filter(Objects::nonNull) + .toList(); entries.clear(); } @@ -309,7 +331,7 @@ public int getMaxSize() @Override public Stream> stream() { - return entries.stream(); + return entries.stream().map(Holder::getEntry).filter(Objects::nonNull); } @Override @@ -386,10 +408,17 @@ public static class ConcurrentEntry implements Entry // Other threads accessing must check the state field above first, so a good before/after // relationship exists to make a memory barrier. private E pooled; + private final WeakReference> holder; public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; + holder = new WeakReference<>(new Holder<>(this)); + } + + private Holder getHolder() + { + return holder.get(); } @Override @@ -504,7 +533,12 @@ private boolean tryRelease() return false; int newMultiplexCount = multiplexCount - 1; if (state.compareAndSet(encoded, 0, newMultiplexCount)) + { + Holder holder = getHolder(); + if (holder != null && !holder.hold()) + continue; return true; + } } } @@ -597,4 +631,32 @@ public String toString() getPooled()); } } + + private static class Holder

+ { + private final WeakReference> _weak; + private volatile Entry

_hard; + + Holder(Entry

entry) + { + _weak = new WeakReference<>(entry); + _hard = entry; + } + + Entry

getEntry() + { + return _weak.get(); + } + + public boolean hold() + { + _hard = _weak.get(); + return _hard != null; + } + + public void free() + { + _hard = null; + } + } } From 9eebabba69243d150cb7f6c1311bbffbedbb40b3 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 10:20:15 +1100 Subject: [PATCH 02/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. back link may be hard --- .../main/java/org/eclipse/jetty/util/ConcurrentPool.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 79e489053560..9c09287c4d05 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -408,17 +408,17 @@ public static class ConcurrentEntry implements Entry // Other threads accessing must check the state field above first, so a good before/after // relationship exists to make a memory barrier. private E pooled; - private final WeakReference> holder; + private final Holder holder; public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; - holder = new WeakReference<>(new Holder<>(this)); + holder = new Holder<>(this); } private Holder getHolder() { - return holder.get(); + return holder; } @Override From 4fbcb0abb8f8e3482c3029f67812005ee9fdfc9b Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 10:27:28 +1100 Subject: [PATCH 03/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire --- .../eclipse/jetty/util/ConcurrentPool.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 9c09287c4d05..2dd0b02741d7 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -153,9 +153,21 @@ public Entry

reserve() int entriesSize = entries.size(); if (maxSize > 0 && entriesSize >= maxSize) { - if (LOG.isDebugEnabled()) - LOG.debug("no space: {} >= {}, cannot reserve entry for {}", entriesSize, maxSize, this); - return null; + // Sweep for collected entries + // TODO this could be better? + for (int i = 0; i < entries.size(); i++) + { + Holder

holder = entries.get(i); + if (holder.getEntry() == null) + entries.remove(holder); + } + entriesSize = entries.size(); + if (maxSize > 0 && entriesSize >= maxSize) + { + if (LOG.isDebugEnabled()) + LOG.debug("no space: {} >= {}, cannot reserve entry for {}", entriesSize, maxSize, this); + return null; + } } ConcurrentEntry

entry = new ConcurrentEntry<>(this); @@ -438,6 +450,8 @@ public boolean enable(E pooled, boolean acquire) { if (LOG.isDebugEnabled()) LOG.debug("enabled {} for {}", this, pool); + if (acquire) + getHolder().free(); return true; } From 86ecaa2f6240404e7542fd09cc290bcbcc89abae Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 10:33:19 +1100 Subject: [PATCH 04/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire removed ThreadLocal cache --- .../eclipse/jetty/util/ConcurrentPool.java | 55 ++++++++----------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 2dd0b02741d7..178a661a51f0 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -59,22 +59,13 @@ public class ConcurrentPool

implements Pool

, Dumpable private final List> entries = new CopyOnWriteArrayList<>(); private final int maxSize; private final StrategyType strategyType; - /* - * The cache is used to avoid hammering on the first index of the entry list. - * Caches can become poisoned (i.e.: containing entries that are in use) when - * the release isn't done by the acquiring thread or when the entry pool is - * undersized compared to the load applied on it. - * When an entry can't be found in the cache, the global list is iterated - * with the configured strategy so the cache has no visible effect besides performance. - */ - private final ThreadLocal> cache; // TODO does this make sense anymore? private final AutoLock lock = new AutoLock(); private final AtomicInteger nextIndex; private final ToIntFunction

maxMultiplex; private volatile boolean terminated; /** - *

Creates an instance with the specified strategy and no {@link ThreadLocal} cache.

+ *

Creates an instance with the specified strategy.

* * @param strategyType the strategy to used to lookup entries * @param maxSize the maximum number of pooled entries @@ -85,33 +76,50 @@ public ConcurrentPool(StrategyType strategyType, int maxSize) } /** - *

Creates an instance with the specified strategy and an optional {@link ThreadLocal} cache.

+ *

Creates an instance with the specified strategy.

* * @param strategyType the strategy to used to lookup entries * @param maxSize the maximum number of pooled entries * @param cache whether a {@link ThreadLocal} cache should be used for the most recently released entry + * @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID} */ + @Deprecated public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache) { this(strategyType, maxSize, cache, pooled -> 1); } + /** - *

Creates an instance with the specified strategy, an optional {@link ThreadLocal} cache. + *

Creates an instance with the specified strategy. * and a function that returns the max multiplex count for a given pooled object.

* * @param strategyType the strategy to used to lookup entries * @param maxSize the maximum number of pooled entries * @param cache whether a {@link ThreadLocal} cache should be used for the most recently released entry * @param maxMultiplex a function that given the pooled object returns the max multiplex count + * @deprecated cache is no longer supported. Use {@link StrategyType#THREAD_ID} */ + @Deprecated public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToIntFunction

maxMultiplex) + { + this(strategyType, maxSize, maxMultiplex); + } + + /** + *

Creates an instance with the specified strategy. + * and a function that returns the max multiplex count for a given pooled object.

+ * + * @param strategyType the strategy to used to lookup entries + * @param maxSize the maximum number of pooled entries + * @param maxMultiplex a function that given the pooled object returns the max multiplex count + */ + public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex) { if (maxSize > OPTIMAL_MAX_SIZE && LOG.isDebugEnabled()) LOG.debug("{} configured with max size {} which is above the recommended value {}", getClass().getSimpleName(), maxSize, OPTIMAL_MAX_SIZE); this.maxSize = maxSize; this.strategyType = Objects.requireNonNull(strategyType); - this.cache = cache ? new ThreadLocal<>() : null; this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; this.maxMultiplex = Objects.requireNonNull(maxMultiplex); } @@ -188,17 +196,6 @@ public Entry

acquire() if (size == 0) return null; - if (cache != null) - { - Entry

entry = cache.get(); - if (entry != null && ((ConcurrentEntry

)entry).tryAcquire()) - { - if (LOG.isDebugEnabled()) - LOG.debug("returning cached entry {} for {}", entry, this); - return entry; - } - } - int index = startIndex(size); for (int tries = size; tries-- > 0; ) @@ -258,8 +255,6 @@ private int startIndex(int size) private boolean release(Entry

entry) { boolean released = ((ConcurrentEntry

)entry).tryRelease(); - if (released && cache != null) - cache.set(entry); if (LOG.isDebugEnabled()) LOG.debug("released {} {} for {}", released, entry, this); return released; @@ -268,8 +263,6 @@ private boolean release(Entry

entry) private boolean remove(Entry

entry) { boolean removed = ((ConcurrentEntry

)entry).tryRemove(); - if (cache != null) - cache.set(null); if (LOG.isDebugEnabled()) LOG.debug("removed {} {} for {}", removed, entry, this); if (!removed) @@ -318,8 +311,6 @@ public Collection> terminate() private boolean terminate(Entry

entry) { boolean terminated = ((ConcurrentEntry

)entry).tryTerminate(); - if (cache != null) - cache.set(null); if (!terminated) { if (LOG.isDebugEnabled()) @@ -385,9 +376,7 @@ public enum StrategyType /** * A strategy that uses the {@link Thread#getId()} of the current thread - * to select a starting point for an entry search. Whilst not as performant as - * using the {@link ThreadLocal} cache, it may be suitable when the pool is - * substantially smaller than the number of available threads. + * to select a starting point for an entry search. * No entries are favoured and contention is reduced. */ THREAD_ID, From 9e3d183a1cef80334b612df06d8f252a02856b9c Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 10:48:38 +1100 Subject: [PATCH 05/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added Leak test --- .../eclipse/jetty/util/ConcurrentPool.java | 19 ++++-- .../jetty/util/ConcurrentPoolTest.java | 66 +++++++++++++++++++ 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 178a661a51f0..865c4bcdf07a 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -162,13 +162,7 @@ public Entry

reserve() if (maxSize > 0 && entriesSize >= maxSize) { // Sweep for collected entries - // TODO this could be better? - for (int i = 0; i < entries.size(); i++) - { - Holder

holder = entries.get(i); - if (holder.getEntry() == null) - entries.remove(holder); - } + sweep(); entriesSize = entries.size(); if (maxSize > 0 && entriesSize >= maxSize) { @@ -186,6 +180,17 @@ public Entry

reserve() } } + void sweep() + { + // TODO this could be better? + for (int i = 0; i < entries.size(); i++) + { + Holder

holder = entries.get(i); + if (holder.getEntry() == null) + entries.remove(holder); + } + } + @Override public Entry

acquire() { diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 20421588fbe7..62d8bca0be20 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -611,4 +611,70 @@ public void testRandomStrategy() assertThat(e3.getPooled().get(), greaterThan(10)); assertThat(e4.getPooled().get(), greaterThan(10)); } + + @ParameterizedTest + @MethodSource(value = "factories") + public void testLeak(Factory factory) + { + ConcurrentPool pool = factory.newPool(10); + pool.reserve().enable("idle", false); + assertThat(pool.size(), is(1)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(0)); + + Pool.Entry e1 = pool.reserve(); + e1.enable("held", true); + assertThat(e1.getPooled(), equalTo("held")); + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(1)); + + Pool.Entry e2 = pool.reserve(); + e2.enable("leaked", true); + assertThat(e2.getPooled(), equalTo("leaked")); + assertThat(pool.size(), is(3)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(2)); + e2 = null; + System.gc(); + System.gc(); + System.gc(); + pool.sweep(); + + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(1)); + + e1 = null; + System.gc(); + System.gc(); + System.gc(); + pool.sweep(); + + assertThat(pool.size(), is(1)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(1)); + assertThat(pool.getInUseCount(), is(0)); + + Pool.Entry e0 = pool.acquire(); + assertThat(e0.getPooled(), equalTo("idle")); + assertThat(pool.size(), is(1)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(1)); + + e0 = null; + System.gc(); + System.gc(); + System.gc(); + pool.sweep(); + assertThat(pool.size(), is(0)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + } } From 9218c7d99b770deb0ff4d0b790f3ec61268458b7 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 10:54:29 +1100 Subject: [PATCH 06/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../main/java/org/eclipse/jetty/util/ConcurrentPool.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 865c4bcdf07a..9e16b746ac30 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -187,7 +187,10 @@ void sweep() { Holder

holder = entries.get(i); if (holder.getEntry() == null) + { + LOG.warn("LEAKED {}", holder); entries.remove(holder); + } } } @@ -210,12 +213,11 @@ public Entry

acquire() Holder

holder = entries.get(index); if (holder != null) { - ConcurrentEntry

entry = (ConcurrentEntry

)holder.getEntry(); if (entry == null) { if (LOG.isDebugEnabled()) - LOG.warn("LEAK!!!"); + LOG.warn("LEAKED {}", holder); entries.remove(index); continue; } From 9014d68ee874588d3fc1e40d5bbc1a96706ea86f Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 14:28:28 +1100 Subject: [PATCH 07/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../eclipse/jetty/util/ConcurrentPool.java | 9 ++- .../jetty/util/ConcurrentPoolTest.java | 66 +++++++++++++++++-- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 9e16b746ac30..d3038620c3f2 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -182,14 +182,13 @@ public Entry

reserve() void sweep() { - // TODO this could be better? for (int i = 0; i < entries.size(); i++) { Holder

holder = entries.get(i); if (holder.getEntry() == null) { LOG.warn("LEAKED {}", holder); - entries.remove(holder); + entries.remove(i--); } } } @@ -446,8 +445,8 @@ public boolean enable(E pooled, boolean acquire) { if (LOG.isDebugEnabled()) LOG.debug("enabled {} for {}", this, pool); - if (acquire) - getHolder().free(); + if (!acquire) + getHolder().hold(); return true; } @@ -650,7 +649,7 @@ private static class Holder

Holder(Entry

entry) { _weak = new WeakReference<>(entry); - _hard = entry; + _hard = null; } Entry

getEntry() diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 62d8bca0be20..2c59aaad20a0 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -614,7 +614,32 @@ public void testRandomStrategy() @ParameterizedTest @MethodSource(value = "factories") - public void testLeak(Factory factory) + public void testLeakReserved(Factory factory) + { + ConcurrentPool pool = factory.newPool(3); + Pool.Entry e0 = pool.reserve(); + Pool.Entry e1 = pool.reserve(); + Pool.Entry e2 = pool.reserve(); + + assertThat(pool.reserve(), nullValue()); + assertThat(pool.size(), is(3)); + assertThat(pool.getReservedCount(), is(3)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + + e0 = null; + System.gc(); + pool.sweep(); + + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(2)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + } + + @ParameterizedTest + @MethodSource(value = "factories") + public void testLeakEnabled(Factory factory) { ConcurrentPool pool = factory.newPool(10); pool.reserve().enable("idle", false); @@ -640,8 +665,6 @@ public void testLeak(Factory factory) assertThat(pool.getInUseCount(), is(2)); e2 = null; System.gc(); - System.gc(); - System.gc(); pool.sweep(); assertThat(pool.size(), is(2)); @@ -651,8 +674,6 @@ public void testLeak(Factory factory) e1 = null; System.gc(); - System.gc(); - System.gc(); pool.sweep(); assertThat(pool.size(), is(1)); @@ -669,12 +690,43 @@ public void testLeak(Factory factory) e0 = null; System.gc(); - System.gc(); - System.gc(); pool.sweep(); assertThat(pool.size(), is(0)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(0)); assertThat(pool.getInUseCount(), is(0)); } + + @ParameterizedTest + @MethodSource(value = "factories") + public void testSweepOnReserve(Factory factory) + { + ConcurrentPool pool = factory.newPool(3); + Pool.Entry e0 = pool.reserve(); + assertThat(e0, notNullValue()); + Pool.Entry e1 = pool.reserve(); + assertThat(e1, notNullValue()); + Pool.Entry e2 = pool.reserve(); + assertThat(e2, notNullValue()); + + assertThat(pool.reserve(), nullValue()); + assertThat(pool.size(), is(3)); + assertThat(pool.getReservedCount(), is(3)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + + e0 = null; + System.gc(); + + Pool.Entry e3 = pool.reserve(); + assertThat(e3, notNullValue()); + + e1.enable("one", false); + e2.enable("two", false); + e3.enable("three", true); + assertThat(pool.size(), is(3)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(2)); + assertThat(pool.getInUseCount(), is(1)); + } } From 1f91daff3ad188af0c2d0ba85f87b14526d298e6 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 26 Oct 2023 16:38:24 +1100 Subject: [PATCH 08/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../eclipse/jetty/util/ConcurrentPool.java | 47 +++++++++++++++---- .../jetty/util/ConcurrentPoolTest.java | 20 ++++++-- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index d3038620c3f2..3bb08adf7e4e 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -225,7 +225,6 @@ public Entry

acquire() { if (LOG.isDebugEnabled()) LOG.debug("returning entry {} for {}", entry, this); - holder.free(); return entry; } } @@ -445,8 +444,6 @@ public boolean enable(E pooled, boolean acquire) { if (LOG.isDebugEnabled()) LOG.debug("enabled {} for {}", this, pool); - if (!acquire) - getHolder().hold(); return true; } @@ -487,7 +484,10 @@ private boolean terminate() */ private boolean tryEnable(boolean acquire) { + if (!acquire) + getHolder().hold(); return state.compareAndSet(0, 0, -1, acquire ? 1 : 0); + } /** @@ -519,7 +519,19 @@ private boolean tryAcquire() return false; if (state.compareAndSet(encoded, 0, newMultiplexCount)) + { + if (newMultiplexCount == 1) + { + // We have acquired the entry for the first time, but might be racing with a previous release + // to hold it, so we spin wait to ensure it is held before we free it. + // The spin must end because the count is 1 after being incremented, so some other thread must + // have decremented it to 0 and will have either called free() or is just about to (see tryRelease) + while (!getHolder().held()) + Thread.onSpinWait(); + getHolder().free(); + } return true; + } } } @@ -543,9 +555,8 @@ private boolean tryRelease() int newMultiplexCount = multiplexCount - 1; if (state.compareAndSet(encoded, 0, newMultiplexCount)) { - Holder holder = getHolder(); - if (holder != null && !holder.hold()) - continue; + if (newMultiplexCount == 0) + getHolder().hold(); return true; } } @@ -587,7 +598,12 @@ private boolean tryRemove() boolean result = newMultiplexCount <= 0; removed = result ? -2 : -1; if (state.compareAndSet(encoded, removed, newMultiplexCount)) + { + // Hold the entry if count is zero to make sure any spinning tryAcquires do not loop forever + if (newMultiplexCount == 0) + getHolder().hold(); return result; + } } } @@ -595,6 +611,8 @@ private boolean tryTerminate() { while (true) { + // Hold the entry to make sure any spinning tryAcquires do not loop forever + getHolder().hold(); long encoded = state.get(); if (AtomicBiInteger.getHi(encoded) < 0) return false; @@ -657,9 +675,16 @@ Entry

getEntry() return _weak.get(); } - public boolean hold() + public void hold() + { + Entry

hard = _weak.get(); + _hard = hard; + if (hard == null && LOG.isDebugEnabled()) + LOG.warn("LEAKED {}", this); + } + + public boolean held() { - _hard = _weak.get(); return _hard != null; } @@ -667,5 +692,11 @@ public void free() { _hard = null; } + + @Override + public String toString() + { + return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak.get(), _hard); + } } } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 2c59aaad20a0..f019cfa83fa2 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -56,11 +56,10 @@ default ConcurrentPool newPool(int maxEntries) public static List factories() { return List.of( - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, false, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, true, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(RANDOM, maxEntries, false, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(THREAD_ID, maxEntries, false, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(ROUND_ROBIN, maxEntries, false, maxMultiplex) + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, maxMultiplex), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(RANDOM, maxEntries, maxMultiplex), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(THREAD_ID, maxEntries, maxMultiplex), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(ROUND_ROBIN, maxEntries, maxMultiplex) ); } @@ -620,6 +619,9 @@ public void testLeakReserved(Factory factory) Pool.Entry e0 = pool.reserve(); Pool.Entry e1 = pool.reserve(); Pool.Entry e2 = pool.reserve(); + assertThat(e0, notNullValue()); + assertThat(e1, notNullValue()); + assertThat(e2, notNullValue()); assertThat(pool.reserve(), nullValue()); assertThat(pool.size(), is(3)); @@ -635,6 +637,8 @@ public void testLeakReserved(Factory factory) assertThat(pool.getReservedCount(), is(2)); assertThat(pool.getIdleCount(), is(0)); assertThat(pool.getInUseCount(), is(0)); + + assertThat(e0, nullValue()); } @ParameterizedTest @@ -695,6 +699,10 @@ public void testLeakEnabled(Factory factory) assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(0)); assertThat(pool.getInUseCount(), is(0)); + + assertThat(e0, nullValue()); + assertThat(e1, nullValue()); + assertThat(e2, nullValue()); } @ParameterizedTest @@ -728,5 +736,7 @@ public void testSweepOnReserve(Factory factory) assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(2)); assertThat(pool.getInUseCount(), is(1)); + + assertThat(e0, nullValue()); } } From ae9b7e86b79f5419b151fdd4b87d910d6726005c Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Thu, 26 Oct 2023 11:04:37 +0200 Subject: [PATCH 09/16] Modify ArrayByteBufferPool and ConcurrentPool to make weak-referencing configurable Signed-off-by: Ludovic Orban --- .../eclipse/jetty/io/ArrayByteBufferPool.java | 49 ++++++++++++++----- .../eclipse/jetty/util/ConcurrentPool.java | 45 ++++++++++++----- .../jetty/util/ConcurrentPoolTest.java | 8 +-- 3 files changed, 75 insertions(+), 27 deletions(-) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 95d2bf312335..eab7ad49eafa 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -70,7 +70,8 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable /** * Creates a new ArrayByteBufferPool with a default configuration. - * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic; + * pooled buffers are hard-referenced at all times. */ public ArrayByteBufferPool() { @@ -79,7 +80,8 @@ public ArrayByteBufferPool() /** * Creates a new ArrayByteBufferPool with the given configuration. - * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic; + * pooled buffers are hard-referenced at all times. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -92,7 +94,8 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) /** * Creates a new ArrayByteBufferPool with the given configuration. - * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic; + * pooled buffers are hard-referenced at all times. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -106,6 +109,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max /** * Creates a new ArrayByteBufferPool with the given configuration. + * Pooled buffers are hard-referenced at all times. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -116,7 +120,23 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max */ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) { - this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null); + this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null, false); + } + + /** + * Creates a new ArrayByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param factor the capacity factor + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxBucketSize the maximum number of ByteBuffers for each bucket + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param weakPool true if the pooled buffers should be weakly referenced upon acquisition, false otherwise + */ + public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, boolean weakPool) + { + this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null, weakPool); } /** @@ -130,8 +150,9 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic * @param bucketIndexFor a {@link IntUnaryOperator} that takes a capacity and returns a bucket index * @param bucketCapacity a {@link IntUnaryOperator} that takes a bucket index and returns a capacity + * @param weakPool true if the underlying pool should weakly reference pooled buffers when they are acquired, false otherwise */ - protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity) + protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity, boolean weakPool) { if (minCapacity <= 0) minCapacity = 0; @@ -153,8 +174,8 @@ protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int for (int i = 0; i < directArray.length; i++) { int capacity = Math.min(bucketCapacity.applyAsInt(i), maxCapacity); - directArray[i] = new RetainedBucket(capacity, maxBucketSize); - indirectArray[i] = new RetainedBucket(capacity, maxBucketSize); + directArray[i] = new RetainedBucket(capacity, maxBucketSize, weakPool); + indirectArray[i] = new RetainedBucket(capacity, maxBucketSize, weakPool); } _minCapacity = minCapacity; @@ -482,13 +503,13 @@ private static class RetainedBucket private final Pool _pool; private final int _capacity; - private RetainedBucket(int capacity, int poolSize) + private RetainedBucket(int capacity, int poolSize, boolean weakPool) { if (poolSize <= ConcurrentPool.OPTIMAL_MAX_SIZE) - _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, false); + _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, e -> 1, weakPool); else _pool = new CompoundPool<>( - new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, false), + new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, e -> 1, weakPool), new QueuedPool<>(poolSize - ConcurrentPool.OPTIMAL_MAX_SIZE) ); _capacity = capacity; @@ -566,6 +587,11 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize) } public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) + { + this(minCapacity, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, false); + } + + public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, boolean weakPool) { super(minCapacity, -1, @@ -574,7 +600,8 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHe maxHeapMemory, maxDirectMemory, c -> 32 - Integer.numberOfLeadingZeros(c - 1), - i -> 1 << i + i -> 1 << i, + weakPool ); } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 3bb08adf7e4e..7f8275dd9b26 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -62,6 +62,7 @@ public class ConcurrentPool

implements Pool

, Dumpable private final AutoLock lock = new AutoLock(); private final AtomicInteger nextIndex; private final ToIntFunction

maxMultiplex; + private final boolean weak; private volatile boolean terminated; /** @@ -115,6 +116,20 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToI * @param maxMultiplex a function that given the pooled object returns the max multiplex count */ public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex) + { + this(strategyType, maxSize, maxMultiplex, false); + } + + /** + *

Creates an instance with the specified strategy. + * and a function that returns the max multiplex count for a given pooled object.

+ * + * @param strategyType the strategy to used to lookup entries + * @param maxSize the maximum number of pooled entries + * @param maxMultiplex a function that given the pooled object returns the max multiplex count + * @param weak true if the pooled buffers should be weakly referenced upon acquisition, false otherwise + */ + public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex, boolean weak) { if (maxSize > OPTIMAL_MAX_SIZE && LOG.isDebugEnabled()) LOG.debug("{} configured with max size {} which is above the recommended value {}", getClass().getSimpleName(), maxSize, OPTIMAL_MAX_SIZE); @@ -122,6 +137,7 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

m this.strategyType = Objects.requireNonNull(strategyType); this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; this.maxMultiplex = Objects.requireNonNull(maxMultiplex); + this.weak = weak; } public int getTerminatedCount() @@ -164,7 +180,7 @@ public Entry

reserve() // Sweep for collected entries sweep(); entriesSize = entries.size(); - if (maxSize > 0 && entriesSize >= maxSize) + if (entriesSize >= maxSize) { if (LOG.isDebugEnabled()) LOG.debug("no space: {} >= {}, cannot reserve entry for {}", entriesSize, maxSize, this); @@ -239,7 +255,7 @@ public Entry

acquire() if (size == 0) break; } - if (++index == size) + if (++index >= size) index = 0; } @@ -352,12 +368,13 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x[inUse=%d,size=%d,max=%d,terminated=%b]", + return String.format("%s@%x[inUse=%d,size=%d,max=%d,weak=%b,terminated=%b]", getClass().getSimpleName(), hashCode(), getInUseCount(), size(), getMaxSize(), + weak, isTerminated()); } @@ -419,7 +436,7 @@ public static class ConcurrentEntry implements Entry public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; - holder = new Holder<>(this); + holder = new Holder<>(this, pool.weak); } private Holder getHolder() @@ -661,23 +678,25 @@ public String toString() private static class Holder

{ - private final WeakReference> _weak; - private volatile Entry

_hard; + private final WeakReference> _weak; + private volatile ConcurrentEntry

_hard; - Holder(Entry

entry) + Holder(ConcurrentEntry

entry, boolean weak) { - _weak = new WeakReference<>(entry); - _hard = null; + _weak = weak ? new WeakReference<>(entry) : null; + _hard = weak ? null : entry; } Entry

getEntry() { - return _weak.get(); + return _weak == null ? _hard : _weak.get(); } public void hold() { - Entry

hard = _weak.get(); + if (_weak == null) + return; + ConcurrentEntry

hard = _weak.get(); _hard = hard; if (hard == null && LOG.isDebugEnabled()) LOG.warn("LEAKED {}", this); @@ -690,13 +709,15 @@ public boolean held() public void free() { + if (_weak == null) + return; _hard = null; } @Override public String toString() { - return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak.get(), _hard); + return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak == null ? "hard" : _weak.get(), _hard); } } } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index f019cfa83fa2..cd28e32677d7 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -56,10 +56,10 @@ default ConcurrentPool newPool(int maxEntries) public static List factories() { return List.of( - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(RANDOM, maxEntries, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(THREAD_ID, maxEntries, maxMultiplex), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(ROUND_ROBIN, maxEntries, maxMultiplex) + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, maxMultiplex, true), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(RANDOM, maxEntries, maxMultiplex, true), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(THREAD_ID, maxEntries, maxMultiplex, true), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(ROUND_ROBIN, maxEntries, maxMultiplex, true) ); } From c0eaa4adcbaea4529a8a1adc9fb2245c990e8f27 Mon Sep 17 00:00:00 2001 From: gregw Date: Fri, 27 Oct 2023 10:00:17 +1100 Subject: [PATCH 10/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../eclipse/jetty/util/ConcurrentPool.java | 100 ++++++++++++------ 1 file changed, 69 insertions(+), 31 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 7f8275dd9b26..a624cb2d9c81 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -54,6 +54,7 @@ public class ConcurrentPool

implements Pool

, Dumpable */ public static final int OPTIMAL_MAX_SIZE = 256; + private static final boolean STRONG_DEFAULT = Boolean.getBoolean(ConcurrentPool.class.getName() + ".STRONG_DEFAULT"); private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPool.class); private final List> entries = new CopyOnWriteArrayList<>(); @@ -90,7 +91,6 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache) this(strategyType, maxSize, cache, pooled -> 1); } - /** *

Creates an instance with the specified strategy. * and a function that returns the max multiplex count for a given pooled object.

@@ -117,7 +117,7 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToI */ public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex) { - this(strategyType, maxSize, maxMultiplex, false); + this(strategyType, maxSize, maxMultiplex, !STRONG_DEFAULT); } /** @@ -198,13 +198,16 @@ public Entry

reserve() void sweep() { - for (int i = 0; i < entries.size(); i++) + if (weak) { - Holder

holder = entries.get(i); - if (holder.getEntry() == null) + for (int i = 0; i < entries.size(); i++) { - LOG.warn("LEAKED {}", holder); - entries.remove(i--); + Holder

holder = entries.get(i); + if (holder.getEntry() == null) + { + LOG.warn("LEAKED {}", holder); + entries.remove(i--); + } } } } @@ -436,7 +439,7 @@ public static class ConcurrentEntry implements Entry public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; - holder = new Holder<>(this, pool.weak); + holder = pool.weak ? new WeakHolder<>(this) : new StrongHolder<>(this); } private Holder getHolder() @@ -541,10 +544,6 @@ private boolean tryAcquire() { // We have acquired the entry for the first time, but might be racing with a previous release // to hold it, so we spin wait to ensure it is held before we free it. - // The spin must end because the count is 1 after being incremented, so some other thread must - // have decremented it to 0 and will have either called free() or is just about to (see tryRelease) - while (!getHolder().held()) - Thread.onSpinWait(); getHolder().free(); } return true; @@ -676,48 +675,87 @@ public String toString() } } - private static class Holder

+ public interface Holder

+ { + Pool.Entry

getEntry(); + + void hold(); + + void free(); + } + + private static class StrongHolder

implements Holder

+ { + private final ConcurrentEntry

_strong; + + StrongHolder(ConcurrentEntry

entry) + { + _strong = entry; + } + + @Override + public Entry

getEntry() + { + return _strong; + } + + @Override + public void hold() + { + } + + @Override + public void free() + { + } + + @Override + public String toString() + { + return "%s@%x{%s}".formatted(this.getClass().getSimpleName(), hashCode(), _strong); + } + } + + private static class WeakHolder

implements Holder

{ private final WeakReference> _weak; - private volatile ConcurrentEntry

_hard; + private volatile ConcurrentEntry

_strong; - Holder(ConcurrentEntry

entry, boolean weak) + WeakHolder(ConcurrentEntry

entry) { - _weak = weak ? new WeakReference<>(entry) : null; - _hard = weak ? null : entry; + _weak = new WeakReference<>(entry); } - Entry

getEntry() + @Override + public Entry

getEntry() { - return _weak == null ? _hard : _weak.get(); + return _weak.get(); } + @Override public void hold() { - if (_weak == null) - return; ConcurrentEntry

hard = _weak.get(); - _hard = hard; + _strong = hard; if (hard == null && LOG.isDebugEnabled()) LOG.warn("LEAKED {}", this); } - public boolean held() - { - return _hard != null; - } - + @Override public void free() { - if (_weak == null) - return; - _hard = null; + // Free must only be called when we know the holder will be held. + // The spin must end because the count is 1 after being incremented, so some other thread must + // have decremented it to 0 and will have either called free() or is just about to (see tryRelease) + while (_strong == null) + Thread.onSpinWait(); + _strong = null; } @Override public String toString() { - return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak == null ? "hard" : _weak.get(), _hard); + return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak.get(), _strong); } } } From 8ce0260b580d421d4061ad71632c94a1c88790b5 Mon Sep 17 00:00:00 2001 From: gregw Date: Sat, 28 Oct 2023 09:48:40 +1100 Subject: [PATCH 11/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings no race in tests --- .../jetty/util/ConcurrentPoolTest.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index cd28e32677d7..1620ee63beed 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -18,13 +18,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.ToIntFunction; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import static org.awaitility.Awaitility.await; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.FIRST; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.RANDOM; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.ROUND_ROBIN; @@ -611,6 +614,16 @@ public void testRandomStrategy() assertThat(e4.getPooled().get(), greaterThan(10)); } + private void waitForGC(ConcurrentPool pool, int size) + { + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + System.gc(); + pool.sweep(); + return pool.size(); + }, is(size)); + } + @ParameterizedTest @MethodSource(value = "factories") public void testLeakReserved(Factory factory) @@ -630,9 +643,7 @@ public void testLeakReserved(Factory factory) assertThat(pool.getInUseCount(), is(0)); e0 = null; - System.gc(); - pool.sweep(); - + waitForGC(pool, 2); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(2)); assertThat(pool.getIdleCount(), is(0)); @@ -668,8 +679,7 @@ public void testLeakEnabled(Factory factory) assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(2)); e2 = null; - System.gc(); - pool.sweep(); + waitForGC(pool, 2); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(0)); @@ -677,9 +687,7 @@ public void testLeakEnabled(Factory factory) assertThat(pool.getInUseCount(), is(1)); e1 = null; - System.gc(); - pool.sweep(); - + waitForGC(pool, 1); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); @@ -693,8 +701,7 @@ public void testLeakEnabled(Factory factory) assertThat(pool.getInUseCount(), is(1)); e0 = null; - System.gc(); - pool.sweep(); + waitForGC(pool, 0); assertThat(pool.size(), is(0)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(0)); @@ -724,9 +731,15 @@ public void testSweepOnReserve(Factory factory) assertThat(pool.getInUseCount(), is(0)); e0 = null; - System.gc(); + AtomicReference> entry = new AtomicReference<>(); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + System.gc(); + entry.set(pool.reserve()); + return entry.get(); + }, notNullValue()); - Pool.Entry e3 = pool.reserve(); + Pool.Entry e3 = entry.get(); assertThat(e3, notNullValue()); e1.enable("one", false); From 11a3dff24bc1f4b84a7025f23152967d7f9265c2 Mon Sep 17 00:00:00 2001 From: gregw Date: Mon, 30 Oct 2023 08:08:10 +1100 Subject: [PATCH 12/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings no race in tests --- .../eclipse/jetty/util/ConcurrentPool.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index a624cb2d9c81..34fbdb2bbc9f 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -614,12 +614,7 @@ private boolean tryRemove() boolean result = newMultiplexCount <= 0; removed = result ? -2 : -1; if (state.compareAndSet(encoded, removed, newMultiplexCount)) - { - // Hold the entry if count is zero to make sure any spinning tryAcquires do not loop forever - if (newMultiplexCount == 0) - getHolder().hold(); return result; - } } } @@ -627,8 +622,6 @@ private boolean tryTerminate() { while (true) { - // Hold the entry to make sure any spinning tryAcquires do not loop forever - getHolder().hold(); long encoded = state.get(); if (AtomicBiInteger.getHi(encoded) < 0) return false; @@ -735,19 +728,31 @@ public Entry

getEntry() @Override public void hold() { - ConcurrentEntry

hard = _weak.get(); - _strong = hard; - if (hard == null && LOG.isDebugEnabled()) + ConcurrentEntry

entry = _weak.get(); + if (entry == null) + { LOG.warn("LEAKED {}", this); + return; + } + + // hold must only be called when we know the holder will is free. + while (_strong != null && !entry.isTerminated()) + Thread.onSpinWait(); + _strong = entry; } @Override public void free() { + ConcurrentEntry

entry = _weak.get(); + if (entry == null) + { + LOG.warn("LEAKED {}", this); + return; + } + // Free must only be called when we know the holder will be held. - // The spin must end because the count is 1 after being incremented, so some other thread must - // have decremented it to 0 and will have either called free() or is just about to (see tryRelease) - while (_strong == null) + while (_strong == null && !entry.isTerminated()) Thread.onSpinWait(); _strong = null; } From 0d54890cb690f7e5566b6c7fceea1da5b25b4a40 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 31 Oct 2023 12:17:32 +1100 Subject: [PATCH 13/16] Experiment with a weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../eclipse/jetty/util/ConcurrentPool.java | 96 ++++++++++++++----- .../jetty/util/ConcurrentPoolTest.java | 55 +++++++++++ 2 files changed, 128 insertions(+), 23 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 34fbdb2bbc9f..db04f6c2a1ea 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -54,6 +54,7 @@ public class ConcurrentPool

implements Pool

, Dumpable */ public static final int OPTIMAL_MAX_SIZE = 256; + // TODO replace with explicit configuration private static final boolean STRONG_DEFAULT = Boolean.getBoolean(ConcurrentPool.class.getName() + ".STRONG_DEFAULT"); private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPool.class); @@ -159,6 +160,17 @@ private int getMaxMultiplex(P pooled) return maxMultiplex.applyAsInt(pooled); } + protected void leaked(Holder

holder) + { + if (LOG.isDebugEnabled()) + { + if (holder instanceof ConcurrentPool.DebugWeakHolder

debugWeakHolder) + LOG.warn("LEAKED {}", this, debugWeakHolder.getLastFreed()); + else + LOG.warn("LEAKED {}", this); + } + } + @Override public Entry

reserve() { @@ -205,7 +217,7 @@ void sweep() Holder

holder = entries.get(i); if (holder.getEntry() == null) { - LOG.warn("LEAKED {}", holder); + leaked(holder); entries.remove(i--); } } @@ -234,8 +246,7 @@ public Entry

acquire() ConcurrentEntry

entry = (ConcurrentEntry

)holder.getEntry(); if (entry == null) { - if (LOG.isDebugEnabled()) - LOG.warn("LEAKED {}", holder); + leaked(holder); entries.remove(index); continue; } @@ -439,7 +450,9 @@ public static class ConcurrentEntry implements Entry public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; - holder = pool.weak ? new WeakHolder<>(this) : new StrongHolder<>(this); + holder = pool.weak + ? (LOG.isDebugEnabled() ? new DebugWeakHolder<>(pool, this) : new WeakHolder<>(this)) + : new StrongHolder<>(this); } private Holder getHolder() @@ -459,6 +472,8 @@ public boolean enable(E pooled, boolean acquire) throw new IllegalStateException("Entry already enabled " + this + " for " + pool); } this.pooled = pooled; + if (LOG.isDebugEnabled() && getHolder() instanceof ConcurrentPool.DebugWeakHolder debugWeakHolder) + debugWeakHolder.update(); if (tryEnable(acquire)) { @@ -668,7 +683,7 @@ public String toString() } } - public interface Holder

+ protected interface Holder

{ Pool.Entry

getEntry(); @@ -677,11 +692,11 @@ public interface Holder

void free(); } - private static class StrongHolder

implements Holder

+ protected static class StrongHolder

implements Holder

{ private final ConcurrentEntry

_strong; - StrongHolder(ConcurrentEntry

entry) + protected StrongHolder(ConcurrentEntry

entry) { _strong = entry; } @@ -709,12 +724,12 @@ public String toString() } } - private static class WeakHolder

implements Holder

+ protected static class WeakHolder

implements Holder

{ private final WeakReference> _weak; private volatile ConcurrentEntry

_strong; - WeakHolder(ConcurrentEntry

entry) + protected WeakHolder(ConcurrentEntry

entry) { _weak = new WeakReference<>(entry); } @@ -728,17 +743,7 @@ public Entry

getEntry() @Override public void hold() { - ConcurrentEntry

entry = _weak.get(); - if (entry == null) - { - LOG.warn("LEAKED {}", this); - return; - } - - // hold must only be called when we know the holder will is free. - while (_strong != null && !entry.isTerminated()) - Thread.onSpinWait(); - _strong = entry; + _strong = _weak.get(); } @Override @@ -746,10 +751,7 @@ public void free() { ConcurrentEntry

entry = _weak.get(); if (entry == null) - { - LOG.warn("LEAKED {}", this); return; - } // Free must only be called when we know the holder will be held. while (_strong == null && !entry.isTerminated()) @@ -763,4 +765,52 @@ public String toString() return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak.get(), _strong); } } + + protected static class DebugWeakHolder

extends WeakHolder

+ { + private final ConcurrentPool

_pool; + private Throwable _lastFreed; + + protected DebugWeakHolder(ConcurrentPool

pool, ConcurrentEntry

entry) + { + super(entry); + _pool = pool; + update(); + } + + public void update() + { + _lastFreed = new Throwable(Thread.currentThread().getName() + ":" + getEntry()); + } + + Throwable getLastFreed() + { + return _lastFreed; + } + + @Override + public void hold() + { + if (getEntry() == null) + _pool.leaked(this); + super.hold(); + } + + @Override + public void free() + { + update(); + Entry

entry = getEntry(); + if (entry == null) + _pool.leaked(this); + else + super.free(); + } + + @Override + public String toString() + { + return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), getEntry(), _lastFreed.getMessage()); + } + } } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 1620ee63beed..3fa9bd3b37e0 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -18,14 +18,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.ToIntFunction; +import org.eclipse.jetty.logging.JettyLevel; +import org.eclipse.jetty.logging.JettyLogger; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; import static org.awaitility.Awaitility.await; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.FIRST; @@ -33,6 +37,7 @@ import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.ROUND_ROBIN; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.THREAD_ID; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -752,4 +757,54 @@ public void testSweepOnReserve(Factory factory) assertThat(e0, nullValue()); } + + @Test + public void testLeakDebug() + { + if (LoggerFactory.getLogger(ConcurrentPool.class) instanceof JettyLogger jettyLogger) + { + jettyLogger.setLevel(JettyLevel.DEBUG); + jettyLogger.setHideStacks(true); + + List history = new CopyOnWriteArrayList<>(); + + ConcurrentPool pool = new ConcurrentPool<>(RANDOM, 10, p -> 1, true) + { + @Override + protected void leaked(Holder holder) + { + if (holder instanceof ConcurrentPool.DebugWeakHolder debugWeakHolder) + { + history.add(debugWeakHolder.getLastFreed()); + } + } + }; + Pool.Entry e0 = pool.reserve(); + Pool.Entry e1 = pool.reserve(); + e1.enable("test", true); + assertThat(e0, notNullValue()); + assertThat(e1, notNullValue()); + + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(1)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(1)); + + e0 = null; + e1 = null; + waitForGC(pool, 0); + assertThat(pool.size(), is(0)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + assertThat(e0, nullValue()); + assertThat(e1, nullValue()); + + assertThat(history.size(), is(2)); + assertThat(history.toString(), containsString("pooled=null")); + assertThat(history.toString(), containsString("pooled=test")); + assertThat(history.toString(), containsString(Thread.currentThread().getName() + ":")); + jettyLogger.setLevel(JettyLevel.WARN); + } + } } From bbcc930aa48b2db5e5ac480388a85f16f907662e Mon Sep 17 00:00:00 2001 From: gregw Date: Wed, 1 Nov 2023 08:36:20 +1100 Subject: [PATCH 14/16] A weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../eclipse/jetty/util/ConcurrentPool.java | 133 +++++------------- .../jetty/util/ConcurrentPoolTest.java | 2 +- 2 files changed, 35 insertions(+), 100 deletions(-) diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index db04f6c2a1ea..996c07b507e2 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -21,9 +21,11 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.ToIntFunction; import java.util.stream.Stream; +import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; @@ -36,10 +38,9 @@ *

This implementation offers a number of {@link StrategyType strategies} * used to select the entry returned from {@link #acquire()}, and its * capacity is bounded to a {@link #getMaxSize() max size}.

- *

A thread-local caching is also available, disabled by default, that - * is useful when entries are acquired and released by the same thread, - * but hampering performance when entries are acquired by one thread but - * released by a different thread.

+ *

When a pooled item is {@link #acquire() acquired} from this pool, it is only held + * by a {@link WeakReference}, so that if it is collected before being {@link #release(Entry) released}, + * then that leak is detected and the entry is {@link #remove(Entry) removed} (see {@link #getLeaked()}.

* * @param

the type of the pooled objects */ @@ -54,8 +55,6 @@ public class ConcurrentPool

implements Pool

, Dumpable */ public static final int OPTIMAL_MAX_SIZE = 256; - // TODO replace with explicit configuration - private static final boolean STRONG_DEFAULT = Boolean.getBoolean(ConcurrentPool.class.getName() + ".STRONG_DEFAULT"); private static final Logger LOG = LoggerFactory.getLogger(ConcurrentPool.class); private final List> entries = new CopyOnWriteArrayList<>(); @@ -64,7 +63,8 @@ public class ConcurrentPool

implements Pool

, Dumpable private final AutoLock lock = new AutoLock(); private final AtomicInteger nextIndex; private final ToIntFunction

maxMultiplex; - private final boolean weak; + private final LongAdder leaked = new LongAdder(); + private volatile boolean terminated; /** @@ -118,7 +118,12 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToI */ public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex) { - this(strategyType, maxSize, maxMultiplex, !STRONG_DEFAULT); + if (maxSize > OPTIMAL_MAX_SIZE && LOG.isDebugEnabled()) + LOG.debug("{} configured with max size {} which is above the recommended value {}", getClass().getSimpleName(), maxSize, OPTIMAL_MAX_SIZE); + this.maxSize = maxSize; + this.strategyType = Objects.requireNonNull(strategyType); + this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; + this.maxMultiplex = Objects.requireNonNull(maxMultiplex); } /** @@ -130,23 +135,16 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

m * @param maxMultiplex a function that given the pooled object returns the max multiplex count * @param weak true if the pooled buffers should be weakly referenced upon acquisition, false otherwise */ + @Deprecated public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex, boolean weak) { - if (maxSize > OPTIMAL_MAX_SIZE && LOG.isDebugEnabled()) - LOG.debug("{} configured with max size {} which is above the recommended value {}", getClass().getSimpleName(), maxSize, OPTIMAL_MAX_SIZE); - this.maxSize = maxSize; - this.strategyType = Objects.requireNonNull(strategyType); - this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; - this.maxMultiplex = Objects.requireNonNull(maxMultiplex); - this.weak = weak; + this(strategyType, maxSize, maxMultiplex); } - public int getTerminatedCount() + @ManagedAttribute("number of entries leaked (not released nor referenced)") + public long getLeaked() { - return (int)entries.stream() - .map(Holder::getEntry) - .filter(Objects::nonNull) - .filter(Entry::isTerminated).count(); + return leaked.longValue(); } /** @@ -160,15 +158,11 @@ private int getMaxMultiplex(P pooled) return maxMultiplex.applyAsInt(pooled); } - protected void leaked(Holder

holder) + void leaked(Holder

holder) { - if (LOG.isDebugEnabled()) - { - if (holder instanceof ConcurrentPool.DebugWeakHolder

debugWeakHolder) - LOG.warn("LEAKED {}", this, debugWeakHolder.getLastFreed()); - else - LOG.warn("LEAKED {}", this); - } + leaked.increment(); + if (holder instanceof ConcurrentPool.DebugWeakHolder

debugWeakHolder) + LOG.warn("LEAKED {}", this, debugWeakHolder.getLastFreed()); } @Override @@ -210,16 +204,13 @@ public Entry

reserve() void sweep() { - if (weak) + for (int i = 0; i < entries.size(); i++) { - for (int i = 0; i < entries.size(); i++) + Holder

holder = entries.get(i); + if (holder.getEntry() == null) { - Holder

holder = entries.get(i); - if (holder.getEntry() == null) - { - leaked(holder); - entries.remove(i--); - } + leaked(holder); + entries.remove(i--); } } } @@ -382,13 +373,12 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x[inUse=%d,size=%d,max=%d,weak=%b,terminated=%b]", + return String.format("%s@%x[inUse=%d,size=%d,max=%d,terminated=%b]", getClass().getSimpleName(), hashCode(), getInUseCount(), size(), getMaxSize(), - weak, isTerminated()); } @@ -450,9 +440,7 @@ public static class ConcurrentEntry implements Entry public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; - holder = pool.weak - ? (LOG.isDebugEnabled() ? new DebugWeakHolder<>(pool, this) : new WeakHolder<>(this)) - : new StrongHolder<>(this); + holder = LOG.isDebugEnabled() ? new DebugWeakHolder<>(this) : new Holder<>(this); } private Holder getHolder() @@ -522,7 +510,6 @@ private boolean tryEnable(boolean acquire) if (!acquire) getHolder().hold(); return state.compareAndSet(0, 0, -1, acquire ? 1 : 0); - } /** @@ -683,70 +670,26 @@ public String toString() } } - protected interface Holder

- { - Pool.Entry

getEntry(); - - void hold(); - - void free(); - } - - protected static class StrongHolder

implements Holder

- { - private final ConcurrentEntry

_strong; - - protected StrongHolder(ConcurrentEntry

entry) - { - _strong = entry; - } - - @Override - public Entry

getEntry() - { - return _strong; - } - - @Override - public void hold() - { - } - - @Override - public void free() - { - } - - @Override - public String toString() - { - return "%s@%x{%s}".formatted(this.getClass().getSimpleName(), hashCode(), _strong); - } - } - - protected static class WeakHolder

implements Holder

+ static class Holder

{ private final WeakReference> _weak; private volatile ConcurrentEntry

_strong; - protected WeakHolder(ConcurrentEntry

entry) + protected Holder(ConcurrentEntry

entry) { _weak = new WeakReference<>(entry); } - @Override public Entry

getEntry() { return _weak.get(); } - @Override public void hold() { _strong = _weak.get(); } - @Override public void free() { ConcurrentEntry

entry = _weak.get(); @@ -766,15 +709,13 @@ public String toString() } } - protected static class DebugWeakHolder

extends WeakHolder

+ static class DebugWeakHolder

extends Holder

{ - private final ConcurrentPool

_pool; private Throwable _lastFreed; - protected DebugWeakHolder(ConcurrentPool

pool, ConcurrentEntry

entry) + protected DebugWeakHolder(ConcurrentEntry

entry) { super(entry); - _pool = pool; update(); } @@ -791,8 +732,6 @@ Throwable getLastFreed() @Override public void hold() { - if (getEntry() == null) - _pool.leaked(this); super.hold(); } @@ -800,11 +739,7 @@ public void hold() public void free() { update(); - Entry

entry = getEntry(); - if (entry == null) - _pool.leaked(this); - else - super.free(); + super.free(); } @Override diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 3fa9bd3b37e0..36c63d035d12 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -771,7 +771,7 @@ public void testLeakDebug() ConcurrentPool pool = new ConcurrentPool<>(RANDOM, 10, p -> 1, true) { @Override - protected void leaked(Holder holder) + void leaked(Holder holder) { if (holder instanceof ConcurrentPool.DebugWeakHolder debugWeakHolder) { From dd53992e4c52392f04f39b8924db9f611572a41c Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 2 Nov 2023 10:08:14 +1100 Subject: [PATCH 15/16] A weakly referenced ConcurrentPool The Pool is now a list of Holder instances, each with a WeakReference to an Entry and an optional hard reference. free on acquire added warnings --- .../jetty/client/MultiplexConnectionPool.java | 2 +- .../jetty/client/RandomConnectionPool.java | 2 +- .../client/RoundRobinConnectionPool.java | 2 +- .../tests/MultiplexedConnectionPoolTest.java | 8 +- .../eclipse/jetty/io/ArrayByteBufferPool.java | 4 +- .../eclipse/jetty/util/ConcurrentPool.java | 69 ++------------- .../jetty/util/ConcurrentPoolTest.java | 86 +++++++------------ 7 files changed, 45 insertions(+), 128 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index 6d4dcf602e36..6f5a0f26ebb2 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -44,7 +44,7 @@ public static ToIntFunction newMaxMultiplexer(int defaultMaxMultiple public MultiplexConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex) { - this(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false, newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex); + this(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex); } protected MultiplexConnectionPool(Destination destination, Pool.Factory poolFactory, int initialMaxMultiplex) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java index 387876781aa3..3188adfe1b42 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -25,6 +25,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool { public RandomConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex) { - super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.RANDOM, maxConnections, false, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex); + super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.RANDOM, maxConnections, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex); } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index eec1571d7b07..e4c45c1e2c57 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -50,7 +50,7 @@ public RoundRobinConnectionPool(Destination destination, int maxConnections) public RoundRobinConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex) { - super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.ROUND_ROBIN, maxConnections, false, newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex); + super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.ROUND_ROBIN, maxConnections, newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex); // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/MultiplexedConnectionPoolTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/MultiplexedConnectionPoolTest.java index 752bf5360091..c28aa61ad149 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/MultiplexedConnectionPoolTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/MultiplexedConnectionPoolTest.java @@ -112,7 +112,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolLifecycle() throws Exce { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); int initialMaxMultiplex = 10; - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex) { @Override protected void onCreated(Connection connection) @@ -217,7 +217,7 @@ public void testStreamIdleTimeout() throws Exception { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); int initialMaxMultiplex = 10; - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex) { @Override protected void onCreated(Connection connection) @@ -292,7 +292,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception ConnectionPoolFactory factory = new ConnectionPoolFactory("maxDurationConnectionsWithMultiplexedPool", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false, MultiplexConnectionPool.newMaxMultiplexer(MAX_MULTIPLEX)), MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, MultiplexConnectionPool.newMaxMultiplexer(MAX_MULTIPLEX)), MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -363,7 +363,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false, MultiplexConnectionPool.newMaxMultiplexer(MAX_MULTIPLEX)), MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, MultiplexConnectionPool.newMaxMultiplexer(MAX_MULTIPLEX)), MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index eab7ad49eafa..6851c2ca9cb9 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -506,10 +506,10 @@ private static class RetainedBucket private RetainedBucket(int capacity, int poolSize, boolean weakPool) { if (poolSize <= ConcurrentPool.OPTIMAL_MAX_SIZE) - _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, e -> 1, weakPool); + _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, e -> 1); else _pool = new CompoundPool<>( - new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, e -> 1, weakPool), + new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, e -> 1), new QueuedPool<>(poolSize - ConcurrentPool.OPTIMAL_MAX_SIZE) ); _capacity = capacity; diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index 996c07b507e2..c970e2b52c7d 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -89,7 +89,7 @@ public ConcurrentPool(StrategyType strategyType, int maxSize) @Deprecated public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache) { - this(strategyType, maxSize, cache, pooled -> 1); + this(strategyType, maxSize, pooled -> 1); } /** @@ -126,21 +126,6 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

m this.maxMultiplex = Objects.requireNonNull(maxMultiplex); } - /** - *

Creates an instance with the specified strategy. - * and a function that returns the max multiplex count for a given pooled object.

- * - * @param strategyType the strategy to used to lookup entries - * @param maxSize the maximum number of pooled entries - * @param maxMultiplex a function that given the pooled object returns the max multiplex count - * @param weak true if the pooled buffers should be weakly referenced upon acquisition, false otherwise - */ - @Deprecated - public ConcurrentPool(StrategyType strategyType, int maxSize, ToIntFunction

maxMultiplex, boolean weak) - { - this(strategyType, maxSize, maxMultiplex); - } - @ManagedAttribute("number of entries leaked (not released nor referenced)") public long getLeaked() { @@ -158,11 +143,11 @@ private int getMaxMultiplex(P pooled) return maxMultiplex.applyAsInt(pooled); } - void leaked(Holder

holder) + private void leaked(Holder

holder) { leaked.increment(); - if (holder instanceof ConcurrentPool.DebugWeakHolder

debugWeakHolder) - LOG.warn("LEAKED {}", this, debugWeakHolder.getLastFreed()); + if (LOG.isDebugEnabled()) + LOG.debug("Leaked " + holder); } @Override @@ -440,7 +425,7 @@ public static class ConcurrentEntry implements Entry public ConcurrentEntry(ConcurrentPool pool) { this.pool = pool; - holder = LOG.isDebugEnabled() ? new DebugWeakHolder<>(this) : new Holder<>(this); + holder = new Holder<>(this); } private Holder getHolder() @@ -460,8 +445,6 @@ public boolean enable(E pooled, boolean acquire) throw new IllegalStateException("Entry already enabled " + this + " for " + pool); } this.pooled = pooled; - if (LOG.isDebugEnabled() && getHolder() instanceof ConcurrentPool.DebugWeakHolder debugWeakHolder) - debugWeakHolder.update(); if (tryEnable(acquire)) { @@ -670,7 +653,7 @@ public String toString() } } - static class Holder

+ private static class Holder

{ private final WeakReference> _weak; private volatile ConcurrentEntry

_strong; @@ -708,44 +691,4 @@ public String toString() return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), _weak.get(), _strong); } } - - static class DebugWeakHolder

extends Holder

- { - private Throwable _lastFreed; - - protected DebugWeakHolder(ConcurrentEntry

entry) - { - super(entry); - update(); - } - - public void update() - { - _lastFreed = new Throwable(Thread.currentThread().getName() + ":" + getEntry()); - } - - Throwable getLastFreed() - { - return _lastFreed; - } - - @Override - public void hold() - { - super.hold(); - } - - @Override - public void free() - { - update(); - super.free(); - } - - @Override - public String toString() - { - return "%s@%x{%s,%s}".formatted(this.getClass().getSimpleName(), hashCode(), getEntry(), _lastFreed.getMessage()); - } - } } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 36c63d035d12..608fed88bd6e 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -18,18 +18,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.ToIntFunction; -import org.eclipse.jetty.logging.JettyLevel; -import org.eclipse.jetty.logging.JettyLogger; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.LoggerFactory; import static org.awaitility.Awaitility.await; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.FIRST; @@ -37,7 +33,6 @@ import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.ROUND_ROBIN; import static org.eclipse.jetty.util.ConcurrentPool.StrategyType.THREAD_ID; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -64,10 +59,10 @@ default ConcurrentPool newPool(int maxEntries) public static List factories() { return List.of( - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, maxMultiplex, true), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(RANDOM, maxEntries, maxMultiplex, true), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(THREAD_ID, maxEntries, maxMultiplex, true), - (maxEntries, maxMultiplex) -> new ConcurrentPool<>(ROUND_ROBIN, maxEntries, maxMultiplex, true) + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(FIRST, maxEntries, maxMultiplex), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(RANDOM, maxEntries, maxMultiplex), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(THREAD_ID, maxEntries, maxMultiplex), + (maxEntries, maxMultiplex) -> new ConcurrentPool<>(ROUND_ROBIN, maxEntries, maxMultiplex) ); } @@ -758,53 +753,32 @@ public void testSweepOnReserve(Factory factory) assertThat(e0, nullValue()); } - @Test - public void testLeakDebug() + @ParameterizedTest + @MethodSource(value = "factories") + public void testLeakDebug(Factory factory) { - if (LoggerFactory.getLogger(ConcurrentPool.class) instanceof JettyLogger jettyLogger) - { - jettyLogger.setLevel(JettyLevel.DEBUG); - jettyLogger.setHideStacks(true); - - List history = new CopyOnWriteArrayList<>(); - - ConcurrentPool pool = new ConcurrentPool<>(RANDOM, 10, p -> 1, true) - { - @Override - void leaked(Holder holder) - { - if (holder instanceof ConcurrentPool.DebugWeakHolder debugWeakHolder) - { - history.add(debugWeakHolder.getLastFreed()); - } - } - }; - Pool.Entry e0 = pool.reserve(); - Pool.Entry e1 = pool.reserve(); - e1.enable("test", true); - assertThat(e0, notNullValue()); - assertThat(e1, notNullValue()); - - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(1)); - - e0 = null; - e1 = null; - waitForGC(pool, 0); - assertThat(pool.size(), is(0)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - assertThat(e0, nullValue()); - assertThat(e1, nullValue()); - - assertThat(history.size(), is(2)); - assertThat(history.toString(), containsString("pooled=null")); - assertThat(history.toString(), containsString("pooled=test")); - assertThat(history.toString(), containsString(Thread.currentThread().getName() + ":")); - jettyLogger.setLevel(JettyLevel.WARN); - } + ConcurrentPool pool = factory.newPool(10); + Pool.Entry e0 = pool.reserve(); + Pool.Entry e1 = pool.reserve(); + e1.enable("test", true); + assertThat(e0, notNullValue()); + assertThat(e1, notNullValue()); + + assertThat(pool.size(), is(2)); + assertThat(pool.getReservedCount(), is(1)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(1)); + + e0 = null; + e1 = null; + waitForGC(pool, 0); + assertThat(pool.size(), is(0)); + assertThat(pool.getReservedCount(), is(0)); + assertThat(pool.getIdleCount(), is(0)); + assertThat(pool.getInUseCount(), is(0)); + assertThat(e0, nullValue()); + assertThat(e1, nullValue()); + + assertThat(pool.getLeaked(), is(2L)); } } From 8799ea4b1e68672ba4fbea5610dd751495c32d9e Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 9 Nov 2023 11:24:52 +0100 Subject: [PATCH 16/16] * Avoid using deprecated ConcurrentPool constructors. * Removed unused boolean parameter "weakPool" in ArrayByteBufferPool and updated javadocs. * Improved implementation of ConcurrentPool.Entry.tryEnable(). * Added javadocs for ConcurrentPool.Holder.free() and hold(). Signed-off-by: Simone Bordet --- .../jetty/client/DuplexConnectionPool.java | 2 +- .../eclipse/jetty/io/ArrayByteBufferPool.java | 45 ++++--------------- .../eclipse/jetty/util/ConcurrentPool.java | 31 ++++++++++--- .../util/compression/CompressionPool.java | 2 +- .../jetty/util/ConcurrentPoolTest.java | 2 +- .../jetty/util/PoolStrategyBenchmark.java | 14 ++---- 6 files changed, 40 insertions(+), 56 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index 147ed8eaa8e5..efc44ec0eba5 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -21,6 +21,6 @@ public class DuplexConnectionPool extends AbstractConnectionPool { public DuplexConnectionPool(Destination destination, int maxConnections) { - super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false), 1); + super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections), 1); } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 6851c2ca9cb9..92ff52e4210f 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -70,8 +70,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable /** * Creates a new ArrayByteBufferPool with a default configuration. - * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic; - * pooled buffers are hard-referenced at all times. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. */ public ArrayByteBufferPool() { @@ -80,8 +79,7 @@ public ArrayByteBufferPool() /** * Creates a new ArrayByteBufferPool with the given configuration. - * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic; - * pooled buffers are hard-referenced at all times. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -94,8 +92,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) /** * Creates a new ArrayByteBufferPool with the given configuration. - * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic; - * pooled buffers are hard-referenced at all times. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -109,7 +106,6 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max /** * Creates a new ArrayByteBufferPool with the given configuration. - * Pooled buffers are hard-referenced at all times. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -120,23 +116,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max */ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) { - this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null, false); - } - - /** - * Creates a new ArrayByteBufferPool with the given configuration. - * - * @param minCapacity the minimum ByteBuffer capacity - * @param factor the capacity factor - * @param maxCapacity the maximum ByteBuffer capacity - * @param maxBucketSize the maximum number of ByteBuffers for each bucket - * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic - * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic - * @param weakPool true if the pooled buffers should be weakly referenced upon acquisition, false otherwise - */ - public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, boolean weakPool) - { - this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null, weakPool); + this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null); } /** @@ -150,9 +130,8 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic * @param bucketIndexFor a {@link IntUnaryOperator} that takes a capacity and returns a bucket index * @param bucketCapacity a {@link IntUnaryOperator} that takes a bucket index and returns a capacity - * @param weakPool true if the underlying pool should weakly reference pooled buffers when they are acquired, false otherwise */ - protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity, boolean weakPool) + protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity) { if (minCapacity <= 0) minCapacity = 0; @@ -174,8 +153,8 @@ protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int for (int i = 0; i < directArray.length; i++) { int capacity = Math.min(bucketCapacity.applyAsInt(i), maxCapacity); - directArray[i] = new RetainedBucket(capacity, maxBucketSize, weakPool); - indirectArray[i] = new RetainedBucket(capacity, maxBucketSize, weakPool); + directArray[i] = new RetainedBucket(capacity, maxBucketSize); + indirectArray[i] = new RetainedBucket(capacity, maxBucketSize); } _minCapacity = minCapacity; @@ -503,7 +482,7 @@ private static class RetainedBucket private final Pool _pool; private final int _capacity; - private RetainedBucket(int capacity, int poolSize, boolean weakPool) + private RetainedBucket(int capacity, int poolSize) { if (poolSize <= ConcurrentPool.OPTIMAL_MAX_SIZE) _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, e -> 1); @@ -587,11 +566,6 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize) } public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) - { - this(minCapacity, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, false); - } - - public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, boolean weakPool) { super(minCapacity, -1, @@ -600,8 +574,7 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHe maxHeapMemory, maxDirectMemory, c -> 32 - Integer.numberOfLeadingZeros(c - 1), - i -> 1 << i, - weakPool + i -> 1 << i ); } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index c970e2b52c7d..ae8337b724ac 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -75,7 +75,7 @@ public class ConcurrentPool

implements Pool

, Dumpable */ public ConcurrentPool(StrategyType strategyType, int maxSize) { - this(strategyType, maxSize, false); + this(strategyType, maxSize, pooled -> 1); } /** @@ -490,9 +490,10 @@ private boolean terminate() */ private boolean tryEnable(boolean acquire) { - if (!acquire) + boolean enabled = state.compareAndSet(0, 0, -1, acquire ? 1 : 0); + if (enabled && !acquire) getHolder().hold(); - return state.compareAndSet(0, 0, -1, acquire ? 1 : 0); + return enabled; } /** @@ -526,11 +527,7 @@ private boolean tryAcquire() if (state.compareAndSet(encoded, 0, newMultiplexCount)) { if (newMultiplexCount == 1) - { - // We have acquired the entry for the first time, but might be racing with a previous release - // to hold it, so we spin wait to ensure it is held before we free it. getHolder().free(); - } return true; } } @@ -653,6 +650,20 @@ public String toString() } } + /** + *

Holds a strong and a weak reference to an {@link Entry} to avoid holding + * on to entries that are not released, so that they can be garbage collected.

+ *

Methods {@link #hold()} and {@link #free()} work together to clear the + * strong reference when the entry is acquired, and assign it when the entry + * is released.

+ *

This class handles a race condition happening when an entry is being + * released with multiplex count going {@code 1 -> 0} by one thread that + * has not yet called {@link #hold()}, and immediately acquired by another + * thread that is calling {@link #free()}. + * The call to {@link #free()} spin loops until {@link #hold()} returns.

+ * + * @param

+ */ private static class Holder

{ private final WeakReference> _weak; @@ -668,11 +679,17 @@ public Entry

getEntry() return _weak.get(); } + /** + *

Called when an entry is released to the pool with multiplex count going from {@code 1} to {@code 0}.

+ */ public void hold() { _strong = _weak.get(); } + /** + *

Called when an entry is acquired from the pool with multiplex count going from {@code 0} to {@code 1}.

+ */ public void free() { ConcurrentEntry

entry = _weak.get(); diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java index ced0c8ac342a..77b4ed6db89f 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java @@ -96,7 +96,7 @@ protected void doStart() throws Exception { if (_capacity > 0) { - _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, _capacity, true); + _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, _capacity); addBean(_pool); } super.doStart(); diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java index 608fed88bd6e..1cb8b059f939 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/ConcurrentPoolTest.java @@ -46,7 +46,7 @@ public class ConcurrentPoolTest { - interface Factory + public interface Factory { default ConcurrentPool newPool(int maxEntries) { diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java index c7d7ce330e69..63ccffbd1276 100644 --- a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/PoolStrategyBenchmark.java @@ -41,12 +41,6 @@ public class PoolStrategyBenchmark }) public static String POOL_TYPE; - @Param({ - "false", - "true" - }) - public static boolean CACHE; - @Param({ "4", "16" @@ -64,10 +58,10 @@ public void setUp() throws Exception pool = switch (POOL_TYPE) { - case "First" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, SIZE, CACHE); - case "Random" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.RANDOM, SIZE, CACHE); - case "ThreadId" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, SIZE, CACHE); - case "RoundRobin" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.ROUND_ROBIN, SIZE, CACHE); + case "First" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, SIZE); + case "Random" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.RANDOM, SIZE); + case "ThreadId" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, SIZE); + case "RoundRobin" -> new ConcurrentPool<>(ConcurrentPool.StrategyType.ROUND_ROBIN, SIZE); default -> throw new IllegalStateException(); };