Skip to content

Commit

Permalink
xds: Revert xds flow control change. (#10784)
Browse files Browse the repository at this point in the history
* Revert "xds: fix flow control test failure (#10773)"

This reverts commit f67ec2e.

* Revert "xDS: implement ADS stream flow control mechanism (#10674)"

This reverts commit 0a704a5.
  • Loading branch information
YifeiZhuang authored Dec 28, 2023
1 parent f67ec2e commit 846e008
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 520 deletions.
145 changes: 81 additions & 64 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ private ClusterState(String name) {

private void start() {
shutdown = false;
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this, syncContext);
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
}

void shutdown() {
Expand All @@ -341,85 +341,102 @@ public void onError(Status error) {
String.format("Unable to load CDS %s. xDS server returned: %s: %s",
name, error.getCode(), error.getDescription()))
.withCause(error.getCause());
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(status);
}
}
});
}

@Override
public void onResourceDoesNotExist(String resourceName) {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
}
childClusterStates = null;
}
handleClusterDiscovered();
});
}

@Override
public void onChanged(final CdsUpdate update) {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
class ClusterDiscovered implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();

logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : update.prioritizedClusterNames()) {
if (newChildStates.containsKey(cluster)) {
logger.log(XdsLogLevel.WARNING,
String.format("duplicate cluster name %s in aggregate %s is being ignored",
cluster, update.clusterName()));
continue;
}
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState;
if (clusterStates.containsKey(cluster)) {
childState = clusterStates.get(cluster);
if (childState.shutdown) {
childState.start();
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
} else {
childState = new ClusterState(cluster);
clusterStates.put(cluster, childState);
childState.start();
}
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
}
childClusterStates = newChildStates;
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
}

syncContext.execute(new ClusterDiscovered());
}
}
}
}
51 changes: 30 additions & 21 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@ private EdsClusterState(String name, @Nullable String edsServiceName,
void start() {
String resourceName = edsServiceName != null ? edsServiceName : name;
logger.log(XdsLogLevel.INFO, "Start watching EDS resource {0}", resourceName);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(),
resourceName, this, syncContext);
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), resourceName, this);
}

@Override
Expand Down Expand Up @@ -453,7 +452,7 @@ public void run() {
}
}

new EndpointsUpdated().run();
syncContext.execute(new EndpointsUpdated());
}

private List<String> generatePriorityNames(String name,
Expand Down Expand Up @@ -492,28 +491,38 @@ private List<String> generatePriorityNames(String name,

@Override
public void onResourceDoesNotExist(final String resourceName) {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} unavailable", resourceName);
status = Status.OK;
resolved = true;
result = null; // resource revoked
handleEndpointResourceUpdate();
}
});
}

@Override
public void onError(final Status error) {
if (shutdown) {
return;
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
String resourceName = edsServiceName != null ? edsServiceName : name;
status = Status.UNAVAILABLE
.withDescription(String.format("Unable to load EDS %s. xDS server returned: %s: %s",
resourceName, error.getCode(), error.getDescription()))
.withCause(error.getCause());
logger.log(XdsLogLevel.WARNING, "Received EDS error: {0}", error);
handleEndpointResolutionError();
}
});
}
}

Expand Down
21 changes: 4 additions & 17 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ProcessingTracker;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
Expand Down Expand Up @@ -288,8 +288,6 @@ private abstract class AbstractAdsStream {

abstract boolean isReady();

abstract void request(int count);

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
Expand All @@ -316,10 +314,7 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
}
responseReceived = true;
respNonces.put(type, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
processingTracker);
processingTracker.onComplete();
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
}

final void handleRpcError(Throwable t) {
Expand Down Expand Up @@ -377,15 +372,14 @@ private void cleanUp() {
}

private final class AdsStreamV3 extends AbstractAdsStream {
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
private StreamObserver<DiscoveryRequest> requestWriter;

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
Expand All @@ -395,7 +389,6 @@ final class AdsClientResponseObserver

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

Expand Down Expand Up @@ -444,8 +437,7 @@ public void run() {
}
}

requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
requestWriter = stub.streamAggregatedResources(new AdsClientResponseObserver());
}

@Override
Expand Down Expand Up @@ -475,11 +467,6 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
}
}

@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
Expand Down
34 changes: 2 additions & 32 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Splitter;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
Expand All @@ -37,8 +36,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -307,15 +304,9 @@ TlsContextManager getTlsContextManager() {
/**
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
throw new UnsupportedOperationException();
}

<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
throw new UnsupportedOperationException();
}

/**
Expand Down Expand Up @@ -362,32 +353,11 @@ Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
throw new UnsupportedOperationException();
}

static final class ProcessingTracker {
private final AtomicInteger pendingTask = new AtomicInteger(1);
private final Executor executor;
private final Runnable completionListener;

ProcessingTracker(Runnable completionListener, Executor executor) {
this.executor = executor;
this.completionListener = completionListener;
}

void startTask() {
pendingTask.incrementAndGet();
}

void onComplete() {
if (pendingTask.decrementAndGet() == 0) {
executor.execute(completionListener);
}
}
}

interface XdsResponseHandler {
/** Called when a xds response is received. */
void handleResourceResponse(
XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
List<Any> resources, String nonce, ProcessingTracker processingTracker);
List<Any> resources, String nonce);

/** Called when the ADS stream is closed passively. */
// Must be synchronized.
Expand Down
Loading

0 comments on commit 846e008

Please sign in to comment.