diff --git a/api/src/main/java/io/grpc/NameResolver.java b/api/src/main/java/io/grpc/NameResolver.java index a1dea016377..b35601289a3 100644 --- a/api/src/main/java/io/grpc/NameResolver.java +++ b/api/src/main/java/io/grpc/NameResolver.java @@ -290,6 +290,7 @@ public static final class Args { @Nullable private final ChannelLogger channelLogger; @Nullable private final Executor executor; @Nullable private final String overrideAuthority; + @Nullable private final MetricRecorder metricRecorder; private Args( Integer defaultPort, @@ -299,7 +300,8 @@ private Args( @Nullable ScheduledExecutorService scheduledExecutorService, @Nullable ChannelLogger channelLogger, @Nullable Executor executor, - @Nullable String overrideAuthority) { + @Nullable String overrideAuthority, + @Nullable MetricRecorder metricRecorder) { this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set"); @@ -308,6 +310,7 @@ private Args( this.channelLogger = channelLogger; this.executor = executor; this.overrideAuthority = overrideAuthority; + this.metricRecorder = metricRecorder; } /** @@ -405,6 +408,14 @@ public String getOverrideAuthority() { return overrideAuthority; } + /** + * Returns the {@link MetricRecorder} that the channel uses to record metrics. + */ + @Nullable + public MetricRecorder getMetricRecorder() { + return metricRecorder; + } + @Override public String toString() { @@ -417,6 +428,7 @@ public String toString() { .add("channelLogger", channelLogger) .add("executor", executor) .add("overrideAuthority", overrideAuthority) + .add("metricRecorder", metricRecorder) .toString(); } @@ -435,6 +447,7 @@ public Builder toBuilder() { builder.setChannelLogger(channelLogger); builder.setOffloadExecutor(executor); builder.setOverrideAuthority(overrideAuthority); + builder.setMetricRecorder(metricRecorder); return builder; } @@ -461,6 +474,7 @@ public static final class Builder { private ChannelLogger channelLogger; private Executor executor; private String overrideAuthority; + private MetricRecorder metricRecorder; Builder() { } @@ -547,6 +561,14 @@ public Builder setOverrideAuthority(String authority) { return this; } + /** + * See {@link Args#getMetricRecorder()}. This is an optional field. + */ + public Builder setMetricRecorder(MetricRecorder metricRecorder) { + this.metricRecorder = metricRecorder; + return this; + } + /** * Builds an {@link Args}. * @@ -556,7 +578,8 @@ public Args build() { return new Args( defaultPort, proxyDetector, syncContext, serviceConfigParser, - scheduledExecutorService, channelLogger, executor, overrideAuthority); + scheduledExecutorService, channelLogger, executor, overrideAuthority, + metricRecorder); } } } diff --git a/api/src/test/java/io/grpc/NameResolverTest.java b/api/src/test/java/io/grpc/NameResolverTest.java index 1bc32ee7b1d..1a7c59f8df5 100644 --- a/api/src/test/java/io/grpc/NameResolverTest.java +++ b/api/src/test/java/io/grpc/NameResolverTest.java @@ -64,6 +64,7 @@ public class NameResolverTest { private final ChannelLogger channelLogger = mock(ChannelLogger.class); private final Executor executor = Executors.newSingleThreadExecutor(); private final String overrideAuthority = "grpc.io"; + private final MetricRecorder metricRecorder = new MetricRecorder() {}; @Mock NameResolver.Listener mockListener; @Test @@ -77,6 +78,7 @@ public void args() { assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor); assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority); + assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder); NameResolver.Args args2 = args.toBuilder().build(); assertThat(args2.getDefaultPort()).isEqualTo(defaultPort); @@ -87,6 +89,7 @@ public void args() { assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor); assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority); + assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder); assertThat(args2).isNotSameInstanceAs(args); assertThat(args2).isNotEqualTo(args); @@ -102,6 +105,7 @@ private NameResolver.Args createArgs() { .setChannelLogger(channelLogger) .setOffloadExecutor(executor) .setOverrideAuthority(overrideAuthority) + .setMetricRecorder(metricRecorder) .build(); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index b89a126517f..88fbf5e2c62 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -589,6 +589,8 @@ ClientStream newSubstream( builder.maxHedgedAttempts, loadBalancerFactory); this.authorityOverride = builder.authorityOverride; + this.metricRecorder = new MetricRecorderImpl(builder.metricSinks, + MetricInstrumentRegistry.getDefaultRegistry()); this.nameResolverArgs = NameResolver.Args.newBuilder() .setDefaultPort(builder.getDefaultPort()) @@ -599,6 +601,7 @@ ClientStream newSubstream( .setChannelLogger(channelLogger) .setOffloadExecutor(this.offloadExecutorHolder) .setOverrideAuthority(this.authorityOverride) + .setMetricRecorder(this.metricRecorder) .build(); this.nameResolver = getNameResolver( targetUri, authorityOverride, nameResolverProvider, nameResolverArgs); @@ -671,8 +674,6 @@ public CallTracer create() { } serviceConfigUpdated = true; } - this.metricRecorder = new MetricRecorderImpl(builder.metricSinks, - MetricInstrumentRegistry.getDefaultRegistry()); } @VisibleForTesting diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 16700096827..535086716bd 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -687,6 +687,30 @@ public void metricRecorder_recordsToMetricSink() { eq(optionalLabelValues)); } + @Test + public void metricRecorder_fromNameResolverArgs_recordsToMetricSink() { + MetricSink mockSink1 = mock(MetricSink.class); + MetricSink mockSink2 = mock(MetricSink.class); + channelBuilder.addMetricSink(mockSink1); + channelBuilder.addMetricSink(mockSink2); + createChannel(); + + LongCounterMetricInstrument counter = metricInstrumentRegistry.registerLongCounter( + "test_counter", "Time taken by metric recorder", "s", + ImmutableList.of("grpc.method"), Collections.emptyList(), false); + List requiredLabelValues = ImmutableList.of("testMethod"); + List optionalLabelValues = Collections.emptyList(); + + NameResolver.Args args = helper.getNameResolverArgs(); + assertThat(args.getMetricRecorder()).isNotNull(); + args.getMetricRecorder() + .addLongCounter(counter, 10, requiredLabelValues, optionalLabelValues); + verify(mockSink1).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues), + eq(optionalLabelValues)); + verify(mockSink2).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues), + eq(optionalLabelValues)); + } + @Test public void shutdownWithNoTransportsEverCreated() { channelBuilder.nameResolverFactory( @@ -2240,6 +2264,7 @@ public void lbHelper_getNameResolverArgs() { assertThat(args.getSynchronizationContext()) .isSameInstanceAs(helper.getSynchronizationContext()); assertThat(args.getServiceConfigParser()).isNotNull(); + assertThat(args.getMetricRecorder()).isNotNull(); } @Test diff --git a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java index 0073cce1a88..85b59fabfa0 100644 --- a/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java @@ -17,6 +17,7 @@ package io.grpc.xds; import io.grpc.Internal; +import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsInitializationException; @@ -36,6 +37,11 @@ public static void setDefaultProviderBootstrapOverride(Map bootstrap) public static ObjectPool getOrCreate(String target) throws XdsInitializationException { - return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target); + return getOrCreate(target, new MetricRecorder() {}); + } + + public static ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) + throws XdsInitializationException { + return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder); } } diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index c9195896d82..779349744ff 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import io.grpc.MetricRecorder; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ObjectPool; @@ -51,6 +52,8 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory { private static final boolean LOG_XDS_NODE_ID = Boolean.parseBoolean( System.getenv("GRPC_LOG_XDS_NODE_ID")); private static final Logger log = Logger.getLogger(XdsClientImpl.class.getName()); + private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER = + new ExponentialBackoffPolicy.Provider(); private final Bootstrapper bootstrapper; private final Object lock = new Object(); @@ -82,7 +85,8 @@ public ObjectPool get(String target) { } @Override - public ObjectPool getOrCreate(String target) throws XdsInitializationException { + public ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) + throws XdsInitializationException { ObjectPool ref = targetToXdsClientMap.get(target); if (ref == null) { synchronized (lock) { @@ -98,7 +102,7 @@ public ObjectPool getOrCreate(String target) throws XdsInitialization if (bootstrapInfo.servers().isEmpty()) { throw new XdsInitializationException("No xDS server provided"); } - ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target); + ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder); targetToXdsClientMap.put(target, ref); } } @@ -111,19 +115,17 @@ public ImmutableList getTargets() { return ImmutableList.copyOf(targetToXdsClientMap.keySet()); } - private static class SharedXdsClientPoolProviderHolder { private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider(); } @ThreadSafe @VisibleForTesting - static class RefCountedXdsClientObjectPool implements ObjectPool { + class RefCountedXdsClientObjectPool implements ObjectPool { - private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER = - new ExponentialBackoffPolicy.Provider(); private final BootstrapInfo bootstrapInfo; private final String target; // The target associated with the xDS client. + private final MetricRecorder metricRecorder; private final Object lock = new Object(); @GuardedBy("lock") private ScheduledExecutorService scheduler; @@ -131,11 +133,15 @@ static class RefCountedXdsClientObjectPool implements ObjectPool { private XdsClient xdsClient; @GuardedBy("lock") private int refCount; + @GuardedBy("lock") + private XdsClientMetricReporterImpl metricReporter; @VisibleForTesting - RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) { + RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target, + MetricRecorder metricRecorder) { this.bootstrapInfo = checkNotNull(bootstrapInfo); this.target = target; + this.metricRecorder = metricRecorder; } @Override @@ -146,6 +152,7 @@ public XdsClient getObject() { log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId()); } scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target); xdsClient = new XdsClientImpl( DEFAULT_XDS_TRANSPORT_FACTORY, bootstrapInfo, @@ -154,7 +161,9 @@ public XdsClient getObject() { GrpcUtil.STOPWATCH_SUPPLIER, TimeProvider.SYSTEM_TIME_PROVIDER, MessagePrinter.INSTANCE, - new TlsContextManagerImpl(bootstrapInfo)); + new TlsContextManagerImpl(bootstrapInfo), + metricReporter); + metricReporter.setXdsClient(xdsClient); } refCount++; return xdsClient; @@ -168,6 +177,9 @@ public XdsClient returnObject(Object object) { if (refCount == 0) { xdsClient.shutdown(); xdsClient = null; + metricReporter.close(); + metricReporter = null; + targetToXdsClientMap.remove(target); scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler); } return null; diff --git a/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java new file mode 100644 index 00000000000..fa88237a7ea --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java @@ -0,0 +1,215 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.LongCounterMetricInstrument; +import io.grpc.LongGaugeMetricInstrument; +import io.grpc.MetricInstrumentRegistry; +import io.grpc.MetricRecorder; +import io.grpc.MetricRecorder.BatchCallback; +import io.grpc.MetricRecorder.BatchRecorder; +import io.grpc.MetricRecorder.Registration; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceMetadata; +import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus; +import io.grpc.xds.client.XdsClient.ServerConnectionCallback; +import io.grpc.xds.client.XdsClientMetricReporter; +import io.grpc.xds.client.XdsResourceType; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * XdsClientMetricReporter implementation. + */ +final class XdsClientMetricReporterImpl implements XdsClientMetricReporter { + + private static final Logger logger = Logger.getLogger( + XdsClientMetricReporterImpl.class.getName()); + private static final LongCounterMetricInstrument SERVER_FAILURE_COUNTER; + private static final LongCounterMetricInstrument RESOURCE_UPDATES_VALID_COUNTER; + private static final LongCounterMetricInstrument RESOURCE_UPDATES_INVALID_COUNTER; + private static final LongGaugeMetricInstrument CONNECTED_GAUGE; + private static final LongGaugeMetricInstrument RESOURCES_GAUGE; + + private final MetricRecorder metricRecorder; + private final String target; + @Nullable + private Registration gaugeRegistration = null; + + static { + MetricInstrumentRegistry metricInstrumentRegistry + = MetricInstrumentRegistry.getDefaultRegistry(); + SERVER_FAILURE_COUNTER = metricInstrumentRegistry.registerLongCounter( + "grpc.xds_client.server_failure", + "EXPERIMENTAL. A counter of xDS servers going from healthy to unhealthy. A server goes" + + " unhealthy when we have a connectivity failure or when the ADS stream fails without" + + " seeing a response message, as per gRFC A57.", "{failure}", + Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false); + RESOURCE_UPDATES_VALID_COUNTER = metricInstrumentRegistry.registerLongCounter( + "grpc.xds_client.resource_updates_valid", + "EXPERIMENTAL. A counter of resources received that were considered valid. The counter will" + + " be incremented even for resources that have not changed.", "{resource}", + Arrays.asList("grpc.target", "grpc.xds.server", "grpc.xds.resource_type"), + Collections.emptyList(), false); + RESOURCE_UPDATES_INVALID_COUNTER = metricInstrumentRegistry.registerLongCounter( + "grpc.xds_client.resource_updates_invalid", + "EXPERIMENTAL. A counter of resources received that were considered invalid.", "{resource}", + Arrays.asList("grpc.target", "grpc.xds.server", "grpc.xds.resource_type"), + Collections.emptyList(), false); + CONNECTED_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.connected", + "EXPERIMENTAL. Whether or not the xDS client currently has a working ADS stream to the xDS" + + " server. For a given server, this will be set to 1 when the stream is initially" + + " created. It will be set to 0 when we have a connectivity failure or when the ADS" + + " stream fails without seeing a response message, as per gRFC A57. Once set to 0, it" + + " will be reset to 1 when we receive the first response on an ADS stream.", "{bool}", + Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false); + RESOURCES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.resources", + "EXPERIMENTAL. Number of xDS resources.", "{resource}", + Arrays.asList("grpc.target", "grpc.xds.authority", "grpc.xds.cache_state", + "grpc.xds.resource_type"), Collections.emptyList(), false); + } + + XdsClientMetricReporterImpl(MetricRecorder metricRecorder, String target) { + this.metricRecorder = metricRecorder; + this.target = target; + } + + @Override + public void reportResourceUpdates(long validResourceCount, long invalidResourceCount, + String xdsServer, String resourceType) { + metricRecorder.addLongCounter(RESOURCE_UPDATES_VALID_COUNTER, validResourceCount, + Arrays.asList(target, xdsServer, resourceType), Collections.emptyList()); + metricRecorder.addLongCounter(RESOURCE_UPDATES_INVALID_COUNTER, invalidResourceCount, + Arrays.asList(target, xdsServer, resourceType), Collections.emptyList()); + } + + @Override + public void reportServerFailure(long serverFailure, String xdsServer) { + metricRecorder.addLongCounter(SERVER_FAILURE_COUNTER, serverFailure, + Arrays.asList(target, xdsServer), Collections.emptyList()); + } + + void setXdsClient(XdsClient xdsClient) { + assert gaugeRegistration == null; + // register gauge here + this.gaugeRegistration = metricRecorder.registerBatchCallback(new BatchCallback() { + @Override + public void accept(BatchRecorder recorder) { + reportCallbackMetrics(recorder, xdsClient); + } + }, CONNECTED_GAUGE, RESOURCES_GAUGE); + } + + void close() { + if (gaugeRegistration != null) { + gaugeRegistration.close(); + gaugeRegistration = null; + } + } + + void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) { + MetricReporterCallback callback = new MetricReporterCallback(recorder, target); + try { + Future reportServerConnectionsCompleted = xdsClient.reportServerConnections(callback); + + ListenableFuture, Map>> + getResourceMetadataCompleted = xdsClient.getSubscribedResourcesMetadataSnapshot(); + + Map, Map> metadataByType = + getResourceMetadataCompleted.get(10, TimeUnit.SECONDS); + + computeAndReportResourceCounts(metadataByType, callback); + + // Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking + Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); // re-set the current thread's interruption state + } + logger.log(Level.WARNING, "Failed to report gauge metrics", e); + } + } + + private void computeAndReportResourceCounts( + Map, Map> metadataByType, + MetricReporterCallback callback) { + for (Map.Entry, Map> metadataByTypeEntry : + metadataByType.entrySet()) { + XdsResourceType type = metadataByTypeEntry.getKey(); + + Map resourceCountsByState = new HashMap<>(); + for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) { + String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached()); + resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1); + } + + resourceCountsByState.forEach((cacheState, count) -> + callback.reportResourceCountGauge(count, cacheState, type.typeUrl())); + } + } + + private static String cacheStateFromResourceStatus(ResourceMetadataStatus metadataStatus, + boolean isResourceCached) { + switch (metadataStatus) { + case REQUESTED: + return "requested"; + case DOES_NOT_EXIST: + return "does_not_exist"; + case ACKED: + return "acked"; + case NACKED: + return isResourceCached ? "nacked_but_cached" : "nacked"; + default: + return "unknown"; + } + } + + @VisibleForTesting + static final class MetricReporterCallback implements ServerConnectionCallback { + private final BatchRecorder recorder; + private final String target; + + MetricReporterCallback(BatchRecorder recorder, String target) { + this.recorder = recorder; + this.target = target; + } + + // TODO(dnvindhya): include the "authority" label once xds.authority is available. + void reportResourceCountGauge(long resourceCount, String cacheState, + String resourceType) { + recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount, + Arrays.asList(target, cacheState, resourceType), Collections.emptyList()); + } + + @Override + public void reportServerConnectionGauge(boolean isConnected, String xdsServer) { + recorder.recordLongGauge(CONNECTED_GAUGE, isConnected ? 1 : 0, + Arrays.asList(target, xdsServer), Collections.emptyList()); + } + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java index 313eb675116..f10d6504d79 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java @@ -16,6 +16,7 @@ package io.grpc.xds; +import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsInitializationException; @@ -29,7 +30,8 @@ interface XdsClientPoolFactory { @Nullable ObjectPool get(String target); - ObjectPool getOrCreate(String target) throws XdsInitializationException; + ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) + throws XdsInitializationException; List getTargets(); } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 0beb6dc2483..c51709c174c 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -41,6 +41,7 @@ import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; @@ -125,6 +126,7 @@ final class XdsNameResolver extends NameResolver { private final ConcurrentMap clusterRefs = new ConcurrentHashMap<>(); private final ConfigSelector configSelector = new ConfigSelector(); private final long randomChannelId; + private final MetricRecorder metricRecorder; private volatile RoutingConfig routingConfig = RoutingConfig.empty; private Listener2 listener; @@ -140,10 +142,12 @@ final class XdsNameResolver extends NameResolver { URI targetUri, String name, @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, - @Nullable Map bootstrapOverride) { + @Nullable Map bootstrapOverride, + MetricRecorder metricRecorder) { this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser, syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(), - ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride); + ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride, + metricRecorder); } @VisibleForTesting @@ -152,7 +156,8 @@ final class XdsNameResolver extends NameResolver { @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser, SynchronizationContext syncContext, ScheduledExecutorService scheduler, XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random, - FilterRegistry filterRegistry, @Nullable Map bootstrapOverride) { + FilterRegistry filterRegistry, @Nullable Map bootstrapOverride, + MetricRecorder metricRecorder) { this.targetAuthority = targetAuthority; target = targetUri.toString(); @@ -170,6 +175,7 @@ final class XdsNameResolver extends NameResolver { this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride); this.random = checkNotNull(random, "random"); this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry"); + this.metricRecorder = metricRecorder; randomChannelId = random.nextLong(); logId = InternalLogId.allocate("xds-resolver", name); logger = XdsLogger.withLogId(logId); @@ -185,7 +191,7 @@ public String getServiceAuthority() { public void start(Listener2 listener) { this.listener = checkNotNull(listener, "listener"); try { - xdsClientPool = xdsClientPoolFactory.getOrCreate(target); + xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder); } catch (Exception e) { listener.onError( Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e)); diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java index 8d0e59eaa91..74518331269 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java @@ -81,7 +81,8 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) { targetUri, name, args.getOverrideAuthority(), args.getServiceConfigParser(), args.getSynchronizationContext(), args.getScheduledExecutorService(), - bootstrapOverride); + bootstrapOverride, + args.getMetricRecorder()); } return null; } diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index bd622a71124..8c03e64f185 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -29,6 +29,7 @@ import io.grpc.InternalServerInterceptors; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.MetricRecorder; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCall; @@ -171,7 +172,9 @@ public void run() { private void internalStart() { try { - xdsClientPool = xdsClientPoolFactory.getOrCreate(""); + // TODO(dnvindhya): Add "#server" as "grpc.target" attribute value for + // xDS enabled servers. + xdsClientPool = xdsClientPoolFactory.getOrCreate("", new MetricRecorder() {}); } catch (Exception e) { StatusException statusException = Status.UNAVAILABLE.withDescription( "Failed to initialize xDS").withCause(e).asException(); diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 5ac979277c4..62076fb8bf1 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -78,6 +78,7 @@ final class ControlPlaneClient { private final Map, String> versions = new HashMap<>(); private boolean shutdown; + private boolean streamClosedNoResponse; @Nullable private AdsStream adsStream; @Nullable @@ -224,6 +225,19 @@ void readyHandler() { xdsClient.startSubscriberTimersIfNeeded(serverInfo); } + /** + * Indicates whether there is an active ADS stream. + * + *

Return {@code true} when the {@code AdsStream} is created. + * {@code false} when the ADS stream fails without a response. Resets to true + * upon receiving the first response on a new ADS stream. + */ + // Must be synchronized + boolean hasWorkingAdsStream() { + return !streamClosedNoResponse; + } + + /** * Establishes the RPC connection by creating a new RPC stream on the given channel for * xDS protocol communication. @@ -332,6 +346,8 @@ public void onRecvMessage(DiscoveryResponse response) { syncContext.execute(new Runnable() { @Override public void run() { + // Reset flag as message has been received on a stream + streamClosedNoResponse = false; XdsResourceType type = fromTypeUrl(response.getTypeUrl()); if (logger.isLoggable(XdsLogLevel.DEBUG)) { logger.log( @@ -408,6 +424,7 @@ private void handleRpcStreamClosed(Status status) { "ADS stream closed by server after a response was received"); } } else { + streamClosedNoResponse = true; // If the ADS stream is closed without ever having received a response from the server, then // the XdsClient should consider that a connectivity error (see gRFC A57). if (status.isOk()) { diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClient.java b/xds/src/main/java/io/grpc/xds/client/XdsClient.java index fc7e1777384..06f15005c22 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClient.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -154,44 +155,46 @@ public static final class ResourceMetadata { private final String version; private final ResourceMetadataStatus status; private final long updateTimeNanos; + private final boolean cached; @Nullable private final Any rawResource; @Nullable private final UpdateFailureState errorState; private ResourceMetadata( - ResourceMetadataStatus status, String version, long updateTimeNanos, + ResourceMetadataStatus status, String version, long updateTimeNanos, boolean cached, @Nullable Any rawResource, @Nullable UpdateFailureState errorState) { this.status = checkNotNull(status, "status"); this.version = checkNotNull(version, "version"); this.updateTimeNanos = updateTimeNanos; + this.cached = cached; this.rawResource = rawResource; this.errorState = errorState; } - static ResourceMetadata newResourceMetadataUnknown() { - return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, null, null); + public static ResourceMetadata newResourceMetadataUnknown() { + return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, false,null, null); } - static ResourceMetadata newResourceMetadataRequested() { - return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, null, null); + public static ResourceMetadata newResourceMetadataRequested() { + return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, false, null, null); } - static ResourceMetadata newResourceMetadataDoesNotExist() { - return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, null, null); + public static ResourceMetadata newResourceMetadataDoesNotExist() { + return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null); } public static ResourceMetadata newResourceMetadataAcked( Any rawResource, String version, long updateTimeNanos) { checkNotNull(rawResource, "rawResource"); return new ResourceMetadata( - ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null); + ResourceMetadataStatus.ACKED, version, updateTimeNanos, true, rawResource, null); } - static ResourceMetadata newResourceMetadataNacked( + public static ResourceMetadata newResourceMetadataNacked( ResourceMetadata metadata, String failedVersion, long failedUpdateTime, - String failedDetails) { + String failedDetails, boolean cached) { checkNotNull(metadata, "metadata"); return new ResourceMetadata(ResourceMetadataStatus.NACKED, - metadata.getVersion(), metadata.getUpdateTimeNanos(), metadata.getRawResource(), + metadata.getVersion(), metadata.getUpdateTimeNanos(), cached, metadata.getRawResource(), new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails)); } @@ -210,6 +213,11 @@ public long getUpdateTimeNanos() { return updateTimeNanos; } + /** Returns whether the resource was cached. */ + public boolean isCached() { + return cached; + } + /** The last successfully updated xDS resource as it was returned by the server. */ @Nullable public Any getRawResource() { @@ -378,6 +386,23 @@ public Map getServerLrsClientMap() { throw new UnsupportedOperationException(); } + /** Callback used to report a gauge metric value for server connections. */ + public interface ServerConnectionCallback { + void reportServerConnectionGauge(boolean isConnected, String xdsServer); + } + + /** + * Reports whether xDS client has a "working" ADS stream to xDS server. The definition of a + * working stream is defined in gRFC A78. + * + * @see + * A78-grpc-metrics-wrr-pf-xds.md + */ + public Future reportServerConnections(ServerConnectionCallback callback) { + throw new UnsupportedOperationException(); + } + static final class ProcessingTracker { private final AtomicInteger pendingTask = new AtomicInteger(1); private final Executor executor; diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index e2380b9ed73..529ac2747df 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -41,7 +41,6 @@ import io.grpc.xds.client.Bootstrapper.AuthorityInfo; import io.grpc.xds.client.Bootstrapper.ServerInfo; import io.grpc.xds.client.XdsClient.ResourceStore; -import io.grpc.xds.client.XdsClient.XdsResponseHandler; import io.grpc.xds.client.XdsLogger.XdsLogLevel; import java.net.URI; import java.util.Collection; @@ -52,6 +51,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -60,7 +60,7 @@ * XdsClient implementation. */ @Internal -public final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore { +public final class XdsClientImpl extends XdsClient implements ResourceStore { // Longest time to wait, since the subscription to some resource, for concluding its absence. @VisibleForTesting @@ -100,6 +100,7 @@ public void uncaughtException(Thread t, Throwable e) { private final XdsLogger logger; private volatile boolean isShutdown; private final MessagePrettyPrinter messagePrinter; + private final XdsClientMetricReporter metricReporter; public XdsClientImpl( XdsTransportFactory xdsTransportFactory, @@ -109,7 +110,8 @@ public XdsClientImpl( Supplier stopwatchSupplier, TimeProvider timeProvider, MessagePrettyPrinter messagePrinter, - Object securityConfig) { + Object securityConfig, + XdsClientMetricReporter metricReporter) { this.xdsTransportFactory = xdsTransportFactory; this.bootstrapInfo = bootstrapInfo; this.timeService = timeService; @@ -118,13 +120,13 @@ public XdsClientImpl( this.timeProvider = timeProvider; this.messagePrinter = messagePrinter; this.securityConfig = securityConfig; + this.metricReporter = metricReporter; logId = InternalLogId.allocate("xds-client", null); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); } - @Override - public void handleResourceResponse( + private void handleResourceResponse( XdsResourceType xdsResourceType, ServerInfo serverInfo, String versionInfo, List resources, String nonce, ProcessingTracker processingTracker) { checkNotNull(xdsResourceType, "xdsResourceType"); @@ -138,11 +140,11 @@ public void handleResourceResponse( handleResourceUpdate(args, resources, xdsResourceType, processingTracker); } - @Override - public void handleStreamClosed(Status error) { + private void handleStreamClosed(Status error, ServerInfo serverInfo) { syncContext.throwIfNotInThisSynchronizationContext(); cleanUpResourceTimers(); if (!error.isOk()) { + metricReporter.reportServerFailure(1L, serverInfo.target()); for (Map> subscriberMap : resourceSubscribers.values()) { for (ResourceSubscriber subscriber : subscriberMap.values()) { @@ -154,8 +156,7 @@ public void handleStreamClosed(Status error) { } } - @Override - public void handleStreamRestarted(ServerInfo serverInfo) { + private void handleStreamRestarted(ServerInfo serverInfo) { syncContext.throwIfNotInThisSynchronizationContext(); for (Map> subscriberMap : resourceSubscribers.values()) { @@ -394,7 +395,27 @@ public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) { xdsTransport, serverInfo, bootstrapInfo.node(), - this, + new XdsResponseHandler() { + + @Override + public void handleResourceResponse( + XdsResourceType resourceType, ServerInfo serverInfo, String versionInfo, + List resources, String nonce, ProcessingTracker processingTracker) { + XdsClientImpl.this.handleResourceResponse(resourceType, serverInfo, versionInfo, + resources, nonce, + processingTracker); + } + + @Override + public void handleStreamClosed(Status error) { + XdsClientImpl.this.handleStreamClosed(error, serverInfo); + } + + @Override + public void handleStreamRestarted(ServerInfo serverInfo) { + XdsClientImpl.this.handleStreamRestarted(serverInfo); + } + }, this, timeService, syncContext, @@ -448,6 +469,10 @@ private void handleResourceUpdate( xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources); Map> parsedResources = result.parsedResources; Set invalidResources = result.invalidResources; + metricReporter.reportResourceUpdates(Long.valueOf(parsedResources.size()), + Long.valueOf(invalidResources.size()), + args.getServerInfo().target(), xdsResourceType.typeUrl()); + List errors = result.errors; String errorDetail = null; if (errors.isEmpty()) { @@ -504,9 +529,19 @@ private void handleResourceUpdate( } } - /** - * Tracks a single subscribed resource. - */ + @Override + public Future reportServerConnections(ServerConnectionCallback callback) { + SettableFuture future = SettableFuture.create(); + syncContext.execute(() -> { + serverCpClientMap.forEach((serverInfo, controlPlaneClient) -> + callback.reportServerConnectionGauge( + controlPlaneClient.hasWorkingAdsStream(), serverInfo.target())); + future.set(null); + }); + return future; + } + + /** Tracks a single subscribed resource. */ private final class ResourceSubscriber { @Nullable private final ServerInfo serverInfo; @Nullable private final ControlPlaneClient controlPlaneClient; @@ -644,10 +679,10 @@ void onData(ParsedResource parsedResource, String version, long updateTime, respTimer.cancel(); respTimer = null; } - this.metadata = ResourceMetadata - .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime); ResourceUpdate oldData = this.data; this.data = parsedResource.getResourceUpdate(); + this.metadata = ResourceMetadata + .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime); absent = false; if (resourceDeletionIgnored) { logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version " @@ -741,7 +776,8 @@ void onError(Status error, @Nullable ProcessingTracker tracker) { void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) { metadata = ResourceMetadata - .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails); + .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails, + data != null); } private void notifyWatcher(ResourceWatcher watcher, T update) { diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java b/xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java new file mode 100644 index 00000000000..a044d501759 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.client; + +import io.grpc.Internal; + +/** + * Interface for reporting metrics from the xDS client. + */ +@Internal +public interface XdsClientMetricReporter { + + /** + * Reports number of valid and invalid resources. + * + * @param validResourceCount Number of resources that were valid. + * @param invalidResourceCount Number of resources that were invalid. + * @param xdsServer Target URI of the xDS server with which the XdsClient is communicating. + * @param resourceType Type of XDS resource (e.g., "envoy.config.listener.v3.Listener"). + */ + default void reportResourceUpdates(long validResourceCount, long invalidResourceCount, + String xdsServer, String resourceType) { + } + + /** + * Reports number of xDS servers going from healthy to unhealthy. + * + * @param serverFailure Number of xDS server failures. + * @param xdsServer Target URI of the xDS server with which the XdsClient is communicating. + */ + default void reportServerFailure(long serverFailure, String xdsServer) { + } + +} diff --git a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java index 8166027033f..7c6821dc560 100644 --- a/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java +++ b/xds/src/test/java/io/grpc/xds/CsdsServiceTest.java @@ -39,6 +39,7 @@ import io.envoyproxy.envoy.type.matcher.v3.NodeMatcher; import io.grpc.Deadline; import io.grpc.InsecureChannelCredentials; +import io.grpc.MetricRecorder; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; @@ -553,8 +554,9 @@ public void setBootstrapOverride(Map bootstrap) { throw new UnsupportedOperationException("Should not be called"); } + @Override - public ObjectPool getOrCreate(String target) { + public ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) { throw new UnsupportedOperationException("Should not be called"); } } diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index 8ecb40383b1..198faea7fdc 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -95,7 +95,9 @@ import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState; import io.grpc.xds.client.XdsClient.ResourceUpdate; import io.grpc.xds.client.XdsClient.ResourceWatcher; +import io.grpc.xds.client.XdsClient.ServerConnectionCallback; import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsResourceType; import io.grpc.xds.client.XdsResourceType.ResourceInvalidException; import io.grpc.xds.client.XdsTransportFactory; @@ -112,6 +114,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -142,6 +145,7 @@ // The base class was used to test both xds v2 and v3. V2 is dropped now so the base class is not // necessary. Still keep it for future version usage. Remove if too much trouble to maintain. public abstract class GrpcXdsClientImplTestBase { + private static final String SERVER_URI = "trafficdirector.googleapis.com"; private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com"; private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; @@ -287,6 +291,10 @@ public long currentTimeNanos() { private ResourceWatcher cdsResourceWatcher; @Mock private ResourceWatcher edsResourceWatcher; + @Mock + private XdsClientMetricReporter xdsClientMetricReporter; + @Mock + private ServerConnectionCallback serverConnectionCallback; private ManagedChannel channel; private ManagedChannel channelForCustomAuthority; @@ -369,7 +377,8 @@ public XdsTransport create(ServerInfo serverInfo) { fakeClock.getStopwatchSupplier(), timeProvider, MessagePrinter.INSTANCE, - new TlsContextManagerImpl(bootstrapInfo)); + new TlsContextManagerImpl(bootstrapInfo), + xdsClientMetricReporter); assertThat(resourceDiscoveryCalls).isEmpty(); assertThat(loadReportCalls).isEmpty(); @@ -476,10 +485,11 @@ private void verifyResourceMetadataAcked( private void verifyResourceMetadataNacked( XdsResourceType type, String resourceName, Any rawResource, String versionInfo, long updateTime, String failedVersion, long failedUpdateTimeNanos, - List failedDetails) { + List failedDetails, boolean cached) { ResourceMetadata resourceMetadata = verifyResourceMetadata(type, resourceName, rawResource, ResourceMetadataStatus.NACKED, versionInfo, updateTime, true); + assertThat(resourceMetadata.isCached()).isEqualTo(cached); UpdateFailureState errorState = resourceMetadata.getErrorState(); assertThat(errorState).isNotNull(); @@ -602,6 +612,48 @@ private void validateGoldenClusterLoadAssignment(EdsUpdate edsUpdate) { LocalityLbEndpoints.create(ImmutableList.of(), 2, 1)); } + /** + * Verifies that the {@link XdsClientMetricReporter#reportResourceUpdates} method has been called + * the expected number of times with the expected values for valid resource count, invalid + * resource count, and corresponding metric labels. + */ + private void verifyResourceValidInvalidCount(int times, long validResourceCount, + long invalidResourceCount, String xdsServerTargetLabel, + String resourceType) { + verify(xdsClientMetricReporter, times(times)).reportResourceUpdates( + eq(validResourceCount), + eq(invalidResourceCount), + eq(xdsServerTargetLabel), + eq(resourceType)); + } + + private void verifyServerFailureCount(int times, long serverFailureCount, String xdsServer) { + verify(xdsClientMetricReporter, times(times)).reportServerFailure( + eq(serverFailureCount), + eq(xdsServer)); + } + + /** + * Invokes the callback, which will be called by {@link XdsClientMetricReporter} to record + * whether XdsClient has a working ADS stream. + */ + private void callback_ReportServerConnection() { + try { + Future unused = xdsClient.reportServerConnections(serverConnectionCallback); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new AssertionError(e); + } + } + + private void verifyServerConnection(int times, boolean isConnected, String xdsServer) { + verify(serverConnectionCallback, times(times)).reportServerConnectionGauge( + eq(isConnected), + eq(xdsServer)); + } + @Test public void ldsResourceNotFound() { DiscoveryRpcCall call = startResourceWatcher(XdsListenerResource.getInstance(), LDS_RESOURCE, @@ -621,6 +673,7 @@ public void ldsResourceNotFound() { verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); verifyResourceMetadataDoesNotExist(LDS, LDS_RESOURCE); + // Check metric data. verifySubscribedResourcesMetadataSizes(1, 0, 0, 0); } @@ -628,7 +681,7 @@ public void ldsResourceNotFound() { public void ldsResourceUpdated_withXdstpResourceName_withUnknownAuthority() { String ldsResourceName = "xdstp://unknown.example.com/envoy.config.listener.v3.Listener/listener1"; - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),ldsResourceName, + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, ldsResourceWatcher); verify(ldsResourceWatcher).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -636,7 +689,7 @@ public void ldsResourceUpdated_withXdstpResourceName_withUnknownAuthority() { assertThat(error.getDescription()).isEqualTo( "Wrong configuration: xds server does not exist for resource " + ldsResourceName); assertThat(resourceDiscoveryCalls.poll()).isNull(); - xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(),ldsResourceName, + xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, ldsResourceWatcher); assertThat(resourceDiscoveryCalls.poll()).isNull(); } @@ -682,15 +735,16 @@ public void ldsResponseErrorHandling_someResourcesFailedUnpack() { /** * Tests a subscribed LDS resource transitioned to and from the invalid state. * - * @see - * A40-csds-support.md + * @see + * A40-csds-support.md */ @Test public void ldsResponseErrorHandling_subscribedResourceInvalid() { List subscribedResourceNames = ImmutableList.of("A", "B", "C"); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),"A", ldsResourceWatcher); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),"B", ldsResourceWatcher); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),"C", ldsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "A", ldsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "B", ldsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "C", ldsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); verifyResourceMetadataRequested(LDS, "A"); @@ -708,6 +762,8 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { verifyResourceMetadataAcked(LDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); + // Check metric data. + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), LDS.typeUrl()); call.verifyRequest(LDS, subscribedResourceNames, VERSION_1, "0000", NODE); // LDS -> {A, B}, version 2 @@ -722,7 +778,9 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { List errorsV2 = ImmutableList.of("LDS response Listener 'B' validation error: "); verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + VERSION_2, TIME_INCREMENT * 2, errorsV2, true); + // Check metric data. + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), LDS.typeUrl()); if (!ignoreResourceDeletion()) { verifyResourceMetadataDoesNotExist(LDS, "C"); } else { @@ -738,6 +796,8 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { call.sendResponse(LDS, resourcesV3.values().asList(), VERSION_3, "0002"); // {A} -> does not exist // {B, C} -> ACK, version 3 + // Check metric data. + verifyResourceValidInvalidCount(1, 2, 0, xdsServerInfo.target(), LDS.typeUrl()); if (!ignoreResourceDeletion()) { verifyResourceMetadataDoesNotExist(LDS, "A"); } else { @@ -753,12 +813,12 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid() { @Test public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscription() { List subscribedResourceNames = ImmutableList.of("A", "B", "C"); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),"A", ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),"A.1", rdsResourceWatcher); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),"B", ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),"B.1", rdsResourceWatcher); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),"C", ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),"C.1", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "A", ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "A.1", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "B", ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "B.1", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "C", ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "C.1", rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); verifyResourceMetadataRequested(LDS, "A"); @@ -776,6 +836,7 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscripti "C", Any.pack(mf.buildListenerWithApiListenerForRds("C", "C.1"))); call.sendResponse(LDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A, B, C} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), LDS.typeUrl()); verifyResourceMetadataAcked(LDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); @@ -792,6 +853,8 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscripti verifyResourceMetadataAcked(RDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2); verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2); verifyResourceMetadataAcked(RDS, "C.1", resourcesV11.get("C.1"), VERSION_1, TIME_INCREMENT * 2); + // Check metric data. + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), RDS.typeUrl()); // LDS -> {A, B}, version 2 // Failed to parse endpoint B @@ -802,11 +865,13 @@ public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscripti // {A} -> ACK, version 2 // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> does not exist + // Check metric data. + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), LDS.typeUrl()); List errorsV2 = ImmutableList.of("LDS response Listener 'B' validation error: "); verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 3); verifyResourceMetadataNacked( LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3, - errorsV2); + errorsV2, true); if (!ignoreResourceDeletion()) { verifyResourceMetadataDoesNotExist(LDS, "C"); } else { @@ -860,11 +925,11 @@ public void wrappedLdsResource_preferWrappedName() { ldsResourceWatcher); Any innerResource = Any.pack(mf.buildListenerWithApiListener("random_name" /* name */, - mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(VHOST_SIZE)))); + mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(VHOST_SIZE)))); // Client sends an ACK LDS request. call.sendResponse(LDS, mf.buildWrappedResourceWithName(innerResource, LDS_RESOURCE), VERSION_1, - "0000"); + "0000"); call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue()); @@ -899,7 +964,7 @@ public void cachedLdsResource_data() { call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE); ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, watcher); verify(watcher).onChanged(ldsUpdateCaptor.capture()); verifyGoldenListenerRds(ldsUpdateCaptor.getValue()); call.verifyNoMoreRequest(); @@ -916,7 +981,7 @@ public void cachedLdsResource_absent() { verify(ldsResourceWatcher).onResourceDoesNotExist(LDS_RESOURCE); // Add another watcher. ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(LDS_RESOURCE); call.verifyNoMoreRequest(); verifyResourceMetadataDoesNotExist(LDS, LDS_RESOURCE); @@ -1048,7 +1113,7 @@ public void ldsResourceUpdated_withXdstpResourceName_withWrongType() { call.verifyRequestNack( LDS, ldsResourceName, "", "0000", NODE, ImmutableList.of( - "Unsupported resource name: " + ldsResourceNameWithWrongType + " for type: LDS")); + "Unsupported resource name: " + ldsResourceNameWithWrongType + " for type: LDS")); } @Test @@ -1074,7 +1139,7 @@ public void rdsResourceUpdated_withXdstpResourceName_withWrongType() { public void rdsResourceUpdated_withXdstpResourceName_unknownAuthority() { String rdsResourceName = "xdstp://unknown.example.com/envoy.config.route.v3.RouteConfiguration/route1"; - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),rdsResourceName, + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), rdsResourceName, rdsResourceWatcher); verify(rdsResourceWatcher).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -1083,7 +1148,7 @@ public void rdsResourceUpdated_withXdstpResourceName_unknownAuthority() { "Wrong configuration: xds server does not exist for resource " + rdsResourceName); assertThat(resourceDiscoveryCalls.size()).isEqualTo(0); xdsClient.cancelXdsResourceWatch( - XdsRouteConfigureResource.getInstance(),rdsResourceName, rdsResourceWatcher); + XdsRouteConfigureResource.getInstance(), rdsResourceName, rdsResourceWatcher); assertThat(resourceDiscoveryCalls.size()).isEqualTo(0); } @@ -1109,7 +1174,7 @@ public void cdsResourceUpdated_withXdstpResourceName_withWrongType() { @Test public void cdsResourceUpdated_withXdstpResourceName_unknownAuthority() { String cdsResourceName = "xdstp://unknown.example.com/envoy.config.cluster.v3.Cluster/cluster1"; - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),cdsResourceName, + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), cdsResourceName, cdsResourceWatcher); verify(cdsResourceWatcher).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -1117,7 +1182,7 @@ public void cdsResourceUpdated_withXdstpResourceName_unknownAuthority() { assertThat(error.getDescription()).isEqualTo( "Wrong configuration: xds server does not exist for resource " + cdsResourceName); assertThat(resourceDiscoveryCalls.poll()).isNull(); - xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(),cdsResourceName, + xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), cdsResourceName, cdsResourceWatcher); assertThat(resourceDiscoveryCalls.poll()).isNull(); } @@ -1254,7 +1319,7 @@ public void ldsResourceDeleted() { /** * When ignore_resource_deletion server feature is on, xDS client should keep the deleted listener * on empty response, and resume the normal work when LDS contains the listener again. - * */ + */ @Test public void ldsResourceDeleted_ignoreResourceDeletion() { Assume.assumeTrue(ignoreResourceDeletion()); @@ -1298,9 +1363,9 @@ public void multipleLdsWatchers() { String ldsResourceTwo = "bar.googleapis.com"; ResourceWatcher watcher1 = mock(ResourceWatcher.class); ResourceWatcher watcher2 = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),ldsResourceTwo, watcher1); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),ldsResourceTwo, watcher2); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceTwo, watcher1); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceTwo, watcher2); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(LDS, ImmutableList.of(LDS_RESOURCE, ldsResourceTwo), "", "", NODE); // Both LDS resources were requested. @@ -1340,7 +1405,7 @@ public void rdsResourceNotFound() { DiscoveryRpcCall call = startResourceWatcher(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); Any routeConfig = Any.pack(mf.buildRouteConfiguration("route-bar.googleapis.com", - mf.buildOpaqueVirtualHosts(2))); + mf.buildOpaqueVirtualHosts(2))); call.sendResponse(RDS, routeConfig, VERSION_1, "0000"); // Client sends an ACK RDS request. @@ -1442,15 +1507,16 @@ public void rdsResponseErrorHandling_nackWeightedSumZero() { /** * Tests a subscribed RDS resource transitioned to and from the invalid state. * - * @see - * A40-csds-support.md + * @see + * A40-csds-support.md */ @Test public void rdsResponseErrorHandling_subscribedResourceInvalid() { List subscribedResourceNames = ImmutableList.of("A", "B", "C"); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),"A", rdsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),"B", rdsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),"C", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "A", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "B", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "C", rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); verifyResourceMetadataRequested(RDS, "A"); @@ -1469,6 +1535,8 @@ public void rdsResponseErrorHandling_subscribedResourceInvalid() { verifyResourceMetadataAcked(RDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(RDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(RDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); + // Check metric data. + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), RDS.typeUrl()); call.verifyRequest(RDS, subscribedResourceNames, VERSION_1, "0000", NODE); // RDS -> {A, B}, version 2 @@ -1480,11 +1548,13 @@ public void rdsResponseErrorHandling_subscribedResourceInvalid() { // {A} -> ACK, version 2 // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), + RDS.typeUrl()); List errorsV2 = ImmutableList.of("RDS response RouteConfiguration 'B' validation error: "); verifyResourceMetadataAcked(RDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(RDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + VERSION_2, TIME_INCREMENT * 2, errorsV2, true); verifyResourceMetadataAcked(RDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); call.verifyRequestNack(RDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2); @@ -1496,6 +1566,8 @@ public void rdsResponseErrorHandling_subscribedResourceInvalid() { call.sendResponse(RDS, resourcesV3.values().asList(), VERSION_3, "0002"); // {A} -> ACK, version 2 // {B, C} -> ACK, version 3 + verifyResourceValidInvalidCount(1, 2, 0, xdsServerInfo.target(), + RDS.typeUrl()); verifyResourceMetadataAcked(RDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataAcked(RDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); verifyResourceMetadataAcked(RDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3); @@ -1544,7 +1616,7 @@ public void cachedRdsResource_data() { call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE); ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, watcher); verify(watcher).onChanged(rdsUpdateCaptor.capture()); verifyGoldenRouteConfig(rdsUpdateCaptor.getValue()); call.verifyNoMoreRequest(); @@ -1561,7 +1633,7 @@ public void cachedRdsResource_absent() { verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE); // Add another watcher. ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(RDS_RESOURCE); call.verifyNoMoreRequest(); verifyResourceMetadataDoesNotExist(RDS, RDS_RESOURCE); @@ -1592,14 +1664,43 @@ public void rdsResourceUpdated() { assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(4); verifyResourceMetadataAcked(RDS, RDS_RESOURCE, routeConfigUpdated, VERSION_2, TIME_INCREMENT * 2); - verifySubscribedResourcesMetadataSizes(0, 0, 1, 0); + } + + @Test + public void rdsResourceInvalid() { + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "A", rdsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), "B", rdsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + verifyResourceMetadataRequested(RDS, "A"); + verifyResourceMetadataRequested(RDS, "B"); + verifySubscribedResourcesMetadataSizes(0, 0, 2, 0); + + // RDS -> {A, B}, version 1 + // Failed to parse endpoint B + List vhostsV1 = mf.buildOpaqueVirtualHosts(1); + ImmutableMap resourcesV1 = ImmutableMap.of( + "A", Any.pack(mf.buildRouteConfiguration("A", vhostsV1)), + "B", Any.pack(mf.buildRouteConfigurationInvalid("B"))); + call.sendResponse(RDS, resourcesV1.values().asList(), VERSION_1, "0000"); + + // {A} -> ACK, version 1 + // {B} -> NACK, version 1, rejected version 1, rejected reason: Failed to parse B + List errorsV1 = + ImmutableList.of("RDS response RouteConfiguration 'B' validation error: "); + verifyResourceMetadataAcked(RDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); + verifyResourceMetadataNacked(RDS, "B", null, "", 0, + VERSION_1, TIME_INCREMENT, errorsV1, false); + // Check metric data. + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), RDS.typeUrl()); + verifySubscribedResourcesMetadataSizes(0, 0, 2, 0); } @Test public void rdsResourceDeletedByLdsApiListener() { - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); verifyResourceMetadataRequested(LDS, LDS_RESOURCE); verifyResourceMetadataRequested(RDS, RDS_RESOURCE); @@ -1707,10 +1808,10 @@ public void multipleRdsWatchers() { String rdsResourceTwo = "route-bar.googleapis.com"; ResourceWatcher watcher1 = mock(ResourceWatcher.class); ResourceWatcher watcher2 = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),rdsResourceTwo, watcher1); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),rdsResourceTwo, watcher2); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), rdsResourceTwo, watcher1); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), rdsResourceTwo, watcher2); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(RDS, Arrays.asList(RDS_RESOURCE, rdsResourceTwo), "", "", NODE); // Both RDS resources were requested. @@ -1814,15 +1915,16 @@ public void cdsResponseErrorHandling_someResourcesFailedUnpack() { /** * Tests a subscribed CDS resource transitioned to and from the invalid state. * - * @see - * A40-csds-support.md + * @see + * A40-csds-support.md */ @Test public void cdsResponseErrorHandling_subscribedResourceInvalid() { List subscribedResourceNames = ImmutableList.of("A", "B", "C"); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),"A", cdsResourceWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),"B", cdsResourceWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),"C", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "A", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "B", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "C", cdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); verifyResourceMetadataRequested(CDS, "A"); @@ -1843,6 +1945,8 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { ))); call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A, B, C} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), + CDS.typeUrl()); verifyResourceMetadataAcked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); @@ -1859,10 +1963,12 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { // {A} -> ACK, version 2 // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> does not exist + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), + CDS.typeUrl()); List errorsV2 = ImmutableList.of("CDS response Cluster 'B' validation error: "); verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + VERSION_2, TIME_INCREMENT * 2, errorsV2, true); if (!ignoreResourceDeletion()) { verifyResourceMetadataDoesNotExist(CDS, "C"); } else { @@ -1882,6 +1988,8 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { call.sendResponse(CDS, resourcesV3.values().asList(), VERSION_3, "0002"); // {A} -> does not exit // {B, C} -> ACK, version 3 + verifyResourceValidInvalidCount(1, 2, 0, xdsServerInfo.target(), + CDS.typeUrl()); if (!ignoreResourceDeletion()) { verifyResourceMetadataDoesNotExist(CDS, "A"); } else { @@ -1890,18 +1998,19 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid() { } verifyResourceMetadataAcked(CDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); verifyResourceMetadataAcked(CDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3); + call.verifyRequest(CDS, subscribedResourceNames, VERSION_3, "0002", NODE); } @Test public void cdsResponseErrorHandling_subscribedResourceInvalid_withEdsSubscription() { List subscribedResourceNames = ImmutableList.of("A", "B", "C"); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),"A", cdsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),"A.1", edsResourceWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),"B", cdsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),"B.1", edsResourceWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),"C", cdsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),"C.1", edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "A", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "B", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "B.1", edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "C", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "C.1", edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); verifyResourceMetadataRequested(CDS, "A"); @@ -1925,6 +2034,8 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid_withEdsSubscripti ))); call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A, B, C} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), + CDS.typeUrl()); verifyResourceMetadataAcked(CDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); @@ -1939,6 +2050,8 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid_withEdsSubscripti "C.1", Any.pack(mf.buildClusterLoadAssignment("C.1", endpointsV1, dropOverloads))); call.sendResponse(EDS, resourcesV11.values().asList(), VERSION_1, "0000"); // {A.1, B.1, C.1} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), + EDS.typeUrl()); verifyResourceMetadataAcked(EDS, "A.1", resourcesV11.get("A.1"), VERSION_1, TIME_INCREMENT * 2); verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2); verifyResourceMetadataAcked(EDS, "C.1", resourcesV11.get("C.1"), VERSION_1, TIME_INCREMENT * 2); @@ -1954,11 +2067,13 @@ public void cdsResponseErrorHandling_subscribedResourceInvalid_withEdsSubscripti // {A} -> ACK, version 2 // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> does not exist + // Check metric data. + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), CDS.typeUrl()); List errorsV2 = ImmutableList.of("CDS response Cluster 'B' validation error: "); verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 3); verifyResourceMetadataNacked( CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3, - errorsV2); + errorsV2, true); if (!ignoreResourceDeletion()) { verifyResourceMetadataDoesNotExist(CDS, "C"); } else { @@ -2144,7 +2259,7 @@ public void cdsResponseWithUpstreamTlsContext() { "envoy.transport_sockets.tls", null, null)); List clusters = ImmutableList.of( Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", - "dns-service-bar.googleapis.com", 443, "round_robin", null, null,false, null, null)), + "dns-service-bar.googleapis.com", 443, "round_robin", null, null, false, null, null)), clusterEds, Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, null, false, null, "envoy.transport_sockets.tls", null, null))); @@ -2176,7 +2291,7 @@ public void cdsResponseWithNewUpstreamTlsContext() { // Management server sends back CDS response with UpstreamTlsContext. Any clusterEds = Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", "round_robin", - null, null,true, + null, null, true, mf.buildNewUpstreamTlsContext("cert-instance-name", "cert1"), "envoy.transport_sockets.tls", null, null)); List clusters = ImmutableList.of( @@ -2217,10 +2332,10 @@ public void cdsResponseErrorHandling_badUpstreamTlsContext() { // The response NACKed with errors indicating indices of the failed resources. String errorMsg = "CDS response Cluster 'cluster.googleapis.com' validation error: " - + "Cluster cluster.googleapis.com: malformed UpstreamTlsContext: " - + "io.grpc.xds.client.XdsResourceType$ResourceInvalidException: " - + "ca_certificate_provider_instance or system_root_certs is required in " - + "upstream-tls-context"; + + "Cluster cluster.googleapis.com: malformed UpstreamTlsContext: " + + "io.grpc.xds.client.XdsResourceType$ResourceInvalidException: " + + "ca_certificate_provider_instance or system_root_certs is required in " + + "upstream-tls-context"; call.verifyRequestNack(CDS, CDS_RESOURCE, "", "0000", NODE, ImmutableList.of(errorMsg)); verify(cdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); @@ -2257,7 +2372,7 @@ public void cdsResponseWithOutlierDetection() { "envoy.transport_sockets.tls", null, outlierDetectionXds)); List clusters = ImmutableList.of( Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", - "dns-service-bar.googleapis.com", 443, "round_robin", null, null,false, null, null)), + "dns-service-bar.googleapis.com", 443, "round_robin", null, null, false, null, null)), clusterEds, Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, null, false, null, "envoy.transport_sockets.tls", null, outlierDetectionXds))); @@ -2316,7 +2431,7 @@ public void cdsResponseWithInvalidOutlierDetectionNacks() { "envoy.transport_sockets.tls", null, outlierDetectionXds)); List clusters = ImmutableList.of( Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", - "dns-service-bar.googleapis.com", 443, "round_robin", null, null,false, null, null)), + "dns-service-bar.googleapis.com", 443, "round_robin", null, null, false, null, null)), clusterEds, Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, null, false, null, "envoy.transport_sockets.tls", null, outlierDetectionXds))); @@ -2436,8 +2551,7 @@ public void cdsResponseErrorHandling_xdstpWithoutEdsConfig() { )); final Any okClusterRoundRobin = Any.pack(mf.buildEdsCluster(cdsResourceName, "eds-service-bar.googleapis.com", - "round_robin", null,null, false, null, "envoy.transport_sockets.tls", null, null)); - + "round_robin", null, null, false, null, "envoy.transport_sockets.tls", null, null)); DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), cdsResourceName, cdsResourceWatcher); @@ -2483,7 +2597,7 @@ public void cachedCdsResource_absent() { fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(cdsResourceWatcher).onResourceDoesNotExist(CDS_RESOURCE); ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),CDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(CDS_RESOURCE); call.verifyNoMoreRequest(); verifyResourceMetadataDoesNotExist(CDS, CDS_RESOURCE); @@ -2620,7 +2734,7 @@ public void cdsResourceDeleted() { /** * When ignore_resource_deletion server feature is on, xDS client should keep the deleted cluster * on empty response, and resume the normal work when CDS contains the cluster again. - * */ + */ @Test public void cdsResourceDeleted_ignoreResourceDeletion() { Assume.assumeTrue(ignoreResourceDeletion()); @@ -2666,9 +2780,9 @@ public void multipleCdsWatchers() { String cdsResourceTwo = "cluster-bar.googleapis.com"; ResourceWatcher watcher1 = mock(ResourceWatcher.class); ResourceWatcher watcher2 = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),CDS_RESOURCE, cdsResourceWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),cdsResourceTwo, watcher1); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),cdsResourceTwo, watcher2); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), cdsResourceTwo, watcher1); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), cdsResourceTwo, watcher2); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(CDS, Arrays.asList(CDS_RESOURCE, cdsResourceTwo), "", "", NODE); verifyResourceMetadataRequested(CDS, CDS_RESOURCE); @@ -2774,7 +2888,7 @@ public void edsCleanupNonceAfterUnsubscription() { List dropOverloads = ImmutableList.of(); List endpointsV1 = ImmutableList.of(lbEndpointHealthy); ImmutableMap resourcesV1 = ImmutableMap.of( - "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A.1} -> ACK, version 1 call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE); @@ -2835,21 +2949,21 @@ public void edsResponseErrorHandling_someResourcesFailedUnpack() { /** * Tests a subscribed EDS resource transitioned to and from the invalid state. * - * @see - * A40-csds-support.md + * @see + * A40-csds-support.md */ @Test public void edsResponseErrorHandling_subscribedResourceInvalid() { List subscribedResourceNames = ImmutableList.of("A", "B", "C"); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),"A", edsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),"B", edsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),"C", edsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A", edsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "B", edsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "C", edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); assertThat(call).isNotNull(); verifyResourceMetadataRequested(EDS, "A"); verifyResourceMetadataRequested(EDS, "B"); verifyResourceMetadataRequested(EDS, "C"); - verifySubscribedResourcesMetadataSizes(0, 0, 0, 3); // EDS -> {A, B, C}, version 1 List dropOverloads = ImmutableList.of(mf.buildDropOverload("lb", 200)); @@ -2860,6 +2974,7 @@ public void edsResponseErrorHandling_subscribedResourceInvalid() { "C", Any.pack(mf.buildClusterLoadAssignment("C", endpointsV1, dropOverloads))); call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000"); // {A, B, C} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), EDS.typeUrl()); verifyResourceMetadataAcked(EDS, "A", resourcesV1.get("A"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(EDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(EDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); @@ -2875,11 +2990,13 @@ public void edsResponseErrorHandling_subscribedResourceInvalid() { // {A} -> ACK, version 2 // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B // {C} -> ACK, version 1 + // Check metric data. + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), EDS.typeUrl()); List errorsV2 = ImmutableList.of("EDS response ClusterLoadAssignment 'B' validation error: "); verifyResourceMetadataAcked(EDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataNacked(EDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, - VERSION_2, TIME_INCREMENT * 2, errorsV2); + VERSION_2, TIME_INCREMENT * 2, errorsV2, true); verifyResourceMetadataAcked(EDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT); call.verifyRequestNack(EDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2); @@ -2892,6 +3009,8 @@ public void edsResponseErrorHandling_subscribedResourceInvalid() { call.sendResponse(EDS, resourcesV3.values().asList(), VERSION_3, "0002"); // {A} -> ACK, version 2 // {B, C} -> ACK, version 3 + // Check metric data. + verifyResourceValidInvalidCount(1, 2, 0, xdsServerInfo.target(), EDS.typeUrl()); verifyResourceMetadataAcked(EDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2); verifyResourceMetadataAcked(EDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3); verifyResourceMetadataAcked(EDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3); @@ -2940,7 +3059,7 @@ public void cachedEdsResource_data() { call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE); // Add another watcher. ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),EDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, watcher); verify(watcher).onChanged(edsUpdateCaptor.capture()); validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue()); call.verifyNoMoreRequest(); @@ -2957,7 +3076,7 @@ public void cachedEdsResource_absent() { fakeClock.forwardTime(XdsClientImpl.INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); ResourceWatcher watcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),EDS_RESOURCE, watcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, watcher); verify(watcher).onResourceDoesNotExist(EDS_RESOURCE); call.verifyNoMoreRequest(); verifyResourceMetadataDoesNotExist(EDS, EDS_RESOURCE); @@ -3161,10 +3280,10 @@ public void edsResourceDeletedByCds() { String resource = "backend-service.googleapis.com"; ResourceWatcher cdsWatcher = mock(ResourceWatcher.class); ResourceWatcher edsWatcher = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),resource, cdsWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),resource, edsWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),CDS_RESOURCE, cdsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),EDS_RESOURCE, edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), resource, cdsWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resource, edsWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, edsResourceWatcher); verifyResourceMetadataRequested(CDS, CDS_RESOURCE); verifyResourceMetadataRequested(CDS, resource); verifyResourceMetadataRequested(EDS, EDS_RESOURCE); @@ -3242,7 +3361,6 @@ public void edsResourceDeletedByCds() { EDS, resource, clusterLoadAssignments.get(1), VERSION_1, TIME_INCREMENT * 2); // no change verifyResourceMetadataAcked(CDS, resource, clusters.get(0), VERSION_2, TIME_INCREMENT * 3); verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusters.get(1), VERSION_2, TIME_INCREMENT * 3); - verifySubscribedResourcesMetadataSizes(0, 2, 0, 2); } @Test @@ -3251,9 +3369,9 @@ public void multipleEdsWatchers() { String edsResourceTwo = "cluster-load-assignment-bar.googleapis.com"; ResourceWatcher watcher1 = mock(ResourceWatcher.class); ResourceWatcher watcher2 = mock(ResourceWatcher.class); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),EDS_RESOURCE, edsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),edsResourceTwo, watcher1); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),edsResourceTwo, watcher2); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, edsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), edsResourceTwo, watcher1); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), edsResourceTwo, watcher2); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(EDS, Arrays.asList(EDS_RESOURCE, edsResourceTwo), "", "", NODE); verifyResourceMetadataRequested(EDS, EDS_RESOURCE); @@ -3338,12 +3456,18 @@ public void useIndependentRpcContext() { @Test public void streamClosedWithNoResponse() { - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, true, xdsServerInfo.target()); // Management server closes the RPC stream before sending any response. call.sendCompleted(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, false, xdsServerInfo.target()); verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) .onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, @@ -3355,20 +3479,29 @@ public void streamClosedWithNoResponse() { @Test public void streamClosedAfterSendingResponses() { - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, true, xdsServerInfo.target()); ScheduledTask ldsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask rdsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); call.sendResponse(LDS, testListenerRds, VERSION_1, "0000"); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2, true, xdsServerInfo.target()); assertThat(ldsResourceTimeout.isCancelled()).isTrue(); call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000"); assertThat(rdsResourceTimeout.isCancelled()).isTrue(); // Management server closes the RPC stream after sending responses. call.sendCompleted(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(3, true, xdsServerInfo.target()); verify(ldsResourceWatcher, never()).onError(errorCaptor.capture()); verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); } @@ -3376,11 +3509,14 @@ public void streamClosedAfterSendingResponses() { @Test public void streamClosedAndRetryWithBackoff() { InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); - xdsClient.watchXdsResource(XdsListenerResource.getInstance(),LDS_RESOURCE, ldsResourceWatcher); - xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),RDS_RESOURCE, + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, true, xdsServerInfo.target()); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); - xdsClient.watchXdsResource(XdsClusterResource.getInstance(),CDS_RESOURCE, cdsResourceWatcher); - xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),EDS_RESOURCE, edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.verifyRequest(LDS, LDS_RESOURCE, "", "", NODE); call.verifyRequest(RDS, RDS_RESOURCE, "", "", NODE); @@ -3399,6 +3535,10 @@ public void streamClosedAndRetryWithBackoff() { verify(edsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, false, xdsServerInfo.target()); + // Retry after backoff. inOrder.verify(backoffPolicyProvider).get(); inOrder.verify(backoffPolicy1).nextBackoffNanos(); @@ -3412,6 +3552,10 @@ public void streamClosedAndRetryWithBackoff() { call.verifyRequest(CDS, CDS_RESOURCE, "", "", NODE); call.verifyRequest(EDS, EDS_RESOURCE, "", "", NODE); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2, false, xdsServerInfo.target()); + // Management server becomes unreachable. String errorMsg = "my fault"; call.sendError(Status.UNAVAILABLE.withDescription(errorMsg).asException()); @@ -3424,6 +3568,10 @@ public void streamClosedAndRetryWithBackoff() { verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(3, false, xdsServerInfo.target()); + // Retry after backoff. inOrder.verify(backoffPolicy1).nextBackoffNanos(); retryTask = @@ -3441,6 +3589,9 @@ public void streamClosedAndRetryWithBackoff() { mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(2))))); call.sendResponse(LDS, listeners, "63", "3242"); call.verifyRequest(LDS, LDS_RESOURCE, "63", "3242", NODE); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2, true, xdsServerInfo.target()); List routeConfigs = ImmutableList.of( Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2)))); @@ -3455,6 +3606,10 @@ public void streamClosedAndRetryWithBackoff() { verify(edsResourceWatcher, times(2)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, errorMsg); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(3, true, xdsServerInfo.target()); + // Reset backoff sequence and retry after backoff. inOrder.verify(backoffPolicyProvider).get(); inOrder.verify(backoffPolicy2).nextBackoffNanos(); @@ -3477,6 +3632,10 @@ public void streamClosedAndRetryWithBackoff() { verify(edsResourceWatcher, times(3)).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(4, false, xdsServerInfo.target()); + // Retry after backoff. inOrder.verify(backoffPolicy2).nextBackoffNanos(); retryTask = @@ -3489,6 +3648,10 @@ public void streamClosedAndRetryWithBackoff() { call.verifyRequest(CDS, CDS_RESOURCE, "", "", NODE); call.verifyRequest(EDS, EDS_RESOURCE, "", "", NODE); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(5, false, xdsServerInfo.target()); + inOrder.verifyNoMoreInteractions(); } @@ -3499,6 +3662,9 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, true, xdsServerInfo.target()); call.sendError(Status.UNAVAILABLE.asException()); verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) .onError(errorCaptor.capture()); @@ -3509,6 +3675,10 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, false, xdsServerInfo.target()); + xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), @@ -3523,11 +3693,19 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { call.verifyRequest(EDS, EDS_RESOURCE, "", "", NODE); call.verifyNoMoreRequest(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2,false, xdsServerInfo.target()); + call.sendResponse(LDS, testListenerRds, VERSION_1, "0000"); List routeConfigs = ImmutableList.of( Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(VHOST_SIZE)))); call.sendResponse(RDS, routeConfigs, VERSION_1, "0000"); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2, true, xdsServerInfo.target()); + verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher); } @@ -3539,6 +3717,9 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher); xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, true, xdsServerInfo.target()); ScheduledTask ldsResourceTimeout = Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); ScheduledTask rdsResourceTimeout = @@ -3549,9 +3730,15 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe Iterables.getOnlyElement(fakeClock.getPendingTasks(EDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); call.sendResponse(LDS, testListenerRds, VERSION_1, "0000"); assertThat(ldsResourceTimeout.isCancelled()).isTrue(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2, true, xdsServerInfo.target()); call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000"); assertThat(rdsResourceTimeout.isCancelled()).isTrue(); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(3, true, xdsServerInfo.target()); call.sendError(Status.UNAVAILABLE.asException()); assertThat(cdsResourceTimeout.isCancelled()).isTrue(); @@ -3560,6 +3747,9 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe verify(rdsResourceWatcher, never()).onError(errorCaptor.capture()); verify(cdsResourceWatcher, never()).onError(errorCaptor.capture()); verify(edsResourceWatcher, never()).onError(errorCaptor.capture()); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(4, true, xdsServerInfo.target()); fakeClock.forwardNanos(10L); assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).hasSize(0); @@ -3578,13 +3768,13 @@ public void reportLoadStatsToServer() { lrsCall.sendResponse(Collections.singletonList(clusterName), 1000L); fakeClock.forwardNanos(1000L); - lrsCall.verifyNextReportClusters(Collections.singletonList(new String[] {clusterName, null})); + lrsCall.verifyNextReportClusters(Collections.singletonList(new String[]{clusterName, null})); dropStats.release(); fakeClock.forwardNanos(1000L); // In case of having unreported cluster stats, one last report will be sent after corresponding // stats object released. - lrsCall.verifyNextReportClusters(Collections.singletonList(new String[] {clusterName, null})); + lrsCall.verifyNextReportClusters(Collections.singletonList(new String[]{clusterName, null})); fakeClock.forwardNanos(1000L); // Currently load reporting continues (with empty stats) even if all stats objects have been @@ -3658,18 +3848,18 @@ public void serverSideListenerNotFound() { @Test public void serverSideListenerResponseErrorHandling_badDownstreamTlsContext() { GrpcXdsClientImplTestBase.DiscoveryRpcCall call = - startResourceWatcher(XdsListenerResource.getInstance(), LISTENER_RESOURCE, - ldsResourceWatcher); + startResourceWatcher(XdsListenerResource.getInstance(), LISTENER_RESOURCE, + ldsResourceWatcher); Message hcmFilter = mf.buildHttpConnectionManagerFilter( - "route-foo.googleapis.com", null, + "route-foo.googleapis.com", null, Collections.singletonList(mf.buildTerminalFilter())); Message downstreamTlsContext = CommonTlsContextTestsUtil.buildTestDownstreamTlsContext( - null, null,false); + null, null, false); Message filterChain = mf.buildFilterChain( - Collections.emptyList(), downstreamTlsContext, "envoy.transport_sockets.tls", + Collections.emptyList(), downstreamTlsContext, "envoy.transport_sockets.tls", hcmFilter); Message listener = - mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain); + mf.buildListenerWithFilterChain(LISTENER_RESOURCE, 7000, "0.0.0.0", filterChain); List listeners = ImmutableList.of(Any.pack(listener)); call.sendResponse(LDS, listeners, "0", "0000"); // The response NACKed with errors indicating indices of the failed resources. @@ -3690,7 +3880,7 @@ public void serverSideListenerResponseErrorHandling_badTransportSocketName() { "route-foo.googleapis.com", null, Collections.singletonList(mf.buildTerminalFilter())); Message downstreamTlsContext = CommonTlsContextTestsUtil.buildTestDownstreamTlsContext( - "cert1", "cert2",false); + "cert1", "cert2", false); Message filterChain = mf.buildFilterChain( Collections.emptyList(), downstreamTlsContext, "envoy.transport_sockets.bad1", hcmFilter); @@ -3721,6 +3911,9 @@ public void sendingToStoppedServer() throws Exception { xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); fakeClock.forwardTime(14, TimeUnit.SECONDS); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, false, xdsServerInfo.target()); // Restart the server xdsServer = cleanupRule.register( @@ -3735,11 +3928,18 @@ public void sendingToStoppedServer() throws Exception { verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(2, false, xdsServerInfo.target()); if (call == null) { // The first rpcRetry may have happened before the channel was ready fakeClock.forwardTime(50, TimeUnit.SECONDS); call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); } + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(3, false, xdsServerInfo.target()); + // NOTE: There is a ScheduledExecutorService that may get involved due to the reconnect // so you cannot rely on the logic being single threaded. The timeout() in verifyRequest // is therefore necessary to avoid flakiness. @@ -3751,6 +3951,9 @@ public void sendingToStoppedServer() throws Exception { assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(1, 1, 0, 0); + // Check metric data. + callback_ReportServerConnection(); + verifyServerConnection(1, true, xdsServerInfo.target()); } catch (Throwable t) { throw t; // This allows putting a breakpoint here for debugging } @@ -3782,6 +3985,152 @@ public void sendToNonexistentServer() throws Exception { client.shutdown(); } + @Test + public void validAndInvalidResourceMetricReport() { + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "A", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "B", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "B.1", edsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), "C", cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "C.1", edsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + + // CDS -> {A, B, C}, version 1 + ImmutableMap resourcesV1 = ImmutableMap.of( + "A", Any.pack(mf.buildEdsCluster("A", "A.1", "round_robin", null, null, false, null, + "envoy.transport_sockets.tls", null, null + )), + "B", Any.pack(mf.buildEdsCluster("B", "B.1", "round_robin", null, null, false, null, + "envoy.transport_sockets.tls", null, null + )), + "C", Any.pack(mf.buildEdsCluster("C", "C.1", "round_robin", null, null, false, null, + "envoy.transport_sockets.tls", null, null + ))); + call.sendResponse(CDS, resourcesV1.values().asList(), VERSION_1, "0000"); + // {A, B, C} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), CDS.typeUrl()); + + // EDS -> {A.1, B.1, C.1}, version 1 + List dropOverloads = ImmutableList.of(); + List endpointsV1 = ImmutableList.of(lbEndpointHealthy); + ImmutableMap resourcesV11 = ImmutableMap.of( + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)), + "B.1", Any.pack(mf.buildClusterLoadAssignment("B.1", endpointsV1, dropOverloads)), + "C.1", Any.pack(mf.buildClusterLoadAssignment("C.1", endpointsV1, dropOverloads))); + call.sendResponse(EDS, resourcesV11.values().asList(), VERSION_1, "0000"); + // {A.1, B.1, C.1} -> ACK, version 1 + verifyResourceValidInvalidCount(1, 3, 0, xdsServerInfo.target(), EDS.typeUrl()); + + // CDS -> {A, B}, version 2 + // Failed to parse endpoint B + ImmutableMap resourcesV2 = ImmutableMap.of( + "A", Any.pack(mf.buildEdsCluster("A", "A.2", "round_robin", null, null, false, null, + "envoy.transport_sockets.tls", null, null + )), + "B", Any.pack(mf.buildClusterInvalid("B"))); + call.sendResponse(CDS, resourcesV2.values().asList(), VERSION_2, "0001"); + // {A} -> ACK, version 2 + // {B} -> NACK, version 1, rejected version 2, rejected reason: Failed to parse B + // {C} -> does not exist + verifyResourceValidInvalidCount(1, 1, 1, xdsServerInfo.target(), CDS.typeUrl()); + } + + @Test + public void serverFailureMetricReport() { + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, + rdsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + // Management server closes the RPC stream before sending any response. + call.sendCompleted(); + verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) + .onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, + "ADS stream closed with OK before receiving a response"); + verify(rdsResourceWatcher).onError(errorCaptor.capture()); + verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, + "ADS stream closed with OK before receiving a response"); + verifyServerFailureCount(1, 1, xdsServerInfo.target()); + } + + @Test + public void serverFailureMetricReport_forRetryAndBackoff() { + InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE, + rdsResourceWatcher); + xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher); + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), EDS_RESOURCE, edsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + + // Management server closes the RPC stream with an error. + call.sendError(Status.UNKNOWN.asException()); + verifyServerFailureCount(1, 1, xdsServerInfo.target()); + + // Retry after backoff. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + ScheduledTask retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(10L); + fakeClock.forwardNanos(10L); + call = resourceDiscoveryCalls.poll(); + + // Management server becomes unreachable. + String errorMsg = "my fault"; + call.sendError(Status.UNAVAILABLE.withDescription(errorMsg).asException()); + verifyServerFailureCount(2, 1, xdsServerInfo.target()); + + // Retry after backoff. + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(100L); + fakeClock.forwardNanos(100L); + call = resourceDiscoveryCalls.poll(); + + List resources = ImmutableList.of(FAILING_ANY, testListenerRds, FAILING_ANY); + call.sendResponse(LDS, resources, "63", "3242"); + + List routeConfigs = ImmutableList.of(FAILING_ANY, testRouteConfig, FAILING_ANY); + call.sendResponse(RDS, routeConfigs, "5", "6764"); + + call.sendError(Status.DEADLINE_EXCEEDED.asException()); + // Server Failure metric will not be reported, as stream is closed with an error after receiving + // a response + verifyServerFailureCount(2, 1, xdsServerInfo.target()); + + // Reset backoff sequence and retry after backoff. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(20L); + fakeClock.forwardNanos(20L); + call = resourceDiscoveryCalls.poll(); + + // Management server becomes unreachable again. + call.sendError(Status.UNAVAILABLE.asException()); + verifyServerFailureCount(3, 1, xdsServerInfo.target()); + + // Retry after backoff. + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + retryTask = + Iterables.getOnlyElement(fakeClock.getPendingTasks(RPC_RETRY_TASK_FILTER)); + assertThat(retryTask.getDelay(TimeUnit.NANOSECONDS)).isEqualTo(200L); + fakeClock.forwardNanos(200L); + call = resourceDiscoveryCalls.poll(); + + List clusters = ImmutableList.of(FAILING_ANY, testClusterRoundRobin); + call.sendResponse(CDS, clusters, VERSION_1, "0000"); + call.sendCompleted(); + // Server Failure metric will not be reported once again, as stream is closed after receiving a + // response + verifyServerFailureCount(3, 1, xdsServerInfo.target()); + } + + private XdsClientImpl createXdsClient(String serverUri) { BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); return new XdsClientImpl( @@ -3792,10 +4141,11 @@ private XdsClientImpl createXdsClient(String serverUri) { fakeClock.getStopwatchSupplier(), timeProvider, MessagePrinter.INSTANCE, - new TlsContextManagerImpl(bootstrapInfo)); + new TlsContextManagerImpl(bootstrapInfo), + xdsClientMetricReporter); } - private BootstrapInfo buildBootStrap(String serverUri) { + private BootstrapInfo buildBootStrap(String serverUri) { ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS, ignoreResourceDeletion(), true); @@ -3902,7 +4252,7 @@ protected void sendResponse( } protected void sendResponse(XdsResourceType type, Any resource, String versionInfo, - String nonce) { + String nonce) { sendResponse(type, ImmutableList.of(resource), versionInfo, nonce); } @@ -3934,6 +4284,7 @@ protected void sendResponse(List clusters, long loadReportIntervalNano) } protected abstract static class MessageFactory { + /** Throws {@link InvalidProtocolBufferException} on {@link Any#unpack(Class)}. */ protected static final Any FAILING_ANY = Any.newBuilder().setTypeUrl("fake").build(); diff --git a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java index ee164938b2d..4fb77f0be42 100644 --- a/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/SharedXdsClientPoolProviderTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import io.grpc.InsecureChannelCredentials; +import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; import io.grpc.xds.SharedXdsClientPoolProvider.RefCountedXdsClientObjectPool; import io.grpc.xds.client.Bootstrapper.BootstrapInfo; @@ -51,6 +52,7 @@ public class SharedXdsClientPoolProviderTest { @Rule public final ExpectedException thrown = ExpectedException.none(); private final Node node = Node.newBuilder().setId("SharedXdsClientPoolProviderTest").build(); + private final MetricRecorder metricRecorder = new MetricRecorder() {}; private static final String DUMMY_TARGET = "dummy"; @Mock @@ -64,7 +66,7 @@ public void noServer() throws XdsInitializationException { SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); thrown.expect(XdsInitializationException.class); thrown.expectMessage("No xDS server provided"); - provider.getOrCreate(DUMMY_TARGET); + provider.getOrCreate(DUMMY_TARGET, metricRecorder); assertThat(provider.get(DUMMY_TARGET)).isNull(); } @@ -77,9 +79,9 @@ public void sharedXdsClientObjectPool() throws XdsInitializationException { SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); assertThat(provider.get(DUMMY_TARGET)).isNull(); - ObjectPool xdsClientPool = provider.getOrCreate(DUMMY_TARGET); + ObjectPool xdsClientPool = provider.getOrCreate(DUMMY_TARGET, metricRecorder); verify(bootstrapper).bootstrap(); - assertThat(provider.getOrCreate(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool); + assertThat(provider.getOrCreate(DUMMY_TARGET, metricRecorder)).isSameInstanceAs(xdsClientPool); assertThat(provider.get(DUMMY_TARGET)).isNotNull(); assertThat(provider.get(DUMMY_TARGET)).isSameInstanceAs(xdsClientPool); verifyNoMoreInteractions(bootstrapper); @@ -90,8 +92,9 @@ public void refCountedXdsClientObjectPool_delayedCreation() { ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()); BootstrapInfo bootstrapInfo = BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); + SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); RefCountedXdsClientObjectPool xdsClientPool = - new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); + provider.new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET, metricRecorder); assertThat(xdsClientPool.getXdsClientForTest()).isNull(); XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClientPool.getXdsClientForTest()).isNotNull(); @@ -103,8 +106,9 @@ public void refCountedXdsClientObjectPool_refCounted() { ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()); BootstrapInfo bootstrapInfo = BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); + SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); RefCountedXdsClientObjectPool xdsClientPool = - new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); + provider.new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET, metricRecorder); // getObject once XdsClient xdsClient = xdsClientPool.getObject(); assertThat(xdsClient).isNotNull(); @@ -123,8 +127,9 @@ public void refCountedXdsClientObjectPool_getObjectCreatesNewInstanceIfAlreadySh ServerInfo server = ServerInfo.create(SERVER_URI, InsecureChannelCredentials.create()); BootstrapInfo bootstrapInfo = BootstrapInfo.builder().servers(Collections.singletonList(server)).node(node).build(); + SharedXdsClientPoolProvider provider = new SharedXdsClientPoolProvider(bootstrapper); RefCountedXdsClientObjectPool xdsClientPool = - new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET); + provider.new RefCountedXdsClientObjectPool(bootstrapInfo, DUMMY_TARGET, metricRecorder); XdsClient xdsClient1 = xdsClientPool.getObject(); assertThat(xdsClientPool.returnObject(xdsClient1)).isNull(); assertThat(xdsClient1.isShutDown()).isTrue(); diff --git a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java index 0b8e89de721..b2b713e9a8e 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientFederationTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.XdsListenerResource.LdsUpdate; @@ -73,12 +74,13 @@ public class XdsClientFederationTest { private ObjectPool xdsClientPool; private XdsClient xdsClient; private static final String DUMMY_TARGET = "dummy"; + private final MetricRecorder metricRecorder = new MetricRecorder() {}; @Before public void setUp() throws XdsInitializationException { SharedXdsClientPoolProvider clientPoolProvider = new SharedXdsClientPoolProvider(); clientPoolProvider.setBootstrapOverride(defaultBootstrapOverride()); - xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET); + xdsClientPool = clientPoolProvider.getOrCreate(DUMMY_TARGET, metricRecorder); xdsClient = xdsClientPool.getObject(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java new file mode 100644 index 00000000000..9ee3f88d921 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java @@ -0,0 +1,394 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Any; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.grpc.MetricInstrument; +import io.grpc.MetricRecorder; +import io.grpc.MetricRecorder.BatchCallback; +import io.grpc.MetricRecorder.BatchRecorder; +import io.grpc.MetricSink; +import io.grpc.xds.XdsClientMetricReporterImpl.MetricReporterCallback; +import io.grpc.xds.client.XdsClient; +import io.grpc.xds.client.XdsClient.ResourceMetadata; +import io.grpc.xds.client.XdsClient.ServerConnectionCallback; +import io.grpc.xds.client.XdsResourceType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatcher; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** + * Unit tests for {@link XdsClientMetricReporterImpl}. + */ +@RunWith(JUnit4.class) +public class XdsClientMetricReporterImplTest { + + private static final String target = "test-target"; + private static final String server = "trafficdirector.googleapis.com"; + private static final String resourceTypeUrl = + "resourceTypeUrl.googleapis.com/envoy.config.cluster.v3.Cluster"; + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock + private MetricRecorder mockMetricRecorder; + @Mock + private XdsClient mockXdsClient; + @Mock + private BatchRecorder mockBatchRecorder; + @Captor + private ArgumentCaptor gaugeBatchCallbackCaptor; + + private XdsClientMetricReporterImpl reporter; + + @Before + public void setUp() { + reporter = new XdsClientMetricReporterImpl(mockMetricRecorder, target); + } + + @Test + public void reportResourceUpdates() { + // TODO(dnvindhya): add the "authority" label once available. + reporter.reportResourceUpdates(10, 5, server, resourceTypeUrl); + verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.xds_client.resource_updates_valid"), eq((long) 10), + eq(Lists.newArrayList(target, server, resourceTypeUrl)), + eq(Lists.newArrayList())); + verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.xds_client.resource_updates_invalid"), + eq((long) 5), + eq(Lists.newArrayList(target, server, resourceTypeUrl)), + eq(Lists.newArrayList())); + } + + @Test + public void reportServerFailure() { + reporter.reportServerFailure(1, server); + verify(mockMetricRecorder).addLongCounter( + eqMetricInstrumentName("grpc.xds_client.server_failure"), eq((long) 1), + eq(Lists.newArrayList(target, server)), + eq(Lists.newArrayList())); + } + + @Test + public void setXdsClient_reportMetrics() throws Exception { + SettableFuture future = SettableFuture.create(); + future.set(null); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( + ImmutableMap.of())); + when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) + .thenReturn(future); + reporter.setXdsClient(mockXdsClient); + verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(), + eqMetricInstrumentName("grpc.xds_client.connected"), + eqMetricInstrumentName("grpc.xds_client.resources")); + gaugeBatchCallbackCaptor.getValue().accept(mockBatchRecorder); + verify(mockXdsClient).reportServerConnections(any(ServerConnectionCallback.class)); + } + + @Test + public void setXdsClient_reportCallbackMetrics_resourceCountsFails() { + TestlogHandler testLogHandler = new TestlogHandler(); + Logger logger = Logger.getLogger(XdsClientMetricReporterImpl.class.getName()); + logger.addHandler(testLogHandler); + + // For reporting resource counts connections, return a normally completed future + SettableFuture future = SettableFuture.create(); + future.set(null); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( + ImmutableMap.of())); + + // Create a future that will throw an exception + SettableFuture serverConnectionsFeature = SettableFuture.create(); + serverConnectionsFeature.setException(new Exception("test")); + when(mockXdsClient.reportServerConnections(any())).thenReturn(serverConnectionsFeature); + + reporter.setXdsClient(mockXdsClient); + verify(mockMetricRecorder) + .registerBatchCallback(gaugeBatchCallbackCaptor.capture(), any(), any()); + gaugeBatchCallbackCaptor.getValue().accept(mockBatchRecorder); + // Verify that the xdsClient methods were called + // verify(mockXdsClient).reportResourceCounts(any()); + verify(mockXdsClient).reportServerConnections(any()); + + assertThat(testLogHandler.getLogs().size()).isEqualTo(1); + assertThat(testLogHandler.getLogs().get(0).getLevel()).isEqualTo(Level.WARNING); + assertThat(testLogHandler.getLogs().get(0).getMessage()).isEqualTo( + "Failed to report gauge metrics"); + logger.removeHandler(testLogHandler); + } + + @Test + public void metricGauges() { + SettableFuture future = SettableFuture.create(); + future.set(null); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture( + ImmutableMap.of())); + when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class))) + .thenReturn(future); + reporter.setXdsClient(mockXdsClient); + verify(mockMetricRecorder).registerBatchCallback(gaugeBatchCallbackCaptor.capture(), + eqMetricInstrumentName("grpc.xds_client.connected"), + eqMetricInstrumentName("grpc.xds_client.resources")); + BatchCallback gaugeBatchCallback = gaugeBatchCallbackCaptor.getValue(); + InOrder inOrder = inOrder(mockBatchRecorder); + // Trigger the internal call to reportCallbackMetrics() + gaugeBatchCallback.accept(mockBatchRecorder); + + ArgumentCaptor serverConnectionCallbackCaptor = + ArgumentCaptor.forClass(ServerConnectionCallback.class); + // verify(mockXdsClient).reportResourceCounts(resourceCallbackCaptor.capture()); + verify(mockXdsClient).reportServerConnections(serverConnectionCallbackCaptor.capture()); + + // Get the captured callback + MetricReporterCallback callback = (MetricReporterCallback) + serverConnectionCallbackCaptor.getValue(); + + // Verify that reportResourceCounts and reportServerConnections were called + // with the captured callback + callback.reportResourceCountGauge(10, "acked", resourceTypeUrl); + inOrder.verify(mockBatchRecorder) + .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(), + any()); + callback.reportServerConnectionGauge(true, "xdsServer"); + inOrder.verify(mockBatchRecorder) + .recordLongGauge(eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L), any(), any()); + + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void metricReporterCallback() { + MetricReporterCallback callback = + new MetricReporterCallback(mockBatchRecorder, target); + + callback.reportServerConnectionGauge(true, server); + verify(mockBatchRecorder, times(1)).recordLongGauge( + eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L), + eq(Lists.newArrayList(target, server)), + eq(Lists.newArrayList())); + + String cacheState = "requested"; + callback.reportResourceCountGauge(10, cacheState, resourceTypeUrl); + verify(mockBatchRecorder, times(1)).recordLongGauge( + eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), + eq(Arrays.asList(target, cacheState, resourceTypeUrl)), + eq(Collections.emptyList())); + } + + @Test + public void reportCallbackMetrics_computeAndReportResourceCounts() { + Map, Map> metadataByType = new HashMap<>(); + XdsResourceType listenerResource = XdsListenerResource.getInstance(); + XdsResourceType routeConfigResource = XdsRouteConfigureResource.getInstance(); + XdsResourceType clusterResource = XdsClusterResource.getInstance(); + + Any rawListener = + Any.pack(Listener.newBuilder().setName("listener.googleapis.com").build()); + long nanosLastUpdate = 1577923199_606042047L; + + Map ldsResourceMetadataMap = new HashMap<>(); + ldsResourceMetadataMap.put("resource1", + ResourceMetadata.newResourceMetadataRequested()); + ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42", + nanosLastUpdate); + ldsResourceMetadataMap.put("resource2", ackedLdsResource); + ldsResourceMetadataMap.put("resource3", + ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate)); + ldsResourceMetadataMap.put("resource4", + ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate, + "nacked after previous ack", true)); + + Map rdsResourceMetadataMap = new HashMap<>(); + ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested(); + rdsResourceMetadataMap.put("resource5", + ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24", + nanosLastUpdate, "nacked after request", false)); + rdsResourceMetadataMap.put("resource6", + ResourceMetadata.newResourceMetadataDoesNotExist()); + + Map cdsResourceMetadataMap = new HashMap<>(); + cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown()); + + metadataByType.put(listenerResource, ldsResourceMetadataMap); + metadataByType.put(routeConfigResource, rdsResourceMetadataMap); + metadataByType.put(clusterResource, cdsResourceMetadataMap); + + SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); + reportServerConnectionsCompleted.set(null); + when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class))) + .thenReturn(reportServerConnectionsCompleted); + + ListenableFuture, Map>> + getResourceMetadataCompleted = Futures.immediateFuture(metadataByType); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) + .thenReturn(getResourceMetadataCompleted); + + reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); + + // LDS resource requested + verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), + eq(1L), eq(Arrays.asList(target, "requested", listenerResource.typeUrl())), any()); + // LDS resources acked + verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), + eq(2L), eq(Arrays.asList(target, "acked", listenerResource.typeUrl())), any()); + // LDS resource nacked but cached + verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), + eq(1L), eq(Arrays.asList(target, "nacked_but_cached", listenerResource.typeUrl())), any()); + + // RDS resource nacked + verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), + eq(1L), eq(Arrays.asList(target, "nacked", routeConfigResource.typeUrl())), any()); + // RDS resource does not exist + verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), + eq(1L), eq(Arrays.asList(target, "does_not_exist", routeConfigResource.typeUrl())), any()); + + // CDS resource unknown + verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), + eq(1L), eq(Arrays.asList(target, "unknown", clusterResource.typeUrl())), any()); + verifyNoMoreInteractions(mockBatchRecorder); + } + + @Test + public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() { + Map, Map> metadataByType = new HashMap<>(); + XdsResourceType listenerResource = XdsListenerResource.getInstance(); + metadataByType.put(listenerResource, Collections.emptyMap()); + + SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); + reportServerConnectionsCompleted.set(null); + when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class))) + .thenReturn(reportServerConnectionsCompleted); + + ListenableFuture, Map>> + getResourceMetadataCompleted = Futures.immediateFuture(metadataByType); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) + .thenReturn(getResourceMetadataCompleted); + + reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); + + // Verify that reportResourceCountGauge is never called + verifyNoInteractions(mockBatchRecorder); + } + + @Test + public void reportCallbackMetrics_computeAndReportResourceCounts_nullMetadata() { + TestlogHandler testLogHandler = new TestlogHandler(); + Logger logger = Logger.getLogger(XdsClientMetricReporterImpl.class.getName()); + logger.addHandler(testLogHandler); + + SettableFuture reportServerConnectionsCompleted = SettableFuture.create(); + reportServerConnectionsCompleted.set(null); + when(mockXdsClient.reportServerConnections(any(MetricReporterCallback.class))) + .thenReturn(reportServerConnectionsCompleted); + + ListenableFuture, Map>> + getResourceMetadataCompleted = Futures.immediateFailedFuture( + new Exception("Error generating metadata snapshot")); + when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()) + .thenReturn(getResourceMetadataCompleted); + + reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient); + assertThat(testLogHandler.getLogs().size()).isEqualTo(1); + assertThat(testLogHandler.getLogs().get(0).getLevel()).isEqualTo(Level.WARNING); + assertThat(testLogHandler.getLogs().get(0).getMessage()).isEqualTo( + "Failed to report gauge metrics"); + logger.removeHandler(testLogHandler); + } + + @Test + public void close_closesGaugeRegistration() { + MetricSink.Registration mockRegistration = mock(MetricSink.Registration.class); + when(mockMetricRecorder.registerBatchCallback(any(MetricRecorder.BatchCallback.class), + eqMetricInstrumentName("grpc.xds_client.connected"), + eqMetricInstrumentName("grpc.xds_client.resources"))).thenReturn(mockRegistration); + + // Sets XdsClient and register the gauges + reporter.setXdsClient(mockXdsClient); + // Closes registered gauges + reporter.close(); + verify(mockRegistration, times(1)).close(); + } + + @SuppressWarnings("TypeParameterUnusedInFormals") + private T eqMetricInstrumentName(String name) { + return argThat(new ArgumentMatcher() { + @Override + public boolean matches(T instrument) { + return instrument.getName().equals(name); + } + }); + } + + static class TestlogHandler extends Handler { + List logs = new ArrayList<>(); + + @Override + public void publish(LogRecord record) { + logs.add(record); + } + + @Override + public void close() {} + + @Override + public void flush() {} + + public List getLogs() { + return logs; + } + } + +} diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverProviderTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverProviderTest.java index a216c3de028..33aae268c60 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverProviderTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import io.grpc.ChannelLogger; import io.grpc.InternalServiceProviders; +import io.grpc.MetricRecorder; import io.grpc.NameResolver; import io.grpc.NameResolver.ServiceConfigParser; import io.grpc.NameResolverProvider; @@ -57,6 +58,7 @@ public void uncaughtException(Thread t, Throwable e) { .setServiceConfigParser(mock(ServiceConfigParser.class)) .setScheduledExecutorService(fakeClock.getScheduledExecutorService()) .setChannelLogger(mock(ChannelLogger.class)) + .setMetricRecorder(mock(MetricRecorder.class)) .build(); private XdsNameResolverProvider provider = new XdsNameResolverProvider(); diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index f32c198f21f..6f4c1503cee 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -55,6 +55,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; +import io.grpc.MetricRecorder; import io.grpc.NameResolver; import io.grpc.NameResolver.ConfigOrError; import io.grpc.NameResolver.ResolutionResult; @@ -152,6 +153,7 @@ public ConfigOrError parseServiceConfig(Map rawServiceConfig) { private final CallInfo call1 = new CallInfo("HelloService", "hi"); private final CallInfo call2 = new CallInfo("GreetService", "bye"); private final TestChannel channel = new TestChannel(); + private final MetricRecorder metricRecorder = new MetricRecorder() {}; private BootstrapInfo bootstrapInfo = BootstrapInfo.builder() .servers(ImmutableList.of(ServerInfo.create( "td.googleapis.com", InsecureChannelCredentials.create()))) @@ -187,7 +189,7 @@ public void setUp() { RouterFilter.INSTANCE); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, filterRegistry, null); + xdsClientPoolFactory, mockRandom, filterRegistry, null, metricRecorder); } @After @@ -215,7 +217,8 @@ public ObjectPool get(String target) { } @Override - public ObjectPool getOrCreate(String target) throws XdsInitializationException { + public ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) + throws XdsInitializationException { throw new XdsInitializationException("Fail to read bootstrap file"); } @@ -227,7 +230,8 @@ public List getTargets() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -240,7 +244,8 @@ public List getTargets() { public void resolving_withTargetAuthorityNotFound() { resolver = new XdsNameResolver(targetUri, "notfound.google.com", AUTHORITY, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); verify(mockListener).onError(errorCaptor.capture()); Status error = errorCaptor.getValue(); @@ -262,7 +267,7 @@ public void resolving_noTargetAuthority_templateWithoutXdstp() { resolver = new XdsNameResolver( targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, - mockRandom, FilterRegistry.getDefaultRegistry(), null); + mockRandom, FilterRegistry.getDefaultRegistry(), null, metricRecorder); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); } @@ -282,7 +287,8 @@ public void resolving_noTargetAuthority_templateWithXdstp() { + "%5B::FFFF:129.144.52.38%5D:80?id=1"; resolver = new XdsNameResolver( targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); } @@ -302,7 +308,8 @@ public void resolving_noTargetAuthority_xdstpWithMultipleSlashes() { + "path/to/service?id=1"; resolver = new XdsNameResolver( targetUri, null, serviceAuthority, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); // The Service Authority must be URL encoded, but unlike the LDS resource name. @@ -330,7 +337,8 @@ public void resolving_targetAuthorityInAuthoritiesMap() { + "%5B::FFFF:129.144.52.38%5D:80?bar=2&foo=1"; // query param canonified resolver = new XdsNameResolver(targetUri, "xds.authority.com", serviceAuthority, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); verify(mockListener, never()).onError(any(Status.class)); } @@ -362,7 +370,8 @@ public void resolving_ldsResourceUpdateRdsName() { .build(); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); // use different ldsResourceName and service authority. The virtualhost lookup should use // service authority. expectedLdsResourceName = "test-" + expectedLdsResourceName; @@ -543,7 +552,8 @@ public void resolving_matchingVirtualHostNotFound_matchingOverrideAuthority() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); @@ -566,7 +576,8 @@ public void resolving_matchingVirtualHostNotFound_notMatchingOverrideAuthority() resolver = new XdsNameResolver(targetUri, null, AUTHORITY, "random", serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate(0L, Arrays.asList(virtualHost)); @@ -577,7 +588,8 @@ public void resolving_matchingVirtualHostNotFound_notMatchingOverrideAuthority() public void resolving_matchingVirtualHostNotFoundForOverrideAuthority() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, AUTHORITY, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate(0L, buildUnmatchedVirtualHosts()); @@ -661,7 +673,8 @@ public void retryPolicyInPerMethodConfigGeneratedByResolverIsValid() { ServiceConfigParser realParser = new ScParser( true, 5, 5, new AutoConfiguredLoadBalancerFactory("pick-first")); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, realParser, syncContext, - scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + scheduler, xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); RetryPolicy retryPolicy = RetryPolicy.create( @@ -871,7 +884,8 @@ public void resolved_rpcHashingByChannelId() { when(mockRandom.nextLong()).thenReturn(123L); resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, - xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null); + xdsClientPoolFactory, mockRandom, FilterRegistry.getDefaultRegistry(), null, + metricRecorder); resolver.start(mockListener); xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -904,7 +918,7 @@ public void resolved_rpcHashingByChannelId() { public void resolved_routeActionHasAutoHostRewrite_emitsCallOptionForTheSame() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, - FilterRegistry.getDefaultRegistry(), null); + FilterRegistry.getDefaultRegistry(), null, metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -935,7 +949,7 @@ public void resolved_routeActionHasAutoHostRewrite_emitsCallOptionForTheSame() { public void resolved_routeActionNoAutoHostRewrite_doesntEmitCallOptionForTheSame() { resolver = new XdsNameResolver(targetUri, null, AUTHORITY, null, serviceConfigParser, syncContext, scheduler, xdsClientPoolFactory, mockRandom, - FilterRegistry.getDefaultRegistry(), null); + FilterRegistry.getDefaultRegistry(), null, metricRecorder); resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); xdsClient.deliverLdsUpdate( @@ -1994,7 +2008,8 @@ public ObjectPool get(String target) { } @Override - public ObjectPool getOrCreate(String target) throws XdsInitializationException { + public ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) + throws XdsInitializationException { targets.add(target); return new ObjectPool() { @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java index 791318c5355..a27c2917712 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.SettableFuture; import io.grpc.InsecureChannelCredentials; +import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; import io.grpc.xds.EnvoyServerProtoData.ConnectionSourceType; import io.grpc.xds.EnvoyServerProtoData.FilterChain; @@ -151,7 +152,8 @@ public ObjectPool get(String target) { } @Override - public ObjectPool getOrCreate(String target) throws XdsInitializationException { + public ObjectPool getOrCreate(String target, MetricRecorder metricRecorder) + throws XdsInitializationException { return new ObjectPool() { @Override public XdsClient getObject() {