Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make pools directly configurable with long size and disable leak detection by default #197

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions api-changes.json
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,36 @@
"code": "java.method.addedToInterface",
"new": "method void java.util.concurrent.Flow.Publisher<T>::subscribe(java.util.concurrent.Flow.Subscriber<? super T>) @ stormpot.Completion",
"justification": "Major version change"
},

{
"ignore": true,
"code": "java.method.returnTypeChanged",
"old": "method int stormpot.PoolBuilder<T extends stormpot.Poolable>::getSize()",
"new": "method long stormpot.PoolBuilder<T extends stormpot.Poolable>::getSize()",
"justification": "Major version change"
},
{
"ignore": true,
"code": "java.method.nowAbstract",
"old": "method int stormpot.PoolBuilder<T extends stormpot.Poolable>::getSize()",
"new": "method long stormpot.PoolBuilder<T extends stormpot.Poolable>::getSize()",
"justification": "Major version change"
},
{
"ignore": true,
"code": "java.method.parameterTypeChanged",
"old": "parameter stormpot.PoolBuilder<T> stormpot.PoolBuilder<T extends stormpot.Poolable>::setSize(===int===)",
"new": "parameter stormpot.PoolBuilder<T> stormpot.PoolBuilder<T extends stormpot.Poolable>::setSize(===long===)",
"parameterIndex": "0",
"justification": "Major version change"
},
{
"ignore": true,
"code": "java.method.nowAbstract",
"old": "method stormpot.PoolBuilder<T> stormpot.PoolBuilder<T extends stormpot.Poolable>::setSize(int)",
"new": "method stormpot.PoolBuilder<T> stormpot.PoolBuilder<T extends stormpot.Poolable>::setSize(long)",
"justification": "Major version change"
}
]
}
Expand Down
7 changes: 3 additions & 4 deletions docs/config.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ The thread factory setting is not used by pools that operate in the _inline_ mod

=== Precise Leak Detection

Stormpot has precise leak detection enabled by default (except in the direct pool mode) because the CPU overhead is very low.
There is, however, a bit of memory overhead.
Therefor, it may make sense to disable this in use cases where memory is very constrained, and/or the pool contains a very large number of objects.
If you pool upwards a hundred thousand objects or more, you might want to disable it for performance reasons.
Stormpot has a precise leak detection feature.
It is disabled by default because it has some memory overhead and a bit of CPU overhead.
It can be enabled when you suspect your code might be leaking objects.

The precise leak detection feature lets Stormpot keep track of when objects that were meant to circulate in the pool, suddenly leak out and never come back.
The number of leaks detected is reported via the `getLeakedObjectsCount` method of the `ManagedPool` interface.
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/stormpot/PoolBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public sealed interface PoolBuilder<T extends Poolable>
* This means that a pool of size 1, whose single object have expired, will
* deallocate that one object before allocating a replacement.
* <p>
* The size must be at least one, or an {@link IllegalArgumentException} will
* The size must be at least zero, or an {@link IllegalArgumentException} will
* be thrown when building the pool.
* <p>
* Note that the pool size can be modified after the pool has been built, by
Expand All @@ -62,13 +62,13 @@ public sealed interface PoolBuilder<T extends Poolable>
* @param size The target pool size. Must be at least 0.
* @return This {@code PoolBuilder} instance.
*/
PoolBuilder<T> setSize(int size);
PoolBuilder<T> setSize(long size);

/**
* Get the currently configured size. The default is 10.
* @return The configured pool size.
*/
int getSize();
long getSize();

/**
* Set the {@link Allocator} or {@link Reallocator} to use for the pools we
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/stormpot/internal/BAllocThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -49,7 +49,7 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
private final PreciseLeakDetector leakDetector;
private final StackCompletion shutdownCompletion;
private final BlockingQueue<BSlot<T>> dead;
private final AtomicInteger poisonedSlots;
private final AtomicLong poisonedSlots;
private final long defaultDeadPollTimeout;
private final boolean optimizeForMemory;

Expand Down Expand Up @@ -84,7 +84,7 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
new PreciseLeakDetector() : null;
this.shutdownCompletion = new StackCompletion();
this.dead = new LinkedTransferQueue<>();
this.poisonedSlots = new AtomicInteger();
this.poisonedSlots = new AtomicLong();
this.defaultDeadPollTimeout = builder.getBackgroundExpirationCheckDelay();
this.optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
this.size = 0;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/stormpot/internal/BSlot.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.lang.ref.Reference;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class is very sensitive to the memory layout, so be careful to measure
Expand Down Expand Up @@ -53,14 +53,14 @@ public class BSlot<T extends Poolable> implements Slot, SlotInfo<T> {
private volatile int state;

final BlockingQueue<BSlot<T>> live;
final AtomicInteger poisonedSlots;
final AtomicLong poisonedSlots;
long stamp;
long createdNanos;
public T obj;
public Exception poison;
public Reference<Object> leakCheck; // Used by PreciseLeakDetector

public BSlot(BlockingQueue<BSlot<T>> live, AtomicInteger poisonedSlots) {
public BSlot(BlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
// Volatile write in the constructor: This object must be safely published,
// so that we are sure that the volatile write happens-before other
// threads observe the pointer to this object.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/stormpot/internal/BSlotPadded.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import stormpot.Poolable;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

@SuppressWarnings("unused")
public class BSlotPadded<T extends Poolable> extends BSlot<T> {
Expand All @@ -32,7 +32,7 @@ public class BSlotPadded<T extends Poolable> extends BSlot<T> {
private long p07;
private long p08;

public BSlotPadded(BlockingQueue<BSlot<T>> live, AtomicInteger poisonedSlots) {
public BSlotPadded(BlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
super(live, poisonedSlots);
}
}
12 changes: 6 additions & 6 deletions src/main/java/stormpot/internal/DirectAllocationController.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public final class DirectAllocationController<T extends Poolable> extends AllocationController<T> {
private final LinkedTransferQueue<BSlot<T>> live;
private final RefillPile<T> disregardPile;
private final BSlot<T> poisonPill;
private final int size;
private final AtomicInteger shutdownState;
private final AtomicInteger poisonedSlots;
private final long size;
private final AtomicLong shutdownState;
private final AtomicLong poisonedSlots;

DirectAllocationController(
LinkedTransferQueue<BSlot<T>> live,
Expand All @@ -41,7 +41,7 @@ public final class DirectAllocationController<T extends Poolable> extends Alloca
this.disregardPile = disregardPile;
this.poisonPill = poisonPill;
this.size = builder.getSize();
poisonedSlots = new AtomicInteger();
poisonedSlots = new AtomicLong();
Allocator<T> allocator = builder.getAllocator();
boolean optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
for (int i = 0; i < size; i++) {
Expand All @@ -56,7 +56,7 @@ public final class DirectAllocationController<T extends Poolable> extends Alloca
}
live.offer(slot);
}
shutdownState = new AtomicInteger(size);
shutdownState = new AtomicLong(size);
}

@Override
Expand Down
78 changes: 65 additions & 13 deletions src/main/java/stormpot/internal/IdentityHashSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,53 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.ToIntFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.util.Objects.requireNonNull;

/**
* Like {@link java.util.IdentityHashMap}, but as a Set with no overhead for storing values.
*/
public class IdentityHashSet implements Iterable<Object> {
public final class IdentityHashSet implements Iterable<Object> {
// Implementation is based on open-addressing 2-choice hashing,
// with 8-element buckets, and an 8-element stash for unresolvable collisions.

private static final int BLOCK_LEN = 8;
private static final ToIntFunction<Object> DEFAULT_HASH = System::identityHashCode;

private final Object[] stash;
private final int subtreeMix;
private final ToIntFunction<Object> hashOf;
private int stashSize;
private int sectionLength;
private int sectionBlocks;
private Object[] table;
private boolean treeMode;

public IdentityHashSet() {
this(DEFAULT_HASH);
}

public IdentityHashSet(ToIntFunction<Object> hashOf) {
stash = new Object[BLOCK_LEN];
sectionBlocks = 8;
sectionLength = BLOCK_LEN * sectionBlocks;
table = new Object[sectionLength * 2];
subtreeMix = ThreadLocalRandom.current().nextInt();
this.hashOf = requireNonNull(hashOf, "hashOf");
}

public void add(Object obj) {
for (;;) {
if (treeMode) {
subtree(obj).add(obj);
return;
}
int mask = sectionBlocks - 1;
int ihash = hashOf(obj);
int ihash = hashOf.applyAsInt(obj);
int h1 = BLOCK_LEN * (ihash & mask);
int h2 = sectionLength + (BLOCK_LEN * (fmix32(ihash) & mask));
for (int i = 0; i < stashSize; i++) {
Expand Down Expand Up @@ -85,8 +105,12 @@ public void add(Object obj) {
}

public void remove(Object obj) {
if (treeMode) {
subtree(obj).remove(obj);
return;
}
int ihash = hashOf.applyAsInt(obj);
int mask = sectionBlocks - 1;
int ihash = hashOf(obj);
int h1 = BLOCK_LEN * (ihash & mask);
int h2 = sectionLength + (BLOCK_LEN * (fmix32(ihash) & mask));
for (int i = 0; i < stashSize; i++) {
Expand Down Expand Up @@ -115,16 +139,27 @@ public void remove(Object obj) {
}
}

private IdentityHashSet subtree(Object obj) {
int ihash = hashOf.applyAsInt(obj);
ihash = fmix32(ihash ^ subtreeMix) & sectionBlocks;
return ((IdentityHashSet) table[ihash]);
}

@Override
public Iterator<Object> iterator() {
// This is only used by the tests.
return Stream.concat(Stream.of(stash), Stream.of(table))
.filter(Objects::nonNull)
.iterator();
}

protected int hashOf(Object obj) {
return System.identityHashCode(obj);
if (treeMode) {
return Stream.of(table)
.flatMap(obj -> {
IdentityHashSet set = (IdentityHashSet) obj;
return StreamSupport.stream(set.spliterator(), false);
})
.iterator();
} else {
return Stream.concat(Stream.of(stash), Stream.of(table))
.filter(Objects::nonNull)
.iterator();
}
}

private static int fmix32(int key) {
Expand All @@ -138,13 +173,30 @@ private static int fmix32(int key) {
}

private void resize() {
int newSectionLength = sectionLength * 2;
int newTableLength = newSectionLength * 2;
int newSectionBlocks = sectionBlocks * 2;
Object[] stashCopy = Arrays.copyOfRange(stash, 0, stashSize);
Arrays.fill(stash, null);
stashSize = 0;
Object[] oldTable = table;
sectionLength *= 2;
sectionBlocks *= 2;
table = new Object[sectionLength * 2];

if (newTableLength >= 2_097_152) {
treeMode = true;
int len = 16384;
Object[] newTable = new Object[len];
for (int i = 0; i < len; i++) {
newTable[i] = new IdentityHashSet(hashOf);
}
table = newTable;
sectionLength = len;
sectionBlocks = len - 1;
} else {
sectionLength = newSectionLength;
sectionBlocks = newSectionBlocks;
table = new Object[newTableLength];
}

for (Object obj : stashCopy) {
add(obj);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public final class InlineAllocationController<T extends Poolable> extends AllocationController<T> {
private static final VarHandle SIZE;
Expand All @@ -54,7 +54,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
private final RefillPile<T> newAllocations;
private final BSlot<T> poisonPill;
private final MetricsRecorder metricsRecorder;
private final AtomicInteger poisonedSlots;
private final AtomicLong poisonedSlots;
private final PreciseLeakDetector leakDetector;
private final Reallocator<T> allocator;
private final boolean optimizeForMemory;
Expand All @@ -79,7 +79,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
this.newAllocations = newAllocations;
this.poisonPill = poisonPill;
this.metricsRecorder = builder.getMetricsRecorder();
poisonedSlots = new AtomicInteger();
poisonedSlots = new AtomicLong();
allocator = builder.getAdaptedReallocator();
optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
leakDetector = builder.isPreciseLeakDetectionEnabled() ?
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/stormpot/internal/PoolBuilderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public final class PoolBuilderImpl<T extends Poolable> implements PoolBuilder<T>
.factory();

public static final Map<AllocationProcessMode, PoolBuilderDefaults> DEFAULTS = Map.of(
THREADED, new PoolBuilderDefaults(after(8, 10, MINUTES), THREAD_FACTORY, true, true, 1000, true),
INLINE, new PoolBuilderDefaults(after(8, 10, MINUTES), THREAD_FACTORY, true, false, 0, true),
THREADED, new PoolBuilderDefaults(after(8, 10, MINUTES), THREAD_FACTORY, false, true, 1000, true),
INLINE, new PoolBuilderDefaults(after(8, 10, MINUTES), THREAD_FACTORY, false, false, 0, true),
DIRECT, new PoolBuilderDefaults(never(), THREAD_FACTORY, false, false, 0, true)
);

Expand All @@ -55,7 +55,7 @@ DIRECT, new PoolBuilderPermissions(false, true, false, false, false)
private final AllocationProcess allocationProcess;
private final PoolBuilderPermissions permissions;
private Allocator<T> allocator;
private int size = 10;
private long size = 10;
private Expiration<? super T> expiration;
private MetricsRecorder metricsRecorder;
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -83,7 +83,7 @@ public PoolBuilderImpl(AllocationProcess allocationProcess, Allocator<T> allocat
}

@Override
public synchronized PoolBuilder<T> setSize(int size) {
public synchronized PoolBuilder<T> setSize(long size) {
checkPermission(permissions.setSize(), "size");
if (size < 0) {
throw new IllegalArgumentException("Size must be at least 0, but was " + size + ".");
Expand All @@ -93,7 +93,7 @@ public synchronized PoolBuilder<T> setSize(int size) {
}

@Override
public synchronized int getSize() {
public synchronized long getSize() {
return size;
}

Expand Down
Loading