diff --git a/servicetalk-loadbalancer-experimental-provider/src/main/java/io/servicetalk/loadbalancer/experimental/DefaultLoadBalancerObserver.java b/servicetalk-loadbalancer-experimental-provider/src/main/java/io/servicetalk/loadbalancer/experimental/DefaultLoadBalancerObserver.java index 208ebade32..6a1a0eca3e 100644 --- a/servicetalk-loadbalancer-experimental-provider/src/main/java/io/servicetalk/loadbalancer/experimental/DefaultLoadBalancerObserver.java +++ b/servicetalk-loadbalancer-experimental-provider/src/main/java/io/servicetalk/loadbalancer/experimental/DefaultLoadBalancerObserver.java @@ -44,14 +44,12 @@ public HostObserver hostObserver(Object resolvedAddress) { } @Override - public void onServiceDiscoveryEvent(Collection> events, int oldHostSetSize, - int newHostSetSize) { - LOGGER.debug("{}- onServiceDiscoveryEvent(events: {}, oldHostSetSize: {}, newHostSetSize: {})", - lbDescription, events, oldHostSetSize, newHostSetSize); + public void onServiceDiscoveryEvent(Collection> events) { + LOGGER.debug("{}- onServiceDiscoveryEvent(events: {}, count {})", lbDescription, events, events.size()); } @Override - public void onHostSetChanged(Collection newHosts) { + public void onHostsUpdate(Collection oldHosts, Collection newHosts) { if (LOGGER.isDebugEnabled()) { int healthyCount = 0; for (Host host : newHosts) { @@ -59,8 +57,9 @@ public void onHostSetChanged(Collection newHosts) { healthyCount++; } } - LOGGER.debug("{}- onHostSetChanged(host set size: {}, healthy: {}). New hosts: {}", lbDescription, - newHosts.size(), healthyCount, newHosts); + LOGGER.debug("{}- onHostsUpdate(old hosts: {}, new hosts: {}), old host count: {}, new host count: {}, " + + "new healthy count: {}", + lbDescription, oldHosts, newHosts, oldHosts.size(), newHosts.size(), healthyCount); } } @@ -70,8 +69,9 @@ public void onNoAvailableHostException(final NoAvailableHostException exception) } @Override - public void onNoActiveHostException(int hostSetSize, NoActiveHostException exception) { - LOGGER.debug("{}- onNoActiveHostException(hostSetSize: {})", lbDescription, hostSetSize, exception); + public void onNoActiveHostException(Collection hosts, NoActiveHostException exception) { + LOGGER.debug("{}- onNoActiveHostException(hosts: {}, host count: {})", lbDescription, hosts, hosts.size(), + exception); } private final class HostObserverImpl implements HostObserver { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java index ddf1f90e31..01aaa9a7d2 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java @@ -48,11 +48,6 @@ public final Single selectConnection(Predicate selector, @Nullable Context return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve); } - @Override - public final int hostSetSize() { - return hosts.size(); - } - @Override public final boolean isHealthy() { // TODO: in the future we may want to make this more of a "are at least X hosts available" question @@ -60,7 +55,8 @@ public final boolean isHealthy() { return anyHealthy(hosts); } - final List> hosts() { + @Override + public final List> hosts() { return hosts; } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java index 5940dc92f0..b05228e5cb 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java @@ -116,6 +116,11 @@ public Addr address() { return address; } + @Override + public double loadBalancingWeight() { + return 1; + } + @Override public boolean markActiveIfNotClosed() { final ConnState oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index 274e480967..eae092a136 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -174,7 +174,7 @@ final class DefaultLoadBalancer - sequentialExecutor.execute(() -> sequentialUpdateUsedHosts(usedHosts))); + sequentialExecutor.execute(() -> sequentialUpdateUsedHosts(usedHosts, true))); // We subscribe to events as the very last step so that if we subscribe to an eager service discoverer // we already have all the fields initialized. @@ -308,6 +308,10 @@ private void sequentialOnNext(Collection> nextHosts = new ArrayList<>( usedHosts.size() + events.size()); @@ -380,16 +384,7 @@ private void sequentialOnNext(Collection createHost(ServiceDiscovererEven currentHosts, host); // we only need to do anything else if we actually removed the host if (nextHosts.size() != currentHosts.size()) { - sequentialUpdateUsedHosts(nextHosts); + sequentialUpdateUsedHosts(nextHosts, true); if (nextHosts.isEmpty()) { // We transitioned from non-empty to empty. That means we're not ready. eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT); @@ -504,17 +499,27 @@ public void onComplete() { } // must be called from within the SequentialExecutor - private void sequentialUpdateUsedHosts(List> nextHosts) { - this.usedHosts = nextHosts; - // We need to reset the load balancing weights before we run the host set through the rest - // of the operations that will transform and consume the load balancing weight. - for (PrioritizedHostImpl host : nextHosts) { - host.loadBalancingWeight(host.serviceDiscoveryWeight()); - } - nextHosts = priorityStrategy.prioritize(nextHosts); - nextHosts = subsetter.subset(nextHosts); - this.hostSelector = hostSelector.rebuildWithHosts(nextHosts); - loadBalancerObserver.onHostSetChanged(Collections.unmodifiableList(nextHosts)); + private void sequentialUpdateUsedHosts(List> nextHosts, + boolean hostSetChanged) { + HostSelector oldHostSelector = hostSelector; + HostSelector newHostSelector; + if (hostSetChanged) { + this.usedHosts = nextHosts; + // We need to reset the load balancing weights before we run the host set through the rest + // of the operations that will transform and consume the load balancing weight. + for (PrioritizedHostImpl host : nextHosts) { + host.loadBalancingWeight(host.serviceDiscoveryWeight()); + } + nextHosts = priorityStrategy.prioritize(nextHosts); + nextHosts = subsetter.subset(nextHosts); + this.hostSelector = newHostSelector = hostSelector.rebuildWithHosts(nextHosts); + } else { + newHostSelector = oldHostSelector; + } + final Collection newHosts = newHostSelector.hosts(); + loadBalancerObserver.onHostsUpdate(Collections.unmodifiableCollection(oldHostSelector.hosts()), + Collections.unmodifiableCollection(newHosts)); + LOGGER.debug("{}: Using addresses (size={}): {}.", this, newHosts.size(), newHosts); } @Override @@ -541,8 +546,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final subscribeToEvents(true); } } - loadBalancerObserver.onNoActiveHostException( - currentHostSelector.hostSetSize(), (NoActiveHostException) exn); + loadBalancerObserver.onNoActiveHostException(currentHostSelector.hosts(), (NoActiveHostException) exn); } else if (exn instanceof NoAvailableHostException) { loadBalancerObserver.onNoAvailableHostException((NoAvailableHostException) exn); } @@ -628,8 +632,8 @@ public boolean isHealthy() { } @Override - public int hostSetSize() { - return 0; + public Collection hosts() { + return emptyList(); } } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java index b0ab11ffb1..9497ba0d23 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/Host.java @@ -29,7 +29,8 @@ * @param the type of resolved address. * @param the concrete type of returned connections. */ -interface Host extends ListenableAsyncCloseable, ScoreSupplier { +interface Host extends ListenableAsyncCloseable, ScoreSupplier, + LoadBalancerObserver.Host { /** * Select an existing connection from the host. * @return the selected host, or null if a suitable host couldn't be found. @@ -49,15 +50,9 @@ interface Host extends Listen * The address of the host * @return the address of the host */ + @Override ResolvedAddress address(); - /** - * Determine the health status of this host. - * @return whether the host considers itself healthy enough to serve traffic. This is best effort and does not - * guarantee that the request will succeed. - */ - boolean isHealthy(); - /** * Determine whether the host is in a state where it can make new connections. * @return whether the host is in a state where it can make new connections. diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostSelector.java index 489db08e46..5febbacb9a 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/HostSelector.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; +import java.util.Collection; import java.util.List; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -71,10 +72,11 @@ Single selectConnection(Predicate selector, @Nullable ContextMap context, boolean isHealthy(); /** - * The size of the host candidate pool for this host selector. + * The set of hosts this selector will pick from. *

- * Note that this is primarily for observability purposes. - * @return the size of the host candidate pool for this host selector. + * Note that this may differ from the hosts advertised by the {@link io.servicetalk.client.api.ServiceDiscoverer} + * due to various filtering mechanisms. + * @return the set of hosts this selector will pick from. */ - int hostSetSize(); + Collection hosts(); } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java index df344be58e..c223ce4db5 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java @@ -37,20 +37,18 @@ public interface LoadBalancerObserver { /** * Callback for monitoring the changes due to a service discovery update. * @param events the collection of {@link ServiceDiscovererEvent}s received by the load balancer. - * @param oldHostSetSize the size of the previous host set. - * @param newHostSetSize the new size of the host set. */ - void onServiceDiscoveryEvent(Collection> events, - int oldHostSetSize, int newHostSetSize); + void onServiceDiscoveryEvent(Collection> events); /** * Callback for when the set of hosts used by the load balancer has changed. This set may not * exactly reflect the state of the service discovery system due to filtering of zero-weight * hosts and forms of sub-setting and thus may only represent the hosts that the selection * algorithm may use. + * @param oldHosts the old set of hosts used by the selection algorithm. * @param newHosts the new set of hosts used by the selection algorithm. */ - void onHostSetChanged(Collection newHosts); + void onHostsUpdate(Collection oldHosts, Collection newHosts); /** * Callback for when connection selection fails due to no hosts being available. @@ -60,10 +58,10 @@ void onServiceDiscoveryEvent(Collection> eve /** * Callback for when connection selection fails due to all hosts being inactive. - * @param hostSetSize the size of the current host set where all hosts are inactive. + * @param hosts the set of hosts that is eligible for selection. * @param exception an exception with more details about the failure. */ - void onNoActiveHostException(int hostSetSize, NoActiveHostException exception); + void onNoActiveHostException(Collection hosts, NoActiveHostException exception); /** * An observer for {@link io.servicetalk.loadbalancer.Host} events. diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java index 9839d75983..ddbc2770e3 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java @@ -37,13 +37,12 @@ public HostObserver hostObserver(Object resolvedAddress) { } @Override - public void onServiceDiscoveryEvent(Collection> events, - int oldHostSetSize, int newHostSetSize) { + public void onServiceDiscoveryEvent(Collection> events) { // noop } @Override - public void onHostSetChanged(Collection newHosts) { + public void onHostsUpdate(Collection oldHosts, Collection newHosts) { // noop } @@ -53,7 +52,7 @@ public void onNoAvailableHostException(NoAvailableHostException exception) { } @Override - public void onNoActiveHostException(int hostSetSize, NoActiveHostException exn) { + public void onNoActiveHostException(Collection hosts, NoActiveHostException exn) { // noop } diff --git a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java index b414215342..0ae8535674 100644 --- a/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java +++ b/servicetalk-loadbalancer-experimental/src/main/java/io/servicetalk/loadbalancer/P2CSelector.java @@ -68,7 +68,7 @@ public HostSelector rebuildWithHosts(List selectConnection0(final Predicate selector, @Nullable final ContextMap context, final boolean forceNewConnectionAndReserve) { - final int size = hostSetSize(); + final int size = hosts().size(); switch (size) { case 0: // We shouldn't get called if the load balancer doesn't have any hosts. diff --git a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java index e7d479a99f..37ad089230 100644 --- a/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java +++ b/servicetalk-loadbalancer-experimental/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java @@ -486,8 +486,8 @@ public boolean isHealthy() { } @Override - public int hostSetSize() { - return hosts.size(); + public Collection hosts() { + return hosts; } } }