From 49264555d7869b442cc529b71cd79733363d605f Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 19 Apr 2022 13:24:05 +0300 Subject: [PATCH] Add API for configuring minimum connections for HTTP/2 connection pool Fixes #1808 --- .../reactor/netty/http/Http2SettingsSpec.java | 51 +++++++- .../http/client/Http2ConnectionProvider.java | 3 +- .../reactor/netty/http/client/Http2Pool.java | 116 ++++++++++-------- .../netty/http/Http2SettingsSpecTests.java | 28 ++++- .../netty/http/client/Http2PoolTest.java | 113 +++++++++++++++-- .../DefaultPooledConnectionProviderTest.java | 69 +++++++++++ 6 files changed, 314 insertions(+), 66 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java index 773a6256d0..01448b8d8f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,8 @@ import java.util.Objects; +import static io.netty.handler.codec.http2.Http2CodecUtil.NUM_STANDARD_SETTINGS; + /** * A configuration builder to fine tune the {@link Http2Settings}. * @@ -78,6 +80,15 @@ public interface Builder { */ Builder maxHeaderListSize(long maxHeaderListSize); + /** + * Sets the {@code SETTINGS_MIN_CONNECTIONS} value. + * + * @param minConnections the {@code SETTINGS_MAX_HEADER_LIST_SIZE} value + * @return {@code this} + * @since 1.0.19 + */ + Builder minConnections(int minConnections); + /** * Sets the {@code SETTINGS_ENABLE_PUSH} value. * @@ -147,6 +158,18 @@ public Long maxHeaderListSize() { return maxHeaderListSize; } + /** + * Returns the configured {@code SETTINGS_MIN_CONNECTIONS} value or + * the default {@code 0}. + * + * @return the configured {@code SETTINGS_MIN_CONNECTIONS} value or + * the default {@code 0}. + * @since 1.0.19 + */ + public Integer minConnections() { + return minConnections; + } + /** * Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null. * @@ -171,19 +194,24 @@ public boolean equals(Object o) { Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) && Objects.equals(maxFrameSize, that.maxFrameSize) && maxHeaderListSize.equals(that.maxHeaderListSize) && + minConnections.equals(that.minConnections) && Objects.equals(pushEnabled, that.pushEnabled); } @Override public int hashCode() { - return Objects.hash(headerTableSize, initialWindowSize, maxConcurrentStreams, maxFrameSize, maxHeaderListSize, pushEnabled); + return Objects.hash(headerTableSize, initialWindowSize, maxConcurrentStreams, maxFrameSize, maxHeaderListSize, + minConnections, pushEnabled); } + static final char SETTINGS_MIN_CONNECTIONS = NUM_STANDARD_SETTINGS + 1; + final Long headerTableSize; final Integer initialWindowSize; final Long maxConcurrentStreams; final Integer maxFrameSize; final Long maxHeaderListSize; + final Integer minConnections; final Boolean pushEnabled; Http2SettingsSpec(Build build) { @@ -193,11 +221,19 @@ public int hashCode() { maxConcurrentStreams = settings.maxConcurrentStreams(); maxFrameSize = settings.maxFrameSize(); maxHeaderListSize = settings.maxHeaderListSize(); + minConnections = settings.getIntValue(SETTINGS_MIN_CONNECTIONS); pushEnabled = settings.pushEnabled(); } static final class Build implements Builder { - final Http2Settings http2Settings = Http2Settings.defaultSettings(); + static final Long DEFAULT_MIN_CONNECTIONS = 0L; + + final Http2Settings http2Settings; + + Build() { + http2Settings = Http2Settings.defaultSettings(); + http2Settings.put(SETTINGS_MIN_CONNECTIONS, DEFAULT_MIN_CONNECTIONS); + } @Override public Http2SettingsSpec build() { @@ -234,6 +270,15 @@ public Builder maxHeaderListSize(long maxHeaderListSize) { return this; } + @Override + public Builder minConnections(int minConnections) { + if (minConnections < 0) { + throw new IllegalArgumentException("Setting MIN_CONNECTIONS is invalid: " + minConnections); + } + http2Settings.put(SETTINGS_MIN_CONNECTIONS, Long.valueOf(minConnections)); + return this; + } + /* @Override public Builder pushEnabled(boolean pushEnabled) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index cf76c73902..9e8f58f902 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -466,7 +466,8 @@ static final class PooledConnectionAllocator { this.remoteAddress = remoteAddress; this.resolver = resolver; this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE, - poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime())); + poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime(), + this.config.http2Settings != null ? this.config.http2Settings.minConnections() : 0)); } Publisher connectChannel() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index a86dd87f3b..5d0d2de7d0 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -80,6 +80,10 @@ *
  • {@link PoolMetrics#idleSize()} always returns {@code 0}.
  • * *

    + * If minimum connections is specified, the cached connections with active streams will be kept at that minimum + * (can be the best effort). However, if the cached connections have reached max concurrent streams, + * then new connections will be allocated up to the maximum connections limit. + *

    * Configurations that are not applicable *

    *

    This class is based on * https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java @@ -141,18 +144,24 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool. final Clock clock; final long maxLifeTime; + final int minConnections; final PoolConfig poolConfig; long lastInteractionTimestamp; - Http2Pool(PoolConfig poolConfig, long maxLifeTime) { + Http2Pool(PoolConfig poolConfig, long maxLifeTime, int minConnections) { if (poolConfig.allocationStrategy().getPermits(0) != 0) { - throw new IllegalArgumentException("No support for configuring minimum number of connections"); + throw new IllegalArgumentException( + "No support for configuring minimum number of connections via AllocationStrategy"); + } + if (minConnections > poolConfig.allocationStrategy().permitMaximum()) { + throw new IllegalArgumentException("Minimum number of connections must be less than or equal to maximum"); } this.clock = poolConfig.clock(); this.connections = new ConcurrentLinkedQueue<>(); this.lastInteractionTimestamp = clock.millis(); this.maxLifeTime = maxLifeTime; + this.minConnections = minConnections; this.pending = new ConcurrentLinkedDeque<>(); this.poolConfig = poolConfig; @@ -310,8 +319,11 @@ void drainLoop() { int borrowersCount = pendingSize; if (borrowersCount != 0) { + int resourcesCount = resources.size(); // find a connection that can be used for opening a new stream - Slot slot = findConnection(resources); + // when cached connections are below minimum connections, then allocate a new connection + boolean belowMinConnections = minConnections > 0 && resourcesCount < minConnections; + Slot slot = belowMinConnections ? null : findConnection(resources, resourcesCount); if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null) { @@ -338,54 +350,59 @@ void drainLoop() { } } else { - int permits = poolConfig.allocationStrategy().getPermits(1); - if (permits <= 0) { - if (maxPending >= 0) { - borrowersCount = pendingSize; - int toCull = borrowersCount - maxPending; - for (int i = 0; i < toCull; i++) { - Borrower extraneous = pollPending(borrowers, true); - if (extraneous != null) { - pendingAcquireLimitReached(extraneous, maxPending); - } - } - } + if (belowMinConnections && poolConfig.allocationStrategy().permitGranted() >= minConnections) { + // connections allocations were triggered } else { - Borrower borrower = pollPending(borrowers, true); - if (borrower == null) { - continue; + int permits = poolConfig.allocationStrategy().getPermits(1); + if (permits <= 0) { + if (maxPending >= 0) { + borrowersCount = pendingSize; + int toCull = borrowersCount - maxPending; + for (int i = 0; i < toCull; i++) { + Borrower extraneous = pollPending(borrowers, true); + if (extraneous != null) { + pendingAcquireLimitReached(extraneous, maxPending); + } + } + } } - if (isDisposed()) { - borrower.fail(new PoolShutdownException()); - return; + else { + Borrower borrower = pollPending(borrowers, true); + if (borrower == null) { + continue; + } + if (isDisposed()) { + borrower.fail(new PoolShutdownException()); + return; + } + borrower.stopPendingCountdown(); + Mono allocator = poolConfig.allocator(); + Mono primary = + allocator.doOnEach(sig -> { + if (sig.isOnNext()) { + Connection newInstance = sig.get(); + assert newInstance != null; + Slot newSlot = new Slot(this, newInstance); + if (log.isDebugEnabled()) { + log.debug(format(newInstance.channel(), "Channel activated")); + } + ACQUIRED.incrementAndGet(this); + newSlot.incrementConcurrencyAndGet(); + newSlot.deactivate(); + borrower.deliver(new Http2PooledRef(newSlot)); + } + else if (sig.isOnError()) { + Throwable error = sig.getThrowable(); + assert error != null; + poolConfig.allocationStrategy().returnPermits(1); + borrower.fail(error); + } + }) + .contextWrite(borrower.currentContext()); + + primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } - borrower.stopPendingCountdown(); - Mono allocator = poolConfig.allocator(); - Mono primary = - allocator.doOnEach(sig -> { - if (sig.isOnNext()) { - Connection newInstance = sig.get(); - assert newInstance != null; - Slot newSlot = new Slot(this, newInstance); - if (log.isDebugEnabled()) { - log.debug(format(newInstance.channel(), "Channel activated")); - } - ACQUIRED.incrementAndGet(this); - newSlot.incrementConcurrencyAndGet(); - newSlot.deactivate(); - borrower.deliver(new Http2PooledRef(newSlot)); - } - else if (sig.isOnError()) { - Throwable error = sig.getThrowable(); - assert error != null; - poolConfig.allocationStrategy().returnPermits(1); - borrower.fail(error); - } - }) - .contextWrite(borrower.currentContext()); - - primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain); } } } @@ -398,8 +415,7 @@ else if (sig.isOnError()) { } @Nullable - Slot findConnection(ConcurrentLinkedQueue resources) { - int resourcesCount = resources.size(); + Slot findConnection(ConcurrentLinkedQueue resources, int resourcesCount) { while (resourcesCount > 0) { // There are connections in the queue diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java index 7ba391a8fa..b7ead4c912 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/Http2SettingsSpecTests.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static reactor.netty.http.Http2SettingsSpec.Build.DEFAULT_MIN_CONNECTIONS; class Http2SettingsSpecTests { @@ -40,6 +41,7 @@ void headerTableSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue()); assertThat(spec.pushEnabled()).isNull(); } @@ -59,6 +61,7 @@ void initialWindowSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue()); assertThat(spec.pushEnabled()).isNull(); } @@ -78,6 +81,7 @@ void maxConcurrentStreams() { assertThat(spec.maxConcurrentStreams()).isEqualTo(123); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue()); assertThat(spec.pushEnabled()).isNull(); } @@ -97,6 +101,7 @@ void maxFrameSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isEqualTo(16384); assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue()); assertThat(spec.pushEnabled()).isNull(); } @@ -116,6 +121,7 @@ void maxHeaderListSize() { assertThat(spec.maxConcurrentStreams()).isNull(); assertThat(spec.maxFrameSize()).isNull(); assertThat(spec.maxHeaderListSize()).isEqualTo(123); + assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue()); assertThat(spec.pushEnabled()).isNull(); } @@ -126,6 +132,26 @@ void maxHeaderListSizeBadValues() { .withMessage("Setting MAX_HEADER_LIST_SIZE is invalid: -1"); } + @Test + void minConnections() { + builder.minConnections(4); + Http2SettingsSpec spec = builder.build(); + assertThat(spec.headerTableSize()).isNull(); + assertThat(spec.initialWindowSize()).isNull(); + assertThat(spec.maxConcurrentStreams()).isNull(); + assertThat(spec.maxFrameSize()).isNull(); + assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE); + assertThat(spec.minConnections()).isEqualTo(4); + assertThat(spec.pushEnabled()).isNull(); + } + + @Test + void minConnectionsBadValues() { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> builder.minConnections(-1)) + .withMessage("Setting MIN_CONNECTIONS is invalid: -1"); + } + /* @Test public void pushEnabled() { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index dc24581f27..7f43d3d340 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -22,6 +22,7 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool; @@ -53,7 +54,7 @@ void acquireInvalidate() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 0)); try { List> acquired = new ArrayList<>(); @@ -92,7 +93,7 @@ void acquireRelease() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 0)); try { List> acquired = new ArrayList<>(); @@ -134,7 +135,7 @@ void evictClosedConnection() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 0)); Connection connection = null; try { @@ -197,7 +198,7 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 0)); Connection connection = null; try { @@ -256,7 +257,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 0)); Connection connection = null; try { @@ -310,7 +311,7 @@ void maxLifeTime() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, 0)); Connection connection1 = null; Connection connection2 = null; @@ -374,7 +375,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 2); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, 0)); Connection connection1 = null; Connection connection2 = null; @@ -434,7 +435,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10)); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, 0)); Connection connection = null; try { @@ -473,11 +474,101 @@ void maxLifeTimeMaxConnectionsReached() throws Exception { } @Test - void minConnectionsConfigNotSupported() { + void minConnections() { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 3); + InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .blockLast(Duration.ofSeconds(1)); + + assertThat(acquired).hasSize(3); + assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable()); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } + } + + @Test + void minConnectionsExceedsMaximum() { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.empty()).sizeBetween(0, 2); + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, 3))); + } + + @Test + void minConnectionsMaxStreamsReached() { + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + Channel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build()); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 3); + InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 1)); + + List> acquired = new ArrayList<>(); + try { + Flux.range(0, 3) + .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add)) + .blockLast(Duration.ofSeconds(1)); + + assertThat(acquired).hasSize(3); + assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable()); + assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable()); + assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable()); + + for (PooledRef slot : acquired) { + slot.release().block(Duration.ofSeconds(1)); + } + + assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0); + } + finally { + for (PooledRef slot : acquired) { + Connection conn = slot.poolable(); + ((EmbeddedChannel) conn.channel()).finishAndReleaseAll(); + conn.dispose(); + } + } + } + + @Test + void minConnectionsViaAllocationStrategyNotSupported() { PoolBuilder> poolBuilder = PoolBuilder.from(Mono.empty()).sizeBetween(1, 2); assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1))); + .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, 0))); } @Test @@ -488,7 +579,7 @@ void nonHttp2ConnectionEmittedOnce() { .idleResourceReuseLruOrder() .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1)); + InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 0)); try { PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1)); diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 6ee8a43053..db6173055d 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -484,6 +484,75 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie } } + //https://github.com/reactor/reactor-netty/issues/1808 + @Test + void testMinConnections() throws Exception { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = + Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + + disposableServer = + createServer() + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(serverCtx)) + .route(routes -> routes.post("/", (req, res) -> res.send(req.receive().retain()))) + .bindNow(); + + int requestsNum = 100; + CountDownLatch latch = new CountDownLatch(1); + DefaultPooledConnectionProvider provider = + (DefaultPooledConnectionProvider) ConnectionProvider.create("testMinConnections", 20); + AtomicInteger counter = new AtomicInteger(); + HttpClient client = + createClient(provider, disposableServer.port()) + .wiretap(false) + .protocol(HttpProtocol.H2) + .secure(spec -> spec.sslContext(clientCtx)) + .http2Settings(builder -> builder.minConnections(5)) + .observe((conn, state) -> { + if (state == ConnectionObserver.State.CONNECTED) { + counter.incrementAndGet(); + } + if (state == ConnectionObserver.State.RELEASED) { + conn.channel().eventLoop().execute(() -> { + if (counter.decrementAndGet() == 0) { + latch.countDown(); + } + }); + } + }); + + try { + Flux.range(0, requestsNum) + .flatMap(i -> + client.post() + .uri("/") + .send(ByteBufMono.fromString(Mono.just("testMinConnections"))) + .responseContent() + .aggregate() + .asString()) + .blockLast(Duration.ofSeconds(5)); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + + assertThat(provider.channelPools).hasSize(1); + + @SuppressWarnings({"unchecked", "rawtypes"}) + InstrumentedPool channelPool = + provider.channelPools.values().toArray(new InstrumentedPool[0])[0]; + InstrumentedPool.PoolMetrics metrics = channelPool.metrics(); + assertThat(metrics.acquiredSize()).isEqualTo(0); + assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize()); + assertThat(metrics.allocatedSize()).isLessThan(10); + } + finally { + provider.disposeLater() + .block(Duration.ofSeconds(5)); + } + } + static final class TestPromise extends DefaultChannelPromise { final ChannelPromise parent;