Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions internal/grpctest/tlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ type tLogger struct {
v int
initialized bool

mu sync.Mutex // guards t, start, and errors
t *testing.T
start time.Time
errors map[*regexp.Regexp]int
mu sync.Mutex
t *testing.T
start time.Time
logs map[logType]map[*regexp.Regexp]int
}

func init() {
Expand All @@ -87,7 +87,11 @@ func init() {
}
}
// Initialize tLogr with the determined verbosity level.
tLogr = &tLogger{errors: make(map[*regexp.Regexp]int), v: vLevel}
logsMap := map[logType]map[*regexp.Regexp]int{
errorLog: {},
warningLog: {},
}
tLogr = &tLogger{logs: logsMap, v: vLevel}
}

// getCallingPrefix returns the <file:line> at the given depth from the stack.
Expand Down Expand Up @@ -115,11 +119,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) {
switch ltype {
case errorLog:
// fmt.Sprintln is used rather than fmt.Sprint because tl.Log uses fmt.Sprintln behavior.
if tl.expected(fmt.Sprintln(args...)) {
if tl.expected(fmt.Sprintln(args...), errorLog) {
tl.t.Log(args...)
} else {
tl.t.Error(args...)
}
case warningLog:
tl.expected(fmt.Sprintln(args...), warningLog)
tl.t.Log(args...)
case fatalLog:
panic(fmt.Sprint(args...))
default:
Expand All @@ -130,11 +137,14 @@ func (tl *tLogger) log(ltype logType, depth int, format string, args ...any) {
format = "%v " + format + "%s"
switch ltype {
case errorLog:
if tl.expected(fmt.Sprintf(format, args...)) {
if tl.expected(fmt.Sprintf(format, args...), errorLog) {
tl.t.Logf(format, args...)
} else {
tl.t.Errorf(format, args...)
}
case warningLog:
tl.expected(fmt.Sprintln(args...), warningLog)
tl.t.Log(args...)
case fatalLog:
panic(fmt.Sprintf(format, args...))
default:
Expand All @@ -154,7 +164,8 @@ func (tl *tLogger) update(t *testing.T) {
}
tl.t = t
tl.start = time.Now()
tl.errors = map[*regexp.Regexp]int{}
tl.logs[errorLog] = map[*regexp.Regexp]int{}
tl.logs[warningLog] = map[*regexp.Regexp]int{}
}

// ExpectError declares an error to be expected. For the next test, the first
Expand All @@ -163,40 +174,56 @@ func (tl *tLogger) update(t *testing.T) {
// Update(). Note that if an expected error is not encountered, this will cause
// the test to fail.
func ExpectError(expr string) {
ExpectErrorN(expr, 1)
expectLogsN(expr, 1, errorLog)
}

// ExpectErrorN declares an error matching the regular expression expr to be
// expected n times before the current test ends. If the error is not
// encountered n times, the test is failed when endTest runs.
func ExpectErrorN(expr string, n int) {
	expectLogsN(expr, n, errorLog)
}

// ExpectWarning declares a warning matching the regular expression expr to be
// expected exactly once before the current test ends. If the warning is not
// encountered, the test is failed when endTest runs.
func ExpectWarning(expr string) {
	expectLogsN(expr, 1, warningLog)
}

// expectLogsN registers expr as a pattern for logs of type lt that is
// expected to be matched n more times before the current test ends. If expr
// does not compile, the current test is failed immediately.
//
// The parameter is named lt (rather than logType) so it does not shadow the
// logType type itself.
func expectLogsN(expr string, n int, lt logType) {
	tLogr.mu.Lock()
	defer tLogr.mu.Unlock()
	re, err := regexp.Compile(expr)
	if err != nil {
		tLogr.t.Error(err)
		return
	}
	tLogr.logs[lt][re] += n
}

// endTest checks if expected errors were not encountered.
func (tl *tLogger) endTest(t *testing.T) {
tl.mu.Lock()
defer tl.mu.Unlock()
for re, count := range tl.errors {
for re, count := range tl.logs[errorLog] {
if count > 0 {
t.Errorf("Expected error '%v' not encountered", re.String())
}
}
tl.errors = map[*regexp.Regexp]int{}
for re, count := range tl.logs[warningLog] {
if count > 0 {
t.Errorf("Expected warning '%v' not encountered", re.String())
}
}
tl.logs[errorLog] = map[*regexp.Regexp]int{}
tl.logs[warningLog] = map[*regexp.Regexp]int{}
}

// expected determines if the error string is protected or not.
func (tl *tLogger) expected(s string) bool {
for re, count := range tl.errors {
// expected determines if the log string of the particular type is protected or
// not.
func (tl *tLogger) expected(s string, logType logType) bool {
for re, count := range tl.logs[logType] {
if re.FindStringIndex(s) != nil {
tl.errors[re]--
tl.logs[logType][re]--
if count <= 1 {
delete(tl.errors, re)
delete(tl.logs[logType], re)
}
return true
}
Expand Down
113 changes: 113 additions & 0 deletions internal/xds/xdsclient/xdsresource/xdsconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdsresource

import "google.golang.org/grpc/resolver"

// XDSConfig holds the complete gRPC client-side xDS configuration containing
// all necessary resources.
type XDSConfig struct {
	// Listener holds the listener configuration. It is guaranteed to be
	// non-nil.
	Listener *ListenerUpdate

	// RouteConfig holds the route configuration. It will be populated even if
	// the route configuration was inlined into the Listener resource. It is
	// guaranteed to be non-nil.
	RouteConfig *RouteConfigUpdate

	// VirtualHost is selected from the route configuration whose domain field
	// offers the best match against the provided dataplane authority. It is
	// guaranteed to be non-nil.
	VirtualHost *VirtualHost

	// Clusters is a map from cluster name to that cluster's configuration (or
	// the error that prevented a configuration from being produced).
	Clusters map[string]*ClusterResult
}

// ClusterResult contains a cluster's configuration when a valid resource is
// received from the management server. It contains an error when:
//   - an invalid resource is received from the management server and
//     a valid resource was not already present, or
//   - the cluster resource does not exist on the management server
type ClusterResult struct {
	// Config holds the cluster's configuration. Per the struct comment above,
	// presumably only meaningful when Err is nil — confirm with callers.
	Config ClusterConfig
	// Err describes why no valid cluster configuration is available.
	Err error
}

// ClusterConfig contains configuration for a single cluster. Exactly one of
// EndpointConfig or AggregateConfig is set, depending on the cluster type.
type ClusterConfig struct {
	// Cluster holds the configuration for the cluster. This field is always
	// set to a non-nil value.
	Cluster *ClusterUpdate
	// EndpointConfig contains endpoint configuration for a leaf cluster. This
	// field is only set for EDS and LOGICAL_DNS clusters.
	EndpointConfig *EndpointConfig
	// AggregateConfig contains configuration for an aggregate cluster. This
	// field is only set for AGGREGATE clusters.
	AggregateConfig *AggregateConfig
}

// AggregateConfig holds the configuration for an aggregate cluster.
type AggregateConfig struct {
	// LeafClusters contains a prioritized list of names of the leaf clusters
	// for the aggregate cluster.
	LeafClusters []string
}

// EndpointConfig contains configuration corresponding to the endpoints in a
// cluster. Only one of EDSUpdate or DNSEndpoints will be populated based on
// the cluster type.
type EndpointConfig struct {
	// EDSUpdate holds the endpoint configuration for EDS clusters.
	EDSUpdate *EndpointsUpdate
	// DNSEndpoints holds the endpoint configuration for LOGICAL_DNS clusters.
	DNSEndpoints *DNSUpdate
	// ResolutionNote stores any error encountered while obtaining endpoints
	// data for the cluster. It may be nil when valid endpoint data is
	// received. It contains an error when:
	//   - an invalid resource is received from the management server, or
	//   - the endpoint resource does not exist on the management server
	ResolutionNote error
}

// DNSUpdate represents the result of a DNS resolution, containing a list of
// discovered endpoints. This is only populated for LOGICAL_DNS clusters.
type DNSUpdate struct {
	// Endpoints is the complete list of endpoints returned by the DNS
	// resolver.
	Endpoints []resolver.Endpoint
}

// xdsConfigkey is the unexported type used as the key to store XDSConfig in
// the Attributes field of resolver.State. Using a private struct type
// prevents collisions with keys from other packages.
type xdsConfigkey struct{}

// SetXDSConfig returns a copy of state whose Attributes field carries the
// provided XDSConfig, stored under the package-private xdsConfigkey.
func SetXDSConfig(state resolver.State, config *XDSConfig) resolver.State {
	// state is received by value, so mutating the copy leaves the caller's
	// resolver.State untouched.
	out := state
	out.Attributes = out.Attributes.WithValue(xdsConfigkey{}, config)
	return out
}

// XDSConfigFromResolverState returns the XDSConfig stored as an attribute in
// the given resolver state, or nil if no XDSConfig was attached via
// SetXDSConfig.
func XDSConfigFromResolverState(state resolver.State) *XDSConfig {
	// The original performed the attribute lookup twice, discarding the first
	// result; a single lookup suffices.
	if v := state.Attributes.Value(xdsConfigkey{}); v != nil {
		return v.(*XDSConfig)
	}
	return nil
}
115 changes: 115 additions & 0 deletions internal/xds/xdsdepmgr/watch_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xdsdepmgr

import (
"sync/atomic"

"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
)

// listenerWatcher watches a Listener resource on behalf of the
// DependencyManager and forwards callbacks from the xDS client to it.
type listenerWatcher struct {
	resourceName string
	// cancel terminates the underlying xDS watch; set in newListenerWatcher.
	cancel func()
	depMgr *DependencyManager
	// stopped is set by stop(). Callbacks that observe it as true invoke
	// onDone and drop the update without calling into depMgr.
	//
	// NOTE(review): a plain atomic flag does not guarantee that no callback
	// reaches depMgr after stop() returns — a callback that has already
	// passed its Load() check may still proceed. Confirm whether a mutex is
	// needed (see the PR discussion).
	stopped atomic.Bool
}

// newListenerWatcher registers a watch for the named Listener resource with
// the dependency manager's xDS client and returns the watcher. The watch
// remains active until stop is called.
func newListenerWatcher(resourceName string, depMgr *DependencyManager) *listenerWatcher {
	lw := &listenerWatcher{resourceName: resourceName, depMgr: depMgr}
	lw.cancel = xdsresource.WatchListener(depMgr.xdsClient, resourceName, lw)
	return lw
}

// ResourceChanged is invoked by the xDS client when a new version of the
// Listener resource is received. If the watcher has been stopped, the update
// is dropped and onDone is invoked immediately; otherwise the update is
// forwarded to the dependency manager.
func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) {
	if l.stopped.Load() {
		onDone()
		return
	}
	l.depMgr.onListenerResourceUpdate(update, onDone)
}

// ResourceError is invoked by the xDS client when the Listener resource is
// reported in error (e.g. it does not exist or is invalid with no previous
// good version — confirm exact semantics with the xDS client docs). Dropped
// if the watcher has been stopped.
func (l *listenerWatcher) ResourceError(err error, onDone func()) {
	if l.stopped.Load() {
		onDone()
		return
	}
	l.depMgr.onListenerResourceError(err, onDone)
}

// AmbientError is invoked by the xDS client for errors that do not invalidate
// the previously received resource (presumably transient/connection errors —
// confirm with the xDS client docs). Dropped if the watcher has been stopped.
func (l *listenerWatcher) AmbientError(err error, onDone func()) {
	if l.stopped.Load() {
		onDone()
		return
	}
	l.depMgr.onListenerResourceAmbientError(err, onDone)
}

func (l *listenerWatcher) stop() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a guarantee that stop is not called concurrently with ResourceChanged, ResourceError or other methods? If so, we should document why this is the case so that this assumption is not broken in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only time stop() for listenerWatcher is called is from the dependency manager Close() function where we wait for all the callbacks to complete and cancel the serializer so that nothing new can be scheduled. Added a comment in both stop() functions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, again coming back to the point of not needing the serializer. This means that access to this field needs to be guarded by a mutex or make it an atomic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Changed to an atomic. Also after the change it is not guaranteed to be called concurrently anymore since we removed the serializer. So I have removed the comment too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AmbientError is called by the xds client. Calls from the xDS client are expected to be serialized. stop is called by the xDS dependency manager.

The code is checking if l.isCancelled in AmbientError (outside the xDS dependency manager's serializer), while the field is being set (l.isCancelled = true) by a call from the dependency manager. This means that a data race seems possible if the xDS client calls AmbientError at the same time as the dependency manager calls stop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the xds client guarantee that it will not call watchers after they have unsubscribed? If it does, do we really need an isCancelled boolean to guard against calls after calling l.cancel()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that the main intention behind the code in dependency manager that started the conversation about this change was we were discarding the update from the cancelled watcher (by checking if it is the current name we got in the listener) because it could be the case where the old RDS watcher already got an update before we could cancel the watch completely but since we have a new route name in LDS, we do not want to use that update anymore and just discard it.

This change will ensure that as soon as we cancel it, even if we get an update before we get a chance to unwatch the resource, it will not be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the xds client guarantee that it will not call watchers after they have unsubscribed

This is the docstring from the XDSClient interface for the WatchResource method:

	// During a race (e.g. an xDS response is received while the user is calling
	// cancel()), there's a small window where the callback can be called after
	// the watcher is canceled. Callers need to handle this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think an atomic is sufficient here. We need to think about "what guarantee does the stop() method make to its callers?".

Usually, the guarantee is that once stop() returns, no more calls will be made to the caller of stop(), the Dependency Manager here.

Does this guarantee hold with an atomic?

Consider the following:

  1. ResourceChanged is called by the xds client. It checks l.stopped.Load(), which is false and continues.
  2. At the same time, stop() is called by the Dependency Manager. It sets l.stopped.Store(true), executes the entire function body, and returns.
  3. Now ResourceChanged resumes execution, incorrectly assuming l.stopped is still false and calling the Dependency Manager.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One solution is to use a mutex that needs to be held while making calls into the DependencyManager. The same mutex must be held while reading and writing to the stopped boolean.

I haven't looked into the implementation of the XDSClient to see how the race mentioned in the comment is triggered. Maybe it's possible to prevent the race in the XDSClient instead of pushing this responsibility to the watchers.

l.stopped.Store(true)
l.cancel()
if l.depMgr.logger.V(2) {
l.depMgr.logger.Infof("Canceling watch on Listener resource %q", l.resourceName)
}
}

// routeConfigWatcher watches a RouteConfiguration resource on behalf of the
// DependencyManager and forwards callbacks from the xDS client to it.
type routeConfigWatcher struct {
	resourceName string
	// cancel terminates the underlying xDS watch; set in
	// newRouteConfigWatcher.
	cancel func()
	depMgr *DependencyManager
	// stopped is set by stop(). Callbacks that observe it as true invoke
	// onDone and drop the update without calling into depMgr.
	//
	// NOTE(review): as with listenerWatcher, an atomic flag alone cannot
	// guarantee no callback reaches depMgr after stop() returns — confirm
	// whether a mutex is needed (see the PR discussion).
	stopped atomic.Bool
}

// newRouteConfigWatcher registers a watch for the named RouteConfiguration
// resource with the dependency manager's xDS client and returns the watcher.
// The watch remains active until stop is called.
func newRouteConfigWatcher(resourceName string, depMgr *DependencyManager) *routeConfigWatcher {
	rw := &routeConfigWatcher{resourceName: resourceName, depMgr: depMgr}
	rw.cancel = xdsresource.WatchRouteConfig(depMgr.xdsClient, resourceName, rw)
	return rw
}

// ResourceChanged is invoked by the xDS client when a new version of the
// RouteConfiguration resource is received. If the watcher has been stopped,
// the update is dropped and onDone is invoked immediately; otherwise the
// update is forwarded to the dependency manager along with the resource name.
func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) {
	if r.stopped.Load() {
		onDone()
		return
	}
	r.depMgr.onRouteConfigResourceUpdate(r.resourceName, u, onDone)
}

// ResourceError is invoked by the xDS client when the RouteConfiguration
// resource is reported in error. Dropped if the watcher has been stopped.
func (r *routeConfigWatcher) ResourceError(err error, onDone func()) {
	if r.stopped.Load() {
		onDone()
		return
	}
	r.depMgr.onRouteConfigResourceError(r.resourceName, err, onDone)
}

// AmbientError is invoked by the xDS client for errors that do not invalidate
// the previously received resource (presumably transient/connection errors —
// confirm with the xDS client docs). Dropped if the watcher has been stopped.
func (r *routeConfigWatcher) AmbientError(err error, onDone func()) {
	if r.stopped.Load() {
		onDone()
		return
	}
	r.depMgr.onRouteConfigResourceAmbientError(r.resourceName, err, onDone)
}

// stop marks the watcher as stopped so subsequent callbacks are dropped, and
// cancels the underlying xDS watch.
//
// NOTE(review): the xDS client documents a small window where a callback can
// still arrive after cancel(); the atomic flag narrows but does not close
// that window — a callback that already passed its stopped.Load() check may
// still call into depMgr after stop() returns. Confirm whether a mutex is
// required (see the PR discussion).
func (r *routeConfigWatcher) stop() {
	r.stopped.Store(true)
	r.cancel()
	if r.depMgr.logger.V(2) {
		r.depMgr.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName)
	}
}
Loading