Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize pool warmup #171

Merged
merged 9 commits into from
Jul 9, 2023
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,16 +74,28 @@ static final class SizeBasedAllocationStrategy implements AllocationStrategy {

final int min;
final int max;
final int warmupParallelism;

volatile int permits;
static final AtomicIntegerFieldUpdater<SizeBasedAllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(SizeBasedAllocationStrategy.class, "permits");

SizeBasedAllocationStrategy(int min, int max) {
this(min, max, PoolBuilder.DEFAULT_WARMUP_PARALLELISM);
}

SizeBasedAllocationStrategy(int min, int max, int warmupParallelism) {
if (min < 0) throw new IllegalArgumentException("min must be positive or zero");
if (max < 1) throw new IllegalArgumentException("max must be strictly positive");
if (min > max) throw new IllegalArgumentException("min must be less than or equal to max");
if (min > 0 && warmupParallelism < 1) {
throw new IllegalArgumentException("warmupParallelism must be greater than 0");
}
if (min > 0 && warmupParallelism > min) {
throw new IllegalArgumentException("warmupParallelism must be less than or equal to min");
}
this.min = min;
this.max = max;
this.warmupParallelism = warmupParallelism;
PERMITS.lazySet(this, this.max);
}

Expand Down Expand Up @@ -146,5 +158,10 @@ public void returnPermits(int returned) {
}
}
}

@Override
public int warmupParallelism() {
return warmupParallelism;
}
}
}
20 changes: 19 additions & 1 deletion reactor-pool/src/main/java/reactor/pool/AllocationStrategy.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,4 +75,22 @@ public interface AllocationStrategy {
* is not consistent with the strategy's limits and delivered permits.
*/
void returnPermits(int returned);

/**
* Return the concurrency level used when the allocator is subscribed to during the warmup phase, if any.
* <p>
* The number of resources created concurrently will not exceed the value returned by {@code warmupParallelism()}.
* If the concurrency level is set to 1, pre-allocation of resources will be performed sequentially by subscribing to the allocator
* one at a time. The process waits for a resource to be created before subscribing again to the allocator.
* This sequence continues until all pre-allocated resources have been successfully created.
* <p>
* Defaults to 1
*
* @return The concurrency level used when the allocator is subscribed to during the warmup phase, must be positive,
* {@code 1} by default
* @since 1.0.1
*/
default int warmupParallelism() {
return PoolBuilder.DEFAULT_WARMUP_PARALLELISM;
}
}
35 changes: 32 additions & 3 deletions reactor-pool/src/main/java/reactor/pool/PoolBuilder.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -349,6 +349,10 @@ public PoolBuilder<T, CONF> releaseHandler(Function<T, ? extends Publisher<Void>
* {@code min} live resources before serving the acquire with (one of) the newly created resource(s).
* At the same time it MUST NOT allocate any resource if that would bring the number of live resources
* over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}.
* <p>
* Pre-allocation of warmed-up resources, if any, will be performed sequentially by subscribing to the allocator
* one at a time. The process waits for a resource to be created before subscribing again to the allocator.
* This sequence continues until all pre-allocated resources have been successfully created.
*
* @param min the minimum number of live resources to keep in the pool (can be best effort)
* @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a
Expand All @@ -358,7 +362,32 @@ public PoolBuilder<T, CONF> releaseHandler(Function<T, ? extends Publisher<Void>
* @see #allocationStrategy(AllocationStrategy)
*/
public PoolBuilder<T, CONF> sizeBetween(int min, int max) {
return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max));
return sizeBetween(min, max, DEFAULT_WARMUP_PARALLELISM);
}

/**
* Replace the {@link AllocationStrategy} with one that lets the {@link Pool} allocate between {@code min} and {@code max} resources.
* When acquiring and there is no available resource, the pool should strive to warm up enough resources to reach
* {@code min} live resources before serving the acquire with (one of) the newly created resource(s).
* At the same time it MUST NOT allocate any resource if that would bring the number of live resources
* over the {@code max}, rejecting further allocations until some resources have been {@link PooledRef#release() released}.
*
* @param min the minimum number of live resources to keep in the pool (can be best effort)
* @param max the maximum number of live resources to keep in the pool. use {@link Integer#MAX_VALUE} when you only need a
* minimum and no upper bound
* @param warmupParallelism Specifies the concurrency level used when the allocator is subscribed to during the warmup phase, if any.
* During warmup, resources that can be pre-allocated will be created eagerly, but at most {@code warmupParallelism} resources are
* subscribed to at the same time.
* A {@code warmupParallelism} of 1 means that pre-allocation of resources is achieved by sequentially subscribing to the allocator,
* waiting for a resource to be created before subscribing a next time to the allocator, and so on until the last allocation
* completes.
* @return this {@link Pool} builder
* @see #sizeUnbounded()
* @see #allocationStrategy(AllocationStrategy)
* @since 1.0.1
*/
public PoolBuilder<T, CONF> sizeBetween(int min, int max, int warmupParallelism) {
return allocationStrategy(new AllocationStrategies.SizeBasedAllocationStrategy(min, max, warmupParallelism));
}

/**
Expand Down Expand Up @@ -501,5 +530,5 @@ static <T> BiPredicate<T, PooledRefMetadata> idlePredicate(Duration maxIdleTime)
static final Function<?, Mono<Void>> NOOP_HANDLER = it -> Mono.empty();
static final BiPredicate<?, ?> NEVER_PREDICATE = (ignored1, ignored2) -> false;
static final BiFunction<Runnable, Duration, Disposable> DEFAULT_PENDING_ACQUIRE_TIMER = (r, d) -> Schedulers.parallel().schedule(r, d.toNanos(), TimeUnit.NANOSECONDS);

static final int DEFAULT_WARMUP_PARALLELISM = 1;
}
24 changes: 15 additions & 9 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiPredicate;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -249,7 +250,10 @@ public Mono<Integer> warmup() {
.returnPermits(1);
});
}
return Flux.concat(allWarmups)
// merge will eagerly subscribe to all warmups from the current thread, but
// the parallelism can be controlled from configuration.
int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), allWarmups.length);
return Flux.merge(Flux.fromArray(allWarmups), mergeConcurrency)
.reduce(0, (count, p) -> count + 1);
});
}
Expand Down Expand Up @@ -442,13 +446,15 @@ else if (sig.isOnError()) {
logger.debug("should warm up {} extra resources", toWarmup);

final long startWarmupIteration = clock.millis();
Flux<Void> warmupFlux = Flux.range(1, toWarmup)
//individual warmup failures decrement the permit and are logged
.flatMap(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator));

primary.onErrorResume(e -> Mono.empty())
.thenMany(warmupFlux)
.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
// flatMap will eagerly subscribe to the allocator from the current thread, but the concurrency
// can be controlled from configuration
final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
Flux.range(1, toWarmup)
.map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
.startWith(primary.doOnSuccess(__ -> drain()).then())
.flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified
.onErrorResume(e -> Mono.empty())
.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -250,4 +250,9 @@ public int permitMinimum() {
public int permitMaximum() {
return delegate.permitMaximum();
}

@Override
public int warmupParallelism() {
return delegate.warmupParallelism();
}
}
43 changes: 42 additions & 1 deletion reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1413,6 +1413,47 @@ void recordsAllocationLatenciesInWarmup(PoolStyle configAdjuster) {
assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")
void recordsAllocationLatenciesInEagerWarmup(PoolStyle configAdjuster) {
AtomicBoolean flip = new AtomicBoolean();
//note the starter method here is irrelevant, only the config is created and passed to createPool
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.defer(() -> {
if (flip.compareAndSet(false, true)) {
return Mono.just("foo").delayElement(Duration.ofMillis(100));
}
else {
flip.compareAndSet(true, false);
return Mono.delay(Duration.ofMillis(200)).then(Mono.error(new IllegalStateException("boom")));
}
}))
.sizeBetween(10, Integer.MAX_VALUE, 10)
.metricsRecorder(recorder)
.clock(recorder.getClock());
AbstractPool<String> pool = configAdjuster.apply(builder);

// warmup will eagerly subscribe 10 times to the allocator.
// The five first subscribtions will success (after around 100 millis), and some allocation should fail after around
// 200 millis.
assertThatIllegalStateException()
.isThrownBy(() -> pool.warmup().block());

// at least 5 allocation should be successful
assertThat(recorder.getAllocationSuccessCount()).isEqualTo(5);
// at least 1 allocation should have failed
assertThat(recorder.getAllocationErrorCount()).isGreaterThanOrEqualTo(1);
// at least 6 allocations should have taken place
assertThat(recorder.getAllocationTotalCount()).isGreaterThanOrEqualTo(6);

long minSuccess = recorder.getAllocationSuccessHistogram().getMinValue();
long minError = recorder.getAllocationErrorHistogram().getMinValue();

assertThat(minSuccess).as("allocation success latency").isGreaterThanOrEqualTo(100L);
assertThat(minError).as("allocation error latency").isGreaterThanOrEqualTo(200L);
}

@ParameterizedTestWithName
@MethodSource("allPools")
@Tag("metrics")
Expand Down
Loading