From c6a61d4cd6a0b9f851247e636dec8c12e621b8ba Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Wed, 13 Dec 2023 10:26:12 -0700 Subject: [PATCH] loadbalancer: fix observer name pattern Motivation: Most observers have methods that start with 'on' for event methods. That didn't happen with LoadBalancerObserver. Modifications: - Fix the observer method names. - Use the `onNoActiveHostsAvailable` event. --- .../loadbalancer/BaseHostSelector.java | 5 +++ .../servicetalk/loadbalancer/DefaultHost.java | 16 ++++----- .../loadbalancer/DefaultLoadBalancer.java | 27 +++++++++----- .../loadbalancer/HostSelector.java | 7 ++++ .../loadbalancer/LoadBalancerObserver.java | 26 ++++++++------ .../NoopLoadBalancerObserver.java | 22 ++++++------ .../loadbalancer/DefaultHostTest.java | 36 ++++++++----------- .../loadbalancer/DefaultLoadBalancerTest.java | 5 +++ 8 files changed, 84 insertions(+), 60 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java index 865fafba20..cf1056c7ca 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/BaseHostSelector.java @@ -45,6 +45,11 @@ public final Single selectConnection(Predicate selector, @Nullable Context return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve); } + @Override + public final int hostSetSize() { + return hosts.size(); + } + @Override public final boolean isUnHealthy() { // TODO: in the future we may want to make this more of a "are at least X hosts available" question diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java index fe1a16f7ed..d59011e8d9 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultHost.java @@ -113,7 +113,7 @@ private enum State { this.hostObserver = requireNonNull(hostObserver, "hostObserver"); this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); - hostObserver.hostCreated(address); + hostObserver.onHostCreated(address); } @Override @@ -133,7 +133,7 @@ public boolean markActiveIfNotClosed() { return oldConnState; }); if (oldState.state == State.EXPIRED) { - hostObserver.expiredHostRevived(address, oldState.connections.length); + hostObserver.onExpiredHostRevived(address, oldState.connections.length); } return oldState.state != State.CLOSED; } @@ -156,7 +156,7 @@ public void markClosed() { // closing should probably be re-worked to funnel closing behavior through one place // and also define what being closed means: just the host isn't used anymore for new // requests/connections or does it also mean that all connections have closed? - hostObserver.activeHostRemoved(address, toRemove.length); + hostObserver.onActiveHostRemoved(address, toRemove.length); } } @@ -186,7 +186,7 @@ public boolean markExpired() { if (connStateUpdater.compareAndSet(this, oldState, new ConnState(oldState.connections, nextState))) { cancelIfHealthCheck(oldState); - hostObserver.hostMarkedExpired(address, oldState.connections.length); + hostObserver.onHostMarkedExpired(address, oldState.connections.length); if (nextState == State.CLOSED) { // Trigger the callback to remove the host from usedHosts array. this.closeAsync().subscribe(); @@ -299,7 +299,7 @@ private void markHealthy(final HealthCheck originalHealthCheckState) { } // Only if the previous state was a healthcheck should we notify the observer. if (isUnhealthy(oldState)) { - hostObserver.hostRevived(address); + hostObserver.onHostRevived(address); } } @@ -337,7 +337,7 @@ private void markUnhealthy(final Throwable cause) { "{} time(s) in a row. Error counting threshold reached, marking this host as " + "UNHEALTHY for the selection algorithm and triggering background health-checking.", lbDescription, address, healthCheckConfig.failedThreshold, cause); - hostObserver.hostMarkedUnhealthy(address, cause); + hostObserver.onHostMarkedUnhealthy(address, cause); healthCheck.schedule(cause); break; } @@ -398,7 +398,7 @@ previous, new ConnState(newList, newState))) { cancelIfHealthCheck(previous); } // If we transitioned from unhealth to healthy we need to let the observer know. - hostObserver.hostRevived(address); + hostObserver.onHostRevived(address); } break; } @@ -444,7 +444,7 @@ previous, new ConnState(newList, newState))) { // in the next iteration. && connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) { this.closeAsync().subscribe(); - hostObserver.expiredHostRemoved(address); + hostObserver.onExpiredHostRemoved(address); break; } } else { diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index 7bf7a13e70..be0fe6d688 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -334,7 +334,7 @@ private void sequentialOnNext(Collection createHost(ResolvedAddress addr) { currentHosts, current -> current == host); // we only need to do anything else if we actually removed the host if (nextHosts.size() != currentHosts.size()) { - loadBalancerObserver.hostObserver().expiredHostRemoved(host.address()); + loadBalancerObserver.hostObserver().onExpiredHostRemoved(host.address()); sequentialUpdateUsedHosts(nextHosts); if (nextHosts.isEmpty()) { // We transitioned from non-empty to empty. That means we're not ready. @@ -472,15 +472,19 @@ private Single selectConnection0(final Predicate selector, @Nullable final final HostSelector currentHostSelector = hostSelector; Single result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve); return result.beforeOnError(exn -> { - if (exn instanceof NoActiveHostException && currentHostSelector.isUnHealthy()) { - final long currNextResubscribeTime = nextResubscribeTime; - if (currNextResubscribeTime >= 0 && - healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && - nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { - subscribeToEvents(true); + if (exn instanceof NoActiveHostException) { + if (currentHostSelector.isUnHealthy()) { + final long currNextResubscribeTime = nextResubscribeTime; + if (currNextResubscribeTime >= 0 && + healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime && + nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) { + subscribeToEvents(true); + } } + loadBalancerObserver.onNoActiveHostsAvailable( + currentHostSelector.hostSetSize(), (NoActiveHostException) exn); } else if (exn instanceof NoAvailableHostException) { - loadBalancerObserver.noHostsAvailable(); + loadBalancerObserver.onNoHostsAvailable(); } }); } @@ -561,5 +565,10 @@ public HostSelector rebuildWithHosts(List selectConnection(Predicate selector, @Nullable ContextMap context, * @return whether the load balancer believes itself unhealthy enough and unlikely to successfully serve traffic. */ boolean isUnHealthy(); + + /** + * The size of the host candidate pool for this host selector. + * Note that this is primarily for observability purposes. + * @return the size of the host candidate pool for this host selector. + */ + int hostSetSize(); } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java index fe6e221084..7b4805632f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/LoadBalancerObserver.java @@ -36,19 +36,23 @@ interface LoadBalancerObserver { /** * Callback for when connection selection fails due to no hosts being available. */ - void noHostsAvailable(); + void onNoHostsAvailable(); /** * Callback for monitoring the changes due to a service discovery update. */ - void serviceDiscoveryEvent(Collection> events, - int oldHostSetSize, int newHostSetSize); + void onServiceDiscoveryEvent(Collection> events, + int oldHostSetSize, int newHostSetSize); /** * Callback for when connection selection fails due to all hosts being inactive. */ - void noActiveHostsAvailable(int hostSetSize, NoActiveHostException exn); + void onNoActiveHostsAvailable(int hostSetSize, NoActiveHostException exn); + /** + * An observer for {@link Host} events. + * @param the type of the resolved address. + */ interface HostObserver { /** @@ -56,45 +60,45 @@ interface HostObserver { * @param address the resolved address. * @param connectionCount the number of active connections for the host. */ - void hostMarkedExpired(ResolvedAddress address, int connectionCount); + void onHostMarkedExpired(ResolvedAddress address, int connectionCount); /** * Callback for when a host is removed by service discovery. * @param address the resolved address. * @param connectionCount the number of connections that were associated with the host. */ - void activeHostRemoved(ResolvedAddress address, int connectionCount); + void onActiveHostRemoved(ResolvedAddress address, int connectionCount); /** * Callback for when an expired host is returned to an active state. * @param address the resolved address. * @param connectionCount the number of active connections when the host was revived. */ - void expiredHostRevived(ResolvedAddress address, int connectionCount); + void onExpiredHostRevived(ResolvedAddress address, int connectionCount); /** * Callback for when an expired host is removed. * @param address the resolved address. */ - void expiredHostRemoved(ResolvedAddress address); + void onExpiredHostRemoved(ResolvedAddress address); /** * Callback for when a host is created. * @param address the resolved address. */ - void hostCreated(ResolvedAddress address); + void onHostCreated(ResolvedAddress address); /** * Callback for when a {@link Host} transitions from healthy to unhealthy. * @param address the resolved address. * @param cause the most recent cause of the transition. */ - void hostMarkedUnhealthy(ResolvedAddress address, @Nullable Throwable cause); + void onHostMarkedUnhealthy(ResolvedAddress address, @Nullable Throwable cause); /** * Callback for when a {@link Host} transitions from unhealthy to healthy. * @param address the resolved address. */ - void hostRevived(ResolvedAddress address); + void onHostRevived(ResolvedAddress address); } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java index 09e185ee02..baf5dfbf80 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/NoopLoadBalancerObserver.java @@ -34,18 +34,18 @@ public HostObserver hostObserver() { } @Override - public void noHostsAvailable() { + public void onNoHostsAvailable() { // noop } @Override - public void noActiveHostsAvailable(int hostSetSize, NoActiveHostException exn) { + public void onNoActiveHostsAvailable(int hostSetSize, NoActiveHostException exn) { // noop } @Override - public void serviceDiscoveryEvent(Collection> events, - int oldHostSetSize, int newHostSetSize) { + public void onServiceDiscoveryEvent(Collection> events, + int oldHostSetSize, int newHostSetSize) { // noop } @@ -58,37 +58,37 @@ private NoopHostObserver() { } @Override - public void hostMarkedExpired(ResolvedAddress resolvedAddress, int connectionCount) { + public void onHostMarkedExpired(ResolvedAddress resolvedAddress, int connectionCount) { // noop } @Override - public void expiredHostRemoved(ResolvedAddress resolvedAddress) { + public void onExpiredHostRemoved(ResolvedAddress resolvedAddress) { // noop } @Override - public void expiredHostRevived(ResolvedAddress resolvedAddress, int connectionCount) { + public void onExpiredHostRevived(ResolvedAddress resolvedAddress, int connectionCount) { // noop } @Override - public void activeHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) { + public void onActiveHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) { // noop } @Override - public void hostCreated(ResolvedAddress resolvedAddress) { + public void onHostCreated(ResolvedAddress resolvedAddress) { // noop } @Override - public void hostMarkedUnhealthy(ResolvedAddress address, Throwable cause) { + public void onHostMarkedUnhealthy(ResolvedAddress address, Throwable cause) { // noop } @Override - public void hostRevived(ResolvedAddress address) { + public void onHostRevived(ResolvedAddress address) { // noop } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java index 0357bc81b2..66d7b8deb1 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultHostTest.java @@ -77,21 +77,19 @@ private void buildHost() { @Test void hostCreatedEvents() { buildHost(); - verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); // make another one, just for good measure. new DefaultHost<>("lbDescription", "address2", connectionFactory, Integer.MAX_VALUE, healthCheckConfig, mockHostObserver); - verify(mockHostObserver, times(1)).hostCreated("address2"); + verify(mockHostObserver, times(1)).onHostCreated("address2"); } @Test void activeHostClosed() { buildHost(); - verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); host.markClosed(); - verify(mockHostObserver, times(1)) - .activeHostRemoved(DEFAULT_ADDRESS, - 0); + verify(mockHostObserver, times(1)).onActiveHostRemoved(DEFAULT_ADDRESS, 0); // TODO: this is backwards and should assert true: our `markClosed` method doesn't trigger the host to close // down. See the related comment in `DefaultHost.markClosed`. assertThat(host.onClose().toFuture().isDone(), is(false)); @@ -100,25 +98,23 @@ void activeHostClosed() { @Test void activeHostExpires() { buildHost(); - verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); host.markExpired(); - verify(mockHostObserver, times(1)) - .hostMarkedExpired(DEFAULT_ADDRESS, 0); + verify(mockHostObserver, times(1)).onHostMarkedExpired(DEFAULT_ADDRESS, 0); assertThat(host.onClose().toFuture().isDone(), is(true)); } @Test void expiredHostClosesAfterLastConnectionClosed() throws Exception { buildHost(); - verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); TestLoadBalancedConnection cxn = host.newConnection(any(), false, null).toFuture().get(); host.markExpired(); - verify(mockHostObserver, times(1)) - .hostMarkedExpired(DEFAULT_ADDRESS, 1); + verify(mockHostObserver, times(1)).onHostMarkedExpired(DEFAULT_ADDRESS, 1); assertThat(host.onClose().toFuture().isDone(), is(false)); cxn.closeAsync().subscribe(); assertThat(host.onClose().toFuture().isDone(), is(true)); - verify(mockHostObserver).expiredHostRemoved(DEFAULT_ADDRESS); + verify(mockHostObserver).onExpiredHostRemoved(DEFAULT_ADDRESS); // shouldn't able to revive it. assertThat(host.markActiveIfNotClosed(), is(false)); } @@ -126,14 +122,14 @@ void expiredHostClosesAfterLastConnectionClosed() throws Exception { @Test void expiredHostRevives() throws Exception { buildHost(); - verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); host.newConnection(any(), false, null).toFuture().get(); host.markExpired(); verify(mockHostObserver, times(1)) - .hostMarkedExpired(DEFAULT_ADDRESS, 1); + .onHostMarkedExpired(DEFAULT_ADDRESS, 1); assertThat(host.onClose().toFuture().isDone(), is(false)); assertThat(host.markActiveIfNotClosed(), is(true)); - verify(mockHostObserver).expiredHostRevived(DEFAULT_ADDRESS, 1); + verify(mockHostObserver).onExpiredHostRevived(DEFAULT_ADDRESS, 1); } @Test @@ -150,20 +146,18 @@ void l4ConsecutiveFailuresAreDetected() throws Exception { Duration.ofMillis(1), Duration.ZERO); buildHost(); - verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS); for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; i++) { assertThrows(ExecutionException.class, () -> host.newConnection(any(), false, null).toFuture().get()); } - verify(mockHostObserver, times(1)) - .hostMarkedUnhealthy(DEFAULT_ADDRESS, UNHEALTHY_HOST_EXCEPTION); + verify(mockHostObserver, times(1)).onHostMarkedUnhealthy(DEFAULT_ADDRESS, UNHEALTHY_HOST_EXCEPTION); // now revive and we should see the event and be able to get the connection. unhealthyHostConnectionFactory.advanceTime(testExecutor); assertThat(host.newConnection(any(), false, null).toFuture().get(), is(testLoadBalancedConnection)); - verify(mockHostObserver, times(1)) - .hostRevived(DEFAULT_ADDRESS); + verify(mockHostObserver, times(1)).onHostRevived(DEFAULT_ADDRESS); } static Predicate any() { diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java index 5639db1072..4e80517642 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/DefaultLoadBalancerTest.java @@ -152,6 +152,11 @@ public HostSelector rebuildWithHosts( public boolean isUnHealthy() { return false; } + + @Override + public int hostSetSize() { + return hosts.size(); + } } } }