Skip to content

Commit

Permalink
loadbalancer-experimental: change host set update events (#3142)
Browse files Browse the repository at this point in the history
Motivation:

As discussed in #3140, we need to adjust some of the LoadBalancerObserver
callbacks to make the events more clear and also preserve important order
characteristics.

Modifications:

- Remove the size parameters from the `onServiceDiscoveryEvent` method as
  that required us to either precompute the size change or emit the event
  later than host observer creation events.
- Change the `onHostSetChanged` method to take two collections: one reflecting
  the previous set and the new set, both after manipulations where done such
  as prioritization and subsetting.
  • Loading branch information
bryce-anderson authored Dec 18, 2024
1 parent d9f375a commit e49939f
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,22 @@ public HostObserver hostObserver(Object resolvedAddress) {
}

@Override
public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events, int oldHostSetSize,
int newHostSetSize) {
LOGGER.debug("{}- onServiceDiscoveryEvent(events: {}, oldHostSetSize: {}, newHostSetSize: {})",
lbDescription, events, oldHostSetSize, newHostSetSize);
public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events) {
LOGGER.debug("{}- onServiceDiscoveryEvent(events: {}, count {})", lbDescription, events, events.size());
}

@Override
public void onHostSetChanged(Collection<? extends Host> newHosts) {
public void onHostsUpdate(Collection<? extends Host> oldHosts, Collection<? extends Host> newHosts) {
if (LOGGER.isDebugEnabled()) {
int healthyCount = 0;
for (Host host : newHosts) {
if (host.isHealthy()) {
healthyCount++;
}
}
LOGGER.debug("{}- onHostSetChanged(host set size: {}, healthy: {}). New hosts: {}", lbDescription,
newHosts.size(), healthyCount, newHosts);
LOGGER.debug("{}- onHostsUpdate(old hosts: {}, new hosts: {}), old host count: {}, new host count: {}, " +
"new healthy count: {}",
lbDescription, oldHosts, newHosts, oldHosts.size(), newHosts.size(), healthyCount);
}
}

Expand All @@ -70,8 +69,9 @@ public void onNoAvailableHostException(final NoAvailableHostException exception)
}

@Override
public void onNoActiveHostException(int hostSetSize, NoActiveHostException exception) {
LOGGER.debug("{}- onNoActiveHostException(hostSetSize: {})", lbDescription, hostSetSize, exception);
public void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exception) {
LOGGER.debug("{}- onNoActiveHostException(hosts: {}, host count: {})", lbDescription, hosts, hosts.size(),
exception);
}

private final class HostObserverImpl implements HostObserver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,15 @@ public final Single<C> selectConnection(Predicate<C> selector, @Nullable Context
return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

@Override
public final int hostSetSize() {
return hosts.size();
}

@Override
public final boolean isHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
// so that we can compose a group of selectors into a priority set.
return anyHealthy(hosts);
}

final List<? extends Host<ResolvedAddress, C>> hosts() {
@Override
public final List<? extends Host<ResolvedAddress, C>> hosts() {
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public Addr address() {
return address;
}

@Override
public double loadBalancingWeight() {
return 1;
}

@Override
public boolean markActiveIfNotClosed() {
final ConnState oldState = connStateUpdater.getAndUpdate(this, oldConnState -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio

// When we get a health-status event we should update the host set.
this.outlierDetectorStatusChangeStream = this.outlierDetector.healthStatusChanged().forEach((ignored) ->
sequentialExecutor.execute(() -> sequentialUpdateUsedHosts(usedHosts)));
sequentialExecutor.execute(() -> sequentialUpdateUsedHosts(usedHosts, true)));

// We subscribe to events as the very last step so that if we subscribe to an eager service discoverer
// we already have all the fields initialized.
Expand Down Expand Up @@ -308,6 +308,10 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
return;
}

// Notify the observer first so that we can emit the service discovery event before we start (potentially)
// emitting events related to creating new hosts.
loadBalancerObserver.onServiceDiscoveryEvent(events);

boolean sendReadyEvent = false;
final List<PrioritizedHostImpl<ResolvedAddress, C>> nextHosts = new ArrayList<>(
usedHosts.size() + events.size());
Expand Down Expand Up @@ -380,16 +384,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
nextHosts.add(createHost(event));
}
}

// Always send the event regardless of if we update the actual list.
loadBalancerObserver.onServiceDiscoveryEvent(events, usedHosts.size(), nextHosts.size());
// We've built a materially different host set so now set it for consumption and send our events.
if (hostSetChanged) {
sequentialUpdateUsedHosts(nextHosts);
}

LOGGER.debug("{}: now using addresses (size={}): {}.",
DefaultLoadBalancer.this, nextHosts.size(), nextHosts);
sequentialUpdateUsedHosts(nextHosts, hostSetChanged);
if (nextHosts.isEmpty()) {
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
} else if (sendReadyEvent) {
Expand Down Expand Up @@ -442,7 +437,7 @@ private PrioritizedHostImpl<ResolvedAddress, C> createHost(ServiceDiscovererEven
currentHosts, host);
// we only need to do anything else if we actually removed the host
if (nextHosts.size() != currentHosts.size()) {
sequentialUpdateUsedHosts(nextHosts);
sequentialUpdateUsedHosts(nextHosts, true);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
Expand Down Expand Up @@ -504,17 +499,27 @@ public void onComplete() {
}

// must be called from within the SequentialExecutor
private void sequentialUpdateUsedHosts(List<PrioritizedHostImpl<ResolvedAddress, C>> nextHosts) {
this.usedHosts = nextHosts;
// We need to reset the load balancing weights before we run the host set through the rest
// of the operations that will transform and consume the load balancing weight.
for (PrioritizedHostImpl<?, ?> host : nextHosts) {
host.loadBalancingWeight(host.serviceDiscoveryWeight());
}
nextHosts = priorityStrategy.prioritize(nextHosts);
nextHosts = subsetter.subset(nextHosts);
this.hostSelector = hostSelector.rebuildWithHosts(nextHosts);
loadBalancerObserver.onHostSetChanged(Collections.unmodifiableList(nextHosts));
private void sequentialUpdateUsedHosts(List<PrioritizedHostImpl<ResolvedAddress, C>> nextHosts,
boolean hostSetChanged) {
HostSelector<ResolvedAddress, C> oldHostSelector = hostSelector;
HostSelector<ResolvedAddress, C> newHostSelector;
if (hostSetChanged) {
this.usedHosts = nextHosts;
// We need to reset the load balancing weights before we run the host set through the rest
// of the operations that will transform and consume the load balancing weight.
for (PrioritizedHostImpl<?, ?> host : nextHosts) {
host.loadBalancingWeight(host.serviceDiscoveryWeight());
}
nextHosts = priorityStrategy.prioritize(nextHosts);
nextHosts = subsetter.subset(nextHosts);
this.hostSelector = newHostSelector = hostSelector.rebuildWithHosts(nextHosts);
} else {
newHostSelector = oldHostSelector;
}
final Collection<? extends LoadBalancerObserver.Host> newHosts = newHostSelector.hosts();
loadBalancerObserver.onHostsUpdate(Collections.unmodifiableCollection(oldHostSelector.hosts()),
Collections.unmodifiableCollection(newHosts));
LOGGER.debug("{}: Using addresses (size={}): {}.", this, newHosts.size(), newHosts);
}

@Override
Expand All @@ -541,8 +546,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
subscribeToEvents(true);
}
}
loadBalancerObserver.onNoActiveHostException(
currentHostSelector.hostSetSize(), (NoActiveHostException) exn);
loadBalancerObserver.onNoActiveHostException(currentHostSelector.hosts(), (NoActiveHostException) exn);
} else if (exn instanceof NoAvailableHostException) {
loadBalancerObserver.onNoAvailableHostException((NoAvailableHostException) exn);
}
Expand Down Expand Up @@ -628,8 +632,8 @@ public boolean isHealthy() {
}

@Override
public int hostSetSize() {
return 0;
public Collection<? extends LoadBalancerObserver.Host> hosts() {
return emptyList();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
* @param <ResolvedAddress> the type of resolved address.
* @param <C> the concrete type of returned connections.
*/
interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends ListenableAsyncCloseable, ScoreSupplier {
interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends ListenableAsyncCloseable, ScoreSupplier,
LoadBalancerObserver.Host {
/**
* Select an existing connection from the host.
* @return the selected host, or null if a suitable host couldn't be found.
Expand All @@ -49,15 +50,9 @@ interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends Listen
* The address of the host
* @return the address of the host
*/
@Override
ResolvedAddress address();

/**
* Determine the health status of this host.
* @return whether the host considers itself healthy enough to serve traffic. This is best effort and does not
* guarantee that the request will succeed.
*/
boolean isHealthy();

/**
* Determine whether the host is in a state where it can make new connections.
* @return whether the host is in a state where it can make new connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -71,10 +72,11 @@ Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean isHealthy();

/**
* The size of the host candidate pool for this host selector.
* The set of hosts this selector will pick from.
* <p>
* Note that this is primarily for observability purposes.
* @return the size of the host candidate pool for this host selector.
* Note that this may differ from the hosts advertised by the {@link io.servicetalk.client.api.ServiceDiscoverer}
* due to various filtering mechanisms.
* @return the set of hosts this selector will pick from.
*/
int hostSetSize();
Collection<? extends LoadBalancerObserver.Host> hosts();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,18 @@ public interface LoadBalancerObserver {
/**
* Callback for monitoring the changes due to a service discovery update.
* @param events the collection of {@link ServiceDiscovererEvent}s received by the load balancer.
* @param oldHostSetSize the size of the previous host set.
* @param newHostSetSize the new size of the host set.
*/
void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events,
int oldHostSetSize, int newHostSetSize);
void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events);

/**
* Callback for when the set of hosts used by the load balancer has changed. This set may not
* exactly reflect the state of the service discovery system due to filtering of zero-weight
* hosts and forms of sub-setting and thus may only represent the hosts that the selection
* algorithm may use.
* @param oldHosts the old set of hosts used by the selection algorithm.
* @param newHosts the new set of hosts used by the selection algorithm.
*/
void onHostSetChanged(Collection<? extends Host> newHosts);
void onHostsUpdate(Collection<? extends Host> oldHosts, Collection<? extends Host> newHosts);

/**
* Callback for when connection selection fails due to no hosts being available.
Expand All @@ -60,10 +58,10 @@ void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> eve

/**
* Callback for when connection selection fails due to all hosts being inactive.
* @param hostSetSize the size of the current host set where all hosts are inactive.
* @param hosts the set of hosts that is eligible for selection.
* @param exception an exception with more details about the failure.
*/
void onNoActiveHostException(int hostSetSize, NoActiveHostException exception);
void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exception);

/**
* An observer for {@link io.servicetalk.loadbalancer.Host} events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ public HostObserver hostObserver(Object resolvedAddress) {
}

@Override
public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events,
int oldHostSetSize, int newHostSetSize) {
public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<?>> events) {
// noop
}

@Override
public void onHostSetChanged(Collection<? extends Host> newHosts) {
public void onHostsUpdate(Collection<? extends Host> oldHosts, Collection<? extends Host> newHosts) {
// noop
}

Expand All @@ -53,7 +52,7 @@ public void onNoAvailableHostException(NoAvailableHostException exception) {
}

@Override
public void onNoActiveHostException(int hostSetSize, NoActiveHostException exn) {
public void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exn) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<? extends Host<Res
@Override
Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
final int size = hostSetSize();
final int size = hosts().size();
switch (size) {
case 0:
// We shouldn't get called if the load balancer doesn't have any hosts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,8 @@ public boolean isHealthy() {
}

@Override
public int hostSetSize() {
return hosts.size();
public Collection<? extends LoadBalancerObserver.Host> hosts() {
return hosts;
}
}
}
Expand Down

0 comments on commit e49939f

Please sign in to comment.