Skip to content

Commit 54380b2

Browse files
committed
test to verify both update and error are sent
1 parent c007e8a commit 54380b2

File tree

2 files changed

+52
-17
lines changed

2 files changed

+52
-17
lines changed

xds/internal/xdsclient/authority.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -633,13 +633,19 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
633633
// Always add the new watcher to the set of watchers.
634634
state.watchers[watcher] = true
635635

636-
// If we have a cached copy of the resource, notify the new watcher.
636+
// If we have a cached copy of the resource, notify the new watcher
637+
// immediately.
637638
if state.cache != nil {
638639
if a.logger.V(2) {
639640
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
640641
}
641642
resource := state.cache
642643
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
644+
// If last update was NACK'd, notify the new watcher of error
645+
// immediately as well.
646+
if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil {
647+
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
648+
}
643649
}
644650
cleanup = a.unwatchResource(rType, resourceName, watcher)
645651
}, func() {

xds/internal/xdsclient/tests/lds_watchers_test.go

+45-16
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,39 @@ func (cw *listenerWatcher) OnUpdate(update *xdsresource.ListenerResourceData, on
7777
}
7878

7979
func (cw *listenerWatcher) OnError(err error, onDone xdsresource.OnDoneFunc) {
80+
cw.updateCh.Replace(listenerUpdateErrTuple{err: err})
81+
onDone()
82+
}
83+
84+
func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
85+
cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
86+
onDone()
87+
}
88+
89+
type listenerWatcherMultiple struct {
90+
updateCh *testutils.Channel
91+
}
92+
93+
func newListenerWatcherMultiple() *listenerWatcherMultiple {
94+
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(2)}
95+
}
96+
97+
func (cw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
98+
cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
99+
onDone()
100+
}
101+
102+
func (cw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) {
80103
// When used with a go-control-plane management server that continuously
81104
// resends resources which are NACKed by the xDS client, using a `Replace()`
82105
// here and in OnResourceDoesNotExist() simplifies tests which will have
83106
// access to the most recently received error.
84-
cw.updateCh.Replace(listenerUpdateErrTuple{err: err})
107+
cw.updateCh.Send(listenerUpdateErrTuple{err: err})
85108
onDone()
86109
}
87110

88-
func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
89-
cw.updateCh.Replace(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
111+
func (cw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
112+
cw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
90113
onDone()
91114
}
92115

@@ -923,9 +946,9 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
923946

924947
// TestLDSWatch_ResourceCaching_WithNACKError covers the case where a watch is
925948
// registered for a resource which is already present in the cache with an old
926-
// good update and latest NACK error. The test verifies that new watcher
927-
// receives both good update and error without request being sent to the
928-
// management server.
949+
// good update as well latest NACK error. The test verifies that new watcher
950+
// receives both good update and error without a new resource request being
951+
// sent to the management server.
929952
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
930953
firstRequestReceived := false
931954
firstAckReceived := grpcsync.NewEvent()
@@ -943,6 +966,13 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
943966
firstAckReceived.Fire()
944967
return nil
945968
}
969+
// If the request version remains "1" while the nonce keeps
970+
// increasing, it indicates the client is repeatedly NACKing
971+
// updates from the server but not sending any new resource
972+
// request.
973+
if req.GetVersionInfo() == "1" {
974+
return nil
975+
}
946976
// Any requests after the first request and ack, are not expected.
947977
secondRequestReceived.Fire()
948978
return nil
@@ -1018,21 +1048,12 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
10181048

10191049
// Register another watch for the same resource. This should get the update
10201050
// and error from the cache.
1021-
lw2 := newListenerWatcher()
1051+
lw2 := newListenerWatcherMultiple()
10221052
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
10231053
defer ldsCancel2()
10241054
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
10251055
t.Fatal(err)
10261056
}
1027-
u, err = lw2.updateCh.Receive(ctx)
1028-
if err != nil {
1029-
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
1030-
}
1031-
gotErr = u.(listenerUpdateErrTuple).err
1032-
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
1033-
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
1034-
}
1035-
10361057
// No request should get sent out as part of this watch.
10371058
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
10381059
defer sCancel()
@@ -1041,6 +1062,14 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
10411062
case <-secondRequestReceived.Done():
10421063
t.Fatal("xdsClient sent out request instead of using update from cache")
10431064
}
1065+
u, err = lw2.updateCh.Receive(ctx)
1066+
if err != nil {
1067+
t.Fatalf("timeout when waiting for a listener resource from the management server: %v", err)
1068+
}
1069+
gotErr = u.(listenerUpdateErrTuple).err
1070+
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
1071+
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
1072+
}
10441073
}
10451074

10461075
// TestLDSWatch_PartialValid covers the case where a response from the

0 commit comments

Comments
 (0)