Skip to content

Commit

Permalink
Rework drainLoop so that extra pending acquires are better culled (#122)
Browse files Browse the repository at this point in the history
This commit reworks the whole SimpleDequePool#drainLoop and adds some
WIP-protected logic to `pendingOffer`.

By evaluating the `PENDING_COUNT` and  queue size behind the guard of
the WIP, we're better at catching the pending borrowers that should
be failed fast due to going over the `maxPending` limit, while reliably
taking into account the available permits (and the borrowers that have
effectively been served via newly permitted allocations).

The `AbstractPool.PENDING_COUNT` field is now incremented immediately
upon `pendingOffer`, but is decremented back if the `pendingOffer` can
win the WIP and start a simplified drain loop in which we detect that
a resource is or will be available for that pending.

This refactor also ensures that `drain` is called only once in the
scenario where N allocations take place (warm up case).

Finally, the increased robustness of the solution is covered by a
stress test using JCStress.

Fixes #121.
  • Loading branch information
simonbasle authored Mar 5, 2021
1 parent 2289920 commit af38f66
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 126 deletions.
84 changes: 84 additions & 0 deletions src/jcstress/java/reactor/pool/SimpleDequePoolStressTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,88 @@ public void arbiter(II_Result r) {
}
}

@JCStressTest
@Outcome(id = "1, 3, 1, 1", expect = ACCEPTABLE, desc = "1 obtained, 3 rejected, 1 pending")
@Outcome(id = "1, 4, 0, 0", expect = ACCEPTABLE_INTERESTING, desc = "1 obtained, all overeagerly rejected")
@Outcome(id = "1, 0, 4, 4", expect = FORBIDDEN, desc = "1 obtained and all others pending")
@Outcome(id = "1, 1, 3, 3", expect = FORBIDDEN, desc = "1 obtained but 3 pending")
@Outcome(id = "1, 2, 2, 2", expect = FORBIDDEN, desc = "1 obtained but 2 pending")
@State
public static class MaxPendingAcquireHammeredWithOnePermit {

final AtomicBoolean firstResourceCreated = new AtomicBoolean();

final AtomicInteger obtained = new AtomicInteger();
final AtomicInteger rejected = new AtomicInteger();

final SimpleDequePool<AtomicInteger> pool = PoolBuilder
.from(Mono.fromCallable(() -> new AtomicInteger(firstResourceCreated.getAndSet(true) ? 2 : 1)))
.sizeBetween(0, 1)
.maxPendingAcquire(1)
.build(conf -> new SimpleDequePool<>(conf, true));

@Actor
public void acquisition1() {
pool.acquire().subscribe(
v -> obtained.incrementAndGet(),
e -> {
if (e instanceof PoolAcquirePendingLimitException) {
rejected.incrementAndGet();
}
});
}

@Actor
public void acquisition2() {
pool.acquire().subscribe(
v -> obtained.incrementAndGet(),
e -> {
if (e instanceof PoolAcquirePendingLimitException) {
rejected.incrementAndGet();
}
});
}

@Actor
public void acquisition3() {
pool.acquire().subscribe(
v -> obtained.incrementAndGet(),
e -> {
if (e instanceof PoolAcquirePendingLimitException) {
rejected.incrementAndGet();
}
});
}

@Actor
public void acquisition4() {
pool.acquire().subscribe(
v -> obtained.incrementAndGet(),
e -> {
if (e instanceof PoolAcquirePendingLimitException) {
rejected.incrementAndGet();
}
});
}

@Actor
public void acquisition5() {
pool.acquire().subscribe(
v -> obtained.incrementAndGet(),
e -> {
if (e instanceof PoolAcquirePendingLimitException) {
rejected.incrementAndGet();
}
});
}

@Arbiter
public void arbiter(IIII_Result r) {
r.r1 = obtained.get();
r.r2 = rejected.get();
r.r3 = pool.pending.size();
r.r4 = SimpleDequePool.PENDING_COUNT.get(pool);
}
}

}
Loading

0 comments on commit af38f66

Please sign in to comment.