Skip to content

Commit

Permalink
Decouple load balancing logic from address set
Browse files Browse the repository at this point in the history
Round-robin load balancing was previously coupled with the address
set implementation and made it hard to use different load balancing
algorithm.

This commit turns RoundRobinAddressSet into a simple concurrent set of
addresses and moves load balancing logic into a separate component that
takes list of available addresses and returns one that should be used
next. This allows easier implementation of new load balancing
algorithms. Rediscovery procedure will now not use load balancing and
query routers in a known order (which is randomized by the database).
  • Loading branch information
lutovich committed Jul 4, 2017
1 parent dbc4912 commit ac73660
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,23 @@

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.net.BoltServerAddress;

public class RoundRobinAddressSet
public class AddressSet
{
private static final BoltServerAddress[] NONE = {};
private final AtomicInteger offset = new AtomicInteger();
private volatile BoltServerAddress[] addresses = NONE;

public int size()
{
return addresses.length;
}
private volatile BoltServerAddress[] addresses = NONE;

public BoltServerAddress next()
public BoltServerAddress[] toArray()
{
BoltServerAddress[] addresses = this.addresses;
if ( addresses.length == 0 )
{
return null;
}
return addresses[next( addresses.length )];
return addresses;
}

int next( int divisor )
public int size()
{
int index = offset.getAndIncrement();
for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() )
{
offset.compareAndSet( Integer.MIN_VALUE, index % divisor );
}
return index % divisor;
return addresses.length;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
Expand Down Expand Up @@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address )
@Override
public String toString()
{
return "RoundRobinAddressSet=" + Arrays.toString( addresses );
}

/** breaking encapsulation in order to perform white-box testing of boundary case */
void setOffset( int target )
{
offset.set( target );
return "AddressSet=" + Arrays.toString( addresses );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable

private final Clock clock;
private volatile long expirationTimeout;
private final RoundRobinAddressSet readers;
private final RoundRobinAddressSet writers;
private final RoundRobinAddressSet routers;
private final AddressSet readers;
private final AddressSet writers;
private final AddressSet routers;

public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
Expand All @@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock )
this.clock = clock;
this.expirationTimeout = clock.millis() - 1;

this.readers = new RoundRobinAddressSet();
this.writers = new RoundRobinAddressSet();
this.routers = new RoundRobinAddressSet();
this.readers = new AddressSet();
this.writers = new AddressSet();
this.routers = new AddressSet();
}

@Override
Expand Down Expand Up @@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address )
}

@Override
public RoundRobinAddressSet readers()
public AddressSet readers()
{
return readers;
}

@Override
public RoundRobinAddressSet writers()
public AddressSet writers()
{
return writers;
}

@Override
public BoltServerAddress nextRouter()
public AddressSet routers()
{
return routers.next();
}

@Override
public int routerSize()
{
return routers.size();
return routers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
private final ConnectionPool connections;
private final RoutingTable routingTable;
private final Rediscovery rediscovery;
private final LoadBalancingStrategy loadBalancingStrategy;
private final Logger log;

public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
Expand All @@ -59,6 +60,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
this.connections = connections;
this.routingTable = routingTable;
this.rediscovery = rediscovery;
this.loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();
this.log = log;

refreshRoutingTable();
Expand All @@ -67,7 +69,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
@Override
public PooledConnection acquireConnection( AccessMode mode )
{
RoundRobinAddressSet addressSet = addressSetFor( mode );
AddressSet addressSet = addressSetFor( mode );
PooledConnection connection = acquireConnection( mode, addressSet );
return new RoutingPooledConnection( connection, this, mode );
}
Expand All @@ -90,10 +92,10 @@ public void close() throws Exception
connections.close();
}

private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers )
private PooledConnection acquireConnection( AccessMode mode, AddressSet servers )
{
ensureRouting( mode );
for ( BoltServerAddress address; (address = servers.next()) != null; )
for ( BoltServerAddress address; (address = selectAddress( mode, servers )) != null; )
{
try
{
Expand Down Expand Up @@ -141,7 +143,7 @@ synchronized void refreshRoutingTable()
log.info( "Refreshed routing information. %s", routingTable );
}

private RoundRobinAddressSet addressSetFor( AccessMode mode )
private AddressSet addressSetFor( AccessMode mode )
{
switch ( mode )
{
Expand All @@ -150,7 +152,22 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
case WRITE:
return routingTable.writers();
default:
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
throw unknownMode( mode );
}
}

private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers )
{
BoltServerAddress[] addresses = servers.toArray();

switch ( mode )
{
case READ:
return loadBalancingStrategy.selectReader( addresses );
case WRITE:
return loadBalancingStrategy.selectWriter( addresses );
default:
throw unknownMode( mode );
}
}

Expand All @@ -161,4 +178,9 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R
new RoutingProcedureClusterCompositionProvider( clock, log, settings );
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) );
}

private static RuntimeException unknownMode( AccessMode mode )
{
return new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import org.neo4j.driver.internal.net.BoltServerAddress;

public interface LoadBalancingStrategy
{
BoltServerAddress selectReader( BoltServerAddress[] knownReaders );

BoltServerAddress selectWriter( BoltServerAddress[] knownWriters );
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
Set<BoltServerAddress> seenServers )
{
int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
{
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
break;
}
BoltServerAddress[] addresses = routingTable.routers().toArray();

for ( BoltServerAddress address : addresses )
{
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
if ( composition != null )
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinArrayIndex
{
private final AtomicInteger offset;

RoundRobinArrayIndex()
{
this( 0 );
}

// only for testing
RoundRobinArrayIndex( int initialOffset )
{
this.offset = new AtomicInteger( initialOffset );
}

public int next( int arrayLength )
{
if ( arrayLength == 0 )
{
return -1;
}

int nextOffset;
while ( (nextOffset = offset.getAndIncrement()) < 0 )
{
// overflow, try resetting back to zero
offset.compareAndSet( nextOffset + 1, 0 );
}
return nextOffset % arrayLength;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import org.neo4j.driver.internal.net.BoltServerAddress;

public class RoundRobinLoadBalancingStrategy implements LoadBalancingStrategy
{
private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();

@Override
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
{
return select( knownReaders, readersIndex );
}

@Override
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
{
return select( knownWriters, writersIndex );
}

private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex roundRobinIndex )
{
int length = addresses.length;
if ( length == 0 )
{
return null;
}
int index = roundRobinIndex.next( length );
return addresses[index];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@ public interface RoutingTable

void forget( BoltServerAddress address );

RoundRobinAddressSet readers();
AddressSet readers();

RoundRobinAddressSet writers();
AddressSet writers();

BoltServerAddress nextRouter();

int routerSize();
AddressSet routers();

void removeWriter( BoltServerAddress toRemove );
}
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,11 @@ public void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws
@Test
public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Exception
{
StubServer router1 = StubServer.start( "acquire_endpoints.script", 9010 );
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9007 );
StubServer router1 = StubServer.start( "discover_servers.script", 9010 );
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9001 );
StubServer router2 = StubServer.start( "acquire_endpoints.script", 9002 );
StubServer brokenWriter2 = StubServer.start( "dead_write_server.script", 9008 );
StubServer router2 = StubServer.start( "discover_servers.script", 9002 );
StubServer writer = StubServer.start( "write_server.script", 9001 );
StubServer writer = StubServer.start( "write_server.script", 9007 );

try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9010" );
Session session = driver.session() )
Expand All @@ -827,9 +827,9 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw
{
assertEquals( 0, router1.exitStatus() );
assertEquals( 0, brokenWriter1.exitStatus() );
assertEquals( 0, brokenWriter2.exitStatus() );
assertEquals( 0, router2.exitStatus() );
assertEquals( 0, writer.exitStatus() );
assertEquals( 0, brokenWriter2.exitStatus() );
}
}

Expand Down
Loading

0 comments on commit ac73660

Please sign in to comment.