Too many resources may be created when using lazy-warmup #173

Closed

163 changes: 87 additions & 76 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
@@ -99,6 +99,9 @@ public class SimpleDequePool<POOLABLE> extends AbstractPool<POOLABLE> {

Disposable evictionTask;

// Flag used to avoid creating resources while warmup is in progress
volatile boolean warmupInProgress = false;

Member:
Would it make sense to prevent drainLoop() from being called concurrently instead? Say more than one concurrent acquire happens: both increment the demand and add themselves to the Borrower queue, but just one enters the drainLoop and delivers all necessary resources. Before exiting, the loop can check whether more demand has been added and repeat the procedure, ensuring every acquire is satisfied in the end. That would also guarantee that no simultaneous warmup happens.
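
A minimal sketch (not code from this PR; class and field names are illustrative) of the serialized drain described above, using the usual work-in-progress counter: only the thread that moves WIP from 0 to 1 runs the loop, later callers just record extra demand, and the loop repeats until that demand is consumed, so no two threads can run the warmup at the same time.

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

abstract class SerializedDrain {

	volatile int wip;
	static final AtomicIntegerFieldUpdater<SerializedDrain> WIP =
			AtomicIntegerFieldUpdater.newUpdater(SerializedDrain.class, "wip");

	void drain() {
		if (WIP.getAndIncrement(this) != 0) {
			return; // another thread is already draining and will see our increment
		}
		int missed = 1;
		for (;;) {
			drainLoop(); // serve pending borrowers, including any warmup
			missed = WIP.addAndGet(this, -missed);
			if (missed == 0) {
				return; // no new demand arrived while draining
			}
		}
	}

	// satisfies as many borrowers as possible in one pass
	abstract void drainLoop();
}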

Member:
Just to clarify why drainLoop can be executed concurrently: both the evictInBackground and pendingOffer methods have a check like this:

			if (WIP.decrementAndGet(this) > 0) {
				drainLoop();
			}

And it doesn't exclude other threads from entering.

Contributor Author:
If I'm correct, drainLoop can't be run concurrently; it is protected using the WIP design pattern.
However, I will need to give your comment more thought, thanks.

Member:
This is not the typical WIP pattern in use here. The above condition should look like the one in the drain method:

void drain() {
	if (WIP.getAndIncrement(this) == 0) {
		drainLoop();
	}
}

but that's not the case for evictInBackground and pendingOffer.


SimpleDequePool(PoolConfig<POOLABLE> poolConfig) {
super(poolConfig, Loggers.getLogger(SimpleDequePool.class));
this.idleResourceLeastRecentlyUsed = poolConfig.reuseIdleResourcesInLruOrder();
@@ -372,89 +375,97 @@ private void drainLoop() {
.schedule(() -> borrower.deliver(slot));
}
else {
/*==================================*
* One Borrower, but NO RESOURCE... *
*==================================*/
// Can we allocate more?
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
/*==========================*
* ... and CANNOT ALLOCATE => MAX PENDING ENFORCING *
*==========================*/
//we don't have idle resource nor allocation permit
//we look at the borrowers and cull those that are above the maxPending limit (using pollLast!)
if (maxPending >= 0) {
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower<POOLABLE> extraneous = pendingPoll(borrowers);
if (extraneous != null) {
//fail fast. differentiate slightly special case of maxPending == 0
if (maxPending == 0) {
extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit"));
}
else {
extraneous.fail(new PoolAcquirePendingLimitException(maxPending));
/*=========================================================================================================*
* One Borrower, but no idle resource, or some resources are still warming up
*=========================================================================================================*/

if(! warmupInProgress) {
// Warmup not in progress, can we allocate more ?
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
/*==========================*
* ... and CANNOT ALLOCATE => MAX PENDING ENFORCING *
*==========================*/
//we don't have idle resource nor allocation permit
//we look at the borrowers and cull those that are above the maxPending limit (using pollLast!)
if (maxPending >= 0) {
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower<POOLABLE> extraneous = pendingPoll(borrowers);
if (extraneous != null) {
//fail fast. differentiate slightly special case of maxPending == 0
if (maxPending == 0) {
extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit"));
}
else {
extraneous.fail(new PoolAcquirePendingLimitException(maxPending));
}
}
}
}
}
}
else {
/*=======================*
* ... and CAN ALLOCATE => Subscribe to allocator + Warmup *
*=======================*/
Borrower<POOLABLE> borrower = pendingPoll(borrowers);
if (borrower == null) {
continue; //we expect to detect pool is shut down in next round
}
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
long start = clock.millis();
Mono<POOLABLE> allocator = allocatorWithScheduler();

Mono<POOLABLE> primary = allocator.doOnEach(sig -> {
if (sig.isOnNext()) {
POOLABLE newInstance = sig.get();
assert newInstance != null;
ACQUIRED.incrementAndGet(this);
metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start);
borrower.deliver(createSlot(newInstance));
else {
/*=======================*
* ... and CAN ALLOCATE => Subscribe to allocator + Warmup *
*=======================*/
Borrower<POOLABLE> borrower = pendingPoll(borrowers);
if (borrower == null) {
continue; //we expect to detect pool is shut down in next round
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start);
poolConfig.allocationStrategy()
.returnPermits(1);
borrower.fail(error);
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
}
}).contextWrite(borrower.currentContext());
borrower.stopPendingCountdown();
long start = clock.millis();
Mono<POOLABLE> allocator = allocatorWithScheduler();

Mono<POOLABLE> primary = allocator.doOnEach(sig -> {
if (sig.isOnNext()) {
POOLABLE newInstance = sig.get();
assert newInstance != null;
ACQUIRED.incrementAndGet(this);
metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start);
borrower.deliver(createSlot(newInstance));
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start);
poolConfig.allocationStrategy()
.returnPermits(1);
borrower.fail(error);
}
}).contextWrite(borrower.currentContext());

if (permits == 1) {
//subscribe to the primary, which will directly feed to the borrower
primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
}
else {
/*=============================================*
* (warm up in sequence to primary allocation) *
*=============================================*/
int toWarmup = permits - 1;
logger.debug("should warm up {} extra resources", toWarmup);

final long startWarmupIteration = clock.millis();
// flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency
// can be controlled from configuration
final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
Flux.range(1, toWarmup)
.map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
.startWith(primary.doOnSuccess(__ -> drain()).then())
.flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified
.onErrorResume(e -> Mono.empty())
.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
if (permits == 1) {
//subscribe to the primary, which will directly feed to the borrower
primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
}
else {
/*=============================================*
* (warm up in sequence to primary allocation) *
*=============================================*/
int toWarmup = permits - 1;
logger.debug("should warm up {} extra resources", toWarmup);

final long startWarmupIteration = clock.millis();
// flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency
// can be controlled from configuration
final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
warmupInProgress = true;

Member:
This flag does not guarantee exclusivity IMO. A stress test could be added to validate my accusation and I worry it would hold true :(

Member:
I also think so

Contributor Author @pderop, Sep 5, 2023:
For the moment I don't see the concurrency issue, but I will investigate this and create a stress test.
The flag is only set from the drainLoop, and it is reset once the subscription to the Flux.range completes; if drainLoop misses the update of warmupInProgress, then drain() will cause drainLoop to be called again.
However, since both of you are seeing an issue here, I'll carefully revisit all this.
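
As an illustration only (hypothetical code, not part of the PR and not the stress test mentioned above), a plain-Java sketch of the race the reviewers worry about: two threads can both read warmupInProgress == false before either one writes true, so both would start a warmup. The class and method names are made up; a real validation in this project would more likely be a JCStress test.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class WarmupFlagRace {

	static volatile boolean warmupInProgress;
	static final AtomicInteger warmupsStarted = new AtomicInteger();

	// check-then-set on a volatile flag is not atomic: two threads may both pass the check
	static void tryStartWarmup() {
		if (!warmupInProgress) {
			warmupInProgress = true;
			warmupsStarted.incrementAndGet();
		}
	}

	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 100_000; i++) {
			warmupInProgress = false;
			warmupsStarted.set(0);
			CountDownLatch start = new CountDownLatch(1);
			Runnable task = () -> {
				try {
					start.await();
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
				tryStartWarmup();
			};
			Thread t1 = new Thread(task);
			Thread t2 = new Thread(task);
			t1.start();
			t2.start();
			start.countDown(); // release both threads as close together as possible
			t1.join();
			t2.join();
			if (warmupsStarted.get() > 1) {
				System.out.println("iteration " + i + ": two warmups started concurrently");
				return;
			}
		}
		System.out.println("race not observed (which does not prove it cannot happen)");
	}
}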

Flux.range(1, toWarmup)
.map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
.startWith(primary.doOnSuccess(__ -> drain()).then())
.flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified
.onErrorResume(e -> Mono.empty())
.subscribe(aVoid -> {
}, alreadyPropagatedOrLogged -> drain(), () -> {
warmupInProgress = false;
drain();
});

Member @violetagg, Sep 5, 2023 (commenting on lines +464 to +467):
On error warmupInProgress will continue to stay true. Is that intentional?

Contributor Author:
Good catch; #175 partially addresses this problem.

Contributor Author:
warmupInProgress should indeed be reset even if there is an error. Will think about this.
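
One possible direction, sketched here as a suggestion only (not what the PR or #175 does), assuming the fields and helpers from the surrounding drainLoop (warmupInProgress, drain(), primary, warmupMono, toWarmup, startWarmupIteration, allocator, mergeConcurrency): move the reset into a doFinally, which runs on completion, error and cancellation alike, and drain again once the flag is cleared.

	warmupInProgress = true;
	Flux.range(1, toWarmup)
	    .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
	    .startWith(primary.doOnSuccess(__ -> drain()).then())
	    .flatMap(Function.identity(), mergeConcurrency, 1)
	    .onErrorResume(e -> Mono.empty())
	    .doFinally(signalType -> {
	        // runs after the terminal signal, whatever it is, so the flag cannot stay stuck at true
	        warmupInProgress = false;
	        drain();
	    })
	    .subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);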

}
}
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,10 +17,13 @@
package reactor.pool;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.scheduler.clock.SchedulerClock;
import reactor.test.scheduler.VirtualTimeScheduler;

@@ -374,4 +377,66 @@ void testIsInactiveForMoreThan() {
.isTrue();
}

@Test
void testIssue172_block() throws InterruptedException {
Mono<Integer> allocator = Mono.just(1).subscribeOn(Schedulers.single());

InstrumentedPool<Integer> pool = PoolBuilder.from(allocator).sizeBetween(3, 7).buildPool();
pool.acquire().block();
pool.acquire().block();
assertThat(pool.metrics().allocatedSize()).isEqualTo(3);
}

@Test
void testIssue172_async() throws InterruptedException {
Mono<Integer> allocator = Mono.just(1).subscribeOn(Schedulers.single());

InstrumentedPool<Integer> pool = PoolBuilder.from(allocator).sizeBetween(3, 7).buildPool();
CountDownLatch latch = new CountDownLatch(2);
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
latch.await(5, TimeUnit.SECONDS);
assertThat(pool.metrics().allocatedSize()).isEqualTo(3);
}

@Test
void testIssue172_async_concurrent_warmup() throws InterruptedException {
Mono<Integer> allocator = Mono.just(1).subscribeOn(Schedulers.single());

InstrumentedPool<Integer> pool = PoolBuilder.from(allocator).sizeBetween(3, 7, 3).buildPool();
CountDownLatch latch = new CountDownLatch(2);
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
latch.await(5, TimeUnit.SECONDS);
assertThat(pool.metrics().allocatedSize()).isEqualTo(3);
}

@Test
void testIssue172_async_with_delay_and_more_than_min() throws InterruptedException {
Mono<Integer> allocator = Mono.just(1).delayElement(Duration.ofSeconds(1));

InstrumentedPool<Integer> pool = PoolBuilder.from(allocator).sizeBetween(3, 7).buildPool();
CountDownLatch latch = new CountDownLatch(4);
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(pool.metrics().allocatedSize()).isEqualTo(4);
}

@Test
void testIssue172_async_with_delay_and_more_than_min_concurrent_warmup() throws InterruptedException {
Mono<Integer> allocator = Mono.just(1).delayElement(Duration.ofSeconds(1));

InstrumentedPool<Integer> pool = PoolBuilder.from(allocator).sizeBetween(3, 7, 3).buildPool();
CountDownLatch latch = new CountDownLatch(4);
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
pool.acquire().subscribe(integerPooledRef -> latch.countDown());
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(pool.metrics().allocatedSize()).isEqualTo(4);
}

}