Implement zone aware load balancing for XdsEndpointGroup (#5808)
Motivation:

This changeset attempts to implement Envoy's [zone aware
routing](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/zone_aware).
A small sample of how this can be configured is available
[here](https://www.envoyproxy.io/docs/envoy/latest/faq/configuration/zone_aware_routing).

The basic algorithm in layman's terms is the following:
1) A client supplies its locality and the name of a local cluster. The
cluster with that name should also be added to the configuration.
2) The percentage of hosts in each locality is computed for both the
upstream and the local cluster.
3) The percentages are compared and a new "weight" is computed to
determine which locality to route to when selecting an endpoint.
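
Steps 2 and 3 can be sketched roughly as follows. This is a simplified illustration loosely based on the Envoy code linked below, not the `LocalityRoutingStateFactory` added in this PR. The class and method names are hypothetical, index 0 is assumed to be the local locality, and percentages are assumed to be scaled so that 10,000 means 100%.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of this PR. */
final class LocalityPercentages {
    // Assumption: 10_000 represents 100%, so integer math keeps two decimal places.
    static final long SCALE = 10_000;

    /** hostCounts[i] is the number of hosts in locality i; index 0 is the local locality. */
    static long[] percentages(int[] hostCounts) {
        final long total = Arrays.stream(hostCounts).sum();
        final long[] result = new long[hostCounts.length];
        for (int i = 0; i < hostCounts.length; i++) {
            result[i] = total > 0 ? SCALE * hostCounts[i] / total : 0;
        }
        return result;
    }

    /** True if the upstream local locality can absorb all traffic from the local locality. */
    static boolean canRouteAllToLocal(long[] local, long[] upstream) {
        return upstream[0] >= local[0];
    }

    /** Share of requests (scaled by SCALE) kept in the local locality. Requires local[0] > 0. */
    static long localPercentToRoute(long[] local, long[] upstream) {
        return upstream[0] * SCALE / local[0];
    }

    /** Cumulative spare capacity per locality; the local locality contributes none. */
    static long[] residualCapacity(long[] local, long[] upstream) {
        final long[] residual = new long[local.length];
        for (int i = 1; i < local.length; i++) {
            final long spare = Math.max(0, upstream[i] - local[i]);
            residual[i] = residual[i - 1] + spare;
        }
        return residual;
    }
}
```

For example, with local host counts `{2, 2}` and upstream host counts `{1, 3}`, the local locality holds 50% of the clients but only 25% of the upstream hosts, so only half of the requests stay local and the rest spill over to the other locality.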

The code has been mostly adapted from Envoy's implementation:

- Computing the locality routing state
  - https://github.com/envoyproxy/envoy/blob/ae1811aaf8736e2a4ca9fc0d045f6696f58e45e2/source/extensions/load_balancing_policies/common/load_balancer_impl.cc#L450-L542
- Routing the locality for each request
  - https://github.com/envoyproxy/envoy/blob/ae1811aaf8736e2a4ca9fc0d045f6696f58e45e2/source/extensions/load_balancing_policies/common/load_balancer_impl.cc#L772-L816
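
The per-request routing step might look roughly like the sketch below. It is a simplified reading of the second Envoy link above, not the code in `DefaultLoadBalancer`. `LocalitySelector` and its parameters are hypothetical names; the inputs are assumed to be a precomputed "local percent to route" (scaled so 10,000 means 100%) and a cumulative residual-capacity array whose index 0 is the local locality. The random source is passed in so the behavior can be made deterministic.

```java
import java.util.function.LongSupplier;

/** Hypothetical per-request locality chooser; for illustration only. */
final class LocalitySelector {
    private static final long SCALE = 10_000; // assumption: 10_000 represents 100%

    /**
     * Returns the index of the locality to route to.
     * residualCapacity is cumulative and residualCapacity[0] must be 0 (the local locality).
     */
    static int chooseLocality(long localPercentToRoute, long[] residualCapacity,
                              LongSupplier random) {
        // Keep the request in the local locality with the precomputed probability.
        if (Math.floorMod(random.getAsLong(), SCALE) < localPercentToRoute) {
            return 0;
        }
        final long totalResidual = residualCapacity[residualCapacity.length - 1];
        if (totalResidual == 0) {
            // No locality has spare capacity, so fall back to a uniform choice.
            return (int) Math.floorMod(random.getAsLong(), (long) residualCapacity.length);
        }
        // Pick a point in [0, totalResidual) and find the first cumulative bucket above it.
        final long threshold = Math.floorMod(random.getAsLong(), totalResidual);
        int i = 0;
        while (threshold >= residualCapacity[i]) {
            i++;
        }
        return i;
    }
}
```

Because the residual capacities are cumulative, a locality with no spare capacity repeats the previous value and can never be selected by the threshold scan.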

Modifications:

- (Breaking) Previously we had been accepting a `ListenerRoot` when
creating an `XdsEndpointGroup`. However, we need more information, such as
the local locality, from the bootstrap. To make this intention clear,
`XdsEndpointGroup` now accepts an `XdsBootstrap`. Additionally,
`XdsBootstrap#bootstrap` has been added to access the local cluster name
and locality.
- `ClusterManager` now maintains a `LocalCluster` data structure which
watches for changes in the local cluster, if one exists. When a
`ClusterEntry` is created, the existing `LocalCluster` is passed to it.
- The upstream `ClusterEntry` needs to listen to the local
`ClusterEntry` for updates in order to pre-compute the state.
`ClusterEntry` is now an `AbstractListenable` and notifies the current
`LoadBalancer`.
- Following this change, the `PrioritySet` is required to compute the
locality state. Added a `LoadBalancer#prioritySet` method to retrieve
the current `PrioritySet`.
- `XdsLoadBalancer` has been added as an extra abstraction layer to
avoid conflicts with #5779.
- `ClusterEntry` now has two update points: 1) when the endpoints are
updated, and 2) when the local cluster is updated. Both entry points now
call a single `tryRefresh` to refresh the load balancer.
- Added `LocalityRoutingStateFactory` and modified `DefaultLoadBalancer`
to support zone aware routing. The implementation should look very
similar to Envoy's source.
- Testing was difficult because randomness was used in multiple places.
To make the tests deterministic, I've added an `XdsRandom` interface.
Additionally, to guarantee that the local cluster was
fully loaded before proceeding with tests,
`ClusterEntry#initialLocalEntryStateFuture` has been added.
- In order to access `XdsEndpointGroup#clusterEntries`, the factory
methods for `XdsEndpointGroup` now return the concrete type.
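
The idea behind making randomness pluggable for tests can be illustrated with the sketch below. The actual `XdsRandom` interface is not shown in this excerpt, so `RandomSource` and `WeightedPicker` are hypothetical names and the method shape is an assumption; only `ThreadLocalRandom` is a real JDK class.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for a pluggable random source; not the PR's XdsRandom. */
interface RandomSource {
    /** Returns a value in [0, bound). */
    long nextLong(long bound);

    /** Production default backed by the JDK. */
    RandomSource DEFAULT = bound -> ThreadLocalRandom.current().nextLong(bound);

    /** Test helper that always yields the same value, reduced into [0, bound). */
    static RandomSource fixed(long value) {
        return bound -> Math.floorMod(value, bound);
    }
}

/** Hypothetical weighted picker whose only source of randomness is injected. */
final class WeightedPicker {
    private final RandomSource random;

    WeightedPicker(RandomSource random) {
        this.random = random;
    }

    /** Picks an index with probability proportional to its weight. */
    int pick(int[] weights) {
        long total = 0;
        for (int weight : weights) {
            total += weight;
        }
        if (total <= 0) {
            throw new IllegalArgumentException("weights must have a positive sum");
        }
        long point = random.nextLong(total);
        for (int i = 0; i < weights.length; i++) {
            point -= weights[i];
            if (point < 0) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable when the weights sum is positive");
    }
}
```

With a fixed source, a test can assert the exact index that is picked instead of checking a statistical distribution.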

Result:

- `XdsEndpointGroup` now supports zone-aware load balancing.
jrhee17 authored Aug 8, 2024
1 parent fde4260 commit 966a1bc
Showing 26 changed files with 1,401 additions and 214 deletions.
5 changes: 5 additions & 0 deletions xds/src/main/java/com/linecorp/armeria/xds/XdsBootstrap.java
@@ -79,4 +79,9 @@ static XdsBootstrap of(Bootstrap bootstrap, EventExecutor eventLoop) {
* Returns the event loop used to notify events.
*/
EventExecutor eventLoop();

/**
* Returns the original {@link Bootstrap} this {@link XdsBootstrap} is based on.
*/
Bootstrap bootstrap();
}
@@ -35,6 +35,7 @@
import io.netty.util.concurrent.EventExecutor;

final class XdsBootstrapImpl implements XdsBootstrap {
private final Bootstrap bootstrap;
private final EventExecutor eventLoop;

private final Map<ConfigSource, ConfigSourceClient> clientMap = new HashMap<>();
@@ -57,6 +58,7 @@ final class XdsBootstrapImpl implements XdsBootstrap {
@VisibleForTesting
XdsBootstrapImpl(Bootstrap bootstrap, EventExecutor eventLoop,
Consumer<GrpcClientBuilder> configClientCustomizer) {
this.bootstrap = bootstrap;
this.eventLoop = requireNonNull(eventLoop, "eventLoop");
this.configClientCustomizer = configClientCustomizer;
configSourceMapper = new ConfigSourceMapper(bootstrap);
@@ -139,6 +141,11 @@ public EventExecutor eventLoop() {
return eventLoop;
}

@Override
public Bootstrap bootstrap() {
return bootstrap;
}

ConfigSourceMapper configSourceMapper() {
return configSourceMapper;
}
@@ -20,6 +20,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,69 +31,117 @@
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.AbstractListenable;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.xds.ClusterSnapshot;
import com.linecorp.armeria.xds.EndpointSnapshot;
import com.linecorp.armeria.xds.client.endpoint.ClusterManager.LocalCluster;
import com.linecorp.armeria.xds.client.endpoint.LocalityRoutingStateFactory.LocalityRoutingState;

import io.netty.util.concurrent.EventExecutor;

final class ClusterEntry implements AsyncCloseable {
final class ClusterEntry extends AbstractListenable<XdsLoadBalancer> implements AsyncCloseable {

private static final Logger logger = LoggerFactory.getLogger(ClusterEntry.class);
private final Consumer<XdsLoadBalancer> localClusterEntryListener = this::updateLocalLoadBalancer;

@Nullable
private volatile XdsLoadBalancer loadBalancer;
@Nullable
private XdsLoadBalancer localLoadBalancer;
@Nullable
private EndpointsState endpointsState;
private final EndpointsPool endpointsPool;
@Nullable
private volatile LoadBalancer loadBalancer;
private final ClusterManager clusterManager;
private final LocalCluster localCluster;
private final EventExecutor eventExecutor;
private List<Endpoint> endpoints = ImmutableList.of();
private boolean closing;

ClusterEntry(ClusterSnapshot clusterSnapshot,
ClusterManager clusterManager, EventExecutor eventExecutor) {
this.clusterManager = clusterManager;
ClusterEntry(EventExecutor eventExecutor, @Nullable LocalCluster localCluster) {
this.eventExecutor = eventExecutor;
endpointsPool = new EndpointsPool(eventExecutor);
updateClusterSnapshot(clusterSnapshot);
this.localCluster = localCluster;
if (localCluster != null) {
localCluster.clusterEntry().addListener(localClusterEntryListener, true);
}
}

@Nullable
Endpoint selectNow(ClientRequestContext ctx) {
final LoadBalancer loadBalancer = this.loadBalancer;
final LoadBalancer loadBalancer = latestValue();
if (loadBalancer == null) {
return null;
}
return loadBalancer.selectNow(ctx);
}

void updateClusterSnapshot(ClusterSnapshot clusterSnapshot) {
final EndpointSnapshot endpointSnapshot = clusterSnapshot.endpointSnapshot();
assert endpointSnapshot != null;
endpointsPool.updateClusterSnapshot(clusterSnapshot, endpoints -> {
accept(clusterSnapshot, endpoints);
});
endpointsPool.updateClusterSnapshot(clusterSnapshot, this::updateEndpoints);
}

void updateEndpoints(EndpointsState endpointsState) {
assert eventExecutor.inEventLoop();
this.endpointsState = endpointsState;
tryRefresh();
}

void accept(ClusterSnapshot clusterSnapshot, List<Endpoint> endpoints) {
private void updateLocalLoadBalancer(XdsLoadBalancer localLoadBalancer) {
assert eventExecutor.inEventLoop();
this.endpoints = ImmutableList.copyOf(endpoints);
this.localLoadBalancer = localLoadBalancer;
tryRefresh();
}

void tryRefresh() {
if (closing) {
return;
}
final EndpointsState endpointsState = this.endpointsState;
if (endpointsState == null) {
return;
}

final ClusterSnapshot clusterSnapshot = endpointsState.clusterSnapshot;
final List<Endpoint> endpoints = endpointsState.endpoints;

final PrioritySet prioritySet = new PriorityStateManager(clusterSnapshot, endpoints).build();
if (logger.isTraceEnabled()) {
logger.trace("XdsEndpointGroup is using a new PrioritySet({})", prioritySet);
}
LoadBalancer loadBalancer = new DefaultLoadBalancer(prioritySet);

LocalityRoutingState localityRoutingState = null;
if (localLoadBalancer != null) {
assert localCluster != null;
localityRoutingState = localCluster.stateFactory().create(prioritySet,
localLoadBalancer.prioritySet());
logger.trace("Local routing is enabled with LocalityRoutingState({})", localityRoutingState);
}
XdsLoadBalancer loadBalancer = new DefaultLoadBalancer(prioritySet, localityRoutingState);
if (clusterSnapshot.xdsResource().resource().hasLbSubsetConfig()) {
loadBalancer = new SubsetLoadBalancer(prioritySet, loadBalancer);
}
this.loadBalancer = loadBalancer;
clusterManager.notifyListeners();
notifyListeners(loadBalancer);
}

@Override
@Nullable
protected XdsLoadBalancer latestValue() {
return loadBalancer;
}

List<Endpoint> allEndpoints() {
return endpoints;
final EndpointsState endpointsState = this.endpointsState;
if (endpointsState == null) {
return ImmutableList.of();
}
return endpointsState.endpoints;
}

@Override
public CompletableFuture<?> closeAsync() {
closing = true;
if (localCluster != null) {
localCluster.clusterEntry().removeListener(localClusterEntryListener);
}
return endpointsPool.closeAsync();
}

@@ -106,8 +155,26 @@ public String toString() {
return MoreObjects.toStringHelper(this)
.add("endpointsPool", endpointsPool)
.add("loadBalancer", loadBalancer)
.add("numEndpoints", endpoints.size())
.add("endpoints", truncate(endpoints, 10))
.add("endpointsState", endpointsState)
.toString();
}

static final class EndpointsState {
private final ClusterSnapshot clusterSnapshot;
private final List<Endpoint> endpoints;

EndpointsState(ClusterSnapshot clusterSnapshot, List<Endpoint> endpoints) {
this.clusterSnapshot = clusterSnapshot;
this.endpoints = ImmutableList.copyOf(endpoints);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("clusterSnapshot", clusterSnapshot)
.add("numEndpoints", endpoints.size())
.add("endpoints", truncate(endpoints, 10))
.toString();
}
}
}