Skip to content
6 changes: 4 additions & 2 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,17 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
result.maxConcurrentRequests(),
result.upstreamTlsContext(),
result.filterMetadata(),
result.outlierDetection());
result.outlierDetection(),
result.backendMetricPropagation());
} else {
instance = DiscoveryMechanism.forLogicalDns(
clusterName,
result.dnsHostName(),
result.lrsServerInfo(),
result.maxConcurrentRequests(),
result.upstreamTlsContext(),
result.filterMetadata());
result.filterMetadata(),
result.backendMetricPropagation());
}
gracefulConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME),
Expand Down
24 changes: 21 additions & 3 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.LoadStatsManager2.isEnabledOrcaLrsPropagation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -46,6 +47,7 @@
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.client.BackendMetricPropagation;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
Expand Down Expand Up @@ -149,6 +151,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
childLbHelper.updateFilterMetadata(config.filterMetadata);
childLbHelper.updateBackendMetricPropagation(config.backendMetricPropagation);

childSwitchLb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
Expand Down Expand Up @@ -209,6 +212,8 @@ private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
private Map<String, Struct> filterMetadata = ImmutableMap.of();
@Nullable
private final ServerInfo lrsServerInfo;
@Nullable
private BackendMetricPropagation backendMetricPropagation;

private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
this.inFlights = checkNotNull(inFlights, "inFlights");
Expand Down Expand Up @@ -321,7 +326,7 @@ private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAt
(lrsServerInfo == null)
? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);
edsServiceName, locality, backendMetricPropagation);

return new ClusterLocality(localityStats, localityName);
}
Expand Down Expand Up @@ -371,6 +376,11 @@ private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
}

private void updateBackendMetricPropagation(
@Nullable BackendMetricPropagation backendMetricPropagation) {
this.backendMetricPropagation = backendMetricPropagation;
}

private class RequestLimitingSubchannelPicker extends SubchannelPicker {
private final SubchannelPicker delegate;
private final List<DropOverload> dropPolicies;
Expand Down Expand Up @@ -506,11 +516,19 @@ private OrcaPerRpcListener(ClusterLocalityStats stats) {
}

/**
* Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
* included in the snapshot for the LRS report sent to the LRS server.
* Copies ORCA metrics from {@link MetricReport} to {@link ClusterLocalityStats}
* such that they are included in the snapshot for the LRS report sent to the LRS server.
* This includes both top-level metrics (CPU, memory, application utilization) and named
* metrics, filtered according to the backend metric propagation configuration.
*/
@Override
public void onLoadReport(MetricReport report) {
if (isEnabledOrcaLrsPropagation) {
stats.recordTopLevelMetrics(
report.getCpuUtilization(),
report.getMemoryUtilization(),
report.getApplicationUtilization());
}
stats.recordBackendLoadMetricStats(report.getNamedMetrics());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.Status;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.BackendMetricPropagation;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -98,11 +99,14 @@ static final class ClusterImplConfig {
// Provides the direct child policy and its config.
final Object childConfig;
final Map<String, Struct> filterMetadata;
@Nullable
final BackendMetricPropagation backendMetricPropagation;

ClusterImplConfig(String cluster, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
List<DropOverload> dropCategories, Object childConfig,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
@Nullable BackendMetricPropagation backendMetricPropagation) {
this.cluster = checkNotNull(cluster, "cluster");
this.edsServiceName = edsServiceName;
this.lrsServerInfo = lrsServerInfo;
Expand All @@ -112,6 +116,7 @@ static final class ClusterImplConfig {
this.dropCategories = Collections.unmodifiableList(
new ArrayList<>(checkNotNull(dropCategories, "dropCategories")));
this.childConfig = checkNotNull(childConfig, "childConfig");
this.backendMetricPropagation = backendMetricPropagation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
// import io.grpc.xds.client.BackendMetricPropagation;]
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
Expand Down Expand Up @@ -336,7 +337,7 @@ private static Map<String, PriorityChildConfig> generatePriorityChildConfigs(
new ClusterImplConfig(
discovery.cluster, discovery.edsServiceName, discovery.lrsServerInfo,
discovery.maxConcurrentRequests, dropOverloads, endpointLbConfig,
discovery.tlsContext, discovery.filterMetadata);
discovery.tlsContext, discovery.filterMetadata, discovery.backendMetricPropagation);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
Object priorityChildPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.Status;
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.BackendMetricPropagation;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -152,6 +153,8 @@ static final class DiscoveryMechanism {
@Nullable
final OutlierDetection outlierDetection;
final Map<String, Struct> filterMetadata;
@Nullable
final BackendMetricPropagation backendMetricPropagation;

enum Type {
EDS,
Expand All @@ -161,7 +164,8 @@ enum Type {
private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName,
@Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) {
Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection,
@Nullable BackendMetricPropagation backendMetricPropagation) {
this.cluster = checkNotNull(cluster, "cluster");
this.type = checkNotNull(type, "type");
this.edsServiceName = edsServiceName;
Expand All @@ -171,27 +175,33 @@ private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServic
this.tlsContext = tlsContext;
this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata"));
this.outlierDetection = outlierDetection;
this.backendMetricPropagation = backendMetricPropagation;
}

static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
OutlierDetection outlierDetection) {
return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo,
maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection);
OutlierDetection outlierDetection,
@Nullable BackendMetricPropagation backendMetricPropagation) {
return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName,
null, lrsServerInfo, maxConcurrentRequests, tlsContext,
filterMetadata, outlierDetection, backendMetricPropagation);
}

static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) {
@Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata,
@Nullable BackendMetricPropagation backendMetricPropagation) {
return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName,
lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null,
backendMetricPropagation);
}

@Override
public int hashCode() {
return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext,
edsServiceName, dnsHostName, filterMetadata, outlierDetection);
edsServiceName, dnsHostName, filterMetadata,
outlierDetection, backendMetricPropagation);
}

@Override
Expand Down
33 changes: 26 additions & 7 deletions xds/src/main/java/io/grpc/xds/XdsClusterResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.Bootstrapper.ServerInfo;
import static io.grpc.xds.client.LoadStatsManager2.isEnabledOrcaLrsPropagation;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -47,6 +48,7 @@
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.client.BackendMetricPropagation;
import io.grpc.xds.client.XdsClient.ResourceUpdate;
import io.grpc.xds.client.XdsResourceType;
import io.grpc.xds.internal.security.CommonTlsContextUtil;
Expand Down Expand Up @@ -227,6 +229,12 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
UpstreamTlsContext upstreamTlsContext = null;
OutlierDetection outlierDetection = null;
boolean isHttp11ProxyAvailable = false;
BackendMetricPropagation backendMetricPropagation = null;

if (isEnabledOrcaLrsPropagation) {
backendMetricPropagation = BackendMetricPropagation.fromMetricSpecs(
cluster.getLrsReportEndpointMetricsList());
}
if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) {
return StructOrError.fromError(
Expand Down Expand Up @@ -326,7 +334,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(

return StructOrError.fromStruct(CdsUpdate.forEds(
clusterName, edsServiceName, lrsServerInfo, maxConcurrentRequests, upstreamTlsContext,
outlierDetection, isHttp11ProxyAvailable));
outlierDetection, isHttp11ProxyAvailable, backendMetricPropagation));
} else if (type.equals(Cluster.DiscoveryType.LOGICAL_DNS)) {
if (!cluster.hasLoadAssignment()) {
return StructOrError.fromError(
Expand Down Expand Up @@ -362,7 +370,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
Locale.US, "%s:%d", socketAddress.getAddress(), socketAddress.getPortValue());
return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
clusterName, dnsHostName, lrsServerInfo, maxConcurrentRequests,
upstreamTlsContext, isHttp11ProxyAvailable));
upstreamTlsContext, isHttp11ProxyAvailable, backendMetricPropagation));
}
return StructOrError.fromError(
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
Expand Down Expand Up @@ -614,6 +622,9 @@ abstract static class CdsUpdate implements ResourceUpdate {

abstract ImmutableMap<String, Object> parsedMetadata();

@Nullable
abstract BackendMetricPropagation backendMetricPropagation();

private static Builder newBuilder(String clusterName) {
return new AutoValue_XdsClusterResource_CdsUpdate.Builder()
.clusterName(clusterName)
Expand All @@ -622,7 +633,8 @@ private static Builder newBuilder(String clusterName) {
.choiceCount(0)
.filterMetadata(ImmutableMap.of())
.parsedMetadata(ImmutableMap.of())
.isHttp11ProxyAvailable(false);
.isHttp11ProxyAvailable(false)
.backendMetricPropagation(null);
}

static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
Expand All @@ -636,29 +648,33 @@ static Builder forEds(String clusterName, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext,
@Nullable OutlierDetection outlierDetection,
boolean isHttp11ProxyAvailable) {
boolean isHttp11ProxyAvailable,
BackendMetricPropagation backendMetricPropagation) {
return newBuilder(clusterName)
.clusterType(ClusterType.EDS)
.edsServiceName(edsServiceName)
.lrsServerInfo(lrsServerInfo)
.maxConcurrentRequests(maxConcurrentRequests)
.upstreamTlsContext(upstreamTlsContext)
.outlierDetection(outlierDetection)
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
.isHttp11ProxyAvailable(isHttp11ProxyAvailable)
.backendMetricPropagation(backendMetricPropagation);
}

static Builder forLogicalDns(String clusterName, String dnsHostName,
@Nullable ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext,
boolean isHttp11ProxyAvailable) {
boolean isHttp11ProxyAvailable,
BackendMetricPropagation backendMetricPropagation) {
return newBuilder(clusterName)
.clusterType(ClusterType.LOGICAL_DNS)
.dnsHostName(dnsHostName)
.lrsServerInfo(lrsServerInfo)
.maxConcurrentRequests(maxConcurrentRequests)
.upstreamTlsContext(upstreamTlsContext)
.isHttp11ProxyAvailable(isHttp11ProxyAvailable);
.isHttp11ProxyAvailable(isHttp11ProxyAvailable)
.backendMetricPropagation(backendMetricPropagation);
}

enum ClusterType {
Expand Down Expand Up @@ -749,6 +765,9 @@ Builder leastRequestLbPolicy(Integer choiceCount) {

protected abstract Builder parsedMetadata(ImmutableMap<String, Object> parsedMetadata);

protected abstract Builder backendMetricPropagation(
BackendMetricPropagation backendMetricPropagation);

abstract CdsUpdate build();
}
}
Expand Down
Loading
Loading