Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into bl_anderson/load_bala…
Browse files Browse the repository at this point in the history
…ncer_observer
  • Loading branch information
bryce-anderson committed Dec 11, 2023
2 parents 244b2df + bad9558 commit 4044694
Show file tree
Hide file tree
Showing 15 changed files with 635 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static java.util.Objects.requireNonNull;
Expand All @@ -27,17 +30,53 @@ abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnectio
implements HostSelector<ResolvedAddress, C> {

private final String targetResource;
BaseHostSelector(final String targetResource) {
private final List<Host<ResolvedAddress, C>> hosts;
BaseHostSelector(final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
this.hosts = hosts;
this.targetResource = requireNonNull(targetResource, "targetResource");
}

protected abstract Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve);

@Override
public final Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

@Override
public final boolean isUnHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
// so that we can compose a group of selectors into a priority set.
return allUnhealthy(hosts);
}

protected final String getTargetResource() {
return targetResource;
}

protected final Single<C> noActiveHosts(List<Host<ResolvedAddress, C>> usedHosts) {
protected final Single<C> noActiveHostsFailure(List<Host<ResolvedAddress, C>> usedHosts) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
getTargetResource() + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
}

private Single<C> noHostsFailure() {
return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection(...)"));
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
for (Host<ResolvedAddress, C> host : usedHosts) {
if (!host.isUnhealthy()) {
allUnhealthy = false;
break;
}
}
return allUnhealthy;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;

import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
Expand Down Expand Up @@ -155,8 +156,8 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(targetResource), connectionFactory, linearSearchSpace,
healthCheckConfig, loadBalancerObserver);
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource), connectionFactory,
linearSearchSpace, healthCheckConfig, loadBalancerObserver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,22 @@

import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Interface abstracting away the method of host selection.
* <p>
* Thread Safety
* Because the HostSelector is used on the hot path some care must be paid to thread safety. The easiest
* pattern to use is to make the internal state effectively immutable and rebuilds (see below) generate new
* immutable instances as necessary. The interface is expected to adhere to the following rules:
*
* <li>The {@link HostSelector#selectConnection(Predicate, ContextMap, boolean)} method will be used
* concurrently with calls to itself as well as calls to {@link HostSelector#rebuildWithHosts(List)}.</li>
* <li>The {@link HostSelector#rebuildWithHosts(List)} will only be called sequentially with respect to itself.</li>
* <p>
* Note that the HostSelector does not own the provided {@link Host}s and therefore should not
* attempt to manage their lifecycle.
*/
interface HostSelector<ResolvedAddress, C extends LoadBalancedConnection> {

Expand All @@ -35,6 +46,27 @@ interface HostSelector<ResolvedAddress, C extends LoadBalancedConnection> {
* This method will be called concurrently with other selectConnection calls and
* hostSetChanged calls and must be thread safe under those conditions.
*/
Single<C> selectConnection(@Nonnull List<Host<ResolvedAddress, C>> hosts, @Nonnull Predicate<C> selector,
@Nullable ContextMap context, boolean forceNewConnectionAndReserve);
Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve);

/**
* Generate another HostSelector using the provided host list.
* <p>
* This method will be called when the host set is updated and provides a way for the
* HostSelector to rebuild any data structures necessary. Note that the method can return
* {@code this} or a new selector depending on the convenience of implementation.
* @param hosts the new list of {@link Host}s the returned selector should choose from.
* @return the next selector that should be used for host selection.
*/
HostSelector<ResolvedAddress, C> rebuildWithHosts(List<Host<ResolvedAddress, C>> hosts);

/**
* Whether the load balancer believes itself to unhealthy for serving traffic.
* <p>
* Note that this is both racy and best effort: just because a {@link HostSelector} is
* unhealthy doesn't guarantee that a request will fail nor does a healthy status indicate
* that this selector is guaranteed to successfully serve a request.
* @return whether the load balancer believes itself unhealthy enough and unlikely to successfully serve traffic.
*/
boolean isUnHealthy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.List;

/**
* Definition of the selector mechanism used for load balancing.
*/
interface LoadBalancingPolicy<ResolvedAddress, C extends LoadBalancedConnection> {
/**
* The name of the load balancing policy
*
* @return the name of the load balancing policy
*/
String name();

/**
* Construct a {@link HostSelector}.
* @param hosts the set of {@link Host}s to select from.
* @param targetResource the name of the target resource, useful for debugging purposes.
* @return a {@link HostSelector}
*/
HostSelector<ResolvedAddress, C> buildSelector(String targetResource);
HostSelector<ResolvedAddress, C> buildSelector(List<Host<ResolvedAddress, C>> hosts, String targetResource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;

import java.util.List;
import java.util.Random;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -47,8 +48,8 @@ private P2CLoadBalancingPolicy(final int maxEffort, @Nullable final Random rando
}

@Override
public HostSelector<ResolvedAddress, C> buildSelector(String targetResource) {
return new P2CSelector<>(targetResource, maxEffort, random);
public HostSelector<ResolvedAddress, C> buildSelector(List<Host<ResolvedAddress, C>> hosts, String targetResource) {
return new P2CSelector<>(hosts, targetResource, maxEffort, random);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.succeeded;
Expand All @@ -42,19 +41,24 @@ final class P2CSelector<ResolvedAddress, C extends LoadBalancedConnection>
@Nullable
private final Random random;
private final int maxEffort;
private final List<Host<ResolvedAddress, C>> hosts;

P2CSelector(final String targetResource, final int maxEffort, @Nullable final Random random) {
super(targetResource);
P2CSelector(List<Host<ResolvedAddress, C>> hosts,
final String targetResource, final int maxEffort, @Nullable final Random random) {
super(hosts, targetResource);
this.hosts = hosts;
this.maxEffort = maxEffort;
this.random = random;
}

@Override
public Single<C> selectConnection(
@Nonnull List<Host<ResolvedAddress, C>> hosts,
@Nonnull Predicate<C> selector,
@Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<Host<ResolvedAddress, C>> hosts) {
return new P2CSelector<>(hosts, getTargetResource(), maxEffort, random);
}

@Override
protected Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
final int size = hosts.size();
switch (size) {
case 0:
Expand All @@ -64,7 +68,7 @@ public Single<C> selectConnection(
case 1:
// There is only a single host, so we don't need to do any of the looping or comparison logic.
Single<C> connection = selectFromHost(hosts.get(0), selector, forceNewConnectionAndReserve, context);
return connection == null ? noActiveHosts(hosts) : connection;
return connection == null ? noActiveHostsFailure(hosts) : connection;
default:
return p2c(size, hosts, getRandom(), selector, forceNewConnectionAndReserve, context);
}
Expand Down Expand Up @@ -104,7 +108,7 @@ private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random ran
// Neither t1 nor t2 yielded a connection. Fall through, potentially for another attempt.
}
// Max effort exhausted. We failed to find a healthy and active host.
return noActiveHosts(hosts);
return noActiveHostsFailure(hosts);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -95,7 +96,8 @@ private <T extends C> LoadBalancer<T> useNewRoundRobinLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher, new RoundRobinSelector<>(targetResource),
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
new RoundRobinSelector<>(Collections.emptyList(), targetResource),
connectionFactory, linearSearchSpace, healthCheckConfig, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.List;

/**
* A round-robin load balancing policy.
*
Expand All @@ -33,8 +35,9 @@ private RoundRobinLoadBalancingPolicy() {
}

@Override
public HostSelector<ResolvedAddress, C> buildSelector(final String targetResource) {
return new RoundRobinSelector<>(targetResource);
public HostSelector<ResolvedAddress, C>
buildSelector(final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
return new RoundRobinSelector<>(hosts, targetResource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,36 @@
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.succeeded;

final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection>
extends BaseHostSelector<ResolvedAddress, C> {

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<RoundRobinSelector> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index");
private final AtomicInteger index;
private final List<Host<ResolvedAddress, C>> usedHosts;

@SuppressWarnings("unused")
private volatile int index;
RoundRobinSelector(final List<Host<ResolvedAddress, C>> usedHosts, final String targetResource) {
this(new AtomicInteger(), usedHosts, targetResource);
}

RoundRobinSelector(final String targetResource) {
super(targetResource);
private RoundRobinSelector(final AtomicInteger index, final List<Host<ResolvedAddress, C>> usedHosts,
final String targetResource) {
super(usedHosts, targetResource);
this.index = index;
this.usedHosts = usedHosts;
}

@Override
public Single<C> selectConnection(
final List<Host<ResolvedAddress, C>> usedHosts,
protected Single<C> selectConnection0(
final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
// try one loop over hosts and if all are expired, give up
final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size();
final int cursor = (index.getAndIncrement() & Integer.MAX_VALUE) % usedHosts.size();
Host<ResolvedAddress, C> pickedHost = null;
for (int i = 0; i < usedHosts.size(); ++i) {
// for a particular iteration we maintain a local cursor without contention with other requests
Expand All @@ -70,9 +73,14 @@ public Single<C> selectConnection(
}
}
if (pickedHost == null) {
return noActiveHosts(usedHosts);
return noActiveHostsFailure(usedHosts);
}
// We have a host but no connection was selected: create a new one.
return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context);
}

@Override
public HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<Host<ResolvedAddress, C>> hosts) {
return new RoundRobinSelector<>(index, hosts, getTargetResource());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,21 @@ interface ExceptionHandler {

private final ExceptionHandler exceptionHandler;
private final AtomicReference<Cell> tail = new AtomicReference<>();
@Nullable
private Thread currentDrainingThread;

SequentialExecutor(final ExceptionHandler exceptionHandler) {
this.exceptionHandler = requireNonNull(exceptionHandler, "exceptionHandler");
}

public boolean isCurrentThreadDraining() {
// Even though `currentDrainingThread` is not a volatile field this is thread safe:
// the only way that `currentDrainingThread` will ever equal this thread, even if
// we get a stale value, is if _this_ thread set it.
// The null check is just an optimization: it's really the second check that matters.
return currentDrainingThread != null && currentDrainingThread == Thread.currentThread();
}

@Override
public void execute(Runnable command) {
// Make sure we propagate async contexts.
Expand All @@ -73,6 +83,8 @@ public void execute(Runnable command) {
}

private void drain(Cell next) {
final Thread thisThread = Thread.currentThread();
currentDrainingThread = thisThread;
for (;;) {
assert next != null;
try {
Expand All @@ -86,10 +98,14 @@ private void drain(Cell next) {
if (n == null) {
// There doesn't seem to be another element linked. See if it was the tail and if so terminate draining.
// Note that a successful CAS established a happens-before relationship with future draining threads.
// Note that we also have to clear the draining thread before the CAS to prevent races.
currentDrainingThread = null;
if (tail.compareAndSet(next, null)) {
break;
}
// next isn't the tail but the link hasn't resolved: we must poll until it does.
// Next isn't the tail but the link hasn't resolved: we must re-set the draining thread and poll until
// it does resolve then we can keep on trucking.
currentDrainingThread = thisThread;
while ((n = next.next) == null) {
// Still not resolved: yield and then try again.
Thread.yield();
Expand Down
Loading

0 comments on commit 4044694

Please sign in to comment.