Too many resources may be created when using lazy-warmup #173
@@ -99,6 +99,9 @@ public class SimpleDequePool<POOLABLE> extends AbstractPool<POOLABLE> {
 
     Disposable evictionTask;
 
+    // Flag used to avoid creating resources while warmup is in progress
+    volatile boolean warmupInProgress = false;
+
     SimpleDequePool(PoolConfig<POOLABLE> poolConfig) {
         super(poolConfig, Loggers.getLogger(SimpleDequePool.class));
         this.idleResourceLeastRecentlyUsed = poolConfig.reuseIdleResourcesInLruOrder();
@@ -372,89 +375,97 @@ private void drainLoop() {
                     .schedule(() -> borrower.deliver(slot));
             }
             else {
-                /*==================================*
-                 * One Borrower, but NO RESOURCE... *
-                 *==================================*/
-                // Can we allocate more?
-                int permits = poolConfig.allocationStrategy().getPermits(1);
-                if (permits <= 0) {
-                    /*==========================*
-                     * ... and CANNOT ALLOCATE => MAX PENDING ENFORCING *
-                     *==========================*/
-                    //we don't have idle resource nor allocation permit
-                    //we look at the borrowers and cull those that are above the maxPending limit (using pollLast!)
-                    if (maxPending >= 0) {
-                        borrowersCount = pendingSize;
-                        int toCull = borrowersCount - maxPending;
-                        for (int i = 0; i < toCull; i++) {
-                            Borrower<POOLABLE> extraneous = pendingPoll(borrowers);
-                            if (extraneous != null) {
-                                //fail fast. differentiate slightly special case of maxPending == 0
-                                if (maxPending == 0) {
-                                    extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit"));
-                                }
-                                else {
-                                    extraneous.fail(new PoolAcquirePendingLimitException(maxPending));
-                                }
-                            }
-                        }
-                    }
-                }
+                /*============================================================================*
+                 * One Borrower, but no idle resource, or some resources are still warming up *
+                 *============================================================================*/
+
+                if (!warmupInProgress) {
+                    // Warmup not in progress, can we allocate more?
+                    int permits = poolConfig.allocationStrategy().getPermits(1);
+                    if (permits <= 0) {
+                        /*==========================*
+                         * ... and CANNOT ALLOCATE => MAX PENDING ENFORCING *
+                         *==========================*/
+                        //we don't have idle resource nor allocation permit
+                        //we look at the borrowers and cull those that are above the maxPending limit (using pollLast!)
+                        if (maxPending >= 0) {
+                            borrowersCount = pendingSize;
+                            int toCull = borrowersCount - maxPending;
+                            for (int i = 0; i < toCull; i++) {
+                                Borrower<POOLABLE> extraneous = pendingPoll(borrowers);
+                                if (extraneous != null) {
+                                    //fail fast. differentiate slightly special case of maxPending == 0
+                                    if (maxPending == 0) {
+                                        extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit"));
+                                    }
+                                    else {
+                                        extraneous.fail(new PoolAcquirePendingLimitException(maxPending));
+                                    }
+                                }
+                            }
+                        }
+                    }
-                else {
-                    /*=======================*
-                     * ... and CAN ALLOCATE => Subscribe to allocator + Warmup *
-                     *=======================*/
-                    Borrower<POOLABLE> borrower = pendingPoll(borrowers);
-                    if (borrower == null) {
-                        continue; //we expect to detect pool is shut down in next round
-                    }
-                    if (isDisposed()) {
-                        borrower.fail(new PoolShutdownException());
-                        return;
-                    }
-                    borrower.stopPendingCountdown();
-                    long start = clock.millis();
-                    Mono<POOLABLE> allocator = allocatorWithScheduler();
-
-                    Mono<POOLABLE> primary = allocator.doOnEach(sig -> {
-                        if (sig.isOnNext()) {
-                            POOLABLE newInstance = sig.get();
-                            assert newInstance != null;
-                            ACQUIRED.incrementAndGet(this);
-                            metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start);
-                            borrower.deliver(createSlot(newInstance));
-                        }
-                        else if (sig.isOnError()) {
-                            Throwable error = sig.getThrowable();
-                            assert error != null;
-                            metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start);
-                            poolConfig.allocationStrategy().returnPermits(1);
-                            borrower.fail(error);
-                        }
-                    }).contextWrite(borrower.currentContext());
-
+                    else {
+                        /*=======================*
+                         * ... and CAN ALLOCATE => Subscribe to allocator + Warmup *
+                         *=======================*/
+                        Borrower<POOLABLE> borrower = pendingPoll(borrowers);
+                        if (borrower == null) {
+                            continue; //we expect to detect pool is shut down in next round
+                        }
+                        if (isDisposed()) {
+                            borrower.fail(new PoolShutdownException());
+                            return;
+                        }
+                        borrower.stopPendingCountdown();
+                        long start = clock.millis();
+                        Mono<POOLABLE> allocator = allocatorWithScheduler();
+
+                        Mono<POOLABLE> primary = allocator.doOnEach(sig -> {
+                            if (sig.isOnNext()) {
+                                POOLABLE newInstance = sig.get();
+                                assert newInstance != null;
+                                ACQUIRED.incrementAndGet(this);
+                                metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start);
+                                borrower.deliver(createSlot(newInstance));
+                            }
+                            else if (sig.isOnError()) {
+                                Throwable error = sig.getThrowable();
+                                assert error != null;
+                                metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start);
+                                poolConfig.allocationStrategy().returnPermits(1);
+                                borrower.fail(error);
+                            }
+                        }).contextWrite(borrower.currentContext());
+
-                    if (permits == 1) {
-                        //subscribe to the primary, which will directly feed to the borrower
-                        primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
-                    }
-                    else {
-                        /*=============================================*
-                         * (warm up in sequence to primary allocation) *
-                         *=============================================*/
-                        int toWarmup = permits - 1;
-                        logger.debug("should warm up {} extra resources", toWarmup);
-
-                        final long startWarmupIteration = clock.millis();
-                        // flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency
-                        // can be controlled from configuration
-                        final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
-                        Flux.range(1, toWarmup)
-                            .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
-                            .startWith(primary.doOnSuccess(__ -> drain()).then())
-                            .flatMap(Function.identity(), mergeConcurrency, 1) // since we don't store anything the inner buffer can be simplified
-                            .onErrorResume(e -> Mono.empty())
-                            .subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
-                    }
-                }
+                        if (permits == 1) {
+                            //subscribe to the primary, which will directly feed to the borrower
+                            primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
+                        }
+                        else {
+                            /*=============================================*
+                             * (warm up in sequence to primary allocation) *
+                             *=============================================*/
+                            int toWarmup = permits - 1;
+                            logger.debug("should warm up {} extra resources", toWarmup);
+
+                            final long startWarmupIteration = clock.millis();
+                            // flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency
+                            // can be controlled from configuration
+                            final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
+                            warmupInProgress = true;
+                            Flux.range(1, toWarmup)
+                                .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
+                                .startWith(primary.doOnSuccess(__ -> drain()).then())
+                                .flatMap(Function.identity(), mergeConcurrency, 1) // since we don't store anything the inner buffer can be simplified
+                                .onErrorResume(e -> Mono.empty())
+                                .subscribe(aVoid -> { },
+                                        alreadyPropagatedOrLogged -> drain(),
+                                        () -> {
+                                            warmupInProgress = false;
+                                            drain();
+                                        });
+                        }
+                    }
+                }
             }

Review thread on the added line `warmupInProgress = true;`:

- "This flag does not guarantee exclusivity IMO. A stress test could be added to validate this, and I worry it would hold true :("
- "I also think so."
- "I do not see the concurrency issue for the moment, but I will investigate and create a stress test."
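
To illustrate the exclusivity concern, here is a minimal, self-contained sketch (a hypothetical class, not the pool's code) of the check-then-act race the reviewers suspect. A `volatile` boolean makes writes visible, but the read and the write are still two separate operations, so if two threads can reach the guard at the same time, both may start a warmup:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    public class WarmupFlagRace {

        // same shape as the pool's flag: a plain volatile boolean
        static volatile boolean warmupInProgress = false;
        static final AtomicInteger warmupsStarted = new AtomicInteger();

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch start = new CountDownLatch(1);
            Runnable acquirePath = () -> {
                try {
                    start.await();
                }
                catch (InterruptedException e) {
                    return;
                }
                // check-then-act: both threads can observe 'false'
                // before either of them writes 'true'
                if (!warmupInProgress) {
                    warmupInProgress = true;
                    warmupsStarted.incrementAndGet();
                }
            };
            Thread t1 = new Thread(acquirePath);
            Thread t2 = new Thread(acquirePath);
            t1.start();
            t2.start();
            start.countDown();
            t1.join();
            t2.join();
            // usually prints 1, but 2 is possible
            System.out.println("warmups started: " + warmupsStarted.get());
        }
    }

Whether two threads can actually reach that guard at once depends on whether `drainLoop` is truly serialized, which is exactly what the discussion below is about.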
Review thread on lines +464 to +467 (the `subscribe` terminal handlers):

- "On error?"
- "Good catch; #175 is partially addressing this problem."
- "`warmupInProgress` should indeed be reset even if there is an error; will think about this."
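
One way to address the reset-on-error concern, sketched here as an assumption rather than the fix actually adopted (#175 may take a different route), is to clear the flag in a `doFinally`, which runs on completion, error, and cancellation alike:

    import reactor.core.publisher.Flux;

    public class WarmupFlagResetSketch {

        static volatile boolean warmupInProgress = false;

        public static void main(String[] args) {
            warmupInProgress = true;
            Flux.range(1, 3)
                .map(i -> {
                    if (i == 2) {
                        throw new IllegalStateException("simulated allocation failure");
                    }
                    return i;
                })
                // doFinally observes onComplete, onError and cancel alike,
                // so the flag is cleared on every termination path
                .doFinally(signalType -> warmupInProgress = false)
                .subscribe(i -> { }, error -> System.out.println("warmup error: " + error));

            System.out.println("warmupInProgress = " + warmupInProgress); // prints false
        }
    }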

General discussion on the approach:

- "Would it make sense to prevent `drainLoop()` from being called concurrently instead? Say more than one acquire happens concurrently: both increment the demand and add themselves to the `Borrower` queue, but only one of them enters `drainLoop` and delivers all the necessary resources. Before exiting, the loop checks whether more demand has been added and, if so, repeats the procedure, so that every acquire is satisfied in the end. That would also guarantee that no simultaneous warmup happens."
- "Just to clarify why `drainLoop` can currently be executed concurrently: both the `evictInBackground` and `pendingOffer` methods guard the call with a condition that does not exclude other threads from entering."
- "If I'm correct, `drainLoop` can't be run concurrently; it is protected using the WIP design pattern. However, I will give your comment more thought, thanks."
- "It is not the typical WIP pattern that is in use here. The guard should look like the one in the `drain` method, but that is not the case for `evictInBackground` and `pendingOffer`."