From c390d531d0928cf4641cccb7526ad2c382b104eb Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 11 Jul 2023 14:45:31 +0200 Subject: [PATCH] Use single-threaded Scheduler to subscribe to allocator. Avoid co-location of event loops within the pool by default for drivers that use Reactor Netty with activated co-location. [resolves #190] Signed-off-by: Mark Paluch --- .../java/io/r2dbc/pool/ConnectionPool.java | 106 +++++++++--------- .../pool/ConnectionPoolConfiguration.java | 83 +++++++++----- 2 files changed, 109 insertions(+), 80 deletions(-) diff --git a/src/main/java/io/r2dbc/pool/ConnectionPool.java b/src/main/java/io/r2dbc/pool/ConnectionPool.java index 4af9265..3d3612e 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPool.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPool.java @@ -27,6 +27,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.pool.InstrumentedPool; import reactor.pool.PoolBuilder; import reactor.pool.PoolConfig; @@ -111,39 +112,39 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { Mono create = Mono.defer(() -> { Mono mono = this.connectionPool.acquire() - .flatMap(ref -> { + .flatMap(ref -> { - if (logger.isDebugEnabled()) { - logger.debug("Obtaining new connection from the pool"); - } + if (logger.isDebugEnabled()) { + logger.debug("Obtaining new connection from the pool"); + } - Mono prepare = null; - if (ref.poolable() instanceof Lifecycle) { - prepare = Mono.from(((Lifecycle) ref.poolable()).postAllocate()); - } + Mono prepare = null; + if (ref.poolable() instanceof Lifecycle) { + prepare = Mono.from(((Lifecycle) ref.poolable()).postAllocate()); + } - if (configuration.getPostAllocate() != null) { + if (configuration.getPostAllocate() != null) { - Mono postAllocate = Mono.defer(() -> Mono.from(configuration.getPostAllocate().apply(ref.poolable()))); - prepare = prepare == null ? postAllocate : prepare.then(postAllocate); - } + Mono postAllocate = Mono.defer(() -> Mono.from(configuration.getPostAllocate().apply(ref.poolable()))); + prepare = prepare == null ? postAllocate : prepare.then(postAllocate); + } - PooledConnection connection = new PooledConnection(ref, this.preRelease); - Mono conn; - if (prepare == null) { - conn = getValidConnection(allocateValidation, connection); - } else { - conn = prepare.then(getValidConnection(allocateValidation, connection)); - } + PooledConnection connection = new PooledConnection(ref, this.preRelease); + Mono conn; + if (prepare == null) { + conn = getValidConnection(allocateValidation, connection); + } else { + conn = prepare.then(getValidConnection(allocateValidation, connection)); + } - conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable))); + conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable))); - return Operators.discardOnCancel(conn, () -> { - ref.release().subscribe(); - return false; - }); - }) - .name(acqName); + return Operators.discardOnCancel(conn, () -> { + ref.release().subscribe(); + return false; + }); + }) + .name(acqName); if (!this.maxAcquireTime.isNegative()) { @@ -161,7 +162,6 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { disposeConnection.accept(dropped); it.accept(dropped); }; - }).orElse(disposeConnection); return context.put(HOOK_ON_DROPPED, onNextDropped); @@ -170,7 +170,6 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { return mono; }); this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create; - } private Mono getValidConnection(Function> allocateValidation, Connection connection) { @@ -227,6 +226,11 @@ private InstrumentedPool createConnectionPool(ConnectionPoolConfigur // set timeout for create connection Mono allocator = Mono.from(factory.create()).name("Connection Allocation"); + + if (configuration.getAllocatorSubscribeOn() == null) { + allocator = allocator.subscribeOn(Schedulers.single()); + } + if (!maxCreateConnectionTime.isNegative()) { Consumer disposeConnection = dropped -> { @@ -243,7 +247,6 @@ private InstrumentedPool createConnectionPool(ConnectionPoolConfigur disposeConnection.accept(dropped); it.accept(dropped); }; - }).orElse(disposeConnection); return context.put(HOOK_ON_DROPPED, onNextDropped); @@ -267,11 +270,11 @@ private InstrumentedPool createConnectionPool(ConnectionPoolConfigur int cpuCount = Runtime.getRuntime().availableProcessors(); PoolBuilder> builder = PoolBuilder.from(allocator) - .clock(configuration.getClock()) - .metricsRecorder(metricsRecorder) - .evictionPredicate(evictionPredicate) - .destroyHandler(Connection::close) - .idleResourceReuseMruOrder(); // MRU to support eviction of idle + .clock(configuration.getClock()) + .metricsRecorder(metricsRecorder) + .evictionPredicate(evictionPredicate) + .destroyHandler(Connection::close) + .idleResourceReuseMruOrder(); // MRU to support eviction of idle if (maxSize == -1 || initialSize > 0) { builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), maxSize == -1 ? Integer.MAX_VALUE : maxSize); @@ -325,25 +328,25 @@ public Mono disposeLater() { List errors = new ArrayList<>(); return Flux.fromIterable(this.destroyHandlers) - .flatMap(Mono::fromRunnable) - .concatWith(this.connectionPool.disposeLater()) - .onErrorContinue((throwable, o) -> { - errors.add(throwable); - }) - .then(Mono.defer(() -> { - if (errors.isEmpty()) { - return Mono.empty(); - } + .flatMap(Mono::fromRunnable) + .concatWith(this.connectionPool.disposeLater()) + .onErrorContinue((throwable, o) -> { + errors.add(throwable); + }) + .then(Mono.defer(() -> { + if (errors.isEmpty()) { + return Mono.empty(); + } - Throwable rootError = errors.get(0); - if (errors.size() == 1) { - return Mono.error(rootError); - } + Throwable rootError = errors.get(0); + if (errors.size() == 1) { + return Mono.error(rootError); + } - errors.subList(1, errors.size()).forEach(rootError::addSuppressed); + errors.subList(1, errors.size()).forEach(rootError::addSuppressed); - return Mono.error(rootError); - })); + return Mono.error(rootError); + })); } @Override @@ -451,7 +454,6 @@ public int getMaxAllocatedSize() { public int getMaxPendingAcquireSize() { return this.delegate.getMaxPendingAcquireSize(); } - } private class ConnectionPoolMXBeanImpl implements ConnectionPoolMXBean { @@ -491,7 +493,5 @@ public int getMaxAllocatedSize() { public int getMaxPendingAcquireSize() { return this.poolMetrics.getMaxPendingAcquireSize(); } - } - } diff --git a/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java b/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java index 2df1c32..e42faec 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java @@ -22,6 +22,8 @@ import io.r2dbc.spi.ValidationDepth; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.pool.PoolBuilder; import reactor.pool.PoolConfig; import reactor.pool.PoolMetricsRecorder; @@ -50,6 +52,9 @@ public final class ConnectionPoolConfiguration { */ public static final Duration NO_TIMEOUT = Duration.ofMillis(-1); + @Nullable + private final Scheduler allocatorSubscribeOn; + private final int acquireRetry; private final Duration backgroundEvictionInterval; @@ -94,12 +99,13 @@ public final class ConnectionPoolConfiguration { @Nullable private final String validationQuery; - private ConnectionPoolConfiguration(int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer>> customizer, int initialSize, int maxSize, int minIdle, Duration maxAcquireTime, Duration maxCreateConnectionTime, Duration maxIdleTime, + private ConnectionPoolConfiguration(@Nullable Scheduler allocatorSubscribeOn, int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer>> customizer, int initialSize, int maxSize, int minIdle, Duration maxAcquireTime, Duration maxCreateConnectionTime, Duration maxIdleTime, Duration maxLifeTime, Duration maxValidationTime, PoolMetricsRecorder metricsRecorder, @Nullable String name, @Nullable Function> postAllocate, @Nullable Function> preRelease, boolean registerJmx, ValidationDepth validationDepth, @Nullable String validationQuery) { + this.allocatorSubscribeOn = allocatorSubscribeOn; this.acquireRetry = acquireRetry; this.connectionFactory = Assert.requireNonNull(connectionFactory, "ConnectionFactory must not be null"); this.clock = clock; @@ -142,6 +148,11 @@ public static Builder builder() { return new Builder(); } + @Nullable + Scheduler getAllocatorSubscribeOn() { + return this.allocatorSubscribeOn; + } + int getAcquireRetry() { return this.acquireRetry; } @@ -235,6 +246,8 @@ public static final class Builder { private static final int DEFAULT_SIZE = 10; + private @Nullable Scheduler allocatorSubscribeOn; + private int acquireRetry = 1; private Duration backgroundEvictionInterval = NO_TIMEOUT; @@ -283,6 +296,21 @@ public static final class Builder { private Builder() { } + /** + * Configure {@link Scheduler} to use for allocation. Defaults to {@link Schedulers#single()}. + * Configuring the scheduler can be relevant to coordinate thread co-location. + * + * @param scheduler the scheduler to use. + * @return this {@link Builder} + * @throws IllegalArgumentException if {@code scheduler} is null. + * @see Schedulers#single() + * @since 1.0.1 + */ + public Builder allocatorSubscribeOn(Scheduler scheduler) { + this.allocatorSubscribeOn = Assert.requireNonNull(scheduler, "scheduler must not be null"); + return this; + } + /** * Configure the number of acquire retries if the first acquiry attempt fails. * @@ -564,11 +592,11 @@ public Builder validationQuery(String validationQuery) { public ConnectionPoolConfiguration build() { applyDefaults(); validate(); - return new ConnectionPoolConfiguration(this.acquireRetry, this.backgroundEvictionInterval, this.connectionFactory, this.clock, this.customizer, this.initialSize, this.maxSize, - this.minIdle, - this.maxAcquireTime, this.maxCreateConnectionTime, this.maxIdleTime, this.maxLifeTime, this.maxValidationTime, this.metricsRecorder, this.name, this.postAllocate, this.preRelease, - this.registerJmx, - this.validationDepth, this.validationQuery + return new ConnectionPoolConfiguration(this.allocatorSubscribeOn, this.acquireRetry, this.backgroundEvictionInterval, this.connectionFactory, + this.clock, this.customizer, this.initialSize, this.maxSize, this.minIdle, + this.maxAcquireTime, this.maxCreateConnectionTime, this.maxIdleTime, this.maxLifeTime, this.maxValidationTime, + this.metricsRecorder, this.name, this.postAllocate, this.preRelease, this.registerJmx, + this.validationDepth, this.validationQuery ); } @@ -605,26 +633,27 @@ private void validate() { @Override public String toString() { return "Builder{" + - "acquireRetry='" + this.acquireRetry + '\'' + - ", backgroundEvictionInterval='" + this.backgroundEvictionInterval + '\'' + - ", connectionFactory='" + this.connectionFactory + '\'' + - ", clock='" + this.clock + '\'' + - ", initialSize='" + this.initialSize + '\'' + - ", minIdle='" + this.minIdle + '\'' + - ", maxSize='" + this.maxSize + '\'' + - ", maxAcquireTime='" + this.maxAcquireTime + '\'' + - ", maxCreateConnectionTime='" + this.maxCreateConnectionTime + '\'' + - ", maxIdleTime='" + this.maxIdleTime + '\'' + - ", maxLifeTime='" + this.maxLifeTime + '\'' + - ", maxValidationTime='" + this.maxValidationTime + '\'' + - ", metricsRecorder='" + this.metricsRecorder + '\'' + - ", name='" + this.name + '\'' + - ", postAllocate='" + this.postAllocate + '\'' + - ", preRelease='" + this.preRelease + '\'' + - ", registerJmx='" + this.registerJmx + '\'' + - ", validationDepth='" + this.validationDepth + '\'' + - ", validationQuery='" + this.validationQuery + '\'' + - '}'; + "allocatorSubscribeOn='" + this.allocatorSubscribeOn + '\'' + + ", acquireRetry='" + this.acquireRetry + '\'' + + ", backgroundEvictionInterval='" + this.backgroundEvictionInterval + '\'' + + ", connectionFactory='" + this.connectionFactory + '\'' + + ", clock='" + this.clock + '\'' + + ", initialSize='" + this.initialSize + '\'' + + ", minIdle='" + this.minIdle + '\'' + + ", maxSize='" + this.maxSize + '\'' + + ", maxAcquireTime='" + this.maxAcquireTime + '\'' + + ", maxCreateConnectionTime='" + this.maxCreateConnectionTime + '\'' + + ", maxIdleTime='" + this.maxIdleTime + '\'' + + ", maxLifeTime='" + this.maxLifeTime + '\'' + + ", maxValidationTime='" + this.maxValidationTime + '\'' + + ", metricsRecorder='" + this.metricsRecorder + '\'' + + ", name='" + this.name + '\'' + + ", postAllocate='" + this.postAllocate + '\'' + + ", preRelease='" + this.preRelease + '\'' + + ", registerJmx='" + this.registerJmx + '\'' + + ", validationDepth='" + this.validationDepth + '\'' + + ", validationQuery='" + this.validationQuery + '\'' + + '}'; } private static Duration applyDefault(@Nullable Duration duration) {