diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index db831832c9..1a641ac0f2 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -18,6 +18,7 @@ import io.netty.resolver.AddressResolverGroup; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.ConnectionObserver; import reactor.netty.ReactorNetty; @@ -32,6 +33,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; @@ -449,6 +451,7 @@ class ConnectionPoolSpec> implements Suppl boolean metricsEnabled; String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY; Supplier registrar; + BiFunction pendingAcquireTimer; AllocationStrategy allocationStrategy; /** @@ -473,6 +476,7 @@ private ConnectionPoolSpec() { this.metricsEnabled = copy.metricsEnabled; this.leasingStrategy = copy.leasingStrategy; this.registrar = copy.registrar; + this.pendingAcquireTimer = copy.pendingAcquireTimer; this.allocationStrategy = copy.allocationStrategy; } @@ -648,6 +652,43 @@ public final SPEC evictInBackground(Duration evictionInterval) { return get(); } + /** + * Set the option to use for configuring {@link ConnectionProvider} pending acquire timer. + * The pending acquire timer must be specified as a function which is used to schedule a pending acquire timeout + * when there is no idle connection and no new connection can be created currently. + * The function takes as argument a {@link Duration} which is the one configured by {@link #pendingAcquireTimeout(Duration)}. + *

+ * Use this function if you want to specify your own implementation for scheduling pending acquire timers. + * + *

Default to {@link Schedulers#parallel()}. + * + *

Examples using Netty HashedWheelTimer implementation:

+ *
+		 * {@code
+		 * final static HashedWheelTimer wheel = new HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024);
+		 *
+		 * HttpClient client = HttpClient.create(
+		 *     ConnectionProvider.builder("myprovider")
+		 *         .pendingAcquireTimeout(Duration.ofMillis(10000))
+		 *         .pendingAcquireTimer((r, d) -> {
+		 *             Timeout t = wheel.newTimeout(timeout -> r.run(), d.toMillis(), TimeUnit.MILLISECONDS);
+		 *             return () -> t.cancel();
+		 *         })
+		 *         .build());
+		 * }
+		 * 
+ * + * @param pendingAcquireTimer the function to apply when scheduling pending acquire timers + * @return {@literal this} + * @throws NullPointerException if pendingAcquireTimer is null + * @since 1.0.20 + * @see #pendingAcquireTimeout(Duration) + */ + public final SPEC pendingAcquireTimer(BiFunction pendingAcquireTimer) { + this.pendingAcquireTimer = Objects.requireNonNull(pendingAcquireTimer, "pendingAcquireTimer"); + return get(); + } + /** * Limits in how many connections can be allocated and managed by the pool are driven by the * provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index e065c4f268..d010a9af64 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Supplier; @@ -369,6 +370,7 @@ protected static final class PoolFactory { final Supplier registrar; final Clock clock; final Duration disposeTimeout; + final BiFunction pendingAcquireTimer; final AllocationStrategy allocationStrategy; PoolFactory(ConnectionPoolSpec conf, Duration disposeTimeout) { @@ -389,6 +391,7 @@ protected static final class PoolFactory { this.registrar = conf.registrar; this.clock = clock; this.disposeTimeout = disposeTimeout; + this.pendingAcquireTimer = conf.pendingAcquireTimer; this.allocationStrategy = conf.allocationStrategy; } @@ -447,6 +450,10 @@ PoolBuilder> newPoolInternal( } } + if (pendingAcquireTimer != null) { + poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer); + } + if (clock != null) { poolBuilder = poolBuilder.clock(clock); } diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java index 64802ac61a..ffc289b3a5 100644 --- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java +++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java @@ -16,12 +16,14 @@ package reactor.netty.resources; import org.junit.jupiter.api.Test; +import reactor.core.Disposable; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.function.BiFunction; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; @@ -31,6 +33,7 @@ class ConnectionProviderTest { static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy(); static final String TEST_STRING = ""; static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {}; + static final BiFunction TEST_BI_FUNCTION = (r, duration) -> () -> {}; @Test void testBuilderCopyConstructor() throws IllegalAccessException { @@ -74,6 +77,9 @@ else if (boolean.class == clazz) { else if (int.class == clazz) { field.setInt(builder, 1); } + else if (BiFunction.class == clazz) { + field.set(builder, TEST_BI_FUNCTION); + } else { throw new IllegalArgumentException("Unknown field type " + clazz); } diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 11f5b639b4..733a0078c5 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -41,7 +41,6 @@ import reactor.core.Scannable; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.FutureMono; import reactor.netty.NettyPipeline; @@ -759,7 +758,7 @@ public void request(long n) { int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { - timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS); + timeoutTask = pool.poolConfig.pendingAcquireTimer().apply(this, acquireTimeout); } pool.doAcquire(this); } @@ -1042,4 +1041,4 @@ long lifeTime() { return pool.clock.millis() - creationTimestamp; } } -} \ No newline at end of file +} diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java index 59b77318a4..ebd06044da 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2Tests.java @@ -21,9 +21,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.scheduler.Schedulers; import reactor.netty.BaseHttpTest; import reactor.netty.ByteBufFlux; import reactor.netty.ByteBufMono; @@ -43,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.IntStream; @@ -159,14 +162,35 @@ private void doTestIssue1071(int length, String expectedResponse, int expectedCo @Test void testMaxActiveStreams_1_CustomPool() throws Exception { - ConnectionProvider provider = + doTestMaxActiveStreams_1_CustomPool(null); + } + + @Test + void testMaxActiveStreams_1_CustomPool_Custom_AcquireTimer() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + BiFunction timer = (r, d) -> { + Runnable wrapped = () -> { + r.run(); + latch.countDown(); + }; + return Schedulers.single().schedule(wrapped, d.toNanos(), TimeUnit.NANOSECONDS); + }; + doTestMaxActiveStreams_1_CustomPool(timer); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + void doTestMaxActiveStreams_1_CustomPool(BiFunction pendingAcquireTimer) throws Exception { + ConnectionProvider.Builder builder = ConnectionProvider.builder("testMaxActiveStreams_1_CustomPool") - .maxConnections(1) - .pendingAcquireTimeout(Duration.ofMillis(10)) // the default is 45s - .build(); + .maxConnections(1) + .pendingAcquireTimeout(Duration.ofMillis(10)); // the default is 45s + if (pendingAcquireTimer != null) { + builder = builder.pendingAcquireTimer(pendingAcquireTimer); + } + ConnectionProvider provider = builder.build(); doTestMaxActiveStreams(HttpClient.create(provider), 1, 1, 1); provider.disposeLater() - .block(); + .block(); } @Test diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index 6780f795c2..25f99f4ff4 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -22,8 +22,10 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; import org.junit.jupiter.api.Test; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException; import reactor.netty.internal.shaded.reactor.pool.PoolBuilder; @@ -38,6 +40,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -832,6 +835,24 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { @Test void maxLifeTimeMaxConnectionsReached() throws Exception { + doMaxLifeTimeMaxConnectionsReached(null); + } + + @Test + void maxLifeTimeMaxConnectionsReachedWithCustomTimer() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + BiFunction timer = (r, d) -> { + Runnable wrapped = () -> { + r.run(); + latch.countDown(); + }; + return Schedulers.single().schedule(wrapped, d.toNanos(), TimeUnit.NANOSECONDS); + }; + doMaxLifeTimeMaxConnectionsReached(timer); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + private void doMaxLifeTimeMaxConnectionsReached(BiFunction pendingAcquireTimer) throws Exception { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.fromSupplier(() -> { Channel channel = new EmbeddedChannel( @@ -842,6 +863,9 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); + if (pendingAcquireTimer != null) { + poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer); + } Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10)); Connection connection = null;