Skip to content

Commit

Permalink
Allow to configure connection pool aquire timers (#2175)
Browse files Browse the repository at this point in the history
Added an API to provide a hook function for changing the default timer implementation for the pending acquire timers.
  • Loading branch information
pderop authored Jun 9, 2022
1 parent 6530f76 commit d5d0e97
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.resolver.AddressResolverGroup;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.ReactorNetty;
Expand All @@ -32,6 +33,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -449,6 +451,7 @@ class ConnectionPoolSpec<SPEC extends ConnectionPoolSpec<SPEC>> implements Suppl
boolean metricsEnabled;
String leasingStrategy = DEFAULT_POOL_LEASING_STRATEGY;
Supplier<? extends ConnectionProvider.MeterRegistrar> registrar;
BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
AllocationStrategy<?> allocationStrategy;

/**
Expand All @@ -473,6 +476,7 @@ private ConnectionPoolSpec() {
this.metricsEnabled = copy.metricsEnabled;
this.leasingStrategy = copy.leasingStrategy;
this.registrar = copy.registrar;
this.pendingAcquireTimer = copy.pendingAcquireTimer;
this.allocationStrategy = copy.allocationStrategy;
}

Expand Down Expand Up @@ -648,6 +652,43 @@ public final SPEC evictInBackground(Duration evictionInterval) {
return get();
}

/**
* Set the option to use for configuring {@link ConnectionProvider} pending acquire timer.
* The pending acquire timer must be specified as a function which is used to schedule a pending acquire timeout
* when there is no idle connection and no new connection can be created currently.
* The function takes as argument a {@link Duration} which is the one configured by {@link #pendingAcquireTimeout(Duration)}.
* <p>
* Use this function if you want to specify your own implementation for scheduling pending acquire timers.
*
* <p> Default to {@link Schedulers#parallel()}.
*
* <p>Examples using Netty HashedWheelTimer implementation:</p>
* <pre>
* {@code
* final static HashedWheelTimer wheel = new HashedWheelTimer(10, TimeUnit.MILLISECONDS, 1024);
*
* HttpClient client = HttpClient.create(
* ConnectionProvider.builder("myprovider")
* .pendingAcquireTimeout(Duration.ofMillis(10000))
* .pendingAcquireTimer((r, d) -> {
* Timeout t = wheel.newTimeout(timeout -> r.run(), d.toMillis(), TimeUnit.MILLISECONDS);
* return () -> t.cancel();
* })
* .build());
* }
* </pre>
*
* @param pendingAcquireTimer the function to apply when scheduling pending acquire timers
* @return {@literal this}
* @throws NullPointerException if pendingAcquireTimer is null
* @since 1.0.20
* @see #pendingAcquireTimeout(Duration)
*/
public final SPEC pendingAcquireTimer(BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer) {
this.pendingAcquireTimer = Objects.requireNonNull(pendingAcquireTimer, "pendingAcquireTimer");
return get();
}

/**
* Limits in how many connections can be allocated and managed by the pool are driven by the
* provided {@link AllocationStrategy}. This is a customization escape hatch that replaces the last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -369,6 +370,7 @@ protected static final class PoolFactory<T extends Connection> {
final Supplier<? extends MeterRegistrar> registrar;
final Clock clock;
final Duration disposeTimeout;
final BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
final AllocationStrategy<?> allocationStrategy;

PoolFactory(ConnectionPoolSpec<?> conf, Duration disposeTimeout) {
Expand All @@ -389,6 +391,7 @@ protected static final class PoolFactory<T extends Connection> {
this.registrar = conf.registrar;
this.clock = clock;
this.disposeTimeout = disposeTimeout;
this.pendingAcquireTimer = conf.pendingAcquireTimer;
this.allocationStrategy = conf.allocationStrategy;
}

Expand Down Expand Up @@ -447,6 +450,10 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
}
}

if (pendingAcquireTimer != null) {
poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer);
}

if (clock != null) {
poolBuilder = poolBuilder.clock(clock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package reactor.netty.resources;

import org.junit.jupiter.api.Test;
import reactor.core.Disposable;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -31,6 +33,7 @@ class ConnectionProviderTest {
static final TestAllocationStrategy TEST_ALLOCATION_STRATEGY = new TestAllocationStrategy();
static final String TEST_STRING = "";
static final Supplier<ConnectionProvider.MeterRegistrar> TEST_SUPPLIER = () -> (a, b, c, d) -> {};
static final BiFunction<Runnable, Duration, Disposable> TEST_BI_FUNCTION = (r, duration) -> () -> {};

@Test
void testBuilderCopyConstructor() throws IllegalAccessException {
Expand Down Expand Up @@ -74,6 +77,9 @@ else if (boolean.class == clazz) {
else if (int.class == clazz) {
field.setInt(builder, 1);
}
else if (BiFunction.class == clazz) {
field.set(builder, TEST_BI_FUNCTION);
}
else {
throw new IllegalArgumentException("Unknown field type " + clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
Expand Down Expand Up @@ -759,7 +758,7 @@ public void request(long n) {
int permits = pool.poolConfig.allocationStrategy().estimatePermitCount();
int pending = pool.pendingSize;
if (!acquireTimeout.isZero() && permits + estimateStreamsCount <= pending) {
timeoutTask = Schedulers.parallel().schedule(this, acquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
timeoutTask = pool.poolConfig.pendingAcquireTimer().apply(this, acquireTimeout);
}
pool.doAcquire(this);
}
Expand Down Expand Up @@ -1042,4 +1041,4 @@ long lifeTime() {
return pool.clock.millis() - creationTimestamp;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import reactor.netty.BaseHttpTest;
import reactor.netty.ByteBufFlux;
import reactor.netty.ByteBufMono;
Expand All @@ -43,6 +45,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -159,14 +162,35 @@ private void doTestIssue1071(int length, String expectedResponse, int expectedCo

@Test
void testMaxActiveStreams_1_CustomPool() throws Exception {
ConnectionProvider provider =
doTestMaxActiveStreams_1_CustomPool(null);
}

@Test
void testMaxActiveStreams_1_CustomPool_Custom_AcquireTimer() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
BiFunction<Runnable, Duration, Disposable> timer = (r, d) -> {
Runnable wrapped = () -> {
r.run();
latch.countDown();
};
return Schedulers.single().schedule(wrapped, d.toNanos(), TimeUnit.NANOSECONDS);
};
doTestMaxActiveStreams_1_CustomPool(timer);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

void doTestMaxActiveStreams_1_CustomPool(BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer) throws Exception {
ConnectionProvider.Builder builder =
ConnectionProvider.builder("testMaxActiveStreams_1_CustomPool")
.maxConnections(1)
.pendingAcquireTimeout(Duration.ofMillis(10)) // the default is 45s
.build();
.maxConnections(1)
.pendingAcquireTimeout(Duration.ofMillis(10)); // the default is 45s
if (pendingAcquireTimer != null) {
builder = builder.pendingAcquireTimer(pendingAcquireTimer);
}
ConnectionProvider provider = builder.build();
doTestMaxActiveStreams(HttpClient.create(provider), 1, 1, 1);
provider.disposeLater()
.block();
.block();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
Expand All @@ -38,6 +40,7 @@
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -832,6 +835,24 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception {

@Test
void maxLifeTimeMaxConnectionsReached() throws Exception {
doMaxLifeTimeMaxConnectionsReached(null);
}

@Test
void maxLifeTimeMaxConnectionsReachedWithCustomTimer() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
BiFunction<Runnable, Duration, Disposable> timer = (r, d) -> {
Runnable wrapped = () -> {
r.run();
latch.countDown();
};
return Schedulers.single().schedule(wrapped, d.toNanos(), TimeUnit.NANOSECONDS);
};
doMaxLifeTimeMaxConnectionsReached(timer);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

private void doMaxLifeTimeMaxConnectionsReached(BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer) throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
Channel channel = new EmbeddedChannel(
Expand All @@ -842,6 +863,9 @@ void maxLifeTimeMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
if (pendingAcquireTimer != null) {
poolBuilder = poolBuilder.pendingAcquireTimer(pendingAcquireTimer);
}
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null, -1, 10));

Connection connection = null;
Expand Down

0 comments on commit d5d0e97

Please sign in to comment.