> {
+
+ /**
+ * Returns a deep copy of this instance.
+ *
+ * @return a deep copy of this instance
+ */
+ A copy();
+
+ /**
+ * Best-effort peek at the state of the strategy which indicates roughly how many more connections can currently be
+ * allocated. Should be paired with {@link #getPermits(int)} for an atomic permission.
+ *
+ * @return an ESTIMATED count of how many more connections can currently be allocated
+ */
+ int estimatePermitCount();
+
+ /**
+ * Try to get the permission to allocate a {@code desired} positive number of new connections. Returns the permissible
+ * number of connections which MUST be created (otherwise the internal live counter of the strategy might be off).
+ * This permissible number might be zero, and it can also be a greater number than {@code desired}.
+ * Once a connection is discarded from the pool, it must update the strategy using {@link #returnPermits(int)}
+ * (which can happen in batches or with value {@literal 1}).
+ *
+ * @param desired the desired number of new connections
+ * @return the actual number of new connections that MUST be created, can be 0 and can be more than {@code desired}
+ */
+ int getPermits(int desired);
+
+ /**
+ * Returns the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
+ *
+ * @return the best estimate of the number of permits currently granted, between 0 and {@link Integer#MAX_VALUE}
+ */
+ int permitGranted();
+
+ /**
+ * Return the minimum number of permits this strategy tries to maintain granted
+ * (reflecting a minimal size for the pool), or {@code 0} for scale-to-zero.
+ *
+ * @return the minimum number of permits this strategy tries to maintain, or {@code 0}
+ */
+ int permitMinimum();
+
+ /**
+ * Returns the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
+ *
+ * @return the maximum number of permits this strategy can grant in total, or {@link Integer#MAX_VALUE} for unbounded
+ */
+ int permitMaximum();
+
+ /**
+ * Update the strategy to indicate that N connections were discarded, potentially leaving space
+ * for N new ones to be allocated. Users MUST ensure that this method isn't called with a value greater than the
+ * number of held permits it has.
+ *
+ * Some strategy MIGHT throw an {@link IllegalArgumentException} if it can be determined the number of returned permits
+ * is not consistent with the strategy's limits and delivered permits.
+ */
+ void returnPermits(int returned);
+ }
+
/**
* Build a {@link ConnectionProvider} to cache and reuse a fixed maximum number of
* {@link Connection}. Further connections will be pending acquisition depending on
@@ -387,6 +449,7 @@ class ConnectionPoolSpec> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier extends ConnectionProvider.MeterRegistrar> registrar;
+ AllocationStrategy> allocationStrategy;
/**
* Returns {@link ConnectionPoolSpec} new instance with default properties.
@@ -410,6 +473,7 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
+ this.allocationStrategy = copy.allocationStrategy;
}
/**
@@ -428,10 +492,13 @@ public final SPEC pendingAcquireTimeout(Duration pendingAcquireTimeout) {
/**
* Set the options to use for configuring {@link ConnectionProvider} maximum connections per connection pool.
+ * This is a pre-made allocation strategy where only max connections is specified.
+ * Custom allocation strategies can be provided via {@link #allocationStrategy(AllocationStrategy)}.
* Default to {@link #DEFAULT_POOL_MAX_CONNECTIONS}.
*
* @param maxConnections the maximum number of connections (per connection pool) before start pending
* @return {@literal this}
+ * @see #allocationStrategy(AllocationStrategy)
* @throws IllegalArgumentException if maxConnections is negative
*/
public final SPEC maxConnections(int maxConnections) {
@@ -439,6 +506,7 @@ public final SPEC maxConnections(int maxConnections) {
throw new IllegalArgumentException("Max Connections value must be strictly positive");
}
this.maxConnections = maxConnections;
+ this.allocationStrategy = null;
return get();
}
@@ -580,6 +648,22 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}
+ /**
+ * Limits in how many connections can be allocated and managed by the pool are driven by the
+ * provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
+ * configured strategy, but most cases should be covered by the {@link #maxConnections()}
+ * pre-made allocation strategy.
+ *
+ * @param allocationStrategy the {@link AllocationStrategy} to use
+ * @return {@literal this}
+ * @see #maxConnections()
+ * @since 1.0.20
+ */
+ public final SPEC allocationStrategy(AllocationStrategy> allocationStrategy) {
+ this.allocationStrategy = Objects.requireNonNull(allocationStrategy, "allocationStrategy");
+ return get();
+ }
+
@Override
@SuppressWarnings("unchecked")
public SPEC get() {
diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
index 531678c3f6..d5650e8487 100644
--- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
+++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java
@@ -32,7 +32,6 @@
import reactor.netty.internal.util.Metrics;
import reactor.netty.transport.TransportConfig;
import reactor.netty.internal.util.MapUtils;
-import reactor.pool.AllocationStrategy;
import reactor.pool.InstrumentedPool;
import reactor.pool.Pool;
import reactor.pool.PoolBuilder;
@@ -91,6 +90,7 @@ public abstract class PooledConnectionProvider implements
final Duration poolInactivity;
final Duration disposeTimeout;
final Map maxConnections = new HashMap<>();
+ Mono onDispose;
protected PooledConnectionProvider(Builder builder) {
this(builder, null);
@@ -108,6 +108,7 @@ protected PooledConnectionProvider(Builder builder) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
+ this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}
@@ -197,10 +198,10 @@ public final Mono disposeLater() {
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
- return Mono.empty();
+ return onDispose;
}
channelPools.clear();
- return Mono.when(pools);
+ return onDispose.and(Mono.when(pools));
});
}
@@ -250,6 +251,10 @@ public String name() {
return name;
}
+ public void onDispose(Mono disposeMono) {
+ onDispose = onDispose.and(disposeMono);
+ }
+
protected abstract CoreSubscriber> createDisposableAcquire(
TransportConfig config,
ConnectionObserver connectionObserver,
@@ -372,6 +377,7 @@ protected static final class PoolFactory {
final Supplier extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
+ final AllocationStrategy> allocationStrategy;
PoolFactory(ConnectionPoolSpec> conf, Duration disposeTimeout) {
this(conf, disposeTimeout, null);
@@ -391,11 +397,12 @@ protected static final class PoolFactory {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
+ this.allocationStrategy = conf.allocationStrategy;
}
public InstrumentedPool newPool(
Publisher allocator,
- @Nullable AllocationStrategy allocationStrategy,
+ @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function> destroyHandler,
BiPredicate evictionPredicate) {
if (disposeTimeout != null) {
@@ -407,7 +414,7 @@ public InstrumentedPool newPool(
public InstrumentedPool newPool(
Publisher allocator,
- @Nullable AllocationStrategy allocationStrategy,
+ @Nullable reactor.pool.AllocationStrategy allocationStrategy, // this is not used but kept for backwards compatibility
Function> destroyHandler,
BiPredicate evictionPredicate,
Function, InstrumentedPool> poolFactory) {
@@ -440,7 +447,12 @@ PoolBuilder> newPoolInternal(
DEFAULT_POOL_RETURN_PERMITS_SAMPLING_RATE));
}
else {
- poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
+ if (allocationStrategy == null) {
+ poolBuilder = poolBuilder.sizeBetween(0, maxConnections);
+ }
+ else {
+ poolBuilder = poolBuilder.allocationStrategy(new DelegatingAllocationStrategy(allocationStrategy.copy()));
+ }
}
if (clock != null) {
@@ -457,6 +469,15 @@ PoolBuilder> newPoolInternal(
return poolBuilder;
}
+ @Nullable
+ public AllocationStrategy> allocationStrategy() {
+ return allocationStrategy;
+ }
+
+ public long maxIdleTime() {
+ return this.maxIdleTime;
+ }
+
public long maxLifeTime() {
return maxLifeTime;
}
@@ -474,6 +495,45 @@ public String toString() {
", pendingAcquireTimeout=" + pendingAcquireTimeout +
'}';
}
+
+ static final class DelegatingAllocationStrategy implements reactor.pool.AllocationStrategy {
+
+ final AllocationStrategy> delegate;
+
+ DelegatingAllocationStrategy(AllocationStrategy> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int estimatePermitCount() {
+ return delegate.estimatePermitCount();
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ return delegate.getPermits(desired);
+ }
+
+ @Override
+ public int permitGranted() {
+ return delegate.permitGranted();
+ }
+
+ @Override
+ public int permitMinimum() {
+ return delegate.permitMinimum();
+ }
+
+ @Override
+ public int permitMaximum() {
+ return delegate.permitMaximum();
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ delegate.returnPermits(returned);
+ }
+ }
}
static final class PoolKey {
diff --git a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java
index 6a37523989..64802ac61a 100644
--- a/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java
+++ b/reactor-netty-core/src/test/java/reactor/netty/resources/ConnectionProviderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 VMware, Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
class ConnectionProviderTest {
+ static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy();
static final String TEST_STRING = "";
static final Supplier TEST_SUPPLIER = () -> (a, b, c, d) -> {};
@@ -64,6 +65,9 @@ else if (Map.class == clazz) {
else if (Supplier.class == clazz) {
field.set(builder, TEST_SUPPLIER);
}
+ else if (ConnectionProvider.AllocationStrategy.class == clazz) {
+ field.set(builder, TEST_ALLOCATION_STRATEGY);
+ }
else if (boolean.class == clazz) {
field.setBoolean(builder, true);
}
@@ -74,4 +78,41 @@ else if (int.class == clazz) {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
}
+
+ static final class TestAllocationStrategy implements ConnectionProvider.AllocationStrategy {
+
+ @Override
+ public TestAllocationStrategy copy() {
+ return this;
+ }
+
+ @Override
+ public int estimatePermitCount() {
+ return 0;
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ return 0;
+ }
+
+ @Override
+ public int permitGranted() {
+ return 0;
+ }
+
+ @Override
+ public int permitMinimum() {
+ return 0;
+ }
+
+ @Override
+ public int permitMaximum() {
+ return 0;
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ }
+ }
}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java
index 801dcac824..a94268145b 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/HttpResources.java
@@ -15,6 +15,7 @@
*/
package reactor.netty.http;
+import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
@@ -147,6 +148,15 @@ public static HttpResources set(LoopResources loops) {
http2ConnectionProvider = new AtomicReference<>();
}
+ @Override
+ public void disposeWhen(SocketAddress remoteAddress) {
+ ConnectionProvider provider = http2ConnectionProvider.get();
+ if (provider != null) {
+ provider.disposeWhen(remoteAddress);
+ }
+ super.disposeWhen(remoteAddress);
+ }
+
@Override
public AddressResolverGroup> getOrCreateDefaultResolver() {
return super.getOrCreateDefaultResolver();
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
new file mode 100644
index 0000000000..2dd5c49c61
--- /dev/null
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package reactor.netty.http.client;
+
+import reactor.netty.resources.ConnectionProvider;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * HTTP/2 {@link ConnectionProvider.AllocationStrategy}.
+ *
+ * This class is based on
+ * https://github.com/reactor/reactor-pool/blob/d5cb5b72cdbcbbee8d781e06972c4da21766107f/src/main/java/reactor/pool/AllocationStrategies.java#L73
+ *
+ * @author Violeta Georgieva
+ * @since 1.0.20
+ */
+public final class Http2AllocationStrategy implements ConnectionProvider.AllocationStrategy {
+
+ public interface Builder {
+
+ /**
+ * Build a new {@link Http2AllocationStrategy}
+ *
+ * @return a new {@link Http2AllocationStrategy}
+ */
+ Http2AllocationStrategy build();
+
+ /**
+ * Configures the maximum number of the concurrent streams that can be opened to the remote peer.
+ * When evaluating how many streams can be opened to the remote peer,
+ * the minimum of this configuration and the remote peer configuration is taken (unless -1 is used).
+ * Default to {@code -1} - use always the remote peer configuration.
+ *
+ * @param maxConcurrentStreams the maximum number of the concurrent streams that can be opened to the remote peer
+ * @return {@code this}
+ */
+ Builder maxConcurrentStreams(long maxConcurrentStreams);
+
+ /**
+ * Configures the maximum number of live connections to keep in the pool.
+ * Default to {@link Integer#MAX_VALUE} - no upper limit.
+ *
+ * @param maxConnections the maximum number of live connections to keep in the pool
+ * @return {@code this}
+ */
+ Builder maxConnections(int maxConnections);
+
+ /**
+ * Configures the minimum number of live connections to keep in the pool (can be the best effort).
+ * Default to {@code 0}.
+ *
+ * @return {@code this}
+ */
+ Builder minConnections(int minConnections);
+ }
+
+ /**
+ * Creates a builder for {@link Http2AllocationStrategy}.
+ *
+ * @return a new {@link Http2AllocationStrategy.Builder}
+ */
+ public static Http2AllocationStrategy.Builder builder() {
+ return new Http2AllocationStrategy.Build();
+ }
+
+ @Override
+ public Http2AllocationStrategy copy() {
+ return new Http2AllocationStrategy(this);
+ }
+
+ @Override
+ public int estimatePermitCount() {
+ return PERMITS.get(this);
+ }
+
+ @Override
+ public int getPermits(int desired) {
+ if (desired < 0) {
+ return 0;
+ }
+
+ for (;;) {
+ int p = permits;
+ int target = Math.min(desired, p);
+
+ if (PERMITS.compareAndSet(this, p, p - target)) {
+ return target;
+ }
+ }
+ }
+
+ /**
+ * Returns the configured maximum number of the concurrent streams that can be opened to the remote peer.
+ *
+ * @return the configured maximum number of the concurrent streams that can be opened to the remote peer
+ */
+ public long maxConcurrentStreams() {
+ return maxConcurrentStreams;
+ }
+
+ @Override
+ public int permitGranted() {
+ return maxConnections - PERMITS.get(this);
+ }
+
+ @Override
+ public int permitMinimum() {
+ return minConnections;
+ }
+
+ @Override
+ public int permitMaximum() {
+ return maxConnections;
+ }
+
+ @Override
+ public void returnPermits(int returned) {
+ for (;;) {
+ int p = PERMITS.get(this);
+ if (p + returned > maxConnections) {
+ throw new IllegalArgumentException("Too many permits returned: returned=" + returned +
+ ", would bring to " + (p + returned) + "/" + maxConnections);
+ }
+ if (PERMITS.compareAndSet(this, p, p + returned)) {
+ return;
+ }
+ }
+ }
+
+ final long maxConcurrentStreams;
+ final int maxConnections;
+ final int minConnections;
+
+ volatile int permits;
+ static final AtomicIntegerFieldUpdater PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
+
+ Http2AllocationStrategy(Build build) {
+ this.maxConcurrentStreams = build.maxConcurrentStreams;
+ this.maxConnections = build.maxConnections;
+ this.minConnections = build.minConnections;
+ PERMITS.lazySet(this, this.maxConnections);
+ }
+
+ Http2AllocationStrategy(Http2AllocationStrategy copy) {
+ this.maxConcurrentStreams = copy.maxConcurrentStreams;
+ this.maxConnections = copy.maxConnections;
+ this.minConnections = copy.minConnections;
+ PERMITS.lazySet(this, this.maxConnections);
+ }
+
+ static final class Build implements Builder {
+ static final long DEFAULT_MAX_CONCURRENT_STREAMS = -1;
+ static final int DEFAULT_MAX_CONNECTIONS = Integer.MAX_VALUE;
+ static final int DEFAULT_MIN_CONNECTIONS = 0;
+
+ long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
+ int maxConnections = DEFAULT_MAX_CONNECTIONS;
+ int minConnections = DEFAULT_MIN_CONNECTIONS;
+
+ @Override
+ public Http2AllocationStrategy build() {
+ if (minConnections > maxConnections) {
+ throw new IllegalArgumentException("minConnections (" + minConnections + ")" +
+ " must be less than or equal to maxConnections (" + maxConnections + ")");
+ }
+ return new Http2AllocationStrategy(this);
+ }
+
+ @Override
+ public Builder maxConcurrentStreams(long maxConcurrentStreams) {
+ if (maxConcurrentStreams < -1) {
+ throw new IllegalArgumentException("maxConcurrentStreams must be greater than or equal to -1");
+ }
+ this.maxConcurrentStreams = maxConcurrentStreams;
+ return this;
+ }
+
+ @Override
+ public Builder maxConnections(int maxConnections) {
+ if (maxConnections < 1) {
+ throw new IllegalArgumentException("maxConnections must be strictly positive");
+ }
+ this.maxConnections = maxConnections;
+ return this;
+ }
+
+ @Override
+ public Builder minConnections(int minConnections) {
+ if (minConnections < 0) {
+ throw new IllegalArgumentException("minConnections must be positive or zero");
+ }
+ this.minConnections = minConnections;
+ return this;
+ }
+ }
+}
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
index 0b2e24ad0a..1d6786afe6 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java
@@ -15,14 +15,15 @@
*/
package reactor.netty.http.client;
+import io.micrometer.contextpropagation.ContextContainer;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.ssl.ApplicationProtocolNames;
-import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
@@ -37,7 +38,6 @@
import reactor.core.publisher.Operators;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
-import reactor.netty.NettyPipeline;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.resources.ConnectionProvider;
@@ -76,6 +76,9 @@ final class Http2ConnectionProvider extends PooledConnectionProvider
Http2ConnectionProvider(ConnectionProvider parent) {
super(initConfiguration(parent));
this.parent = parent;
+ if (parent instanceof PooledConnectionProvider) {
+ ((PooledConnectionProvider>) parent).onDispose(disposeLater());
+ }
}
static Builder initConfiguration(ConnectionProvider parent) {
@@ -304,8 +307,7 @@ else if (p.state != null) {
return;
}
- HttpClientConfig.openStream(channel, this, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue)
- .addListener(this);
+ http2StreamChannelBootstrap(channel).open().addListener(this);
}
@Override
@@ -337,11 +339,12 @@ public void onUncaughtException(Connection connection, Throwable error) {
@Override
public void operationComplete(Future future) {
Channel channel = pooledRef.poolable().channel();
- Http2FrameCodec frameCodec = channel.pipeline().get(Http2FrameCodec.class);
+ ChannelHandlerContext frameCodec = ((Http2Pool.Http2PooledRef) pooledRef).slot.http2FrameCodecCtx();
if (future.isSuccess()) {
Http2StreamChannel ch = future.getNow();
- if (!channel.isActive() || frameCodec == null || !frameCodec.connection().local().canOpenStream()) {
+ if (!channel.isActive() || frameCodec == null ||
+ !((Http2FrameCodec) frameCodec.handler()).connection().local().canOpenStream()) {
invalidate(this);
if (!retried) {
if (log.isDebugEnabled()) {
@@ -357,14 +360,20 @@ public void operationComplete(Future future) {
}
}
else {
+ Http2ConnectionProvider.registerClose(ch, this);
+ ContextContainer container = ContextContainer.restore(propagationContext);
+ container.save(ch);
+ HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())),
+ opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue);
+
ChannelOperations, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
obs.onStateChange(ops, STREAM_CONFIGURED);
sink.success(ops);
}
- Http2Connection.Endpoint localEndpoint = frameCodec.connection().local();
if (log.isDebugEnabled()) {
+ Http2Connection.Endpoint localEndpoint = ((Http2FrameCodec) frameCodec.handler()).connection().local();
logStreamsState(ch, localEndpoint, "Stream opened");
}
}
@@ -377,8 +386,8 @@ public void operationComplete(Future future) {
boolean isH2cUpgrade() {
Channel channel = pooledRef.poolable().channel();
- if (channel.pipeline().get(NettyPipeline.H2CUpgradeHandler) != null &&
- channel.pipeline().get(NettyPipeline.H2MultiplexHandler) == null) {
+ if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() != null &&
+ ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) {
ChannelOperations, ?> ops = ChannelOperations.get(channel);
if (ops != null) {
sink.success(ops);
@@ -390,11 +399,9 @@ boolean isH2cUpgrade() {
boolean notHttp2() {
Channel channel = pooledRef.poolable().channel();
- ChannelPipeline pipeline = channel.pipeline();
- SslHandler handler = pipeline.get(SslHandler.class);
- if (handler != null) {
- String protocol = handler.applicationProtocol() != null ? handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1;
- if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
+ String applicationProtocol = ((Http2Pool.Http2PooledRef) pooledRef).slot.applicationProtocol;
+ if (applicationProtocol != null) {
+ if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) {
// No information for the negotiated application-level protocol,
// or it is HTTP/1.1, continue as an HTTP/1.1 request
// and remove the connection from this pool.
@@ -405,15 +412,15 @@ boolean notHttp2() {
return true;
}
}
- else if (!ApplicationProtocolNames.HTTP_2.equals(handler.applicationProtocol())) {
+ else if (!ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) {
channel.attr(OWNER).set(null);
invalidate(this);
- sink.error(new IOException("Unknown protocol [" + protocol + "]."));
+ sink.error(new IOException("Unknown protocol [" + applicationProtocol + "]."));
return true;
}
}
- else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null &&
- pipeline.get(NettyPipeline.H2MultiplexHandler) == null) {
+ else if (((Http2Pool.Http2PooledRef) pooledRef).slot.h2cUpgradeHandlerCtx() == null &&
+ ((Http2Pool.Http2PooledRef) pooledRef).slot.http2MultiplexHandlerCtx() == null) {
// It is not H2. There are no handlers for H2C upgrade/H2C prior-knowledge,
// continue as an HTTP/1.1 request and remove the connection from this pool.
ChannelOperations, ?> ops = ChannelOperations.get(channel);
@@ -425,6 +432,27 @@ else if (pipeline.get(NettyPipeline.H2CUpgradeHandler) == null &&
}
return false;
}
+
+ static final AttributeKey HTTP2_STREAM_CHANNEL_BOOTSTRAP =
+ AttributeKey.valueOf("http2StreamChannelBootstrap");
+
+ static Http2StreamChannelBootstrap http2StreamChannelBootstrap(Channel channel) {
+ Http2StreamChannelBootstrap http2StreamChannelBootstrap;
+
+ for (;;) {
+ http2StreamChannelBootstrap = channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP).get();
+ if (http2StreamChannelBootstrap == null) {
+ http2StreamChannelBootstrap = new Http2StreamChannelBootstrap(channel);
+ }
+ else {
+ return http2StreamChannelBootstrap;
+ }
+ if (channel.attr(HTTP2_STREAM_CHANNEL_BOOTSTRAP)
+ .compareAndSet(null, http2StreamChannelBootstrap)) {
+ return http2StreamChannelBootstrap;
+ }
+ }
+ }
}
static final class PendingConnectionObserver implements ConnectionObserver {
@@ -471,7 +499,7 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
- poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime()));
+ poolConFig -> new Http2Pool(poolConFig, poolFactory.allocationStrategy(), poolFactory.maxIdleTime(), poolFactory.maxLifeTime()));
}
Publisher connectChannel() {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
index 74a5e7d179..d0ad16661b 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java
@@ -27,6 +27,26 @@
*/
enum Http2ConnectionProviderMeters implements DocumentedMeter {
+ /**
+ * The number of the connections in the connection pool that have been successfully acquired and are in active use.
+ */
+ ACTIVE_CONNECTIONS {
+ @Override
+ public String getName() {
+ return "reactor.netty.connection.provider.active.connections";
+ }
+
+ @Override
+ public KeyName[] getKeyNames() {
+ return Http2ConnectionProviderMeters.Http2ConnectionProviderMetersTags.values();
+ }
+
+ @Override
+ public Meter.Type getType() {
+ return Meter.Type.GAUGE;
+ }
+ },
+
/**
* The number of the active HTTP/2 streams.
*/
@@ -47,6 +67,26 @@ public Meter.Type getType() {
}
},
+ /**
+ * The number of the idle connections in the connection pool.
+ */
+ IDLE_CONNECTIONS {
+ @Override
+ public String getName() {
+ return "reactor.netty.connection.provider.idle.connections";
+ }
+
+ @Override
+ public KeyName[] getKeyNames() {
+ return Http2ConnectionProviderMeters.Http2ConnectionProviderMetersTags.values();
+ }
+
+ @Override
+ public Meter.Type getType() {
+ return Meter.Type.GAUGE;
+ }
+ },
+
/**
* The number of requests that are waiting for opening HTTP/2 stream.
*/
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
index a86dd87f3b..11f5b639b4 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java
@@ -17,16 +17,23 @@
import java.time.Clock;
import java.time.Duration;
+import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Function;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.SslHandler;
+import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
@@ -36,8 +43,8 @@
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
-import reactor.netty.ConnectionObserver;
-import reactor.netty.channel.ChannelOperations;
+import reactor.netty.FutureMono;
+import reactor.netty.NettyPipeline;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException;
@@ -45,6 +52,7 @@
import reactor.netty.internal.shaded.reactor.pool.PoolShutdownException;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import reactor.netty.internal.shaded.reactor.pool.PooledRefMetadata;
+import reactor.netty.resources.ConnectionProvider;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
@@ -59,7 +67,7 @@
*
* - The connection is closed.
* - The connection has reached its life time and there are no active streams.
- * - The connection has no active streams.
+ * - The connection has reached its idle time and there are no active streams.
* - When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1,
* and the negotiated protocol is HTTP/1.1.
*
@@ -75,17 +83,18 @@
*
* This pool always invalidate the {@link PooledRef}, there is no release functionality.
*
- * - {@link PoolMetrics#acquiredSize()} and {@link PoolMetrics#allocatedSize()} always return the number of
- * the active streams from all connections currently in the pool.
- * - {@link PoolMetrics#idleSize()} always returns {@code 0}.
+ * - {@link PoolMetrics#acquiredSize()}, {@link PoolMetrics#allocatedSize()} and {@link PoolMetrics#idleSize()}
+ * always return the number of the cached connections.
+ * - {@link Http2Pool#activeStreams()} always return the active streams from all connections currently in the pool.
*
*
+ * If minimum connections is specified, the cached connections with active streams will be kept at that minimum
+ * (can be the best effort). However, if the cached connections have reached max concurrent streams,
+ * then new connections will be allocated up to the maximum connections limit.
+ *
* Configurations that are not applicable
*
* - {@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.
- * - {@link PoolConfig#evictInBackgroundInterval()} and {@link PoolConfig#evictInBackgroundScheduler()} -
- * there are no idle resources in the pool. Once the connection does not have active streams, it
- * is returned to the parent pool.
* - {@link PoolConfig#evictionPredicate()} - the eviction predicate cannot be used as more complex
* checks have to be done. Also the pool uses filtering for the connections (a connection might not be able
* to be used but is required to stay in the pool).
@@ -94,7 +103,6 @@
* - {@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.
* - FIFO is used when obtaining the pending borrowers
* - Warm up functionality is not supported
- * - Setting minimum connections configuration is not supported
*
* This class is based on
* https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java
@@ -114,6 +122,10 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
static final AtomicReferenceFieldUpdater CONNECTIONS =
AtomicReferenceFieldUpdater.newUpdater(Http2Pool.class, ConcurrentLinkedQueue.class, "connections");
+ volatile int idleSize;
+ private static final AtomicIntegerFieldUpdater IDLE_SIZE =
+ AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "idleSize");
+
/**
* Pending borrowers queue. Never invoke directly the poll/add/remove methods and instead of that,
* use addPending/pollPending/removePending methods which take care of maintaining the pending queue size.
@@ -135,28 +147,41 @@ final class Http2Pool implements InstrumentedPool, InstrumentedPool.
@SuppressWarnings("rawtypes")
static final ConcurrentLinkedDeque TERMINATED = new ConcurrentLinkedDeque();
+ volatile long totalMaxConcurrentStreams;
+ static final AtomicLongFieldUpdater TOTAL_MAX_CONCURRENT_STREAMS =
+ AtomicLongFieldUpdater.newUpdater(Http2Pool.class, "totalMaxConcurrentStreams");
+
volatile int wip;
static final AtomicIntegerFieldUpdater WIP =
AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip");
final Clock clock;
+ final Long maxConcurrentStreams;
+ final long maxIdleTime;
final long maxLifeTime;
+ final int minConnections;
final PoolConfig poolConfig;
long lastInteractionTimestamp;
- Http2Pool(PoolConfig poolConfig, long maxLifeTime) {
- if (poolConfig.allocationStrategy().getPermits(0) != 0) {
- throw new IllegalArgumentException("No support for configuring minimum number of connections");
- }
+ Disposable evictionTask;
+
+ Http2Pool(PoolConfig poolConfig, @Nullable ConnectionProvider.AllocationStrategy> allocationStrategy,
+ long maxIdleTime, long maxLifeTime) {
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
+ this.maxConcurrentStreams = allocationStrategy instanceof Http2AllocationStrategy ?
+ ((Http2AllocationStrategy) allocationStrategy).maxConcurrentStreams() : -1;
+ this.maxIdleTime = maxIdleTime;
this.maxLifeTime = maxLifeTime;
+ this.minConnections = allocationStrategy == null ? 0 : allocationStrategy.permitMinimum();
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;
recordInteractionTimestamp();
+
+ scheduleEviction();
}
@Override
@@ -171,12 +196,12 @@ public Mono> acquire(Duration timeout) {
@Override
public int acquiredSize() {
- return acquired;
+ return allocatedSize() - idleSize();
}
@Override
public int allocatedSize() {
- return acquired;
+ return poolConfig.allocationStrategy().permitGranted();
}
@Override
@@ -192,15 +217,26 @@ public Mono disposeLater() {
@SuppressWarnings("unchecked")
ConcurrentLinkedDeque q = PENDING.getAndSet(this, TERMINATED);
if (q != TERMINATED) {
+ evictionTask.dispose();
+
Borrower p;
while ((p = pollPending(q, true)) != null) {
p.fail(new PoolShutdownException());
}
- // the last stream on that connection will release the connection to the parent pool
- // the structure should not contain connections with 0 streams as the last stream on that connection
- // always removes the connection from this pool
- CONNECTIONS.getAndSet(this, null);
+ @SuppressWarnings("unchecked")
+ ConcurrentLinkedQueue slots = CONNECTIONS.getAndSet(this, null);
+ if (slots != null) {
+ Mono closeMonos = Mono.empty();
+ while (!slots.isEmpty()) {
+ Slot slot = pollSlot(slots);
+ if (slot != null) {
+ slot.invalidate();
+ closeMonos = closeMonos.and(DEFAULT_DESTROY_HANDLER.apply(slot.connection));
+ }
+ }
+ return closeMonos;
+ }
}
return Mono.empty();
});
@@ -218,7 +254,7 @@ public int getMaxPendingAcquireSize() {
@Override
public int idleSize() {
- return 0;
+ return idleSize;
}
@Override
@@ -253,6 +289,10 @@ public Mono warmup() {
return Mono.just(0);
}
+ int activeStreams() {
+ return acquired;
+ }
+
void cancelAcquire(Borrower borrower) {
if (!isDisposed()) {
ConcurrentLinkedDeque q = pending;
@@ -260,15 +300,32 @@ void cancelAcquire(Borrower borrower) {
}
}
+ @SuppressWarnings("FutureReturnValueIgnored")
Mono destroyPoolable(Http2PooledRef ref) {
+ assert ref.slot.connection.channel().eventLoop().inEventLoop();
Mono mono = Mono.empty();
try {
+ // By default, check the connection for removal on acquire and invalidate (only if there are no active streams)
if (ref.slot.decrementConcurrencyAndGet() == 0) {
- ref.slot.invalidate();
- Connection connection = ref.poolable();
- Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class);
- if (frameCodec != null) {
- releaseConnection(connection);
+ // not HTTP/2 request
+ if (ref.slot.http2FrameCodecCtx() == null) {
+ ref.slot.invalidate();
+ removeSlot(ref.slot);
+ }
+ // If there is eviction in background, the background process will remove this connection
+ else if (poolConfig.evictInBackgroundInterval().isZero()) {
+ // not active
+ if (!ref.poolable().channel().isActive()) {
+ ref.slot.invalidate();
+ removeSlot(ref.slot);
+ }
+ // max life reached
+ else if (maxLifeReached(ref.slot)) {
+ //"FutureReturnValueIgnored" this is deliberate
+ ref.slot.connection.channel().close();
+ ref.slot.invalidate();
+ removeSlot(ref.slot);
+ }
}
}
}
@@ -311,81 +368,89 @@ void drainLoop() {
if (borrowersCount != 0) {
// find a connection that can be used for opening a new stream
- Slot slot = findConnection(resources);
+ // when cached connections are below minimum connections, then allocate a new connection
+ boolean belowMinConnections = minConnections > 0 &&
+ poolConfig.allocationStrategy().permitGranted() < minConnections;
+ Slot slot = belowMinConnections ? null : findConnection(resources);
if (slot != null) {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
- resources.offer(slot);
+ offerSlot(resources, slot);
continue;
}
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
}
- if (slot.incrementConcurrencyAndGet() > 1) {
- borrower.stopPendingCountdown();
- if (log.isDebugEnabled()) {
- log.debug(format(slot.connection.channel(), "Channel activated"));
- }
- ACQUIRED.incrementAndGet(this);
- // we are ready here, the connection can be used for opening another stream
- slot.deactivate();
- poolConfig.acquisitionScheduler().schedule(() -> borrower.deliver(new Http2PooledRef(slot)));
- }
- else {
- addPending(borrowers, borrower, true);
- continue;
+ borrower.stopPendingCountdown();
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Channel activated"));
}
+ ACQUIRED.incrementAndGet(this);
+ slot.connection.channel().eventLoop().execute(() -> {
+ borrower.deliver(new Http2PooledRef(slot));
+ drain();
+ });
}
else {
- int permits = poolConfig.allocationStrategy().getPermits(1);
- if (permits <= 0) {
- if (maxPending >= 0) {
- borrowersCount = pendingSize;
- int toCull = borrowersCount - maxPending;
- for (int i = 0; i < toCull; i++) {
- Borrower extraneous = pollPending(borrowers, true);
- if (extraneous != null) {
- pendingAcquireLimitReached(extraneous, maxPending);
- }
- }
- }
+ int resourcesCount = idleSize;
+ if (minConnections > 0 &&
+ poolConfig.allocationStrategy().permitGranted() >= minConnections &&
+ resourcesCount == 0) {
+ // connections allocations were triggered
}
else {
- Borrower borrower = pollPending(borrowers, true);
- if (borrower == null) {
- continue;
+ int permits = poolConfig.allocationStrategy().getPermits(1);
+ if (permits <= 0) {
+ if (maxPending >= 0) {
+ borrowersCount = pendingSize;
+ int toCull = borrowersCount - maxPending;
+ for (int i = 0; i < toCull; i++) {
+ Borrower extraneous = pollPending(borrowers, true);
+ if (extraneous != null) {
+ pendingAcquireLimitReached(extraneous, maxPending);
+ }
+ }
+ }
}
- if (isDisposed()) {
- borrower.fail(new PoolShutdownException());
- return;
+ else {
+ if (permits > 1) {
+ // warmup is not supported
+ poolConfig.allocationStrategy().returnPermits(permits - 1);
+ }
+ Borrower borrower = pollPending(borrowers, true);
+ if (borrower == null) {
+ continue;
+ }
+ if (isDisposed()) {
+ borrower.fail(new PoolShutdownException());
+ return;
+ }
+ borrower.stopPendingCountdown();
+ Mono allocator = poolConfig.allocator();
+ Mono primary =
+ allocator.doOnEach(sig -> {
+ if (sig.isOnNext()) {
+ Connection newInstance = sig.get();
+ assert newInstance != null;
+ Slot newSlot = new Slot(this, newInstance);
+ if (log.isDebugEnabled()) {
+ log.debug(format(newInstance.channel(), "Channel activated"));
+ }
+ ACQUIRED.incrementAndGet(this);
+ borrower.deliver(new Http2PooledRef(newSlot));
+ }
+ else if (sig.isOnError()) {
+ Throwable error = sig.getThrowable();
+ assert error != null;
+ poolConfig.allocationStrategy().returnPermits(1);
+ borrower.fail(error);
+ }
+ })
+ .contextWrite(borrower.currentContext());
+
+ primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
- borrower.stopPendingCountdown();
- Mono allocator = poolConfig.allocator();
- Mono primary =
- allocator.doOnEach(sig -> {
- if (sig.isOnNext()) {
- Connection newInstance = sig.get();
- assert newInstance != null;
- Slot newSlot = new Slot(this, newInstance);
- if (log.isDebugEnabled()) {
- log.debug(format(newInstance.channel(), "Channel activated"));
- }
- ACQUIRED.incrementAndGet(this);
- newSlot.incrementConcurrencyAndGet();
- newSlot.deactivate();
- borrower.deliver(new Http2PooledRef(newSlot));
- }
- else if (sig.isOnError()) {
- Throwable error = sig.getThrowable();
- assert error != null;
- poolConfig.allocationStrategy().returnPermits(1);
- borrower.fail(error);
- }
- })
- .contextWrite(borrower.currentContext());
-
- primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
}
}
@@ -397,16 +462,78 @@ else if (sig.isOnError()) {
}
}
+ @SuppressWarnings("FutureReturnValueIgnored")
+ void evictInBackground() {
+ @SuppressWarnings("unchecked")
+ ConcurrentLinkedQueue resources = CONNECTIONS.get(this);
+ if (resources == null) {
+ //no need to schedule the task again, pool has been disposed
+ return;
+ }
+
+ if (WIP.getAndIncrement(this) == 0) {
+ if (pendingSize == 0) {
+ Iterator slots = resources.iterator();
+ while (slots.hasNext()) {
+ Slot slot = slots.next();
+ if (slot.concurrency() == 0) {
+ if (!slot.connection.channel().isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool"));
+ }
+ recordInteractionTimestamp();
+ slots.remove();
+ IDLE_SIZE.decrementAndGet(this);
+ slot.invalidate();
+ continue;
+ }
+
+ if (maxLifeReached(slot)) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool"));
+ }
+ //"FutureReturnValueIgnored" this is deliberate
+ slot.connection.channel().close();
+ recordInteractionTimestamp();
+ slots.remove();
+ IDLE_SIZE.decrementAndGet(this);
+ slot.invalidate();
+ continue;
+ }
+ }
+ if (maxIdleReached(slot)) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
+ }
+ //"FutureReturnValueIgnored" this is deliberate
+ slot.connection.channel().close();
+ recordInteractionTimestamp();
+ slots.remove();
+ IDLE_SIZE.decrementAndGet(this);
+ slot.invalidate();
+ }
+ }
+ }
+ //at the end if there are racing drain calls, go into the drainLoop
+ if (WIP.decrementAndGet(this) > 0) {
+ drainLoop();
+ }
+ }
+ //schedule the next iteration
+ scheduleEviction();
+ }
+
@Nullable
+ @SuppressWarnings("FutureReturnValueIgnored")
Slot findConnection(ConcurrentLinkedQueue resources) {
- int resourcesCount = resources.size();
+ int resourcesCount = idleSize;
while (resourcesCount > 0) {
// There are connections in the queue
resourcesCount--;
// get the connection
- Slot slot = resources.poll();
+ Slot slot = pollSlot(resources);
if (slot == null) {
continue;
}
@@ -418,38 +545,51 @@ Slot findConnection(ConcurrentLinkedQueue resources) {
log.debug(format(slot.connection.channel(), "Channel is closed, {} active streams"),
slot.concurrency());
}
- resources.offer(slot);
+ offerSlot(resources, slot);
}
else {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Channel is closed, remove from pool"));
}
- resources.remove(slot);
+ slot.invalidate();
}
continue;
}
- // check that the connection's max lifetime has not been reached
- if (maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime) {
+ // check whether the connection's idle time has been reached
+ if (maxIdleReached(slot)) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
+ }
+ //"FutureReturnValueIgnored" this is deliberate
+ slot.connection.channel().close();
+ slot.invalidate();
+ continue;
+ }
+
+ // check whether the connection's max lifetime has been reached
+ if (maxLifeReached(slot)) {
if (slot.concurrency() > 0) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Max life time is reached, {} active streams"),
slot.concurrency());
}
- resources.offer(slot);
+ offerSlot(resources, slot);
}
else {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Max life time is reached, remove from pool"));
}
- resources.remove(slot);
+ //"FutureReturnValueIgnored" this is deliberate
+ slot.connection.channel().close();
+ slot.invalidate();
}
continue;
}
// check that the connection's max active streams has not been reached
if (!slot.canOpenStream()) {
- resources.offer(slot);
+ offerSlot(resources, slot);
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Max active streams is reached"));
}
@@ -462,6 +602,14 @@ Slot findConnection(ConcurrentLinkedQueue resources) {
return null;
}
+ boolean maxIdleReached(Slot slot) {
+ return maxIdleTime != -1 && slot.idleTime() >= maxIdleTime;
+ }
+
+ boolean maxLifeReached(Slot slot) {
+ return maxLifeTime != -1 && slot.lifeTime() >= maxLifeTime;
+ }
+
void pendingAcquireLimitReached(Borrower borrower, int maxPending) {
if (maxPending == 0) {
borrower.fail(new PoolAcquirePendingLimitException(0,
@@ -530,33 +678,51 @@ int addPending(ConcurrentLinkedDeque borrowers, Borrower borrower, boo
return PENDING_SIZE.incrementAndGet(this);
}
- static boolean offerSlot(Slot slot) {
- @SuppressWarnings("unchecked")
- ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool);
- return q != null && q.offer(slot);
+ void offerSlot(@Nullable ConcurrentLinkedQueue slots, Slot slot) {
+ if (slots != null && slots.offer(slot)) {
+ IDLE_SIZE.incrementAndGet(this);
+ }
}
- static void releaseConnection(Connection connection) {
- ChannelOperations, ?> ops = connection.as(ChannelOperations.class);
- if (ops != null) {
- ops.listener().onStateChange(ops, ConnectionObserver.State.DISCONNECTING);
- }
- else if (connection instanceof ConnectionObserver) {
- ((ConnectionObserver) connection).onStateChange(connection, ConnectionObserver.State.DISCONNECTING);
+ @Nullable
+ Slot pollSlot(@Nullable ConcurrentLinkedQueue slots) {
+ if (slots == null) {
+ return null;
}
- else {
- connection.dispose();
+ Slot slot = slots.poll();
+ if (slot != null) {
+ IDLE_SIZE.decrementAndGet(this);
}
+ return slot;
}
- static void removeSlot(Slot slot) {
+ void removeSlot(Slot slot) {
@SuppressWarnings("unchecked")
ConcurrentLinkedQueue q = CONNECTIONS.get(slot.pool);
- if (q != null) {
- q.remove(slot);
+ if (q != null && q.remove(slot)) {
+ IDLE_SIZE.decrementAndGet(this);
}
}
+ void scheduleEviction() {
+ if (!poolConfig.evictInBackgroundInterval().isZero()) {
+ long nanosEvictionInterval = poolConfig.evictInBackgroundInterval().toNanos();
+ this.evictionTask = poolConfig.evictInBackgroundScheduler()
+ .schedule(this::evictInBackground, nanosEvictionInterval, TimeUnit.NANOSECONDS);
+ }
+ else {
+ this.evictionTask = Disposables.disposed();
+ }
+ }
+
+ static final Function> DEFAULT_DESTROY_HANDLER =
+ connection -> {
+ if (!connection.channel().isActive()) {
+ return Mono.empty();
+ }
+ return FutureMono.from(connection.channel().close());
+ };
+
static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable {
static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();
@@ -589,7 +755,10 @@ Context currentContext() {
@Override
public void request(long n) {
if (Operators.validate(n)) {
- if (!acquireTimeout.isZero()) {
+ long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired;
+ int permits = pool.poolConfig.allocationStrategy().estimatePermitCount();
+ int pending = pool.pendingSize;
+ if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) {
timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
}
pool.doAcquire(this);
@@ -627,7 +796,9 @@ public String toString() {
}
void deliver(Http2PooledRef poolSlot) {
- stopPendingCountdown();
+ assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
+ poolSlot.slot.incrementConcurrencyAndGet();
+ poolSlot.slot.deactivate();
if (get()) {
//CANCELLED or timeout reached
poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty()));
@@ -737,7 +908,7 @@ public String toString() {
}
}
- static final class Slot {
+ static final class Slot extends AtomicBoolean {
volatile int concurrency;
static final AtomicIntegerFieldUpdater CONCURRENCY =
@@ -746,18 +917,47 @@ static final class Slot {
final Connection connection;
final long creationTimestamp;
final Http2Pool pool;
+ final String applicationProtocol;
+
+ long idleTimestamp;
+ long maxConcurrentStreams;
+
+ volatile ChannelHandlerContext http2FrameCodecCtx;
+ volatile ChannelHandlerContext http2MultiplexHandlerCtx;
+ volatile ChannelHandlerContext h2cUpgradeHandlerCtx;
Slot(Http2Pool pool, Connection connection) {
this.connection = connection;
this.creationTimestamp = pool.clock.millis();
this.pool = pool;
+ SslHandler handler = connection.channel().pipeline().get(SslHandler.class);
+ if (handler != null) {
+ this.applicationProtocol = handler.applicationProtocol() != null ?
+ handler.applicationProtocol() : ApplicationProtocolNames.HTTP_1_1;
+ }
+ else {
+ this.applicationProtocol = null;
+ }
+ ChannelHandlerContext frameCodec = http2FrameCodecCtx();
+ if (frameCodec != null && http2MultiplexHandlerCtx() != null) {
+ this.maxConcurrentStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams();
+ this.maxConcurrentStreams = pool.maxConcurrentStreams == -1 ? maxConcurrentStreams :
+ Math.min(pool.maxConcurrentStreams, maxConcurrentStreams);
+ }
+ TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, this.maxConcurrentStreams);
}
boolean canOpenStream() {
- Http2FrameCodec frameCodec = connection.channel().pipeline().get(Http2FrameCodec.class);
- Http2MultiplexHandler multiplexHandler = connection.channel().pipeline().get(Http2MultiplexHandler.class);
- if (frameCodec != null && multiplexHandler != null) {
- int maxActiveStreams = frameCodec.connection().local().maxActiveStreams();
+ ChannelHandlerContext frameCodec = http2FrameCodecCtx();
+ if (frameCodec != null && http2MultiplexHandlerCtx() != null) {
+ long maxActiveStreams = ((Http2FrameCodec) frameCodec.handler()).connection().local().maxActiveStreams();
+ maxActiveStreams = pool.maxConcurrentStreams == -1 ? maxActiveStreams :
+ Math.min(pool.maxConcurrentStreams, maxActiveStreams);
+ long diff = maxActiveStreams - maxConcurrentStreams;
+ if (diff != 0) {
+ maxConcurrentStreams = maxActiveStreams;
+ TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, diff);
+ }
int concurrency = this.concurrency;
return concurrency < maxActiveStreams;
}
@@ -772,23 +972,70 @@ void deactivate() {
if (log.isDebugEnabled()) {
log.debug(format(connection.channel(), "Channel deactivated"));
}
- offerSlot(this);
+ @SuppressWarnings("unchecked")
+ ConcurrentLinkedQueue slots = CONNECTIONS.get(pool);
+ pool.offerSlot(slots, this);
}
int decrementConcurrencyAndGet() {
- return CONCURRENCY.decrementAndGet(this);
+ int concurrency = CONCURRENCY.decrementAndGet(this);
+ idleTimestamp = pool.clock.millis();
+ return concurrency;
}
- int incrementConcurrencyAndGet() {
- return CONCURRENCY.incrementAndGet(this);
+ long idleTime() {
+ if (concurrency() > 0) {
+ return 0L;
+ }
+ long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp;
+ return pool.clock.millis() - idleTime;
+ }
+
+ @Nullable
+ ChannelHandlerContext http2FrameCodecCtx() {
+ ChannelHandlerContext ctx = http2FrameCodecCtx;
+ if (ctx != null && !ctx.isRemoved()) {
+ return ctx;
+ }
+ ctx = connection.channel().pipeline().context(Http2FrameCodec.class);
+ http2FrameCodecCtx = ctx;
+ return ctx;
+ }
+
+ @Nullable
+ ChannelHandlerContext http2MultiplexHandlerCtx() {
+ ChannelHandlerContext ctx = http2MultiplexHandlerCtx;
+ if (ctx != null && !ctx.isRemoved()) {
+ return ctx;
+ }
+ ctx = connection.channel().pipeline().context(Http2MultiplexHandler.class);
+ http2MultiplexHandlerCtx = ctx;
+ return ctx;
+ }
+
+ @Nullable
+ ChannelHandlerContext h2cUpgradeHandlerCtx() {
+ ChannelHandlerContext ctx = h2cUpgradeHandlerCtx;
+ if (ctx != null && !ctx.isRemoved()) {
+ return ctx;
+ }
+ ctx = connection.channel().pipeline().context(NettyPipeline.H2CUpgradeHandler);
+ h2cUpgradeHandlerCtx = ctx;
+ return ctx;
+ }
+
+ void incrementConcurrencyAndGet() {
+ CONCURRENCY.incrementAndGet(this);
}
void invalidate() {
- if (log.isDebugEnabled()) {
- log.debug(format(connection.channel(), "Channel removed from pool"));
+ if (compareAndSet(false, true)) {
+ if (log.isDebugEnabled()) {
+ log.debug(format(connection.channel(), "Channel removed from pool"));
+ }
+ pool.poolConfig.allocationStrategy().returnPermits(1);
+ TOTAL_MAX_CONCURRENT_STREAMS.addAndGet(this.pool, -maxConcurrentStreams);
}
- pool.poolConfig.allocationStrategy().returnPermits(1);
- removeSlot(this);
}
long lifeTime() {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java
index a432f55d41..f1808b573a 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2StreamBridgeClientHandler.java
@@ -17,15 +17,11 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
-import reactor.netty.Connection;
-import reactor.netty.ConnectionObserver;
-import reactor.netty.channel.ChannelOperations;
-
-import static reactor.netty.ReactorNetty.format;
/**
* This handler is intended to work together with {@link Http2StreamFrameToHttpObjectCodec}
@@ -35,33 +31,14 @@
* @author Violeta Georgieva
* @since 1.0.0
*/
+@ChannelHandler.Sharable
final class Http2StreamBridgeClientHandler extends ChannelDuplexHandler {
- final ConnectionObserver observer;
- final ChannelOperations.OnSetup opsFactory;
-
- Http2StreamBridgeClientHandler(ConnectionObserver listener, ChannelOperations.OnSetup opsFactory) {
- this.observer = listener;
- this.opsFactory = opsFactory;
- }
-
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) {
- if (HttpClientOperations.log.isDebugEnabled()) {
- HttpClientOperations.log.debug(format(ctx.channel(), "New HTTP/2 stream"));
- }
-
- ChannelOperations, ?> ops = opsFactory.create(Connection.from(ctx.channel()), observer, null);
- if (ops != null) {
- ops.bind();
- }
- }
-
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java
index 717e6e5dfa..a54f1fa89f 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java
@@ -53,16 +53,14 @@
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
-import io.netty.handler.codec.http2.Http2StreamChannel;
-import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
+import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.AddressResolverGroup;
-import io.netty.util.concurrent.Future;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.ChannelPipelineConfigurer;
@@ -505,6 +503,77 @@ Http2Settings http2Settings() {
return settings;
}
+ static void addStreamHandlers(
+ Channel ch,
+ ConnectionObserver obs,
+ ChannelOperations.OnSetup opsFactory,
+ boolean acceptGzip,
+ @Nullable ChannelMetricsRecorder metricsRecorder,
+ long responseTimeoutMillis,
+ @Nullable Function uriTagValue) {
+
+ if (HttpClientOperations.log.isDebugEnabled()) {
+ HttpClientOperations.log.debug(format(ch, "New HTTP/2 stream"));
+ }
+
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
+ .addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER);
+
+ if (acceptGzip) {
+ pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
+ }
+
+ ChannelOperations.addReactiveBridge(ch, opsFactory, obs);
+
+ if (metricsRecorder != null) {
+ if (metricsRecorder instanceof HttpClientMetricsRecorder) {
+ ChannelHandler handler;
+ Channel parent = ch.parent();
+ ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler);
+ if (existingHandler != null) {
+ // This use case can happen only in HTTP/2 clear text connection upgrade
+ parent.pipeline().remove(NettyPipeline.HttpMetricsHandler);
+ if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) {
+ handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsHandler) existingHandler);
+ }
+ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
+ handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler);
+ }
+ else {
+ handler = new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler);
+ }
+ }
+ else {
+ if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) {
+ handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, uriTagValue);
+ }
+ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
+ handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue);
+ }
+ else {
+ handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue);
+ }
+ }
+ pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler);
+ }
+ }
+
+ if (responseTimeoutMillis > -1) {
+ Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
+ new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline());
+ }
+
+ ChannelOperations, ?> ops = opsFactory.create(Connection.from(ch), obs, null);
+ if (ops != null) {
+ ops.bind();
+ }
+ }
+
static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder,
Http2Settings http2Settings, ConnectionObserver observer) {
Http2FrameCodecBuilder http2FrameCodecBuilder =
@@ -517,7 +586,8 @@ static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpRe
"reactor.netty.http.client.h2"));
}
- p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, http2FrameCodecBuilder.build())
+ p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2Flush, new FlushConsolidationHandler(1024, true))
+ .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, http2FrameCodecBuilder.build())
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2MultiplexHandler, new Http2MultiplexHandler(new H2Codec(acceptGzip)))
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer));
}
@@ -625,20 +695,6 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
}
}
- static Future openStream(
- Channel channel,
- Http2ConnectionProvider.DisposableAcquire owner,
- ConnectionObserver observer,
- ChannelOperations.OnSetup opsFactory,
- boolean acceptGzip,
- @Nullable ChannelMetricsRecorder metricsRecorder,
- @Nullable Function uriTagValue) {
- Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel);
- bootstrap.option(ChannelOption.AUTO_READ, false);
- bootstrap.handler(new H2Codec(owner, observer, opsFactory, acceptGzip, metricsRecorder, uriTagValue));
- return bootstrap.open();
- }
-
static final Pattern FOLLOW_REDIRECT_CODES = Pattern.compile("30[12378]");
static final BiPredicate FOLLOW_REDIRECT_PREDICATE =
@@ -656,6 +712,12 @@ static Future openStream(
static final int h11orH2C = h11 | h2c;
+ static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT =
+ new Http2StreamFrameToHttpObjectCodec(false);
+
+ static final Http2StreamBridgeClientHandler HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER =
+ new Http2StreamBridgeClientHandler();
+
static final Logger log = Loggers.getLogger(HttpClientConfig.class);
static final LoggingHandler LOGGING_HANDLER =
@@ -723,6 +785,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
}
static final class H2Codec extends ChannelInitializer {
+
final boolean acceptGzip;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
@@ -778,68 +841,14 @@ protected void initChannel(Channel ch) {
Http2ConnectionProvider.registerClose(ch, owner);
ContextContainer container = ContextContainer.restore(owner.propagationContext);
container.save(ch);
-
- addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory);
+ addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory,
+ acceptGzip, metricsRecorder, responseTimeoutMillis, uriTagValue);
}
else {
// Handle server pushes (inbound streams)
// TODO this is not supported
}
}
-
- void addStreamHandlers(Channel ch, ConnectionObserver obs, ChannelOperations.OnSetup opsFactory) {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(false))
- .addLast(NettyPipeline.HttpTrafficHandler, new Http2StreamBridgeClientHandler(obs, opsFactory));
-
- if (acceptGzip) {
- pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
- }
-
- ChannelOperations.addReactiveBridge(ch, opsFactory, obs);
-
- if (metricsRecorder != null) {
- if (metricsRecorder instanceof HttpClientMetricsRecorder) {
- ChannelHandler handler;
- Channel parent = ch.parent();
- ChannelHandler existingHandler = parent.pipeline().get(NettyPipeline.HttpMetricsHandler);
- if (existingHandler != null) {
- // This use case can happen only in HTTP/2 clear text connection upgrade
- parent.pipeline().remove(NettyPipeline.HttpMetricsHandler);
- if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) {
- handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsHandler) existingHandler);
- }
- else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
- handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsHandler) existingHandler);
- }
- else {
- handler = new HttpClientMetricsHandler((HttpClientMetricsHandler) existingHandler);
- }
- }
- else {
- if (metricsRecorder instanceof MicrometerHttpClientMetricsRecorder) {
- handler = new MicrometerHttpClientMetricsHandler((MicrometerHttpClientMetricsRecorder) metricsRecorder, uriTagValue);
- }
- else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
- handler = new ContextAwareHttpClientMetricsHandler((ContextAwareHttpClientMetricsRecorder) metricsRecorder, uriTagValue);
- }
- else {
- handler = new HttpClientMetricsHandler((HttpClientMetricsRecorder) metricsRecorder, uriTagValue);
- }
- }
- pipeline.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, handler);
- }
- }
-
- if (responseTimeoutMillis > -1) {
- Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
- new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
- }
-
- if (log.isDebugEnabled()) {
- log.debug(format(ch, "Initialized HTTP/2 stream pipeline {}"), ch.pipeline());
- }
- }
}
static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
index df6d4b2e82..909d84957f 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/MicrometerHttp2ConnectionProviderMeterRegistrar.java
@@ -23,10 +23,12 @@
import java.net.SocketAddress;
import static reactor.netty.Metrics.REGISTRY;
+import static reactor.netty.http.client.Http2ConnectionProviderMeters.ACTIVE_CONNECTIONS;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.ACTIVE_STREAMS;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.Http2ConnectionProviderMetersTags.ID;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.Http2ConnectionProviderMetersTags.NAME;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.Http2ConnectionProviderMetersTags.REMOTE_ADDRESS;
+import static reactor.netty.http.client.Http2ConnectionProviderMeters.IDLE_CONNECTIONS;
import static reactor.netty.http.client.Http2ConnectionProviderMeters.PENDING_STREAMS;
final class MicrometerHttp2ConnectionProviderMeterRegistrar {
@@ -41,7 +43,15 @@ void registerMetrics(String poolName, String id, SocketAddress remoteAddress, In
String addressAsString = Metrics.formatSocketAddress(remoteAddress);
Tags tags = Tags.of(ID.getKeyName(), id, REMOTE_ADDRESS.getKeyName(), addressAsString, NAME.getKeyName(), poolName);
- Gauge.builder(ACTIVE_STREAMS.getName(), metrics, InstrumentedPool.PoolMetrics::acquiredSize)
+ Gauge.builder(ACTIVE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::acquiredSize)
+ .tags(tags)
+ .register(REGISTRY);
+
+ Gauge.builder(ACTIVE_STREAMS.getName(), metrics, poolMetrics -> ((Http2Pool) poolMetrics).activeStreams())
+ .tags(tags)
+ .register(REGISTRY);
+
+ Gauge.builder(IDLE_CONNECTIONS.getName(), metrics, InstrumentedPool.PoolMetrics::idleSize)
.tags(tags)
.register(REGISTRY);
diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java
index b4f547a661..1c977f99f2 100644
--- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java
+++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java
@@ -414,7 +414,7 @@ static void addStreamHandlers(Channel ch,
if (accessLogEnabled) {
pipeline.addLast(NettyPipeline.AccessLogHandler, AccessLogHandlerFactory.H2.create(accessLog));
}
- pipeline.addLast(NettyPipeline.H2ToHttp11Codec, new Http2StreamFrameToHttpObjectCodec(true))
+ pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
.addLast(NettyPipeline.HttpTrafficHandler,
new Http2StreamBridgeServerHandler(compressPredicate, decoder, encoder, formDecoderProvider,
forwardedHeaderHandler, listener, mapHandle));
@@ -692,6 +692,9 @@ else if (metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder) {
static final int h11orH2C = h11 | h2c;
+ static final Http2StreamFrameToHttpObjectCodec HTTP2_STREAM_FRAME_TO_HTTP_OBJECT =
+ new Http2StreamFrameToHttpObjectCodec(true);
+
static final Logger log = Loggers.getLogger(HttpServerConfig.class);
static final LoggingHandler LOGGING_HANDLER =
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java
index 22d6210df9..328715fbe0 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java
@@ -407,7 +407,7 @@ private void doTestResponseTimeout(HttpClient client, long expectedTimeout)
timeout.set(((ReadTimeoutHandler) handler).getReaderIdleTimeInMillis());
}
})
- .doOnDisconnected(conn -> onDisconnected.set(handlerAvailable.test(conn)));
+ .doOnDisconnected(conn -> onDisconnected.set(conn.channel().isActive() && handlerAvailable.test(conn)));
Mono response =
localClient.get()
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java
new file mode 100644
index 0000000000..d708754cf2
--- /dev/null
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2AllocationStrategyTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package reactor.netty.http.client;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONCURRENT_STREAMS;
+import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MAX_CONNECTIONS;
+import static reactor.netty.http.client.Http2AllocationStrategy.Build.DEFAULT_MIN_CONNECTIONS;
+
+class Http2AllocationStrategyTest {
+ private Http2AllocationStrategy.Builder builder;
+
+ @BeforeEach
+ void setUp() {
+ builder = Http2AllocationStrategy.builder();
+ }
+
+ @Test
+ void build() {
+ builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(2);
+ assertThat(strategy.permitMaximum()).isEqualTo(2);
+ assertThat(strategy.permitMinimum()).isEqualTo(1);
+ }
+
+ @Test
+ void buildBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.maxConnections(1).minConnections(2).build())
+ .withMessage("minConnections (2) must be less than or equal to maxConnections (1)");
+ }
+
+ @Test
+ void copy() {
+ builder.maxConcurrentStreams(2).maxConnections(2).minConnections(1);
+ Http2AllocationStrategy strategy = builder.build();
+ Http2AllocationStrategy copy = strategy.copy();
+ assertThat(copy.maxConcurrentStreams()).isEqualTo(strategy.maxConcurrentStreams());
+ assertThat(copy.permitMaximum()).isEqualTo(strategy.permitMaximum());
+ assertThat(copy.permitMinimum()).isEqualTo(strategy.permitMinimum());
+ }
+
+ @Test
+ void maxConcurrentStreams() {
+ builder.maxConcurrentStreams(2);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(2);
+ assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS);
+ assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS);
+ }
+
+ @Test
+ void maxConcurrentStreamsBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.maxConcurrentStreams(-2))
+ .withMessage("maxConcurrentStreams must be greater than or equal to -1");
+ }
+
+ @Test
+ void permitMaximum() {
+ builder.maxConnections(2);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS);
+ assertThat(strategy.permitMaximum()).isEqualTo(2);
+ assertThat(strategy.permitMinimum()).isEqualTo(DEFAULT_MIN_CONNECTIONS);
+ }
+
+ @Test
+ void permitMaximumBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.maxConnections(0))
+ .withMessage("maxConnections must be strictly positive");
+ }
+
+ @Test
+ void permitMinimum() {
+ builder.minConnections(2);
+ Http2AllocationStrategy strategy = builder.build();
+ assertThat(strategy.maxConcurrentStreams()).isEqualTo(DEFAULT_MAX_CONCURRENT_STREAMS);
+ assertThat(strategy.permitMaximum()).isEqualTo(DEFAULT_MAX_CONNECTIONS);
+ assertThat(strategy.permitMinimum()).isEqualTo(2);
+ }
+
+ @Test
+ void permitMinimumBadValues() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> builder.minConnections(-1))
+ .withMessage("minConnections must be positive or zero");
+ }
+}
diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
index dc24581f27..6780f795c2 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java
@@ -22,9 +22,9 @@
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
-import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
@@ -38,9 +38,9 @@
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
class Http2PoolTest {
@@ -53,7 +53,7 @@ void acquireInvalidate() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
try {
List> acquired = new ArrayList<>();
@@ -61,21 +61,26 @@ void acquireInvalidate() {
http2Pool.acquire().subscribe(acquired::add);
http2Pool.acquire().subscribe(acquired::add);
+ channel.runPendingTasks();
+
assertThat(acquired).hasSize(3);
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3);
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
for (PooledRef slot : acquired) {
slot.invalidate().block(Duration.ofSeconds(1));
}
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
for (PooledRef slot : acquired) {
// second invalidate() should be ignored and ACQUIRED size should remain the same
slot.invalidate().block(Duration.ofSeconds(1));
}
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
}
finally {
channel.finishAndReleaseAll();
@@ -92,7 +97,7 @@ void acquireRelease() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
try {
List> acquired = new ArrayList<>();
@@ -100,21 +105,26 @@ void acquireRelease() {
http2Pool.acquire().subscribe(acquired::add);
http2Pool.acquire().subscribe(acquired::add);
+ channel.runPendingTasks();
+
assertThat(acquired).hasSize(3);
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(3);
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
for (PooledRef slot : acquired) {
slot.release().block(Duration.ofSeconds(1));
}
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
for (PooledRef slot : acquired) {
// second release() should be ignored and ACQUIRED size should remain the same
slot.release().block(Duration.ofSeconds(1));
}
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
}
finally {
channel.finishAndReleaseAll();
@@ -134,15 +144,16 @@ void evictClosedConnection() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
Connection connection = null;
try {
PooledRef acquired1 = http2Pool.acquire().block();
assertThat(acquired1).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection = acquired1.poolable();
ChannelId id1 = connection.channel().id();
@@ -153,19 +164,22 @@ void evictClosedConnection() throws Exception {
assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
acquired1.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
PooledRef acquired2 = http2Pool.acquire().block();
assertThat(acquired2).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection = acquired2.poolable();
ChannelId id2 = connection.channel().id();
@@ -174,8 +188,9 @@ void evictClosedConnection() throws Exception {
acquired2.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
- assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
}
finally {
if (connection != null) {
@@ -186,26 +201,37 @@ void evictClosedConnection() throws Exception {
}
@Test
- void evictClosedConnectionMaxConnectionsNotReached() throws Exception {
+ void evictClosedConnectionMaxConnectionsNotReached_1() throws Exception {
+ evictClosedConnectionMaxConnectionsNotReached(false);
+ }
+
+ @Test
+ void evictClosedConnectionMaxConnectionsNotReached_2() throws Exception {
+ evictClosedConnectionMaxConnectionsNotReached(true);
+ }
+
+ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond) throws Exception {
PoolBuilder> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
Channel channel = new EmbeddedChannel(
new TestChannelId(),
- Http2FrameCodecBuilder.forClient().build());
+ Http2FrameCodecBuilder.forClient().build(),
+ new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
return Connection.from(channel);
}))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
Connection connection = null;
try {
PooledRef acquired1 = http2Pool.acquire().block();
assertThat(acquired1).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
connection = acquired1.poolable();
ChannelId id1 = connection.channel().id();
@@ -216,25 +242,53 @@ void evictClosedConnectionMaxConnectionsNotReached() throws Exception {
assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
PooledRef acquired2 = http2Pool.acquire().block();
-
assertThat(acquired2).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2);
- assertThat(http2Pool.connections.size()).isEqualTo(2);
+
+ AtomicReference> acquired3 = new AtomicReference<>();
+ http2Pool.acquire().subscribe(acquired3::set);
connection = acquired2.poolable();
- ChannelId id2 = connection.channel().id();
+ ((EmbeddedChannel) connection.channel()).runPendingTasks();
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(http2Pool.connections.size()).isEqualTo(2);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(2L * Integer.MAX_VALUE);
+
+ if (closeSecond) {
+ latch = new CountDownLatch(1);
+ ((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
+ connection.onDispose(latch::countDown);
+ connection.dispose();
+
+ assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();
+ }
+
+ ChannelId id2 = connection.channel().id();
assertThat(id1).isNotEqualTo(id2);
acquired1.invalidate().block();
acquired2.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
- assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+
+ acquired3.get().invalidate().block();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ if (closeSecond) {
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+ }
+ else {
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+ }
}
finally {
if (connection != null) {
@@ -256,15 +310,16 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
Connection connection = null;
try {
PooledRef acquired1 = http2Pool.acquire().block();
assertThat(acquired1).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection = acquired1.poolable();
CountDownLatch latch = new CountDownLatch(1);
@@ -274,21 +329,24 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
http2Pool.acquire(Duration.ofMillis(10))
.as(StepVerifier::create)
.expectError(PoolAcquireTimeoutException.class)
.verify(Duration.ofSeconds(1));
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
acquired1.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
}
finally {
if (connection != null) {
@@ -298,6 +356,347 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
}
}
+ @Test
+ void evictInBackgroundClosedConnection() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1)
+ .evictInBackground(Duration.ofSeconds(5));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
+
+ Connection connection = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection = acquired1.poolable();
+ ChannelId id1 = connection.channel().id();
+ CountDownLatch latch = new CountDownLatch(1);
+ ((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
+ connection.onDispose(latch::countDown);
+ connection.dispose();
+
+ assertThat(latch.await(1, TimeUnit.SECONDS)).as("latch await").isTrue();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ acquired1.invalidate().block();
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection = acquired2.poolable();
+ ChannelId id2 = connection.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+ }
+ finally {
+ if (connection != null) {
+ ((EmbeddedChannel) connection.channel()).finishAndReleaseAll();
+ connection.dispose();
+ }
+ }
+ }
+
+ @Test
+ void evictInBackgroundMaxIdleTime() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1)
+ .evictInBackground(Duration.ofSeconds(5));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));
+
+ Connection connection1 = null;
+ Connection connection2 = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection1 = acquired1.poolable();
+ ChannelId id1 = connection1.channel().id();
+
+ acquired1.invalidate().block();
+
+ Thread.sleep(15);
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection2 = acquired2.poolable();
+ ChannelId id2 = connection2.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ Thread.sleep(15);
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+ }
+ finally {
+ if (connection1 != null) {
+ ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
+ connection1.dispose();
+ }
+ if (connection2 != null) {
+ ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
+ connection2.dispose();
+ }
+ }
+ }
+
+ @Test
+ void evictInBackgroundMaxLifeTime() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1)
+ .evictInBackground(Duration.ofSeconds(5));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));
+
+ Connection connection1 = null;
+ Connection connection2 = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection1 = acquired1.poolable();
+ ChannelId id1 = connection1.channel().id();
+
+ Thread.sleep(10);
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ acquired1.invalidate().block();
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection2 = acquired2.poolable();
+ ChannelId id2 = connection2.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ Thread.sleep(10);
+
+ http2Pool.evictInBackground();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+ }
+ finally {
+ if (connection1 != null) {
+ ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
+ connection1.dispose();
+ }
+ if (connection2 != null) {
+ ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
+ connection2.dispose();
+ }
+ }
+ }
+
+ @Test
+ void maxIdleTime() throws Exception {
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1);
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));
+
+ Connection connection1 = null;
+ Connection connection2 = null;
+ try {
+ PooledRef acquired1 = http2Pool.acquire().block();
+
+ assertThat(acquired1).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection1 = acquired1.poolable();
+ ChannelId id1 = connection1.channel().id();
+
+ acquired1.invalidate().block();
+
+ Thread.sleep(15);
+
+ PooledRef acquired2 = http2Pool.acquire().block();
+
+ assertThat(acquired2).isNotNull();
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ connection2 = acquired2.poolable();
+ ChannelId id2 = connection2.channel().id();
+
+ assertThat(id1).isNotEqualTo(id2);
+
+ acquired2.invalidate().block();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+ }
+ finally {
+ if (connection1 != null) {
+ ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
+ connection1.dispose();
+ }
+ if (connection2 != null) {
+ ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
+ connection2.dispose();
+ }
+ }
+ }
+
+ @Test
+ void maxIdleTimeActiveStreams() throws Exception {
+ EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.just(Connection.from(channel)))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(0, 1);
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, 10, -1));
+
+ Connection connection1 = null;
+ Connection connection2 = null;
+ try {
+ List> acquired = new ArrayList<>();
+ http2Pool.acquire().subscribe(acquired::add);
+ http2Pool.acquire().subscribe(acquired::add);
+
+ channel.runPendingTasks();
+
+ assertThat(acquired).hasSize(2);
+ assertThat(http2Pool.activeStreams()).isEqualTo(2);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+
+ connection1 = acquired.get(0).poolable();
+ ChannelId id1 = connection1.channel().id();
+
+ acquired.get(0).invalidate().block();
+
+ Thread.sleep(15);
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+
+ connection2 = acquired.get(1).poolable();
+ ChannelId id2 = connection2.channel().id();
+
+ assertThat(id1).isEqualTo(id2);
+
+ acquired.get(1).invalidate().block();
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+ }
+ finally {
+ if (connection1 != null) {
+ ((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
+ connection1.dispose();
+ }
+ if (connection2 != null) {
+ ((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
+ connection2.dispose();
+ }
+ }
+ }
+
@Test
void maxLifeTime() throws Exception {
PoolBuilder> poolBuilder =
@@ -310,7 +709,7 @@ void maxLifeTime() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));
Connection connection1 = null;
Connection connection2 = null;
@@ -318,27 +717,31 @@ void maxLifeTime() throws Exception {
PooledRef acquired1 = http2Pool.acquire().block();
assertThat(acquired1).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection1 = acquired1.poolable();
ChannelId id1 = connection1.channel().id();
Thread.sleep(10);
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
acquired1.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
PooledRef acquired2 = http2Pool.acquire().block();
assertThat(acquired2).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection2 = acquired2.poolable();
ChannelId id2 = connection2.channel().id();
@@ -347,8 +750,9 @@ void maxLifeTime() throws Exception {
acquired2.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
- assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
}
finally {
if (connection1 != null) {
@@ -374,7 +778,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 50));
Connection connection1 = null;
Connection connection2 = null;
@@ -382,22 +786,25 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception {
PooledRef acquired1 = http2Pool.acquire().block();
assertThat(acquired1).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection1 = acquired1.poolable();
ChannelId id1 = connection1.channel().id();
- Thread.sleep(10);
+ Thread.sleep(50);
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
PooledRef acquired2 = http2Pool.acquire().block();
assertThat(acquired2).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(2);
+ assertThat(http2Pool.activeStreams()).isEqualTo(2);
assertThat(http2Pool.connections.size()).isEqualTo(2);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection2 = acquired2.poolable();
ChannelId id2 = connection2.channel().id();
@@ -407,8 +814,9 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception {
acquired1.invalidate().block();
acquired2.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
- assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
}
finally {
if (connection1 != null) {
@@ -434,35 +842,39 @@ void maxLifeTimeMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));
Connection connection = null;
try {
PooledRef acquired1 = http2Pool.acquire().block();
assertThat(acquired1).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
connection = acquired1.poolable();
Thread.sleep(10);
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
http2Pool.acquire(Duration.ofMillis(10))
.as(StepVerifier::create)
.expectError(PoolAcquireTimeoutException.class)
.verify(Duration.ofSeconds(1));
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
acquired1.invalidate().block();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
}
finally {
if (connection != null) {
@@ -473,11 +885,101 @@ void maxLifeTimeMaxConnectionsReached() throws Exception {
}
@Test
- void minConnectionsConfigNotSupported() {
+ void minConnections() {
+ EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
+ PoolBuilder> poolBuilder =
+ PoolBuilder.from(Mono.just(Connection.from(channel)))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(1, 3);
+ Http2AllocationStrategy strategy = Http2AllocationStrategy.builder()
+ .maxConnections(3)
+ .minConnections(1)
+ .build();
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1));
+
+ List> acquired = new ArrayList<>();
+ try {
+ Flux.range(0, 3)
+ .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add))
+ .subscribe();
+
+ channel.runPendingTasks();
+
+ assertThat(acquired).hasSize(3);
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(1).poolable());
+ assertThat(acquired.get(0).poolable()).isSameAs(acquired.get(2).poolable());
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+
+ for (PooledRef slot : acquired) {
+ slot.release().block(Duration.ofSeconds(1));
+ }
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(Integer.MAX_VALUE);
+ }
+ finally {
+ for (PooledRef slot : acquired) {
+ Connection conn = slot.poolable();
+ ((EmbeddedChannel) conn.channel()).finishAndReleaseAll();
+ conn.dispose();
+ }
+ }
+ }
+
+ @Test
+ void minConnectionsMaxStreamsReached() {
PoolBuilder> poolBuilder =
- PoolBuilder.from(Mono.empty()).sizeBetween(1, 2);
- assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1)));
+ PoolBuilder.from(Mono.fromSupplier(() -> {
+ Channel channel = new EmbeddedChannel(
+ new TestChannelId(),
+ Http2FrameCodecBuilder.forClient().build());
+ return Connection.from(channel);
+ }))
+ .idleResourceReuseLruOrder()
+ .maxPendingAcquireUnbounded()
+ .sizeBetween(1, 3);
+ Http2AllocationStrategy strategy = Http2AllocationStrategy.builder()
+ .maxConnections(3)
+ .minConnections(1)
+ .build();
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy, -1, -1));
+
+ List> acquired = new ArrayList<>();
+ try {
+ Flux.range(0, 3)
+ .flatMap(i -> http2Pool.acquire().doOnNext(acquired::add))
+ .blockLast(Duration.ofSeconds(1));
+
+ assertThat(acquired).hasSize(3);
+
+ for (PooledRef pooledRef : acquired) {
+ ((EmbeddedChannel) pooledRef.poolable().channel()).runPendingTasks();
+ }
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(3);
+ assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(1).poolable());
+ assertThat(acquired.get(0).poolable()).isNotSameAs(acquired.get(2).poolable());
+ assertThat(acquired.get(1).poolable()).isNotSameAs(acquired.get(2).poolable());
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+
+ for (PooledRef slot : acquired) {
+ slot.release().block(Duration.ofSeconds(1));
+ }
+
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
+ }
+ finally {
+ for (PooledRef slot : acquired) {
+ Connection conn = slot.poolable();
+ ((EmbeddedChannel) conn.channel()).finishAndReleaseAll();
+ conn.dispose();
+ }
+ }
}
@Test
@@ -488,24 +990,27 @@ void nonHttp2ConnectionEmittedOnce() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
- InstrumentedPool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
+ Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, -1));
try {
PooledRef acquired = http2Pool.acquire().block(Duration.ofSeconds(1));
assertThat(acquired).isNotNull();
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
http2Pool.acquire(Duration.ofMillis(10))
.as(StepVerifier::create)
.expectError(PoolAcquireTimeoutException.class)
.verify(Duration.ofSeconds(1));
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(1);
+ assertThat(http2Pool.activeStreams()).isEqualTo(1);
acquired.invalidate().block(Duration.ofSeconds(1));
- assertThat(http2Pool.metrics().acquiredSize()).isEqualTo(0);
+ assertThat(http2Pool.activeStreams()).isEqualTo(0);
+ assertThat(http2Pool.connections.size()).isEqualTo(0);
+ assertThat(http2Pool.totalMaxConcurrentStreams).isEqualTo(0);
}
finally {
channel.finishAndReleaseAll();
diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java
index 6ee8a43053..0cad951be8 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java
@@ -15,6 +15,10 @@
*/
package reactor.netty.resources;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -28,7 +32,9 @@
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -43,6 +49,7 @@
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
+import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.server.HttpServer;
import reactor.netty.internal.shaded.reactor.pool.InstrumentedPool;
@@ -62,18 +69,41 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
+import static reactor.netty.Metrics.ACTIVE_CONNECTIONS;
+import static reactor.netty.Metrics.CONNECTION_PROVIDER_PREFIX;
+import static reactor.netty.Metrics.IDLE_CONNECTIONS;
+import static reactor.netty.Metrics.NAME;
+import static reactor.netty.Metrics.REMOTE_ADDRESS;
+import static reactor.netty.Metrics.TOTAL_CONNECTIONS;
+import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;
class DefaultPooledConnectionProviderTest extends BaseHttpTest {
static SelfSignedCertificate ssc;
+ private MeterRegistry registry;
+
@BeforeAll
static void createSelfSignedCertificate() throws CertificateException {
ssc = new SelfSignedCertificate();
}
+ @BeforeEach
+ void setUp() {
+ registry = new SimpleMeterRegistry();
+ Metrics.addRegistry(registry);
+ }
+
+ @AfterEach
+ void tearDown() {
+ Metrics.removeRegistry(registry);
+ registry.clear();
+ registry.close();
+ }
+
@Test
void testIssue903() {
Http11SslContextSpec serverCtx = Http11SslContextSpec.forServer(ssc.key(), ssc.cert());
@@ -290,7 +320,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
}
@Test
- void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception {
+ void testConnectionIdleWhenNoActiveStreams() throws Exception {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
@@ -307,19 +337,31 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception {
int requestsNum = 10;
CountDownLatch latch = new CountDownLatch(1);
DefaultPooledConnectionProvider provider =
- (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionReturnedToParentPoolWhenNoActiveStreams", 5);
+ (DefaultPooledConnectionProvider) ConnectionProvider.create("testConnectionIdleWhenNoActiveStreams", 5);
AtomicInteger counter = new AtomicInteger();
+ AtomicReference serverAddress = new AtomicReference<>();
HttpClient client =
createClient(provider, disposableServer.port())
- .wiretap(false)
+ .wiretap(false)
.protocol(HttpProtocol.H2)
.secure(spec -> spec.sslContext(clientCtx))
+ .metrics(true, Function.identity())
+ .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
.observe((conn, state) -> {
- if (state == ConnectionObserver.State.CONNECTED) {
+ if (state == STREAM_CONFIGURED) {
counter.incrementAndGet();
- }
- if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) {
- latch.countDown();
+ conn.onTerminate()
+ .subscribe(null,
+ t -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }),
+ () -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }));
}
});
@@ -328,7 +370,7 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception {
.flatMap(i ->
client.post()
.uri("/")
- .send(ByteBufMono.fromString(Mono.just("testConnectionReturnedToParentPoolWhenNoActiveStreams")))
+ .send(ByteBufMono.fromString(Mono.just("testConnectionIdleWhenNoActiveStreams")))
.responseContent()
.aggregate()
.asString())
@@ -336,14 +378,16 @@ void testConnectionReturnedToParentPoolWhenNoActiveStreams() throws Exception {
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
- assertThat(provider.channelPools).hasSize(1);
+ InetSocketAddress sa = (InetSocketAddress) serverAddress.get();
+ String address = sa.getHostString() + ":" + sa.getPort();
- @SuppressWarnings({"unchecked", "rawtypes"})
- InstrumentedPool channelPool =
- provider.channelPools.values().toArray(new InstrumentedPool[0])[0];
- InstrumentedPool.PoolMetrics metrics = channelPool.metrics();
- assertThat(metrics.acquiredSize()).isEqualTo(0);
- assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize());
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams")).isEqualTo(0);
+ double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.testConnectionIdleWhenNoActiveStreams");
+ double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "testConnectionIdleWhenNoActiveStreams");
+ assertThat(totalConn).isEqualTo(idleConn);
}
finally {
provider.disposeLater()
@@ -442,21 +486,33 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie
.bindNow();
DefaultPooledConnectionProvider provider =
- (DefaultPooledConnectionProvider) ConnectionProvider.create("", 5);
+ (DefaultPooledConnectionProvider) ConnectionProvider.create("doTestIssue1982", 5);
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
+ AtomicReference serverAddress = new AtomicReference<>();
HttpClient mainClient = clientCtx != null ?
HttpClient.create(provider).port(disposableServer.port()).secure(sslContextSpec -> sslContextSpec.sslContext(clientCtx)) :
HttpClient.create(provider).port(disposableServer.port());
HttpClient client =
mainClient.protocol(clientProtocols)
+ .metrics(true, Function.identity())
+ .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
.observe((conn, state) -> {
- if (state == ConnectionObserver.State.CONNECTED) {
+ if (state == STREAM_CONFIGURED) {
counter.incrementAndGet();
- }
- if (state == ConnectionObserver.State.RELEASED && counter.decrementAndGet() == 0) {
- latch.countDown();
+ conn.onTerminate()
+ .subscribe(null,
+ t -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }),
+ () -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }));
}
});
try {
@@ -471,12 +527,96 @@ private void doTestIssue1982(HttpProtocol[] serverProtocols, HttpProtocol[] clie
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
- @SuppressWarnings({"unchecked", "rawtypes"})
- InstrumentedPool channelPool =
- provider.channelPools.values().toArray(new InstrumentedPool[0])[0];
- InstrumentedPool.PoolMetrics metrics = channelPool.metrics();
- assertThat(metrics.acquiredSize()).isEqualTo(0);
- assertThat(metrics.allocatedSize()).isEqualTo(metrics.idleSize());
+ InetSocketAddress sa = (InetSocketAddress) serverAddress.get();
+ String address = sa.getHostString() + ":" + sa.getPort();
+
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982")).isEqualTo(0);
+ double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.doTestIssue1982");
+ double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "doTestIssue1982");
+ assertThat(totalConn).isEqualTo(idleConn);
+ }
+ finally {
+ provider.disposeLater()
+ .block(Duration.ofSeconds(5));
+ }
+ }
+
+ //https://github.com/reactor/reactor-netty/issues/1808
+ @Test
+ void testMinConnections() throws Exception {
+ Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
+ Http2SslContextSpec clientCtx =
+ Http2SslContextSpec.forClient()
+ .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));
+
+ disposableServer =
+ createServer()
+ .wiretap(false)
+ .protocol(HttpProtocol.H2)
+ .secure(spec -> spec.sslContext(serverCtx))
+ .route(routes -> routes.post("/", (req, res) -> res.send(req.receive().retain())))
+ .bindNow();
+
+ int requestsNum = 100;
+ CountDownLatch latch = new CountDownLatch(1);
+ DefaultPooledConnectionProvider provider =
+ (DefaultPooledConnectionProvider) ConnectionProvider.builder("testMinConnections")
+ .allocationStrategy(Http2AllocationStrategy.builder().maxConnections(20).minConnections(5).build())
+ .build();
+ AtomicInteger counter = new AtomicInteger();
+ AtomicReference serverAddress = new AtomicReference<>();
+ HttpClient client =
+ createClient(provider, disposableServer.port())
+ .wiretap(false)
+ .protocol(HttpProtocol.H2)
+ .secure(spec -> spec.sslContext(clientCtx))
+ .metrics(true, Function.identity())
+ .doAfterRequest((req, conn) -> serverAddress.set(conn.channel().remoteAddress()))
+ .observe((conn, state) -> {
+ if (state == STREAM_CONFIGURED) {
+ counter.incrementAndGet();
+ conn.onTerminate()
+ .subscribe(null,
+ t -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }),
+ () -> conn.channel().eventLoop().execute(() -> {
+ if (counter.decrementAndGet() == 0) {
+ latch.countDown();
+ }
+ }));
+ }
+ });
+
+ try {
+ Flux.range(0, requestsNum)
+ .flatMap(i ->
+ client.post()
+ .uri("/")
+ .send(ByteBufMono.fromString(Mono.just("testMinConnections")))
+ .responseContent()
+ .aggregate()
+ .asString())
+ .blockLast(Duration.ofSeconds(5));
+
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+
+ InetSocketAddress sa = (InetSocketAddress) serverAddress.get();
+ String address = sa.getHostString() + ":" + sa.getPort();
+
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.testMinConnections")).isEqualTo(0);
+ double idleConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "http2.testMinConnections");
+ double totalConn = getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS,
+ REMOTE_ADDRESS, address, NAME, "testMinConnections");
+ assertThat(totalConn).isEqualTo(idleConn);
+ assertThat(totalConn).isLessThan(10);
}
finally {
provider.disposeLater()
@@ -512,4 +652,13 @@ public boolean trySuccess(Void result) {
return r;
}
}
+
+ private double getGaugeValue(String gaugeName, String... tags) {
+ Gauge gauge = registry.find(gaugeName).tags(tags).gauge();
+ double result = -1;
+ if (gauge != null) {
+ result = gauge.value();
+ }
+ return result;
+ }
}
diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java
index ac752e082f..a466ff3982 100644
--- a/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java
+++ b/reactor-netty-http/src/test/java/reactor/netty/resources/PooledConnectionProviderDefaultMetricsTest.java
@@ -207,15 +207,17 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole
assertThat(metrics.get()).isTrue();
if (isSecured) {
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(1);
- assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(1);
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(1);
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, "http2." + poolName)).isEqualTo(0);
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, "http2." + poolName)).isEqualTo(1);
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_STREAMS, "http2." + poolName)).isEqualTo(0);
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, "http2." + poolName)).isEqualTo(0);
}
else {
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + TOTAL_CONNECTIONS, poolName)).isEqualTo(0);
- assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0);
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0);
}
- assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, poolName)).isEqualTo(0);
+ assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + IDLE_CONNECTIONS, poolName)).isEqualTo(0);
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + PENDING_CONNECTIONS, poolName)).isEqualTo(0);
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_CONNECTIONS, poolName)).isEqualTo(expectedMaxConnection);
assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, poolName)).isEqualTo(expectedMaxPendingAcquire);