Skip to content

Commit

Permalink
#10569 improve warning on detected leaks and make weakly-references e…
Browse files Browse the repository at this point in the history
…ntries configurable

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Oct 18, 2023
1 parent 0c3c9af commit df6ff99
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null);
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null, false);
}

public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, boolean weakPool)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null, true);
}

/**
Expand All @@ -131,7 +136,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
* @param bucketIndexFor a {@link IntUnaryOperator} that takes a capacity and returns a bucket index
* @param bucketCapacity a {@link IntUnaryOperator} that takes a bucket index and returns a capacity
*/
protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity)
protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, IntUnaryOperator bucketIndexFor, IntUnaryOperator bucketCapacity, boolean weakPool)
{
if (minCapacity <= 0)
minCapacity = 0;
Expand All @@ -153,8 +158,8 @@ protected ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int
for (int i = 0; i < directArray.length; i++)
{
int capacity = Math.min(bucketCapacity.applyAsInt(i), maxCapacity);
directArray[i] = new RetainedBucket(capacity, maxBucketSize);
indirectArray[i] = new RetainedBucket(capacity, maxBucketSize);
directArray[i] = new RetainedBucket(capacity, maxBucketSize, weakPool);
indirectArray[i] = new RetainedBucket(capacity, maxBucketSize, weakPool);
}

_minCapacity = minCapacity;
Expand Down Expand Up @@ -476,13 +481,13 @@ private static class RetainedBucket
private final Pool<RetainableByteBuffer> _pool;
private final int _capacity;

private RetainedBucket(int capacity, int poolSize)
private RetainedBucket(int capacity, int poolSize, boolean weakPool)
{
if (poolSize <= ConcurrentPool.OPTIMAL_MAX_SIZE)
_pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, false);
_pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, false, e -> 1, weakPool);
else
_pool = new CompoundPool<>(
new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, false),
new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, false, e -> 1, weakPool),
new QueuedPool<>(poolSize - ConcurrentPool.OPTIMAL_MAX_SIZE)
);
_capacity = capacity;
Expand Down Expand Up @@ -560,6 +565,11 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize)
}

public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
this(minCapacity, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, false);
}

public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, boolean weakPool)
{
super(minCapacity,
-1,
Expand All @@ -568,7 +578,8 @@ public Quadratic(int minCapacity, int maxCapacity, int maxBucketSize, long maxHe
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i
i -> 1 << i,
weakPool
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
private final AutoLock lock = new AutoLock();
private final AtomicInteger nextIndex;
private final ToIntFunction<P> maxMultiplex;
private final boolean weakEntries;
private volatile boolean terminated;

/**
Expand Down Expand Up @@ -107,6 +108,11 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache)
* @param maxMultiplex a function that given the pooled object returns the max multiplex count
*/
public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToIntFunction<P> maxMultiplex)
{
this(strategyType, maxSize, cache, maxMultiplex, false);
}

public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToIntFunction<P> maxMultiplex, boolean weakEntries)
{
if (maxSize > OPTIMAL_MAX_SIZE && LOG.isDebugEnabled())
LOG.debug("{} configured with max size {} which is above the recommended value {}", getClass().getSimpleName(), maxSize, OPTIMAL_MAX_SIZE);
Expand All @@ -115,6 +121,7 @@ public ConcurrentPool(StrategyType strategyType, int maxSize, boolean cache, ToI
this.cache = cache ? new ThreadLocal<>() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
this.maxMultiplex = Objects.requireNonNull(maxMultiplex);
this.weakEntries = weakEntries;
}

public int getTerminatedCount()
Expand Down Expand Up @@ -181,9 +188,8 @@ public Entry<P> acquire()
{
if (entry.isGarbage())
{
if (LOG.isDebugEnabled())
LOG.debug("removing garbage cached entry {} for {}", entry, this);
cache.remove();
LOG.warn("Leaked cached entry {} detected for {}", entry, this);
}
else if (entry.tryAcquire())
{
Expand All @@ -205,10 +211,9 @@ else if (entry.tryAcquire())
{
if (entry.isGarbage())
{
if (LOG.isDebugEnabled())
LOG.debug("removing garbage entry {} for {}", entry, this);
entries.remove(index);
tries++;
LOG.warn("Leaked entry {} detected for {}", entry, this);
}
else if (entry.tryAcquire())
{
Expand All @@ -228,7 +233,7 @@ else if (entry.tryAcquire())
if (size == 0)
break;
}
if (++index == size)
if (++index >= size)
index = 0;
}

Expand Down Expand Up @@ -349,7 +354,7 @@ public int sweep()
if (ce.isGarbage())
{
toRemove.add(entry);
LOG.warn("Leaked pooled object detected: {}", ce);
LOG.warn("Leaked pooled object detected {} for {}", ce, this);
}
}
entries.removeAll(toRemove); // hard remove, these entries are garbage
Expand All @@ -359,13 +364,14 @@ public int sweep()
@Override
public String toString()
{
return String.format("%s@%x[inUse=%d,size=%d,max=%d,terminated=%b]",
return String.format("%s@%x[inUse=%d,size=%d,max=%d,terminated=%b,weak=%b]",
getClass().getSimpleName(),
hashCode(),
getInUseCount(),
size(),
getMaxSize(),
isTerminated());
isTerminated(),
weakEntries);
}

/**
Expand Down Expand Up @@ -453,7 +459,7 @@ public boolean enable(E pooled, boolean acquire)

if (tryEnable(acquire))
{
if (acquire)
if (pool.weakEntries && acquire)
this.released = null;
if (LOG.isDebugEnabled())
LOG.debug("enabled {} for {}", this, pool);
Expand All @@ -471,7 +477,8 @@ public boolean enable(E pooled, boolean acquire)
public E getPooled()
{
E ref = pooled == null ? null : pooled.get();
this.released = null;
if (pool.weakEntries)
this.released = null;
return ref;
}

Expand Down Expand Up @@ -556,7 +563,7 @@ private boolean tryRelease()
int newMultiplexCount = multiplexCount - 1;
if (state.compareAndSet(encoded, 0, newMultiplexCount))
{
if (newMultiplexCount == 0)
if (pool.weakEntries && newMultiplexCount == 0)
this.released = pooled.get();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ public void testRandomStrategy()
@Test
public void testLeakDetection()
{
ConcurrentPool<AtomicInteger> pool = new ConcurrentPool<>(FIRST, 4);
ConcurrentPool<AtomicInteger> pool = new ConcurrentPool<>(FIRST, 4, false, e -> 1, true);

// not keeping a hard ref onto the entry that is enabled & not acquired makes it survive
pool.reserve().enable(new AtomicInteger(1), false);
Expand Down

0 comments on commit df6ff99

Please sign in to comment.