From fece71844f8eafe2fa76d29af62869195bb2541f Mon Sep 17 00:00:00 2001 From: Zhen Date: Thu, 2 Nov 2017 16:56:09 +0100 Subject: [PATCH] Terms: * `ConnectionPools` which is a map from addresses to connection pools * `RoutingTable` which contains addresses for routers, writers, and readers For a routing driver, once an address is removed from the current `RoutingTable`, the address is no longer accessable. a.k.a. no new connection will be created in the corresponding connection pool. When updating routing table, we also need to signal the addresses in `ConnectionPools` to be active if they are newly added into the current routing table or passive if they have already been removed from the routing table. For the pools connected to addresses that have been removed, when a connection is free, the connection should be terminated rather than reused. When there is no connection in the pool, the pool could be safely removed from `ConnectionPools`. So the logic that need to be changed: * When a new `RoutingTable` is available, compute `added_addr = distinct_addr_in(new_routingTable) - distinct_addr_in(pre_routingTable)` `removed_addr = distinct_addr_in(pre_routingTable) - distinct_addr_in(new_routingTable)` * Mark all addresses in set `added_addr` in `ConnectionPools` to be `active` connection pools * Mark all addresses in set `removed_addr` in `ConnectionPools` to be `passive` connection pools * Remove `passive` connection pools if no connection is `inUse` (all connections are idle) * When returning a connection to a `passive` connection pool, terminate the connection directly, [and remove the connection pool if no connections is `InUse`] --- .../internal/cluster/ClusterRoutingTable.java | 28 +++-- .../cluster/RoundRobinAddressSet.java | 60 ++-------- .../cluster/ClusterRoutingTableTest.java | 16 +++ .../internal/cluster/LoadBalancerTest.java | 3 +- .../cluster/RoundRobinAddressSetTest.java | 113 +----------------- 5 files changed, 51 insertions(+), 169 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 5103ca3534..cdbf69962e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -43,8 +43,7 @@ public class ClusterRoutingTable implements RoutingTable public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses ) { this( clock ); - routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet(), - new HashSet() ); + routers.update( new LinkedHashSet<>( asList( routingAddresses ) )); } private ClusterRoutingTable( Clock clock ) @@ -66,16 +65,29 @@ public boolean isStaleFor( AccessMode mode ) mode == AccessMode.WRITE && writers.size() == 0; } + private Set servers() + { + Set servers = new HashSet<>(); + servers.addAll( readers.servers() ); + servers.addAll( writers.servers() ); + servers.addAll( routers.servers() ); + return servers; + } + @Override public synchronized RoutingTableChange update( ClusterComposition cluster ) { expirationTimeout = cluster.expirationTimestamp(); - // todo: what if server is added as reader and removed as writer? we should not treat it as removed - Set added = new HashSet<>(); - Set removed = new HashSet<>(); - readers.update( cluster.readers(), added, removed ); - writers.update( cluster.writers(), added, removed ); - routers.update( cluster.routers(), added, removed ); + Set pre = servers(); + readers.update( cluster.readers() ); + writers.update( cluster.writers() ); + routers.update( cluster.routers() ); + Set cur = servers(); + + Set added = new HashSet<>( cur ); + Set removed = new HashSet<>( pre ); + added.removeAll( pre ); + removed.removeAll( cur ); return new RoutingTableChange( added, removed ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java index ed0a6ac98e..367cdbb55f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java @@ -19,7 +19,7 @@ package org.neo4j.driver.internal.cluster; import java.util.Arrays; -import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +46,11 @@ public BoltServerAddress next() return addresses[next( addresses.length )]; } + public Set servers() + { + return new HashSet<>( Arrays.asList( addresses ) ); + } + int next( int divisor ) { int index = offset.getAndIncrement(); @@ -56,58 +61,9 @@ int next( int divisor ) return index % divisor; } - public synchronized void update( Set addresses, Set added, - Set removed ) + public synchronized void update( Set addresses ) { - BoltServerAddress[] prev = this.addresses; - if ( addresses.isEmpty() ) - { - this.addresses = NONE; - Collections.addAll( removed, prev ); - return; - } - if ( prev.length == 0 ) - { - this.addresses = addresses.toArray( NONE ); - Collections.addAll( added, this.addresses ); - return; - } - BoltServerAddress[] copy = null; - if ( addresses.size() != prev.length ) - { - copy = new BoltServerAddress[addresses.size()]; - } - int j = 0; - for ( int i = 0; i < prev.length; i++ ) - { - if ( addresses.remove( prev[i] ) ) - { - if ( copy != null ) - { - copy[j++] = prev[i]; - } - } - else - { - removed.add( prev[i] ); - if ( copy == null ) - { - copy = new BoltServerAddress[prev.length]; - System.arraycopy( prev, 0, copy, 0, i ); - j = i; - } - } - } - if ( copy == null ) - { - return; - } - for ( BoltServerAddress address : addresses ) - { - copy[j++] = address; - added.add( address ); - } - this.addresses = copy; + this.addresses = addresses.toArray( NONE ); } public synchronized void remove( BoltServerAddress address ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 3a883b89bd..099af4159b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -215,4 +215,20 @@ public void shouldReturnCorrectChangeWhenUpdated() assertEquals( 2, change.removed().size() ); assertThat( change.removed(), containsInAnyOrder( A, D ) ); } + + @Test + public void shouldNotRemoveServerIfPreWriterNowReader() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + routingTable.update( createClusterComposition( singletonList( A ), singletonList( B ), singletonList( C ) ) ); + + ClusterComposition newComposition = + createClusterComposition( singletonList( D ), singletonList( E ), singletonList( B ) ); + RoutingTableChange change = routingTable.update( newComposition ); + + assertEquals( 2, change.added().size() ); + assertThat( change.added(), containsInAnyOrder( D, E ) ); + assertEquals( 2, change.removed().size() ); + assertThat( change.removed(), containsInAnyOrder( A, C ) ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 8ed9e6fced..fb117a071b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -341,8 +341,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( RoutingTableChange.EMPTY ); RoundRobinAddressSet addresses = new RoundRobinAddressSet(); - addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ), new HashSet(), - new HashSet() ); + addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) )); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java index 85a8b74566..e89fd7e37f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java @@ -21,7 +21,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -29,7 +28,6 @@ import org.neo4j.driver.internal.net.BoltServerAddress; import static java.util.Arrays.asList; -import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; @@ -57,7 +55,7 @@ public void shouldReturnRoundRobin() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); - set.update( addresses, new HashSet(), new HashSet() ); + set.update( addresses ); // when BoltServerAddress a = set.next(); @@ -85,7 +83,7 @@ public void shouldPreserveOrderWhenAdding() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); List order = new ArrayList<>(); for ( int i = 3 * 4 + 1; i-- > 0; ) @@ -100,7 +98,7 @@ public void shouldPreserveOrderWhenAdding() throws Exception // when servers.add( new BoltServerAddress( "fyr" ) ); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); // then assertEquals( order.get( 1 ), set.next() ); @@ -126,7 +124,7 @@ public void shouldPreserveOrderWhenRemoving() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); List order = new ArrayList<>(); for ( int i = 3 * 2 + 1; i-- > 0; ) @@ -158,7 +156,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception new BoltServerAddress( "two" ), new BoltServerAddress( "tre" ) ) ); RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); List order = new ArrayList<>(); for ( int i = 3 * 2 + 1; i-- > 0; ) @@ -173,7 +171,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception // when servers.remove( order.get( 1 ) ); - set.update( servers, new HashSet(), new HashSet() ); + set.update( servers ); // then assertEquals( order.get( 2 ), set.next() ); @@ -182,106 +180,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception assertEquals( order.get( 0 ), set.next() ); } - @Test - public void shouldRecordRemovedAddressesWhenUpdating() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - Set addresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - set.update( addresses, new HashSet(), new HashSet() ); - - // when - Set removed = new HashSet<>(); - Set newAddresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "fyr" ) ) ); - set.update( newAddresses, new HashSet(), removed ); - - // then - assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed ); - } - @Test - public void shouldRecordRemovedAddressesWhenUpdateIsEmpty() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - Set addresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ) ) ); - set.update( addresses, new HashSet(), new HashSet() ); - - Set update = Collections.emptySet(); - Set removed = new HashSet<>(); - set.update( update, new HashSet(), removed ); - - assertEquals( addresses, removed ); - } - - @Test - public void shouldRecordAddedAddressesWhenUpdatingAnEmptySet() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - Set added1 = new HashSet<>(); - Set addresses1 = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - set.update( addresses1, added1, new HashSet() ); - - assertEquals( addresses1, added1 ); - } - - @Test - public void shouldRecordAddedAddressesWhenUpdating() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - Set addresses1 = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - set.update( addresses1, new HashSet(), new HashSet() ); - - Set added = new HashSet<>(); - Set newAddresses = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "tre" ), - new BoltServerAddress( "four" ) ) ); - set.update( newAddresses, added, new HashSet() ); - - assertEquals( singleton( new BoltServerAddress( "four" ) ), added ); - } - - @Test - public void shouldRecordBothAddedAndRemovedAddressesWhenUpdating() - { - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - Set addresses1 = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "three" ) ) ); - set.update( addresses1, new HashSet(), new HashSet() ); - - Set newAddresses = new HashSet<>( asList( - new BoltServerAddress( "two" ), - new BoltServerAddress( "four" ), - new BoltServerAddress( "five" ) ) ); - - Set added = new HashSet<>(); - Set removed = new HashSet<>(); - set.update( newAddresses, added, removed ); - - assertEquals( - new HashSet<>( asList( new BoltServerAddress( "four" ), new BoltServerAddress( "five" ) ) ), added ); - assertEquals( - new HashSet<>( asList( new BoltServerAddress( "one" ), new BoltServerAddress( "three" ) ) ), removed ); - } @Test public void shouldPreserveOrderEvenWhenIntegerOverflows() throws Exception