3636import io .grpc .xds .client .XdsClient .ResourceWatcher ;
3737import io .grpc .xds .client .XdsResourceType ;
3838import java .util .Collections ;
39- import java .util .EnumMap ;
4039import java .util .HashMap ;
4140import java .util .HashSet ;
4241import java .util .LinkedHashSet ;
5453 * applies to a single data plane authority.
5554 */
5655final class XdsDependencyManager implements XdsConfig .XdsClusterSubscriptionRegistry {
57- private enum TrackedWatcherTypeEnum {
58- LDS , RDS , CDS , EDS
59- }
60-
61- private static final TrackedWatcherType <XdsListenerResource .LdsUpdate > LDS_TYPE =
62- new TrackedWatcherType <>(TrackedWatcherTypeEnum .LDS );
63- private static final TrackedWatcherType <RdsUpdate > RDS_TYPE =
64- new TrackedWatcherType <>(TrackedWatcherTypeEnum .RDS );
65- private static final TrackedWatcherType <XdsClusterResource .CdsUpdate > CDS_TYPE =
66- new TrackedWatcherType <>(TrackedWatcherTypeEnum .CDS );
67- private static final TrackedWatcherType <XdsEndpointResource .EdsUpdate > EDS_TYPE =
68- new TrackedWatcherType <>(TrackedWatcherTypeEnum .EDS );
69-
56+ public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource .getInstance ();
57+ public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource .getInstance ();
7058 private static final int MAX_CLUSTER_RECURSION_DEPTH = 16 ; // Specified by gRFC A37
7159 private final String listenerName ;
7260 private final XdsClient xdsClient ;
@@ -75,8 +63,7 @@ private enum TrackedWatcherTypeEnum {
7563 private XdsConfigWatcher xdsConfigWatcher ;
7664
7765 private StatusOr <XdsConfig > lastUpdate = null ;
78- private final Map <TrackedWatcherTypeEnum , TypeWatchers <?>> resourceWatchers =
79- new EnumMap <>(TrackedWatcherTypeEnum .class );
66+ private final Map <XdsResourceType <?>, TypeWatchers <?>> resourceWatchers = new HashMap <>();
8067 private final Set <ClusterSubscription > subscriptions = new HashSet <>();
8168
8269 XdsDependencyManager (XdsClient xdsClient ,
@@ -99,7 +86,7 @@ public void start(XdsConfigWatcher xdsConfigWatcher) {
9986 checkState (this .xdsConfigWatcher == null , "dep manager may not be restarted" );
10087 this .xdsConfigWatcher = checkNotNull (xdsConfigWatcher , "xdsConfigWatcher" );
10188 // start the ball rolling
102- syncContext .execute (() -> addWatcher (LDS_TYPE , new LdsWatcher (listenerName )));
89+ syncContext .execute (() -> addWatcher (new LdsWatcher (listenerName )));
10390 }
10491
10592 @ Override
@@ -109,7 +96,7 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) {
10996 ClusterSubscription subscription = new ClusterSubscription (clusterName );
11097
11198 syncContext .execute (() -> {
112- if (getWatchers (LDS_TYPE ).isEmpty ()) {
99+ if (getWatchers (XdsListenerResource . getInstance () ).isEmpty ()) {
113100 subscription .closed = true ;
114101 return ; // shutdown() called
115102 }
@@ -120,28 +107,33 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) {
120107 return subscription ;
121108 }
122109
123- private <T extends ResourceUpdate > void addWatcher (
124- TrackedWatcherType <T > watcherType , XdsWatcherBase <T > watcher ) {
110+ private <T extends ResourceUpdate > void addWatcher (XdsWatcherBase <T > watcher ) {
125111 syncContext .throwIfNotInThisSynchronizationContext ();
126112 XdsResourceType <T > type = watcher .type ;
127113 String resourceName = watcher .resourceName ;
128114
129- getWatchers (watcherType ).put (resourceName , watcher );
115+ getWatchers (type ).put (resourceName , watcher );
130116 xdsClient .watchXdsResource (type , resourceName , watcher , syncContext );
131117 }
132118
133119 public void shutdown () {
134120 syncContext .execute (() -> {
135121 for (TypeWatchers <?> watchers : resourceWatchers .values ()) {
136- for (TrackedWatcher <?> watcher : watchers .watchers .values ()) {
137- watcher .close ();
138- }
122+ shutdownWatchersForType (watchers );
139123 }
140124 resourceWatchers .clear ();
141125 subscriptions .clear ();
142126 });
143127 }
144128
129+ private <T extends ResourceUpdate > void shutdownWatchersForType (TypeWatchers <T > watchers ) {
130+ for (Map .Entry <String , XdsWatcherBase <T >> watcherEntry : watchers .watchers .entrySet ()) {
131+ xdsClient .cancelXdsResourceWatch (watchers .resourceType , watcherEntry .getKey (),
132+ watcherEntry .getValue ());
133+ watcherEntry .getValue ().cancelled = true ;
134+ }
135+ }
136+
145137 private void releaseSubscription (ClusterSubscription subscription ) {
146138 checkNotNull (subscription , "subscription" );
147139 syncContext .execute (() -> {
@@ -162,12 +154,12 @@ private void releaseSubscription(ClusterSubscription subscription) {
162154 */
163155 private void maybePublishConfig () {
164156 syncContext .throwIfNotInThisSynchronizationContext ();
165- if (getWatchers (LDS_TYPE ).isEmpty ()) {
157+ if (getWatchers (XdsListenerResource . getInstance () ).isEmpty ()) {
166158 return ; // shutdown() called
167159 }
168160 boolean waitingOnResource = resourceWatchers .values ().stream ()
169161 .flatMap (typeWatchers -> typeWatchers .watchers .values ().stream ())
170- .anyMatch (TrackedWatcher ::missingResult );
162+ .anyMatch (XdsWatcherBase ::missingResult );
171163 if (waitingOnResource ) {
172164 return ;
173165 }
@@ -202,8 +194,8 @@ private static StatusOr<XdsConfig> buildUpdate(
202194
203195 // Iterate watchers and build the XdsConfig
204196
205- TrackedWatcher <XdsListenerResource .LdsUpdate > ldsWatcher
206- = tracer .getWatcher (LDS_TYPE , listenerName );
197+ XdsWatcherBase <XdsListenerResource .LdsUpdate > ldsWatcher
198+ = tracer .getWatcher (XdsListenerResource . getInstance () , listenerName );
207199 if (ldsWatcher == null ) {
208200 return StatusOr .fromStatus (Status .UNAVAILABLE .withDescription (
209201 "Bug: No listener watcher found for " + listenerName ));
@@ -249,13 +241,14 @@ private static StatusOr<XdsConfig> buildUpdate(
249241 return StatusOr .fromValue (builder .build ());
250242 }
251243
252- private <T > Map <String , TrackedWatcher <T >> getWatchers (TrackedWatcherType <T > watcherType ) {
253- TypeWatchers <?> typeWatchers = resourceWatchers .get (watcherType .typeEnum );
244+ private <T extends ResourceUpdate > Map <String , XdsWatcherBase <T >> getWatchers (
245+ XdsResourceType <T > resourceType ) {
246+ TypeWatchers <?> typeWatchers = resourceWatchers .get (resourceType );
254247 if (typeWatchers == null ) {
255- typeWatchers = new TypeWatchers <T >(watcherType );
256- resourceWatchers .put (watcherType . typeEnum , typeWatchers );
248+ typeWatchers = new TypeWatchers <T >(resourceType );
249+ resourceWatchers .put (resourceType , typeWatchers );
257250 }
258- assert typeWatchers .watcherType == watcherType ;
251+ assert typeWatchers .resourceType == resourceType ;
259252 @ SuppressWarnings ("unchecked" )
260253 TypeWatchers <T > tTypeWatchers = (TypeWatchers <T >) typeWatchers ;
261254 return tTypeWatchers .watchers ;
@@ -282,7 +275,7 @@ private static void addConfigForCluster(
282275 return ;
283276 }
284277
285- CdsWatcher cdsWatcher = (CdsWatcher ) tracer .getWatcher (CDS_TYPE , clusterName );
278+ CdsWatcher cdsWatcher = (CdsWatcher ) tracer .getWatcher (CLUSTER_RESOURCE , clusterName );
286279 StatusOr <XdsClusterResource .CdsUpdate > cdsWatcherDataOr = cdsWatcher .getData ();
287280 if (!cdsWatcherDataOr .hasValue ()) {
288281 clusters .put (clusterName , StatusOr .fromStatus (cdsWatcherDataOr .getStatus ()));
@@ -325,8 +318,8 @@ private static void addConfigForCluster(
325318 child = new AggregateConfig (ImmutableList .copyOf (leafNames ));
326319 break ;
327320 case EDS :
328- TrackedWatcher <XdsEndpointResource .EdsUpdate > edsWatcher =
329- tracer .getWatcher (EDS_TYPE , cdsWatcher .getEdsServiceName ());
321+ XdsWatcherBase <XdsEndpointResource .EdsUpdate > edsWatcher =
322+ tracer .getWatcher (ENDPOINT_RESOURCE , cdsWatcher .getEdsServiceName ());
330323 if (edsWatcher != null ) {
331324 child = new EndpointConfig (edsWatcher .getData ());
332325 } else {
@@ -353,27 +346,27 @@ private static void addConfigForCluster(
353346 }
354347
355348 private void addRdsWatcher (String resourceName ) {
356- if (getWatchers (RDS_TYPE ).containsKey (resourceName )) {
349+ if (getWatchers (XdsRouteConfigureResource . getInstance () ).containsKey (resourceName )) {
357350 return ;
358351 }
359352
360- addWatcher (RDS_TYPE , new RdsWatcher (resourceName ));
353+ addWatcher (new RdsWatcher (resourceName ));
361354 }
362355
363356 private void addEdsWatcher (String edsServiceName ) {
364- if (getWatchers (EDS_TYPE ).containsKey (edsServiceName )) {
357+ if (getWatchers (XdsEndpointResource . getInstance () ).containsKey (edsServiceName )) {
365358 return ;
366359 }
367360
368- addWatcher (EDS_TYPE , new EdsWatcher (edsServiceName ));
361+ addWatcher (new EdsWatcher (edsServiceName ));
369362 }
370363
371364 private void addClusterWatcher (String clusterName ) {
372- if (getWatchers (CDS_TYPE ).containsKey (clusterName )) {
365+ if (getWatchers (CLUSTER_RESOURCE ).containsKey (clusterName )) {
373366 return ;
374367 }
375368
376- addWatcher (CDS_TYPE , new CdsWatcher (clusterName ));
369+ addWatcher (new CdsWatcher (clusterName ));
377370 }
378371
379372 private void updateRoutes (List <VirtualHost > virtualHosts ) {
@@ -411,13 +404,13 @@ private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHos
411404 return clusters ;
412405 }
413406
414- private static class TypeWatchers <T > {
407+ private static class TypeWatchers <T extends ResourceUpdate > {
415408 // Key is resource name
416- final Map <String , TrackedWatcher <T >> watchers = new HashMap <>();
417- final TrackedWatcherType <T > watcherType ;
409+ final Map <String , XdsWatcherBase <T >> watchers = new HashMap <>();
410+ final XdsResourceType <T > resourceType ;
418411
419- TypeWatchers (TrackedWatcherType <T > watcherType ) {
420- this .watcherType = checkNotNull ( watcherType , "watcherType" ) ;
412+ TypeWatchers (XdsResourceType <T > resourceType ) {
413+ this .resourceType = resourceType ;
421414 }
422415 }
423416
@@ -449,46 +442,48 @@ public void close() {
449442
450443 /** State for tracing garbage collector. */
451444 private static final class WatcherTracer {
452- private final Map <TrackedWatcherTypeEnum , TypeWatchers <?>> resourceWatchers ;
453- private final Map <TrackedWatcherTypeEnum , TypeWatchers <?>> usedWatchers ;
445+ private final Map <XdsResourceType <?> , TypeWatchers <?>> resourceWatchers ;
446+ private final Map <XdsResourceType <?> , TypeWatchers <?>> usedWatchers ;
454447
455- public WatcherTracer (Map <TrackedWatcherTypeEnum , TypeWatchers <?>> resourceWatchers ) {
448+ public WatcherTracer (Map <XdsResourceType <?> , TypeWatchers <?>> resourceWatchers ) {
456449 this .resourceWatchers = resourceWatchers ;
457450
458- this .usedWatchers = new EnumMap <>(TrackedWatcherTypeEnum . class );
459- for (Map . Entry < TrackedWatcherTypeEnum , TypeWatchers <?>> me : resourceWatchers .entrySet ()) {
460- usedWatchers .put (me . getKey () , newTypeWatchers (me . getValue (). watcherType ));
451+ this .usedWatchers = new HashMap <>();
452+ for (XdsResourceType <?> type : resourceWatchers .keySet ()) {
453+ usedWatchers .put (type , newTypeWatchers (type ));
461454 }
462455 }
463456
464- private static <T > TypeWatchers <T > newTypeWatchers (TrackedWatcherType <T > type ) {
457+ private static <T extends ResourceUpdate > TypeWatchers <T > newTypeWatchers (
458+ XdsResourceType <T > type ) {
465459 return new TypeWatchers <T >(type );
466460 }
467461
468- public <T > TrackedWatcher <T > getWatcher (TrackedWatcherType <T > watcherType , String name ) {
469- TypeWatchers <?> typeWatchers = resourceWatchers .get (watcherType .typeEnum );
462+ public <T extends ResourceUpdate > XdsWatcherBase <T > getWatcher (
463+ XdsResourceType <T > resourceType , String name ) {
464+ TypeWatchers <?> typeWatchers = resourceWatchers .get (resourceType );
470465 if (typeWatchers == null ) {
471466 return null ;
472467 }
473- assert typeWatchers .watcherType == watcherType ;
468+ assert typeWatchers .resourceType == resourceType ;
474469 @ SuppressWarnings ("unchecked" )
475470 TypeWatchers <T > tTypeWatchers = (TypeWatchers <T >) typeWatchers ;
476- TrackedWatcher <T > watcher = tTypeWatchers .watchers .get (name );
471+ XdsWatcherBase <T > watcher = tTypeWatchers .watchers .get (name );
477472 if (watcher == null ) {
478473 return null ;
479474 }
480475 @ SuppressWarnings ("unchecked" )
481- TypeWatchers <T > usedTypeWatchers = (TypeWatchers <T >) usedWatchers .get (watcherType . typeEnum );
476+ TypeWatchers <T > usedTypeWatchers = (TypeWatchers <T >) usedWatchers .get (resourceType );
482477 usedTypeWatchers .watchers .put (name , watcher );
483478 return watcher ;
484479 }
485480
486481 /** Shut down unused watchers. */
487482 public void closeUnusedWatchers () {
488483 boolean changed = false ; // Help out the GC by preferring old objects
489- for (TrackedWatcherTypeEnum key : resourceWatchers .keySet ()) {
490- TypeWatchers <?> orig = resourceWatchers .get (key );
491- TypeWatchers <?> used = usedWatchers .get (key );
484+ for (XdsResourceType <?> type : resourceWatchers .keySet ()) {
485+ TypeWatchers <?> orig = resourceWatchers .get (type );
486+ TypeWatchers <?> used = usedWatchers .get (type );
492487 for (String name : orig .watchers .keySet ()) {
493488 if (used .watchers .containsKey (name )) {
494489 continue ;
@@ -503,33 +498,8 @@ public void closeUnusedWatchers() {
503498 }
504499 }
505500
506- @ SuppressWarnings ("UnusedTypeParameter" )
507- private static final class TrackedWatcherType <T > {
508- public final TrackedWatcherTypeEnum typeEnum ;
509-
510- public TrackedWatcherType (TrackedWatcherTypeEnum typeEnum ) {
511- this .typeEnum = checkNotNull (typeEnum , "typeEnum" );
512- }
513- }
514-
515- private interface TrackedWatcher <T > {
516- @ Nullable
517- StatusOr <T > getData ();
518-
519- default boolean missingResult () {
520- return getData () == null ;
521- }
522-
523- default boolean hasDataValue () {
524- StatusOr <T > data = getData ();
525- return data != null && data .hasValue ();
526- }
527-
528- void close ();
529- }
530-
531501 private abstract class XdsWatcherBase <T extends ResourceUpdate >
532- implements ResourceWatcher <T >, TrackedWatcher < T > {
502+ implements ResourceWatcher <T > {
533503 private final XdsResourceType <T > type ;
534504 private final String resourceName ;
535505 boolean cancelled ;
@@ -584,18 +554,24 @@ public void onChanged(T update) {
584554
585555 protected abstract void subscribeToChildren (T update );
586556
587- @ Override
588557 public void close () {
589558 cancelled = true ;
590559 xdsClient .cancelXdsResourceWatch (type , resourceName , this );
591560 }
592561
593- @ Override
562+ boolean missingResult () {
563+ return data == null ;
564+ }
565+
594566 @ Nullable
595- public StatusOr <T > getData () {
567+ StatusOr <T > getData () {
596568 return data ;
597569 }
598570
571+ boolean hasDataValue () {
572+ return data != null && data .hasValue ();
573+ }
574+
599575 public String toContextString () {
600576 return toContextStr (type .typeName (), resourceName );
601577 }
@@ -646,7 +622,7 @@ private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTr
646622 if (rdsName == null ) {
647623 return null ;
648624 }
649- return (RdsWatcher ) tracer .getWatcher (RDS_TYPE , rdsName );
625+ return (RdsWatcher ) tracer .getWatcher (XdsRouteConfigureResource . getInstance () , rdsName );
650626 }
651627
652628 public RdsUpdateSupplier getRouteSource (WatcherTracer tracer ) {
@@ -712,7 +688,7 @@ public StatusOr<RdsUpdate> getRdsUpdate() {
712688
713689 private class CdsWatcher extends XdsWatcherBase <XdsClusterResource .CdsUpdate > {
714690 CdsWatcher (String resourceName ) {
715- super (XdsClusterResource . getInstance () , checkNotNull (resourceName , "resourceName" ));
691+ super (CLUSTER_RESOURCE , checkNotNull (resourceName , "resourceName" ));
716692 }
717693
718694 @ Override
@@ -745,7 +721,7 @@ public String getEdsServiceName() {
745721
746722 private class EdsWatcher extends XdsWatcherBase <XdsEndpointResource .EdsUpdate > {
747723 private EdsWatcher (String resourceName ) {
748- super (XdsEndpointResource . getInstance () , checkNotNull (resourceName , "resourceName" ));
724+ super (ENDPOINT_RESOURCE , checkNotNull (resourceName , "resourceName" ));
749725 }
750726
751727 @ Override
0 commit comments