@eshitachandwani
Member

This PR moves the LDS and RDS watchers to the dependency manager without changing the current functionality or behaviour. This is part of the implementation of gRFC A74.

RELEASE NOTES: None

@eshitachandwani eshitachandwani added this to the 1.77 Release milestone Oct 14, 2025
@eshitachandwani eshitachandwani added the Type: Internal Cleanup (Refactors, etc) and Area: xDS (Includes everything xDS related, including LB policies used with xDS) labels Oct 14, 2025
@codecov

codecov bot commented Oct 15, 2025

Codecov Report

❌ Patch coverage is 73.12500% with 43 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.33%. Comparing base (7be33f4) to head (d623feb).
⚠️ Report is 5 commits behind head on master.

Files with missing lines Patch % Lines
internal/xds/xdsdependencymanager/watch_service.go 60.41% 12 Missing and 7 partials ⚠️
...xds/xdsdependencymanager/xds_dependency_manager.go 87.32% 6 Missing and 3 partials ⚠️
internal/xds/xdsclient/xdsresource/xdsconfig.go 0.00% 8 Missing ⚠️
internal/grpctest/tlogger.go 78.78% 4 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8651      +/-   ##
==========================================
+ Coverage   83.28%   83.33%   +0.04%     
==========================================
  Files         415      419       +4     
  Lines       32134    32452     +318     
==========================================
+ Hits        26763    27044     +281     
- Misses       4005     4029      +24     
- Partials     1366     1379      +13     
Files with missing lines Coverage Δ
internal/grpctest/tlogger.go 75.51% <78.78%> (+0.31%) ⬆️
internal/xds/xdsclient/xdsresource/xdsconfig.go 0.00% <0.00%> (ø)
...xds/xdsdependencymanager/xds_dependency_manager.go 87.32% <87.32%> (ø)
internal/xds/xdsdependencymanager/watch_service.go 60.41% <60.41%> (ø)

... and 30 files with indirect coverage changes

@easwars
Contributor

easwars commented Oct 15, 2025

The tests are failing. Is this ready for review?

Contributor

@easwars easwars left a comment

I haven't looked at the tests yet. But I guess these comments will give you enough to make progress.

// Only executed in the context of a serializer callback.
func (m *DependencyManager) onListenerResourceError(err error) {
if m.logger.V(2) {
m.logger.Infof("Received resource error for Listener resource %q: %v", m.ldsResourceName, err)
Contributor

I believe we have some code in the xDS client to ensure that the returned errors contain the xDS node ID. Could you please ensure that that property still holds. Thanks.

}
// Initialize tLogr with the determined verbosity level.
tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel}
tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), warnings: make(map[*regexp.Regexp]int), v: vLevel}
Contributor

Instead of duplicating the code for error and warning logs, can we change the errors map to be a slice of maps instead, i.e. []map[*regexp.Regexp]int? We can use the logType enum to index into this slice of maps.

const (
	infoLog logType = iota
	warningLog
	errorLog
	fatalLog
)

If we log an error, we can update logs[errorLog] and if we log a warning we can update logs[warningLog]. This should help make the code DRY.
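
A minimal sketch of the suggested layout, assuming a reduced bookkeeping type. The `expectations` type and its `expect` and `observe` methods are hypothetical names used only for illustration; they are not the actual tLogger API.

```go
package main

import (
	"fmt"
	"regexp"
)

// logType mirrors the enum quoted above.
type logType int

const (
	infoLog logType = iota
	warningLog
	errorLog
	fatalLog
)

// expectations is a hypothetical stand-in for the tLogger bookkeeping: one
// map of expected patterns per log type, indexed by the logType value.
type expectations struct {
	logs []map[*regexp.Regexp]int
}

func newExpectations() *expectations {
	logs := make([]map[*regexp.Regexp]int, fatalLog+1)
	for i := range logs {
		logs[i] = make(map[*regexp.Regexp]int)
	}
	return &expectations{logs: logs}
}

// expect records that a log of type lt matching expr is expected n times.
func (e *expectations) expect(lt logType, expr string, n int) error {
	re, err := regexp.Compile(expr)
	if err != nil {
		return err
	}
	e.logs[lt][re] += n
	return nil
}

// observe consumes one matching expectation for lt and reports whether msg
// was expected.
func (e *expectations) observe(lt logType, msg string) bool {
	for re, count := range e.logs[lt] {
		if count > 0 && re.MatchString(msg) {
			e.logs[lt][re]--
			return true
		}
	}
	return false
}

func main() {
	e := newExpectations()
	if err := e.expect(warningLog, "ambient error", 1); err != nil {
		panic(err)
	}
	fmt.Println(e.observe(warningLog, "Listener resource ambient error")) // true
	fmt.Println(e.observe(warningLog, "Listener resource ambient error")) // false: already consumed
	fmt.Println(e.observe(errorLog, "Listener resource ambient error"))   // false: wrong log type
}
```

With this shape, the error and warning paths share one code path and differ only in the index they pass.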

Member Author

Done. Thank you!

@@ -0,0 +1,134 @@
/*
*
* Copyright 2020 gRPC authors.
Contributor

nit: The date is off by 5 years.

Member Author

Since this is not a new file and just moved from one package to another, I am not sure if we should change the date.

Contributor

I think you should do one of the two: either use git mv so that version history is maintained, or change the year.

Member Author

changed the date.

Contributor

The date is still 2020. Did you miss pushing a commit or something?

l.parent.serializer.ScheduleOr(handleError, onDone)
}

func (l *listenerWatcher) stop() {
Contributor

Is there a guarantee that stop is not called concurrently with ResourceChanged, ResourceError or other methods? If so, we should document why this is the case so that this assumption is not broken in the future.

Member Author

The only time stop() for listenerWatcher is called is from the dependency manager Close() function where we wait for all the callbacks to complete and cancel the serializer so that nothing new can be scheduled. Added a comment in both stop() functions.

Contributor

Now, again coming back to the point of not needing the serializer. This means that access to this field needs to be guarded by a mutex, or the field needs to be an atomic.

Member Author

Done. Changed to an atomic. Also, after the change there is no longer a guarantee that it is not called concurrently, since we removed the serializer. So I have removed the comment too.

Contributor

AmbientError is called by the xds client. Calls from the xDS client are expected to be serialized. stop is called by the xDS dependency manager.

The code is checking if l.isCancelled in AmbientError (outside the xDS dependency manager's serializer), while the field is being set (l.isCancelled = true) by a call from the dependency manager. This means that a data race seems possible if the xDS client calls AmbientError at the same time as the dependency manager calls stop.
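
One way to make that flag safe to read from the xDS client's goroutine while the dependency manager writes it is an atomic boolean. The sketch below is illustrative only: the `watcher` type, the `handled` counter, and the method bodies are assumptions and do not reproduce the PR's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// watcher is a hypothetical, simplified stand-in for listenerWatcher.
type watcher struct {
	stopped atomic.Bool  // written by stop(), read by watcher callbacks
	handled atomic.Int64 // number of callbacks that were not dropped
}

// AmbientError models a callback invoked from the xDS client's goroutine.
func (w *watcher) AmbientError(err error) {
	if w.stopped.Load() {
		return // the watcher was stopped; drop the callback
	}
	w.handled.Add(1)
}

// stop models the call made by the dependency manager.
func (w *watcher) stop() {
	w.stopped.Store(true)
}

func main() {
	w := &watcher{}
	var wg sync.WaitGroup
	wg.Add(2)
	// One goroutine plays the xDS client, the other the dependency manager.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			w.AmbientError(fmt.Errorf("ambient error %d", i))
		}
	}()
	go func() {
		defer wg.Done()
		w.stop()
	}()
	wg.Wait()

	before := w.handled.Load()
	w.AmbientError(fmt.Errorf("late error"))
	fmt.Println(w.handled.Load() == before) // true: callbacks after stop are dropped
}
```

Running this under `go run -race` should report no races, whereas a plain `bool` field accessed the same way would be flagged.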

Contributor

Does the xds client guarantee that it will not call watchers after they have unsubscribed? If it does, do we really need an isCancelled boolean to guard against calls after calling l.cancel()?

Member Author

My understanding is that the code in the dependency manager that started this conversation was discarding updates from the cancelled watcher (by checking whether the update is for the current route name we got in the listener). That covers the case where the old RDS watcher already got an update before we could cancel the watch completely; since we have a new route name in LDS, we do not want to use that update anymore and just discard it.

This change will ensure that as soon as we cancel it, even if we get an update before we get a chance to unwatch the resource, it will not be used.

@arjan-bal
Contributor

@eshitachandwani it looks like you need to merge the master branch and resolve the build failures.

@eshitachandwani
Member Author

Thank you for pointing it out, @arjan-bal. I have fixed it now!

// ClusterConfig contains configuration for a single cluster.
type ClusterConfig struct {
// Cluster configuration for the cluster. This field is always set to a
// non-zero value
Contributor

Nit: non-nil value?

Contributor

+1, I suggested "non-zero" value when the field was not a pointer. Since it's a pointer now, we can be more specific and say "non-nil".

Member Author

Changed.


// AggregateConfig holds the configuration for an aggregate cluster.
type AggregateConfig struct {
// LeafClusters specifies the names of the leaf clusters for the cluster.
Contributor

Suggested change
// LeafClusters specifies the names of the leaf clusters for the cluster.
// LeafClusters contains a prioritized list of underlying cluster names.

Here, and above, we cannot assume that an aggregate cluster only contains leaf clusters. Aggregate clusters may contain other aggregate clusters too.

The list of child clusters in an aggregate is prioritized. This is an important detail.

Member Author

Right! But we don't store names of every child for aggregate cluster, only store its leaf clusters. If an aggregate cluster has a child which is also an aggregate cluster, we recursively populate only its leaf clusters.
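
The recursive flattening described here could look roughly like the following sketch. The `cluster` type, the `leafClusters` function, and the map-based lookup are hypothetical simplifications, not the dependency manager's actual data structures; clusters missing from the map are treated as leaves purely to keep the example short.

```go
package main

import "fmt"

// cluster is a hypothetical, simplified cluster resource. An aggregate
// cluster lists its children in priority order; a leaf cluster has none.
type cluster struct {
	children []string
}

// leafClusters returns the leaf clusters reachable from name, in priority
// order (depth-first, left to right). Each cluster is visited at most once.
func leafClusters(clusters map[string]cluster, name string) []string {
	var leaves []string
	seen := make(map[string]bool)

	var visit func(n string)
	visit = func(n string) {
		if seen[n] {
			return // already handled; also guards against cycles
		}
		seen[n] = true
		c, ok := clusters[n]
		if !ok || len(c.children) == 0 {
			// Assumption: unknown clusters are treated as leaves.
			leaves = append(leaves, n)
			return
		}
		for _, child := range c.children {
			visit(child)
		}
	}
	visit(name)
	return leaves
}

func main() {
	clusters := map[string]cluster{
		"root": {children: []string{"agg", "c"}},
		"agg":  {children: []string{"a", "b"}},
		"a":    {},
		"b":    {},
		"c":    {},
	}
	fmt.Println(leafClusters(clusters, "root")) // [a b c]
}
```

The nested aggregate `agg` never appears in the result; only its leaves do, and they keep the priority order of the tree.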

}

// xdsConfigkey is the type used as the key to store XDSConfig in the Attributes
// field of resolver.states.
Contributor

Nit: s/resolver.states/resolver.State/

return state
}

// XDSConfigFromResolverState returns XDSConfig stored in attribute in resolver
Contributor

Nit: please fix grammar here. Either say "stored in the Attributes field" or say "stored as attributes" ( ... in state)

Member Author

done.

// ID.
func (m *DependencyManager) annotateErrorWithNodeID(err error) error {
nodeID := m.xdsClient.BootstrapConfig().Node().GetId()
return fmt.Errorf("[xDS node id: %v]: %w", nodeID, err)
Contributor

We probably don't want to wrap the error here with a %w verb. I think %v should be good enough. Any reason for the wrapping?

I see that the resolver is doing that though. We should be certain whether we want the wrapping or not, since wrapping errors makes them part of the API.

See: go/go-style/best-practices#error-extra-info
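
To make the trade-off concrete, the sketch below contrasts the two verbs. The sentinel `errResourceNotFound` and the helper names are hypothetical; only the format string is taken from the snippet above.

```go
package main

import (
	"errors"
	"fmt"
)

// errResourceNotFound is a hypothetical sentinel used only for this sketch.
var errResourceNotFound = errors.New("resource not found")

// annotateWrapped uses %w, so the original error stays in the chain.
func annotateWrapped(nodeID string, err error) error {
	return fmt.Errorf("[xDS node id: %v]: %w", nodeID, err)
}

// annotateOpaque uses %v, so only the text of the original error is kept.
func annotateOpaque(nodeID string, err error) error {
	return fmt.Errorf("[xDS node id: %v]: %v", nodeID, err)
}

// isNotFound reports whether err has errResourceNotFound in its chain.
func isNotFound(err error) bool {
	return errors.Is(err, errResourceNotFound)
}

func main() {
	wrapped := annotateWrapped("node-1", errResourceNotFound)
	opaque := annotateOpaque("node-1", errResourceNotFound)

	fmt.Println(wrapped.Error() == opaque.Error()) // true: same message
	fmt.Println(isNotFound(wrapped))               // true: callers can match on the cause
	fmt.Println(isNotFound(opaque))                // false: the cause is hidden
}
```

Both variants carry the node ID in the message; the difference is whether callers can start depending on the underlying error through `errors.Is` or `errors.As`.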

type listenerWatcher struct {
resourceName string
cancel func()
isCancelled bool
Contributor

Nit: s/isCancelled/stopped? or s/isCancelled/cancelled.

Although stopped is shorter and conveys the meaning in a better way since the method that mutates this field is called stop anyways.

Member Author

done.

@easwars easwars assigned eshitachandwani and unassigned easwars Oct 30, 2025
}

// newDependencyManagerForTest creates a new DependencyManager for testing purposes.
func newDependencyManagerForTest(t *testing.T, listenerName string, target string, bootstrapContents []byte, watcher ConfigWatcher) *DependencyManager {
Contributor

This is really only creating the xDS client. Everything else seems to be pass through. So, I'd recommend changing this function to be something like createXDSClient, and have the dependency manager be created in the main test goroutine by directly calling New.

Member Author

Done.

return &s
}

// testWatcher is a mock implementation of the ConfigWatcher interface that
Contributor

There is no mocking happening here.

Member Author

Right! Changed!

Comment on lines 65 to 70
cmpOpts = []cmp.Option{
	cmpopts.EquateEmpty(),
	cmpopts.IgnoreFields(xdsresource.HTTPFilter{}, "Filter", "Config"),
	cmpopts.IgnoreFields(xdsresource.ListenerUpdate{}, "Raw"),
	cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw"),
}
Contributor

Could you please add a helper function that takes a context, a channel to read the XDSConfig from, and a wantXDSConfig. If we have that, all tests can call that, and these options can be local to that function. Thanks.
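
A rough sketch of such a helper follows. To stay self-contained it uses a hypothetical `config` struct and `reflect.DeepEqual`; the real helper would presumably compare `xdsresource.XDSConfig` values with `cmp.Diff` and keep the options above local to the function. The names `verifyConfig`, `newTestContext`, and `defaultTestTimeout` are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
	"time"
)

// defaultTestTimeout is a hypothetical value chosen only for this sketch.
const defaultTestTimeout = time.Second

// config is a hypothetical stand-in for xdsresource.XDSConfig.
type config struct {
	RouteConfigName string
	Clusters        []string
}

// newTestContext returns a context that expires after defaultTestTimeout.
func newTestContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), defaultTestTimeout)
}

// verifyConfig waits for a config on ch and compares the whole struct
// against want. It returns an error on timeout or mismatch.
func verifyConfig(ctx context.Context, ch <-chan config, want config) error {
	select {
	case <-ctx.Done():
		return fmt.Errorf("timeout waiting for config update: %v", ctx.Err())
	case got := <-ch:
		if !reflect.DeepEqual(got, want) {
			return fmt.Errorf("received unexpected config: got %+v, want %+v", got, want)
		}
	}
	return nil
}

func main() {
	ctx, cancel := newTestContext()
	defer cancel()

	ch := make(chan config, 1)
	want := config{RouteConfigName: "route-1", Clusters: []string{"cluster-1"}}

	ch <- want
	fmt.Println(verifyConfig(ctx, ch, want)) // <nil>

	ch <- config{RouteConfigName: "route-2"}
	fmt.Println(verifyConfig(ctx, ch, want) != nil) // true
}
```

Each test can then call the helper with its own expected value, and the comparison options no longer need to be package-level variables.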

Member Author

Done.

Comment on lines 72 to 92
wantXdsConfig = xdsresource.XDSConfig{
	Listener: &xdsresource.ListenerUpdate{
		RouteConfigName: defaultTestRouteConfigName,
		HTTPFilters: []xdsresource.HTTPFilter{{Name: "router"}}},
	RouteConfig: &xdsresource.RouteConfigUpdate{
		VirtualHosts: []*xdsresource.VirtualHost{
			{
				Domains: []string{defaultTestServiceName},
				Routes: []*xdsresource.Route{{Prefix: newStringP("/"),
					WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
					ActionType: xdsresource.RouteActionRoute}},
			},
		},
	},
	VirtualHost: &xdsresource.VirtualHost{
		Domains: []string{defaultTestServiceName},
		Routes: []*xdsresource.Route{{Prefix: newStringP("/"),
			WeightedClusters: []xdsresource.WeightedCluster{{Name: defaultTestClusterName, Weight: 100}},
			ActionType: xdsresource.RouteActionRoute}},
	},
}
Contributor

I'd really like to not see these global structs in tests. We should ideally move them to individual tests even if that means more code. The reason I'm saying this is also because once we add the CDS and EDS functionalities here, we are going to have a whole bunch of globals here.

Member Author

Changing it, but once we add EDS and CDS resources too, the struct is going to be bigger, and repeating a similar struct in each test is going to add even more code.


func verifyError(gotErr error, wantErr, wantNodeID string) error {
if gotErr == nil {
return fmt.Errorf("got nil error from resolver, want error")
Contributor

Please include the wantErr string in the returned error message.
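
A sketch of what that could look like is below. Only the signature and the nil check appear in the thread; the substring checks and the `errSample` value are assumptions added to make the example complete.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errSample is a hypothetical error used only to exercise verifyError.
var errSample = errors.New("[xDS node id: node-1]: resource not found")

// verifyError checks that gotErr is non-nil and that its message contains
// both wantErr and wantNodeID. Every failure message names what was wanted.
func verifyError(gotErr error, wantErr, wantNodeID string) error {
	if gotErr == nil {
		return fmt.Errorf("got nil error from resolver, want error containing %q", wantErr)
	}
	if !strings.Contains(gotErr.Error(), wantErr) {
		return fmt.Errorf("got error %q, want error containing %q", gotErr, wantErr)
	}
	if !strings.Contains(gotErr.Error(), wantNodeID) {
		return fmt.Errorf("got error %q, want error containing node ID %q", gotErr, wantNodeID)
	}
	return nil
}

func main() {
	fmt.Println(verifyError(nil, "resource not found", "node-1"))
	// got nil error from resolver, want error containing "resource not found"
	fmt.Println(verifyError(errSample, "resource not found", "node-1")) // <nil>
}
```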

Member Author

Done!

}},
}
resources.Listeners = []*v3listenerpb.Listener{lis}
resources.Routes = nil
Contributor

Why do we have to set the routes to nil here?

Member Author

Changed.

// can be sure the ambient error reaches the dependency manager since there is
// no other way to wait for it.
func (s) TestAmbientError(t *testing.T) {
grpctest.ExpectWarning("Listener resource ambient error")
Contributor

This line is very easy to miss, and this is the core of this test. Please add a comment to bring the attention of the reader to this line. Thanks.

Member Author

Done.

case <-ctx.Done():
t.Fatal("Timeout waiting for initial update from dependency manager")
case update := <-updateCh:
if gotCluster := update.VirtualHost.Routes[0].WeightedClusters[0]; gotCluster.Name != defaultTestClusterName || gotCluster.Weight != 100 {
Contributor

Here and down below, please verify the full config struct. Please see: go/go-style/decisions#compare-full-structures

Member Author

Done.

}
}
}

Contributor

How about a test for a case where the Listener contains a route configuration, but then receives an update that changes it to be inlined. We can verify that OnUpdate is called with the newly updated config.

Member Author

Done. Added a test that first receives the RDS resource from the management server, then switches to an inline route configuration, and then switches back to receiving it from the management server, so that both cases are tested.

*
*/

package xdsdepmgr
Contributor

Can this be in package xdsdepmgr_test?

Member Author

Done.

}
// Initialize tLogr with the determined verbosity level.
tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel}
tLogr = &tLogger{logs: []map[*regexp.Regexp]int{{}, {}, {}, {}}, v: vLevel}
Contributor

This declaration seems a little unreadable. How about changing the data structure from a slice of maps to a map of maps keyed by the logType (map[logType]map[*regexp.Regexp]int)? This would make it explicit that the first key is the logType and the second is the regex. The initialization could also be more explicit:

	logsMap := map[logType]map[*regexp.Regexp]int{}
	logsMap[errorLog] = map[*regexp.Regexp]int{}
	logsMap[warningLog] = map[*regexp.Regexp]int{}
	tLogr = &tLogger{
		logs: logsMap,
		v:    vLevel,
	}

Member Author

Done.

}

// ExpectWarning declares a warning to be expected.
func ExpectWarning(expr string) {
Contributor

Can we make ExpectErrorN more general by accepting a logType param, so that it handles warning logs as well?

Member Author

ExpectErrorN is used in a lot of places and is specific to error logs, so I have created a helper function named expectLogsN to abstract the common parts.
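
The arrangement described here might look like the following sketch, which keeps the exported functions as thin wrappers around one shared helper. The reduced `tLogger` type, the `pending` function, and the function bodies are assumptions for illustration and may differ from the grpctest implementation.

```go
package main

import (
	"fmt"
	"regexp"
	"sync"
)

type logType int

const (
	infoLog logType = iota
	warningLog
	errorLog
	fatalLog
)

// tLogger is a hypothetical, reduced version of the test logger: expected
// log counts are keyed first by log type and then by pattern.
type tLogger struct {
	mu   sync.Mutex
	logs map[logType]map[*regexp.Regexp]int
}

var tLogr = &tLogger{logs: map[logType]map[*regexp.Regexp]int{
	errorLog:   {},
	warningLog: {},
}}

// expectLogsN holds the logic shared by the error and warning variants.
func expectLogsN(lt logType, expr string, n int) {
	tLogr.mu.Lock()
	defer tLogr.mu.Unlock()
	re := regexp.MustCompile(expr) // panics on an invalid pattern
	tLogr.logs[lt][re] += n
}

// ExpectError declares an error log to be expected once.
func ExpectError(expr string) { ExpectErrorN(expr, 1) }

// ExpectErrorN declares an error log to be expected n times.
func ExpectErrorN(expr string, n int) { expectLogsN(errorLog, expr, n) }

// ExpectWarning declares a warning log to be expected once.
func ExpectWarning(expr string) { expectLogsN(warningLog, expr, 1) }

// pending returns the total number of expected logs of type lt.
func pending(lt logType) int {
	tLogr.mu.Lock()
	defer tLogr.mu.Unlock()
	total := 0
	for _, n := range tLogr.logs[lt] {
		total += n
	}
	return total
}

func main() {
	ExpectWarning("Listener resource ambient error")
	ExpectErrorN("boom", 2)
	fmt.Println(pending(warningLog), pending(errorLog)) // 1 2
}
```

Existing callers of ExpectErrorN keep their signature, while the warning path reuses the same bookkeeping.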

Comment on lines 43 to 47
// ClusterResult contains a cluster's configuration when we receive a valid
// resource from the management server. It contains an error when:
// - we receive an invalid resource from the management server and
// we did not already have a valid resource or
// - the cluster resource does not exist on the management server
Contributor

In general, it's best to avoid using "we" in public-facing documentation like godocs. It can be ambiguous (who is "we"? The code? The library? The developer?) and sounds more informal. Using the passive voice is a common and clear pattern.
For example, instead of:

// - we receive an invalid resource from the management server and
// we did not already have a valid resource or

You could write:

// - an invalid resource is received from the management server and
// a valid resource was not already present or

Member Author

Changed and noted. Thank you.

// Endpoint configuration for the EDS type cluster.
EDSUpdate *EndpointsUpdate
// Endpoint configuration for the LOGICAL_DNS type cluster.
DNSEndpoints DNSUpdate
Contributor

Since this field is optional, it should also be a pointer, right?
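
The point about optional fields can be illustrated with a small sketch: with pointers, an absent value (nil) is distinguishable from a present but empty one. The lowercase types and the `describe` function are hypothetical stand-ins, not the PR's definitions.

```go
package main

import "fmt"

// endpointsUpdate and dnsUpdate are hypothetical stand-ins for the
// EndpointsUpdate and DNSUpdate types discussed above.
type endpointsUpdate struct{ Localities []string }
type dnsUpdate struct{ Endpoints []string }

// endpointConfig holds at most one of the two updates. Pointer fields let
// callers tell "not set" (nil) apart from "set but empty".
type endpointConfig struct {
	EDSUpdate    *endpointsUpdate
	DNSEndpoints *dnsUpdate
}

// describe reports which kind of endpoint data the config carries.
func describe(c endpointConfig) string {
	switch {
	case c.EDSUpdate != nil:
		return "EDS"
	case c.DNSEndpoints != nil:
		return "LOGICAL_DNS"
	default:
		return "none"
	}
}

func main() {
	fmt.Println(describe(endpointConfig{}))                            // none
	fmt.Println(describe(endpointConfig{DNSEndpoints: &dnsUpdate{}}))  // LOGICAL_DNS
	fmt.Println(describe(endpointConfig{EDSUpdate: &endpointsUpdate{}})) // EDS
}
```

With a non-pointer `DNSUpdate` field, the first two cases would be indistinguishable whenever the DNS result had no endpoints.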

Member Author

Right! Changed.

Comment on lines 80 to 81
// Stores error encountered while obtaining endpoints data for the cluster.
ResolutionNote error
Contributor

I think we should add more details about when the resolution note may be present and its intended use. In the Go resolver API, there's a ResolverError function that acts as a resolution note used by c-core, so adding more details should help readers who aren't aware of c-core's resolution note.

}

// DNSUpdate represents the result of a DNS resolution, containing a list of
// discovered endpoints. This is only populated for the LOGICAL_DNS cluster
Contributor

This is only populated for the LOGICAL_DNS cluster type.

The line above should be present on the struct field in EndpointConfig instead.

Member Author

It is there too, right?
// Endpoint configuration for the LOGICAL_DNS type cluster.

And the comment for EndpointConfig also says that:

// Only one of EDSUpdate or DNSEndpoints will be populated based on the cluster type.

Contributor

I think it's more common for the logger to be declared at the top of the file, with the same name as the package. In my opinion, creating a new file for 5 lines of code is pretty inefficient. I would be interested to know if there's any benefit to placing the logger in a separate file.

@arjan-bal arjan-bal removed their assignment Nov 3, 2025