Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid setting ignoreNewTopologyRequestsEndTimeNano on initial connection #1221

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,8 @@ public boolean forceRefreshHostList(final boolean shouldVerifyWriter, final long
return true;
}
} catch (TimeoutException ex) {
// do nothing
// do nothing.
LOGGER.finest(Messages.get("PluginServiceImpl.forceRefreshTimeout", new Object[]{timeoutMs}));
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class ClusterTopologyMonitorImpl implements ClusterTopologyMonitor {
protected long highRefreshRateEndTimeNano = 0;
protected final Object topologyUpdated = new Object();
protected final AtomicBoolean requestToUpdateTopology = new AtomicBoolean(false);
protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong(0);
protected final AtomicLong ignoreNewTopologyRequestsEndTimeNano = new AtomicLong(-1);
protected final ConcurrentHashMap<String /* host */, Thread> nodeThreads = new ConcurrentHashMap<>();
protected final AtomicBoolean nodeThreadsStop = new AtomicBoolean(false);
protected final AtomicReference<Connection> nodeThreadsWriterConnection = new AtomicReference<>(null);
Expand Down Expand Up @@ -188,6 +188,8 @@ public List<HostSpec> forceRefresh(final boolean shouldVerifyWriter, final long

// Previous failover has just completed. We can use results of it without triggering a new topology update.
List<HostSpec> currentHosts = this.topologyMap.get(this.clusterId);
LOGGER.finest(
Utils.logTopology(currentHosts, Messages.get("ClusterTopologyMonitorImpl.ignoringTopologyRequest")));
if (currentHosts != null) {
return currentHosts;
}
Expand Down Expand Up @@ -229,6 +231,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
}

if (timeoutMs == 0) {
LOGGER.finest(Utils.logTopology(currentHosts, Messages.get("ClusterTopologyMonitorImpl.timeoutSetToZero")));
return currentHosts;
}

Expand All @@ -240,6 +243,7 @@ protected List<HostSpec> waitTillTopologyGetsUpdated(final long timeoutMs) throw
this.topologyUpdated.wait(1000);
}
} catch (InterruptedException ex) {
LOGGER.fine(Messages.get("ClusterTopologyMonitorImpl.interrupted"));
Thread.currentThread().interrupt();
return null;
}
Expand Down Expand Up @@ -282,6 +286,7 @@ public void run() {
if (this.isInPanicMode()) {

if (this.nodeThreads.isEmpty()) {
LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.startingNodeMonitoringThreads"));

// start node threads
this.nodeThreadsStop.set(false);
Expand Down Expand Up @@ -309,19 +314,28 @@ public void run() {
// otherwise let's try it again the next round

} else {

// node threads are running
// check if writer is already detected
final Connection writerConnection = this.nodeThreadsWriterConnection.get();
final HostSpec writerConnectionHostSpec = this.nodeThreadsWriterHostSpec.get();
if (writerConnection != null && writerConnectionHostSpec != null) {
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors",
new Object[]{writerConnectionHostSpec}));

this.closeConnection(this.monitoringConnection.get());
this.monitoringConnection.set(writerConnection);
this.writerHostSpec.set(writerConnectionHostSpec);
this.isVerifiedWriterConnection = true;
this.highRefreshRateEndTimeNano = System.nanoTime() + highRefreshPeriodAfterPanicNano;
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);

// We verify the writer on initial connection and on failover, but we only want to ignore new topology
// requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
// time to 0. Any future writer verifications will set it to a positive value.
if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
}

this.nodeThreadsStop.set(true);
for (Thread thread : this.nodeThreads.values()) {
Expand Down Expand Up @@ -427,7 +441,7 @@ protected Thread getNodeMonitoringThread(final HostSpec hostSpec, final @Nullabl
}

protected List<HostSpec> openAnyConnectionAndUpdateTopology() {

boolean wasWriterVerified = false;
if (this.monitoringConnection.get() == null) {

Connection conn;
Expand All @@ -448,14 +462,22 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
try {
if (!StringUtils.isNullOrEmpty(this.getWriterNodeId(this.monitoringConnection.get()))) {
this.isVerifiedWriterConnection = true;
wasWriterVerified = true;

if (rdsHelper.isRdsInstance(this.initialHostSpec.getHost())) {
this.writerHostSpec.set(this.initialHostSpec);
LOGGER.finest("writerHostSpec: " + this.writerHostSpec.get().getHost());
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerMonitoringConnection",
new Object[]{this.writerHostSpec.get().getHost()}));
} else {
final String nodeId = this.getNodeId(this.monitoringConnection.get());
if (!StringUtils.isNullOrEmpty(nodeId)) {
this.writerHostSpec.set(this.createHost(nodeId, true, 0, null));
LOGGER.finest("writerHostSpec: " + this.writerHostSpec.get().getHost());
LOGGER.finest(
Messages.get(
"ClusterTopologyMonitorImpl.writerMonitoringConnection",
new Object[]{this.writerHostSpec.get().getHost()}));
}
}
}
Expand All @@ -471,6 +493,14 @@ protected List<HostSpec> openAnyConnectionAndUpdateTopology() {
}

final List<HostSpec> hosts = this.fetchTopologyAndUpdateCache(this.monitoringConnection.get());
if (wasWriterVerified) {
// We verify the writer on initial connection and on failover, but we only want to ignore new topology
// requests after failover. To accomplish this, the first time we verify the writer we set the ignore end
// time to 0. Any future writer verifications will set it to a positive value.
if (!this.ignoreNewTopologyRequestsEndTimeNano.compareAndSet(-1, 0)) {
this.ignoreNewTopologyRequestsEndTimeNano.set(System.nanoTime() + ignoreTopologyRequestNano);
}
}

if (hosts == null) {
// can't get topology; it might be something's wrong with a connection
Expand Down Expand Up @@ -550,7 +580,7 @@ protected void delay(boolean useHighRefreshRate) throws InterruptedException {
return hosts;
} catch (SQLException ex) {
// do nothing
LOGGER.log(Level.FINEST, "Error fetching topology:", ex);
LOGGER.finest(Messages.get("ClusterTopologyMonitorImpl.errorFetchingTopology", new Object[]{ex}));
}
return null;
}
Expand Down Expand Up @@ -760,7 +790,7 @@ public void run() {
writerId = this.monitor.getWriterNodeId(connection);

} catch (SQLSyntaxErrorException ex) {
LOGGER.severe(() -> Messages.get("ClusterTopologyMonitorImpl.invalidWriterQuery",
LOGGER.severe(() -> Messages.get("NodeMonitoringThread.invalidWriterQuery",
new Object[] {ex.getMessage()}));
throw new RuntimeException(ex);

Expand All @@ -771,21 +801,21 @@ public void run() {

if (!StringUtils.isNullOrEmpty(writerId)) {
// this prevents closing connection in finally block
if (!this.monitor.nodeThreadsWriterConnection
.compareAndSet(null, connection)) {
if (!this.monitor.nodeThreadsWriterConnection.compareAndSet(null, connection)) {
// writer connection is already setup
this.monitor.closeConnection(connection);

} else {
// writer connection is successfully set to writerConnection
LOGGER.fine(Messages.get("NodeMonitoringThread.detectedWriter", new Object[]{writerId}));
// When nodeThreadsWriterConnection and nodeThreadsWriterHostSpec are both set, the topology monitor may
// set ignoreNewTopologyRequestsEndTimeNano, in which case other threads will use the cached topology
// for the ignore duration, so we need to update the topology before setting nodeThreadsWriterHostSpec.
this.monitor.fetchTopologyAndUpdateCache(connection);
this.monitor.nodeThreadsWriterHostSpec.set(hostSpec);
LOGGER.fine("Detected writer: " + writerId);
this.monitor.nodeThreadsStop.set(true);

this.monitor.fetchTopologyAndUpdateCache(connection);
LOGGER.fine(Utils.logTopology(
this.monitor.topologyMap.get(this.monitor.clusterId)));

}

// Setting the connection to null here prevents the finally block
Expand Down Expand Up @@ -816,7 +846,7 @@ public void run() {
} finally {
this.monitor.closeConnection(connection);
final long end = System.nanoTime();
LOGGER.finest(() -> Messages.get("ClusterTopologyMonitorImpl.nodeThreadCompleted",
LOGGER.finest(() -> Messages.get("NodeMonitoringThread.threadCompleted",
new Object[] {TimeUnit.NANOSECONDS.toMillis(end - start)}));
}
}
Expand Down Expand Up @@ -853,7 +883,7 @@ private void readerThreadFetchTopology(final Connection connection, final @Nulla
// writer node has changed
this.writerChanged = true;

LOGGER.fine(() -> Messages.get("ClusterTopologyMonitorImpl.writerNodeChanged",
LOGGER.fine(() -> Messages.get("NodeMonitoringThread.writerNodeChanged",
new Object[] {writerHostSpec.getHost(), latestWriterHostSpec.getHost()}));

// we can update topology cache and notify all waiting threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@
if (!this.pluginService.forceRefreshHostList(true, this.failoverTimeoutMsSetting)) {
// "Unable to establish SQL connection to writer node"
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
LOGGER.severe(Messages.get("Failover.unableToRefreshHostList"));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
}

Expand All @@ -477,14 +477,20 @@
final Properties copyProp = PropertyUtils.copyProperties(this.properties);
copyProp.setProperty(INTERNAL_CONNECT_PROPERTY_NAME, "true");

Connection writerCandidateConn = null;

Check warning on line 480 in wrapper/src/main/java/software/amazon/jdbc/plugin/failover2/FailoverConnectionPlugin.java

View workflow job for this annotation

GitHub Actions / Qodana Community for JVM

Unused assignment

Variable `writerCandidateConn` initializer `null` is redundant
final HostSpec writerCandidate = updatedHosts.stream()
.filter(x -> x.getRole() == HostRole.WRITER)
.findFirst()
.orElse(null);

if (writerCandidate == null) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Utils.logTopology(updatedHosts, Messages.get("Failover.noWriterHost")));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
}

List<HostSpec> allowedHosts = this.pluginService.getHosts();
if (writerCandidate != null && !allowedHosts.contains(writerCandidate)) {
if (!allowedHosts.contains(writerCandidate)) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.newWriterNotAllowed",
new Object[] {writerCandidate.getHost(), Utils.logTopology(allowedHosts, "")}));
Expand All @@ -493,29 +499,24 @@
new Object[] {writerCandidate.getHost(), Utils.logTopology(allowedHosts, "")}));
}

if (writerCandidate != null) {
try {
writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
} catch (SQLException ex) {
// do nothing
}
}

if (writerCandidateConn == null) {
// "Unable to establish SQL connection to writer node"
try {
writerCandidateConn = this.pluginService.connect(writerCandidate, copyProp);
} catch (SQLException ex) {
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
LOGGER.severe(
Messages.get("Failover.exceptionConnectingToWriter", new Object[]{writerCandidate.getHost(), ex}));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
}

if (this.pluginService.getHostRole(writerCandidateConn) != HostRole.WRITER) {
HostRole role = this.pluginService.getHostRole(writerCandidateConn);
if (role != HostRole.WRITER) {
try {
writerCandidateConn.close();
} catch (SQLException ex) {
// do nothing
}
this.failoverWriterFailedCounter.inc();
LOGGER.severe(Messages.get("Failover.unableToConnectToWriter"));
LOGGER.severe(Messages.get("Failover.unexpectedReaderRole", new Object[]{writerCandidate.getHost(), role}));
throw new FailoverFailedSQLException(Messages.get("Failover.unableToConnectToWriter"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,13 @@ ExecutionTimeConnectionPlugin.executionTime=Executed {0} in {1} nanos.
# Failover Connection Plugin
Failover.transactionResolutionUnknownError=Transaction resolution unknown. Please re-configure session state if required and try restarting the transaction.
Failover.connectionChangedError=The active SQL connection has changed due to a connection failure. Please re-configure session state if required.
Failover.exceptionConnectingToWriter=An exception occurred while trying to connect to writer ''{0}''. Exception: {1}
Failover.parameterValue={0}={1}
Failover.unableToConnect=Unable to establish a SQL connection due to an unexpected error.
Failover.unableToConnectToWriter=Unable to establish SQL connection to the writer instance.
Failover.unableToConnectToReader=Unable to establish SQL connection to the reader instance.
Failover.unableToRefreshHostList=Unable to establish SQL connection to the writer instance - the request to update the topology timed out or was unsuccessful.
Failover.unexpectedReaderRole=The writer was identified to be ''{0}'', but querying the instance for its role returned a role of {1}.
Failover.detectedException=Detected an exception while executing a command: {0}
Failover.failoverDisabled=Cluster-aware failover is disabled.
Failover.establishedConnection=Connected to: {0}
Expand All @@ -183,6 +186,7 @@ Failover.startReaderFailover=Starting reader failover procedure.
Failover.invalidNode=Node is no longer available in the topology: {0}
Failover.newWriterNotAllowed=The failover process identified the new writer but the host is not in the list of allowed hosts. New writer host: ''{0}''. Allowed hosts: {1}
Failover.noOperationsAfterConnectionClosed=No operations allowed after connection closed.
Failover.noWriterHost=Unable to find writer in updated host list:
Failover.readerFailoverElapsed=Reader failover elapsed in {0} ms.
Failover.writerFailoverElapsed=Writer failover elapsed in {0} ms.

Expand Down Expand Up @@ -275,7 +279,11 @@ MonitorImpl.stopMonitoringThread=Stop monitoring thread for {0}.

# Monitor Service Impl
MonitorServiceImpl.emptyAliasSet=Empty alias set passed for ''{0}''. Set should not be empty.
MonitorServiceImpl.errorPopulatingAliases=Error occurred while populating aliases: ''{0}''.

NodeMonitoringThread.detectedWriter=Writer detected by node monitoring thread: ''{0}''.
NodeMonitoringThread.invalidWriterQuery=The writer topology query is invalid: {0}
NodeMonitoringThread.threadCompleted=Node monitoring thread completed in {0} ms.
NodeMonitoringThread.writerNodeChanged=Writer node changed from ''{0}'' to node ''{1}''.

OktaAuthPlugin.unableToDetermineRegion=Unable to determine connection region. If you are using a non-standard RDS URL, please set the ''{0}'' property.
OktaAuthPlugin.requiredDependenciesMissing=OktaAuthPlugin requires the 'AWS Java SDK for AWS Secret Token Service' and 'JSoup' dependencies. Both of these dependencies must be registered on the classpath.
Expand All @@ -289,8 +297,7 @@ OktaCredentialsProviderFactory.samlRequestFailed=Okta SAML Assertion request fai
PluginServiceImpl.currentHostNotAllowed=The current host is not in the list of allowed hosts. Current host: ''{0}''. Allowed hosts: {1}
PluginServiceImpl.hostListEmpty=Current host list is empty.
PluginServiceImpl.releaseResources=Releasing resources.
PluginServiceImpl.hostListException=Exception while getting a host list.
PluginServiceImpl.hostAliasNotFound=Can''t find any host by the following aliases: ''{0}''.
PluginServiceImpl.forceRefreshTimeout=A timeout exception occurred after waiting {0}ms for refreshed topology.
PluginServiceImpl.hostsChangelistEmpty=There are no changes in the hosts'' availability.
PluginServiceImpl.failedToRetrieveHostPort=Could not retrieve Host:Port for connection.
PluginServiceImpl.nonEmptyAliases=fillAliases called when HostSpec already contains the following aliases: ''{0}''.
Expand Down Expand Up @@ -378,14 +385,18 @@ NodeResponseTimeMonitor.openingConnection=Opening a Response time connection to
NodeResponseTimeMonitor.openedConnection=Opened Response time connection: {0}.

# Monitoring RDS HostList Provider
ClusterTopologyMonitorImpl.startMonitoringThread=Start cluster topology monitoring thread for {0}.
ClusterTopologyMonitorImpl.stopMonitoringThread=Stop cluster topology monitoring thread for {0}.
ClusterTopologyMonitorImpl.exceptionDuringMonitoringStop=Stopping cluster topology monitoring after unhandled exception was thrown in monitoring thread for node {0}.
ClusterTopologyMonitorImpl.startMonitoringThread=Start cluster topology monitoring thread for ''{0}''.
ClusterTopologyMonitorImpl.stopMonitoringThread=Stop cluster topology monitoring thread for ''{0}''.
ClusterTopologyMonitorImpl.exceptionDuringMonitoringStop=Stopping cluster topology monitoring after unhandled exception was thrown in monitoring thread for node ''{0}''.
ClusterTopologyMonitorImpl.invalidQuery=An error occurred while attempting to obtain the topology because the topology query was invalid. Please ensure you are connecting to an Aurora or RDS Db cluster.
ClusterTopologyMonitorImpl.errorGettingNetworkTimeout=An error occurred while getting the connection network timeout: {0}
ClusterTopologyMonitorImpl.invalidTopology=The topology query returned an invalid topology - no writer instance detected.
ClusterTopologyMonitorImpl.invalidWriterQuery=The writer topology query is invalid.
ClusterTopologyMonitorImpl.topologyNotUpdated=Topology hasn''t been updated after {0} ms.
ClusterTopologyMonitorImpl.openedMonitoringConnection=Opened monitoring connection to node {0}.
ClusterTopologyMonitorImpl.nodeThreadCompleted=Thread completed in {0} ms.
ClusterTopologyMonitorImpl.writerNodeChanged=Writer node changed from {0} to node {1}.
ClusterTopologyMonitorImpl.openedMonitoringConnection=Opened monitoring connection to node ''{0}''.
ClusterTopologyMonitorImpl.ignoringTopologyRequest=A topology refresh was requested, but the topology was already updated recently. Returning cached hosts:
ClusterTopologyMonitorImpl.timeoutSetToZero=A topology refresh was requested, but the given timeout for the request was 0ms. Returning cached hosts:
ClusterTopologyMonitorImpl.interrupted=The thread was interrupted while waiting for updated topology.
ClusterTopologyMonitorImpl.startingNodeMonitoringThreads=Starting node monitoring threads.
ClusterTopologyMonitorImpl.writerPickedUpFromNodeMonitors=The writer host detected by the node monitors was picked up by the topology monitor: ''{0}''.
ClusterTopologyMonitorImpl.writerMonitoringConnection=The monitoring connection is connected to a writer: ''{0}''.
ClusterTopologyMonitorImpl.errorFetchingTopology=An error occurred while querying for topology: {0}
Loading