Skip to content

Commit a3c89bb

Browse files
committed
test to verify both update and error are sent
1 parent c007e8a commit a3c89bb

File tree

2 files changed

+39
-39
lines changed

2 files changed

+39
-39
lines changed

xds/internal/xdsclient/authority.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -633,13 +633,19 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
633633
// Always add the new watcher to the set of watchers.
634634
state.watchers[watcher] = true
635635

636-
// If we have a cached copy of the resource, notify the new watcher.
636+
// If we have a cached copy of the resource, notify the new watcher
637+
// immediately.
637638
if state.cache != nil {
638639
if a.logger.V(2) {
639640
a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
640641
}
641642
resource := state.cache
642643
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnUpdate(resource, func() {}) })
644+
// If last update was NACK'd, notify the new watcher of error
645+
// immediately as well.
646+
if state.md.Status == xdsresource.ServiceStatusNACKed && state.md.ErrState != nil {
647+
a.watcherCallbackSerializer.TrySchedule(func(context.Context) { watcher.OnError(state.md.ErrState.Err, func() {}) })
648+
}
643649
}
644650
cleanup = a.unwatchResource(rType, resourceName, watcher)
645651
}, func() {

xds/internal/xdsclient/tests/lds_watchers_test.go

+32-38
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,33 @@ func (cw *listenerWatcher) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc)
9090
onDone()
9191
}
9292

93+
type listenerWatcherMultiple struct {
94+
updateCh *testutils.Channel
95+
}
96+
97+
func newListenerWatcherMultiple() *listenerWatcherMultiple {
98+
return &listenerWatcherMultiple{updateCh: testutils.NewChannelWithSize(2)}
99+
}
100+
101+
func (cw *listenerWatcherMultiple) OnUpdate(update *xdsresource.ListenerResourceData, onDone xdsresource.OnDoneFunc) {
102+
cw.updateCh.Send(listenerUpdateErrTuple{update: update.Resource})
103+
onDone()
104+
}
105+
106+
func (cw *listenerWatcherMultiple) OnError(err error, onDone xdsresource.OnDoneFunc) {
107+
// When used with a go-control-plane management server that continuously
108+
// resends resources which are NACKed by the xDS client, using a `Replace()`
109+
// here and in OnResourceDoesNotExist() simplifies tests which will have
110+
// access to the most recently received error.
111+
cw.updateCh.Send(listenerUpdateErrTuple{err: err})
112+
onDone()
113+
}
114+
115+
func (cw *listenerWatcherMultiple) OnResourceDoesNotExist(onDone xdsresource.OnDoneFunc) {
116+
cw.updateCh.Send(listenerUpdateErrTuple{err: xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "Listener not found in received response")})
117+
onDone()
118+
}
119+
93120
// badListenerResource returns a listener resource for the given name which does
94121
// not contain the `RouteSpecifier` field in the HTTPConnectionManager, and
95122
// hence is expected to be NACKed by the client.
@@ -547,7 +574,7 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
547574
// a resource which is already present in the cache. The test verifies that the
548575
// watch callback is invoked with the contents from the cache, instead of a
549576
// request being sent to the management server.
550-
func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
577+
func TestLDSWatch_ResourceCaching(t *testing.T) {
551578
firstRequestReceived := false
552579
firstAckReceived := grpcsync.NewEvent()
553580
secondRequestReceived := grpcsync.NewEvent()
@@ -926,28 +953,9 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
926953
// good update and latest NACK error. The test verifies that new watcher
927954
// receives both good update and error without request being sent to the
928955
// management server.
929-
func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
930-
firstRequestReceived := false
931-
firstAckReceived := grpcsync.NewEvent()
932-
secondRequestReceived := grpcsync.NewEvent()
956+
func TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
933957

934-
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
935-
OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
936-
// The first request has an empty version string.
937-
if !firstRequestReceived && req.GetVersionInfo() == "" {
938-
firstRequestReceived = true
939-
return nil
940-
}
941-
// The first ack has a non-empty version string.
942-
if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" {
943-
firstAckReceived.Fire()
944-
return nil
945-
}
946-
// Any requests after the first request and ack, are not expected.
947-
secondRequestReceived.Fire()
948-
return nil
949-
},
950-
})
958+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
951959

952960
nodeID := uuid.New().String()
953961
bc := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address)
@@ -975,7 +983,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
975983
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
976984
SkipValidation: true,
977985
}
978-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
986+
ctx, cancel := context.WithTimeout(context.Background(), 1000000*defaultTestTimeout)
979987
defer cancel()
980988
if err := mgmtServer.Update(ctx, resources); err != nil {
981989
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
@@ -990,11 +998,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
990998
if err := verifyListenerUpdate(ctx, lw1.updateCh, wantUpdate); err != nil {
991999
t.Fatal(err)
9921000
}
993-
select {
994-
case <-ctx.Done():
995-
t.Fatal("timeout when waiting for receipt of ACK at the management server")
996-
case <-firstAckReceived.Done():
997-
}
9981001

9991002
// Configure the management server to return a single listener resource
10001003
// which is expected to be NACKed by the client.
@@ -1018,7 +1021,7 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
10181021

10191022
// Register another watch for the same resource. This should get the update
10201023
// and error from the cache.
1021-
lw2 := newListenerWatcher()
1024+
lw2 := newListenerWatcherMultiple()
10221025
ldsCancel2 := xdsresource.WatchListener(client, ldsName, lw2)
10231026
defer ldsCancel2()
10241027
if err := verifyListenerUpdate(ctx, lw2.updateCh, wantUpdate); err != nil {
@@ -1032,15 +1035,6 @@ func (s) TestLDSWatch_ResourceCaching_NACKError(t *testing.T) {
10321035
if gotErr == nil || !strings.Contains(gotErr.Error(), wantListenerNACKErr) {
10331036
t.Fatalf("update received with error: %v, want %q", gotErr, wantListenerNACKErr)
10341037
}
1035-
1036-
// No request should get sent out as part of this watch.
1037-
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
1038-
defer sCancel()
1039-
select {
1040-
case <-sCtx.Done():
1041-
case <-secondRequestReceived.Done():
1042-
t.Fatal("xdsClient sent out request instead of using update from cache")
1043-
}
10441038
}
10451039

10461040
// TestLDSWatch_PartialValid covers the case where a response from the

0 commit comments

Comments
 (0)