From 628acaec54669d19a52ec64069807a38079bc59b Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Mon, 18 Oct 2021 14:05:30 +0100 Subject: [PATCH 1/2] Replace AddressSet with standard Java collections This update also prevents unnecessary connection pool closures by keeping `disused` addresses until next routing table update and reusing them. --- .../async/pool/ConnectionPoolImpl.java | 34 +--- .../driver/internal/cluster/AddressSet.java | 113 ----------- .../internal/cluster/ClusterRoutingTable.java | 146 +++++++++----- .../internal/cluster/RediscoveryImpl.java | 4 +- .../driver/internal/cluster/RoutingTable.java | 6 +- .../cluster/loadbalancing/LoadBalancer.java | 18 +- .../neo4j/driver/internal/util/LockUtil.java | 61 ++++++ .../internal/cluster/AddressSetTest.java | 185 ------------------ .../internal/cluster/RediscoveryTest.java | 5 +- .../cluster/RoutingTableHandlerTest.java | 3 +- .../loadbalancing/LoadBalancerTest.java | 33 ++-- .../neo4j/driver/internal/util/Matchers.java | 78 -------- .../messages/requests/GetRoutingTable.java | 12 +- 13 files changed, 200 insertions(+), 498 deletions(-) delete mode 100644 driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java delete mode 100644 driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java index c6f5de94bd..314d38850e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java @@ -32,10 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -54,6 +52,8 @@ import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener; import static org.neo4j.driver.internal.util.Futures.combineErrors; import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLockAsync; public class ConnectionPoolImpl implements ConnectionPool { @@ -342,34 +342,4 @@ private CompletableFuture closeAllPools() } ) .toArray( CompletableFuture[]::new ) ); } - - private void executeWithLock( Lock lock, Runnable runnable ) - { - executeWithLock( lock, () -> - { - runnable.run(); - return null; - } ); - } - - private T executeWithLock( Lock lock, Supplier supplier ) - { - lock.lock(); - try - { - return supplier.get(); - } - finally - { - lock.unlock(); - } - } - - private void executeWithLockAsync( Lock lock, Supplier> stageSupplier ) - { - lock.lock(); - CompletableFuture.completedFuture( lock ) - .thenCompose( ignored -> stageSupplier.get() ) - .whenComplete( ( ignored, throwable ) -> lock.unlock() ); - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java deleted file mode 100644 index a08a914ea9..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.Set; - -import org.neo4j.driver.internal.BoltServerAddress; - -public class AddressSet -{ - private static final BoltServerAddress[] NONE = {}; - - private volatile BoltServerAddress[] addresses = NONE; - - public BoltServerAddress[] toArray() - { - return addresses; - } - - public int size() - { - return addresses.length; - } - - /** - * Updates addresses using the provided set. - *

- * It aims to retain existing addresses by checking if they are present in the new set. To benefit from this, the provided set MUST contain specifically - * {@link BoltServerAddress} instances with equal host and connection host values. - * - * @param newAddresses the new address set. - */ - public synchronized void retainAllAndAdd( Set newAddresses ) - { - BoltServerAddress[] addressesArr = new BoltServerAddress[newAddresses.size()]; - int insertionIdx = 0; - for ( BoltServerAddress address : addresses ) - { - BoltServerAddress lookupAddress = - BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() ); - if ( newAddresses.remove( lookupAddress ) ) - { - addressesArr[insertionIdx] = address; - insertionIdx++; - } - } - Iterator addressIterator = newAddresses.iterator(); - for ( ; insertionIdx < addressesArr.length && addressIterator.hasNext(); insertionIdx++ ) - { - addressesArr[insertionIdx] = addressIterator.next(); - } - addresses = addressesArr; - } - - public synchronized void replaceIfPresent( BoltServerAddress oldAddress, BoltServerAddress newAddress ) - { - for ( int i = 0; i < addresses.length; i++ ) - { - if ( addresses[i].equals( oldAddress ) ) - { - addresses[i] = newAddress; - } - } - } - - public synchronized void remove( BoltServerAddress address ) - { - BoltServerAddress[] addresses = this.addresses; - if ( addresses != null ) - { - for ( int i = 0; i < addresses.length; i++ ) - { - if ( addresses[i].equals( address ) ) - { - if ( addresses.length == 1 ) - { - this.addresses = NONE; - return; - } - BoltServerAddress[] copy = new BoltServerAddress[addresses.length - 1]; - System.arraycopy( addresses, 0, copy, 0, i ); - System.arraycopy( addresses, i + 1, copy, i, addresses.length - i - 1 ); - this.addresses = copy; - return; - } - } - } - } - - @Override - public String toString() - { - return "AddressSet=" + Arrays.toString( addresses ); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 432c17e811..5c28e90357 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -22,6 +22,10 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.neo4j.driver.AccessMode; import org.neo4j.driver.internal.BoltServerAddress; @@ -30,24 +34,27 @@ import static java.lang.String.format; import static java.util.Arrays.asList; +import static org.neo4j.driver.internal.util.LockUtil.executeWithLock; public class ClusterRoutingTable implements RoutingTable { private static final int MIN_ROUTERS = 1; + private final ReadWriteLock tableLock = new ReentrantReadWriteLock(); + private final DatabaseName databaseName; private final Clock clock; - private volatile long expirationTimestamp; - private final AddressSet readers; - private final AddressSet writers; - private final AddressSet routers; + private final Set disused = new LinkedHashSet<>(); - private final DatabaseName databaseName; // specifies the database this routing table is acquired for - private boolean preferInitialRouter; + private long expirationTimestamp; + private boolean preferInitialRouter = true; + private Set readers = Collections.emptySet(); + private Set writers = Collections.emptySet(); + private Set routers = Collections.emptySet(); public ClusterRoutingTable( DatabaseName ofDatabase, Clock clock, BoltServerAddress... routingAddresses ) { this( ofDatabase, clock ); - routers.retainAllAndAdd( new LinkedHashSet<>( asList( routingAddresses ) ) ); + routers = Collections.unmodifiableSet( new LinkedHashSet<>( asList( routingAddresses ) ) ); } private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock ) @@ -55,77 +62,85 @@ private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock ) this.databaseName = ofDatabase; this.clock = clock; this.expirationTimestamp = clock.millis() - 1; - this.preferInitialRouter = true; - - this.readers = new AddressSet(); - this.writers = new AddressSet(); - this.routers = new AddressSet(); } @Override public boolean isStaleFor( AccessMode mode ) { - return expirationTimestamp < clock.millis() || - routers.size() < MIN_ROUTERS || - mode == AccessMode.READ && readers.size() == 0 || - mode == AccessMode.WRITE && writers.size() == 0; + return executeWithLock( tableLock.readLock(), () -> + expirationTimestamp < clock.millis() || + routers.size() < MIN_ROUTERS || + mode == AccessMode.READ && readers.size() == 0 || + mode == AccessMode.WRITE && writers.size() == 0 ); } @Override public boolean hasBeenStaleFor( long extraTime ) { - long totalTime = expirationTimestamp + extraTime; + long totalTime = executeWithLock( tableLock.readLock(), () -> expirationTimestamp ) + extraTime; if ( totalTime < 0 ) { totalTime = Long.MAX_VALUE; } - return totalTime < clock.millis(); + return totalTime < clock.millis(); } @Override - public synchronized void update( ClusterComposition cluster ) + public void update( ClusterComposition cluster ) { - expirationTimestamp = cluster.expirationTimestamp(); - readers.retainAllAndAdd( cluster.readers() ); - writers.retainAllAndAdd( cluster.writers() ); - routers.retainAllAndAdd( cluster.routers() ); - preferInitialRouter = !cluster.hasWriters(); + executeWithLock( tableLock.writeLock(), () -> + { + expirationTimestamp = cluster.expirationTimestamp(); + readers = newWithReusedAddresses( readers, disused, cluster.readers() ); + writers = newWithReusedAddresses( writers, disused, cluster.writers() ); + routers = newWithReusedAddresses( routers, disused, cluster.routers() ); + disused.clear(); + preferInitialRouter = !cluster.hasWriters(); + } ); } @Override - public synchronized void forget( BoltServerAddress address ) + public void forget( BoltServerAddress address ) { - routers.remove( address ); - readers.remove( address ); - writers.remove( address ); + executeWithLock( tableLock.writeLock(), () -> + { + routers = newWithoutAddressIfPresent( routers, address ); + readers = newWithoutAddressIfPresent( readers, address ); + writers = newWithoutAddressIfPresent( writers, address ); + disused.add( address ); + } ); } @Override - public AddressSet readers() + public Set readers() { - return readers; + return executeWithLock( tableLock.readLock(), () -> readers ); } @Override - public AddressSet writers() + public Set writers() { - return writers; + return executeWithLock( tableLock.readLock(), () -> writers ); } @Override - public AddressSet routers() + public Set routers() { - return routers; + return executeWithLock( tableLock.readLock(), () -> routers ); } @Override public Set servers() { - Set servers = new HashSet<>(); - Collections.addAll( servers, readers.toArray() ); - Collections.addAll( servers, writers.toArray() ); - Collections.addAll( servers, routers.toArray() ); - return servers; + return executeWithLock( tableLock.readLock(), () -> + { + Set servers = new HashSet<>(); + servers.addAll( readers ); + servers.addAll( writers ); + servers.addAll( routers ); + servers.addAll( disused ); + return servers; + } ); } @Override @@ -137,31 +152,70 @@ public DatabaseName database() @Override public void forgetWriter( BoltServerAddress toRemove ) { - writers.remove( toRemove ); + executeWithLock( tableLock.writeLock(), () -> + { + writers = newWithoutAddressIfPresent( writers, toRemove ); + disused.add( toRemove ); + } ); } @Override public void replaceRouterIfPresent( BoltServerAddress oldRouter, BoltServerAddress newRouter ) { - routers.replaceIfPresent( oldRouter, newRouter ); + executeWithLock( tableLock.writeLock(), () -> routers = newWithAddressReplacedIfPresent( routers, oldRouter, newRouter ) ); } @Override public boolean preferInitialRouter() { - return preferInitialRouter; + return executeWithLock( tableLock.readLock(), () -> preferInitialRouter ); } @Override public long expirationTimestamp() { - return expirationTimestamp; + return executeWithLock( tableLock.readLock(), () -> expirationTimestamp ); } @Override - public synchronized String toString() + public String toString() + { + return executeWithLock( tableLock.readLock(), () -> + format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'", + expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ) ); + } + + private Set newWithoutAddressIfPresent( Set addresses, BoltServerAddress addressToSkip ) + { + return newWithAddressReplacedIfPresent( addresses, addressToSkip, null ); + } + + private Set newWithAddressReplacedIfPresent( Set addresses, BoltServerAddress oldAddress, + BoltServerAddress newAddress ) + { + if ( !addresses.contains( oldAddress ) ) + { + return addresses; + } + Stream addressStream = addresses.stream(); + addressStream = newAddress != null + ? addressStream.map( address -> address.equals( oldAddress ) ? newAddress : address ) + : addressStream.filter( address -> !address.equals( oldAddress ) ); + return Collections.unmodifiableSet( (Set) addressStream.collect( Collectors.toCollection( LinkedHashSet::new ) ) ); + } + + private Set newWithReusedAddresses( Set currentAddresses, Set disusedAddresses, + Set newAddresses ) + { + Set result = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() ) + .filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) ) + .collect( Collectors.toCollection( LinkedHashSet::new ) ); + result.addAll( newAddresses ); + return Collections.unmodifiableSet( result ); + } + + private BoltServerAddress toBoltServerAddress( BoltServerAddress address ) { - return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'", - expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ); + return BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java index 06985c0379..f1ed45d6b9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java @@ -197,10 +197,8 @@ private CompletionStage lookupOnKnownRouters( Ro Set seenServers, Bookmark bookmark, String impersonatedUser, Throwable baseError ) { - BoltServerAddress[] addresses = routingTable.routers().toArray(); - CompletableFuture result = completedWithNull(); - for ( BoltServerAddress address : addresses ) + for ( BoltServerAddress address : routingTable.routers() ) { result = result .thenCompose( diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index dbe58f22ee..28d2d0e780 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -34,11 +34,11 @@ public interface RoutingTable void forget( BoltServerAddress address ); - AddressSet readers(); + Set readers(); - AddressSet writers(); + Set writers(); - AddressSet routers(); + Set routers(); Set servers(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 2b980e666d..593d68686c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -35,7 +36,6 @@ import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.async.ConnectionContext; import org.neo4j.driver.internal.async.connection.RoutingConnection; -import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.ClusterCompositionProvider; import org.neo4j.driver.internal.cluster.Rediscovery; import org.neo4j.driver.internal.cluster.RediscoveryImpl; @@ -68,6 +68,7 @@ public class LoadBalancer implements ConnectionProvider "Failed to obtain connection towards %s server. Known routing table is: %s"; private static final String CONNECTION_ACQUISITION_ATTEMPT_FAILURE_MESSAGE = "Failed to obtain a connection towards address %s, will try other addresses if available. Complete failure is reported separately from this entry."; + private static final BoltServerAddress[] BOLT_SERVER_ADDRESSES_EMPTY_ARRAY = new BoltServerAddress[0]; private final ConnectionPool connectionPool; private final RoutingTableRegistry routingTables; private final LoadBalancingStrategy loadBalancingStrategy; @@ -193,16 +194,15 @@ private CompletionStage supportsMultiDb( BoltServerAddress address ) private CompletionStage acquire( AccessMode mode, RoutingTable routingTable ) { - AddressSet addresses = addressSet( mode, routingTable ); CompletableFuture result = new CompletableFuture<>(); List attemptExceptions = new ArrayList<>(); - acquire( mode, routingTable, addresses, result, attemptExceptions ); + acquire( mode, routingTable, result, attemptExceptions ); return result; } - private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet addresses, CompletableFuture result, - List attemptErrors ) + private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFuture result, List attemptErrors ) { + Set addresses = addressSet( mode, routingTable ); BoltServerAddress address = selectAddress( mode, addresses ); if ( address == null ) @@ -227,7 +227,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add log.debug( attemptMessage, error ); attemptErrors.add( error ); routingTable.forget( address ); - eventExecutorGroup.next().execute( () -> acquire( mode, routingTable, addresses, result, attemptErrors ) ); + eventExecutorGroup.next().execute( () -> acquire( mode, routingTable, result, attemptErrors ) ); } else { @@ -241,7 +241,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add } ); } - private static AddressSet addressSet( AccessMode mode, RoutingTable routingTable ) + private static Set addressSet( AccessMode mode, RoutingTable routingTable ) { switch ( mode ) { @@ -254,9 +254,9 @@ private static AddressSet addressSet( AccessMode mode, RoutingTable routingTable } } - private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers ) + private BoltServerAddress selectAddress( AccessMode mode, Set servers ) { - BoltServerAddress[] addresses = servers.toArray(); + BoltServerAddress[] addresses = servers.toArray( BOLT_SERVER_ADDRESSES_EMPTY_ARRAY ); switch ( mode ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java new file mode 100644 index 0000000000..f308921ef3 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/LockUtil.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +public class LockUtil +{ + public static void executeWithLock( Lock lock, Runnable runnable ) + { + lock.lock(); + try + { + runnable.run(); + } + finally + { + lock.unlock(); + } + } + + public static T executeWithLock( Lock lock, Supplier supplier ) + { + lock.lock(); + try + { + return supplier.get(); + } + finally + { + lock.unlock(); + } + } + + public static void executeWithLockAsync( Lock lock, Supplier> stageSupplier ) + { + lock.lock(); + CompletableFuture.completedFuture( lock ) + .thenCompose( ignored -> stageSupplier.get() ) + .whenComplete( ( ignored, throwable ) -> lock.unlock() ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java deleted file mode 100644 index a91d5676b6..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import org.junit.jupiter.api.Test; - -import java.net.InetAddress; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Set; - -import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.ResolvedBoltServerAddress; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; - -class AddressSetTest -{ - @Test - void shouldPreserveOrderWhenAdding() throws Exception - { - // given - Set servers = addresses( "one", "two", "tre" ); - - AddressSet set = new AddressSet(); - set.retainAllAndAdd( servers ); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - - // when - servers.add( new BoltServerAddress( "fyr" ) ); - set.retainAllAndAdd( servers ); - - // then - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ), - new BoltServerAddress( "fyr" )}, set.toArray() ); - } - - @Test - void shouldPreserveOrderWhenRemoving() throws Exception - { - // given - Set servers = addresses( "one", "two", "tre" ); - AddressSet set = new AddressSet(); - set.retainAllAndAdd( servers ); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - - // when - set.remove( new BoltServerAddress( "one" ) ); - - // then - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - } - - @Test - void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception - { - // given - Set servers = addresses( "one", "two", "tre" ); - AddressSet set = new AddressSet(); - set.retainAllAndAdd( servers ); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - - // when - servers.remove( new BoltServerAddress( "one" ) ); - set.retainAllAndAdd( servers ); - - // then - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, set.toArray() ); - } - - @Test - void shouldExposeEmptyArrayWhenEmpty() - { - AddressSet addressSet = new AddressSet(); - - BoltServerAddress[] addresses = addressSet.toArray(); - - assertEquals( 0, addresses.length ); - } - - @Test - void shouldExposeCorrectArray() - { - AddressSet addressSet = new AddressSet(); - addressSet.retainAllAndAdd( addresses( "one", "two", "tre" ) ); - - BoltServerAddress[] addresses = addressSet.toArray(); - - assertArrayEquals( new BoltServerAddress[]{ - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" )}, addresses ); - } - - @Test - void shouldHaveSizeZeroWhenEmpty() - { - AddressSet addressSet = new AddressSet(); - - assertEquals( 0, addressSet.size() ); - } - - @Test - void shouldHaveCorrectSize() - { - AddressSet addressSet = new AddressSet(); - addressSet.retainAllAndAdd( addresses( "one", "two" ) ); - - assertEquals( 2, addressSet.size() ); - } - - @Test - void shouldRetainExistingAddresses() - { - AddressSet addressSet = new AddressSet(); - BoltServerAddress address0 = new BoltServerAddress( "node0", 7687 ); - BoltServerAddress address1 = new ResolvedBoltServerAddress( "node1", 7687, new InetAddress[]{InetAddress.getLoopbackAddress()} ); - BoltServerAddress address2 = new BoltServerAddress( "node2", 7687 ); - BoltServerAddress address3 = new BoltServerAddress( "node3", 7687 ); - BoltServerAddress address4 = new BoltServerAddress( "node4", 7687 ); - addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( address0, address1, address2, address3, address4 ) ) ); - - BoltServerAddress sameAddress0 = new BoltServerAddress( "node0", 7687 ); - BoltServerAddress sameAddress1 = new BoltServerAddress( "node1", 7687 ); - BoltServerAddress differentAddress2 = new BoltServerAddress( "different-node2", 7687 ); - BoltServerAddress sameAddress3 = new BoltServerAddress( "node3", 7687 ); - BoltServerAddress sameAddress4 = new BoltServerAddress( "node4", 7687 ); - addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( sameAddress0, sameAddress1, differentAddress2, sameAddress3, sameAddress4 ) ) ); - - assertEquals( 5, addressSet.size() ); - assertSame( addressSet.toArray()[0], address0 ); - assertSame( addressSet.toArray()[1], address1 ); - assertSame( addressSet.toArray()[2], address3 ); - assertSame( addressSet.toArray()[3], address4 ); - assertSame( addressSet.toArray()[4], differentAddress2 ); - } - - private static Set addresses( String... strings ) - { - Set set = new LinkedHashSet<>(); - for ( String string : strings ) - { - set.add( new BoltServerAddress( string ) ); - } - return set; - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index cd1e760849..d86fb075db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -26,8 +26,10 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -517,8 +519,7 @@ private static RoutingTable routingTableMock( BoltServerAddress... routers ) private static RoutingTable routingTableMock( boolean preferInitialRouter, BoltServerAddress... routers ) { RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet addressSet = new AddressSet(); - addressSet.retainAllAndAdd( asOrderedSet( routers ) ); + Set addressSet = new LinkedHashSet<>( asOrderedSet( routers ) ); when( routingTable.routers() ).thenReturn( addressSet ); when( routingTable.database() ).thenReturn( defaultDatabase() ); when( routingTable.preferInitialRouter() ).thenReturn( preferInitialRouter ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java index 709b547434..42f50e4758 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java @@ -259,8 +259,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.isStaleFor( mode ) ).thenReturn( true ); - AddressSet addresses = new AddressSet(); - addresses.retainAllAndAdd( new HashSet<>( singletonList( LOCAL_DEFAULT ) ) ); + Set addresses = new LinkedHashSet<>( singletonList( LOCAL_DEFAULT ) ); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); when( routingTable.database() ).thenReturn( defaultDatabase() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index ed07a78d8a..f45023d67f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -27,6 +27,7 @@ import org.mockito.InOrder; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; @@ -43,7 +44,6 @@ import org.neo4j.driver.internal.DatabaseNameUtil; import org.neo4j.driver.internal.async.ConnectionContext; import org.neo4j.driver.internal.async.connection.RoutingConnection; -import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.ClusterComposition; import org.neo4j.driver.internal.cluster.ClusterRoutingTable; import org.neo4j.driver.internal.cluster.Rediscovery; @@ -101,10 +101,8 @@ void returnsCorrectAccessMode( AccessMode mode ) { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - AddressSet writerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} ); - when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{B} ); + Set readerAddresses = new LinkedHashSet<>( Collections.singleton( A ) ); + Set writerAddresses = new LinkedHashSet<>( Collections.singleton( B ) ); when( routingTable.readers() ).thenReturn( readerAddresses ); when( routingTable.writers() ).thenReturn( writerAddresses ); @@ -122,8 +120,7 @@ void returnsCorrectDatabaseName( String databaseName ) { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet writerAddresses = mock( AddressSet.class ); - when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} ); + Set writerAddresses = new LinkedHashSet<>( Collections.singleton( A ) ); when( routingTable.writers() ).thenReturn( writerAddresses ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); @@ -140,15 +137,17 @@ void shouldThrowWhenRediscoveryReturnsNoSuitableServers() { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - when( routingTable.readers() ).thenReturn( new AddressSet() ); - when( routingTable.writers() ).thenReturn( new AddressSet() ); + when( routingTable.readers() ).thenReturn( new LinkedHashSet<>() ); + when( routingTable.writers() ).thenReturn( new LinkedHashSet<>() ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); - SessionExpiredException error1 = assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( READ ) ) ) ); + SessionExpiredException error1 = + assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( READ ) ) ) ); assertThat( error1.getMessage(), startsWith( "Failed to obtain connection towards READ server" ) ); - SessionExpiredException error2 = assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( WRITE ) ) ) ); + SessionExpiredException error2 = + assertThrows( SessionExpiredException.class, () -> await( loadBalancer.acquireConnection( contextWithMode( WRITE ) ) ) ); assertThat( error2.getMessage(), startsWith( "Failed to obtain connection towards WRITE server" ) ); } @@ -162,8 +161,7 @@ void shouldSelectLeastConnectedAddress() when( connectionPool.inUseConnections( C ) ).thenReturn( 0 ); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A, B, C} ); + Set readerAddresses = new LinkedHashSet<>( Arrays.asList( A, B, C ) ); when( routingTable.readers() ).thenReturn( readerAddresses ); @@ -187,8 +185,7 @@ void shouldRoundRobinWhenNoActiveConnections() ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A, B, C} ); + Set readerAddresses = new LinkedHashSet<>( Arrays.asList( A, B, C ) ); when( routingTable.readers() ).thenReturn( readerAddresses ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); @@ -375,10 +372,8 @@ void expectsCompetedDatabaseNameAfterRoutingTableRegistry( boolean completed ) t { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - AddressSet readerAddresses = mock( AddressSet.class ); - AddressSet writerAddresses = mock( AddressSet.class ); - when( readerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{A} ); - when( writerAddresses.toArray() ).thenReturn( new BoltServerAddress[]{B} ); + Set readerAddresses = new LinkedHashSet<>( Collections.singleton( A ) ); + Set writerAddresses = new LinkedHashSet<>( Collections.singleton( B ) ); when( routingTable.readers() ).thenReturn( readerAddresses ); when( routingTable.writers() ).thenReturn( writerAddresses ); RoutingTableRegistry routingTables = mock( RoutingTableRegistry.class ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java index 75e85df9e4..33c3897f42 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -32,8 +32,6 @@ import org.neo4j.driver.internal.InternalDriver; import org.neo4j.driver.internal.SessionFactory; import org.neo4j.driver.internal.SessionFactoryImpl; -import org.neo4j.driver.internal.cluster.AddressSet; -import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.summary.ResultSummary; @@ -44,69 +42,6 @@ private Matchers() { } - public static Matcher containsRouter( final BoltServerAddress address ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( RoutingTable routingTable ) - { - BoltServerAddress[] addresses = routingTable.routers().toArray(); - - for ( BoltServerAddress currentAddress : addresses ) - { - if ( currentAddress.equals( address ) ) - { - return true; - } - } - return false; - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "routing table that contains router " ).appendValue( address ); - } - }; - } - - public static Matcher containsReader( final BoltServerAddress address ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( RoutingTable routingTable ) - { - return contains( routingTable.readers(), address ); - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "routing table that contains reader " ).appendValue( address ); - } - }; - } - - public static Matcher containsWriter( final BoltServerAddress address ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( RoutingTable routingTable ) - { - return contains( routingTable.writers(), address ); - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "routing table that contains writer " ).appendValue( address ); - } - }; - } - public static Matcher directDriver() { return new TypeSafeMatcher() @@ -273,19 +208,6 @@ public void describeTo( Description description ) }; } - private static boolean contains( AddressSet set, BoltServerAddress address ) - { - BoltServerAddress[] addresses = set.toArray(); - for ( BoltServerAddress currentAddress : addresses ) - { - if ( currentAddress.equals( address ) ) - { - return true; - } - } - return false; - } - private static boolean hasConnectionProvider( Driver driver, Class providerClass ) { return extractConnectionProvider( driver, providerClass ) != null; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 99bea9d694..340962714b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -25,16 +25,16 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; +import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.DatabaseNameUtil; -import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.RoutingTableHandler; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; @@ -42,10 +42,10 @@ @Getter public class GetRoutingTable implements TestkitRequest { - private static final Function> ADDRESSES_TO_STRINGS = - ( addresses ) -> Arrays.stream( addresses.toArray() ) - .map( address -> String.format( "%s:%d", address.host(), address.port() ) ) - .collect( Collectors.toList() ); + private static final Function,List> ADDRESSES_TO_STRINGS = + ( addresses ) -> addresses.stream() + .map( address -> String.format( "%s:%d", address.host(), address.port() ) ) + .collect( Collectors.toList() ); private GetRoutingTableBody data; From c27f322d46848f19229baeb0a43d675404d3b36f Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Tue, 19 Oct 2021 14:09:24 +0100 Subject: [PATCH 2/2] Update reader, writer and router addresses handling and change type --- .../internal/cluster/ClusterRoutingTable.java | 60 ++++++++++--------- .../driver/internal/cluster/RoutingTable.java | 29 ++++++++- .../LeastConnectedLoadBalancingStrategy.java | 22 +++---- .../cluster/loadbalancing/LoadBalancer.java | 9 +-- .../loadbalancing/LoadBalancingStrategy.java | 6 +- .../internal/cluster/RediscoveryTest.java | 6 +- .../cluster/RoutingTableHandlerTest.java | 3 +- ...astConnectedLoadBalancingStrategyTest.java | 51 ++++++++-------- .../loadbalancing/LoadBalancerTest.java | 19 +++--- .../messages/requests/GetRoutingTable.java | 3 +- 10 files changed, 120 insertions(+), 88 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 5c28e90357..15c015d013 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -18,9 +18,10 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -43,18 +44,18 @@ public class ClusterRoutingTable implements RoutingTable private final ReadWriteLock tableLock = new ReentrantReadWriteLock(); private final DatabaseName databaseName; private final Clock clock; - private final Set disused = new LinkedHashSet<>(); + private final Set disused = new HashSet<>(); private long expirationTimestamp; private boolean preferInitialRouter = true; - private Set readers = Collections.emptySet(); - private Set writers = Collections.emptySet(); - private Set routers = Collections.emptySet(); + private List readers = Collections.emptyList(); + private List writers = Collections.emptyList(); + private List routers = Collections.emptyList(); public ClusterRoutingTable( DatabaseName ofDatabase, Clock clock, BoltServerAddress... routingAddresses ) { this( ofDatabase, clock ); - routers = Collections.unmodifiableSet( new LinkedHashSet<>( asList( routingAddresses ) ) ); + routers = Collections.unmodifiableList( asList( routingAddresses ) ); } private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock ) @@ -112,19 +113,19 @@ public void forget( BoltServerAddress address ) } @Override - public Set readers() + public List readers() { return executeWithLock( tableLock.readLock(), () -> readers ); } @Override - public Set writers() + public List writers() { return executeWithLock( tableLock.readLock(), () -> writers ); } @Override - public Set routers() + public List routers() { return executeWithLock( tableLock.readLock(), () -> routers ); } @@ -185,33 +186,38 @@ public String toString() expirationTimestamp, clock.millis(), routers, writers, readers, databaseName.description() ) ); } - private Set newWithoutAddressIfPresent( Set addresses, BoltServerAddress addressToSkip ) + private List newWithoutAddressIfPresent( List addresses, BoltServerAddress addressToSkip ) { - return newWithAddressReplacedIfPresent( addresses, addressToSkip, null ); + List newList = new ArrayList<>( addresses.size() ); + for ( BoltServerAddress address : addresses ) + { + if ( !address.equals( addressToSkip ) ) + { + newList.add( address ); + } + } + return Collections.unmodifiableList( newList ); } - private Set newWithAddressReplacedIfPresent( Set addresses, BoltServerAddress oldAddress, - BoltServerAddress newAddress ) + private List newWithAddressReplacedIfPresent( List addresses, BoltServerAddress oldAddress, + BoltServerAddress newAddress ) { - if ( !addresses.contains( oldAddress ) ) + List newList = new ArrayList<>( addresses.size() ); + for ( BoltServerAddress address : addresses ) { - return addresses; + newList.add( address.equals( oldAddress ) ? newAddress : address ); } - Stream addressStream = addresses.stream(); - addressStream = newAddress != null - ? addressStream.map( address -> address.equals( oldAddress ) ? newAddress : address ) - : addressStream.filter( address -> !address.equals( oldAddress ) ); - return Collections.unmodifiableSet( (Set) addressStream.collect( Collectors.toCollection( LinkedHashSet::new ) ) ); + return Collections.unmodifiableList( newList ); } - private Set newWithReusedAddresses( Set currentAddresses, Set disusedAddresses, - Set newAddresses ) + private List newWithReusedAddresses( List currentAddresses, Set disusedAddresses, + Set newAddresses ) { - Set result = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() ) - .filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) ) - .collect( Collectors.toCollection( LinkedHashSet::new ) ); - result.addAll( newAddresses ); - return Collections.unmodifiableSet( result ); + List newList = Stream.concat( currentAddresses.stream(), disusedAddresses.stream() ) + .filter( address -> newAddresses.remove( toBoltServerAddress( address ) ) ) + .collect( Collectors.toCollection( () -> new ArrayList<>( newAddresses.size() ) ) ); + newList.addAll( newAddresses ); + return Collections.unmodifiableList( newList ); } private BoltServerAddress toBoltServerAddress( BoltServerAddress address ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 28d2d0e780..a7afbece63 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.List; import java.util.Set; import org.neo4j.driver.AccessMode; @@ -34,12 +35,34 @@ public interface RoutingTable void forget( BoltServerAddress address ); - Set readers(); + /** + * Returns an immutable list of reader addresses. + * + * @return the immutable list of reader addresses. + */ + List readers(); - Set writers(); + /** + * Returns an immutable list of writer addresses. + * + * @return the immutable list of write addresses. + */ - Set routers(); + List writers(); + /** + * Returns an immutable list of router addresses. + * + * @return the immutable list of router addresses. + */ + + List routers(); + + /** + * Returns an immutable unordered set of all addresses known by this routing table. This includes all router, reader, writer and disused addresses. + * + * @return the immutable set of all addresses. + */ Set servers(); DatabaseName database(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java index ad632e12b4..cdd32bffd2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategy.java @@ -18,15 +18,17 @@ */ package org.neo4j.driver.internal.cluster.loadbalancing; +import java.util.List; + import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.spi.ConnectionPool; /** - * Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from - * given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent - * choosing same first address over and over when all addresses have same amount of active connections. + * Load balancing strategy that finds server with the least amount of active (checked out of the pool) connections from given readers or writers. It finds a + * start index for iteration in a round-robin fashion. This is done to prevent choosing same first address over and over when all addresses have the same amount + * of active connections. */ public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy { @@ -43,21 +45,21 @@ public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool, Loggi } @Override - public BoltServerAddress selectReader( BoltServerAddress[] knownReaders ) + public BoltServerAddress selectReader( List knownReaders ) { return select( knownReaders, readersIndex, "reader" ); } @Override - public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ) + public BoltServerAddress selectWriter( List knownWriters ) { return select( knownWriters, writersIndex, "writer" ); } - private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex, - String addressType ) + private BoltServerAddress select( List addresses, RoundRobinArrayIndex addressesIndex, + String addressType ) { - int size = addresses.length; + int size = addresses.size(); if ( size == 0 ) { log.trace( "Unable to select %s, no known addresses given", addressType ); @@ -71,10 +73,10 @@ private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArray BoltServerAddress leastConnectedAddress = null; int leastActiveConnections = Integer.MAX_VALUE; - // iterate over the array to find least connected address + // iterate over the array to find the least connected address do { - BoltServerAddress address = addresses[index]; + BoltServerAddress address = addresses.get( index ); int activeConnections = connectionPool.inUseConnections( address ); if ( activeConnections < leastActiveConnections ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 593d68686c..10d5dc71b6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -202,7 +201,7 @@ private CompletionStage acquire( AccessMode mode, RoutingTable routi private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFuture result, List attemptErrors ) { - Set addresses = addressSet( mode, routingTable ); + List addresses = getAddressesByMode( mode, routingTable ); BoltServerAddress address = selectAddress( mode, addresses ); if ( address == null ) @@ -241,7 +240,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, CompletableFut } ); } - private static Set addressSet( AccessMode mode, RoutingTable routingTable ) + private static List getAddressesByMode( AccessMode mode, RoutingTable routingTable ) { switch ( mode ) { @@ -254,10 +253,8 @@ private static Set addressSet( AccessMode mode, RoutingTable } } - private BoltServerAddress selectAddress( AccessMode mode, Set servers ) + private BoltServerAddress selectAddress( AccessMode mode, List addresses ) { - BoltServerAddress[] addresses = servers.toArray( BOLT_SERVER_ADDRESSES_EMPTY_ARRAY ); - switch ( mode ) { case READ: diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java index dbaecdb08f..d05189e79c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancingStrategy.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.cluster.loadbalancing; +import java.util.List; + import org.neo4j.driver.internal.BoltServerAddress; /** @@ -31,7 +33,7 @@ public interface LoadBalancingStrategy * @param knownReaders array of all known readers. * @return most appropriate reader or {@code null} if it can't be selected. */ - BoltServerAddress selectReader( BoltServerAddress[] knownReaders ); + BoltServerAddress selectReader( List knownReaders ); /** * Select most appropriate write address from the given array of addresses. @@ -39,5 +41,5 @@ public interface LoadBalancingStrategy * @param knownWriters array of all known writers. * @return most appropriate writer or {@code null} if it can't be selected. */ - BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ); + BoltServerAddress selectWriter( List knownWriters ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index d86fb075db..306821beea 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -25,11 +25,10 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; @@ -519,8 +518,7 @@ private static RoutingTable routingTableMock( BoltServerAddress... routers ) private static RoutingTable routingTableMock( boolean preferInitialRouter, BoltServerAddress... routers ) { RoutingTable routingTable = mock( RoutingTable.class ); - Set addressSet = new LinkedHashSet<>( asOrderedSet( routers ) ); - when( routingTable.routers() ).thenReturn( addressSet ); + when( routingTable.routers() ).thenReturn( Arrays.asList( routers ) ); when( routingTable.database() ).thenReturn( defaultDatabase() ); when( routingTable.preferInitialRouter() ).thenReturn( preferInitialRouter ); return routingTable; diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java index 42f50e4758..e732068e40 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -259,7 +260,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.isStaleFor( mode ) ).thenReturn( true ); - Set addresses = new LinkedHashSet<>( singletonList( LOCAL_DEFAULT ) ); + List addresses = singletonList( LOCAL_DEFAULT ); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); when( routingTable.database() ).thenReturn( defaultDatabase() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java index aacf936c64..8643e8bf7c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LeastConnectedLoadBalancingStrategyTest.java @@ -22,6 +22,9 @@ import org.junit.jupiter.api.Test; import org.mockito.Mock; +import java.util.Arrays; +import java.util.Collections; + import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; import org.neo4j.driver.internal.BoltServerAddress; @@ -53,15 +56,15 @@ void setUp() } @Test - void shouldHandleEmptyReadersArray() + void shouldHandleEmptyReaders() { - assertNull( strategy.selectReader( new BoltServerAddress[0] ) ); + assertNull( strategy.selectReader( Collections.emptyList() ) ); } @Test - void shouldHandleEmptyWritersArray() + void shouldHandleEmptyWriters() { - assertNull( strategy.selectWriter( new BoltServerAddress[0] ) ); + assertNull( strategy.selectWriter( Collections.emptyList() ) ); } @Test @@ -69,7 +72,7 @@ void shouldHandleSingleReaderWithoutActiveConnections() { BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); - assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectReader( Collections.singletonList( address ) ) ); } @Test @@ -77,7 +80,7 @@ void shouldHandleSingleWriterWithoutActiveConnections() { BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); - assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectWriter( Collections.singletonList( address ) ) ); } @Test @@ -86,7 +89,7 @@ void shouldHandleSingleReaderWithActiveConnections() BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); when( connectionPool.inUseConnections( address ) ).thenReturn( 42 ); - assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectReader( Collections.singletonList( address ) ) ); } @Test @@ -95,7 +98,7 @@ void shouldHandleSingleWriterWithActiveConnections() BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); when( connectionPool.inUseConnections( address ) ).thenReturn( 24 ); - assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); + assertEquals( address, strategy.selectWriter( Collections.singletonList( address ) ) ); } @Test @@ -109,7 +112,7 @@ void shouldHandleMultipleReadersWithActiveConnections() when( connectionPool.inUseConnections( address2 ) ).thenReturn( 4 ); when( connectionPool.inUseConnections( address3 ) ).thenReturn( 1 ); - assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); + assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); } @Test @@ -126,7 +129,7 @@ void shouldHandleMultipleWritersWithActiveConnections() when( connectionPool.inUseConnections( address4 ) ).thenReturn( 1 ); assertEquals( address3, - strategy.selectWriter( new BoltServerAddress[]{address1, address2, address3, address4} ) ); + strategy.selectWriter( Arrays.asList( address1, address2, address3, address4 ) ) ); } @Test @@ -136,13 +139,13 @@ void shouldReturnDifferentReaderOnEveryInvocationWhenNoActiveConnections() BoltServerAddress address2 = new BoltServerAddress( "reader", 2 ); BoltServerAddress address3 = new BoltServerAddress( "reader", 3 ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); - assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); + assertEquals( address3, strategy.selectReader( Arrays.asList( address1, address2, address3 ) ) ); } @Test @@ -151,11 +154,11 @@ void shouldReturnDifferentWriterOnEveryInvocationWhenNoActiveConnections() BoltServerAddress address1 = new BoltServerAddress( "writer", 1 ); BoltServerAddress address2 = new BoltServerAddress( "writer", 2 ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); - assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); - assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) ); + assertEquals( address1, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); + assertEquals( address2, strategy.selectReader( Arrays.asList( address1, address2 ) ) ); } @Test @@ -167,8 +170,8 @@ void shouldTraceLogWhenNoAddressSelected() LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging ); - strategy.selectReader( new BoltServerAddress[0] ); - strategy.selectWriter( new BoltServerAddress[0] ); + strategy.selectReader( Collections.emptyList() ); + strategy.selectWriter( Collections.emptyList() ); verify( logger ).trace( startsWith( "Unable to select" ), eq( "reader" ) ); verify( logger ).trace( startsWith( "Unable to select" ), eq( "writer" ) ); @@ -185,8 +188,8 @@ void shouldTraceLogSelectedAddress() LoadBalancingStrategy strategy = new LeastConnectedLoadBalancingStrategy( connectionPool, logging ); - strategy.selectReader( new BoltServerAddress[]{A} ); - strategy.selectWriter( new BoltServerAddress[]{A} ); + strategy.selectReader( Collections.singletonList( A ) ); + strategy.selectWriter( Collections.singletonList( A ) ); verify( logger ).trace( startsWith( "Selected" ), eq( "reader" ), eq( A ), eq( 42 ) ); verify( logger ).trace( startsWith( "Selected" ), eq( "writer" ), eq( A ), eq( 42 ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index f45023d67f..8b174ad2c4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -101,8 +102,8 @@ void returnsCorrectAccessMode( AccessMode mode ) { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - Set readerAddresses = new LinkedHashSet<>( Collections.singleton( A ) ); - Set writerAddresses = new LinkedHashSet<>( Collections.singleton( B ) ); + List readerAddresses = Collections.singletonList( A ); + List writerAddresses = Collections.singletonList( B ); when( routingTable.readers() ).thenReturn( readerAddresses ); when( routingTable.writers() ).thenReturn( writerAddresses ); @@ -120,7 +121,7 @@ void returnsCorrectDatabaseName( String databaseName ) { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - Set writerAddresses = new LinkedHashSet<>( Collections.singleton( A ) ); + List writerAddresses = Collections.singletonList( A ); when( routingTable.writers() ).thenReturn( writerAddresses ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); @@ -137,8 +138,8 @@ void shouldThrowWhenRediscoveryReturnsNoSuitableServers() { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - when( routingTable.readers() ).thenReturn( new LinkedHashSet<>() ); - when( routingTable.writers() ).thenReturn( new LinkedHashSet<>() ); + when( routingTable.readers() ).thenReturn( Collections.emptyList() ); + when( routingTable.writers() ).thenReturn( Collections.emptyList() ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); @@ -161,7 +162,7 @@ void shouldSelectLeastConnectedAddress() when( connectionPool.inUseConnections( C ) ).thenReturn( 0 ); RoutingTable routingTable = mock( RoutingTable.class ); - Set readerAddresses = new LinkedHashSet<>( Arrays.asList( A, B, C ) ); + List readerAddresses = Arrays.asList( A, B, C ); when( routingTable.readers() ).thenReturn( readerAddresses ); @@ -185,7 +186,7 @@ void shouldRoundRobinWhenNoActiveConnections() ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - Set readerAddresses = new LinkedHashSet<>( Arrays.asList( A, B, C ) ); + List readerAddresses = Arrays.asList( A, B, C ); when( routingTable.readers() ).thenReturn( readerAddresses ); LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable ); @@ -372,8 +373,8 @@ void expectsCompetedDatabaseNameAfterRoutingTableRegistry( boolean completed ) t { ConnectionPool connectionPool = newConnectionPoolMock(); RoutingTable routingTable = mock( RoutingTable.class ); - Set readerAddresses = new LinkedHashSet<>( Collections.singleton( A ) ); - Set writerAddresses = new LinkedHashSet<>( Collections.singleton( B ) ); + List readerAddresses = Collections.singletonList( A ); + List writerAddresses = Collections.singletonList( B ); when( routingTable.readers() ).thenReturn( readerAddresses ); when( routingTable.writers() ).thenReturn( writerAddresses ); RoutingTableRegistry routingTables = mock( RoutingTableRegistry.class ); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 340962714b..0be682d5e3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -26,7 +26,6 @@ import reactor.core.publisher.Mono; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -42,7 +41,7 @@ @Getter public class GetRoutingTable implements TestkitRequest { - private static final Function,List> ADDRESSES_TO_STRINGS = + private static final Function,List> ADDRESSES_TO_STRINGS = ( addresses ) -> addresses.stream() .map( address -> String.format( "%s:%d", address.host(), address.port() ) ) .collect( Collectors.toList() );