diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index a299f7d14d..aa6803055b 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -760,9 +760,11 @@ public void request(long n) { long estimateStreamsCount = pool.totalMaxConcurrentStreams - pool.acquired; int permits = pool.poolConfig.allocationStrategy().estimatePermitCount(); int pending = pool.pendingSize; - if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) { + if (permits + estimateStreamsCount <= pending) { pendingAcquireStart = pool.clock.millis(); - timeoutTask = pool.poolConfig.pendingAcquireTimer().apply(this, acquireTimeout); + if (!acquireTimeout.isZero()) { + timeoutTask = pool.poolConfig.pendingAcquireTimer().apply(this, acquireTimeout); + } } pool.doAcquire(this); } @@ -822,13 +824,15 @@ void fail(Throwable error) { } void stopPendingCountdown(boolean success) { - if (!timeoutTask.isDisposed()) { + if (pendingAcquireStart > 0) { if (success) { pool.poolConfig.metricsRecorder().recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart); } else { pool.poolConfig.metricsRecorder().recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart); } + + pendingAcquireStart = 0; } timeoutTask.dispose(); } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index e8e12517b6..aca6cb55b6 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -1208,34 +1208,51 @@ void nonHttp2ConnectionEmittedOnce() { @Test void recordsPendingCountAndLatencies() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); TestPoolMetricsRecorder recorder = new TestPoolMetricsRecorder(); PoolBuilder> poolBuilder = PoolBuilder.from(Mono.just(Connection.from(channel))) .metricsRecorder(recorder) .maxPendingAcquireUnbounded() .sizeBetween(0, 1); - Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); + Http2AllocationStrategy strategy = Http2AllocationStrategy.builder() + .maxConnections(1) + .maxConcurrentStreams(2) + .build(); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, strategy)); try { + List> acquired = new ArrayList<>(); //success, acquisition happens immediately - PooledRef pooledRef = http2Pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1)); - assertThat(pooledRef).isNotNull(); + http2Pool.acquire(Duration.ofMillis(1)).subscribe(acquired::add); + // success, acquisition happens immediately without timeout + http2Pool.acquire().subscribe(acquired::add); + + channel.runPendingTasks(); + + assertThat(acquired).hasSize(2); //success, acquisition happens after pending some time http2Pool.acquire(Duration.ofMillis(50)).subscribe(); + // success, acquisition happens after pending some time without timeout + http2Pool.acquire().subscribe(); + //error, timed out http2Pool.acquire(Duration.ofMillis(1)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - pooledRef.release().block(Duration.ofSeconds(1)); + acquired.get(0).release().block(Duration.ofSeconds(1)); + acquired.get(1).release().block(Duration.ofSeconds(1)); + + channel.runPendingTasks(); assertThat(recorder.pendingSuccessCounter) .as("pending success") - .isEqualTo(1); + .isEqualTo(2); assertThat(recorder.pendingErrorCounter) .as("pending errors")