1818
1919import static com .google .common .base .Preconditions .checkArgument ;
2020import static com .google .common .base .Preconditions .checkNotNull ;
21- import static com .google .common .base .Preconditions .checkState ;
2221import static io .grpc .xds .client .XdsClient .ResourceUpdate ;
2322
2423import com .google .common .annotations .VisibleForTesting ;
3534import io .grpc .xds .client .XdsClient ;
3635import io .grpc .xds .client .XdsClient .ResourceWatcher ;
3736import io .grpc .xds .client .XdsResourceType ;
37+ import java .io .Closeable ;
38+ import java .io .IOException ;
3839import java .util .Collections ;
3940import java .util .HashMap ;
4041import java .util .HashSet ;
5556final class XdsDependencyManager implements XdsConfig .XdsClusterSubscriptionRegistry {
5657 public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource .getInstance ();
5758 public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource .getInstance ();
58- private static final int MAX_CLUSTER_RECURSION_DEPTH = 16 ; // Specified by gRFC A37
59+ private static final int MAX_CLUSTER_RECURSION_DEPTH = 16 ; // Matches C++
5960 private final String listenerName ;
6061 private final XdsClient xdsClient ;
62+ private final XdsConfigWatcher xdsConfigWatcher ;
6163 private final SynchronizationContext syncContext ;
6264 private final String dataPlaneAuthority ;
63- private XdsConfigWatcher xdsConfigWatcher ;
6465
6566 private StatusOr <XdsConfig > lastUpdate = null ;
6667 private final Map <XdsResourceType <?>, TypeWatchers <?>> resourceWatchers = new HashMap <>();
6768 private final Set <ClusterSubscription > subscriptions = new HashSet <>();
6869
69- XdsDependencyManager (XdsClient xdsClient ,
70+ XdsDependencyManager (XdsClient xdsClient , XdsConfigWatcher xdsConfigWatcher ,
7071 SynchronizationContext syncContext , String dataPlaneAuthority ,
7172 String listenerName , NameResolver .Args nameResolverArgs ,
7273 ScheduledExecutorService scheduler ) {
7374 this .listenerName = checkNotNull (listenerName , "listenerName" );
7475 this .xdsClient = checkNotNull (xdsClient , "xdsClient" );
76+ this .xdsConfigWatcher = checkNotNull (xdsConfigWatcher , "xdsConfigWatcher" );
7577 this .syncContext = checkNotNull (syncContext , "syncContext" );
7678 this .dataPlaneAuthority = checkNotNull (dataPlaneAuthority , "dataPlaneAuthority" );
7779 checkNotNull (nameResolverArgs , "nameResolverArgs" );
7880 checkNotNull (scheduler , "scheduler" );
81+
82+ // start the ball rolling
83+ syncContext .execute (() -> addWatcher (new LdsWatcher (listenerName )));
7984 }
8085
8186 public static String toContextStr (String typeName , String resourceName ) {
8287 return typeName + " resource " + resourceName ;
8388 }
8489
85- public void start (XdsConfigWatcher xdsConfigWatcher ) {
86- checkState (this .xdsConfigWatcher == null , "dep manager may not be restarted" );
87- this .xdsConfigWatcher = checkNotNull (xdsConfigWatcher , "xdsConfigWatcher" );
88- // start the ball rolling
89- syncContext .execute (() -> addWatcher (new LdsWatcher (listenerName )));
90- }
91-
9290 @ Override
93- public XdsConfig .Subscription subscribeToCluster (String clusterName ) {
94- checkState (this .xdsConfigWatcher != null , "dep manager must first be started" );
91+ public Closeable subscribeToCluster (String clusterName ) {
9592 checkNotNull (clusterName , "clusterName" );
9693 ClusterSubscription subscription = new ClusterSubscription (clusterName );
9794
@@ -294,17 +291,10 @@ private static void addConfigForCluster(
294291 addConfigForCluster (clusters , childCluster , ancestors , tracer );
295292 StatusOr <XdsConfig .XdsClusterConfig > config = clusters .get (childCluster );
296293 if (!config .hasValue ()) {
297- // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
298- // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
299- // watchers reports a transient ADS stream error, the policy should report that it is in
300- // TRANSIENT_FAILURE if it has never passed a config to its child.
301- //
302- // But there's currently disagreement about whether that is actually what we want, and
303- // that was not originally implemented in gRPC Java. So we're keeping Java's old
304- // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
305- // cycle).
306- leafNames .add (childCluster );
307- continue ;
294+ clusters .put (clusterName , StatusOr .fromStatus (Status .INTERNAL .withDescription (
295+ "Unable to get leaves for " + clusterName + ": "
296+ + config .getStatus ().getDescription ())));
297+ return ;
308298 }
309299 XdsConfig .XdsClusterConfig .ClusterChild children = config .getValue ().getChildren ();
310300 if (children instanceof AggregateConfig ) {
@@ -336,11 +326,6 @@ private static void addConfigForCluster(
336326 child = new EndpointConfig (StatusOr .fromStatus (Status .UNAVAILABLE .withDescription (
337327 "Unknown type in cluster " + clusterName + " " + cdsUpdate .clusterType ())));
338328 }
339- if (clusters .containsKey (clusterName )) {
340- // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
341- // present. We don't want to overwrite it with a non-error value.
342- return ;
343- }
344329 clusters .put (clusterName , StatusOr .fromValue (
345330 new XdsConfig .XdsClusterConfig (clusterName , cdsUpdate , child )));
346331 }
@@ -422,7 +407,7 @@ public interface XdsConfigWatcher {
422407 void onUpdate (StatusOr <XdsConfig > config );
423408 }
424409
425- private final class ClusterSubscription implements XdsConfig . Subscription {
410+ private final class ClusterSubscription implements Closeable {
426411 private final String clusterName ;
427412 boolean closed ; // Accessed from syncContext
428413
@@ -435,7 +420,7 @@ String getClusterName() {
435420 }
436421
437422 @ Override
438- public void close () {
423+ public void close () throws IOException {
439424 releaseSubscription (this );
440425 }
441426 }
0 commit comments