Skip to content

Commit

Permalink
add effective connection provider to ConnectionProviderManager; allow…
Browse files Browse the repository at this point in the history
… ReadWriteSplittingPlugin to determine a correct connection provider type.
  • Loading branch information
sergiyvamz committed Dec 5, 2024
1 parent 6fe3bd9 commit 1484f1f
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ public ConnectionProvider getDefaultConnProvider() {
return this.defaultConnProvider;
}

public ConnectionProvider getEffectiveConnProvider() {
return this.effectiveConnProvider;
}

private interface PluginPipeline<T, E extends Exception> {

T call(final @NonNull ConnectionPlugin plugin, final @Nullable JdbcCallable<T, E> jdbcMethodFunc) throws E;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicReference;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.jdbc.cleanup.CanReleaseResources;

public class ConnectionProviderManager {

private static final ReentrantReadWriteLock connProviderLock = new ReentrantReadWriteLock();
private static ConnectionProvider connProvider = null;
private static AtomicReference<ConnectionProvider> customConnectionProvider = new AtomicReference<>(null);

private final ConnectionProvider defaultProvider;
private final @Nullable ConnectionProvider effectiveConnProvider;

private static ConnectionInitFunc connectionInitFunc = null;

Expand All @@ -39,9 +40,14 @@ public class ConnectionProviderManager {
* @param defaultProvider the default {@link ConnectionProvider} to use if a non-default
* ConnectionProvider has not been set or the non-default
* ConnectionProvider has been set but does not accept a requested URL
* @param effectiveConnProvider the non-default {@link ConnectionProvider} to use
*
*/
public ConnectionProviderManager(ConnectionProvider defaultProvider) {
public ConnectionProviderManager(
final ConnectionProvider defaultProvider,
final @Nullable ConnectionProvider effectiveConnProvider) {
this.defaultProvider = defaultProvider;
this.effectiveConnProvider = effectiveConnProvider;
}

/**
Expand All @@ -53,12 +59,7 @@ public ConnectionProviderManager(ConnectionProvider defaultProvider) {
* @param connProvider the {@link ConnectionProvider} to use to establish new connections
*/
public static void setConnectionProvider(ConnectionProvider connProvider) {
connProviderLock.writeLock().lock();
try {
ConnectionProviderManager.connProvider = connProvider;
} finally {
connProviderLock.writeLock().unlock();
}
customConnectionProvider.set(connProvider);
}

/**
Expand All @@ -76,15 +77,14 @@ public static void setConnectionProvider(ConnectionProvider connProvider) {
*/
public ConnectionProvider getConnectionProvider(
String driverProtocol, HostSpec host, Properties props) {
if (connProvider != null) {
connProviderLock.readLock().lock();
try {
if (connProvider != null && connProvider.acceptsUrl(driverProtocol, host, props)) {
return connProvider;
}
} finally {
connProviderLock.readLock().unlock();
}

final ConnectionProvider tmpCustomConnectionProvider = customConnectionProvider.get();
if (tmpCustomConnectionProvider != null && tmpCustomConnectionProvider.acceptsUrl(driverProtocol, host, props)) {
return tmpCustomConnectionProvider;
}

if (this.effectiveConnProvider != null && this.effectiveConnProvider.acceptsUrl(driverProtocol, host, props)) {
return this.effectiveConnProvider;
}

return defaultProvider;
Expand All @@ -110,23 +110,16 @@ public ConnectionProvider getDefaultProvider() {
* return false
*/
public boolean acceptsStrategy(HostRole role, String strategy) {
boolean acceptsStrategy = false;
if (connProvider != null) {
connProviderLock.readLock().lock();
try {
if (connProvider != null) {
acceptsStrategy = connProvider.acceptsStrategy(role, strategy);
}
} finally {
connProviderLock.readLock().unlock();
}
final ConnectionProvider tmpCustomConnectionProvider = customConnectionProvider.get();
if (tmpCustomConnectionProvider != null && tmpCustomConnectionProvider.acceptsStrategy(role, strategy)) {
return true;
}

if (!acceptsStrategy) {
acceptsStrategy = defaultProvider.acceptsStrategy(role, strategy);
if (this.effectiveConnProvider != null && this.effectiveConnProvider.acceptsStrategy(role, strategy)) {
return true;
}

return acceptsStrategy;
return this.defaultProvider.acceptsStrategy(role, strategy);
}

/**
Expand All @@ -150,24 +143,27 @@ public boolean acceptsStrategy(HostRole role, String strategy) {
public HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, String strategy, Properties props)
throws SQLException, UnsupportedOperationException {
HostSpec host = null;
if (connProvider != null) {
connProviderLock.readLock().lock();
try {
if (connProvider != null && connProvider.acceptsStrategy(role, strategy)) {
host = connProvider.getHostSpecByStrategy(hosts, role, strategy, props);
}
} catch (UnsupportedOperationException e) {
// The custom provider does not support the provided strategy, ignore it and try with the default provider.
} finally {
connProviderLock.readLock().unlock();
final ConnectionProvider tmpCustomConnectionProvider = customConnectionProvider.get();
try {
if (tmpCustomConnectionProvider != null && tmpCustomConnectionProvider.acceptsStrategy(role, strategy)) {
host = tmpCustomConnectionProvider.getHostSpecByStrategy(hosts, role, strategy, props);
}
} catch (UnsupportedOperationException e) {
// The custom provider does not support the provided strategy, ignore it and try with the other providers.
}

if (host == null) {
host = defaultProvider.getHostSpecByStrategy(hosts, role, strategy, props);
if (host != null) {
return host;
}

return host;
if (this.effectiveConnProvider != null && this.effectiveConnProvider.acceptsStrategy(role, strategy)) {
host = this.effectiveConnProvider.getHostSpecByStrategy(hosts, role, strategy, props);
if (host != null) {
return host;
}
}

return this.defaultProvider.getHostSpecByStrategy(hosts, role, strategy, props);
}

/**
Expand All @@ -176,26 +172,16 @@ public HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, Strin
* been cleared.
*/
public static void resetProvider() {
if (connProvider != null) {
connProviderLock.writeLock().lock();
connProvider = null;
connProviderLock.writeLock().unlock();
}
customConnectionProvider.set(null);
}

/**
* Releases any resources held by the available {@link ConnectionProvider} instances.
*/
public static void releaseResources() {
if (connProvider != null) {
connProviderLock.writeLock().lock();
try {
if (connProvider instanceof CanReleaseResources) {
((CanReleaseResources) connProvider).releaseResources();
}
} finally {
connProviderLock.writeLock().unlock();
}
final ConnectionProvider tmpCustomConnectionProvider = customConnectionProvider.get();
if (tmpCustomConnectionProvider instanceof CanReleaseResources) {
((CanReleaseResources) tmpCustomConnectionProvider).releaseResources();
}
}

Expand Down
3 changes: 3 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginService.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,11 @@ HostSpec getHostSpecByStrategy(List<HostSpec> hosts, HostRole role, String strat

HostSpecBuilder getHostSpecBuilder();

@Deprecated
ConnectionProvider getConnectionProvider();

boolean isPooledConnectionProvider(HostSpec host, Properties props);

String getDriverProtocol();

Properties getProperties();
Expand Down
11 changes: 11 additions & 0 deletions wrapper/src/main/java/software/amazon/jdbc/PluginServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class PluginServiceImpl implements PluginService, CanReleaseResources,
protected Dialect dialect;
protected TargetDriverDialect targetDriverDialect;
protected @Nullable final ConfigurationProfile configurationProfile;
protected final ConnectionProviderManager connectionProviderManager;

protected final SessionStateService sessionStateService;

Expand Down Expand Up @@ -140,6 +141,9 @@ public PluginServiceImpl(
this.exceptionManager = exceptionManager;
this.dialectProvider = dialectProvider != null ? dialectProvider : new DialectManager(this);
this.targetDriverDialect = targetDriverDialect;
this.connectionProviderManager = new ConnectionProviderManager(
this.pluginManager.getDefaultConnProvider(),
this.pluginManager.getEffectiveConnProvider());

this.sessionStateService = sessionStateService != null
? sessionStateService
Expand Down Expand Up @@ -235,10 +239,17 @@ private HostSpec getWriter(final @NonNull List<HostSpec> hosts) {
}

@Override
@Deprecated
public ConnectionProvider getConnectionProvider() {
return this.pluginManager.defaultConnProvider;
}

public boolean isPooledConnectionProvider(HostSpec host, Properties props) {
final ConnectionProvider connectionProvider =
this.connectionProviderManager.getConnectionProvider(this.driverProtocol, host, props);
return (connectionProvider instanceof PooledConnectionProvider);
}

@Override
public String getDriverProtocol() {
return this.driverProtocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public final class DefaultConnectionPlugin implements ConnectionPlugin {
private static final SqlMethodAnalyzer sqlMethodAnalyzer = new SqlMethodAnalyzer();

private final @NonNull ConnectionProvider defaultConnProvider;
private final @Nullable ConnectionProvider effectiveConnProvider;

private final ConnectionProviderManager connProviderManager;
private final PluginService pluginService;
Expand All @@ -77,7 +76,7 @@ public DefaultConnectionPlugin(
defaultConnProvider,
effectiveConnProvider,
pluginManagerService,
new ConnectionProviderManager(defaultConnProvider));
new ConnectionProviderManager(defaultConnProvider, effectiveConnProvider));
}

public DefaultConnectionPlugin(
Expand All @@ -99,7 +98,6 @@ public DefaultConnectionPlugin(
this.pluginService = pluginService;
this.pluginManagerService = pluginManagerService;
this.defaultConnProvider = defaultConnProvider;
this.effectiveConnProvider = effectiveConnProvider;
this.connProviderManager = connProviderManager;
}

Expand Down Expand Up @@ -173,18 +171,7 @@ public Connection connect(
final JdbcCallable<Connection, SQLException> connectFunc)
throws SQLException {

ConnectionProvider connProvider = null;

if (this.effectiveConnProvider != null) {
if (this.effectiveConnProvider.acceptsUrl(driverProtocol, hostSpec, props)) {
connProvider = this.effectiveConnProvider;
}
}

if (connProvider == null) {
connProvider =
this.connProviderManager.getConnectionProvider(driverProtocol, hostSpec, props);
}
ConnectionProvider connProvider = this.connProviderManager.getConnectionProvider(driverProtocol, hostSpec, props);

// It's guaranteed that this plugin is always the last in plugin chain so connectFunc can be
// ignored.
Expand Down Expand Up @@ -246,9 +233,6 @@ public boolean acceptsStrategy(HostRole role, String strategy) {
return false;
}

if (this.effectiveConnProvider != null) {
return this.effectiveConnProvider.acceptsStrategy(role, strategy);
}
return this.connProviderManager.acceptsStrategy(role, strategy);
}

Expand All @@ -272,10 +256,6 @@ public HostSpec getHostSpecByStrategy(final List<HostSpec> hosts, final HostRole
throw new SQLException(Messages.get("DefaultConnectionPlugin.noHostsAvailable"));
}

if (this.effectiveConnProvider != null) {
return this.effectiveConnProvider.getHostSpecByStrategy(hosts,
role, strategy, this.pluginService.getProperties());
}
return this.connProviderManager.getHostSpecByStrategy(hosts, role, strategy, this.pluginService.getProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
private final PluginService pluginService;
private final Properties properties;
private final String readerSelectorStrategy;
private final ConnectionProviderManager connProviderManager;
private volatile boolean inReadWriteSplit = false;
private HostListProviderService hostListProviderService;
private Connection writerConnection;
Expand All @@ -88,7 +87,6 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
this.pluginService = pluginService;
this.properties = properties;
this.readerSelectorStrategy = READER_HOST_SELECTOR_STRATEGY.getString(properties);
this.connProviderManager = new ConnectionProviderManager(pluginService.getConnectionProvider());
}

/**
Expand Down Expand Up @@ -271,11 +269,7 @@ private boolean isReader(final @NonNull HostSpec hostSpec) {

private void getNewWriterConnection(final HostSpec writerHostSpec) throws SQLException {
final Connection conn = this.pluginService.connect(writerHostSpec, this.properties);
this.isWriterConnFromInternalPool = this.connProviderManager.getConnectionProvider(
this.pluginService.getDriverProtocol(),
writerHostSpec,
this.properties)
instanceof PooledConnectionProvider;
this.isWriterConnFromInternalPool = this.pluginService.isPooledConnectionProvider(writerHostSpec, this.properties);
setWriterConnection(conn, writerHostSpec);
switchCurrentConnectionTo(this.writerConnection, writerHostSpec);
}
Expand Down Expand Up @@ -502,11 +496,7 @@ private void getNewReaderConnection() throws SQLException {
HostSpec hostSpec = this.pluginService.getHostSpecByStrategy(HostRole.READER, this.readerSelectorStrategy);
try {
conn = this.pluginService.connect(hostSpec, this.properties);
this.isReaderConnFromInternalPool = this.connProviderManager.getConnectionProvider(
this.pluginService.getDriverProtocol(),
hostSpec,
this.properties)
instanceof PooledConnectionProvider;
this.isReaderConnFromInternalPool = this.pluginService.isPooledConnectionProvider(hostSpec, this.properties);
readerHost = hostSpec;
break;
} catch (final SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ public ConnectionProvider getConnectionProvider() {
return null;
}

@Override
public boolean isPooledConnectionProvider(HostSpec host, Properties props) {
return false;
}

@Override
public String getDriverProtocol() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,7 @@ public void testClosePooledReaderConnectionAfterSetReadOnly() throws SQLExceptio
.when(this.mockPluginService).getCurrentHostSpec();
doReturn(mockReaderConn1).when(mockPluginService).connect(readerHostSpec1, null);
when(mockPluginService.getDriverProtocol()).thenReturn("jdbc:postgresql://");

final HikariPooledConnectionProvider connProvider =
new HikariPooledConnectionProvider(
ReadWriteSplittingPluginTest::getHikariConfig,
ReadWriteSplittingPluginTest::getPoolKey
);
when(mockPluginService.getConnectionProvider()).thenReturn(connProvider);
when(mockPluginService.isPooledConnectionProvider(any(), any())).thenReturn(true);

final ReadWriteSplittingPlugin plugin = new ReadWriteSplittingPlugin(
mockPluginService,
Expand All @@ -592,13 +586,7 @@ public void testClosePooledWriterConnectionAfterSetReadOnly() throws SQLExceptio
.when(this.mockPluginService).getCurrentHostSpec();
doReturn(mockWriterConn).when(mockPluginService).connect(writerHostSpec, null);
when(mockPluginService.getDriverProtocol()).thenReturn("jdbc:postgresql://");

final HikariPooledConnectionProvider connProvider =
new HikariPooledConnectionProvider(
ReadWriteSplittingPluginTest::getHikariConfig,
ReadWriteSplittingPluginTest::getPoolKey
);
when(mockPluginService.getConnectionProvider()).thenReturn(connProvider);
when(mockPluginService.isPooledConnectionProvider(any(), any())).thenReturn(true);

final ReadWriteSplittingPlugin plugin = new ReadWriteSplittingPlugin(
mockPluginService,
Expand Down

0 comments on commit 1484f1f

Please sign in to comment.