Skip to content

Commit

Permalink
loadbalancer: Simplify ConnectionFactory usage in DefaultHost (#2796)
Browse files Browse the repository at this point in the history
Motivation:

We currently use the ConnectionFactory in two places in DefaultHost:
- creating new connections
- health checking
Each of them has to go through the trouble of trying to add the
newly created connection to the connection pool and making
sure the state remains coherent.

Modifications:

DefaultHosts health checking feature now uses the `Host.newConnection`
method instead of using the factory directly. This lets it
get the connection management benefit already in
`DefaultHost.newConnection`. It also makes the path to extracting
the health check a bit easier to follow.

Result:

What is the result of this change?
  • Loading branch information
bryce-anderson authored Jan 11, 2024
1 parent 8a8e3ab commit aac806c
Showing 1 changed file with 6 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public Single<C> newConnection(
}
return newCnx.closeAsync().<C>concat(
failed(Exceptions.StacklessConnectionRejectedException.newInstance(
"Failed to add newly created connection " + newCnx + " for " + toString(),
"Failed to add newly created connection " + newCnx + " for " + this,
RoundRobinLoadBalancer.class, "selectConnection0(...)")))
.shareContextOnSubscribe();
});
Expand Down Expand Up @@ -489,7 +489,7 @@ public void schedule(final Throwable originalCause) {
.apply(0, originalCause)
// Remove any state from async context
.beforeOnSubscribe(__ -> AsyncContext.clear())
.concat(connectionFactory.newConnection(address, null, null)
.concat(newConnection(cxn -> true, false, null)
// There is no risk for StackOverflowError because result of each connection
// attempt will be invoked on IoExecutor as a new task.
.retryWhen(retryWithConstantBackoffDeltaJitter(
Expand All @@ -502,19 +502,10 @@ public void schedule(final Throwable originalCause) {
healthCheckConfig.jitter,
healthCheckConfig.executor)))
.flatMapCompletable(newCnx -> {
if (addConnection(newCnx, this)) {
LOGGER.info("{}: health check passed for {}, marked this " +
"host as ACTIVE for the selection algorithm.",
lbDescription, DefaultHost.this);
return completed();
} else {
// This happens only if the host is closed, no need to mark as healthy.
assert connState.state == State.CLOSED;
LOGGER.debug("{}: health check passed for {}, but the " +
"host rejected a new connection {}. Closing it now.",
lbDescription, DefaultHost.this, newCnx);
return newCnx.closeAsync();
}
LOGGER.info("{}: health check passed for {}, marked this " +
"host as ACTIVE for the selection algorithm.",
lbDescription, DefaultHost.this);
return completed();
})
// Use onErrorComplete instead of whenOnError to avoid double logging of an error inside
// subscribe(): SimpleCompletableSubscriber.
Expand Down

0 comments on commit aac806c

Please sign in to comment.