Skip to content

Commit

Permalink
HostSelector now has the collection of hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Dec 17, 2024
1 parent 504d5a9 commit 7bc9c9e
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void onNoAvailableHostException(final NoAvailableHostException exception)
}

@Override
public void onNoActiveHostException(int hostsCount, NoActiveHostException exception) {
LOGGER.debug("{}- onNoActiveHostException(hostSetSize: {})", lbDescription, hostsCount, exception);
public void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exception) {
LOGGER.debug("{}- onNoActiveHostException(hosts: {})", lbDescription, hosts, exception);
}

private final class HostObserverImpl implements HostObserver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,15 @@ public final Single<C> selectConnection(Predicate<C> selector, @Nullable Context
return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

@Override
public final int hostSetSize() {
return hosts.size();
}

@Override
public final boolean isHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
// so that we can compose a group of selectors into a priority set.
return anyHealthy(hosts);
}

final List<? extends Host<ResolvedAddress, C>> hosts() {
@Override
public final List<? extends Host<ResolvedAddress, C>> hosts() {
return hosts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public Addr address() {
return address;
}

@Override
public double loadBalancingWeight() {
return 1;
}

@Override
public boolean markActiveIfNotClosed() {
final ConnState oldState = connStateUpdater.getAndUpdate(this, oldConnState -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
private volatile HostSelector<ResolvedAddress, C> hostSelector;
// reads and writes are protected by `sequentialExecutor`.
private List<PrioritizedHostImpl<ResolvedAddress, C>> usedHosts = emptyList();
private List<PrioritizedHostImpl<ResolvedAddress, C>> prioritizedHosts = usedHosts;
// reads and writes are protected by `sequentialExecutor`.
private boolean isClosed;

Expand Down Expand Up @@ -502,21 +501,25 @@ public void onComplete() {
// must be called from within the SequentialExecutor
private void sequentialUpdateUsedHosts(List<PrioritizedHostImpl<ResolvedAddress, C>> nextHosts,
boolean hostSetChanged) {
final List<PrioritizedHostImpl<ResolvedAddress, C>> oldPrioritizedHosts = this.prioritizedHosts;
HostSelector<ResolvedAddress, C> oldHostSelector = hostSelector;
HostSelector<ResolvedAddress, C> newHostSelector;
if (hostSetChanged) {
this.usedHosts = nextHosts;
// We need to reset the load balancing weights before we run the host set through the rest
// of the operations that will transform and consume the load balancing weight.
for (PrioritizedHostImpl<?, ?> host : nextHosts) {
host.loadBalancingWeight(host.serviceDiscoveryWeight());
}
this.prioritizedHosts = priorityStrategy.prioritize(nextHosts);
this.prioritizedHosts = subsetter.subset(prioritizedHosts);
this.hostSelector = hostSelector.rebuildWithHosts(prioritizedHosts);
nextHosts = priorityStrategy.prioritize(nextHosts);
nextHosts = subsetter.subset(nextHosts);
this.hostSelector = newHostSelector = hostSelector.rebuildWithHosts(nextHosts);
} else {
newHostSelector = oldHostSelector;
}
loadBalancerObserver.onHostsUpdate(Collections.unmodifiableList(oldPrioritizedHosts),
Collections.unmodifiableList(prioritizedHosts));
LOGGER.debug("{}: Using addresses (size={}): {}.", this, prioritizedHosts.size(), prioritizedHosts);
final Collection<? extends Host<ResolvedAddress, C>> newHosts = newHostSelector.hosts();
loadBalancerObserver.onHostsUpdate(Collections.unmodifiableCollection(oldHostSelector.hosts()),
Collections.unmodifiableCollection(newHosts));
LOGGER.debug("{}: Using addresses (size={}): {}.", this, newHosts.size(), newHosts);
}

@Override
Expand All @@ -543,8 +546,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
subscribeToEvents(true);
}
}
loadBalancerObserver.onNoActiveHostException(
currentHostSelector.hostSetSize(), (NoActiveHostException) exn);
loadBalancerObserver.onNoActiveHostException(currentHostSelector.hosts(), (NoActiveHostException) exn);
} else if (exn instanceof NoAvailableHostException) {
loadBalancerObserver.onNoAvailableHostException((NoAvailableHostException) exn);
}
Expand Down Expand Up @@ -630,8 +632,8 @@ public boolean isHealthy() {
}

@Override
public int hostSetSize() {
return 0;
public List<? extends Host<ResolvedAddress, C>> hosts() {
return emptyList();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
* @param <ResolvedAddress> the type of resolved address.
* @param <C> the concrete type of returned connections.
*/
interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends ListenableAsyncCloseable, ScoreSupplier {
interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends ListenableAsyncCloseable, ScoreSupplier,
LoadBalancerObserver.Host {
/**
* Select an existing connection from the host.
* @return the selected host, or null if a suitable host couldn't be found.
Expand All @@ -49,15 +50,9 @@ interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends Listen
* The address of the host
* @return the address of the host
*/
@Override
ResolvedAddress address();

/**
* Determine the health status of this host.
* @return whether the host considers itself healthy enough to serve traffic. This is best effort and does not
* guarantee that the request will succeed.
*/
boolean isHealthy();

/**
* Determine whether the host is in a state where it can make new connections.
* @return whether the host is in a state where it can make new connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -71,10 +72,11 @@ Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean isHealthy();

/**
* The size of the host candidate pool for this host selector.
* The set of hosts this selector will pick from.
* <p>
* Note that this is primarily for observability purposes.
* @return the size of the host candidate pool for this host selector.
* Note that this may differ from the hosts advertised by the {@link io.servicetalk.client.api.ServiceDiscoverer}
* due to various filtering mechanisms.
* @return the set of hosts this selector will pick from.
*/
int hostSetSize();
Collection<? extends Host<ResolvedAddress, C>> hosts();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public interface LoadBalancerObserver {

/**
* Callback for when connection selection fails due to all hosts being inactive.
* @param hostsCount the size of the current host set where all hosts are inactive.
* @param hosts the set of hosts that is eligible for selection.
* @param exception an exception with more details about the failure.
*/
void onNoActiveHostException(int hostsCount, NoActiveHostException exception);
void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exception);

/**
* An observer for {@link io.servicetalk.loadbalancer.Host} events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void onNoAvailableHostException(NoAvailableHostException exception) {
}

@Override
public void onNoActiveHostException(int hostsCount, NoActiveHostException exn) {
public void onNoActiveHostException(Collection<? extends Host> hosts, NoActiveHostException exn) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<? extends Host<Res
@Override
Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
final int size = hostSetSize();
final int size = hosts().size();
switch (size) {
case 0:
// We shouldn't get called if the load balancer doesn't have any hosts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,8 @@ public boolean isHealthy() {
}

@Override
public int hostSetSize() {
return hosts.size();
public List<? extends Host<String, TestLoadBalancedConnection>> hosts() {
return hosts;
}
}
}
Expand Down

0 comments on commit 7bc9c9e

Please sign in to comment.