Skip to content

Commit

Permalink
Test that update works (with associated fixes)
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Jan 8, 2025
1 parent 12a48ad commit 5dcd069
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 85 deletions.
9 changes: 5 additions & 4 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
Set<String> addedClusters = Sets.difference(newNames, oldNames);
Set<String> deletedClusters = Sets.difference(oldNames, newNames);
addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster)));
deletedClusters.forEach((cluster) -> cancelWatcher(getCluster(cluster)));
deletedClusters.forEach((cluster) -> cancelClusterWatcherTree(getCluster(cluster)));

if (!addedClusters.isEmpty()) {
maybePublishConfig();
Expand Down Expand Up @@ -567,7 +567,7 @@ private void updateRoutes(List<VirtualHost> virtualHosts) {
oldClusters == null ? Collections.emptySet() : Sets.difference(oldClusters, clusters);

addedClusters.forEach((cluster) -> addWatcher(new CdsWatcher(cluster)));
deletedClusters.forEach(watcher -> cancelWatcher(getCluster(watcher)));
deletedClusters.forEach(watcher -> cancelClusterWatcherTree(getCluster(watcher)));
}

// Must be in SyncContext
Expand All @@ -593,8 +593,9 @@ private static String prefixedClusterName(String name) {
return "cluster:" + name;
}

private XdsWatcherBase<?> getCluster(String clusterName) {
return resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
@SuppressWarnings("unchecked")
private CdsWatcher getCluster(String clusterName) {
return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
}

}
3 changes: 1 addition & 2 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,7 @@ private <T extends ResourceUpdate> void handleResourceUpdate(
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo,
args.nonce);
controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
logger.log(XdsLogLevel.WARNING,
Expand Down
21 changes: 17 additions & 4 deletions xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
Expand All @@ -53,6 +51,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
Expand All @@ -75,8 +75,6 @@ public class XdsDependencyManagerTest {
private Server xdsServer;

private final FakeClock fakeClock = new FakeClock();
private final BlockingDeque<XdsTestUtils.DiscoveryRpcCall> resourceDiscoveryCalls =
new LinkedBlockingDeque<>(1);
private final String serverName = InProcessServerBuilder.generateName();
private final Queue<XdsTestUtils.LrsRpcCall> loadReportCalls = new ArrayDeque<>();
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
Expand Down Expand Up @@ -142,6 +140,21 @@ public void verify_basic_config() {
testWatcher.verifyStats(1, 0, 0);
}

@Test
public void verify_config_update() {
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);

InOrder inOrder = org.mockito.Mockito.inOrder(xdsConfigWatcher);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);
testWatcher.verifyStats(1, 0, 0);

XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS2", "EDS2",
XdsTestUtils.ENDPOINT_HOSTNAME + "2", XdsTestUtils.ENDPOINT_PORT + 2);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(ArgumentMatchers.notNull());
testWatcher.verifyStats(2, 0, 0);
}

private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher {
XdsConfig lastConfig;
int numUpdates = 0;
Expand Down
4 changes: 2 additions & 2 deletions xds/src/test/java/io/grpc/xds/XdsTestControlPlaneService.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public <T extends Message> void setXdsConfig(final String type, final Map<String
public void run() {
HashMap<String, Message> copyResources = new HashMap<>(resources);
xdsResources.put(type, copyResources);
String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndDecrement());
String newVersionInfo = String.valueOf(xdsVersions.get(type).getAndIncrement());

for (Map.Entry<StreamObserver<DiscoveryResponse>, Set<String>> entry :
subscribers.get(type).entrySet()) {
Expand Down Expand Up @@ -159,7 +159,7 @@ public void run() {

DiscoveryResponse response = generateResponse(resourceType,
String.valueOf(xdsVersions.get(resourceType)),
String.valueOf(xdsNonces.get(resourceType).get(responseObserver)),
String.valueOf(xdsNonces.get(resourceType).get(responseObserver).addAndGet(1)),
requestedResourceNames);
responseObserver.onNext(response);
subscribers.get(resourceType).put(responseObserver, requestedResourceNames);
Expand Down
75 changes: 2 additions & 73 deletions xds/src/test/java/io/grpc/xds/XdsTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.Message;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
Expand All @@ -46,7 +42,6 @@
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
Expand All @@ -55,14 +50,10 @@
import io.grpc.Context.CancellationListener;
import io.grpc.StatusOr;
import io.grpc.internal.JsonParser;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.client.Bootstrapper;
import io.grpc.xds.client.EnvoyProtoData;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsResourceType;
import java.io.IOException;
Expand All @@ -80,8 +71,6 @@
import javax.annotation.Nullable;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class XdsTestUtils {
private static final Logger log = Logger.getLogger(XdsTestUtils.class.getName());
Expand Down Expand Up @@ -155,7 +144,7 @@ static void setAdsConfig(XdsTestControlPlaneService service, String serverName,
service.setXdsConfig(ADS_TYPE_URL_CDS, ImmutableMap.<String, Message>of(clusterName, cluster));

ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, endpointHostname, endpointPort);
serverName, endpointHostname, endpointPort, edsName);
service.setXdsConfig(ADS_TYPE_URL_EDS,
ImmutableMap.<String, Message>of(edsName, clusterLoadAssignment));

Expand Down Expand Up @@ -208,13 +197,7 @@ static XdsConfig getDefaultXdsConfig(String serverHostName)
return builder.build();
}

private static ConfigOrError<LbConfig> getWrrLbConfig() throws IOException {
Map<String, ?> lbParsed = getWrrLbConfigAsMap();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(lbParsed);

return ConfigOrError.fromConfig(lbConfig);
}

@SuppressWarnings("unchecked")
private static ImmutableMap<String, ?> getWrrLbConfigAsMap() throws IOException {
String lbConfigStr = "{\"wrr_locality_experimental\" : "
+ "{ \"childPolicy\" : [{\"round_robin\" : {}}]}}";
Expand Down Expand Up @@ -331,60 +314,6 @@ public boolean matches(LoadStatsRequest argument) {
}
}

static class DiscoveryRpcCall {
StreamObserver<DiscoveryRequest> requestObserver;
StreamObserver<DiscoveryResponse> responseObserver;

private DiscoveryRpcCall(StreamObserver<DiscoveryRequest> requestObserver,
StreamObserver<DiscoveryResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
}

protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node, VerificationMode verificationMode) {
verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
}

protected void verifyRequestNack(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node, List<String> errorMessages) {
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce,
Code.INVALID_ARGUMENT_VALUE, errorMessages)));
}

protected void verifyNoMoreRequest() {
verifyNoMoreInteractions(requestObserver);
}

protected void sendResponse(
XdsResourceType<?> type, List<Any> resources, String versionInfo, String nonce) {
DiscoveryResponse response =
DiscoveryResponse.newBuilder()
.setVersionInfo(versionInfo)
.addAllResources(resources)
.setTypeUrl(type.typeUrl())
.setNonce(nonce)
.build();
responseObserver.onNext(response);
}

protected void sendError(Throwable t) {
responseObserver.onError(t);
}

protected void sendCompleted() {
responseObserver.onCompleted();
}

protected boolean isReady() {
return ((ServerCallStreamObserver)responseObserver).isReady();
}
}

static class LrsRpcCall {
private final StreamObserver<LoadStatsRequest> requestObserver;
private final StreamObserver<LoadStatsResponse> responseObserver;
Expand Down

0 comments on commit 5dcd069

Please sign in to comment.