Skip to content

Commit

Permalink
Add maxValidationTime parameter
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriel Calin <calingabriel89@gmail.com>

[resolves r2dbc#175]
  • Loading branch information
Gabriel Calin committed Aug 17, 2022
1 parent 2f21aac commit 1ef374a
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Mono<Object> resultMono = Mono.usingWhen(pooledConnectionFactory.create(),
| `maxIdleTime` | Maximum idle time of the connection in the pool. Negative values indicate no timeout. Defaults to `30` minutes.<br />This value is used as an interval for background eviction of idle connections unless configuring `backgroundEvictionInterval`.
| `maxAcquireTime` | Maximum time to acquire connection from pool. Negative values indicate no timeout. Defaults to no timeout.
| `maxCreateConnectionTime` | Maximum time to create a new connection. Negative values indicate no timeout. Defaults to no timeout.
| `maxValidationTime` | Maximum time to validate connection from pool. Negative values indicate no timeout. Defaults to no timeout.
| `poolName` | Name of the Connection Pool.
| `postAllocate` | Lifecycle function to prepare a connection after allocating it.
| `preRelease ` | Lifecycle function to prepare/cleanup a connection before releasing it.
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class ConnectionPool implements ConnectionFactory, Disposable, Closeable,

private final Duration maxAcquireTime;

private final Duration maxValidationTime;

private final List<Runnable> destroyHandlers = new ArrayList<>();

private final Optional<PoolMetrics> poolMetrics;
Expand All @@ -90,6 +92,7 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
this.connectionPool = createConnectionPool(Assert.requireNonNull(configuration, "ConnectionPoolConfiguration must not be null"));
this.factory = configuration.getConnectionFactory();
this.maxAcquireTime = configuration.getMaxAcquireTime();
this.maxValidationTime = configuration.getMaxValidationTime();
this.poolMetrics = Optional.ofNullable(this.connectionPool.metrics()).map(PoolMetricsWrapper::new);
this.preRelease = configuration.getPreRelease();

Expand Down Expand Up @@ -175,10 +178,10 @@ private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> all

private Function<Connection, Mono<Void>> getValidationFunction(ConnectionPoolConfiguration configuration) {

String timeoutMessage = String.format("Validation timed out after %dms", this.maxAcquireTime.toMillis());
String timeoutMessage = String.format("Validation timed out after %dms", this.maxValidationTime.toMillis());

if (!this.maxAcquireTime.isNegative()) {
return getValidation(configuration).andThen(mono -> mono.timeout(this.maxAcquireTime).onErrorMap(TimeoutException.class, e -> new R2dbcTimeoutException(timeoutMessage, e)));
if (!this.maxValidationTime.isNegative()) {
return getValidation(configuration).andThen(mono -> mono.timeout(this.maxValidationTime).onErrorMap(TimeoutException.class, e -> new R2dbcTimeoutException(timeoutMessage, e)));
}

return getValidation(configuration);
Expand Down
27 changes: 25 additions & 2 deletions src/main/java/io/r2dbc/pool/ConnectionPoolConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,14 @@ public final class ConnectionPoolConfiguration {
@Nullable
private final String validationQuery;

private final Duration maxValidationTime;

private ConnectionPoolConfiguration(int acquireRetry, @Nullable Duration backgroundEvictionInterval, ConnectionFactory connectionFactory, Clock clock, Consumer<PoolBuilder<Connection, ?
extends PoolConfig<? extends Connection>>> customizer, int initialSize, int maxSize, int minIdle, Duration maxIdleTime, Duration maxCreateConnectionTime, Duration maxAcquireTime,
Duration maxLifeTime,
PoolMetricsRecorder metricsRecorder, @Nullable String name, @Nullable Function<? super Connection, ? extends Publisher<Void>> postAllocate,
@Nullable Function<? super Connection, ? extends Publisher<Void>> preRelease, boolean registerJmx, ValidationDepth validationDepth,
@Nullable String validationQuery) {
@Nullable String validationQuery, Duration maxValidationTime) {
this.acquireRetry = acquireRetry;
this.connectionFactory = Assert.requireNonNull(connectionFactory, "ConnectionFactory must not be null");
this.clock = clock;
Expand All @@ -116,6 +118,7 @@ private ConnectionPoolConfiguration(int acquireRetry, @Nullable Duration backgro
this.validationDepth = validationDepth;
this.validationQuery = validationQuery;
this.backgroundEvictionInterval = backgroundEvictionInterval;
this.maxValidationTime = maxValidationTime;
}

/**
Expand Down Expand Up @@ -218,6 +221,11 @@ String getValidationQuery() {
return this.validationQuery;
}

@Nullable
Duration getMaxValidationTime() {
return this.maxValidationTime;
}

/**
* A builder for {@link ConnectionPoolConfiguration} instances.
* <p>
Expand Down Expand Up @@ -270,6 +278,8 @@ public static final class Builder {

private ValidationDepth validationDepth = ValidationDepth.LOCAL;

private Duration maxValidationTime = NO_TIMEOUT; // negative value indicates no-timeout

private Builder() {
}

Expand Down Expand Up @@ -529,6 +539,18 @@ public Builder validationQuery(String validationQuery) {
return this;
}

/**
* Configure {@link Duration timeout} for validating a {@link Connection} from pool. Default is no timeout.
*
* @param maxValidationTime the maximum time to validate connection from pool. {@link Duration#ZERO} indicates that the connection must be immediately validated
* otherwise validation fails. A negative or a {@code null} value results in not applying a timeout.
* @return this {@link Builder}
*/
public Builder maxValidationTime(Duration maxValidationTime) {
this.maxValidationTime = applyDefault(maxValidationTime);
return this;
}

/**
* Returns a configured {@link ConnectionPoolConfiguration}.
*
Expand All @@ -541,7 +563,7 @@ public ConnectionPoolConfiguration build() {
return new ConnectionPoolConfiguration(this.acquireRetry, this.backgroundEvictionInterval, this.connectionFactory, this.clock, this.customizer, this.initialSize, this.maxSize,
this.minIdle,
this.maxIdleTime, this.maxCreateConnectionTime, this.maxAcquireTime, this.maxLifeTime, this.metricsRecorder, this.name, this.postAllocate, this.preRelease, this.registerJmx,
this.validationDepth, this.validationQuery
this.validationDepth, this.validationQuery, this.maxValidationTime
);
}

Expand Down Expand Up @@ -596,6 +618,7 @@ public String toString() {
", registerJmx='" + this.registerJmx + '\'' +
", validationDepth='" + this.validationDepth + '\'' +
", validationQuery='" + this.validationQuery + '\'' +
", maxValidationTime='" + this.maxValidationTime + '\'' +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public class PoolingConnectionFactoryProvider implements ConnectionFactoryProvid
*/
public static final Option<ValidationDepth> VALIDATION_DEPTH = Option.valueOf("validationDepth");

/**
* MaxValidationTime {@link Option}.
*/
public static final Option<Duration> MAX_VALIDATION_TIME = Option.valueOf("maxValidationTime");

private static final String COLON = ":";

/**
Expand Down Expand Up @@ -201,6 +206,7 @@ static ConnectionPoolConfiguration buildConfiguration(ConnectionFactoryOptions c
mapper.from(REGISTER_JMX).as(OptionMapper::toBoolean).to(builder::registerJmx);
mapper.fromExact(VALIDATION_QUERY).to(builder::validationQuery);
mapper.from(VALIDATION_DEPTH).as(validationDepth -> OptionMapper.toEnum(validationDepth, ValidationDepth.class)).to(builder::validationDepth);
mapper.from(MAX_VALIDATION_TIME).as(OptionMapper::toDuration).to(builder::maxValidationTime);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void configuration() {
.backgroundEvictionInterval(Duration.ofMillis(2000))
.maxCreateConnectionTime(Duration.ofMinutes(1))
.maxAcquireTime(Duration.ofMinutes(2))
.maxValidationTime(Duration.ofMinutes(1))
.initialSize(2)
.maxSize(20)
.name("bar")
Expand All @@ -57,6 +58,7 @@ void configuration() {
.hasFieldOrPropertyWithValue("backgroundEvictionInterval", Duration.ofMillis(2000))
.hasFieldOrPropertyWithValue("maxCreateConnectionTime", Duration.ofMinutes(1))
.hasFieldOrPropertyWithValue("maxAcquireTime", Duration.ofMinutes(2))
.hasFieldOrPropertyWithValue("maxValidationTime", Duration.ofMinutes(1))
.hasFieldOrPropertyWithValue("initialSize", 2)
.hasFieldOrPropertyWithValue("maxSize", 20)
.hasFieldOrPropertyWithValue("name", "bar")
Expand All @@ -77,6 +79,7 @@ void configurationDefaults() {
.hasFieldOrPropertyWithValue("backgroundEvictionInterval", Duration.ofMillis(-1))
.hasFieldOrPropertyWithValue("maxCreateConnectionTime", Duration.ofMillis(-1))
.hasFieldOrPropertyWithValue("maxAcquireTime", Duration.ofMillis(-1))
.hasFieldOrPropertyWithValue("maxValidationTime", Duration.ofMillis(-1))
.hasFieldOrPropertyWithValue("initialSize", 10)
.hasFieldOrPropertyWithValue("maxSize", 10)
.hasFieldOrPropertyWithValue("registerJmx", false);
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,37 @@ void shouldNotTimeoutAcquireConnectionWhenPooled() {
verify(connectionFactoryMock).create();
}

@Test
@SuppressWarnings("unchecked")
void shouldTimeoutValidation() {

ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class);
Connection connectionMock = mock(Connection.class);

when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() ->
Mono.delay(Duration.ofSeconds(1)).thenReturn(connectionMock))
);
when(connectionMock.validate(any())).thenReturn(Mono.defer(() ->
Mono.delay(Duration.ofSeconds(10)).thenReturn(false))
);
when(connectionMock.close()).thenReturn(Mono.empty());

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock)
.acquireRetry(0)
.maxValidationTime(Duration.ofSeconds(5))
.maxAcquireTime(Duration.ofSeconds(15))
.build();

StepVerifier.withVirtualTime(() -> new ConnectionPool(configuration).create())
.expectSubscription()
.thenAwait(Duration.ofSeconds(7))
.expectError(R2dbcTimeoutException.class)
.verify();

verify(connectionFactoryMock).create();
verify(connectionMock).close();
}

@Test
@SuppressWarnings("unchecked")
void shouldReusePooledConnectionAfterTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ void shouldApplyMaxCreateConnectionTime() {
.hasFieldOrPropertyWithValue("maxCreateConnectionTime", Duration.ofMinutes(30));
}

@Test
void shouldApplyMaxValidationTime() {

ConnectionFactoryOptions options =
ConnectionFactoryOptions.parse("r2dbc:pool:mock://host?maxValidationTime=PT30M");

ConnectionPoolConfiguration configuration = PoolingConnectionFactoryProvider.buildConfiguration(options);

assertThat(configuration)
.hasFieldOrPropertyWithValue("maxValidationTime", Duration.ofMinutes(30));
}

@Test
void shouldApplyLifecycleFunctions() {

Expand Down

0 comments on commit 1ef374a

Please sign in to comment.