Skip to content

Commit

Permalink
Simplify BlazePoolVirtualThreadSafeTap (#184)
Browse files Browse the repository at this point in the history
And put the delayed creation in BlazePool.getVirtualThreadSafeTap. This
also means we no longer leak the `this` reference in the BlazePool
constructor.
  • Loading branch information
chrisvest authored Oct 1, 2024
2 parents 6049ba9 + 6412ccd commit f0d836f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 68 deletions.
21 changes: 18 additions & 3 deletions src/main/java/stormpot/internal/BlazePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@
*/
public final class BlazePool<T extends Poolable> implements Pool<T>, ManagedPool {
private static final VarHandle SHUTDOWN;
private static final VarHandle VIRT_THR_TAP;

static {
try {
MethodHandles.Lookup lookup = MethodHandles.lookup();
SHUTDOWN = lookup.findVarHandle(BlazePool.class, "shutdown", boolean.class)
.withInvokeExactBehavior();
VIRT_THR_TAP = lookup.findVarHandle(BlazePool.class, "virtualThreadSafeTap",
BlazePoolVirtualThreadSafeTap.class).withInvokeExactBehavior();
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new AssertionError("Failed to initialise the shutdown VarHandle.", e);
}
Expand All @@ -74,7 +77,8 @@ public final class BlazePool<T extends Poolable> implements Pool<T>, ManagedPool
private final ThreadLocal<BSlotCache<T>> tlr;
private final Expiration<? super T> deallocRule;
private final MetricsRecorder metricsRecorder;
private final BlazePoolVirtualThreadSafeTap<T> virtualThreadSafeTap;
@SuppressWarnings("unused") // Assigned via VarHandle
private volatile BlazePoolVirtualThreadSafeTap<T> virtualThreadSafeTap;

/**
* A special slot used to signal that the pool has been shut down.
Expand All @@ -99,7 +103,6 @@ public BlazePool(PoolBuilderImpl<T> builder, AllocationProcess factory) {
metricsRecorder = builder.getMetricsRecorder();
allocator = factory.buildAllocationController(
live, disregardPile, newAllocations, builder, poisonPill);
virtualThreadSafeTap = new BlazePoolVirtualThreadSafeTap<>(this);
}

@Override
Expand Down Expand Up @@ -332,9 +335,21 @@ public PoolTap<T> getThreadSafeTap() {
return new BlazePoolThreadSafeTap<>(this);
}

@SuppressWarnings("unchecked")
@Override
public PoolTap<T> getVirtualThreadSafeTap() {
return virtualThreadSafeTap;
var tap = virtualThreadSafeTap;
if (tap == null) {
tap = new BlazePoolVirtualThreadSafeTap<>(this);
var tmp = (BlazePoolVirtualThreadSafeTap<T>) VIRT_THR_TAP.compareAndExchange(
this,
(BlazePoolVirtualThreadSafeTap<T>) null,
tap);
if (tmp != null) {
tap = tmp;
}
}
return tap;
}

@Override
Expand Down
76 changes: 12 additions & 64 deletions src/main/java/stormpot/internal/BlazePoolVirtualThreadSafeTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import stormpot.Poolable;
import stormpot.Timeout;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/**
* The claim method in this pool tap offers similar thread-safety and
* performance to the default thread-safe pool tap, but without the use of
Expand All @@ -34,77 +31,39 @@
* @param <T> The poolable type.
*/
public final class BlazePoolVirtualThreadSafeTap<T extends Poolable> implements PoolTap<T> {
private static final VarHandle ARRAY_VH;
private static final VarHandle STRIPES_VH;

static {
ARRAY_VH = MethodHandles.arrayElementVarHandle(BSlotCache[].class)
.withInvokeExactBehavior();
assert ARRAY_VH.hasInvokeExactBehavior();
try {
STRIPES_VH = MethodHandles.lookup().findVarHandle(
BlazePoolVirtualThreadSafeTap.class, "stripes", BSlotCache[].class)
.withInvokeExactBehavior();
assert STRIPES_VH.hasInvokeExactBehavior();
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}

private static final int ARRAY_SIZE = 0x80; // 128
private static final int ARRAY_MASK = 0x7F;

private final BlazePool<T> pool;
@SuppressWarnings("unused") // Accessed via VarHandle
private BSlotCache<T>[] stripes;
private final BSlotCache<T>[] stripes;

@SuppressWarnings("unchecked")
BlazePoolVirtualThreadSafeTap(BlazePool<T> pool) {
this.pool = pool;
stripes = new BSlotCache[ARRAY_SIZE];
for (int i = 0; i < ARRAY_SIZE; i++) {
stripes[i] = new BSlotCache<>();
}
}

@Override
public T claim(Timeout timeout) throws PoolException, InterruptedException {
BSlotCache<T>[] stripes = getStripes();
long threadId = Thread.currentThread().threadId();
int threadId = (int) Thread.currentThread().threadId();
T obj = tryTlrClaim(threadId, stripes);
if (obj != null) {
return obj;
}
int index = (int) (threadId & ARRAY_MASK);
int index = threadId & ARRAY_MASK;
BSlotCache<T> cache = stripes[index];
assert cache != null;
return pool.claim(timeout, cache);
}

private BSlotCache<T>[] getStripes() {
BSlotCache<T>[] stripes = this.stripes;
if (stripes == null) {
stripes = createStripes();
}
return stripes;
}

@SuppressWarnings("unchecked")
private BSlotCache<T>[] createStripes() {
BSlotCache<T>[] stripes, witness;
stripes = (BSlotCache<T>[]) STRIPES_VH.getVolatile(this);
if (stripes == null) {
stripes = new BSlotCache[ARRAY_SIZE];
witness = (BSlotCache<T>[]) STRIPES_VH.compareAndExchange(this, (BSlotCache<T>[]) null, stripes);
if (witness != null) {
stripes = witness;
}
}
return stripes;
}

private T tryTlrClaim(long threadId, BSlotCache<T>[] stripes) {
private T tryTlrClaim(int threadId, BSlotCache<T>[] stripes) {
for (int i = 0; i < 4; i++) {
int index = (int) (threadId + i & ARRAY_MASK);
int index = threadId + i & ARRAY_MASK;
BSlotCache<T> cache = stripes[index];
if (cache == null) {
cache = createCacheSlot(stripes, index);
}
T obj = pool.tlrClaim(cache);
if (obj != null) {
return obj;
Expand All @@ -113,25 +72,14 @@ private T tryTlrClaim(long threadId, BSlotCache<T>[] stripes) {
return null;
}

@SuppressWarnings("unchecked")
private static <T extends Poolable> BSlotCache<T> createCacheSlot(BSlotCache<T>[] stripes, int index) {
BSlotCache<T> cache = new BSlotCache<>();
BSlotCache<T> tmp = (BSlotCache<T>) ARRAY_VH.compareAndExchange(stripes, index, (BSlotCache<T>) null, cache);
if (tmp != null) {
cache = tmp;
}
return cache;
}

@Override
public T tryClaim() throws PoolException {
BSlotCache<T>[] stripes = getStripes();
long threadId = Thread.currentThread().threadId();
int threadId = (int) Thread.currentThread().threadId();
T obj = tryTlrClaim(threadId, stripes);
if (obj != null) {
return obj;
}
int index = (int) (threadId & ARRAY_MASK);
int index = threadId & ARRAY_MASK;
BSlotCache<T> cache = stripes[index];
assert cache != null;
return pool.tryClaim(cache);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#Temporary solution. May be removed when this issue is solved: https://github.com/oracle/graal/issues/3028
Args = --initialize-at-build-time=stormpot.internal.PaddedAtomicInteger,stormpot.internal.BlazePoolVirtualThreadSafeTap,stormpot.internal.BlazePool
Args = --initialize-at-build-time=stormpot.internal.PaddedAtomicInteger,stormpot.internal.BlazePool

0 comments on commit f0d836f

Please sign in to comment.