diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 7a80131f..3ca61315 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -99,6 +99,9 @@ public class SimpleDequePool extends AbstractPool { Disposable evictionTask; + // Flag used to avoid creating resources while warmup is in progress + volatile boolean warmupInProgress = false; + SimpleDequePool(PoolConfig poolConfig) { super(poolConfig, Loggers.getLogger(SimpleDequePool.class)); this.idleResourceLeastRecentlyUsed = poolConfig.reuseIdleResourcesInLruOrder(); @@ -372,89 +375,97 @@ private void drainLoop() { .schedule(() -> borrower.deliver(slot)); } else { - /*==================================* - * One Borrower, but NO RESOURCE... * - *==================================*/ - // Can we allocate more? - int permits = poolConfig.allocationStrategy().getPermits(1); - if (permits <= 0) { - /*==========================* - * ... and CANNOT ALLOCATE => MAX PENDING ENFORCING * - *==========================*/ - //we don't have idle resource nor allocation permit - //we look at the borrowers and cull those that are above the maxPending limit (using pollLast!) - if (maxPending >= 0) { - borrowersCount = pendingSize; - int toCull = borrowersCount - maxPending; - for (int i = 0; i < toCull; i++) { - Borrower extraneous = pendingPoll(borrowers); - if (extraneous != null) { - //fail fast. differentiate slightly special case of maxPending == 0 - if (maxPending == 0) { - extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit")); - } - else { - extraneous.fail(new PoolAcquirePendingLimitException(maxPending)); + /*=========================================================================================================* + * One Borrower, but no idle resource, or some resources are still warming up + *=========================================================================================================*/ + + if(! warmupInProgress) { + // Warmup not in progress, can we allocate more ? + int permits = poolConfig.allocationStrategy().getPermits(1); + if (permits <= 0) { + /*==========================* + * ... and CANNOT ALLOCATE => MAX PENDING ENFORCING * + *==========================*/ + //we don't have idle resource nor allocation permit + //we look at the borrowers and cull those that are above the maxPending limit (using pollLast!) + if (maxPending >= 0) { + borrowersCount = pendingSize; + int toCull = borrowersCount - maxPending; + for (int i = 0; i < toCull; i++) { + Borrower extraneous = pendingPoll(borrowers); + if (extraneous != null) { + //fail fast. differentiate slightly special case of maxPending == 0 + if (maxPending == 0) { + extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit")); + } + else { + extraneous.fail(new PoolAcquirePendingLimitException(maxPending)); + } } } } } - } - else { - /*=======================* - * ... and CAN ALLOCATE => Subscribe to allocator + Warmup * - *=======================*/ - Borrower borrower = pendingPoll(borrowers); - if (borrower == null) { - continue; //we expect to detect pool is shut down in next round - } - if (isDisposed()) { - borrower.fail(new PoolShutdownException()); - return; - } - borrower.stopPendingCountdown(); - long start = clock.millis(); - Mono allocator = allocatorWithScheduler(); - - Mono primary = allocator.doOnEach(sig -> { - if (sig.isOnNext()) { - POOLABLE newInstance = sig.get(); - assert newInstance != null; - ACQUIRED.incrementAndGet(this); - metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); - borrower.deliver(createSlot(newInstance)); + else { + /*=======================* + * ... and CAN ALLOCATE => Subscribe to allocator + Warmup * + *=======================*/ + Borrower borrower = pendingPoll(borrowers); + if (borrower == null) { + continue; //we expect to detect pool is shut down in next round } - else if (sig.isOnError()) { - Throwable error = sig.getThrowable(); - assert error != null; - metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start); - poolConfig.allocationStrategy() - .returnPermits(1); - borrower.fail(error); + if (isDisposed()) { + borrower.fail(new PoolShutdownException()); + return; } - }).contextWrite(borrower.currentContext()); + borrower.stopPendingCountdown(); + long start = clock.millis(); + Mono allocator = allocatorWithScheduler(); + + Mono primary = allocator.doOnEach(sig -> { + if (sig.isOnNext()) { + POOLABLE newInstance = sig.get(); + assert newInstance != null; + ACQUIRED.incrementAndGet(this); + metricsRecorder.recordAllocationSuccessAndLatency(clock.millis() - start); + borrower.deliver(createSlot(newInstance)); + } + else if (sig.isOnError()) { + Throwable error = sig.getThrowable(); + assert error != null; + metricsRecorder.recordAllocationFailureAndLatency(clock.millis() - start); + poolConfig.allocationStrategy() + .returnPermits(1); + borrower.fail(error); + } + }).contextWrite(borrower.currentContext()); - if (permits == 1) { - //subscribe to the primary, which will directly feed to the borrower - primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> drain(), this::drain); - } - else { - /*=============================================* - * (warm up in sequence to primary allocation) * - *=============================================*/ - int toWarmup = permits - 1; - logger.debug("should warm up {} extra resources", toWarmup); - - final long startWarmupIteration = clock.millis(); - // flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency - // can be controlled from configuration - final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1); - Flux.range(1, toWarmup) - .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain())) - .startWith(primary.doOnSuccess(__ -> drain()).then()) - .flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified - .onErrorResume(e -> Mono.empty()) - .subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain); + if (permits == 1) { + //subscribe to the primary, which will directly feed to the borrower + primary.subscribe(alreadyPropagated -> { }, alreadyPropagatedOrLogged -> drain(), this::drain); + } + else { + /*=============================================* + * (warm up in sequence to primary allocation) * + *=============================================*/ + int toWarmup = permits - 1; + logger.debug("should warm up {} extra resources", toWarmup); + + final long startWarmupIteration = clock.millis(); + // flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency + // can be controlled from configuration + final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1); + warmupInProgress = true; + Flux.range(1, toWarmup) + .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain())) + .startWith(primary.doOnSuccess(__ -> drain()).then()) + .flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified + .onErrorResume(e -> Mono.empty()) + .subscribe(aVoid -> { + }, alreadyPropagatedOrLogged -> drain(), () -> { + warmupInProgress = false; + drain(); + }); + } } } } diff --git a/reactor-pool/src/test/java/reactor/pool/SimpleDequePoolInstrumentationTest.java b/reactor-pool/src/test/java/reactor/pool/SimpleDequePoolInstrumentationTest.java index ab9a6317..055b29bf 100644 --- a/reactor-pool/src/test/java/reactor/pool/SimpleDequePoolInstrumentationTest.java +++ b/reactor-pool/src/test/java/reactor/pool/SimpleDequePoolInstrumentationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,13 @@ package reactor.pool; import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.scheduler.clock.SchedulerClock; import reactor.test.scheduler.VirtualTimeScheduler; @@ -374,4 +377,66 @@ void testIsInactiveForMoreThan() { .isTrue(); } + @Test + void testIssue172_block() throws InterruptedException { + Mono allocator = Mono.just(1).subscribeOn(Schedulers.single()); + + InstrumentedPool pool = PoolBuilder.from(allocator).sizeBetween(3, 7).buildPool(); + pool.acquire().block(); + pool.acquire().block(); + assertThat(pool.metrics().allocatedSize()).isEqualTo(3); + } + + @Test + void testIssue172_async() throws InterruptedException { + Mono allocator = Mono.just(1).subscribeOn(Schedulers.single()); + + InstrumentedPool pool = PoolBuilder.from(allocator).sizeBetween(3, 7).buildPool(); + CountDownLatch latch = new CountDownLatch(2); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + latch.await(5, TimeUnit.SECONDS); + assertThat(pool.metrics().allocatedSize()).isEqualTo(3); + } + + @Test + void testIssue172_async_concurrent_warmup() throws InterruptedException { + Mono allocator = Mono.just(1).subscribeOn(Schedulers.single()); + + InstrumentedPool pool = PoolBuilder.from(allocator).sizeBetween(3, 7, 3).buildPool(); + CountDownLatch latch = new CountDownLatch(2); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + latch.await(5, TimeUnit.SECONDS); + assertThat(pool.metrics().allocatedSize()).isEqualTo(3); + } + + @Test + void testIssue172_async_with_delay_and_more_than_min() throws InterruptedException { + Mono allocator = Mono.just(1).delayElement(Duration.ofSeconds(1)); + + InstrumentedPool pool = PoolBuilder.from(allocator).sizeBetween(3, 7).buildPool(); + CountDownLatch latch = new CountDownLatch(4); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(pool.metrics().allocatedSize()).isEqualTo(4); + } + + @Test + void testIssue172_async_with_delay_and_more_than_min_concurrent_warmup() throws InterruptedException { + Mono allocator = Mono.just(1).delayElement(Duration.ofSeconds(1)); + + InstrumentedPool pool = PoolBuilder.from(allocator).sizeBetween(3, 7, 3).buildPool(); + CountDownLatch latch = new CountDownLatch(4); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + pool.acquire().subscribe(integerPooledRef -> latch.countDown()); + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(pool.metrics().allocatedSize()).isEqualTo(4); + } + }