Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer-experimental: Rename HealthChecker to OutlierDetector #2860

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.servicetalk.http.netty.HttpClients;
import io.servicetalk.loadbalancer.LoadBalancers;
import io.servicetalk.loadbalancer.OutlierDetectorConfig;
import io.servicetalk.loadbalancer.XdsHealthCheckerFactory;
import io.servicetalk.loadbalancer.XdsOutlierDetectorFactory;
import io.servicetalk.transport.api.HostAndPort;

import java.net.InetSocketAddress;
Expand All @@ -49,7 +49,7 @@ private static LoadBalancerFactory<InetSocketAddress, FilterableStreamingHttpLoa
String id) {
return LoadBalancers.<InetSocketAddress, FilterableStreamingHttpLoadBalancedConnection>
builder(id)
.healthCheckerFactory(new XdsHealthCheckerFactory<>(
.outlierDetectorFactory(new XdsOutlierDetectorFactory<>(
new OutlierDetectorConfig.Builder().build()
)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ interface ConnectTracker {
long beforeConnectStart();

/**
* Callback to notify the parent {@link HealthChecker} that an attempt to connect to this host has succeeded.
* Callback to notify the parent {@link OutlierDetector} that an attempt to connect to this host has succeeded.
* @param beforeConnectStart the time that the connection attempt was initiated.
*/
void onConnectSuccess(long beforeConnectStart);

/**
* Callback to notify the parent {@link HealthChecker} that an attempt to connect to this host has failed.
* Callback to notify the parent {@link OutlierDetector} that an attempt to connect to this host has failed.
* @param beforeConnectStart the time that the connection attempt was initiated.
*/
void onConnectError(long beforeConnectStart);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private enum State {
@Nullable
private final HealthCheckConfig healthCheckConfig;
@Nullable
private final HealthIndicator healthIndicator;
private final HealthIndicator<Addr, C> healthIndicator;
private final LoadBalancerObserver.HostObserver hostObserver;
private final ConnectionFactory<Addr, ? extends C> connectionFactory;
private final int linearSearchSpace;
Expand All @@ -111,7 +111,8 @@ private enum State {
DefaultHost(final String lbDescription, final Addr address,
final ConnectionFactory<Addr, ? extends C> connectionFactory,
final int linearSearchSpace, final HostObserver hostObserver,
final @Nullable HealthCheckConfig healthCheckConfig, final @Nullable HealthIndicator healthIndicator) {
final @Nullable HealthCheckConfig healthCheckConfig,
final @Nullable HealthIndicator<Addr, C> healthIndicator) {
this.lbDescription = requireNonNull(lbDescription, "lbDescription");
this.address = requireNonNull(address, "address");
this.linearSearchSpace = linearSearchSpace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
@Nullable
private final HealthCheckConfig healthCheckConfig;
@Nullable
private final HealthChecker<ResolvedAddress> healthChecker;
private final OutlierDetector<ResolvedAddress, C> healthChecker;
private final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
private final ListenableAsyncCloseable asyncCloseable;

Expand All @@ -133,7 +133,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final int linearSearchSpace,
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver,
@Nullable final HealthCheckConfig healthCheckConfig,
@Nullable final Function<String, HealthChecker<ResolvedAddress>> healthCheckerFactory) {
@Nullable final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
Expand All @@ -149,7 +149,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
this.asyncCloseable = toAsyncCloseable(this::doClose);
// Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal.
eventStream.ignoreElements().subscribe();
this.healthChecker = healthCheckerFactory == null ? null : healthCheckerFactory.apply(lbDescription);
this.healthChecker = outlierDetectorFactory == null ? null : outlierDetectorFactory.apply(lbDescription);
// We subscribe to events as the very last step so that if we subscribe to an eager service discoverer
// we already have all the fields initialized.
subscribeToEvents(false);
Expand Down Expand Up @@ -391,6 +391,9 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
null : healthChecker.newHealthIndicator(addr, hostObserver);
final Host<ResolvedAddress, C> host = new DefaultHost<>(lbDescription, addr, connectionFactory,
linearSearchSpace, hostObserver, healthCheckConfig, indicator);
if (indicator != null) {
indicator.setHost(host);
}
host.onClose().afterFinally(() ->
sequentialExecutor.execute(() -> {
final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedCo
@Nullable
private LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
@Nullable
private HealthCheckerFactory<ResolvedAddress> healthCheckerFactory;
private OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
private Duration healthCheckJitter = DEFAULT_HEALTH_CHECK_JITTER;
private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
Expand Down Expand Up @@ -85,9 +85,9 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(
HealthCheckerFactory<ResolvedAddress> healthCheckerFactory) {
this.healthCheckerFactory = healthCheckerFactory;
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory) {
this.outlierDetectorFactory = outlierDetectorFactory;
return this;
}

Expand Down Expand Up @@ -136,13 +136,13 @@ public LoadBalancerFactory<ResolvedAddress, C> build() {
}
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver = this.loadBalancerObserver != null ?
this.loadBalancerObserver : NoopLoadBalancerObserver.instance();
Function<String, HealthChecker<ResolvedAddress>> healthCheckerSupplier;
if (healthCheckerFactory == null) {
Function<String, OutlierDetector<ResolvedAddress, C>> healthCheckerSupplier;
if (outlierDetectorFactory == null) {
healthCheckerSupplier = null;
} else {
final Executor executor = getExecutor();
healthCheckerSupplier = (String lbDescrption) ->
healthCheckerFactory.newHealthChecker(executor, lbDescrption);
outlierDetectorFactory.newHealthChecker(executor, lbDescrption);
}

return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, linearSearchSpace, healthCheckConfig,
Expand All @@ -157,29 +157,38 @@ private static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends
private final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver;
private final int linearSearchSpace;
@Nullable
private final Function<String, HealthChecker<ResolvedAddress>> healthCheckerFactory;
private final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory;
@Nullable
private final HealthCheckConfig healthCheckConfig;

DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final int linearSearchSpace, final HealthCheckConfig healthCheckConfig,
final LoadBalancerObserver<ResolvedAddress> loadBalancerObserver,
final Function<String, HealthChecker<ResolvedAddress>> healthCheckerFactory) {
final Function<String, OutlierDetector<ResolvedAddress, C>> outlierDetectorFactory) {
this.id = requireNonNull(id, "id");
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
this.loadBalancerObserver = requireNonNull(loadBalancerObserver, "loadBalancerObserver");
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
this.healthCheckerFactory = healthCheckerFactory;
this.outlierDetectorFactory = outlierDetectorFactory;
}

@Override
public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<ResolvedAddress, T>(id, targetResource, eventPublisher,
// TODO: this is a deprecated API that should be removed in 0.43
throw new AssertionError("Generic factory is not implemented.");
}

@Override
public LoadBalancer<C> newLoadBalancer(
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, C> connectionFactory,
String targetResource) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource), connectionFactory,
linearSearchSpace, loadBalancerObserver, healthCheckConfig, healthCheckerFactory);
linearSearchSpace, loadBalancerObserver, healthCheckConfig, outlierDetectorFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(
HealthCheckerFactory<ResolvedAddress> healthCheckerFactory) {
delegate = delegate.healthCheckerFactory(healthCheckerFactory);
public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory) {
delegate = delegate.outlierDetectorFactory(outlierDetectorFactory);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,32 @@
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;

import static io.servicetalk.loadbalancer.OutlierDetectorConfig.enforcing;

final class FailurePercentageXdsOutlierDetector implements XdsOutlierDetector {
final class FailurePercentageXdsOutlierDetectorAlgorithm<ResolvedAddress, C extends LoadBalancedConnection>
implements XdsOutlierDetectorAlgorithm<ResolvedAddress, C> {

private static final Logger LOGGER = LoggerFactory.getLogger(FailurePercentageXdsOutlierDetector.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FailurePercentageXdsOutlierDetectorAlgorithm.class);

// We use a sentinel value to mark values as 'skipped' so we don't need to create a dynamically sized
// data structure for doubles which would require boxing.
private static final long NOT_EVALUATED = Long.MAX_VALUE;

public static final XdsOutlierDetector INSTANCE = new FailurePercentageXdsOutlierDetector();

private FailurePercentageXdsOutlierDetector() {
}

@Override
public void detectOutliers(OutlierDetectorConfig config, Collection<XdsHealthIndicator> indicators) {
public void detectOutliers(final OutlierDetectorConfig config,
final Collection<XdsHealthIndicator<ResolvedAddress, C>> indicators) {
final long[] failurePercentages = new long[indicators.size()];
int i = 0;
int enoughVolumeHosts = 0;
int alreadyEjectedHosts = 0;
for (XdsHealthIndicator indicator : indicators) {
for (XdsHealthIndicator<?, ?> indicator : indicators) {
if (!indicator.isHealthy()) {
failurePercentages[i] = NOT_EVALUATED;
alreadyEjectedHosts++;
Expand Down Expand Up @@ -71,7 +70,7 @@ public void detectOutliers(OutlierDetectorConfig config, Collection<XdsHealthInd
final double failurePercentageThreshold = config.failurePercentageThreshold();
int ejectedCount = 0;
i = 0;
for (XdsHealthIndicator indicator : indicators) {
for (XdsHealthIndicator<?, ?> indicator : indicators) {
long failurePercentage = failurePercentages[i++];
if (indicator.updateOutlierStatus(config, failurePercentage == NOT_EVALUATED ||
failurePercentage >= failurePercentageThreshold &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,31 @@
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.ScoreSupplier;
import io.servicetalk.concurrent.Cancellable;

/**
* An abstraction used by a {@link Host} to interact with the {@link HealthChecker} currently monitoring
* An abstraction used by a {@link Host} to interact with the {@link OutlierDetector} currently monitoring
* the host.
* <p>
* This abstraction serves as a sort of two-way channel between a host and the health check system: the
* health check system can give the host information about it's perceived health and the host can give the
* health check system information about request results.
*/
interface HealthIndicator extends RequestTracker, ConnectTracker, ScoreSupplier, Cancellable {
interface HealthIndicator<ResolvedAddress, C extends LoadBalancedConnection> extends
RequestTracker, ConnectTracker, ScoreSupplier, Cancellable {

/**
* Whether the host is considered healthy by the HealthIndicator.
*
* @return true if the HealthIndicator considers the host healthy, false otherwise.
*/
boolean isHealthy();

/**
* Set the {@link Host} associated with this health indicator.
* @param host which will be associated with this health indicator.
*/
void setHost(Host<ResolvedAddress, C> host);
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
@Nullable LoadBalancerObserver<ResolvedAddress> loadBalancerObserver);

/**
* Set the {@link HealthCheckerFactory} to use with this load balancer.
* @param healthCheckerFactory the {@link HealthCheckerFactory} to use, or {@code null} to not use a
* {@link HealthChecker}.
* Set the {@link OutlierDetectorFactory} to use with this load balancer.
* @param outlierDetectorFactory the {@link OutlierDetectorFactory} to use, or {@code null} to not use a
* {@link OutlierDetector}.
* @return {code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> healthCheckerFactory(
HealthCheckerFactory<ResolvedAddress> healthCheckerFactory);
LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorFactory(
OutlierDetectorFactory<ResolvedAddress, C> outlierDetectorFactory);

/**
* This {@link LoadBalancer} may monitor hosts to which connection establishment has failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.loadbalancer.LoadBalancerObserver.HostObserver;

/**
* The representation of a health checking system for use with load balancing.
*/
interface HealthChecker<ResolvedAddress> extends Cancellable {
interface OutlierDetector<ResolvedAddress, C extends LoadBalancedConnection> extends Cancellable {

/**
* Construct a new {@link HealthIndicator}.
Expand All @@ -30,5 +32,5 @@ interface HealthChecker<ResolvedAddress> extends Cancellable {
* @param address the resolved address of the destination.
* @return new {@link HealthIndicator}.
*/
HealthIndicator newHealthIndicator(ResolvedAddress address, LoadBalancerObserver.HostObserver hostObserver);
HealthIndicator<ResolvedAddress, C> newHealthIndicator(ResolvedAddress address, HostObserver hostObserver);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Executor;

/**
* A factory of {@link HealthChecker} instances. The factory will be used by load balancer
* A factory of {@link OutlierDetector} instances. The factory will be used by load balancer
* builders and may make more than one health checker per-load balancer.
* @param <ResolvedAddress> the type of the resolved address.
* @param <C> the type of the load balanced connection.
*/
public interface HealthCheckerFactory<ResolvedAddress> {
public interface OutlierDetectorFactory<ResolvedAddress, C extends LoadBalancedConnection> {
/**
* Create a new {@link HealthChecker}.
* Create a new {@link OutlierDetector}.
* @param executor the {@link Executor} to use for scheduling tasks and obtaining the current time.
* @param lbDescription a description of the load balancer for logging purposes.
* @return a new {@link HealthChecker}.
* @return a new {@link OutlierDetector}.
*/
HealthChecker<ResolvedAddress> newHealthChecker(Executor executor, String lbDescription);
OutlierDetector<ResolvedAddress, C> newHealthChecker(Executor executor, String lbDescription);
}
Loading
Loading