From 3f4280b1e1326b73e7520cb41a3287bcc04e15c7 Mon Sep 17 00:00:00 2001 From: Sophia Chu Date: Thu, 6 Nov 2025 09:14:36 -0800 Subject: [PATCH] feat: srw --- .../CompatibilityCrossPlugins.md | 12 +- .../CompatibilityDatabaseTypes.md | 50 +- .../CompatibilityEndpoints.md | 4 + .../UsingTheJdbcDriver.md | 1 + .../UsingTheSimpleReadWriteSplittingPlugin.md | 98 +++ .../SimpleReadWriteSplittingMySQLExample.java | 142 ++++ ...mpleReadWriteSplittingPostgresExample.java | 142 ++++ .../jdbc/ConnectionPluginChainBuilder.java | 3 + ...AuroraInitialConnectionStrategyPlugin.java | 6 +- .../jdbc/plugin/DefaultConnectionPlugin.java | 7 +- .../failover/FailoverConnectionPlugin.java | 47 +- .../failover2/FailoverConnectionPlugin.java | 9 + .../srw/SimpleReadWriteSplittingPlugin.java | 678 ++++++++++++++++++ ...SimpleReadWriteSplittingPluginFactory.java | 29 + .../plugin/staledns/AuroraStaleDnsHelper.java | 1 + ..._advanced_jdbc_wrapper_messages.properties | 7 +- .../container/ConnectionStringHelper.java | 13 + .../tests/ReadWriteSplittingTests.java | 52 +- .../tests/SimpleReadWriteSplittingTest.java | 205 ++++++ .../CustomEndpointMonitorImplTest.java | 14 +- .../SimpleReadWriteSplittingPluginTest.java | 630 ++++++++++++++++ 21 files changed, 2080 insertions(+), 70 deletions(-) create mode 100644 docs/using-the-jdbc-driver/using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md create mode 100644 examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingMySQLExample.java create mode 100644 examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingPostgresExample.java create mode 100644 wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPlugin.java create mode 100644 wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginFactory.java create mode 100644 wrapper/src/test/java/integration/container/tests/SimpleReadWriteSplittingTest.java create mode 100644 
wrapper/src/test/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginTest.java diff --git a/docs/using-the-jdbc-driver/CompatibilityCrossPlugins.md b/docs/using-the-jdbc-driver/CompatibilityCrossPlugins.md index d40aee79d..744ecce9d 100644 --- a/docs/using-the-jdbc-driver/CompatibilityCrossPlugins.md +++ b/docs/using-the-jdbc-driver/CompatibilityCrossPlugins.md @@ -24,6 +24,7 @@ | [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | | | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | | | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | |
@@ -46,6 +47,7 @@ | [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | | | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | | | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | |
@@ -64,6 +66,7 @@ | [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | | | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | | | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | |
@@ -78,6 +81,7 @@ | [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | | | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | | | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | |
@@ -88,9 +92,11 @@ | [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | | | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | | | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | |
-| Plugin codes / Plugin codes | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | -|--------------------------------------------------|-------------------------------------------------------------------|--------------------------------------------------| -| [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | +| Plugin codes / Plugin codes | [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | +|------------------------------------------------------------------|-------------------------------------------------------------------|----------------------------------------------------------| +| [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | diff --git a/docs/using-the-jdbc-driver/CompatibilityDatabaseTypes.md b/docs/using-the-jdbc-driver/CompatibilityDatabaseTypes.md index 0aeb66b9d..fd46c9206 100644 --- a/docs/using-the-jdbc-driver/CompatibilityDatabaseTypes.md +++ b/docs/using-the-jdbc-driver/CompatibilityDatabaseTypes.md @@ -16,6 +16,7 @@ | [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | | | | | ~~auroraStaleDns~~ | | | | | [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | | [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | | | [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | | | connectTime | | | | @@ -27,27 +28,28 @@
-| Plugin codes / Database types | [RDS Multi-AZ DB Instance deployment (2 instances)](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.MultiAZSingleStandby.html)
(MySQL and PG) | [RDS Single-AZ Instance deployment (1 instance)](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Overview.DBInstance.html)
(MySQL and PG) | Community Database
(MySQL and PG) | -|---------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------| -| executionTime | | | | -| logQuery | | | | -| dataCache | | | | -| customEndpoint | | | | -| [efm](./using-plugins/UsingTheHostMonitoringPlugin.md) | | | | -| [efm2](./using-plugins/UsingTheHostMonitoringPlugin.md#host-monitoring-plugin-v2) | | | | -| [failover](./using-plugins/UsingTheFailoverPlugin.md) | | | | -| [failover2](./using-plugins/UsingTheFailover2Plugin.md) | | | | -| [iam](./using-plugins/UsingTheIamAuthenticationPlugin.md) | | | | -| [awsSecretsManager](./using-plugins/UsingTheAwsSecretsManagerPlugin.md) | | | | -| [federatedAuth](./using-plugins/UsingTheFederatedAuthPlugin.md) | | | | -| [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | | | | -| ~~auroraStaleDns~~ | | | | -| [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | | | -| [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | | -| [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | | -| connectTime | | | | -| [dev](./using-plugins/UsingTheDeveloperPlugin.md) | | | | -| fastestResponseStrategy | | | | -| [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | -| [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | -| [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | +| Plugin codes / Database types | [RDS Multi-AZ DB Instance deployment (2 instances)](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.MultiAZSingleStandby.html)
(MySQL and PG) | [RDS Single-AZ Instance deployment (1 instance)](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Overview.DBInstance.html)
(MySQL and PG) | Community Database
(MySQL and PG) | +|---------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------| +| executionTime | | | | +| logQuery | | | | +| dataCache | | | | +| customEndpoint | | | | +| [efm](./using-plugins/UsingTheHostMonitoringPlugin.md) | | | | +| [efm2](./using-plugins/UsingTheHostMonitoringPlugin.md#host-monitoring-plugin-v2) | | | | +| [failover](./using-plugins/UsingTheFailoverPlugin.md) | | | | +| [failover2](./using-plugins/UsingTheFailover2Plugin.md) | | | | +| [iam](./using-plugins/UsingTheIamAuthenticationPlugin.md) | | | | +| [awsSecretsManager](./using-plugins/UsingTheAwsSecretsManagerPlugin.md) | | | | +| [federatedAuth](./using-plugins/UsingTheFederatedAuthPlugin.md) | | | | +| [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | | | | +| ~~auroraStaleDns~~ | | | | +| [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | (disable `verifyNewSrwConnections`) | +| [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | | +| [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | | +| connectTime | | | | +| [dev](./using-plugins/UsingTheDeveloperPlugin.md) | | | | +| fastestResponseStrategy | | | | +| [initialConnection](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | | | | +| [limitless](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | | | | +| [bg](./using-plugins/UsingTheBlueGreenPlugin.md) | | | | diff --git 
a/docs/using-the-jdbc-driver/CompatibilityEndpoints.md b/docs/using-the-jdbc-driver/CompatibilityEndpoints.md index 10c76cdce..7ee8f07d2 100644 --- a/docs/using-the-jdbc-driver/CompatibilityEndpoints.md +++ b/docs/using-the-jdbc-driver/CompatibilityEndpoints.md @@ -33,6 +33,7 @@ There are many different URL types (endpoints) that can be used with The AWS JDB | [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | | | ~~auroraStaleDns~~ | | | [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | connectTime | | @@ -60,6 +61,7 @@ There are many different URL types (endpoints) that can be used with The AWS JDB | [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | | | | | | ~~auroraStaleDns~~ | | | | | | [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | | | [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | | | | [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | | | | connectTime | | | | | @@ -87,6 +89,7 @@ There are many different URL types (endpoints) that can be used with The AWS JDB | [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | | | | | | ~~auroraStaleDns~~ | | | | | | [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | | | [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | | | | [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | | | | connectTime | | | | | @@ -115,6 +118,7 @@ There are many different URL types (endpoints) that can be used 
with The AWS JDB | [okta](./using-plugins/UsingTheOktaAuthPlugin.md) | (requires special configuration) | (requires special configuration) | | ~~auroraStaleDns~~ | | | | [readWriteSplitting](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | | | +| [srw](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | | | | [auroraConnectionTracker](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | | | | [driverMetaData](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | | | | connectTime | | | diff --git a/docs/using-the-jdbc-driver/UsingTheJdbcDriver.md b/docs/using-the-jdbc-driver/UsingTheJdbcDriver.md index b3ec4cc89..b7fdfc545 100644 --- a/docs/using-the-jdbc-driver/UsingTheJdbcDriver.md +++ b/docs/using-the-jdbc-driver/UsingTheJdbcDriver.md @@ -211,6 +211,7 @@ The AWS JDBC Driver has several built-in plugins that are available to use. Plea | [Aurora Connection Tracker Plugin](./using-plugins/UsingTheAuroraConnectionTrackerPlugin.md) | `auroraConnectionTracker` | Aurora, RDS Multi-AZ DB Cluster | Tracks all the opened connections. In the event of a cluster failover, the plugin will close all the impacted connections to the node. This plugin is enabled by default. | None | | [Driver Metadata Connection Plugin](./using-plugins/UsingTheDriverMetadataConnectionPlugin.md) | `driverMetaData` | Any database | Allows user application to override the return value of `DatabaseMetaData#getDriverName` | None | | [Read Write Splitting Plugin](./using-plugins/UsingTheReadWriteSplittingPlugin.md) | `readWriteSplitting` | Aurora | Enables read write splitting functionality where users can switch between database reader and writer instances. | None | +| [Simple Read Write Splitting Plugin](./using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md) | `srw` | Any database | Enables read write splitting functionality where users can switch between reader and writer endpoints. 
| None | | [Developer Plugin](./using-plugins/UsingTheDeveloperPlugin.md) | `dev` | Any database | Helps developers test various everyday scenarios including rare events like network outages and database cluster failover. The plugin allows injecting and raising an expected exception, then verifying how applications handle it. | None | | [Aurora Initial Connection Strategy](./using-plugins/UsingTheAuroraInitialConnectionStrategyPlugin.md) | `initialConnection` | Aurora | Allows users to configure their initial connection strategy to reader cluster endpoints. Prevents incorrectly opening a new connection to an old writer node when DNS records have not yet updated after a recent failover event.

This plugin is **strongly** suggested when using the cluster writer endpoint, cluster reader endpoint, or global database endpoint in the connection string.

:warning:**Note:** Contrary to `failover` and `failover2` plugins, `initialConnection` plugin doesn't implement failover support itself. It helps to eliminate opening wrong connections to an old writer node after cluster failover is completed. | None | | [Limitless Connection Plugin](./using-plugins/UsingTheLimitlessConnectionPlugin.md) | `limitless` | Aurora | Enables client-side load-balancing of Transaction Routers on Amazon Aurora Limitless Databases . | None | diff --git a/docs/using-the-jdbc-driver/using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md b/docs/using-the-jdbc-driver/using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md new file mode 100644 index 000000000..35cce92a1 --- /dev/null +++ b/docs/using-the-jdbc-driver/using-plugins/UsingTheSimpleReadWriteSplittingPlugin.md @@ -0,0 +1,98 @@ +# Simple Read/Write Splitting Plugin + +The Simple Read/Write Splitting Plugin adds functionality to switch between endpoints via calls to the `Connection#setReadOnly` method. Based on the values provided in the properties, upon calling `setReadOnly(true)`, the plugin will connect to the specified endpoint for read operations. When `setReadOnly(false)` is called, the plugin will connect to the specified endpoint for write operations. Future calls to `setReadOnly` will switch between the established writer and reader connections according to the boolean argument you supply to the `setReadOnly` method. + +The plugin will use the current connection, which may be the writer or initial connection, as a fallback if the reader connection is unable to be established, or if connection verification is enabled and the connection is not to a reader host. + +The plugin does not rely on cluster topology. It relies purely on the provided endpoints and their DNS resolution. + +## Loading the Simple Read/Write Splitting Plugin + +The Simple Read/Write Splitting Plugin is not loaded by default. To load the plugin, include it in the `wrapperPlugins` connection parameter. 
+ +``` +final Properties properties = new Properties(); +properties.setProperty(PropertyDefinition.PLUGINS.name, "srw,failover2,efm2"); +properties.setProperty("srwWriteEndpoint", "test-db.cluster-XYZ.us-east-2.rds.amazonaws.com"); +properties.setProperty("srwReadEndpoint", "test-db.cluster-ro-XYZ.us-east-2.rds.amazonaws.com"); +``` + +If you would like to use the Simple Read/Write Splitting Plugin without the failover plugin, make sure you have the `srw` plugin in the `wrapperPlugins` property, and that the failover plugin is not part of it. +``` +final Properties properties = new Properties(); +properties.setProperty(PropertyDefinition.PLUGINS.name, "srw"); +properties.setProperty("srwWriteEndpoint", "test-db.cluster-XYZ.us-east-2.rds.amazonaws.com"); +properties.setProperty("srwReadEndpoint", "test-db.cluster-ro-XYZ.us-east-2.rds.amazonaws.com"); +``` + +## Simple Read/Write Splitting Plugin Parameters +| Parameter | Value | Required | Description | Default Value | Example Value | +|------------------------------|:-------:|:--------:|:----------------------------------------------------------------------------------------------------------------------------------------------|---------------|--------------------------------------------------------------| +| `srwWriteEndpoint` | String | Yes | The endpoint to connect to when `setReadOnly(false)` is called. | `null` | `.cluster-..rds.amazonaws.com` | +| `srwReadEndpoint` | String | Yes | The endpoint to connect to when `setReadOnly(true)` is called. | `null` | `.cluster-ro-..rds.amazonaws.com` | +| `verifyNewSrwConnections` | Boolean | No | Enables role verification for new connections made by the Simple Read/Write Splitting Plugin. | `true` | `true`, `false` | +| `verifyOpenedConnectionType` | String | No | If `verifyNewSrwConnections` is set to `true`, this parameter will verify the initial opened connection to be either a writer or a reader. 
| `null` | `writer`, `reader` | +| `srwConnectRetryTimeoutMs` | Integer | No | If `verifyNewSrwConnections` is set to `true`, this parameter sets the maximum allowed time in milliseconds for retrying connection attempts. | `60000` | `60000` | +| `srwConnectRetryIntervalMs` | Integer | No | If `verifyNewSrwConnections` is set to `true`, this parameter sets the time delay in milliseconds between each retry of opening a connection. | `1000` | `1000` | + +## How the Simple Read/Write Splitting Plugin Verifies Connections + +The property `verifyNewSrwConnections` is enabled by default. This means that when new connections are made with the Simple Read/Write Splitting Plugin, a query is sent to the new connection to verify its role. If the connection cannot be verified as having the correct role—that is, a write connection is not connected to a writer, or a read connection is not connected to a reader—the plugin will retry the connection up to the time limit of `srwConnectRetryTimeoutMs`. + +The values of `srwConnectRetryTimeoutMs` and `srwConnectRetryIntervalMs` control the timing and aggressiveness of the plugin's retries. + +Additionally, to consistently ensure the role of connections made with the plugin, the plugin also provides role verification for the initial connection. When connecting with an RDS writer cluster or reader cluster endpoint, the plugin will retry the initial connection up to `srwConnectRetryTimeoutMs` until it has verified connection to the intended role of the endpoint. +If it is unable to return a verified initial connection, it will log a message and continue with the normal workflow of the other plugins. +When connecting with custom endpoints and other non-standard URLs, role verification on the initial connection can also be triggered by providing the expected role through the `verifyOpenedConnectionType` parameter. Set this to `writer` or `reader` accordingly. 
+ +## Limitations When Verifying Connections + +#### Non-RDS clusters +The verification step determines the role of the connection by executing a query against it. If the endpoint is not part of an Aurora or RDS cluster, disable `verifyNewSrwConnections` by setting it to `false` in the properties. + +#### Autocommit +The verification logic results in errors such as `Cannot change transaction read-only property in the middle of a transaction` from the underlying driver when: +- autocommit is set to false +- setReadOnly is called +- as part of setReadOnly, a new connection is opened +- that connection's role is verified + +This is a result of the plugin executing the role-verification query against a new connection, and when autocommit is false, this opens a transaction. + +If autocommit is essential to a workflow, either ensure the plugin has connected to the desired target connection of the setReadOnly query before setting autocommit to false or disable `verifyNewSrwConnections`. Examples of the former can be found in the [Simple Read/Write Splitting Examples](UsingTheSimpleReadWriteSplittingPlugin.md#examples). + +## Using the Simple Read/Write Splitting Plugin with RDS Proxy + +RDS Proxy provides connection pooling and management that significantly improve application scalability by reducing database connection overhead and enabling thousands of concurrent connections through +connection multiplexing. Connecting exclusively through the proxy endpoint ensures consistent connection management, automatic failover handling, and centralized monitoring, while protecting the underlying database from connection exhaustion +and providing a stable abstraction layer that remains consistent even when database topology changes. By providing the read/write endpoint and a read-only endpoint to the Simple Read/Write Splitting Plugin, the AWS JDBC Driver will connect using +these endpoints any time setReadOnly is called. 
+ +To take full advantage of the benefits of RDS Proxy, it is recommended to only connect through RDS Proxy endpoints. See [Using the AWS JDBC Driver with RDS Proxy](./../../../README.md#rds-proxy) for limitations. + +## Using the Simple Read/Write Splitting Plugin against non-RDS clusters + +The Simple Read/Write Splitting Plugin can be used to switch between any two endpoints. If the endpoints do not direct to an RDS cluster, ensure the property `verifyNewSrwConnections` is set to `false`. See [Limitations of verifyNewSrwConnections](UsingTheSimpleReadWriteSplittingPlugin.md#non-rds-clusters) for details. + +## Limitations + +### General plugin limitations + +When a Statement or ResultSet is created, it is internally bound to the database connection established at that moment. There is no standard JDBC functionality to change the internal connection used by Statement or ResultSet objects. Consequently, even if the read/write plugin switches the internal connection, any Statements/ResultSets created before this will continue using the old database connection. This bypasses the desired functionality provided by the plugin. To prevent these scenarios, an exception will be thrown if your code uses any Statements/ResultSets created before a change in internal connection. To solve this problem, please ensure you create new Statement/ResultSet objects after switching between the writer/reader. + +Verify plugin compatibility within your driver configuration using the [compatibility guide](../Compatibility.md). + +### Failover + +Immediately following a failover event, due to DNS caching, an RDS cluster endpoint may connect to the previous writer, and the read-only endpoint may connect to the new primary instance. + +To avoid stale DNS connections, enable `verifyNewSrwConnections`, as this will retry the connection. Service for Aurora clusters is typically restored in less than 60 seconds, and often less than 30 seconds. 
RDS Proxy endpoints to Aurora databases can update in as little as 3 seconds. Depending on your configuration and cluster availability `srwConnectRetryTimeoutMs` and `srwConnectRetryIntervalMs` may be set to customize the timing of the retries. + +Following failover, endpoints that point to specific instances will be impacted if their target instance was demoted to a reader or promoted to a writer. The Simple Read/Write Splitting Plugin always connects to the endpoint provided in the initial connection properties when `setReadOnly` is called. We suggest using endpoints that return connections with a specific role such as cluster or read-only endpoints, or using the [Read/Write Splitting Plugin](UsingTheReadWriteSplittingPlugin.md) to connect to instances based on the cluster's current topology. + +### Session state + +The plugin supports session state transfer when switching connections. All attributes mentioned in [Session State](../SessionState.md) are automatically transferred to a new connection. + +## Examples +[SimpleReadWriteSplittingPostgresExample.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingPostgresExample.java) and [SimpleReadWriteSplittingMySQLExample.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingMySQLExample.java) demonstrate how to enable and configure Simple Read/Write Splitting with the AWS JDBC Driver. diff --git a/examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingMySQLExample.java b/examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingMySQLExample.java new file mode 100644 index 000000000..445e17000 --- /dev/null +++ b/examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingMySQLExample.java @@ -0,0 +1,142 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). 
+ * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import software.amazon.jdbc.PropertyDefinition; +import software.amazon.jdbc.plugin.failover.FailoverFailedSQLException; +import software.amazon.jdbc.plugin.failover.FailoverSuccessSQLException; +import software.amazon.jdbc.plugin.failover.TransactionStateUnknownSQLException; + + +public class SimpleReadWriteSplittingMySQLExample { + + // User configures connection properties here + public static final String MYSQL_CONNECTION_STRING = + "jdbc:aws-wrapper:mysql://test-db.cluster-XYZ.us-east-2.rds.amazonaws.com:3306/srwExample"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String SRW_WRITE_ENDPOINT = "test-db.cluster-XYZ.us-east-2.rds.amazonaws.com"; + private static final String SRW_READ_ENDPOINT = "test-db.cluster-ro-XYZ.us-east-2.rds.amazonaws.com"; + + public static void main(String[] args) throws SQLException { + + final Properties props = new Properties(); + + // Enable srw, failover, and efm2 plugins and set properties + props.setProperty(PropertyDefinition.PLUGINS.name, "srw,failover2,efm2"); + props.setProperty(PropertyDefinition.USER.name, USERNAME); + props.setProperty(PropertyDefinition.PASSWORD.name, PASSWORD); + props.setProperty("srwWriteEndpoint", SRW_WRITE_ENDPOINT); + 
props.setProperty("srwReadEndpoint", SRW_READ_ENDPOINT); + + /* Setup Step: Open connection and create tables - uncomment this section to create table and test values */ + // try (final Connection connection = DriverManager.getConnection(MYSQL_CONNECTION_STRING, props)) { + // setInitialSessionSettings(connection); + // executeWithFailoverHandling(connection, + // "CREATE TABLE bank_test (id int primary key, name varchar(40), account_balance int)"); + // executeWithFailoverHandling(connection, + // "INSERT INTO bank_test VALUES (0, 'Jane Doe', 200), (1, 'John Smith', 200), (2, 'Sally Smith', 200), (3, 'Joe Smith', 200)"); + // } + + // Example Step: Open connection and perform transaction + try (final Connection conn = DriverManager.getConnection(MYSQL_CONNECTION_STRING, props)) { + setInitialSessionSettings(conn); + + // Establish the internal reader connection before setting autocommit to false. + conn.setReadOnly(true); + // Switch back to the cluster endpoint to perform write operations. + conn.setReadOnly(false); + + // Begin business transaction + conn.setAutoCommit(false); + + // Example business transaction + executeWithFailoverHandling( + conn, + "UPDATE bank_test SET account_balance=account_balance - 100 WHERE name='Jane Doe'"); + executeWithFailoverHandling( + conn, + "UPDATE bank_test SET account_balance=account_balance + 100 WHERE name='John Smith'"); + + // Commit business transaction + conn.commit(); + // Internally switch to a reader connection + conn.setReadOnly(true); + + for (int i = 0; i < 4; i++) { + executeWithFailoverHandling(conn, "SELECT * FROM bank_test WHERE id = " + i); + } + + } catch (FailoverFailedSQLException e) { + // User application should open a new connection, check the results of the failed transaction and re-run it if + // needed. 
See: + // https://github.com/aws/aws-advanced-jdbc-wrapper/blob/main/docs/using-the-jdbc-driver/using-plugins/UsingTheFailoverPlugin.md#08001---unable-to-establish-sql-connection + throw e; + } catch (TransactionStateUnknownSQLException e) { + // User application should check the status of the failed transaction and restart it if needed. See: + // https://github.com/aws/aws-advanced-jdbc-wrapper/blob/main/docs/using-the-jdbc-driver/using-plugins/UsingTheFailoverPlugin.md#08007---transaction-resolution-unknown + throw e; + } catch (SQLException e) { + // Unexpected exception unrelated to failover. This should be handled by the user application. + throw e; + } + } + + public static void processResults(ResultSet results) { + // User can process results as needed + } + + public static void setInitialSessionSettings(Connection conn) throws SQLException { + try (Statement stmt1 = conn.createStatement()) { + // User can edit settings + stmt1.executeUpdate("SET time_zone = 'UTC'"); + } + } + + public static void executeWithFailoverHandling(Connection conn, String query) throws SQLException { + try (Statement stmt = conn.createStatement()) { + boolean hasResults = stmt.execute(query); + if (hasResults) { + processResults(stmt.getResultSet()); + } + } catch (FailoverFailedSQLException e) { + // Connection failed, and JDBC wrapper failed to reconnect to a new instance. + throw e; + } catch (FailoverSuccessSQLException e) { + // Query execution failed and JDBC wrapper successfully failed over to a new elected writer node. + // Reconfigure the connection + setInitialSessionSettings(conn); + // Re-run query + try (Statement stmt = conn.createStatement()) { + boolean hasResults = stmt.execute(query); + if (hasResults) { + processResults(stmt.getResultSet()); + } + } + } catch (TransactionStateUnknownSQLException e) { + // Connection failed while executing a business transaction. + // Transaction status is unknown. The driver has successfully reconnected to a new writer. 
+ throw e; + } + } +} diff --git a/examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingPostgresExample.java b/examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingPostgresExample.java new file mode 100644 index 000000000..9125e1604 --- /dev/null +++ b/examples/AWSDriverExample/src/main/java/software/amazon/SimpleReadWriteSplittingPostgresExample.java @@ -0,0 +1,142 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import software.amazon.jdbc.PropertyDefinition; +import software.amazon.jdbc.plugin.failover.FailoverFailedSQLException; +import software.amazon.jdbc.plugin.failover.FailoverSuccessSQLException; +import software.amazon.jdbc.plugin.failover.TransactionStateUnknownSQLException; + + +public class SimpleReadWriteSplittingPostgresExample { + + // User configures connection properties here + public static final String POSTGRESQL_CONNECTION_STRING = + "jdbc:aws-wrapper:postgresql://test-db.cluster-XYZ.us-east-2.rds.amazonaws.com:5432/srwExample"; + private static final String USERNAME = "username"; + private static final String PASSWORD = "password"; + private static final String SRW_WRITE_ENDPOINT = "test-db.cluster-XYZ.us-east-2.rds.amazonaws.com"; + private static final String SRW_READ_ENDPOINT = "test-db.cluster-ro-XYZ.us-east-2.rds.amazonaws.com"; + + public static void main(String[] args) throws SQLException { + + final Properties props = new Properties(); + + // Enable srw, failover, and efm2 plugins and set properties + props.setProperty(PropertyDefinition.PLUGINS.name, "srw,failover2,efm2"); + props.setProperty(PropertyDefinition.USER.name, USERNAME); + props.setProperty(PropertyDefinition.PASSWORD.name, PASSWORD); + props.setProperty("srwWriteEndpoint", SRW_WRITE_ENDPOINT); + props.setProperty("srwReadEndpoint", SRW_READ_ENDPOINT); + + /* Setup Step: Open connection and create tables - uncomment this section to create table and test values */ + // try (final Connection connection = DriverManager.getConnection(POSTGRESQL_CONNECTION_STRING, props)) { + // setInitialSessionSettings(connection); + // executeWithFailoverHandling(connection, + // "CREATE TABLE bank_test (id int primary key, name varchar(40), account_balance int)"); + // 
executeWithFailoverHandling(connection, + // "INSERT INTO bank_test VALUES (0, 'Jane Doe', 200), (1, 'John Smith', 200), (2, 'Sally Smith', 200), (3, 'Joe Smith', 200)"); + // } + + // Example Step: Open connection and perform transaction + try (final Connection conn = DriverManager.getConnection(POSTGRESQL_CONNECTION_STRING, props)) { + setInitialSessionSettings(conn); + + // Establish the internal reader connection before setting autocommit to false. + conn.setReadOnly(true); + // Switch back to the cluster endpoint to perform write operations. + conn.setReadOnly(false); + + // Begin business transaction + conn.setAutoCommit(false); + + // Example business transaction + executeWithFailoverHandling( + conn, + "UPDATE bank_test SET account_balance=account_balance - 100 WHERE name='Jane Doe'"); + executeWithFailoverHandling( + conn, + "UPDATE bank_test SET account_balance=account_balance + 100 WHERE name='John Smith'"); + + // Commit business transaction + conn.commit(); + // Internally switch to a reader connection + conn.setReadOnly(true); + + for (int i = 0; i < 4; i++) { + executeWithFailoverHandling(conn, "SELECT * FROM bank_test WHERE id = " + i); + } + + } catch (FailoverFailedSQLException e) { + // User application should open a new connection, check the results of the failed transaction and re-run it if + // needed. See: + // https://github.com/aws/aws-advanced-jdbc-wrapper/blob/main/docs/using-the-jdbc-driver/using-plugins/UsingTheFailoverPlugin.md#08001---unable-to-establish-sql-connection + throw e; + } catch (TransactionStateUnknownSQLException e) { + // User application should check the status of the failed transaction and restart it if needed. See: + // https://github.com/aws/aws-advanced-jdbc-wrapper/blob/main/docs/using-the-jdbc-driver/using-plugins/UsingTheFailoverPlugin.md#08007---transaction-resolution-unknown + throw e; + } catch (SQLException e) { + // Unexpected exception unrelated to failover. This should be handled by the user application. 
+ throw e; + } + } + + public static void processResults(ResultSet results) { + // User can process results as needed + } + + public static void setInitialSessionSettings(Connection conn) throws SQLException { + try (Statement stmt1 = conn.createStatement()) { + // User can edit settings + stmt1.executeUpdate("SET TIME ZONE 'UTC'"); + } + } + + public static void executeWithFailoverHandling(Connection conn, String query) throws SQLException { + try (Statement stmt = conn.createStatement()) { + boolean hasResults = stmt.execute(query); + if (hasResults) { + processResults(stmt.getResultSet()); + } + } catch (FailoverFailedSQLException e) { + // Connection failed, and JDBC wrapper failed to reconnect to a new instance. + throw e; + } catch (FailoverSuccessSQLException e) { + // Query execution failed and JDBC wrapper successfully failed over to a new elected writer node. + // Reconfigure the connection + setInitialSessionSettings(conn); + // Re-run query + try (Statement stmt = conn.createStatement()) { + boolean hasResults = stmt.execute(query); + if (hasResults) { + processResults(stmt.getResultSet()); + } + } + } catch (TransactionStateUnknownSQLException e) { + // Connection failed while executing a business transaction. + // Transaction status is unknown. The driver has successfully reconnected to a new writer. 
+ throw e; + } + } +} diff --git a/wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginChainBuilder.java b/wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginChainBuilder.java index 3f1d89eac..b0104cda6 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginChainBuilder.java +++ b/wrapper/src/main/java/software/amazon/jdbc/ConnectionPluginChainBuilder.java @@ -48,6 +48,7 @@ import software.amazon.jdbc.plugin.iam.IamAuthConnectionPluginFactory; import software.amazon.jdbc.plugin.limitless.LimitlessConnectionPluginFactory; import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingPluginFactory; +import software.amazon.jdbc.plugin.srw.SimpleReadWriteSplittingPluginFactory; import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsPluginFactory; import software.amazon.jdbc.plugin.strategy.fastestresponse.FastestResponseStrategyPluginFactory; import software.amazon.jdbc.profile.ConfigurationProfile; @@ -80,6 +81,7 @@ public class ConnectionPluginChainBuilder { put("okta", new OktaAuthPluginFactory()); put("auroraStaleDns", new AuroraStaleDnsPluginFactory()); put("readWriteSplitting", new ReadWriteSplittingPluginFactory()); + put("srw", new SimpleReadWriteSplittingPluginFactory()); put("auroraConnectionTracker", new AuroraConnectionTrackerPluginFactory()); put("driverMetaData", new DriverMetaDataConnectionPluginFactory()); put("connectTime", new ConnectTimeConnectionPluginFactory()); @@ -107,6 +109,7 @@ public class ConnectionPluginChainBuilder { put(AuroraStaleDnsPluginFactory.class, 500); put(BlueGreenConnectionPluginFactory.class, 550); put(ReadWriteSplittingPluginFactory.class, 600); + put(SimpleReadWriteSplittingPluginFactory.class, 610); put(FailoverConnectionPluginFactory.class, 700); put(software.amazon.jdbc.plugin.failover2.FailoverConnectionPluginFactory.class, 710); put(HostMonitoringConnectionPluginFactory.class, 800); diff --git 
a/wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java index 1dbee4029..456ec00d1 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/AuroraInitialConnectionStrategyPlugin.java @@ -80,7 +80,7 @@ public class AuroraInitialConnectionStrategyPlugin extends AbstractConnectionPlu null, "Force to verify an opened connection to be either a writer or a reader."); - private enum VerifyOpenedConnectionType { + public enum VerifyOpenedConnectionType { WRITER, READER; @@ -104,7 +104,7 @@ public static VerifyOpenedConnectionType fromValue(String value) { private HostListProviderService hostListProviderService; private final RdsUtils rdsUtils = new RdsUtils(); - private VerifyOpenedConnectionType verifyOpenedConnectionType = null; + private final VerifyOpenedConnectionType verifyOpenedConnectionType; static { PropertyDefinition.registerPluginProperties(AuroraInitialConnectionStrategyPlugin.class); @@ -392,8 +392,6 @@ private HostSpec getReader(final Properties props, final @Nullable String awsReg } else { return this.pluginService.getHostSpecByStrategy(HostRole.READER, strategy); } - } catch (UnsupportedOperationException ex) { - throw ex; } catch (SQLException ex) { // host isn't found return null; diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/DefaultConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/DefaultConnectionPlugin.java index 2427cc816..ea107e83f 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/DefaultConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/DefaultConnectionPlugin.java @@ -122,6 +122,10 @@ public T execute( TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext( this.pluginService.getTargetName(), 
TelemetryTraceLevel.NESTED); + // Check previous autocommit value before calling jdbcMethodFunc. + final boolean doesSwitchAutoCommitFalseTrue = sqlMethodAnalyzer.doesSwitchAutoCommitFalseTrue( + this.pluginService.getCurrentConnection(), methodName, jdbcMethodArgs); + T result; try { result = jdbcMethodFunc.call(); @@ -144,8 +148,7 @@ public T execute( } else if ( sqlMethodAnalyzer.doesCloseTransaction(currentConn, methodName, jdbcMethodArgs) // According to the JDBC spec, transactions are committed if autocommit is switched from false to true. - || sqlMethodAnalyzer.doesSwitchAutoCommitFalseTrue(currentConn, methodName, - jdbcMethodArgs)) { + || doesSwitchAutoCommitFalseTrue) { this.pluginManagerService.setInTransaction(false); } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java index 25ce77ecb..9aa655b0d 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover/FailoverConnectionPlugin.java @@ -346,35 +346,38 @@ void initHostProvider( @Override public void notifyNodeListChanged(final Map> changes) { + try { + if (!this.enableFailoverSetting) { + return; + } - if (!this.enableFailoverSetting) { - return; - } - - if (LOGGER.isLoggable(Level.FINEST)) { - final StringBuilder sb = new StringBuilder("Changes:"); - for (final Map.Entry> change : changes.entrySet()) { - if (sb.length() > 0) { - sb.append("\n"); + if (LOGGER.isLoggable(Level.FINEST)) { + final StringBuilder sb = new StringBuilder("Changes:"); + for (final Map.Entry> change : changes.entrySet()) { + if (sb.length() > 0) { + sb.append("\n"); + } + sb.append(String.format("\tHost '%s': %s", change.getKey(), change.getValue())); } - sb.append(String.format("\tHost '%s': %s", change.getKey(), change.getValue())); + LOGGER.finest(sb.toString()); } - 
LOGGER.finest(sb.toString()); - } - final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); - final String url = currentHost.getUrl(); - if (isNodeStillValid(url, changes)) { - return; - } - - for (final String alias : currentHost.getAliases()) { - if (isNodeStillValid(alias + "/", changes)) { + final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); + final String url = currentHost.getUrl(); + if (isNodeStillValid(url, changes)) { return; } - } - LOGGER.fine(() -> Messages.get("Failover.invalidNode", new Object[]{currentHost})); + for (final String alias : currentHost.getAliases()) { + if (isNodeStillValid(alias + "/", changes)) { + return; + } + } + + LOGGER.fine(() -> Messages.get("Failover.invalidNode", new Object[]{currentHost})); + } finally { + this.staleDnsHelper.notifyNodeListChanged(changes); + } } private boolean isNodeStillValid(final String node, final Map> changes) { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java index bd1d6b660..695c5fd65 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java @@ -22,8 +22,10 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -36,6 +38,7 @@ import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.JdbcCallable; import software.amazon.jdbc.JdbcMethod; +import software.amazon.jdbc.NodeChangeOptions; import software.amazon.jdbc.PluginManagerService; import software.amazon.jdbc.PluginService; import software.amazon.jdbc.PropertyDefinition; @@ -174,6 +177,7 @@ public 
FailoverConnectionPlugin(final PluginService pluginService, final Propert methods.add(JdbcMethod.INITHOSTPROVIDER.methodName); methods.add(JdbcMethod.CONNECT.methodName); + methods.add(JdbcMethod.NOTIFYNODELISTCHANGED.methodName); methods.add(JdbcMethod.CONNECTION_SETAUTOCOMMIT.methodName); methods.addAll(this.pluginService.getTargetDriverDialect().getNetworkBoundMethodNames(this.properties)); this.subscribedMethods = Collections.unmodifiableSet(methods); @@ -793,4 +797,9 @@ public Connection connect( return conn; } + + @Override + public void notifyNodeListChanged(final Map> changes) { + this.staleDnsHelper.notifyNodeListChanged(changes); + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPlugin.java new file mode 100644 index 000000000..1d526148c --- /dev/null +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPlugin.java @@ -0,0 +1,678 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.jdbc.plugin.srw; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import org.checkerframework.checker.nullness.qual.NonNull; +import software.amazon.jdbc.AwsWrapperProperty; +import software.amazon.jdbc.HostListProviderService; +import software.amazon.jdbc.HostRole; +import software.amazon.jdbc.HostSpec; +import software.amazon.jdbc.HostSpecBuilder; +import software.amazon.jdbc.JdbcCallable; +import software.amazon.jdbc.JdbcMethod; +import software.amazon.jdbc.NodeChangeOptions; +import software.amazon.jdbc.OldConnectionSuggestedAction; +import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.PropertyDefinition; +import software.amazon.jdbc.cleanup.CanReleaseResources; +import software.amazon.jdbc.hostavailability.HostAvailability; +import software.amazon.jdbc.plugin.AbstractConnectionPlugin; +import software.amazon.jdbc.plugin.AuroraInitialConnectionStrategyPlugin; +import software.amazon.jdbc.plugin.failover.FailoverSQLException; +import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingSQLException; +import software.amazon.jdbc.util.Messages; +import software.amazon.jdbc.util.RdsUrlType; +import software.amazon.jdbc.util.RdsUtils; +import software.amazon.jdbc.util.SqlState; +import software.amazon.jdbc.util.StringUtils; +import software.amazon.jdbc.util.WrapperUtils; + +public class SimpleReadWriteSplittingPlugin extends AbstractConnectionPlugin + implements CanReleaseResources { + + private static final Logger LOGGER = Logger.getLogger(SimpleReadWriteSplittingPlugin.class.getName()); + private static final Set subscribedMethods = + Collections.unmodifiableSet(new HashSet() { + { + add(JdbcMethod.CONNECT.methodName); + add(JdbcMethod.INITHOSTPROVIDER.methodName); + 
add(JdbcMethod.NOTIFYCONNECTIONCHANGED.methodName); + add(JdbcMethod.CONNECTION_SETREADONLY.methodName); + add(JdbcMethod.CONNECTION_CLEARWARNINGS.methodName); + add(JdbcMethod.STATEMENT_EXECUTE.methodName); + add(JdbcMethod.STATEMENT_EXECUTEQUERY.methodName); + add(JdbcMethod.STATEMENT_EXECUTEBATCH.methodName); + add(JdbcMethod.STATEMENT_EXECUTEUPDATE.methodName); + add(JdbcMethod.PREPAREDSTATEMENT_EXECUTE.methodName); + add(JdbcMethod.PREPAREDSTATEMENT_EXECUTEUPDATE.methodName); + add(JdbcMethod.PREPAREDSTATEMENT_EXECUTELARGEUPDATE.methodName); + add(JdbcMethod.PREPAREDSTATEMENT_EXECUTEQUERY.methodName); + add(JdbcMethod.PREPAREDSTATEMENT_EXECUTEBATCH.methodName); + add(JdbcMethod.CALLABLESTATEMENT_EXECUTE.methodName); + add(JdbcMethod.CALLABLESTATEMENT_EXECUTEQUERY.methodName); + add(JdbcMethod.CALLABLESTATEMENT_EXECUTELARGEUPDATE.methodName); + add(JdbcMethod.CALLABLESTATEMENT_EXECUTEBATCH.methodName); + add(JdbcMethod.CALLABLESTATEMENT_EXECUTEUPDATE.methodName); + add(JdbcMethod.CONNECTION_SETAUTOCOMMIT.methodName); + } + }); + + private final PluginService pluginService; + private final Properties properties; + private final RdsUtils rdsUtils = new RdsUtils(); + private final boolean verifyNewConnections; + private volatile boolean inReadWriteSplit = false; + private HostListProviderService hostListProviderService; + private Connection writerConnection; + private Connection readerConnection; + private final String writeEndpoint; + private final String readEndpoint; + private HostSpec readEndpointHostSpec; + private HostSpec writeEndpointHostSpec; + private final AuroraInitialConnectionStrategyPlugin.VerifyOpenedConnectionType verifyOpenedConnectionType; + private final int connectRetryIntervalMs; + private final long connectRetryTimeoutMs; + + public static final AwsWrapperProperty SRW_READ_ENDPOINT = + new AwsWrapperProperty( + "srwReadEndpoint", + null, + "The read-only endpoint that should be used to connect to a new reader host."); + public static final 
AwsWrapperProperty SRW_WRITE_ENDPOINT = + new AwsWrapperProperty( + "srwWriteEndpoint", + null, + "The read-write/cluster endpoint that should be used to connect to the writer."); + public static final AwsWrapperProperty VERIFY_NEW_SRW_CONNECTIONS = + new AwsWrapperProperty( + "verifyNewSrwConnections", + "true", + "Enables role-verification for new connections made by the Simple Read Write Splitting Plugin.", + false, + new String[] { + "true", "false" + }); + + public static final AwsWrapperProperty SRW_CONNECT_RETRY_TIMEOUT_MS = + new AwsWrapperProperty( + "srwConnectRetryTimeoutMs", + "60000", + "Maximum allowed time for the retries opening a connection."); + + public static final AwsWrapperProperty SRW_CONNECT_RETRY_INTERVAL_MS = + new AwsWrapperProperty( + "srwConnectRetryIntervalMs", + "1000", + "Time between each retry of opening a connection."); + + static { + PropertyDefinition.registerPluginProperties(SimpleReadWriteSplittingPlugin.class); + } + + SimpleReadWriteSplittingPlugin(final PluginService pluginService, final Properties properties) { + this.writeEndpoint = SRW_WRITE_ENDPOINT.getString(properties); + if (StringUtils.isNullOrEmpty(writeEndpoint)) { + throw new + RuntimeException( + Messages.get( + "SimpleReadWriteSplittingPlugin.missingRequiredConfigParameter", + new Object[] {SRW_WRITE_ENDPOINT.name})); + } + this.readEndpoint = SRW_READ_ENDPOINT.getString(properties); + if (StringUtils.isNullOrEmpty(readEndpoint)) { + throw new + RuntimeException( + Messages.get( + "SimpleReadWriteSplittingPlugin.missingRequiredConfigParameter", + new Object[] {SRW_READ_ENDPOINT.name})); + } + this.pluginService = pluginService; + this.properties = properties; + this.verifyNewConnections = VERIFY_NEW_SRW_CONNECTIONS.getBoolean(properties); + this.verifyOpenedConnectionType = + AuroraInitialConnectionStrategyPlugin.VerifyOpenedConnectionType.fromValue( + AuroraInitialConnectionStrategyPlugin.VERIFY_OPENED_CONNECTION_TYPE.getString(properties)); + 
this.connectRetryIntervalMs = SRW_CONNECT_RETRY_INTERVAL_MS.getInteger(properties); + this.connectRetryTimeoutMs = SRW_CONNECT_RETRY_TIMEOUT_MS.getInteger(properties); + } + + /** + * For testing purposes only. + */ + SimpleReadWriteSplittingPlugin( + final PluginService pluginService, + final Properties properties, + final HostListProviderService hostListProviderService, + final Connection writerConnection, + final Connection readerConnection, + final HostSpec writeEndpointHostSpec, + final HostSpec readEndpointHostSpec) { + this(pluginService, properties); + this.hostListProviderService = hostListProviderService; + this.writerConnection = writerConnection; + this.readerConnection = readerConnection; + this.writeEndpointHostSpec = writeEndpointHostSpec; + this.readEndpointHostSpec = readEndpointHostSpec; + } + + @Override + public Set getSubscribedMethods() { + return subscribedMethods; + } + + @Override + public void initHostProvider( + final String driverProtocol, + final String initialUrl, + final Properties props, + final HostListProviderService hostListProviderService, + final JdbcCallable initHostProviderFunc) + throws SQLException { + + this.hostListProviderService = hostListProviderService; + initHostProviderFunc.call(); + } + + @Override + public OldConnectionSuggestedAction notifyConnectionChanged( + final EnumSet changes) { + try { + updateInternalConnectionInfo(); + } catch (final SQLException e) { + // ignore + } + + if (this.inReadWriteSplit) { + return OldConnectionSuggestedAction.PRESERVE; + } + return OldConnectionSuggestedAction.NO_OPINION; + } + + @Override + public Connection connect( + final String driverProtocol, + final HostSpec hostSpec, + final Properties props, + final boolean isInitialConnection, + final JdbcCallable connectFunc) + throws SQLException { + + if (!isInitialConnection || !this.verifyNewConnections) { + // No verification required. Continue with a normal workflow. 
+ return connectFunc.call(); + } + + final RdsUrlType type = this.rdsUtils.identifyRdsType(hostSpec.getHost()); + + if (type == RdsUrlType.RDS_WRITER_CLUSTER + || type == RdsUrlType.RDS_GLOBAL_WRITER_CLUSTER + || this.verifyOpenedConnectionType == AuroraInitialConnectionStrategyPlugin.VerifyOpenedConnectionType.WRITER) { + Connection writerCandidateConn = this.getVerifiedConnection(props, hostSpec, HostRole.WRITER, connectFunc); + if (writerCandidateConn == null) { + // Can't get writer connection. Continue with a normal workflow. + return connectFunc.call(); + } + this.setInitialConnectionHostSpec(writerCandidateConn, hostSpec); + return writerCandidateConn; + } + + if (type == RdsUrlType.RDS_READER_CLUSTER + || this.verifyOpenedConnectionType == AuroraInitialConnectionStrategyPlugin.VerifyOpenedConnectionType.READER) { + Connection readerCandidateConn = + this.getVerifiedConnection(props, hostSpec, HostRole.READER, connectFunc); + if (readerCandidateConn == null) { + // Can't get a reader connection. Continue with a normal workflow. + return connectFunc.call(); + } + this.setInitialConnectionHostSpec(readerCandidateConn, hostSpec); + return readerCandidateConn; + } + + // Continue with a normal workflow. + return connectFunc.call(); + } + + private Connection getVerifiedConnection( + final Properties props, + final HostSpec hostSpec, + final HostRole hostRole, + final JdbcCallable connectFunc) + throws SQLException { + + final long endTimeNano = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(this.connectRetryTimeoutMs); + + Connection candidateConn; + + while (System.nanoTime() < endTimeNano) { + + candidateConn = null; + + try { + if (hostSpec == null) { + if (connectFunc == null) { + // Unable to connect to verify role. + break; + } + // No HostSpec provided, still verify role. 
+ candidateConn = connectFunc.call(); + } else { + candidateConn = this.pluginService.connect(hostSpec, props, this); + } + + if (candidateConn == null || this.pluginService.getHostRole(candidateConn) != hostRole) { + // The connection does not have the desired role. Retry. + this.closeConnection(candidateConn); + this.delay(); + continue; + } + + // Connection is valid and verified. + return candidateConn; + } catch (SQLException ex) { + this.closeConnection(candidateConn); + if (this.pluginService.isLoginException(ex, this.pluginService.getTargetDriverDialect())) { + throw WrapperUtils.wrapExceptionIfNeeded(SQLException.class, ex); + } + this.delay(); + } catch (Throwable ex) { + this.closeConnection(candidateConn); + throw ex; + } + } + + LOGGER.fine( + () -> Messages.get("SimpleReadWriteSplittingPlugin.verificationFailed", + new Object[] {hostRole, this.connectRetryTimeoutMs})); + return null; + } + + private void setInitialConnectionHostSpec(Connection conn, HostSpec hostSpec) { + if (hostSpec == null) { + try { + hostSpec = this.pluginService.identifyConnection(conn); + } catch (Exception e) { + // Ignore error + } + } + + if (hostSpec != null) { + hostListProviderService.setInitialConnectionHostSpec(hostSpec); + } + } + + @Override + public T execute( + final Class resultClass, + final Class exceptionClass, + final Object methodInvokeOn, + final String methodName, + final JdbcCallable jdbcMethodFunc, + final Object[] args) + throws E { + final Connection conn = WrapperUtils.getConnectionFromSqlObject(methodInvokeOn); + if (conn != null && conn != this.pluginService.getCurrentConnection()) { + LOGGER.fine( + () -> Messages.get("ReadWriteSplittingPlugin.executingAgainstOldConnection", + new Object[] {methodInvokeOn})); + return jdbcMethodFunc.call(); + } + + if (JdbcMethod.CONNECTION_CLEARWARNINGS.methodName.equals(methodName)) { + try { + if (this.writerConnection != null && !this.writerConnection.isClosed()) { + this.writerConnection.clearWarnings(); + } + 
if (this.readerConnection != null && !this.readerConnection.isClosed()) { + this.readerConnection.clearWarnings(); + } + } catch (final SQLException e) { + throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e); + } + } + + if (JdbcMethod.CONNECTION_SETREADONLY.methodName.equals(methodName) + && args != null + && args.length > 0) { + try { + switchConnectionIfRequired((Boolean) args[0]); + } catch (final SQLException e) { + throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e); + } + } + + try { + return jdbcMethodFunc.call(); + } catch (final Exception e) { + if (e instanceof FailoverSQLException) { + LOGGER.finer( + () -> Messages.get("ReadWriteSplittingPlugin.failoverExceptionWhileExecutingCommand", + new Object[] {methodName})); + closeIdleConnections(); + } else { + LOGGER.finest( + () -> Messages.get("ReadWriteSplittingPlugin.exceptionWhileExecutingCommand", + new Object[] {methodName})); + } + throw e; + } + } + + private void updateInternalConnectionInfo() throws SQLException { + final Connection currentConnection = this.pluginService.getCurrentConnection(); + final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); + if (currentConnection == null || currentHost == null) { + return; + } + + // Only update internal connection info if connection is to the endpoint and different from internal connection. 
+ if (isWriteEndpoint(currentHost) && !currentConnection.equals(this.writerConnection) + && (!this.verifyNewConnections || this.pluginService.getHostRole(currentConnection) == HostRole.WRITER)) { + setWriterConnection(currentConnection, currentHost); + } else if (isReadEndpoint(currentHost) && !currentConnection.equals(this.readerConnection) + && (!this.verifyNewConnections || this.pluginService.getHostRole(currentConnection) == HostRole.READER)) { + setReaderConnection(currentConnection, currentHost); + } + } + + private boolean isWriteEndpoint(final @NonNull HostSpec hostSpec) { + return this.writeEndpoint.equalsIgnoreCase(hostSpec.getHost()); + } + + private boolean isReadEndpoint(final @NonNull HostSpec hostSpec) { + return this.readEndpoint.equalsIgnoreCase(hostSpec.getHost()); + } + + private void setWriterConnection(final Connection conn, final HostSpec host) { + this.writerConnection = conn; + this.writeEndpointHostSpec = host; + LOGGER.finest( + () -> Messages.get( + "ReadWriteSplittingPlugin.setWriterConnection", + new Object[] { + host.getUrl()})); + } + + private void setReaderConnection(final Connection conn, final HostSpec host) { + this.readerConnection = conn; + this.readEndpointHostSpec = host; + LOGGER.finest( + () -> Messages.get( + "ReadWriteSplittingPlugin.setReaderConnection", + new Object[] { + host.getUrl()})); + } + + void switchConnectionIfRequired(final boolean readOnly) throws SQLException { + final Connection currentConnection = this.pluginService.getCurrentConnection(); + if (currentConnection != null && currentConnection.isClosed()) { + logAndThrowException(Messages.get("ReadWriteSplittingPlugin.setReadOnlyOnClosedConnection"), + SqlState.CONNECTION_NOT_OPEN); + } + + final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); + if (readOnly) { + if (!pluginService.isInTransaction() && !isReadEndpoint(currentHost)) { + try { + switchToReaderConnection(); + } catch (final SQLException e) { + if 
(!isConnectionUsable(currentConnection)) { + logAndThrowException( + Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", new Object[]{e.getMessage()}), + e); + } + // Failed to switch to the reader endpoint. The current connection will be used as a fallback. + LOGGER.fine(() -> Messages.get( + "SimpleReadWriteSplittingPlugin.fallbackToCurrentConnection", + new Object[] { + this.pluginService.getCurrentHostSpec().getUrl(), + e.getMessage()})); + } + } + } else { + if (!isWriteEndpoint(currentHost) && pluginService.isInTransaction()) { + logAndThrowException( + Messages.get("ReadWriteSplittingPlugin.setReadOnlyFalseInTransaction"), + SqlState.ACTIVE_SQL_TRANSACTION); + } + + if (!isWriteEndpoint(currentHost)) { + try { + switchToWriterConnection(); + LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", + new Object[] {writeEndpointHostSpec.getUrl()})); + } catch (final SQLException e) { + logAndThrowException(Messages.get("ReadWriteSplittingPlugin.errorSwitchingToWriter"), + e); + } + } + } + } + + private void logAndThrowException(final String logMessage) throws SQLException { + LOGGER.severe(logMessage); + throw new ReadWriteSplittingSQLException(logMessage); + } + + private void logAndThrowException(final String logMessage, final SqlState sqlState) + throws SQLException { + LOGGER.severe(logMessage); + throw new ReadWriteSplittingSQLException(logMessage, sqlState.getState()); + } + + private void logAndThrowException( + final String logMessage, final Throwable cause) + throws SQLException { + LOGGER.fine(logMessage); + throw new ReadWriteSplittingSQLException(logMessage, SqlState.CONNECTION_UNABLE_TO_CONNECT.getState(), cause); + } + + private void switchToReaderConnection() throws SQLException { + final Connection currentConnection = this.pluginService.getCurrentConnection(); + final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); + if (isReadEndpoint(currentHost) && 
isConnectionUsable(currentConnection)) { + // Already connected to the read-only endpoint. + return; + } + + this.inReadWriteSplit = true; + if (!isConnectionUsable(this.readerConnection)) { + getNewReaderConnection(); + } else { + try { + switchCurrentConnectionTo(this.readerConnection, this.readEndpointHostSpec); + LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", + new Object[] {this.readEndpointHostSpec.getUrl()})); + } catch (SQLException e) { + if (e.getMessage() != null) { + LOGGER.warning( + () -> Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReaderWithCause", + new Object[] {this.readEndpointHostSpec.getUrl(), e.getMessage()})); + } else { + LOGGER.warning(() -> Messages.get("ReadWriteSplittingPlugin.errorSwitchingToCachedReader", + new Object[] {this.readEndpointHostSpec.getUrl()})); + } + + this.readerConnection.close(); + this.readerConnection = null; + getNewReaderConnection(); + } + } + } + + private void switchToWriterConnection() throws SQLException { + final Connection currentConnection = this.pluginService.getCurrentConnection(); + final HostSpec currentHost = this.pluginService.getCurrentHostSpec(); + if (isWriteEndpoint(currentHost) && isConnectionUsable(currentConnection)) { + // Already connected to the cluster/read-write endpoint. 
+ return; + } + + this.inReadWriteSplit = true; + if (!isConnectionUsable(this.writerConnection)) { + getNewWriterConnection(); + } else { + switchCurrentConnectionTo(this.writerConnection, this.writeEndpointHostSpec); + } + } + + private void getNewWriterConnection() throws SQLException { + if (this.writeEndpointHostSpec == null) { + this.writeEndpointHostSpec = createHostSpec(this.writeEndpoint, HostRole.WRITER); + } + final Connection conn; + if (this.verifyNewConnections) { + conn = this.getVerifiedConnection( + this.properties, this.writeEndpointHostSpec, HostRole.WRITER, null); + } else { + conn = this.pluginService.connect(this.writeEndpointHostSpec, this.properties, this); + } + + if (conn == null) { + logAndThrowException( + Messages.get("SimpleReadWriteSplittingPlugin.failedToConnectToWriter", + new Object[]{this.writeEndpoint})); + } + + setWriterConnection(conn, writeEndpointHostSpec); + switchCurrentConnectionTo(this.writerConnection, writeEndpointHostSpec); + } + + private void getNewReaderConnection() throws SQLException { + if (this.readEndpointHostSpec == null) { + this.readEndpointHostSpec = createHostSpec(this.readEndpoint, HostRole.READER); + } + final Connection conn; + + if (this.verifyNewConnections) { + conn = this.getVerifiedConnection( + this.properties, this.readEndpointHostSpec, HostRole.READER, null); + } else { + conn = this.pluginService.connect(this.readEndpointHostSpec, this.properties, this); + } + + if (conn == null) { + logAndThrowException(Messages.get("ReadWriteSplittingPlugin.failedToConnectToReader", + new Object[]{this.readEndpoint}), + SqlState.CONNECTION_UNABLE_TO_CONNECT); + } + + LOGGER.finest( + () -> Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", + new Object[]{readEndpointHostSpec.getUrl()})); + + // Store reader connection for reuse. 
+ setReaderConnection(conn, readEndpointHostSpec); + switchCurrentConnectionTo(conn, this.readEndpointHostSpec); + LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", + new Object[] {readEndpoint})); + } + + private void switchCurrentConnectionTo( + final Connection newConnection, + final HostSpec newConnectionHost) + throws SQLException { + + final Connection currentConnection = this.pluginService.getCurrentConnection(); + if (currentConnection == newConnection) { + return; + } + this.pluginService.setCurrentConnection(newConnection, newConnectionHost); + LOGGER.finest(() -> Messages.get( + "ReadWriteSplittingPlugin.settingCurrentConnection", + new Object[] { + newConnectionHost.getUrl()})); + } + + private HostSpec createHostSpec(String host, HostRole role) { + HostSpecBuilder hostSpecBuilder = this.hostListProviderService.getHostSpecBuilder(); + return hostSpecBuilder + .host(host) + .port(this.hostListProviderService.getCurrentHostSpec().getPort()) + .role(role) + .availability(HostAvailability.AVAILABLE) + .build(); + } + + private boolean isConnectionUsable(final Connection connection) throws SQLException { + return connection != null && !connection.isClosed(); + } + + @Override + public void releaseResources() { + closeIdleConnections(); + } + + private void closeIdleConnections() { + LOGGER.finest(() -> Messages.get("ReadWriteSplittingPlugin.closingInternalConnections")); + closeConnectionIfIdle(this.readerConnection); + closeConnectionIfIdle(this.writerConnection); + this.readerConnection = null; + this.writerConnection = null; + } + + void closeConnectionIfIdle(final Connection internalConnection) { + final Connection currentConnection = this.pluginService.getCurrentConnection(); + try { + if (internalConnection != null + && internalConnection != currentConnection + && !internalConnection.isClosed()) { + internalConnection.close(); + } + } catch (final SQLException e) { + // ignore + } + } + + private void 
closeConnection(final Connection connection) { + if (connection != null) { + try { + connection.close(); + } catch (final SQLException ex) { + // ignore + } + } + } + + private void delay() { + try { + TimeUnit.MILLISECONDS.sleep(this.connectRetryIntervalMs); + } catch (InterruptedException ex) { + // ignore + } + } + + /** + * Methods for testing purposes only. + */ + Connection getWriterConnection() { + return this.writerConnection; + } + + Connection getReaderConnection() { + return this.readerConnection; + } +} diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginFactory.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginFactory.java new file mode 100644 index 000000000..ed2224003 --- /dev/null +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.jdbc.plugin.srw; + +import java.util.Properties; +import software.amazon.jdbc.ConnectionPlugin; +import software.amazon.jdbc.ConnectionPluginFactory; +import software.amazon.jdbc.PluginService; + +public class SimpleReadWriteSplittingPluginFactory implements ConnectionPluginFactory { + @Override + public ConnectionPlugin getInstance(final PluginService pluginService, final Properties props) { + return new SimpleReadWriteSplittingPlugin(pluginService, props); + } +} diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/staledns/AuroraStaleDnsHelper.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/staledns/AuroraStaleDnsHelper.java index 69864104d..420ba8840 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/staledns/AuroraStaleDnsHelper.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/staledns/AuroraStaleDnsHelper.java @@ -183,6 +183,7 @@ public void notifyNodeListChanged(final Map> LOGGER.finest(() -> Messages.get("AuroraStaleDnsHelper.reset")); this.writerHostSpec = null; this.writerHostAddress = null; + return; } } } diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index dbfc72a4b..ee6cef01c 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -317,12 +317,17 @@ ReadWriteSplittingPlugin.exceptionWhileExecutingCommand=Detected an exception wh ReadWriteSplittingPlugin.failoverExceptionWhileExecutingCommand=Detected a failover exception while executing a command: ''{0}'' ReadWriteSplittingPlugin.executingAgainstOldConnection=Executing method against old connection: ''{0}'' ReadWriteSplittingPlugin.noReadersAvailable=The plugin was unable to establish a reader connection to any reader instance. 
-ReadWriteSplittingPlugin.successfullyConnectedToReader=Successfully connected to a new reader host: ''{0}'' +ReadWriteSplittingPlugin.successfullyConnectedToReader=Successfully connected to reader host: ''{0}'' ReadWriteSplittingPlugin.failedToConnectToReader=Failed to connect to reader host: ''{0}'' ReadWriteSplittingPlugin.unsupportedHostSpecSelectorStrategy=Unsupported host selection strategy ''{0}'' specified in plugin configuration parameter ''readerHostSelectorStrategy''. Please visit the Read/Write Splitting Plugin documentation for all supported strategies. ReadWriteSplittingPlugin.errorVerifyingInitialHostSpecRole=An error occurred while obtaining the connected host's role. This could occur if the connection is broken or if you are not connected to an Aurora database. ReadWriteSplittingPlugin.previousReaderNotAllowed=The previous reader connection cannot be used because it is no longer in the list of allowed hosts. Previous reader: {0}. Allowed hosts: {1} +SimpleReadWriteSplittingPlugin.verificationFailed=The plugin was unable to establish a {0} connection within {1} ms. +SimpleReadWriteSplittingPlugin.failedToConnectToWriter=A writer connection was requested via setReadOnly, but the plugin was unable to establish a writer connection with the writer endpoint ''{0}''. +SimpleReadWriteSplittingPlugin.missingRequiredConfigParameter=Configuration parameter ''{0}'' is required. +SimpleReadWriteSplittingPlugin.fallbackToCurrentConnection=Failed to switch to reader host. The current connection will be used as a fallback: ''{0}''. Error: {1} + SAMLCredentialsProviderFactory.getSamlAssertionFailed=Failed to get SAML Assertion due to exception: ''{0}'' SamlAuthPlugin.javaStsSdkNotInClasspath=Required dependency 'AWS Java SDK for AWS Secret Token Service' is not on the classpath. 
SamlAuthPlugin.unhandledException=Unhandled exception: ''{0}'' diff --git a/wrapper/src/test/java/integration/container/ConnectionStringHelper.java b/wrapper/src/test/java/integration/container/ConnectionStringHelper.java index 7e8b2dfc8..84d176959 100644 --- a/wrapper/src/test/java/integration/container/ConnectionStringHelper.java +++ b/wrapper/src/test/java/integration/container/ConnectionStringHelper.java @@ -106,6 +106,19 @@ public static String getWrapperUrl(TestInstanceInfo instance) { TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()); } + public static String getWrapperUrl(String host) { + return getWrapperUrl( + TestEnvironment.getCurrent().getCurrentDriver(), + host, + TestEnvironment.getCurrent() + .getInfo() + .getDatabaseInfo() + .getInstances() + .get(0) + .getPort(), + TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getDefaultDbName()); + } + public static String getWrapperUrl(String host, int port, String databaseName) { return getWrapperUrl(TestEnvironment.getCurrent().getCurrentDriver(), host, port, databaseName); } diff --git a/wrapper/src/test/java/integration/container/tests/ReadWriteSplittingTests.java b/wrapper/src/test/java/integration/container/tests/ReadWriteSplittingTests.java index b8c96edbe..c973c401b 100644 --- a/wrapper/src/test/java/integration/container/tests/ReadWriteSplittingTests.java +++ b/wrapper/src/test/java/integration/container/tests/ReadWriteSplittingTests.java @@ -54,6 +54,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -120,7 +121,7 @@ public void tearDownEach() { } - protected static Properties getProxiedPropsWithFailover() { + protected Properties getProxiedPropsWithFailover() { final Properties props = getPropsWithFailover(); AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.set(props, "?." 
+ TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointSuffix() @@ -128,7 +129,7 @@ protected static Properties getProxiedPropsWithFailover() { return props; } - protected static Properties getProxiedProps() { + protected Properties getProxiedProps() { final Properties props = getProps(); AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.set(props, "?." + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointSuffix() @@ -145,18 +146,34 @@ protected static Properties getDefaultPropsNoPlugins() { return props; } - protected static Properties getProps() { + protected Properties getProps() { final Properties props = getDefaultPropsNoPlugins(); PropertyDefinition.PLUGINS.set(props, "readWriteSplitting"); return props; } - protected static Properties getPropsWithFailover() { + protected Properties getPropsWithFailover() { final Properties props = getDefaultPropsNoPlugins(); PropertyDefinition.PLUGINS.set(props, "failover,efm2,readWriteSplitting"); return props; } + protected String getWriterEndpoint() { + return TestEnvironment.getCurrent() + .getInfo() + .getDatabaseInfo() + .getInstances() + .get(0) + .getHost(); + } + + protected String getReaderEndpoint() { + return TestEnvironment.getCurrent() + .getInfo() + .getDatabaseInfo() + .getClusterReadOnlyEndpoint(); + } + @TestTemplate public void test_connectToWriter_switchSetReadOnly() throws SQLException { final String url = ConnectionStringHelper.getWrapperUrl(); @@ -199,7 +216,7 @@ public void test_connectToReader_setReadOnlyTrueFalse() throws SQLException { conn.setReadOnly(true); final String currentConnectionId = auroraUtil.queryInstanceId(conn); - assertEquals(readerConnectionId, currentConnectionId); + assertTrue(connectedToCorrectReaderInstance(readerConnectionId, currentConnectionId)); conn.setReadOnly(false); final String writerConnectionId = auroraUtil.queryInstanceId(conn); @@ -208,6 +225,11 @@ public void test_connectToReader_setReadOnlyTrueFalse() 
throws SQLException { } } + boolean connectedToCorrectReaderInstance(String readerConnectionId, String currentConnectionId) { + // When connected to a reader, the ReadWriteSplittingPlugin does not change connections on conn.setReadOnly(true). + return Objects.equals(readerConnectionId, currentConnectionId); + } + // Assumes the writer is stored as the first instance and all other instances are readers. protected String getWrapperReaderInstanceUrl() { TestInstanceInfo readerInstance = @@ -217,7 +239,7 @@ protected String getWrapperReaderInstanceUrl() { @TestTemplate public void test_connectToReaderCluster_setReadOnlyTrueFalse() throws SQLException { - final String url = ConnectionStringHelper.getWrapperReaderClusterUrl(); + final String url = ConnectionStringHelper.getWrapperUrl(getReaderEndpoint()); LOGGER.finest("Connecting to url " + url); try (final Connection conn = DriverManager.getConnection(url, getProps())) { final String readerConnectionId = auroraUtil.queryInstanceId(conn); @@ -294,7 +316,8 @@ public void test_setReadOnlyFalseInTransaction_setAutocommitFalse() throws SQLEx @TestTemplate @EnableOnDatabaseEngine({DatabaseEngine.MYSQL}) public void test_setReadOnlyTrueInTransaction() throws SQLException { - try (final Connection conn = DriverManager.getConnection(ConnectionStringHelper.getWrapperUrl(), getProps())) { + try (final Connection conn = DriverManager.getConnection( + ConnectionStringHelper.getWrapperUrl(getWriterEndpoint()), getProps())) { final String writerConnectionId = auroraUtil.queryInstanceId(conn); @@ -336,7 +359,9 @@ public void test_setReadOnlyTrue_allReadersDown() throws SQLException { final String writerConnectionId = auroraUtil.queryInstanceId(conn); - // Kill all reader instances + // Kill reader endpoint and all reader instances + ProxyHelper.disableConnectivity(TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo() + .getClusterReadOnlyEndpoint()); final List instanceIDs = 
TestEnvironment.getCurrent().getInfo().getDatabaseInfo().getInstances().stream() .map(TestInstanceInfo::getInstanceId).collect(Collectors.toList()); @@ -448,8 +473,11 @@ public void test_writerFailover_setReadOnlyTrueFalse() throws SQLException { final String originalWriterId = auroraUtil.queryInstanceId(conn); - // Kill all reader instances - List instances = TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstances(); + // Kill reader endpoint and all reader instances + ProxyHelper.disableConnectivity(TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo() + .getClusterReadOnlyEndpoint()); + List instances = TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo() + .getInstances(); for (int i = 1; i < instances.size(); i++) { ProxyHelper.disableConnectivity(instances.get(i).getInstanceId()); } @@ -559,7 +587,7 @@ public void test_failoverReaderToWriter_setReadOnlyTrueFalse() final List instances = TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo() .getInstances(); - // Kill all instances except the writer + // Kill all instances except the writer and cluster endpoint for (final TestInstanceInfo instance : instances) { final String instanceId = instance.getInstanceId(); if (writerConnectionId.equals(instanceId)) { @@ -567,6 +595,8 @@ public void test_failoverReaderToWriter_setReadOnlyTrueFalse() } ProxyHelper.disableConnectivity(instanceId); } + ProxyHelper.disableConnectivity(TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo() + .getClusterReadOnlyEndpoint()); auroraUtil.assertFirstQueryThrows(conn, FailoverSuccessSQLException.class); assertFalse(conn.isClosed()); diff --git a/wrapper/src/test/java/integration/container/tests/SimpleReadWriteSplittingTest.java b/wrapper/src/test/java/integration/container/tests/SimpleReadWriteSplittingTest.java new file mode 100644 index 000000000..bf01874cd --- /dev/null +++ b/wrapper/src/test/java/integration/container/tests/SimpleReadWriteSplittingTest.java @@ -0,0 +1,205 @@ +/* 
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.container.tests; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import integration.TestEnvironmentFeatures; +import integration.container.ConnectionStringHelper; +import integration.container.TestDriverProvider; +import integration.container.TestEnvironment; +import integration.container.condition.DisableOnTestFeature; +import integration.container.condition.EnableOnNumOfInstances; +import integration.container.condition.EnableOnTestFeature; +import integration.container.condition.MakeSureFirstInstanceWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import software.amazon.jdbc.PropertyDefinition; +import software.amazon.jdbc.hostlistprovider.AuroraHostListProvider; +import software.amazon.jdbc.plugin.srw.SimpleReadWriteSplittingPlugin; + 
+@TestMethodOrder(MethodOrderer.MethodName.class) +@ExtendWith(TestDriverProvider.class) +@EnableOnTestFeature(TestEnvironmentFeatures.FAILOVER_SUPPORTED) +@DisableOnTestFeature({ + TestEnvironmentFeatures.PERFORMANCE, + TestEnvironmentFeatures.RUN_HIBERNATE_TESTS_ONLY, + TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY, + TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT, + TestEnvironmentFeatures.RUN_DB_METRICS_ONLY}) +@EnableOnNumOfInstances(min = 2) +@MakeSureFirstInstanceWriter +@Order(15) + + +public class SimpleReadWriteSplittingTest extends ReadWriteSplittingTests { + String pluginCode = "srw"; + String pluginCodesWithFailover = "failover2,efm2,srw"; + + protected Properties getSrwProps(boolean proxied, String plugins) { + final Properties props = getDefaultPropsNoPlugins(); + PropertyDefinition.PLUGINS.set(props, plugins); + if (proxied) { + props.setProperty(SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.name, + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getClusterEndpoint()); + props.setProperty(SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.name, + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getClusterReadOnlyEndpoint()); + } else { + props.setProperty(SimpleReadWriteSplittingPlugin.VERIFY_NEW_SRW_CONNECTIONS.name, "false"); + props.setProperty(SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.name, getWriterEndpoint()); + props.setProperty(SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.name, getReaderEndpoint()); + } + return props; + } + + @TestTemplate + public void test_IncorrectReaderEndpoint() throws SQLException { + final Properties props = getDefaultPropsNoPlugins(); + PropertyDefinition.PLUGINS.set(props, pluginCode); + props.setProperty(SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.name, getWriterEndpoint()); + props.setProperty(SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.name, getWriterEndpoint()); + + try (final Connection conn = DriverManager.getConnection( + 
ConnectionStringHelper.getWrapperClusterEndpointUrl(), props)) { + final String writerConnectionId = auroraUtil.queryInstanceId(conn); + + // Switch to reader successfully + conn.setReadOnly(true); + final String readerConnectionId = auroraUtil.queryInstanceId(conn); + // Should stay on writer as fallback since reader endpoint points to a writer. + assertEquals(writerConnectionId, readerConnectionId); + + // Going to the write endpoint will be the same connection again. + conn.setReadOnly(false); + final String finalConnectionId = auroraUtil.queryInstanceId(conn); + assertEquals(writerConnectionId, finalConnectionId); + } + } + + @TestTemplate + public void test_autoCommitStatePreserved_acrossConnectionSwitches() throws SQLException { + try (final Connection conn = DriverManager.getConnection(ConnectionStringHelper.getWrapperUrl(), getProps())) { + + // Set autoCommit to false on writer + conn.setAutoCommit(false); + assertFalse(conn.getAutoCommit()); + final String writerConnectionId = auroraUtil.queryInstanceId(conn); + conn.commit(); + + // Switch to reader - autoCommit should remain false + conn.setReadOnly(true); + assertFalse(conn.getAutoCommit()); + final String readerConnectionId = auroraUtil.queryInstanceId(conn); + assertNotEquals(writerConnectionId, readerConnectionId); + + // Change autoCommit on reader + conn.setAutoCommit(true); + assertTrue(conn.getAutoCommit()); + + // Switch back to writer - autoCommit should be true + conn.setReadOnly(false); + assertTrue(conn.getAutoCommit()); + final String finalWriterConnectionId = auroraUtil.queryInstanceId(conn); + assertEquals(writerConnectionId, finalWriterConnectionId); + } + } + + @Override + protected Properties getProps() { + return getSrwProps(false, pluginCode); + } + + @Override + protected Properties getProxiedPropsWithFailover() { + final Properties props = getSrwProps(true, pluginCodesWithFailover); + AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.set(props, + "?." 
+ TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointSuffix() + + ":" + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointPort()); + return props; + } + + @Override + protected Properties getProxiedProps() { + final Properties props = getSrwProps(true, pluginCode); + AuroraHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.set(props, + "?." + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointSuffix() + + ":" + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointPort()); + return props; + } + + @Override + protected Properties getPropsWithFailover() { + return getSrwProps(false, pluginCodesWithFailover); + } + + @Override + protected boolean connectedToCorrectReaderInstance(String readerConnectionId, String currentConnectionId) { + // On conn.setReadOnly(true), the SimpleReadWriteSplittingPlugin ensures connection to the reader endpoint. + // If connected to a reader instance originally, the connection will change: return true regardless of the values. + return true; + } + + @Override + protected String getWriterEndpoint() { + return TestEnvironment.getCurrent() + .getInfo() + .getDatabaseInfo() + .getClusterEndpoint(); + } + + @TestTemplate + @Disabled("Skipping because it's not applicable to SimpleReadWriteSplitting.") + @Override + public void test_pooledConnectionFailoverWithClusterURL() { + // Skip this test for simple read write splitting as it relies on there NOT being a stored read/write splitting + // connection to the cluster endpoint. + } + + @TestTemplate + @Disabled("Skipping because it's not applicable to SimpleReadWriteSplitting.") + @Override + public void test_failoverToNewReader_setReadOnlyFalseTrue() { + // Skip this test for simple read write splitting as disabling connectivity to a reader cluster endpoint does not + // trigger reader to reader failover but rather forces defaulting to the writer.
+ } + + @TestTemplate + @Disabled("Skipping because it's not applicable to SimpleReadWriteSplitting.") + @Override + public void test_pooledConnection_leastConnectionsStrategy() { + // Skip this test for simple read write splitting as there is no reader selection strategy. + } + + @TestTemplate + @Disabled("Skipping because it's not applicable to SimpleReadWriteSplitting.") + @Override + public void test_pooledConnection_leastConnectionsWithPoolMapping() { + // Skip this test for simple read write splitting as there is no reader selection strategy. + } +} + diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/customendpoint/CustomEndpointMonitorImplTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/customendpoint/CustomEndpointMonitorImplTest.java index afa6570e0..a6f75fbf4 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/customendpoint/CustomEndpointMonitorImplTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/customendpoint/CustomEndpointMonitorImplTest.java @@ -39,6 +39,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.testcontainers.shaded.org.checkerframework.checker.nullness.qual.Nullable; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.rds.RdsClient; import software.amazon.awssdk.services.rds.model.DBClusterEndpoint; @@ -123,9 +124,16 @@ public void testRun() throws InterruptedException { mockRdsClientFunc); monitor.start(); - // Wait for 2 run cycles. The first will return an unexpected number of endpoints in the API response, the second - // will return the expected number of endpoints (one). - TimeUnit.MILLISECONDS.sleep(100); + // Poll until the monitor populates the cache (at most 5 polls). The first run cycle will return an unexpected + // number of endpoints in the API response, the second will return the expected number of endpoints (one).
+ @Nullable CustomEndpointInfo customEndpointInfo = CustomEndpointMonitorImpl.customEndpointInfoCache + .get(host.getUrl()); + int runCycles = 0; + while (customEndpointInfo == null && runCycles < 5) { + TimeUnit.MILLISECONDS.sleep(50); + runCycles++; + customEndpointInfo = CustomEndpointMonitorImpl.customEndpointInfoCache.get(host.getUrl()); + } assertEquals(expectedInfo, CustomEndpointMonitorImpl.customEndpointInfoCache.get(host.getUrl())); monitor.stop(); diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginTest.java new file mode 100644 index 000000000..fd4cc7868 --- /dev/null +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/srw/SimpleReadWriteSplittingPluginTest.java @@ -0,0 +1,630 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.jdbc.plugin.srw; + +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.EnumSet; +import java.util.Properties; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import software.amazon.jdbc.HostListProviderService; +import software.amazon.jdbc.HostRole; +import software.amazon.jdbc.HostSpec; +import software.amazon.jdbc.HostSpecBuilder; +import software.amazon.jdbc.JdbcCallable; +import software.amazon.jdbc.NodeChangeOptions; +import software.amazon.jdbc.OldConnectionSuggestedAction; +import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy; +import software.amazon.jdbc.plugin.AuroraInitialConnectionStrategyPlugin; +import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingSQLException; + +public class SimpleReadWriteSplittingPluginTest { + private static final int TEST_PORT = 5432; + private static final String WRITE_ENDPOINT = "writer.cluster-xyz.us-east-1.rds.amazonaws.com"; + private static final String READ_ENDPOINT = "reader.cluster-xyz.us-east-1.rds.amazonaws.com"; + private static final Properties defaultProps = new Properties(); + + private final HostSpec writerHostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host(WRITE_ENDPOINT).port(TEST_PORT).role(HostRole.WRITER).build(); + private final HostSpec readerHostSpec = new HostSpecBuilder(new 
SimpleHostAvailabilityStrategy()) + .host(READ_ENDPOINT).port(TEST_PORT).role(HostRole.READER).build(); + + private AutoCloseable closeable; + + @Mock private JdbcCallable mockConnectFunc; + @Mock private PluginService mockPluginService; + @Mock private HostListProviderService mockHostListProviderService; + @Mock private Connection mockWriterConn; + @Mock private Connection mockClosedWriterConn; + @Mock private Connection mockReaderConn; + @Mock private Statement mockStatement; + @Mock private ResultSet mockResultSet; + @Mock private EnumSet mockChanges; + @Mock private HostSpecBuilder mockHostSpecBuilder; + + @BeforeEach + public void init() throws SQLException { + closeable = MockitoAnnotations.openMocks(this); + mockDefaultBehavior(); + setupDefaultProperties(); + } + + @AfterEach + void cleanUp() throws Exception { + closeable.close(); + defaultProps.clear(); + } + + void setupDefaultProperties() { + SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.set(defaultProps, WRITE_ENDPOINT); + SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.set(defaultProps, READ_ENDPOINT); + } + + void mockDefaultBehavior() throws SQLException { + when(this.mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(this.mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(this.mockPluginService.connect(eq(writerHostSpec), any(Properties.class), any())) + .thenReturn(mockWriterConn); + when(this.mockPluginService.getHostRole(mockWriterConn)).thenReturn(HostRole.WRITER); + when(this.mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + when(this.mockPluginService.connect(eq(readerHostSpec), any(Properties.class), any())) + .thenReturn(mockReaderConn); + when(this.mockConnectFunc.call()).thenReturn(mockWriterConn); + when(mockWriterConn.createStatement()).thenReturn(mockStatement); + when(mockReaderConn.createStatement()).thenReturn(mockStatement); + 
when(mockStatement.executeQuery(any(String.class))).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockClosedWriterConn.isClosed()).thenReturn(true); + when(mockHostListProviderService.getHostSpecBuilder()).thenReturn(mockHostSpecBuilder); + when(mockHostListProviderService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(mockHostSpecBuilder.host(any())).thenReturn(mockHostSpecBuilder); + when(mockHostSpecBuilder.port(any(Integer.class))).thenReturn(mockHostSpecBuilder); + when(mockHostSpecBuilder.role(any())).thenReturn(mockHostSpecBuilder); + when(mockHostSpecBuilder.availability(any())).thenReturn(mockHostSpecBuilder); + when(mockHostSpecBuilder.build()).thenReturn(writerHostSpec, readerHostSpec); + } + + @Test + public void testConstructor_missingWriteEndpoint() { + Properties props = new Properties(); + // No write endpoint set + SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.set(props, READ_ENDPOINT); + + assertThrows(RuntimeException.class, () -> + new SimpleReadWriteSplittingPlugin(mockPluginService, props, null, null, null, null, null)); + } + + @Test + public void testSwitchToReader_noReaderEndpoint() throws SQLException { + Properties props = new Properties(); + SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.set(props, WRITE_ENDPOINT); + // No read endpoint set + + assertThrows(RuntimeException.class, () -> + new SimpleReadWriteSplittingPlugin(mockPluginService, props, null, null, null, null, null)); + } + + @Test + public void testSetReadOnly_trueFalse() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + null, + writerHostSpec, + readerHostSpec); + + 
plugin.switchConnectionIfRequired(true); + + verify(mockPluginService, times(1)) + .setCurrentConnection(eq(mockReaderConn), eq(readerHostSpec)); + assertEquals(mockReaderConn, plugin.getReaderConnection()); + assertEquals(mockWriterConn, plugin.getWriterConnection()); + + when(mockPluginService.getCurrentConnection()).thenReturn(mockReaderConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(readerHostSpec); + + plugin.switchConnectionIfRequired(false); + + verify(mockPluginService, times(1)) + .setCurrentConnection(eq(mockWriterConn), eq(writerHostSpec)); + } + + @Test + public void testSetReadOnlyTrue_alreadyOnReader() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockReaderConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(readerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + null, null); + + plugin.switchConnectionIfRequired(true); + + verify(mockPluginService, never()) + .setCurrentConnection(any(Connection.class), any(HostSpec.class)); + } + + @Test + public void testSetReadOnlyFalse_alreadyOnWriter() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + null, null); + + plugin.switchConnectionIfRequired(false); + + verify(mockPluginService, never()) + .setCurrentConnection(any(Connection.class), any(HostSpec.class)); + } + + @Test + public void testSetReadOnlyFalse_inTransaction() { + when(mockPluginService.getCurrentConnection()).thenReturn(mockReaderConn); + 
when(mockPluginService.getCurrentHostSpec()).thenReturn(readerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(true); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + null, null); + + assertThrows(ReadWriteSplittingSQLException.class, () -> + plugin.switchConnectionIfRequired(false)); + } + + @Test + public void testSetReadOnly_closedConnection() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockClosedWriterConn); + when(mockClosedWriterConn.isClosed()).thenReturn(true); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + null, null); + + assertThrows(ReadWriteSplittingSQLException.class, () -> + plugin.switchConnectionIfRequired(true)); + } + + @Test + public void testNotifyConnectionChanged_inReadWriteSplit() { + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + writerHostSpec, + readerHostSpec); + + // Simulate being in read-write split mode + try { + plugin.switchConnectionIfRequired(true); + } catch (SQLException e) { + // ignore for test setup + } + + OldConnectionSuggestedAction result = plugin.notifyConnectionChanged(mockChanges); + assertEquals(OldConnectionSuggestedAction.PRESERVE, result); + } + + @Test + public void testNotifyConnectionChanged_notInReadWriteSplit() { + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + null, null); + + OldConnectionSuggestedAction result = plugin.notifyConnectionChanged(mockChanges); + 
assertEquals(OldConnectionSuggestedAction.NO_OPINION, result); + } + + @Test + public void testReleaseResources() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockReaderConn.isClosed()).thenReturn(false); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + mockWriterConn, + mockReaderConn, + null, null); + + plugin.releaseResources(); + + verify(mockReaderConn, times(1)).close(); + } + + @Test + public void testWrongRoleConnection_writerEndpointToReader() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockReaderConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(readerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + when(mockPluginService.getHostRole(any())).thenReturn(HostRole.READER); // Wrong role + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, + null); + + assertThrows(ReadWriteSplittingSQLException.class, () -> + plugin.switchConnectionIfRequired(false)); + } + + @Test + public void testWrongRoleConnection_readerEndpointToWriter() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + when(mockPluginService.getHostRole(any())).thenReturn(HostRole.WRITER); // Wrong role for reader + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, null); + + plugin.switchConnectionIfRequired(true); + + // While it should use the current connection as fallback, it should not store it. 
+ assertEquals(null, plugin.getReaderConnection()); + } + + @Test + public void testGetVerifiedConnection_wrongRoleRetryReader() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + when(mockPluginService.connect(eq(readerHostSpec), any(Properties.class), any())) + .thenReturn(mockWriterConn) // First call returns wrong role + .thenReturn(mockReaderConn); // Second call returns correct role + when(mockPluginService.getHostRole(mockWriterConn)).thenReturn(HostRole.WRITER); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, + readerHostSpec); + + plugin.switchConnectionIfRequired(true); + + verify(mockPluginService, times(2)) + .connect(eq(readerHostSpec), any(Properties.class), any()); + verify(mockWriterConn, times(1)).close(); + assertEquals(mockReaderConn, plugin.getReaderConnection()); + } + + @Test + public void testGetVerifiedConnection_wrongRoleRetryWriter() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockReaderConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(readerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + when(mockPluginService.connect(eq(writerHostSpec), any(Properties.class), any())) + .thenReturn(mockReaderConn) // First call returns wrong role + .thenReturn(mockWriterConn); // Second call returns correct role + when(mockPluginService.getHostRole(mockWriterConn)).thenReturn(HostRole.WRITER); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + 
defaultProps, + mockHostListProviderService, + null, + null, + writerHostSpec, + null); + + plugin.switchConnectionIfRequired(false); + + verify(mockPluginService, times(2)) + .connect(eq(writerHostSpec), any(Properties.class), any()); + verify(mockReaderConn, times(1)).close(); + assertEquals(mockWriterConn, plugin.getWriterConnection()); + } + + @Test + public void testGetVerifiedConnection_sqlExceptionRetry() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + when(mockPluginService.connect(eq(readerHostSpec), any(Properties.class), any())) + .thenThrow(new SQLException("Connection failed")) + .thenReturn(mockReaderConn); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + when(mockPluginService.isLoginException(any(SQLException.class), any())).thenReturn(false); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, + readerHostSpec); + + plugin.switchConnectionIfRequired(true); + + verify(mockPluginService, times(2)) + .connect(eq(readerHostSpec), any(Properties.class), any()); + assertEquals(mockReaderConn, plugin.getReaderConnection()); + } + + @Test + public void testGetVerifiedConnection_loginExceptionRetry() throws SQLException { + when(mockPluginService.getCurrentConnection()).thenReturn(mockWriterConn); + when(mockPluginService.getCurrentHostSpec()).thenReturn(writerHostSpec); + when(mockPluginService.isInTransaction()).thenReturn(false); + when(mockPluginService.connect(eq(readerHostSpec), any(Properties.class), any())) + .thenThrow(new SQLException("Login exception")) + .thenReturn(mockReaderConn); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + 
when(mockPluginService.isLoginException(any(SQLException.class), any())).thenReturn(true); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, + readerHostSpec); + + plugin.switchConnectionIfRequired(true); + + verify(mockPluginService, times(1)) + .connect(eq(readerHostSpec), any(Properties.class), any()); + // While it should use the current connection as fallback, it should not store it. + assertEquals(null, plugin.getReaderConnection()); + } + + @Test + public void testConnect_nonInitialConnection() throws SQLException { + when(mockConnectFunc.call()).thenReturn(mockWriterConn); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, defaultProps); + + Connection result = plugin.connect("jdbc:postgresql", writerHostSpec, defaultProps, false, mockConnectFunc); + + assertEquals(mockWriterConn, result); + verify(mockConnectFunc, times(1)).call(); + } + + @Test + public void testConnect_verificationDisabled() throws SQLException { + Properties props = new Properties(); + SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.set(props, WRITE_ENDPOINT); + SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.set(props, READ_ENDPOINT); + SimpleReadWriteSplittingPlugin.VERIFY_NEW_SRW_CONNECTIONS.set(props, "false"); + + when(mockConnectFunc.call()).thenReturn(mockWriterConn); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, props); + + Connection result = plugin.connect("jdbc:postgresql", writerHostSpec, props, true, mockConnectFunc); + + assertEquals(mockWriterConn, result); + verify(mockConnectFunc, times(1)).call(); + } + + @Test + public void testConnect_writerClusterEndpoint() throws SQLException { + final HostSpec writerClusterHost = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + 
.host("test-cluster.cluster-xyz.us-east-1.rds.amazonaws.com").port(TEST_PORT).role(HostRole.WRITER).build(); + + when(mockConnectFunc.call()).thenReturn(mockWriterConn); + when(mockPluginService.connect(eq(writerClusterHost), any(Properties.class), any())) + .thenReturn(mockWriterConn); + when(mockPluginService.getHostRole(mockWriterConn)).thenReturn(HostRole.WRITER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, + null); + + Connection result = plugin.connect("jdbc:postgresql", writerClusterHost, defaultProps, true, mockConnectFunc); + + assertEquals(mockWriterConn, result); + verify(mockPluginService, times(1)).connect(eq(writerClusterHost), any(Properties.class), any()); + } + + @Test + public void testConnect_readerClusterEndpoint() throws SQLException { + HostSpec readerClusterHost = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("test-cluster.cluster-ro-xyz.us-east-1.rds.amazonaws.com").port(TEST_PORT).role(HostRole.READER).build(); + + when(mockConnectFunc.call()).thenReturn(mockReaderConn); + when(mockPluginService.connect(eq(readerClusterHost), any(Properties.class), any())) + .thenReturn(mockReaderConn); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + defaultProps, + mockHostListProviderService, + null, + null, + null, + null); + + Connection result = plugin.connect("jdbc:postgresql", readerClusterHost, defaultProps, true, mockConnectFunc); + + assertEquals(mockReaderConn, result); + verify(mockPluginService, times(1)).connect(eq(readerClusterHost), any(Properties.class), any()); + } + + @Test + public void testConnect_verificationFailsFallback() throws SQLException { + final HostSpec writerClusterHost = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + 
.host("test-cluster.cluster-xyz.us-east-1.rds.amazonaws.com").port(TEST_PORT).role(HostRole.WRITER).build(); + + Properties timeoutProps = new Properties(); + SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.set(timeoutProps, WRITE_ENDPOINT); + SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.set(timeoutProps, READ_ENDPOINT); + SimpleReadWriteSplittingPlugin.SRW_CONNECT_RETRY_TIMEOUT_MS.set(timeoutProps, "100"); + + when(mockConnectFunc.call()).thenReturn(mockWriterConn); + when(mockPluginService.connect(eq(writerClusterHost), any(Properties.class), any())) + .thenReturn(mockReaderConn); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, timeoutProps); + + Connection result = plugin.connect("jdbc:postgresql", writerClusterHost, timeoutProps, true, mockConnectFunc); + + assertEquals(mockWriterConn, result); + verify(mockConnectFunc, times(1)).call(); + } + + @Test + public void testConnect_nonRdsClusterEndpoint() throws SQLException { + final HostSpec customHost = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("custom-db.example.com").port(TEST_PORT).role(HostRole.WRITER).build(); + + when(mockConnectFunc.call()).thenReturn(mockWriterConn); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, defaultProps); + + Connection result = plugin.connect("jdbc:postgresql", customHost, defaultProps, true, mockConnectFunc); + + assertEquals(mockWriterConn, result); + verify(mockConnectFunc, times(1)).call(); + } + + @Test + public void testConnect_nonRdsClusterEndpointWriterVerify() throws SQLException { + final HostSpec customHost = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("custom-db.example.com").port(TEST_PORT).role(HostRole.WRITER).build(); + + Properties props = new Properties(); + 
SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.set(props, WRITE_ENDPOINT); + SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.set(props, READ_ENDPOINT); + AuroraInitialConnectionStrategyPlugin.VERIFY_OPENED_CONNECTION_TYPE.set(props, "writer"); + + //when(mockConnectFunc.call()).thenReturn(mockWriterConn); + when(mockPluginService.connect(eq(customHost), any(Properties.class), any())) + .thenReturn(mockWriterConn); + when(mockPluginService.getHostRole(mockWriterConn)).thenReturn(HostRole.WRITER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + props, + mockHostListProviderService, + null, + null, + null, + null); + + Connection result = plugin.connect("jdbc:postgresql", customHost, defaultProps, true, mockConnectFunc); + + assertEquals(mockWriterConn, result); + verify(mockPluginService, times(1)).connect(eq(customHost), any(Properties.class), any()); + verify(mockConnectFunc, times(0)).call(); + } + + @Test + public void testConnect_nonRdsClusterEndpointReaderVerify() throws SQLException { + final HostSpec customHost = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) + .host("custom-db.example.com").port(TEST_PORT).role(HostRole.READER).build(); + + Properties props = new Properties(); + SimpleReadWriteSplittingPlugin.SRW_WRITE_ENDPOINT.set(props, WRITE_ENDPOINT); + SimpleReadWriteSplittingPlugin.SRW_READ_ENDPOINT.set(props, READ_ENDPOINT); + AuroraInitialConnectionStrategyPlugin.VERIFY_OPENED_CONNECTION_TYPE.set(props, "reader"); + + //when(mockConnectFunc.call()).thenReturn(mockWriterConn); + when(mockPluginService.connect(eq(customHost), any(Properties.class), any())) + .thenReturn(mockReaderConn); + when(mockPluginService.getHostRole(mockReaderConn)).thenReturn(HostRole.READER); + + final SimpleReadWriteSplittingPlugin plugin = new SimpleReadWriteSplittingPlugin( + mockPluginService, + props, + mockHostListProviderService, + null, + null, + null, + null); + + Connection result = 
plugin.connect("jdbc:postgresql", customHost, defaultProps, true, mockConnectFunc); + + assertEquals(mockReaderConn, result); + verify(mockPluginService, times(1)).connect(eq(customHost), any(Properties.class), any()); + verify(mockConnectFunc, times(0)).call(); + } +}