Conversation

@eshitachandwani
Member

Part of A74 changes.
This PR adds the functions and functionality used for cluster subscriptions: cluster refcounting and dynamic cluster subscription. These functions will be used in subsequent PRs.
RELEASE NOTES: None
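For orientation, here is a rough, hypothetical sketch of the refcounted subscription shape discussed in the review below. Names such as clusterRef, clusterSubscription, subscribe and unsubscribe are illustrative stand-ins; the actual types and signatures live in internal/xds/xdsdepmgr and may differ.

package main

import (
	"fmt"
	"sync"
)

type clusterRef struct {
	name string
	// refCount is protected by the manager's mutex.
	refCount int32
}

type manager struct {
	mu       sync.Mutex
	clusters map[string]*clusterRef
}

type clusterSubscription struct {
	m    *manager
	name string
}

// subscribe increments the refcount for a cluster, starting a watch the
// first time the cluster is referenced.
func (m *manager) subscribe(name string) *clusterSubscription {
	m.mu.Lock()
	defer m.mu.Unlock()
	c, ok := m.clusters[name]
	if !ok {
		c = &clusterRef{name: name}
		m.clusters[name] = c
		fmt.Println("starting watch for", name)
	}
	c.refCount++
	return &clusterSubscription{m: m, name: name}
}

// unsubscribe decrements the refcount and cancels the watch when it
// reaches zero (the real code also considers whether the cluster is
// still referenced by the route config).
func (s *clusterSubscription) unsubscribe() {
	s.m.mu.Lock()
	defer s.m.mu.Unlock()
	c := s.m.clusters[s.name]
	c.refCount--
	if c.refCount == 0 {
		delete(s.m.clusters, s.name)
		fmt.Println("cancelling watch for", s.name)
	}
}

func main() {
	m := &manager{clusters: make(map[string]*clusterRef)}
	sub := m.subscribe("cluster-a")
	sub.unsubscribe()
}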

@eshitachandwani eshitachandwani added this to the 1.79 Release milestone Dec 26, 2025
@eshitachandwani eshitachandwani added Type: Internal Cleanup Refactors, etc Area: xDS Includes everything xDS related, including LB policies used with xDS. labels Dec 26, 2025
@codecov

codecov bot commented Dec 26, 2025

Codecov Report

❌ Patch coverage is 69.56522% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.30%. Comparing base (4046676) to head (30d9ec7).
⚠️ Report is 8 commits behind head on master.

Files with missing lines                            Patch %   Lines
internal/xds/xdsdepmgr/xds_dependency_manager.go    65.78%    11 Missing and 2 partials ⚠️
internal/xds/resolver/xds_resolver.go               87.50%    1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8792      +/-   ##
==========================================
- Coverage   83.42%   83.30%   -0.12%     
==========================================
  Files         418      417       -1     
  Lines       32897    33018     +121     
==========================================
+ Hits        27443    27507      +64     
- Misses       4069     4101      +32     
- Partials     1385     1410      +25     
Files with missing lines                            Coverage Δ
internal/xds/resolver/xds_resolver.go               88.70% <87.50%> (-0.06%) ⬇️
internal/xds/xdsdepmgr/xds_dependency_manager.go    80.47% <65.78%> (-0.41%) ⬇️

... and 42 files with indirect coverage changes

Comment on lines 153 to 155
r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r)
// Initialize the dependency manager in a serializer because it may be
// accessed concurrently when creating multiple concurrent channels.
r.serializer.TrySchedule(func(context.Context) { r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r) })
Contributor

The only ref to r at this point is in this function. So it seems impossible that another goroutine can concurrently access r.dm here. Why do we need to initialize it in a serializer?

Member Author

There was a race when creating concurrent channels, as in TestConcurrentChannels. When we create many channels, the XDSClient already has all the resources and sends the resource updates very quickly. Because we started the watch from the dependency manager's New() function itself (which returns a pointer to the dependency manager that is stored in the xds_resolver struct for later use, i.e. set as an attribute to be sent to the balancer), the dependency manager could receive updates before New() returned, causing a race between the assignment of the dependency manager to the xds_resolver struct and setting it as an attribute.
I have fixed the race by adding a new Start() function that starts the listener watch and is called after New(). That way the assignment in the xds_resolver happens first and the race no longer occurs.

Let me know what you think.
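A minimal, self-contained sketch of the New()/Start() split described above. All names are hypothetical stand-ins, not the PR's actual code.

package main

import "fmt"

// depManager stands in for the real dependency manager. The split into
// a constructor that only builds the object and a separate Start that
// registers watches is the pattern described above.
type depManager struct {
	onUpdate func(string)
}

func newDepManager(onUpdate func(string)) *depManager {
	// No watch is registered here, so onUpdate cannot fire yet.
	return &depManager{onUpdate: onUpdate}
}

// Start registers the listener watch. By the time it is called, the
// caller has already stored the *depManager returned by newDepManager.
func (d *depManager) Start() {
	d.onUpdate("listener-update")
}

type resolver struct {
	dm *depManager
}

func main() {
	r := &resolver{}
	r.dm = newDepManager(func(u string) {
		fmt.Println("got", u, "- dm already assigned:", r.dm != nil)
	})
	r.dm.Start() // safe: r.dm was assigned before any callback could run
}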

Contributor
@arjan-bal arjan-bal Jan 5, 2026

It seems that such a race is possible even with a single xDS channel. Using the serializer to set r.dm is still prone to races as ResolveNow accesses the dm field outside the serializer.

Using a Start() method seems fine. An alternative would be to use a channel or grpcsync.Event in the resolver to pause processing of updates from the Dependency Manager until the resolver is initialized.

Update: I thought of a simpler solution. We can schedule a callback in the serializer that blocks until the dm field is set. This would block the serializer and prevent updates from being processed before initialization of the resolver.
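A rough sketch of the "blocking callback" idea, using a plain channel in place of grpcsync.Event and a toy serializer; both are hypothetical simplifications of the resolver's real machinery.

package main

import (
	"fmt"
	"sync"
)

// A toy serializer that runs queued callbacks in order, standing in for
// the resolver's callback serializer.
type serializer struct {
	mu    sync.Mutex
	queue []func()
}

func (s *serializer) trySchedule(f func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.queue = append(s.queue, f)
}

func (s *serializer) run() {
	for _, f := range s.queue {
		f()
	}
}

func main() {
	dmSet := make(chan struct{}) // stands in for grpcsync.Event
	s := &serializer{}

	// Scheduled first: blocks the serializer until initialization is done,
	// so no later callback (i.e. a resource update) can run before it.
	s.trySchedule(func() { <-dmSet })
	s.trySchedule(func() { fmt.Println("processing update after init") })

	go func() {
		// ... the resolver finishes construction and assigns r.dm here ...
		close(dmSet) // equivalent to firing the event
	}()
	s.run()
}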

Comment on lines 959 to 960
// This cluster is no longer in the route config, and it has no more
// references. Now is the time to cancel the watch.
Contributor

This comment may be misleading. If a cluster is not a root cluster but comes under some aggregate cluster, it will still need a watch. It seems that maybeSendUpdateLocked should correctly handle this case by keeping the watch alive.

Member Author

I am not sure what you mean. We only ever hold references to root-level clusters; we never hold references to clusters under an aggregate cluster. All the child-cluster cases are already handled by the maybeSendUpdateLocked function.

Contributor

I was referring to this part specifically: Now is the time to cancel the watch.

Consider a case where an aggregate cluster A is present in the route config, and A has a child cluster B. There will be CDS watches for both A and B. Additionally, there is an explicit ClusterSubscription for B from the RLS balancer. When the ClusterSubscription is released, the check _, ok := c.m.clustersFromRouteConfig[c.name]; !ok would pass because B isn't present in the route config directly. However, the watch should not be canceled because its parent, A, is still present in the route config.

Member Author

Got it! Changed the wording to "we might need to cancel the watch for it".

Comment on lines 952 to 954
// in the DependencyManager. If Unsubscribe is called in a blocking manner while
// handling the update for any resource type, it will deadlock because both the
// update handler and this function have to acquire m.mu.
Contributor

Avoid mentioning implementation details in the godoc as they may change. Here, we can only mention that a deadlock will occur: "Calling Unsubscribe in a blocking manner while handling an update will lead to a deadlock".

Member Author

Done.
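A minimal sketch of the deadlock the godoc warns about, with hypothetical names: the manager's mutex is held while the update handler runs, and a blocking Unsubscribe needs the same mutex.

package main

import "sync"

type manager struct {
	mu sync.Mutex
}

func (m *manager) handleUpdate(onUpdate func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	onUpdate() // if this calls unsubscribe synchronously, we deadlock
}

func (m *manager) unsubscribe() {
	m.mu.Lock() // blocks forever when called from inside handleUpdate
	defer m.mu.Unlock()
}

func main() {
	m := &manager{}
	// Deadlocks: the handler holds m.mu while unsubscribe waits on it.
	m.handleUpdate(func() { m.unsubscribe() })
}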

// DependencyManager if the reference count reaches zero.
type ClusterRef struct {
name string
refCount int32
Contributor

Can you put a comment above this field stating that access to it is protected by the DependencyManager's mutex and hence it doesn't need to be an atomic?

Member Author

Done.
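One way the documented field could read after the change; the exact struct in internal/xds/xdsdepmgr may differ.

type ClusterRef struct {
	name string
	// refCount is protected by the DependencyManager's mutex, so it
	// does not need to be an atomic.
	refCount int32
}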

Comment on lines 1494 to 1499
if !seenSecondClusterOnly {
if resourcesMatch(resourceNames, []string{clusterB}) {
seenSecondClusterOnly = true
gotOnlySecondCluster.Fire()
}
}
Contributor

This check can be inverted to return early and reduce a level of indentation.

if seenSecondClusterOnly {
    return
}
// remaining code.

Member Author

Done.
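For reference, one shape the early return could take after the suggestion, using the variables from the snippet above:

if seenSecondClusterOnly {
	return
}
if resourcesMatch(resourceNames, []string{clusterB}) {
	seenSecondClusterOnly = true
	gotOnlySecondCluster.Fire()
}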

return
case w.updateCh <- cfg:
}
go func() {
Contributor

Why do we need to spawn a goroutine here? This will result in a violation of the FIFO ordering of updates being written to w.updateCh.

Member Author

Right! Reverted the change.
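A small, standalone illustration of the ordering concern raised above: sends made directly by a single producer reach the channel in FIFO order, while sends made from freshly spawned goroutines can interleave arbitrarily. This is a generic Go demo, not code from the PR.

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 3)

	// Direct sends from one goroutine: received strictly in order.
	for i := 1; i <= 3; i++ {
		ch <- i
	}
	fmt.Println(<-ch, <-ch, <-ch) // always 1 2 3

	// One goroutine per send: the scheduler decides the order.
	for i := 1; i <= 3; i++ {
		go func(v int) { ch <- v }(i)
	}
	time.Sleep(10 * time.Millisecond) // crude wait for the sends to land
	fmt.Println(<-ch, <-ch, <-ch)     // e.g. 2 1 3: ordering not guaranteed
}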
