diff --git a/driver/src/main/java/org/neo4j/driver/Config.java b/driver/src/main/java/org/neo4j/driver/Config.java index 060b64f871..ded83c2277 100644 --- a/driver/src/main/java/org/neo4j/driver/Config.java +++ b/driver/src/main/java/org/neo4j/driver/Config.java @@ -79,6 +79,8 @@ public class Config implements Serializable { private final boolean logLeakedSessions; + private final long updateRoutingTableTimeoutMillis; + private final int maxConnectionPoolSize; private final long idleTimeBeforeConnectionTest; @@ -102,6 +104,7 @@ private Config(ConfigBuilder builder) { this.logging = builder.logging; this.logLeakedSessions = builder.logLeakedSessions; + this.updateRoutingTableTimeoutMillis = builder.updateRoutingTableTimeoutMillis; this.idleTimeBeforeConnectionTest = builder.idleTimeBeforeConnectionTest; this.maxConnectionLifetimeMillis = builder.maxConnectionLifetimeMillis; this.maxConnectionPoolSize = builder.maxConnectionPoolSize; @@ -137,6 +140,15 @@ public boolean logLeakedSessions() { return logLeakedSessions; } + /** + * Returns maximum amount of time the driver may wait for routing table acquisition. + * + * @return the maximum time in milliseconds + */ + public long updateRoutingTableTimeoutMillis() { + return updateRoutingTableTimeoutMillis; + } + /** * Pooled connections that have been idle in the pool for longer than this timeout * will be tested before they are used again, to ensure they are still live. @@ -257,6 +269,7 @@ public String userAgent() { public static class ConfigBuilder { private Logging logging = DEV_NULL_LOGGING; private boolean logLeakedSessions; + private long updateRoutingTableTimeoutMillis = TimeUnit.SECONDS.toMillis(90); private int maxConnectionPoolSize = PoolSettings.DEFAULT_MAX_CONNECTION_POOL_SIZE; private long idleTimeBeforeConnectionTest = PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST; private long maxConnectionLifetimeMillis = PoolSettings.DEFAULT_MAX_CONNECTION_LIFETIME; @@ -310,6 +323,26 @@ public ConfigBuilder withLeakedSessionsLogging() { return this; } + /** + * Sets maximum amount of time the driver may wait for routing table acquisition. + *

+ * This option allows setting API response time expectation. It does not limit the time the driver might need when getting routing table. + *

+ * Default is 90 seconds. + * + * @param value the maximum time amount + * @param unit the time unit + * @return this builder + */ + public ConfigBuilder withUpdateRoutingTableTimeout(long value, TimeUnit unit) { + var millis = unit.toMillis(value); + if (millis <= 0) { + throw new IllegalArgumentException("The provided value must be at least 1 millisecond."); + } + this.updateRoutingTableTimeoutMillis = millis; + return this; + } + /** * Pooled connections that have been idle in the pool for longer than this timeout * will be tested before they are used again, to ensure they are still live. diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 2e9dc95c1a..a3200f4c93 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -281,6 +281,7 @@ protected LoadBalancer createLoadBalancer( address, routingSettings, connectionPool, + config.updateRoutingTableTimeoutMillis(), eventExecutorGroup, createClock(), config.logging(), diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java index a5cb238bbe..6dc3f21c6a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java @@ -27,12 +27,16 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.DatabaseNameUtil; @@ -42,10 +46,12 @@ import org.neo4j.driver.internal.util.Futures; public class RoutingTableRegistryImpl implements RoutingTableRegistry { + static final String TABLE_ACQUISITION_TIMEOUT_MESSAGE = "Failed to acquire routing table in configured timeout."; private final ConcurrentMap routingTableHandlers; private final Map> principalToDatabaseNameStage; private final RoutingTableHandlerFactory factory; private final Logger log; + private final long updateRoutingTableTimeoutMillis; private final Clock clock; private final ConnectionPool connectionPool; private final Rediscovery rediscovery; @@ -53,12 +59,14 @@ public class RoutingTableRegistryImpl implements RoutingTableRegistry { public RoutingTableRegistryImpl( ConnectionPool connectionPool, Rediscovery rediscovery, + long updateRoutingTableTimeoutMillis, Clock clock, Logging logging, long routingTablePurgeDelayMs) { this( new ConcurrentHashMap<>(), new RoutingTableHandlerFactory(connectionPool, rediscovery, clock, logging, routingTablePurgeDelayMs), + updateRoutingTableTimeoutMillis, clock, connectionPool, rediscovery, @@ -68,6 +76,7 @@ public RoutingTableRegistryImpl( RoutingTableRegistryImpl( ConcurrentMap routingTableHandlers, RoutingTableHandlerFactory factory, + long updateRoutingTableTimeoutMillis, Clock clock, ConnectionPool connectionPool, Rediscovery rediscovery, @@ -75,6 +84,7 @@ public RoutingTableRegistryImpl( this.factory = factory; this.routingTableHandlers = routingTableHandlers; this.principalToDatabaseNameStage = new HashMap<>(); + this.updateRoutingTableTimeoutMillis = updateRoutingTableTimeoutMillis; this.clock = clock; this.connectionPool = connectionPool; this.rediscovery = rediscovery; @@ -83,14 +93,18 @@ public RoutingTableRegistryImpl( @Override public CompletionStage ensureRoutingTable(ConnectionContext context) { - return ensureDatabaseNameIsCompleted(context).thenCompose(ctxAndHandler -> { - ConnectionContext completedContext = ctxAndHandler.getContext(); - RoutingTableHandler handler = ctxAndHandler.getHandler() != null - ? ctxAndHandler.getHandler() - : getOrCreate(Futures.joinNowOrElseThrow( - completedContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER)); - return handler.ensureRoutingTable(completedContext).thenApply(ignored -> handler); - }); + return ensureDatabaseNameIsCompleted(context) + .thenCompose(ctxAndHandler -> { + ConnectionContext completedContext = ctxAndHandler.getContext(); + RoutingTableHandler handler = ctxAndHandler.getHandler() != null + ? ctxAndHandler.getHandler() + : getOrCreate(Futures.joinNowOrElseThrow( + completedContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER)); + return handler.ensureRoutingTable(completedContext).thenApply(ignored -> handler); + }) + .toCompletableFuture() + .orTimeout(updateRoutingTableTimeoutMillis, TimeUnit.MILLISECONDS) + .handle(this::handleTimeoutException); } private CompletionStage ensureDatabaseNameIsCompleted(ConnectionContext context) { @@ -190,6 +204,19 @@ public Optional getRoutingTableHandler(DatabaseName databas return Optional.ofNullable(routingTableHandlers.get(databaseName)); } + private RoutingTableHandler handleTimeoutException(RoutingTableHandler handler, Throwable throwable) { + if (throwable != null) { + if (throwable instanceof TimeoutException) { + throw new ServiceUnavailableException(TABLE_ACQUISITION_TIMEOUT_MESSAGE, throwable); + } else if (throwable instanceof RuntimeException runtimeException) { + throw runtimeException; + } else { + throw new CompletionException(throwable); + } + } + return handler; + } + // For tests public boolean contains(DatabaseName databaseName) { return routingTableHandlers.containsKey(databaseName); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 13979f6503..511715454d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -76,6 +76,7 @@ public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connectionPool, + long updateRoutingTableTimeoutMillis, EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging, @@ -88,6 +89,7 @@ public LoadBalancer( initialRouter, resolver, settings, clock, logging, requireNonNull(domainNameResolver)), settings, loadBalancingStrategy, + updateRoutingTableTimeoutMillis, eventExecutorGroup, clock, logging); @@ -98,12 +100,14 @@ private LoadBalancer( Rediscovery rediscovery, RoutingSettings settings, LoadBalancingStrategy loadBalancingStrategy, + long updateRoutingTableTimeoutMillis, EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging) { this( connectionPool, - createRoutingTables(connectionPool, rediscovery, settings, clock, logging), + createRoutingTables( + connectionPool, rediscovery, settings, updateRoutingTableTimeoutMillis, clock, logging), rediscovery, loadBalancingStrategy, eventExecutorGroup, @@ -275,10 +279,16 @@ private static RoutingTableRegistry createRoutingTables( ConnectionPool connectionPool, Rediscovery rediscovery, RoutingSettings settings, + long updateRoutingTableTimeoutMillis, Clock clock, Logging logging) { return new RoutingTableRegistryImpl( - connectionPool, rediscovery, clock, logging, settings.routingTablePurgeDelayMs()); + connectionPool, + rediscovery, + updateRoutingTableTimeoutMillis, + clock, + logging, + settings.routingTablePurgeDelayMs()); } private static Rediscovery createRediscovery( diff --git a/driver/src/test/java/org/neo4j/driver/ConfigTest.java b/driver/src/test/java/org/neo4j/driver/ConfigTest.java index b983d1f67f..8f177a6435 100644 --- a/driver/src/test/java/org/neo4j/driver/ConfigTest.java +++ b/driver/src/test/java/org/neo4j/driver/ConfigTest.java @@ -147,6 +147,29 @@ void shouldTurnOnLeakedSessionsLogging() { assertTrue(Config.builder().withLeakedSessionsLogging().build().logLeakedSessions()); } + @Test + void shouldHaveDefaultUpdateRoutingTableTimeout() { + var defaultConfig = Config.defaultConfig(); + assertEquals(TimeUnit.SECONDS.toMillis(90), defaultConfig.updateRoutingTableTimeoutMillis()); + } + + @Test + void shouldSetUpdateRoutingTableTimeout() { + var value = 1; + var config = Config.builder() + .withUpdateRoutingTableTimeout(value, TimeUnit.HOURS) + .build(); + assertEquals(TimeUnit.HOURS.toMillis(value), config.updateRoutingTableTimeoutMillis()); + } + + @Test + void shouldRejectLessThen1Millisecond() { + var builder = Config.builder(); + assertThrows( + IllegalArgumentException.class, + () -> builder.withUpdateRoutingTableTimeout(999_999, TimeUnit.NANOSECONDS)); + } + @Test void shouldHaveDefaultConnectionTimeout() { Config defaultConfig = Config.defaultConfig(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImplTest.java index e84950c04c..e63c84bd52 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImplTest.java @@ -25,9 +25,12 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -35,6 +38,7 @@ import static org.neo4j.driver.internal.DatabaseNameUtil.database; import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; import static org.neo4j.driver.internal.cluster.RoutingSettings.STALE_ROUTING_TABLE_PURGE_DELAY_MS; +import static org.neo4j.driver.internal.cluster.RoutingTableRegistryImpl.TABLE_ACQUISITION_TIMEOUT_MESSAGE; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.ClusterCompositionUtil.A; import static org.neo4j.driver.internal.util.ClusterCompositionUtil.B; @@ -48,13 +52,16 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.async.ImmutableConnectionContext; @@ -135,7 +142,7 @@ void shouldReturnFreshRoutingTable(AccessMode mode) throws Throwable { RoutingTableHandler handler = mockedRoutingTableHandler(); RoutingTableHandlerFactory factory = mockedHandlerFactory(handler); RoutingTableRegistryImpl routingTables = - new RoutingTableRegistryImpl(map, factory, null, null, null, DEV_NULL_LOGGING); + new RoutingTableRegistryImpl(map, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING); ImmutableConnectionContext context = new ImmutableConnectionContext(defaultDatabase(), Collections.emptySet(), mode); @@ -155,7 +162,7 @@ void shouldReturnServersInAllRoutingTables() throws Throwable { map.put(database("Orange"), mockedRoutingTableHandler(E, F, C)); RoutingTableHandlerFactory factory = mockedHandlerFactory(); RoutingTableRegistryImpl routingTables = - new RoutingTableRegistryImpl(map, factory, null, null, null, DEV_NULL_LOGGING); + new RoutingTableRegistryImpl(map, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING); // When Set servers = routingTables.allServers(); @@ -198,6 +205,26 @@ void shouldRemoveStaleRoutingTableHandlers() throws Throwable { assertThat(routingTables.allServers(), empty()); } + @Test + void shouldReturnExistingRoutingTableHandlerWhenFreshRoutingTables() throws Throwable { + // Given + var map = new ConcurrentHashMap(); + var handler = mock(RoutingTableHandler.class); + given(handler.ensureRoutingTable(any())).willReturn(new CompletableFuture<>()); + var database = database("neo4j"); + map.put(database, handler); + + var factory = mockedHandlerFactory(); + var routingTables = new RoutingTableRegistryImpl(map, factory, 250, null, null, null, DEV_NULL_LOGGING); + var context = new ImmutableConnectionContext(database, Collections.emptySet(), AccessMode.READ); + + // When & Then + var actual = + assertThrows(ServiceUnavailableException.class, () -> await(routingTables.ensureRoutingTable(context))); + assertEquals(TABLE_ACQUISITION_TIMEOUT_MESSAGE, actual.getMessage()); + assertInstanceOf(TimeoutException.class, actual.getCause()); + } + private RoutingTableHandler mockedRoutingTableHandler(BoltServerAddress... servers) { RoutingTableHandler handler = mock(RoutingTableHandler.class); when(handler.servers()).thenReturn(new HashSet<>(Arrays.asList(servers))); @@ -207,7 +234,7 @@ private RoutingTableHandler mockedRoutingTableHandler(BoltServerAddress... serve private RoutingTableRegistryImpl newRoutingTables( ConcurrentMap handlers, RoutingTableHandlerFactory factory) { - return new RoutingTableRegistryImpl(handlers, factory, null, null, null, DEV_NULL_LOGGING); + return new RoutingTableRegistryImpl(handlers, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING); } private RoutingTableHandlerFactory mockedHandlerFactory(RoutingTableHandler handler) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java index 78597e748a..e92746e689 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java @@ -323,7 +323,7 @@ private ConnectionPool newConnectionPool() { private RoutingTableRegistryImpl newRoutingTables(ConnectionPool connectionPool, Rediscovery rediscovery) { return new RoutingTableRegistryImpl( - connectionPool, rediscovery, clock, logging, STALE_ROUTING_TABLE_PURGE_DELAY_MS); + connectionPool, rediscovery, Long.MAX_VALUE, clock, logging, STALE_ROUTING_TABLE_PURGE_DELAY_MS); } private LoadBalancer newLoadBalancer(ConnectionPool connectionPool, RoutingTableRegistry routingTables) { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 6334b6e0c0..bacda6671f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -57,7 +57,8 @@ public class GetFeatures implements TestkitRequest { "Detail:DefaultSecurityConfigValueEquality", "Optimization:ImplicitDefaultArguments", "Feature:Bolt:Patch:UTC", - "Feature:API:Type.Temporal")); + "Feature:API:Type.Temporal", + "Feature:API:UpdateRoutingTableTimeout")); private static final Set SYNC_FEATURES = new HashSet<>(Arrays.asList( "Feature:Bolt:3.0", diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 7b4bb52afa..b75b2b302c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -101,6 +101,8 @@ public TestkitResponse process(TestkitState testkitState) { domainNameResolver = callbackDomainNameResolver(testkitState); } Optional.ofNullable(data.userAgent).ifPresent(configBuilder::withUserAgent); + Optional.ofNullable(data.updateRoutingTableTimeoutMs) + .ifPresent(timeout -> configBuilder.withUpdateRoutingTableTimeout(timeout, TimeUnit.MILLISECONDS)); Optional.ofNullable(data.connectionTimeoutMs) .ifPresent(timeout -> configBuilder.withConnectionTimeout(timeout, TimeUnit.MILLISECONDS)); Optional.ofNullable(data.fetchSize).ifPresent(configBuilder::withFetchSize); @@ -278,6 +280,7 @@ public static class NewDriverBody { private String userAgent; private boolean resolverRegistered; private boolean domainNameResolverRegistered; + private Long updateRoutingTableTimeoutMs; private Long connectionTimeoutMs; private Integer fetchSize; private Long maxTxRetryTimeMs;