Skip to content

Commit

Permalink
Move selection out of NewRoundRobinLoadBalancer and into Host (#2738)
Browse files Browse the repository at this point in the history
Motivation:

The RR load balancers define the selection logic for picking a
member from the Host connection pool.

Modifications:

- Copy the old Host back into the old RoundRobinLoadBalancer,
  bringing it closer to it's original known-good state.
- Move the connection selection logic from NewRoundRobinLoadBalancer
  into Host making Host more of a connection pool and
  NewRoundRobinLoadBalancer less of a connection pool.
  • Loading branch information
bryce-anderson authored Oct 30, 2023
1 parent 231ec64 commit 9718405
Show file tree
Hide file tree
Showing 3 changed files with 586 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.context.api.ContextMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,21 +33,43 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffDeltaJitter;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static java.lang.Math.min;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import static java.util.stream.Collectors.toList;

final class Host<Addr, C extends LoadBalancedConnection> implements ListenableAsyncCloseable {

/**
* With a relatively small number of connections we can minimize connection creation under moderate concurrency by
* exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per
* selection attempt.
*/
private static final int MIN_RANDOM_SEARCH_SPACE = 64;

/**
* For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for
* trying to locate an available connection when most connections are in use. This increases tail latencies, thus
* after some number of failed attempts it appears to be more beneficial to open a new connection instead.
* <p>
* The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection
* counts, larger connection counts, low connection churn, high connection churn.
*/
private static final float RANDOM_SEARCH_FACTOR = 0.75f;

private static final Object[] EMPTY_ARRAY = new Object[0];
private static final Logger LOGGER = LoggerFactory.getLogger(Host.class);

Expand All @@ -70,13 +94,18 @@ private enum State {
final Addr address;
@Nullable
private final HealthCheckConfig healthCheckConfig;
private final ConnectionFactory<Addr, ? extends C> connectionFactory;
private final int linearSearchSpace;
private final ListenableAsyncCloseable closeable;
private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE;

Host(String lbDescription, Addr address, @Nullable HealthCheckConfig healthCheckConfig) {
Host(String lbDescription, Addr address, ConnectionFactory<Addr, ? extends C> connectionFactory,
int linearSearchSpace, @Nullable HealthCheckConfig healthCheckConfig) {
this.lbDescription = lbDescription;
this.address = address;
this.healthCheckConfig = healthCheckConfig;
this.connectionFactory = connectionFactory;
this.linearSearchSpace = linearSearchSpace;
this.closeable = toAsyncCloseable(graceful ->
graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync));
}
Expand Down Expand Up @@ -140,7 +169,88 @@ void markExpired() {
}
}

void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
@Nullable
C pickConnection(Predicate<C> selector, @Nullable final ContextMap context) {
final Object[] connections = connState.connections;
// Exhaust the linear search space first:
final int linearAttempts = min(connections.length, linearSearchSpace);
for (int j = 0; j < linearAttempts; ++j) {
@SuppressWarnings("unchecked")
final C connection = (C) connections[j];
if (selector.test(connection)) {
return connection;
}
}
// Try other connections randomly:
if (connections.length > linearAttempts) {
final int diff = connections.length - linearAttempts;
// With small enough search space, attempt number of times equal to number of remaining connections.
// Back off after exploring most of the search space, it gives diminishing returns.
final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff :
(int) (diff * RANDOM_SEARCH_FACTOR);
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int j = 0; j < randomAttempts; ++j) {
@SuppressWarnings("unchecked")
final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)];
if (selector.test(connection)) {
return connection;
}
}
}
// So sad, we didn't find a healthy connection.
return null;
}

Single<C> newConnection(
Predicate<C> selector, final boolean forceNewConnectionAndReserve, @Nullable final ContextMap context) {
// This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here.
// Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver.
Single<? extends C> establishConnection = connectionFactory.newConnection(address, context, null);
if (healthCheckConfig != null) {
// Schedule health check before returning
establishConnection = establishConnection.beforeOnError(t -> markUnhealthy(t));
}
return establishConnection
.flatMap(newCnx -> {
if (forceNewConnectionAndReserve && !newCnx.tryReserve()) {
return newCnx.closeAsync().<C>concat(failed(
Exceptions.StacklessConnectionRejectedException.newInstance(
"Newly created connection " + newCnx + " for " + lbDescription
+ " could not be reserved.",
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
}

// Invoke the selector before adding the connection to the pool, otherwise, connection can be
// used concurrently and hence a new connection can be rejected by the selector.
if (!selector.test(newCnx)) {
// Failure in selection could be the result of connection factory returning cached connection,
// and not having visibility into max-concurrent-requests, or other threads already selected the
// connection which uses all the max concurrent request count.

// If there is caching Propagate the exception and rely upon retry strategy.
Single<C> failedSingle = failed(Exceptions.StacklessConnectionRejectedException.newInstance(
"Newly created connection " + newCnx + " for " + lbDescription
+ " was rejected by the selection filter.",
RoundRobinLoadBalancer.class, "selectConnection0(...)"));

// Just in case the connection is not closed add it to the host so we don't lose track,
// duplicates will be filtered out.
return (addConnection(newCnx, null) ?
failedSingle : newCnx.closeAsync().concat(failedSingle)).shareContextOnSubscribe();
}
if (addConnection(newCnx, null)) {
return succeeded(newCnx).shareContextOnSubscribe();
}
return newCnx.closeAsync().<C>concat(
failed(Exceptions.StacklessConnectionRejectedException.newInstance(
"Failed to add newly created connection " + newCnx + " for " + toString(),
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
});
}

private void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
// Marking healthy is called when we need to recover from an unexpected error.
// However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed
// to open connections and entered the UNHEALTHY state before the original thread continues execution here.
Expand All @@ -159,7 +269,7 @@ void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
}
}

void markUnhealthy(final Throwable cause, final ConnectionFactory<Addr, ? extends C> connectionFactory) {
private void markUnhealthy(final Throwable cause) {
assert healthCheckConfig != null;
for (;;) {
ConnState previous = connStateUpdater.get(this);
Expand Down Expand Up @@ -215,7 +325,7 @@ private static boolean isUnhealthy(ConnState connState) {
return HealthCheck.class.equals(connState.state.getClass());
}

boolean addConnection(final C connection, final @Nullable HealthCheck<Addr, C> currentHealthCheck) {
private boolean addConnection(final C connection, final @Nullable HealthCheck<Addr, C> currentHealthCheck) {
int addAttempt = 0;
for (;;) {
final ConnState previous = connStateUpdater.get(this);
Expand Down Expand Up @@ -325,10 +435,6 @@ Map.Entry<Addr, List<C>> asEntry() {
Stream.of(connState.connections).map(conn -> (C) conn).collect(toList()));
}

Object[] connections() {
return connState.connections;
}

@Override
public Completable closeAsync() {
return closeable.closeAsync();
Expand Down
Loading

0 comments on commit 9718405

Please sign in to comment.