Skip to content

Commit

Permalink
loadbalancer: fix observer name pattern
Browse files Browse the repository at this point in the history
Motivation:

Most observers have methods that start with 'on' for
event methods. That didn't happen with LoadBalancerObserver.

Modifications:

- Fix the observer method names.
- Use the `onNoActiveHostsAvailable` event.
  • Loading branch information
bryce-anderson committed Dec 13, 2023
1 parent 82ad584 commit c6a61d4
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public final Single<C> selectConnection(Predicate<C> selector, @Nullable Context
return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

@Override
public final int hostSetSize() {
return hosts.size();
}

@Override
public final boolean isUnHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private enum State {
this.hostObserver = requireNonNull(hostObserver, "hostObserver");
this.closeable = toAsyncCloseable(graceful ->
graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync));
hostObserver.hostCreated(address);
hostObserver.onHostCreated(address);
}

@Override
Expand All @@ -133,7 +133,7 @@ public boolean markActiveIfNotClosed() {
return oldConnState;
});
if (oldState.state == State.EXPIRED) {
hostObserver.expiredHostRevived(address, oldState.connections.length);
hostObserver.onExpiredHostRevived(address, oldState.connections.length);
}
return oldState.state != State.CLOSED;
}
Expand All @@ -156,7 +156,7 @@ public void markClosed() {
// closing should probably be re-worked to funnel closing behavior through one place
// and also define what being closed means: just the host isn't used anymore for new
// requests/connections or does it also mean that all connections have closed?
hostObserver.activeHostRemoved(address, toRemove.length);
hostObserver.onActiveHostRemoved(address, toRemove.length);
}
}

Expand Down Expand Up @@ -186,7 +186,7 @@ public boolean markExpired() {
if (connStateUpdater.compareAndSet(this, oldState,
new ConnState(oldState.connections, nextState))) {
cancelIfHealthCheck(oldState);
hostObserver.hostMarkedExpired(address, oldState.connections.length);
hostObserver.onHostMarkedExpired(address, oldState.connections.length);
if (nextState == State.CLOSED) {
// Trigger the callback to remove the host from usedHosts array.
this.closeAsync().subscribe();
Expand Down Expand Up @@ -299,7 +299,7 @@ private void markHealthy(final HealthCheck<Addr, C> originalHealthCheckState) {
}
// Only if the previous state was a healthcheck should we notify the observer.
if (isUnhealthy(oldState)) {
hostObserver.hostRevived(address);
hostObserver.onHostRevived(address);
}
}

Expand Down Expand Up @@ -337,7 +337,7 @@ private void markUnhealthy(final Throwable cause) {
"{} time(s) in a row. Error counting threshold reached, marking this host as " +
"UNHEALTHY for the selection algorithm and triggering background health-checking.",
lbDescription, address, healthCheckConfig.failedThreshold, cause);
hostObserver.hostMarkedUnhealthy(address, cause);
hostObserver.onHostMarkedUnhealthy(address, cause);
healthCheck.schedule(cause);
break;
}
Expand Down Expand Up @@ -398,7 +398,7 @@ previous, new ConnState(newList, newState))) {
cancelIfHealthCheck(previous);
}
// If we transitioned from unhealth to healthy we need to let the observer know.
hostObserver.hostRevived(address);
hostObserver.onHostRevived(address);
}
break;
}
Expand Down Expand Up @@ -444,7 +444,7 @@ previous, new ConnState(newList, newState))) {
// in the next iteration.
&& connStateUpdater.compareAndSet(this, currentConnState, CLOSED_CONN_STATE)) {
this.closeAsync().subscribe();
hostObserver.expiredHostRemoved(address);
hostObserver.onExpiredHostRemoved(address);
break;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
}

// Always send the event regardless of if we update the actual list.
loadBalancerObserver.serviceDiscoveryEvent(events, usedHosts.size(), nextHosts.size());
loadBalancerObserver.onServiceDiscoveryEvent(events, usedHosts.size(), nextHosts.size());
// We've built a materially different host set so now set it for consumption and send our events.
if (hostSetChanged) {
sequentialUpdateUsedHosts(nextHosts);
Expand Down Expand Up @@ -388,7 +388,7 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
currentHosts, current -> current == host);
// we only need to do anything else if we actually removed the host
if (nextHosts.size() != currentHosts.size()) {
loadBalancerObserver.hostObserver().expiredHostRemoved(host.address());
loadBalancerObserver.hostObserver().onExpiredHostRemoved(host.address());
sequentialUpdateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
Expand Down Expand Up @@ -472,15 +472,19 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
final HostSelector<ResolvedAddress, C> currentHostSelector = hostSelector;
Single<C> result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
return result.beforeOnError(exn -> {
if (exn instanceof NoActiveHostException && currentHostSelector.isUnHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) {
subscribeToEvents(true);
if (exn instanceof NoActiveHostException) {
if (currentHostSelector.isUnHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
nextResubscribeTimeUpdater.compareAndSet(this, currNextResubscribeTime, RESUBSCRIBING)) {
subscribeToEvents(true);
}
}
loadBalancerObserver.onNoActiveHostsAvailable(
currentHostSelector.hostSetSize(), (NoActiveHostException) exn);
} else if (exn instanceof NoAvailableHostException) {
loadBalancerObserver.noHostsAvailable();
loadBalancerObserver.onNoHostsAvailable();
}
});
}
Expand Down Expand Up @@ -561,5 +565,10 @@ public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<Host<ResolvedAddre
public boolean isUnHealthy() {
return false;
}

@Override
public int hostSetSize() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,11 @@ Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
* @return whether the load balancer believes itself unhealthy enough and unlikely to successfully serve traffic.
*/
boolean isUnHealthy();

/**
* The size of the host candidate pool for this host selector.
* Note that this is primarily for observability purposes.
* @return the size of the host candidate pool for this host selector.
*/
int hostSetSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,65 +36,69 @@ interface LoadBalancerObserver<ResolvedAddress> {
/**
* Callback for when connection selection fails due to no hosts being available.
*/
void noHostsAvailable();
void onNoHostsAvailable();

/**
* Callback for monitoring the changes due to a service discovery update.
*/
void serviceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events,
int oldHostSetSize, int newHostSetSize);
void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events,
int oldHostSetSize, int newHostSetSize);

/**
* Callback for when connection selection fails due to all hosts being inactive.
*/
void noActiveHostsAvailable(int hostSetSize, NoActiveHostException exn);
void onNoActiveHostsAvailable(int hostSetSize, NoActiveHostException exn);

/**
* An observer for {@link Host} events.
* @param <ResolvedAddress> the type of the resolved address.
*/
interface HostObserver<ResolvedAddress> {

/**
* Callback for when an active host is marked expired.
* @param address the resolved address.
* @param connectionCount the number of active connections for the host.
*/
void hostMarkedExpired(ResolvedAddress address, int connectionCount);
void onHostMarkedExpired(ResolvedAddress address, int connectionCount);

/**
* Callback for when a host is removed by service discovery.
* @param address the resolved address.
* @param connectionCount the number of connections that were associated with the host.
*/
void activeHostRemoved(ResolvedAddress address, int connectionCount);
void onActiveHostRemoved(ResolvedAddress address, int connectionCount);

/**
* Callback for when an expired host is returned to an active state.
* @param address the resolved address.
* @param connectionCount the number of active connections when the host was revived.
*/
void expiredHostRevived(ResolvedAddress address, int connectionCount);
void onExpiredHostRevived(ResolvedAddress address, int connectionCount);

/**
* Callback for when an expired host is removed.
* @param address the resolved address.
*/
void expiredHostRemoved(ResolvedAddress address);
void onExpiredHostRemoved(ResolvedAddress address);

/**
* Callback for when a host is created.
* @param address the resolved address.
*/
void hostCreated(ResolvedAddress address);
void onHostCreated(ResolvedAddress address);

/**
* Callback for when a {@link Host} transitions from healthy to unhealthy.
* @param address the resolved address.
* @param cause the most recent cause of the transition.
*/
void hostMarkedUnhealthy(ResolvedAddress address, @Nullable Throwable cause);
void onHostMarkedUnhealthy(ResolvedAddress address, @Nullable Throwable cause);

/**
* Callback for when a {@link Host} transitions from unhealthy to healthy.
* @param address the resolved address.
*/
void hostRevived(ResolvedAddress address);
void onHostRevived(ResolvedAddress address);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ public HostObserver<ResolvedAddress> hostObserver() {
}

@Override
public void noHostsAvailable() {
public void onNoHostsAvailable() {
// noop
}

@Override
public void noActiveHostsAvailable(int hostSetSize, NoActiveHostException exn) {
public void onNoActiveHostsAvailable(int hostSetSize, NoActiveHostException exn) {
// noop
}

@Override
public void serviceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events,
int oldHostSetSize, int newHostSetSize) {
public void onServiceDiscoveryEvent(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events,
int oldHostSetSize, int newHostSetSize) {
// noop
}

Expand All @@ -58,37 +58,37 @@ private NoopHostObserver() {
}

@Override
public void hostMarkedExpired(ResolvedAddress resolvedAddress, int connectionCount) {
public void onHostMarkedExpired(ResolvedAddress resolvedAddress, int connectionCount) {
// noop
}

@Override
public void expiredHostRemoved(ResolvedAddress resolvedAddress) {
public void onExpiredHostRemoved(ResolvedAddress resolvedAddress) {
// noop
}

@Override
public void expiredHostRevived(ResolvedAddress resolvedAddress, int connectionCount) {
public void onExpiredHostRevived(ResolvedAddress resolvedAddress, int connectionCount) {
// noop
}

@Override
public void activeHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) {
public void onActiveHostRemoved(ResolvedAddress resolvedAddress, int connectionCount) {
// noop
}

@Override
public void hostCreated(ResolvedAddress resolvedAddress) {
public void onHostCreated(ResolvedAddress resolvedAddress) {
// noop
}

@Override
public void hostMarkedUnhealthy(ResolvedAddress address, Throwable cause) {
public void onHostMarkedUnhealthy(ResolvedAddress address, Throwable cause) {
// noop
}

@Override
public void hostRevived(ResolvedAddress address) {
public void onHostRevived(ResolvedAddress address) {
// noop
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,19 @@ private void buildHost() {
@Test
void hostCreatedEvents() {
buildHost();
verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS);
// make another one, just for good measure.
new DefaultHost<>("lbDescription", "address2", connectionFactory, Integer.MAX_VALUE,
healthCheckConfig, mockHostObserver);
verify(mockHostObserver, times(1)).hostCreated("address2");
verify(mockHostObserver, times(1)).onHostCreated("address2");
}

@Test
void activeHostClosed() {
buildHost();
verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS);
host.markClosed();
verify(mockHostObserver, times(1))
.activeHostRemoved(DEFAULT_ADDRESS,
0);
verify(mockHostObserver, times(1)).onActiveHostRemoved(DEFAULT_ADDRESS, 0);
// TODO: this is backwards and should assert true: our `markClosed` method doesn't trigger the host to close
// down. See the related comment in `DefaultHost.markClosed`.
assertThat(host.onClose().toFuture().isDone(), is(false));
Expand All @@ -100,40 +98,38 @@ void activeHostClosed() {
@Test
void activeHostExpires() {
buildHost();
verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS);
host.markExpired();
verify(mockHostObserver, times(1))
.hostMarkedExpired(DEFAULT_ADDRESS, 0);
verify(mockHostObserver, times(1)).onHostMarkedExpired(DEFAULT_ADDRESS, 0);
assertThat(host.onClose().toFuture().isDone(), is(true));
}

@Test
void expiredHostClosesAfterLastConnectionClosed() throws Exception {
buildHost();
verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS);
TestLoadBalancedConnection cxn = host.newConnection(any(), false, null).toFuture().get();
host.markExpired();
verify(mockHostObserver, times(1))
.hostMarkedExpired(DEFAULT_ADDRESS, 1);
verify(mockHostObserver, times(1)).onHostMarkedExpired(DEFAULT_ADDRESS, 1);
assertThat(host.onClose().toFuture().isDone(), is(false));
cxn.closeAsync().subscribe();
assertThat(host.onClose().toFuture().isDone(), is(true));
verify(mockHostObserver).expiredHostRemoved(DEFAULT_ADDRESS);
verify(mockHostObserver).onExpiredHostRemoved(DEFAULT_ADDRESS);
// shouldn't able to revive it.
assertThat(host.markActiveIfNotClosed(), is(false));
}

@Test
void expiredHostRevives() throws Exception {
buildHost();
verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS);
host.newConnection(any(), false, null).toFuture().get();
host.markExpired();
verify(mockHostObserver, times(1))
.hostMarkedExpired(DEFAULT_ADDRESS, 1);
.onHostMarkedExpired(DEFAULT_ADDRESS, 1);
assertThat(host.onClose().toFuture().isDone(), is(false));
assertThat(host.markActiveIfNotClosed(), is(true));
verify(mockHostObserver).expiredHostRevived(DEFAULT_ADDRESS, 1);
verify(mockHostObserver).onExpiredHostRevived(DEFAULT_ADDRESS, 1);
}

@Test
Expand All @@ -150,20 +146,18 @@ void l4ConsecutiveFailuresAreDetected() throws Exception {
Duration.ofMillis(1),
Duration.ZERO);
buildHost();
verify(mockHostObserver, times(1)).hostCreated(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostCreated(DEFAULT_ADDRESS);
for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; i++) {
assertThrows(ExecutionException.class,
() -> host.newConnection(any(), false, null).toFuture().get());
}
verify(mockHostObserver, times(1))
.hostMarkedUnhealthy(DEFAULT_ADDRESS, UNHEALTHY_HOST_EXCEPTION);
verify(mockHostObserver, times(1)).onHostMarkedUnhealthy(DEFAULT_ADDRESS, UNHEALTHY_HOST_EXCEPTION);

// now revive and we should see the event and be able to get the connection.
unhealthyHostConnectionFactory.advanceTime(testExecutor);
assertThat(host.newConnection(any(), false, null).toFuture().get(),
is(testLoadBalancedConnection));
verify(mockHostObserver, times(1))
.hostRevived(DEFAULT_ADDRESS);
verify(mockHostObserver, times(1)).onHostRevived(DEFAULT_ADDRESS);
}

static <T> Predicate<T> any() {
Expand Down
Loading

0 comments on commit c6a61d4

Please sign in to comment.