Skip to content

Commit

Permalink
Introduce updateRoutingTableTimeout option
Browse files Browse the repository at this point in the history
Sets maximum amount of time the driver may wait for routing table acquisition.

This option allows setting API response time expectation. It does not limit the time the driver might need when getting routing table.
  • Loading branch information
injectives committed Jul 14, 2022
1 parent 2b9a2c2 commit 11878c0
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 15 deletions.
33 changes: 33 additions & 0 deletions driver/src/main/java/org/neo4j/driver/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class Config implements Serializable {

private final boolean logLeakedSessions;

private final long updateRoutingTableTimeoutMillis;

private final int maxConnectionPoolSize;

private final long idleTimeBeforeConnectionTest;
Expand All @@ -102,6 +104,7 @@ private Config(ConfigBuilder builder) {
this.logging = builder.logging;
this.logLeakedSessions = builder.logLeakedSessions;

this.updateRoutingTableTimeoutMillis = builder.updateRoutingTableTimeoutMillis;
this.idleTimeBeforeConnectionTest = builder.idleTimeBeforeConnectionTest;
this.maxConnectionLifetimeMillis = builder.maxConnectionLifetimeMillis;
this.maxConnectionPoolSize = builder.maxConnectionPoolSize;
Expand Down Expand Up @@ -137,6 +140,15 @@ public boolean logLeakedSessions() {
return logLeakedSessions;
}

/**
* Returns maximum amount of time the driver may wait for routing table acquisition.
*
* @return the maximum time in milliseconds
*/
public long updateRoutingTableTimeoutMillis() {
return updateRoutingTableTimeoutMillis;
}

/**
* Pooled connections that have been idle in the pool for longer than this timeout
* will be tested before they are used again, to ensure they are still live.
Expand Down Expand Up @@ -257,6 +269,7 @@ public String userAgent() {
public static class ConfigBuilder {
private Logging logging = DEV_NULL_LOGGING;
private boolean logLeakedSessions;
private long updateRoutingTableTimeoutMillis = TimeUnit.SECONDS.toMillis(90);
private int maxConnectionPoolSize = PoolSettings.DEFAULT_MAX_CONNECTION_POOL_SIZE;
private long idleTimeBeforeConnectionTest = PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST;
private long maxConnectionLifetimeMillis = PoolSettings.DEFAULT_MAX_CONNECTION_LIFETIME;
Expand Down Expand Up @@ -310,6 +323,26 @@ public ConfigBuilder withLeakedSessionsLogging() {
return this;
}

/**
* Sets maximum amount of time the driver may wait for routing table acquisition.
* <p>
* This option allows setting API response time expectation. It does not limit the time the driver might need when getting routing table.
* <p>
* Default is 90 seconds.
*
* @param value the maximum time amount
* @param unit the time unit
* @return this builder
*/
public ConfigBuilder withUpdateRoutingTableTimeout(long value, TimeUnit unit) {
var millis = unit.toMillis(value);
if (millis <= 0) {
throw new IllegalArgumentException("The provided value must be at least 1 millisecond.");
}
this.updateRoutingTableTimeoutMillis = millis;
return this;
}

/**
* Pooled connections that have been idle in the pool for longer than this timeout
* will be tested before they are used again, to ensure they are still live.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ protected LoadBalancer createLoadBalancer(
address,
routingSettings,
connectionPool,
config.updateRoutingTableTimeoutMillis(),
eventExecutorGroup,
createClock(),
config.logging(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.DatabaseNameUtil;
Expand All @@ -42,23 +46,27 @@
import org.neo4j.driver.internal.util.Futures;

public class RoutingTableRegistryImpl implements RoutingTableRegistry {
static final String TABLE_ACQUISITION_TIMEOUT_MESSAGE = "Failed to acquire routing table in configured timeout.";
private final ConcurrentMap<DatabaseName, RoutingTableHandler> routingTableHandlers;
private final Map<Principal, CompletionStage<DatabaseName>> principalToDatabaseNameStage;
private final RoutingTableHandlerFactory factory;
private final Logger log;
private final long updateRoutingTableTimeoutMillis;
private final Clock clock;
private final ConnectionPool connectionPool;
private final Rediscovery rediscovery;

public RoutingTableRegistryImpl(
ConnectionPool connectionPool,
Rediscovery rediscovery,
long updateRoutingTableTimeoutMillis,
Clock clock,
Logging logging,
long routingTablePurgeDelayMs) {
this(
new ConcurrentHashMap<>(),
new RoutingTableHandlerFactory(connectionPool, rediscovery, clock, logging, routingTablePurgeDelayMs),
updateRoutingTableTimeoutMillis,
clock,
connectionPool,
rediscovery,
Expand All @@ -68,13 +76,15 @@ public RoutingTableRegistryImpl(
RoutingTableRegistryImpl(
ConcurrentMap<DatabaseName, RoutingTableHandler> routingTableHandlers,
RoutingTableHandlerFactory factory,
long updateRoutingTableTimeoutMillis,
Clock clock,
ConnectionPool connectionPool,
Rediscovery rediscovery,
Logging logging) {
this.factory = factory;
this.routingTableHandlers = routingTableHandlers;
this.principalToDatabaseNameStage = new HashMap<>();
this.updateRoutingTableTimeoutMillis = updateRoutingTableTimeoutMillis;
this.clock = clock;
this.connectionPool = connectionPool;
this.rediscovery = rediscovery;
Expand All @@ -83,14 +93,18 @@ public RoutingTableRegistryImpl(

@Override
public CompletionStage<RoutingTableHandler> ensureRoutingTable(ConnectionContext context) {
return ensureDatabaseNameIsCompleted(context).thenCompose(ctxAndHandler -> {
ConnectionContext completedContext = ctxAndHandler.getContext();
RoutingTableHandler handler = ctxAndHandler.getHandler() != null
? ctxAndHandler.getHandler()
: getOrCreate(Futures.joinNowOrElseThrow(
completedContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER));
return handler.ensureRoutingTable(completedContext).thenApply(ignored -> handler);
});
return ensureDatabaseNameIsCompleted(context)
.thenCompose(ctxAndHandler -> {
ConnectionContext completedContext = ctxAndHandler.getContext();
RoutingTableHandler handler = ctxAndHandler.getHandler() != null
? ctxAndHandler.getHandler()
: getOrCreate(Futures.joinNowOrElseThrow(
completedContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER));
return handler.ensureRoutingTable(completedContext).thenApply(ignored -> handler);
})
.toCompletableFuture()
.orTimeout(updateRoutingTableTimeoutMillis, TimeUnit.MILLISECONDS)
.handle(this::handleTimeoutException);
}

private CompletionStage<ConnectionContextAndHandler> ensureDatabaseNameIsCompleted(ConnectionContext context) {
Expand Down Expand Up @@ -190,6 +204,19 @@ public Optional<RoutingTableHandler> getRoutingTableHandler(DatabaseName databas
return Optional.ofNullable(routingTableHandlers.get(databaseName));
}

private RoutingTableHandler handleTimeoutException(RoutingTableHandler handler, Throwable throwable) {
if (throwable != null) {
if (throwable instanceof TimeoutException) {
throw new ServiceUnavailableException(TABLE_ACQUISITION_TIMEOUT_MESSAGE, throwable);
} else if (throwable instanceof RuntimeException runtimeException) {
throw runtimeException;
} else {
throw new CompletionException(throwable);
}
}
return handler;
}

// For tests
public boolean contains(DatabaseName databaseName) {
return routingTableHandlers.containsKey(databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public LoadBalancer(
BoltServerAddress initialRouter,
RoutingSettings settings,
ConnectionPool connectionPool,
long updateRoutingTableTimeoutMillis,
EventExecutorGroup eventExecutorGroup,
Clock clock,
Logging logging,
Expand All @@ -88,6 +89,7 @@ public LoadBalancer(
initialRouter, resolver, settings, clock, logging, requireNonNull(domainNameResolver)),
settings,
loadBalancingStrategy,
updateRoutingTableTimeoutMillis,
eventExecutorGroup,
clock,
logging);
Expand All @@ -98,12 +100,14 @@ private LoadBalancer(
Rediscovery rediscovery,
RoutingSettings settings,
LoadBalancingStrategy loadBalancingStrategy,
long updateRoutingTableTimeoutMillis,
EventExecutorGroup eventExecutorGroup,
Clock clock,
Logging logging) {
this(
connectionPool,
createRoutingTables(connectionPool, rediscovery, settings, clock, logging),
createRoutingTables(
connectionPool, rediscovery, settings, updateRoutingTableTimeoutMillis, clock, logging),
rediscovery,
loadBalancingStrategy,
eventExecutorGroup,
Expand Down Expand Up @@ -275,10 +279,16 @@ private static RoutingTableRegistry createRoutingTables(
ConnectionPool connectionPool,
Rediscovery rediscovery,
RoutingSettings settings,
long updateRoutingTableTimeoutMillis,
Clock clock,
Logging logging) {
return new RoutingTableRegistryImpl(
connectionPool, rediscovery, clock, logging, settings.routingTablePurgeDelayMs());
connectionPool,
rediscovery,
updateRoutingTableTimeoutMillis,
clock,
logging,
settings.routingTablePurgeDelayMs());
}

private static Rediscovery createRediscovery(
Expand Down
23 changes: 23 additions & 0 deletions driver/src/test/java/org/neo4j/driver/ConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ void shouldTurnOnLeakedSessionsLogging() {
assertTrue(Config.builder().withLeakedSessionsLogging().build().logLeakedSessions());
}

@Test
void shouldHaveDefaultUpdateRoutingTableTimeout() {
var defaultConfig = Config.defaultConfig();
assertEquals(TimeUnit.SECONDS.toMillis(90), defaultConfig.updateRoutingTableTimeoutMillis());
}

@Test
void shouldSetUpdateRoutingTableTimeout() {
var value = 1;
var config = Config.builder()
.withUpdateRoutingTableTimeout(value, TimeUnit.HOURS)
.build();
assertEquals(TimeUnit.HOURS.toMillis(value), config.updateRoutingTableTimeoutMillis());
}

@Test
void shouldRejectLessThen1Millisecond() {
var builder = Config.builder();
assertThrows(
IllegalArgumentException.class,
() -> builder.withUpdateRoutingTableTimeout(999_999, TimeUnit.NANOSECONDS));
}

@Test
void shouldHaveDefaultConnectionTimeout() {
Config defaultConfig = Config.defaultConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.DatabaseNameUtil.SYSTEM_DATABASE_NAME;
import static org.neo4j.driver.internal.DatabaseNameUtil.database;
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
import static org.neo4j.driver.internal.cluster.RoutingSettings.STALE_ROUTING_TABLE_PURGE_DELAY_MS;
import static org.neo4j.driver.internal.cluster.RoutingTableRegistryImpl.TABLE_ACQUISITION_TIMEOUT_MESSAGE;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.util.ClusterCompositionUtil.A;
import static org.neo4j.driver.internal.util.ClusterCompositionUtil.B;
Expand All @@ -48,13 +52,16 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.async.ImmutableConnectionContext;
Expand Down Expand Up @@ -135,7 +142,7 @@ void shouldReturnFreshRoutingTable(AccessMode mode) throws Throwable {
RoutingTableHandler handler = mockedRoutingTableHandler();
RoutingTableHandlerFactory factory = mockedHandlerFactory(handler);
RoutingTableRegistryImpl routingTables =
new RoutingTableRegistryImpl(map, factory, null, null, null, DEV_NULL_LOGGING);
new RoutingTableRegistryImpl(map, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING);

ImmutableConnectionContext context =
new ImmutableConnectionContext(defaultDatabase(), Collections.emptySet(), mode);
Expand All @@ -155,7 +162,7 @@ void shouldReturnServersInAllRoutingTables() throws Throwable {
map.put(database("Orange"), mockedRoutingTableHandler(E, F, C));
RoutingTableHandlerFactory factory = mockedHandlerFactory();
RoutingTableRegistryImpl routingTables =
new RoutingTableRegistryImpl(map, factory, null, null, null, DEV_NULL_LOGGING);
new RoutingTableRegistryImpl(map, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING);

// When
Set<BoltServerAddress> servers = routingTables.allServers();
Expand Down Expand Up @@ -198,6 +205,26 @@ void shouldRemoveStaleRoutingTableHandlers() throws Throwable {
assertThat(routingTables.allServers(), empty());
}

@Test
void shouldReturnExistingRoutingTableHandlerWhenFreshRoutingTables() throws Throwable {
// Given
var map = new ConcurrentHashMap<DatabaseName, RoutingTableHandler>();
var handler = mock(RoutingTableHandler.class);
given(handler.ensureRoutingTable(any())).willReturn(new CompletableFuture<>());
var database = database("neo4j");
map.put(database, handler);

var factory = mockedHandlerFactory();
var routingTables = new RoutingTableRegistryImpl(map, factory, 250, null, null, null, DEV_NULL_LOGGING);
var context = new ImmutableConnectionContext(database, Collections.emptySet(), AccessMode.READ);

// When & Then
var actual =
assertThrows(ServiceUnavailableException.class, () -> await(routingTables.ensureRoutingTable(context)));
assertEquals(TABLE_ACQUISITION_TIMEOUT_MESSAGE, actual.getMessage());
assertInstanceOf(TimeoutException.class, actual.getCause());
}

private RoutingTableHandler mockedRoutingTableHandler(BoltServerAddress... servers) {
RoutingTableHandler handler = mock(RoutingTableHandler.class);
when(handler.servers()).thenReturn(new HashSet<>(Arrays.asList(servers)));
Expand All @@ -207,7 +234,7 @@ private RoutingTableHandler mockedRoutingTableHandler(BoltServerAddress... serve

private RoutingTableRegistryImpl newRoutingTables(
ConcurrentMap<DatabaseName, RoutingTableHandler> handlers, RoutingTableHandlerFactory factory) {
return new RoutingTableRegistryImpl(handlers, factory, null, null, null, DEV_NULL_LOGGING);
return new RoutingTableRegistryImpl(handlers, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING);
}

private RoutingTableHandlerFactory mockedHandlerFactory(RoutingTableHandler handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private ConnectionPool newConnectionPool() {

private RoutingTableRegistryImpl newRoutingTables(ConnectionPool connectionPool, Rediscovery rediscovery) {
return new RoutingTableRegistryImpl(
connectionPool, rediscovery, clock, logging, STALE_ROUTING_TABLE_PURGE_DELAY_MS);
connectionPool, rediscovery, Long.MAX_VALUE, clock, logging, STALE_ROUTING_TABLE_PURGE_DELAY_MS);
}

private LoadBalancer newLoadBalancer(ConnectionPool connectionPool, RoutingTableRegistry routingTables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public class GetFeatures implements TestkitRequest {
"Detail:DefaultSecurityConfigValueEquality",
"Optimization:ImplicitDefaultArguments",
"Feature:Bolt:Patch:UTC",
"Feature:API:Type.Temporal"));
"Feature:API:Type.Temporal",
"Feature:API:UpdateRoutingTableTimeout"));

private static final Set<String> SYNC_FEATURES = new HashSet<>(Arrays.asList(
"Feature:Bolt:3.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public TestkitResponse process(TestkitState testkitState) {
domainNameResolver = callbackDomainNameResolver(testkitState);
}
Optional.ofNullable(data.userAgent).ifPresent(configBuilder::withUserAgent);
Optional.ofNullable(data.updateRoutingTableTimeoutMs)
.ifPresent(timeout -> configBuilder.withUpdateRoutingTableTimeout(timeout, TimeUnit.MILLISECONDS));
Optional.ofNullable(data.connectionTimeoutMs)
.ifPresent(timeout -> configBuilder.withConnectionTimeout(timeout, TimeUnit.MILLISECONDS));
Optional.ofNullable(data.fetchSize).ifPresent(configBuilder::withFetchSize);
Expand Down Expand Up @@ -278,6 +280,7 @@ public static class NewDriverBody {
private String userAgent;
private boolean resolverRegistered;
private boolean domainNameResolverRegistered;
private Long updateRoutingTableTimeoutMs;
private Long connectionTimeoutMs;
private Integer fetchSize;
private Long maxTxRetryTimeMs;
Expand Down

0 comments on commit 11878c0

Please sign in to comment.