Skip to content

Commit

Permalink
feat: node fastest response time strategy (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiyvamz authored Nov 28, 2023
1 parent 14d3ae4 commit 6bb6535
Show file tree
Hide file tree
Showing 18 changed files with 791 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import software.amazon.jdbc.plugin.failover.FailoverConnectionPluginFactory;
import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingPluginFactory;
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsPluginFactory;
import software.amazon.jdbc.plugin.strategy.fastestresponse.FastestResponseStrategyPluginFactory;
import software.amazon.jdbc.profile.ConfigurationProfile;
import software.amazon.jdbc.profile.DriverConfigurationProfiles;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlState;
import software.amazon.jdbc.util.StringUtils;
Expand Down Expand Up @@ -71,6 +71,7 @@ public class ConnectionPluginChainBuilder {
put("driverMetaData", DriverMetaDataConnectionPluginFactory.class);
put("connectTime", ConnectTimeConnectionPluginFactory.class);
put("dev", DeveloperConnectionPluginFactory.class);
put("fastestResponseStrategy", FastestResponseStrategyPluginFactory.class);
}
};

Expand All @@ -90,9 +91,10 @@ public class ConnectionPluginChainBuilder {
put(ReadWriteSplittingPluginFactory.class, 600);
put(FailoverConnectionPluginFactory.class, 700);
put(HostMonitoringConnectionPluginFactory.class, 800);
put(IamAuthConnectionPluginFactory.class, 900);
put(AwsSecretsManagerConnectionPluginFactory.class, 1000);
put(LogQueryConnectionPluginFactory.class, 1100);
put(FastestResponseStrategyPluginFactory.class, 900);
put(IamAuthConnectionPluginFactory.class, 1000);
put(AwsSecretsManagerConnectionPluginFactory.class, 1100);
put(LogQueryConnectionPluginFactory.class, 1200);
put(ConnectTimeConnectionPluginFactory.class, WEIGHT_RELATIVE_TO_PRIOR_PLUGIN);
put(ExecutionTimeConnectionPluginFactory.class, WEIGHT_RELATIVE_TO_PRIOR_PLUGIN);
put(DeveloperConnectionPluginFactory.class, WEIGHT_RELATIVE_TO_PRIOR_PLUGIN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.jdbc.plugin.failover.FailoverConnectionPlugin;
import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingPlugin;
import software.amazon.jdbc.plugin.staledns.AuroraStaleDnsPlugin;
import software.amazon.jdbc.plugin.strategy.fastestresponse.FastestResponseStrategyPlugin;
import software.amazon.jdbc.profile.ConfigurationProfile;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlMethodAnalyzer;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class ConnectionPluginManager implements CanReleaseResources, Wrapper {
put(AwsSecretsManagerConnectionPlugin.class, "plugin:awsSecretsManager");
put(AuroraStaleDnsPlugin.class, "plugin:auroraStaleDns");
put(ReadWriteSplittingPlugin.class, "plugin:readWriteSplitting");
put(FastestResponseStrategyPlugin.class, "plugin:fastestResponseStrategy");
put(DefaultConnectionPlugin.class, "plugin:targetDriver");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public List<HostSpec> refresh(final Connection connection) throws SQLException {
: this.hostListProviderService.getCurrentConnection();

final FetchTopologyResult results = getTopology(currentConnection, false);
LOGGER.finest(() -> Utils.logTopology(results.hosts));
LOGGER.finest(() -> Utils.logTopology(results.hosts, results.isCachedData ? "[From cache] " : ""));

this.hostList = results.hosts;
return Collections.unmodifiableList(hostList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean acceptsStrategy(HostRole role, String strategy) {

@Override
public HostSpec getHostSpecByStrategy(final HostRole role, final String strategy)
throws UnsupportedOperationException {
throws SQLException, UnsupportedOperationException {
throw new UnsupportedOperationException("getHostSpecByStrategy is not supported by this plugin.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin
protected @NonNull Properties properties;
private final @NonNull Supplier<MonitorService> monitorServiceSupplier;
private final @NonNull PluginService pluginService;
private final @NonNull TelemetryFactory telemetryFactory;
private MonitorService monitorService;
private final RdsUtils rdsHelper;
private HostSpec monitoringHostSpec;
Expand Down Expand Up @@ -119,7 +118,6 @@ public HostMonitoringConnectionPlugin(
throw new IllegalArgumentException("monitorServiceSupplier");
}
this.pluginService = pluginService;
this.telemetryFactory = pluginService.getTelemetryFactory();
this.properties = properties;
this.monitorServiceSupplier = monitorServiceSupplier;
this.rdsHelper = rdsHelper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.plugin.strategy.fastestresponse;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.AwsWrapperProperty;
import software.amazon.jdbc.HostRole;
import software.amazon.jdbc.HostSpec;
import software.amazon.jdbc.JdbcCallable;
import software.amazon.jdbc.NodeChangeOptions;
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.PropertyDefinition;
import software.amazon.jdbc.RandomHostSelector;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.util.CacheMap;

public class FastestResponseStrategyPlugin extends AbstractConnectionPlugin {

private static final Logger LOGGER =
Logger.getLogger(FastestResponseStrategyPlugin.class.getName());

public static final String FASTEST_RESPONSE_STRATEGY_NAME = "fastestResponse";

private static final Set<String> subscribedMethods =
Collections.unmodifiableSet(new HashSet<String>() {
{
add("notifyNodeListChanged");
add("acceptsStrategy");
add("getHostSpecByStrategy");
}
});

public static final AwsWrapperProperty RESPONSE_MEASUREMENT_INTERVAL_MILLIS =
new AwsWrapperProperty(
"responseMeasurementIntervalMs",
"30000",
"Interval in millis between measuring response time to a database node.");

protected static final CacheMap<String, HostSpec> cachedFastestResponseHostByRole = new CacheMap<>();
protected static final RandomHostSelector randomHostSelector = new RandomHostSelector();

protected final @NonNull PluginService pluginService;
protected final @NonNull Properties properties;
protected final @NonNull HostResponseTimeService hostResponseTimeService;
protected long cacheExpirationNano;

protected List<HostSpec> hosts = new ArrayList<>();

static {
PropertyDefinition.registerPluginProperties(FastestResponseStrategyPlugin.class);
PropertyDefinition.registerPluginProperties("frt-");
}

public FastestResponseStrategyPlugin(final PluginService pluginService, final @NonNull Properties properties) {
this(pluginService,
properties,
new HostResponseTimeServiceImpl(
pluginService,
properties,
RESPONSE_MEASUREMENT_INTERVAL_MILLIS.getInteger(properties)));
}

public FastestResponseStrategyPlugin(
final PluginService pluginService,
final @NonNull Properties properties,
final @NonNull HostResponseTimeService hostResponseTimeService) {

this.pluginService = pluginService;
this.properties = properties;
this.hostResponseTimeService = hostResponseTimeService;
this.cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos(
RESPONSE_MEASUREMENT_INTERVAL_MILLIS.getInteger(this.properties));
}

@Override
public Set<String> getSubscribedMethods() {
return subscribedMethods;
}

@Override
public Connection connect(
final String driverProtocol,
final HostSpec hostSpec,
final Properties props,
final boolean isInitialConnection,
final JdbcCallable<Connection, SQLException> connectFunc)
throws SQLException {

Connection conn = connectFunc.call();
if (isInitialConnection) {
this.hostResponseTimeService.setHosts(this.pluginService.getHosts());
}
return conn;
}

@Override
public Connection forceConnect(
final String driverProtocol,
final HostSpec hostSpec,
final Properties props,
final boolean isInitialConnection,
final JdbcCallable<Connection, SQLException> forceConnectFunc)
throws SQLException {

Connection conn = forceConnectFunc.call();
if (isInitialConnection) {
this.hostResponseTimeService.setHosts(this.pluginService.getHosts());
}
return conn;
}

@Override
public boolean acceptsStrategy(HostRole role, String strategy) {
return FASTEST_RESPONSE_STRATEGY_NAME.equalsIgnoreCase(strategy);
}

@Override
public HostSpec getHostSpecByStrategy(final HostRole role, final String strategy)
throws SQLException, UnsupportedOperationException {

if (!acceptsStrategy(role, strategy)) {
return null;
}

// The cache holds a host with the fastest response time.
// If cache doesn't have a host for a role, it's necessary to find the fastest node in the topology.
final HostSpec fastestResponseHost = cachedFastestResponseHostByRole.get(role.name());

if (fastestResponseHost != null) {
// Found a fastest host. Let find it in the the latest topology.
HostSpec foundHostSpec = this.pluginService.getHosts().stream()
.filter(x -> x.equals(fastestResponseHost))
.findAny()
.orElse(null);

if (foundHostSpec != null) {
// Found a host in the topology.
return foundHostSpec;
}

// It seems that the fastest cached host isn't in the latest topology.
// Let's ignore cached results and find the fastest host.
}

// Cached result isn't available. Need to find the fastest response time host.

final HostSpec calculatedFastestResponseHost = this.pluginService.getHosts().stream()
.filter(x -> role.equals(x.getRole()))
.map(x -> new ResponseTimeTuple(x, this.hostResponseTimeService.getResponseTime(x)))
.sorted(Comparator.comparingInt(x -> x.responseTime))
.map(x -> x.hostSpec)
.findFirst()
.orElse(null);

if (calculatedFastestResponseHost == null) {
// Unable to identify the fastest response host.
// As a last resort, let's use a random host selector.
return randomHostSelector.getHost(this.hosts, role, properties);
}

cachedFastestResponseHostByRole.put(role.name(), calculatedFastestResponseHost, this.cacheExpirationNano);

return calculatedFastestResponseHost;
}

@Override
public void notifyNodeListChanged(final Map<String, EnumSet<NodeChangeOptions>> changes) {
this.hosts = this.pluginService.getHosts();
this.hostResponseTimeService.setHosts(this.hosts);
}

private static class ResponseTimeTuple {
public HostSpec hostSpec;
public int responseTime;

public ResponseTimeTuple(final HostSpec hostSpec, int responseTime) {
this.hostSpec = hostSpec;
this.responseTime = responseTime;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.plugin.strategy.fastestresponse;

import java.util.Properties;
import software.amazon.jdbc.ConnectionPlugin;
import software.amazon.jdbc.ConnectionPluginFactory;
import software.amazon.jdbc.PluginService;

public class FastestResponseStrategyPluginFactory implements ConnectionPluginFactory {

@Override
public ConnectionPlugin getInstance(final PluginService pluginService, final Properties props) {
return new FastestResponseStrategyPlugin(pluginService, props);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package software.amazon.jdbc.plugin.strategy.fastestresponse;

import java.util.List;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostSpec;

public interface HostResponseTimeService {

/**
* Return a response time in milliseconds to the host.
* Return Integer.MAX_VALUE if response time is not available.
*
* @param hostSpec the host details
* @return response time in milliseconds for a desired host. It should return Integer.MAX_VALUE if
* response time couldn't be measured.
*/
int getResponseTime(final HostSpec hostSpec);

/**
* Provides an updated host list to a service.
*/
void setHosts(final @NonNull List<HostSpec> hosts);
}
Loading

0 comments on commit 6bb6535

Please sign in to comment.