Skip to content

Commit ec7797b

Browse files
committed
fix: add expiration time for cached reader connections so they are not reused forever
1 parent 6f025d9 commit ec7797b

File tree

4 files changed

+174
-37
lines changed

4 files changed

+174
-37
lines changed

docs/using-the-jdbc-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ The read/write splitting plugin is not loaded by default. To load the plugin, in
88

99
```
1010
final Properties properties = new Properties();
11-
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover,efm");
11+
properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover2,efm2");
1212
```
1313

1414
If you would like to use the read/write splitting plugin without the failover plugin, make sure you have the `readWriteSplitting` plugin in the `wrapperPlugins` property, and that the failover plugin is not part of it.
@@ -30,7 +30,10 @@ The read/write splitting plugin is not currently supported for non-Aurora cluste
3030
> [!WARNING]\
3131
> If internal connection pools are enabled, database passwords may not be verified with every connection request. The initial connection request for each database instance in the cluster will verify the password, but subsequent requests may return a cached pool connection without re-verifying the password. This behavior is inherent to the nature of connection pools in general and not a bug with the driver. `ConnectionProviderManager.releaseResources` can be called to close all pools and remove all cached pool connections. See [InternalConnectionPoolPasswordWarning.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/InternalConnectionPoolPasswordWarning.java) for more details.
3232
33-
Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. Future calls to `setReadOnly `on the same `Connection` object will not require opening a new physical connection. However, calling `setReadOnly(true)` for the first time on a new `Connection` object will require the plugin to establish another new physical connection to a reader. If your application frequently calls `setReadOnly`, you can enable internal connection pooling to improve performance. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the read/write plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
33+
Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. By default, this cached connection will never expire, meaning all subsequent `setReadOnly(true)` calls on the same `Connection` object will keep reusing the same reader connection.
34+
If your application frequently calls `setReadOnly`, this may have a performance impact. There are two ways to improve performance:
35+
1. You can enable internal connection pooling to improve performance. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the Read/Write Splitting plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
36+
2. You can also use the [`cachedReaderKeepAliveTimeoutMs` connection parameter](#reader-keep-alive-timeout). This sets an expiration time on the reader connection. When `setReadOnly(true)` is called and the reader connection has expired, the plugin will create a new reader connection using the specified [reader selection strategy](#reader-selection).
3437

3538
> [!NOTE]\
3639
> Initial connections to a cluster URL will not be pooled. The driver does not pool cluster URLs because it can be problematic to pool a URL that resolves to different instances over time. The main benefit of internal connection pools is when setReadOnly is called. When setReadOnly is called (regardless of the initial connection URL), an internal pool will be created for the writer/reader that the plugin switches to and connections for that instance can be reused in the future.
@@ -87,6 +90,16 @@ To indicate which selection strategy to use, the `readerHostSelectorStrategy` co
8790
props.setProperty(ReadWriteSplittingPlugin.READER_HOST_SELECTOR_STRATEGY.name, "leastConnections");
8891
```
8992

93+
## Reader keep-alive timeout
94+
If no connection pool is used, reader connections created by calls to `setReadOnly(true)` will be cached for the entire lifetime of the `Connection` object. This may have a negative performance impact if your application makes frequent calls to `setReadOnly(true)`, as all read traffic is directed to a single reader instance.
95+
To improve performance, you can specify a timeout for the cached reader connection using `cachedReaderKeepAliveTimeoutMs`. Once the reader has expired, the next call to `setReadOnly(true)` will create a new reader connection determined by the reader host selection strategy.
96+
97+
```java
98+
final Properties properties = new Properties();
99+
properties.setProperty("cachedReaderKeepAliveTimeoutMs", "600000");
100+
```
101+
> [!NOTE]\
102+
> If a connection pool is used, this setting is ignored and the lifespan of this cached connection object will be handled by the connection pool instead.
90103
91104
## Limitations
92105

wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Properties;
2626
import java.util.Set;
2727
import java.util.logging.Level;
28+
import java.util.concurrent.TimeUnit;
2829
import java.util.logging.Logger;
2930
import org.checkerframework.checker.nullness.qual.NonNull;
3031
import software.amazon.jdbc.AwsWrapperProperty;
@@ -40,6 +41,7 @@
4041
import software.amazon.jdbc.cleanup.CanReleaseResources;
4142
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
4243
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
44+
import software.amazon.jdbc.util.CacheItem;
4345
import software.amazon.jdbc.util.Messages;
4446
import software.amazon.jdbc.util.SqlState;
4547
import software.amazon.jdbc.util.Utils;
@@ -81,17 +83,24 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
8183
private volatile boolean inReadWriteSplit = false;
8284
private HostListProviderService hostListProviderService;
8385
private Connection writerConnection;
84-
private Connection readerConnection;
8586
private HostSpec readerHostSpec;
8687
private boolean isReaderConnFromInternalPool;
8788
private boolean isWriterConnFromInternalPool;
89+
private CacheItem<Connection> readerConnection;
8890

8991
public static final AwsWrapperProperty READER_HOST_SELECTOR_STRATEGY =
9092
new AwsWrapperProperty(
9193
"readerHostSelectorStrategy",
9294
"random",
9395
"The strategy that should be used to select a new reader host.");
9496

97+
public static final AwsWrapperProperty CACHED_READER_KEEP_ALIVE_TIMEOUT =
98+
new AwsWrapperProperty(
99+
"cachedReaderKeepAliveTimeoutMs",
100+
"0",
101+
"The time in milliseconds to keep a reader connection alive in the cache. "
102+
+ "Default value 0 means the Wrapper will keep reusing the same cached reader connection.");
103+
95104
static {
96105
PropertyDefinition.registerPluginProperties(ReadWriteSplittingPlugin.class);
97106
}
@@ -114,7 +123,7 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
114123
this(pluginService, properties);
115124
this.hostListProviderService = hostListProviderService;
116125
this.writerConnection = writerConnection;
117-
this.readerConnection = readerConnection;
126+
this.readerConnection = new CacheItem<>(readerConnection, CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties));
118127
}
119128

120129
@Override
@@ -147,7 +156,7 @@ public Connection connect(
147156
if (!pluginService.acceptsStrategy(hostSpec.getRole(), this.readerSelectorStrategy)) {
148157
throw new UnsupportedOperationException(
149158
Messages.get("ReadWriteSplittingPlugin.unsupportedHostSpecSelectorStrategy",
150-
new Object[] { this.readerSelectorStrategy }));
159+
new Object[] {this.readerSelectorStrategy}));
151160
}
152161

153162
final Connection currentConnection = connectFunc.call();
@@ -209,8 +218,8 @@ public <T, E extends Exception> T execute(
209218
if (this.writerConnection != null && !this.writerConnection.isClosed()) {
210219
this.writerConnection.clearWarnings();
211220
}
212-
if (this.readerConnection != null && !this.readerConnection.isClosed()) {
213-
this.readerConnection.clearWarnings();
221+
if (this.readerConnection != null && isConnectionUsable(this.readerConnection.get())) {
222+
this.readerConnection.get().clearWarnings();
214223
}
215224
} catch (final SQLException e) {
216225
throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e);
@@ -284,7 +293,8 @@ private void setWriterConnection(final Connection writerConnection,
284293
}
285294

286295
private void setReaderConnection(final Connection conn, final HostSpec host) {
287-
this.readerConnection = conn;
296+
closeReaderConnectionIfIdle(this.readerConnection);
297+
this.readerConnection = new CacheItem<>(conn, this.getKeepAliveTimeout(host));
288298
this.readerHostSpec = host;
289299
LOGGER.finest(
290300
() -> Messages.get(
@@ -321,7 +331,7 @@ void switchConnectionIfRequired(final boolean readOnly) throws SQLException {
321331
} catch (final SQLException e) {
322332
if (!isConnectionUsable(currentConnection)) {
323333
logAndThrowException(
324-
Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", new Object[] { e.getMessage() }),
334+
Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", new Object[] {e.getMessage()}),
325335
SqlState.CONNECTION_UNABLE_TO_CONNECT,
326336
e);
327337
return;
@@ -389,7 +399,7 @@ private void switchToWriterConnection(
389399
}
390400

391401
if (this.isReaderConnFromInternalPool) {
392-
this.closeConnectionIfIdle(this.readerConnection);
402+
this.closeReaderConnectionIfIdle(this.readerConnection);
393403
}
394404

395405
LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter",
@@ -426,15 +436,15 @@ private void switchToReaderConnection(final List<HostSpec> hosts)
426436
Messages.get(
427437
"ReadWriteSplittingPlugin.previousReaderNotAllowed",
428438
new Object[] {this.readerHostSpec, Utils.logTopology(hosts, "")}));
429-
closeConnectionIfIdle(this.readerConnection);
439+
closeReaderConnectionIfIdle(this.readerConnection);
430440
}
431441

432442
this.inReadWriteSplit = true;
433-
if (!isConnectionUsable(this.readerConnection)) {
443+
if (this.readerConnection == null || !isConnectionUsable(this.readerConnection.get())) {
434444
initializeReaderConnection(hosts);
435445
} else {
436446
try {
437-
switchCurrentConnectionTo(this.readerConnection, this.readerHostSpec);
447+
switchCurrentConnectionTo(this.readerConnection.get(), this.readerHostSpec);
438448
LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader",
439449
new Object[] {this.readerHostSpec.getUrl()}));
440450
} catch (SQLException e) {
@@ -447,15 +457,13 @@ private void switchToReaderConnection(final List<HostSpec> hosts)
447457
new Object[] {this.readerHostSpec.getUrl()}));
448458
}
449459

450-
this.readerConnection.close();
451-
this.readerConnection = null;
452-
this.readerHostSpec = null;
460+
closeReaderConnectionIfIdle(this.readerConnection);
453461
initializeReaderConnection(hosts);
454462
}
455463
}
456464

457465
if (this.isWriterConnFromInternalPool) {
458-
this.closeConnectionIfIdle(this.writerConnection);
466+
this.closeWriterConnectionIfIdle(this.writerConnection);
459467
}
460468
}
461469

@@ -518,39 +526,60 @@ private void getNewReaderConnection() throws SQLException {
518526
() -> Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader",
519527
new Object[] {finalReaderHost.getUrl()}));
520528
setReaderConnection(conn, readerHost);
521-
switchCurrentConnectionTo(this.readerConnection, this.readerHostSpec);
529+
switchCurrentConnectionTo(this.readerConnection.get(), this.readerHostSpec);
522530
}
523531

524532
private boolean isConnectionUsable(final Connection connection) throws SQLException {
525533
return connection != null && !connection.isClosed();
526534
}
527535

536+
private long getKeepAliveTimeout(final HostSpec host) {
537+
if (this.pluginService.isPooledConnectionProvider(host, properties)) {
538+
// Let the connection pool handle the lifetime of the reader connection.
539+
return 0;
540+
}
541+
final long keepAliveMs = CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties);
542+
return keepAliveMs > 0 ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(keepAliveMs) : 0;
543+
}
544+
528545
@Override
529546
public void releaseResources() {
530547
closeIdleConnections();
531548
}
532549

533550
private void closeIdleConnections() {
534551
LOGGER.finest(() -> Messages.get("ReadWriteSplittingPlugin.closingInternalConnections"));
535-
closeConnectionIfIdle(this.readerConnection);
536-
closeConnectionIfIdle(this.writerConnection);
552+
closeReaderConnectionIfIdle(this.readerConnection);
553+
closeWriterConnectionIfIdle(this.writerConnection);
537554
}
538555

539-
void closeConnectionIfIdle(final Connection internalConnection) {
556+
void closeReaderConnectionIfIdle(CacheItem<Connection> readerConnection) {
557+
if (readerConnection == null) {
558+
return;
559+
}
560+
540561
final Connection currentConnection = this.pluginService.getCurrentConnection();
562+
final Connection readerConnectionCache = readerConnection.get(true);
563+
541564
try {
542-
if (internalConnection != null
543-
&& internalConnection != currentConnection
544-
&& !internalConnection.isClosed()) {
545-
internalConnection.close();
546-
if (internalConnection == writerConnection) {
547-
writerConnection = null;
548-
}
565+
if (isConnectionUsable(readerConnectionCache) && readerConnectionCache != currentConnection) {
566+
readerConnectionCache.close();
567+
}
568+
} catch (SQLException e) {
569+
// Do nothing.
570+
}
549571

550-
if (internalConnection == readerConnection) {
551-
readerConnection = null;
552-
readerHostSpec = null;
553-
}
572+
this.readerConnection = null;
573+
this.readerHostSpec = null;
574+
}
575+
576+
void closeWriterConnectionIfIdle(final Connection internalConnection) {
577+
final Connection currentConnection = this.pluginService.getCurrentConnection();
578+
try {
579+
if (isConnectionUsable(internalConnection)
580+
&& internalConnection != currentConnection) {
581+
internalConnection.close();
582+
writerConnection = null;
554583
}
555584
} catch (final SQLException e) {
556585
// ignore
@@ -565,6 +594,6 @@ Connection getWriterConnection() {
565594
}
566595

567596
Connection getReaderConnection() {
568-
return this.readerConnection;
597+
return this.readerConnection == null ? null : this.readerConnection.get();
569598
}
570599
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package software.amazon.jdbc.util;
18+
19+
import java.util.Objects;
20+
21+
public class CacheItem<V> {
22+
23+
final V item;
24+
final long expirationTime;
25+
26+
public CacheItem(final V item, final long expirationTime) {
27+
this.item = item;
28+
this.expirationTime = expirationTime;
29+
}
30+
31+
public boolean isExpired() {
32+
if (expirationTime <= 0) {
33+
// No expiration time.
34+
return false;
35+
}
36+
return System.nanoTime() > expirationTime;
37+
}
38+
39+
public V get() {
40+
return get(false);
41+
}
42+
43+
public V get(final boolean returnExpired) {
44+
return (this.isExpired() && !returnExpired) ? null : item;
45+
}
46+
47+
@Override
48+
public int hashCode() {
49+
return Objects.hashCode(item);
50+
}
51+
52+
@Override
53+
public boolean equals(Object obj) {
54+
if (this == obj) {
55+
return true;
56+
}
57+
if (!(obj instanceof CacheItem)) {
58+
return false;
59+
}
60+
CacheItem<?> other = (CacheItem<?>) obj;
61+
return Objects.equals(this.item, other.item);
62+
}
63+
64+
@Override
65+
public String toString() {
66+
return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]";
67+
}
68+
}

wrapper/src/test/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPluginTest.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import software.amazon.jdbc.OldConnectionSuggestedAction;
5555
import software.amazon.jdbc.PluginService;
5656
import software.amazon.jdbc.PropertyDefinition;
57-
import software.amazon.jdbc.dialect.Dialect;
5857
import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy;
5958
import software.amazon.jdbc.plugin.failover.FailoverSuccessSQLException;
6059
import software.amazon.jdbc.util.SqlState;
@@ -95,7 +94,6 @@ public class ReadWriteSplittingPluginTest {
9594
@Mock private JdbcCallable<Connection, SQLException> mockConnectFunc;
9695
@Mock private JdbcCallable<ResultSet, SQLException> mockSqlFunction;
9796
@Mock private PluginService mockPluginService;
98-
@Mock private Dialect mockDialect;
9997
@Mock private HostListProviderService mockHostListProviderService;
10098
@Mock private Connection mockWriterConn;
10199
@Mock private Connection mockNewWriterConn;
@@ -355,6 +353,35 @@ public void testSetReadOnlyOnClosedConnection() throws SQLException {
355353
assertNull(plugin.getReaderConnection());
356354
}
357355

356+
@Test
357+
public void testSetReadOnly_readerExpires() throws SQLException, InterruptedException {
358+
when(this.mockPluginService.connect(eq(readerHostSpec1), any(Properties.class), any()))
359+
.thenReturn(mockReaderConn1)
360+
.thenReturn(mockReaderConn2);
361+
362+
final Properties propsWithExpirationTime = new Properties();
363+
propsWithExpirationTime.put("cachedReaderKeepAliveTimeoutMs", "5000");
364+
365+
final ReadWriteSplittingPlugin plugin = new ReadWriteSplittingPlugin(
366+
mockPluginService,
367+
propsWithExpirationTime);
368+
369+
plugin.switchConnectionIfRequired(true);
370+
assertEquals(mockReaderConn1, plugin.getReaderConnection());
371+
372+
Thread.sleep(1000);
373+
374+
plugin.switchConnectionIfRequired(true);
375+
// Ensure the cached reader connection hasn't changed yet since it hasn't expired.
376+
assertEquals(mockReaderConn1, plugin.getReaderConnection());
377+
378+
Thread.sleep(6000);
379+
plugin.switchConnectionIfRequired(true);
380+
381+
// Ensure the cached reader connection has expired and updated.
382+
assertEquals(mockReaderConn2, plugin.getReaderConnection());
383+
}
384+
358385
@Test
359386
public void testExecute_failoverToNewWriter() throws SQLException {
360387
when(mockSqlFunction.call()).thenThrow(FailoverSuccessSQLException.class);
@@ -580,7 +607,7 @@ public void testClosePooledReaderConnectionAfterSetReadOnly() throws SQLExceptio
580607
spyPlugin.switchConnectionIfRequired(true);
581608
spyPlugin.switchConnectionIfRequired(false);
582609

583-
verify(spyPlugin, times(1)).closeConnectionIfIdle(eq(mockReaderConn1));
610+
verify(spyPlugin, times(1)).closeReaderConnectionIfIdle(any());
584611
}
585612

586613
@Test
@@ -607,7 +634,7 @@ public void testClosePooledWriterConnectionAfterSetReadOnly() throws SQLExceptio
607634
spyPlugin.switchConnectionIfRequired(false);
608635
spyPlugin.switchConnectionIfRequired(true);
609636

610-
verify(spyPlugin, times(1)).closeConnectionIfIdle(eq(mockWriterConn));
637+
verify(spyPlugin, times(1)).closeWriterConnectionIfIdle(eq(mockWriterConn));
611638
}
612639

613640
private static HikariConfig getHikariConfig(HostSpec hostSpec, Properties props) {

0 commit comments

Comments
 (0)