Skip to content

Commit

Permalink
Use single-threaded Scheduler to subscribe to allocator.
Browse files Browse the repository at this point in the history
Avoid co-location of event loops within the pool by default for drivers that use Reactor Netty with activated co-location.

[resolves #190]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Jul 11, 2023
1 parent 2139771 commit c390d53
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 80 deletions.
106 changes: 53 additions & 53 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
Expand Down Expand Up @@ -111,39 +112,39 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
Mono<Connection> create = Mono.defer(() -> {

Mono<Connection> mono = this.connectionPool.acquire()
.flatMap(ref -> {
.flatMap(ref -> {

if (logger.isDebugEnabled()) {
logger.debug("Obtaining new connection from the pool");
}
if (logger.isDebugEnabled()) {
logger.debug("Obtaining new connection from the pool");
}

Mono<Void> prepare = null;
if (ref.poolable() instanceof Lifecycle) {
prepare = Mono.from(((Lifecycle) ref.poolable()).postAllocate());
}
Mono<Void> prepare = null;
if (ref.poolable() instanceof Lifecycle) {
prepare = Mono.from(((Lifecycle) ref.poolable()).postAllocate());
}

if (configuration.getPostAllocate() != null) {
if (configuration.getPostAllocate() != null) {

Mono<Void> postAllocate = Mono.defer(() -> Mono.from(configuration.getPostAllocate().apply(ref.poolable())));
prepare = prepare == null ? postAllocate : prepare.then(postAllocate);
}
Mono<Void> postAllocate = Mono.defer(() -> Mono.from(configuration.getPostAllocate().apply(ref.poolable())));
prepare = prepare == null ? postAllocate : prepare.then(postAllocate);
}

PooledConnection connection = new PooledConnection(ref, this.preRelease);
Mono<Connection> conn;
if (prepare == null) {
conn = getValidConnection(allocateValidation, connection);
} else {
conn = prepare.then(getValidConnection(allocateValidation, connection));
}
PooledConnection connection = new PooledConnection(ref, this.preRelease);
Mono<Connection> conn;
if (prepare == null) {
conn = getValidConnection(allocateValidation, connection);
} else {
conn = prepare.then(getValidConnection(allocateValidation, connection));
}

conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable)));
conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable)));

return Operators.discardOnCancel(conn, () -> {
ref.release().subscribe();
return false;
});
})
.name(acqName);
return Operators.discardOnCancel(conn, () -> {
ref.release().subscribe();
return false;
});
})
.name(acqName);

if (!this.maxAcquireTime.isNegative()) {

Expand All @@ -161,7 +162,6 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
disposeConnection.accept(dropped);
it.accept(dropped);
};

}).orElse(disposeConnection);

return context.put(HOOK_ON_DROPPED, onNextDropped);
Expand All @@ -170,7 +170,6 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
return mono;
});
this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create;

}

private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, Connection connection) {
Expand Down Expand Up @@ -227,6 +226,11 @@ private InstrumentedPool<Connection> createConnectionPool(ConnectionPoolConfigur

// set timeout for create connection
Mono<Connection> allocator = Mono.<Connection>from(factory.create()).name("Connection Allocation");

if (configuration.getAllocatorSubscribeOn() == null) {
allocator = allocator.subscribeOn(Schedulers.single());
}

if (!maxCreateConnectionTime.isNegative()) {

Consumer<Object> disposeConnection = dropped -> {
Expand All @@ -243,7 +247,6 @@ private InstrumentedPool<Connection> createConnectionPool(ConnectionPoolConfigur
disposeConnection.accept(dropped);
it.accept(dropped);
};

}).orElse(disposeConnection);

return context.put(HOOK_ON_DROPPED, onNextDropped);
Expand All @@ -267,11 +270,11 @@ private InstrumentedPool<Connection> createConnectionPool(ConnectionPoolConfigur

int cpuCount = Runtime.getRuntime().availableProcessors();
PoolBuilder<Connection, PoolConfig<Connection>> builder = PoolBuilder.from(allocator)
.clock(configuration.getClock())
.metricsRecorder(metricsRecorder)
.evictionPredicate(evictionPredicate)
.destroyHandler(Connection::close)
.idleResourceReuseMruOrder(); // MRU to support eviction of idle
.clock(configuration.getClock())
.metricsRecorder(metricsRecorder)
.evictionPredicate(evictionPredicate)
.destroyHandler(Connection::close)
.idleResourceReuseMruOrder(); // MRU to support eviction of idle

if (maxSize == -1 || initialSize > 0) {
builder.sizeBetween(Math.max(configuration.getMinIdle(), initialSize), maxSize == -1 ? Integer.MAX_VALUE : maxSize);
Expand Down Expand Up @@ -325,25 +328,25 @@ public Mono<Void> disposeLater() {

List<Throwable> errors = new ArrayList<>();
return Flux.fromIterable(this.destroyHandlers)
.flatMap(Mono::fromRunnable)
.concatWith(this.connectionPool.disposeLater())
.onErrorContinue((throwable, o) -> {
errors.add(throwable);
})
.then(Mono.defer(() -> {
if (errors.isEmpty()) {
return Mono.empty();
}
.flatMap(Mono::fromRunnable)
.concatWith(this.connectionPool.disposeLater())
.onErrorContinue((throwable, o) -> {
errors.add(throwable);
})
.then(Mono.defer(() -> {
if (errors.isEmpty()) {
return Mono.empty();
}

Throwable rootError = errors.get(0);
if (errors.size() == 1) {
return Mono.error(rootError);
}
Throwable rootError = errors.get(0);
if (errors.size() == 1) {
return Mono.error(rootError);
}

errors.subList(1, errors.size()).forEach(rootError::addSuppressed);
errors.subList(1, errors.size()).forEach(rootError::addSuppressed);

return Mono.error(rootError);
}));
return Mono.error(rootError);
}));
}

@Override
Expand Down Expand Up @@ -451,7 +454,6 @@ public int getMaxAllocatedSize() {
public int getMaxPendingAcquireSize() {
return this.delegate.getMaxPendingAcquireSize();
}

}

private class ConnectionPoolMXBeanImpl implements ConnectionPoolMXBean {
Expand Down Expand Up @@ -491,7 +493,5 @@ public int getMaxAllocatedSize() {
public int getMaxPendingAcquireSize() {
return this.poolMetrics.getMaxPendingAcquireSize();
}

}

}
83 changes: 56 additions & 27 deletions src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
import reactor.pool.PoolMetricsRecorder;
Expand Down Expand Up @@ -50,6 +52,9 @@ public final class ConnectionPoolConfiguration {
*/
public static final Duration NO_TIMEOUT = Duration.ofMillis(-1);

@Nullable
private final Scheduler allocatorSubscribeOn;

private final int acquireRetry;

private final Duration backgroundEvictionInterval;
Expand Down Expand Up @@ -94,12 +99,13 @@ public final class ConnectionPoolConfiguration {
@Nullable
private final String validationQuery;

private ConnectionPoolConfiguration(int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer<PoolBuilder<Connection, ?
extends PoolConfig<? extends Connection>>> customizer, int initialSize, int maxSize, int minIdle, Duration maxAcquireTime, Duration maxCreateConnectionTime, Duration maxIdleTime,
private ConnectionPoolConfiguration(@Nullable Scheduler allocatorSubscribeOn, int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer<PoolBuilder<Connection, ?
extends PoolConfig<? extends Connection>>> customizer, int initialSize, int maxSize, int minIdle, Duration maxAcquireTime, Duration maxCreateConnectionTime, Duration maxIdleTime,
Duration maxLifeTime, Duration maxValidationTime, PoolMetricsRecorder metricsRecorder, @Nullable String name,
@Nullable Function<? super Connection, ? extends Publisher<Void>> postAllocate,
@Nullable Function<? super Connection, ? extends Publisher<Void>> preRelease, boolean registerJmx, ValidationDepth validationDepth,
@Nullable String validationQuery) {
this.allocatorSubscribeOn = allocatorSubscribeOn;
this.acquireRetry = acquireRetry;
this.connectionFactory = Assert.requireNonNull(connectionFactory, "ConnectionFactory must not be null");
this.clock = clock;
Expand Down Expand Up @@ -142,6 +148,11 @@ public static Builder builder() {
return new Builder();
}

@Nullable
Scheduler getAllocatorSubscribeOn() {
return this.allocatorSubscribeOn;
}

int getAcquireRetry() {
return this.acquireRetry;
}
Expand Down Expand Up @@ -235,6 +246,8 @@ public static final class Builder {

private static final int DEFAULT_SIZE = 10;

private @Nullable Scheduler allocatorSubscribeOn;

private int acquireRetry = 1;

private Duration backgroundEvictionInterval = NO_TIMEOUT;
Expand Down Expand Up @@ -283,6 +296,21 @@ public static final class Builder {
private Builder() {
}

/**
* Configure {@link Scheduler} to use for allocation. Defaults to {@link Schedulers#single()}.
* Configuring the scheduler can be relevant to coordinate thread co-location.
*
* @param scheduler the scheduler to use.
* @return this {@link Builder}
* @throws IllegalArgumentException if {@code scheduler} is null.
* @see Schedulers#single()
* @since 1.0.1
*/
public Builder allocatorSubscribeOn(Scheduler scheduler) {
this.allocatorSubscribeOn = Assert.requireNonNull(scheduler, "scheduler must not be null");
return this;
}

/**
* Configure the number of acquire retries if the first acquiry attempt fails.
*
Expand Down Expand Up @@ -564,11 +592,11 @@ public Builder validationQuery(String validationQuery) {
public ConnectionPoolConfiguration build() {
applyDefaults();
validate();
return new ConnectionPoolConfiguration(this.acquireRetry, this.backgroundEvictionInterval, this.connectionFactory, this.clock, this.customizer, this.initialSize, this.maxSize,
this.minIdle,
this.maxAcquireTime, this.maxCreateConnectionTime, this.maxIdleTime, this.maxLifeTime, this.maxValidationTime, this.metricsRecorder, this.name, this.postAllocate, this.preRelease,
this.registerJmx,
this.validationDepth, this.validationQuery
return new ConnectionPoolConfiguration(this.allocatorSubscribeOn, this.acquireRetry, this.backgroundEvictionInterval, this.connectionFactory,
this.clock, this.customizer, this.initialSize, this.maxSize, this.minIdle,
this.maxAcquireTime, this.maxCreateConnectionTime, this.maxIdleTime, this.maxLifeTime, this.maxValidationTime,
this.metricsRecorder, this.name, this.postAllocate, this.preRelease, this.registerJmx,
this.validationDepth, this.validationQuery
);
}

Expand Down Expand Up @@ -605,26 +633,27 @@ private void validate() {
@Override
public String toString() {
return "Builder{" +
"acquireRetry='" + this.acquireRetry + '\'' +
", backgroundEvictionInterval='" + this.backgroundEvictionInterval + '\'' +
", connectionFactory='" + this.connectionFactory + '\'' +
", clock='" + this.clock + '\'' +
", initialSize='" + this.initialSize + '\'' +
", minIdle='" + this.minIdle + '\'' +
", maxSize='" + this.maxSize + '\'' +
", maxAcquireTime='" + this.maxAcquireTime + '\'' +
", maxCreateConnectionTime='" + this.maxCreateConnectionTime + '\'' +
", maxIdleTime='" + this.maxIdleTime + '\'' +
", maxLifeTime='" + this.maxLifeTime + '\'' +
", maxValidationTime='" + this.maxValidationTime + '\'' +
", metricsRecorder='" + this.metricsRecorder + '\'' +
", name='" + this.name + '\'' +
", postAllocate='" + this.postAllocate + '\'' +
", preRelease='" + this.preRelease + '\'' +
", registerJmx='" + this.registerJmx + '\'' +
", validationDepth='" + this.validationDepth + '\'' +
", validationQuery='" + this.validationQuery + '\'' +
'}';
"allocatorSubscribeOn='" + this.allocatorSubscribeOn + '\'' +
", acquireRetry='" + this.acquireRetry + '\'' +
", backgroundEvictionInterval='" + this.backgroundEvictionInterval + '\'' +
", connectionFactory='" + this.connectionFactory + '\'' +
", clock='" + this.clock + '\'' +
", initialSize='" + this.initialSize + '\'' +
", minIdle='" + this.minIdle + '\'' +
", maxSize='" + this.maxSize + '\'' +
", maxAcquireTime='" + this.maxAcquireTime + '\'' +
", maxCreateConnectionTime='" + this.maxCreateConnectionTime + '\'' +
", maxIdleTime='" + this.maxIdleTime + '\'' +
", maxLifeTime='" + this.maxLifeTime + '\'' +
", maxValidationTime='" + this.maxValidationTime + '\'' +
", metricsRecorder='" + this.metricsRecorder + '\'' +
", name='" + this.name + '\'' +
", postAllocate='" + this.postAllocate + '\'' +
", preRelease='" + this.preRelease + '\'' +
", registerJmx='" + this.registerJmx + '\'' +
", validationDepth='" + this.validationDepth + '\'' +
", validationQuery='" + this.validationQuery + '\'' +
'}';
}

private static Duration applyDefault(@Nullable Duration duration) {
Expand Down

0 comments on commit c390d53

Please sign in to comment.