From df18b5433b2502ee82a823a5895a807e58025002 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 12 Dec 2019 13:39:56 -0800 Subject: [PATCH] xds: support ack/nack (#3227) The client will send a request with version/nonce after receiving a response, to ack/nack. Ack versions for different xds types are independent. Some other changes - merge sendRequests to one shared function, with fields for version/nonce - deleted enum for xds types, and always use const URL string --- xds/internal/client/cds.go | 23 +- xds/internal/client/cds_test.go | 20 +- xds/internal/client/client_test.go | 7 +- xds/internal/client/eds.go | 21 +- xds/internal/client/eds_test.go | 11 +- xds/internal/client/lds.go | 25 +- xds/internal/client/rds.go | 29 +-- xds/internal/client/types.go | 31 +-- xds/internal/client/v2client.go | 316 ++++++++++++++--------- xds/internal/client/v2client_ack_test.go | 294 +++++++++++++++++++++ xds/internal/client/v2client_test.go | 77 +++--- 11 files changed, 575 insertions(+), 279 deletions(-) create mode 100644 xds/internal/client/v2client_ack_test.go diff --git a/xds/internal/client/cds.go b/xds/internal/client/cds.go index 2449f3dfaf30..a0ffda21e6a3 100644 --- a/xds/internal/client/cds.go +++ b/xds/internal/client/cds.go @@ -23,36 +23,15 @@ import ( xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/golang/protobuf/ptypes" - "google.golang.org/grpc/grpclog" ) -// newCDSRequest generates an CDS request proto for the provided clusterName, -// to be sent out on the wire. -func (v2c *v2Client) newCDSRequest(clusterName []string) *xdspb.DiscoveryRequest { - return &xdspb.DiscoveryRequest{ - Node: v2c.nodeProto, - TypeUrl: clusterURL, - ResourceNames: clusterName, - } -} - -// sendCDS sends an CDS request for provided clusterName on the provided -// stream. -func (v2c *v2Client) sendCDS(stream adsStream, clusterName []string) bool { - if err := stream.Send(v2c.newCDSRequest(clusterName)); err != nil { - grpclog.Warningf("xds: CDS request for resource %v failed: %v", clusterName, err) - return false - } - return true -} - // handleCDSResponse processes an CDS response received from the xDS server. On // receipt of a good response, it also invokes the registered watcher callback. func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { v2c.mu.Lock() defer v2c.mu.Unlock() - wi := v2c.watchMap[cdsResource] + wi := v2c.watchMap[cdsURL] if wi == nil { return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) } diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go index ab4655dd791c..025e8a55e96e 100644 --- a/xds/internal/client/cds_test.go +++ b/xds/internal/client/cds_test.go @@ -397,7 +397,7 @@ func TestCDSCaching(t *testing.T) { }, // Push an empty CDS response. This should clear the cache. { - responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}}, + responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}}, wantOpErr: false, wantCDSCache: map[string]CDSUpdate{}, wantWatchCallback: true, @@ -466,11 +466,11 @@ var ( badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: clusterURL, + TypeUrl: cdsURL, Value: []byte{1, 2, 3, 4}, }, }, - TypeUrl: clusterURL, + TypeUrl: cdsURL, } goodCluster1 = &xdspb.Cluster{ Name: clusterName1, @@ -508,32 +508,32 @@ var ( goodCDSResponse1 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: clusterURL, + TypeUrl: cdsURL, Value: marshaledCluster1, }, }, - TypeUrl: clusterURL, + TypeUrl: cdsURL, } goodCDSResponse2 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: clusterURL, + TypeUrl: cdsURL, Value: marshaledCluster2, }, }, - TypeUrl: clusterURL, + TypeUrl: cdsURL, } cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: clusterURL, + TypeUrl: cdsURL, Value: marshaledCluster1, }, { - TypeUrl: clusterURL, + TypeUrl: cdsURL, Value: marshaledCluster2, }, }, - TypeUrl: clusterURL, + TypeUrl: cdsURL, } ) diff --git a/xds/internal/client/client_test.go b/xds/internal/client/client_test.go index df5fbfaf68cc..f9edff4f1e66 100644 --- a/xds/internal/client/client_test.go +++ b/xds/internal/client/client_test.go @@ -93,7 +93,11 @@ func TestNew(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - if _, err := New(test.opts); (err != nil) != test.wantErr { + c, err := New(test.opts) + if err == nil { + defer c.Close() + } + if (err != nil) != test.wantErr { t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr) } }) @@ -260,6 +264,7 @@ func TestWatchServiceWithClientClose(t *testing.T) { if err != nil { t.Fatalf("New returned error: %v", err) } + defer xdsClient.Close() t.Log("Created an xdsClient...") callbackCh := make(chan error, 1) diff --git a/xds/internal/client/eds.go b/xds/internal/client/eds.go index 720b25698297..d18b598730f9 100644 --- a/xds/internal/client/eds.go +++ b/xds/internal/client/eds.go @@ -164,30 +164,11 @@ func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { return u } -// newEDSRequest generates an EDS request proto for the provided clusterName, to -// be sent out on the wire. -func (v2c *v2Client) newEDSRequest(clusterName []string) *xdspb.DiscoveryRequest { - return &xdspb.DiscoveryRequest{ - Node: v2c.nodeProto, - TypeUrl: endpointURL, - ResourceNames: clusterName, - } -} - -// sendEDS sends an EDS request for provided clusterName on the provided stream. -func (v2c *v2Client) sendEDS(stream adsStream, clusterName []string) bool { - if err := stream.Send(v2c.newEDSRequest(clusterName)); err != nil { - grpclog.Warningf("xds: EDS request for resource %v failed: %v", clusterName, err) - return false - } - return true -} - func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { v2c.mu.Lock() defer v2c.mu.Unlock() - wi := v2c.watchMap[edsResource] + wi := v2c.watchMap[edsURL] if wi == nil { return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp) } diff --git a/xds/internal/client/eds_test.go b/xds/internal/client/eds_test.go index ae9aac34d727..1c75768a9370 100644 --- a/xds/internal/client/eds_test.go +++ b/xds/internal/client/eds_test.go @@ -120,11 +120,11 @@ var ( badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: endpointURL, + TypeUrl: edsURL, Value: []byte{1, 2, 3, 4}, }, }, - TypeUrl: endpointURL, + TypeUrl: edsURL, } badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ @@ -133,7 +133,7 @@ var ( Value: marshaledConnMgr1, }, }, - TypeUrl: endpointURL, + TypeUrl: edsURL, } goodEDSResponse1 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ @@ -145,7 +145,7 @@ var ( return a }(), }, - TypeUrl: endpointURL, + TypeUrl: edsURL, } goodEDSResponse2 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ @@ -157,7 +157,7 @@ var ( return a }(), }, - TypeUrl: endpointURL, + TypeUrl: edsURL, } ) @@ -283,6 +283,7 @@ func TestEDSHandleResponseWithoutWatch(t *testing.T) { sCleanup() }() v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() if v2c.handleEDSResponse(goodEDSResponse1) == nil { t.Fatal("v2c.handleEDSResponse() succeeded, should have failed") diff --git a/xds/internal/client/lds.go b/xds/internal/client/lds.go index 55b622e5dfd8..661b976fca7f 100644 --- a/xds/internal/client/lds.go +++ b/xds/internal/client/lds.go @@ -21,39 +21,18 @@ package client import ( "fmt" - "github.com/golang/protobuf/ptypes" - "google.golang.org/grpc/grpclog" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" + "github.com/golang/protobuf/ptypes" ) -// newLDSRequest generates an LDS request proto for the provided target, to be -// sent out on the wire. -func (v2c *v2Client) newLDSRequest(target []string) *xdspb.DiscoveryRequest { - return &xdspb.DiscoveryRequest{ - Node: v2c.nodeProto, - TypeUrl: listenerURL, - ResourceNames: target, - } -} - -// sendLDS sends an LDS request for provided target on the provided stream. -func (v2c *v2Client) sendLDS(stream adsStream, target []string) bool { - if err := stream.Send(v2c.newLDSRequest(target)); err != nil { - grpclog.Warningf("xds: LDS request for resource %v failed: %v", target, err) - return false - } - return true -} - // handleLDSResponse processes an LDS response received from the xDS server. On // receipt of a good response, it also invokes the registered watcher callback. func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error { v2c.mu.Lock() defer v2c.mu.Unlock() - wi := v2c.watchMap[ldsResource] + wi := v2c.watchMap[ldsURL] if wi == nil { return fmt.Errorf("xds: no LDS watcher found when handling LDS response: %+v", resp) } diff --git a/xds/internal/client/rds.go b/xds/internal/client/rds.go index a3cc397f7e27..35c017b81aec 100644 --- a/xds/internal/client/rds.go +++ b/xds/internal/client/rds.go @@ -23,31 +23,10 @@ import ( "net" "strings" - "github.com/golang/protobuf/ptypes" - "google.golang.org/grpc/grpclog" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/golang/protobuf/ptypes" ) -// newRDSRequest generates an RDS request proto for the provided routeName, to -// be sent out on the wire. -func (v2c *v2Client) newRDSRequest(routeName []string) *xdspb.DiscoveryRequest { - return &xdspb.DiscoveryRequest{ - Node: v2c.nodeProto, - TypeUrl: routeURL, - ResourceNames: routeName, - } -} - -// sendRDS sends an RDS request for provided routeName on the provided stream. -func (v2c *v2Client) sendRDS(stream adsStream, routeName []string) bool { - if err := stream.Send(v2c.newRDSRequest(routeName)); err != nil { - grpclog.Warningf("xds: RDS request for resource %v failed: %v", routeName, err) - return false - } - return true -} - // handleRDSResponse processes an RDS response received from the xDS server. On // receipt of a good response, it caches validated resources and also invokes // the registered watcher callback. @@ -55,12 +34,12 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { v2c.mu.Lock() defer v2c.mu.Unlock() - if v2c.watchMap[ldsResource] == nil { + if v2c.watchMap[ldsURL] == nil { return fmt.Errorf("xds: unexpected RDS response when no LDS watcher is registered: %+v", resp) } - target := v2c.watchMap[ldsResource].target[0] + target := v2c.watchMap[ldsURL].target[0] - wi := v2c.watchMap[rdsResource] + wi := v2c.watchMap[rdsURL] if wi == nil { return fmt.Errorf("xds: no RDS watcher found when handling RDS response: %+v", resp) } diff --git a/xds/internal/client/types.go b/xds/internal/client/types.go index 04e21749f05c..6162d7e92bf3 100644 --- a/xds/internal/client/types.go +++ b/xds/internal/client/types.go @@ -27,20 +27,10 @@ import ( type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient const ( - listenerURL = "type.googleapis.com/envoy.api.v2.Listener" - routeURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration" - clusterURL = "type.googleapis.com/envoy.api.v2.Cluster" - endpointURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" -) - -// resourceType is an enum to represent the different xDS resources. -type resourceType int - -const ( - ldsResource resourceType = iota - rdsResource - cdsResource - edsResource + ldsURL = "type.googleapis.com/envoy.api.v2.Listener" + rdsURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration" + cdsURL = "type.googleapis.com/envoy.api.v2.Cluster" + edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" ) // watchState is an enum to represent the state of a watch call. @@ -54,9 +44,10 @@ const ( // watchInfo holds all the information about a watch call. type watchInfo struct { - wType resourceType - target []string - state watchState + typeURL string + target []string + state watchState + callback interface{} expiryTimer *time.Timer } @@ -76,6 +67,12 @@ func (wi *watchInfo) stopTimer() { } } +type ackInfo struct { + typeURL string + version string // Nack if version is an empty string. + nonce string +} + type ldsUpdate struct { routeName string } diff --git a/xds/internal/client/v2client.go b/xds/internal/client/v2client.go index 1baf614ab2ce..f3fbdf8eb658 100644 --- a/xds/internal/client/v2client.go +++ b/xds/internal/client/v2client.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/buffer" + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" ) @@ -51,9 +52,9 @@ type v2Client struct { nodeProto *corepb.Node backoff func(int) time.Duration - // watchCh in the channel onto which watchInfo objects are pushed by the + // sendCh in the channel onto which watchInfo objects are pushed by the // watch API, and it is read and acted upon by the send() goroutine. - watchCh *buffer.Unbounded + sendCh *buffer.Unbounded mu sync.Mutex // Message specific watch infos, protected by the above mutex. These are @@ -62,7 +63,12 @@ type v2Client struct { // messages. When the user of this client object cancels a watch call, // these are set to nil. All accesses to the map protected and any value // inside the map should be protected with the above mutex. - watchMap map[resourceType]*watchInfo + watchMap map[string]*watchInfo + // ackMap contains the version that was acked (the version in the ack + // request that was sent on wire). The key is typeURL, the value is the + // version string, becaues the versions for different resource types + // should be independent. + ackMap map[string]string // rdsCache maintains a mapping of {routeConfigName --> clusterName} from // validated route configurations received in RDS responses. We cache all // valid route configurations, whether or not we are interested in them @@ -92,8 +98,9 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) cc: cc, nodeProto: nodeProto, backoff: backoff, - watchCh: buffer.NewUnbounded(), - watchMap: make(map[resourceType]*watchInfo), + sendCh: buffer.NewUnbounded(), + watchMap: make(map[string]*watchInfo), + ackMap: make(map[string]string), rdsCache: make(map[string]string), cdsCache: make(map[string]CDSUpdate), } @@ -153,6 +160,31 @@ func (v2c *v2Client) run() { } } +// sendRequest sends a request for provided typeURL and resource on the provided +// stream. +// +// version is the ack version to be sent with the request +// - If this is the new request (not an ack/nack), version will be an empty +// string +// - If this is an ack, version will be the version from the response +// - If this is a nack, version will be the previous acked version (from +// ackMap). If there was no ack before, it will be an empty string +func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool { + req := &xdspb.DiscoveryRequest{ + Node: v2c.nodeProto, + TypeUrl: typeURL, + ResourceNames: resourceNames, + VersionInfo: version, + ResponseNonce: nonce, + // TODO: populate ErrorDetails for nack. + } + if err := stream.Send(req); err != nil { + grpclog.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err) + return false + } + return true +} + // sendExisting sends out xDS requests for registered watchers when recovering // from a broken stream. // @@ -164,30 +196,80 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool { v2c.mu.Lock() defer v2c.mu.Unlock() - for wType, wi := range v2c.watchMap { - switch wType { - case ldsResource: - if !v2c.sendLDS(stream, wi.target) { - return false - } - case rdsResource: - if !v2c.sendRDS(stream, wi.target) { - return false - } - case cdsResource: - if !v2c.sendCDS(stream, wi.target) { - return false - } - case edsResource: - if !v2c.sendEDS(stream, wi.target) { - return false - } + // Reset the ack versions when the stream restarts. + v2c.ackMap = make(map[string]string) + + for typeURL, wi := range v2c.watchMap { + if !v2c.sendRequest(stream, wi.target, typeURL, "", "") { + return false } } return true } +// processWatchInfo pulls the fields needed by the request from a watchInfo. +// +// It also calls callback with cached response, and updates the watch map in +// v2c. +// +// If the watch was already canceled, it returns false for send +func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) { + v2c.mu.Lock() + defer v2c.mu.Unlock() + if t.state == watchCancelled { + return // This returns all zero values, and false for send. + } + t.state = watchStarted + send = true + + typeURL = t.typeURL + target = t.target + v2c.checkCacheAndUpdateWatchMap(t) + // TODO: if watch is called again with the same resource names, + // there's no need to send another request. + // + // TODO: should we reset version (for ack) when a new watch is + // started? Or do this only if the resource names are different + // (so we send a new request)? + return +} + +// processAckInfo pulls the fields needed by the ack request from a ackInfo. +// +// If no active watch is found for this ack, it returns false for send. +func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) { + typeURL = t.typeURL + + v2c.mu.Lock() + defer v2c.mu.Unlock() + wi, ok := v2c.watchMap[typeURL] + if !ok { + // We don't send the request ack if there's no active watch (this can be + // either the server sends responses before any request, or the watch is + // canceled while the ackInfo is in queue), because there's no resource + // name. And if we send a request with empty resource name list, the + // server may treat it as a wild card and send us everything. + grpclog.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL) + return // This returns all zero values, and false for send. + } + send = true + + version = t.version + nonce = t.nonce + target = wi.target + if version == "" { + // This is a nack, get the previous acked version. + version = v2c.ackMap[typeURL] + // version will still be an empty string if typeURL isn't + // found in ackMap, this can happen if there wasn't any ack + // before. + } else { + v2c.ackMap[typeURL] = version + } + return +} + // send reads watch infos from update channel and sends out actual xDS requests // on the provided ADS stream. func (v2c *v2Client) send(stream adsStream, done chan struct{}) { @@ -199,36 +281,25 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) { select { case <-v2c.ctx.Done(): return - case u := <-v2c.watchCh.Get(): - v2c.watchCh.Load() - wi := u.(*watchInfo) - v2c.mu.Lock() - if wi.state == watchCancelled { - v2c.mu.Unlock() + case u := <-v2c.sendCh.Get(): + v2c.sendCh.Load() + + var ( + target []string + typeURL, version, nonce string + send bool + ) + switch t := u.(type) { + case *watchInfo: + target, typeURL, version, nonce, send = v2c.processWatchInfo(t) + case *ackInfo: + target, typeURL, version, nonce, send = v2c.processAckInfo(t) + } + if !send { continue } - wi.state = watchStarted - target := wi.target - v2c.checkCacheAndUpdateWatchMap(wi) - v2c.mu.Unlock() - - switch wi.wType { - case ldsResource: - if !v2c.sendLDS(stream, target) { - return - } - case rdsResource: - if !v2c.sendRDS(stream, target) { - return - } - case cdsResource: - if !v2c.sendCDS(stream, target) { - return - } - case edsResource: - if !v2c.sendEDS(stream, target) { - return - } + if !v2c.sendRequest(stream, target, typeURL, version, nonce) { + return } case <-done: return @@ -247,30 +318,36 @@ func (v2c *v2Client) recv(stream adsStream) bool { grpclog.Warningf("xds: ADS stream recv failed: %v", err) return success } + var respHandleErr error switch resp.GetTypeUrl() { - case listenerURL: - if err := v2c.handleLDSResponse(resp); err != nil { - grpclog.Warningf("xds: LDS response handler failed: %v", err) - return success - } - case routeURL: - if err := v2c.handleRDSResponse(resp); err != nil { - grpclog.Warningf("xds: RDS response handler failed: %v", err) - return success - } - case clusterURL: - if err := v2c.handleCDSResponse(resp); err != nil { - grpclog.Warningf("xds: CDS response handler failed: %v", err) - return success - } - case endpointURL: - if err := v2c.handleEDSResponse(resp); err != nil { - grpclog.Warningf("xds: EDS response handler failed: %v", err) - return success - } + case ldsURL: + respHandleErr = v2c.handleLDSResponse(resp) + case rdsURL: + respHandleErr = v2c.handleRDSResponse(resp) + case cdsURL: + respHandleErr = v2c.handleCDSResponse(resp) + case edsURL: + respHandleErr = v2c.handleEDSResponse(resp) default: grpclog.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl()) + continue + } + + typeURL := resp.GetTypeUrl() + if respHandleErr != nil { + grpclog.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr) + v2c.sendCh.Put(&ackInfo{ + typeURL: typeURL, + version: "", + nonce: resp.GetNonce(), + }) + continue } + v2c.sendCh.Put(&ackInfo{ + typeURL: typeURL, + version: resp.GetVersionInfo(), + nonce: resp.GetNonce(), + }) success = true } } @@ -282,18 +359,11 @@ func (v2c *v2Client) recv(stream adsStream) bool { // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) { - wi := &watchInfo{wType: ldsResource, target: []string{target}, callback: ldsCb} - v2c.watchCh.Put(wi) - return func() { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if wi.state == watchEnqueued { - wi.state = watchCancelled - return - } - v2c.watchMap[ldsResource].cancel() - delete(v2c.watchMap, ldsResource) - } + return v2c.watch(&watchInfo{ + typeURL: ldsURL, + target: []string{target}, + callback: ldsCb, + }) } // watchRDS registers an RDS watcher for the provided routeName. Updates @@ -303,21 +373,14 @@ func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) { - wi := &watchInfo{wType: rdsResource, target: []string{routeName}, callback: rdsCb} - v2c.watchCh.Put(wi) - return func() { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if wi.state == watchEnqueued { - wi.state = watchCancelled - return - } - v2c.watchMap[rdsResource].cancel() - delete(v2c.watchMap, rdsResource) - // TODO: Once a registered RDS watch is cancelled, we should send an - // RDS request with no resources. This will let the server know that we - // are no longer interested in this resource. - } + return v2c.watch(&watchInfo{ + typeURL: rdsURL, + target: []string{routeName}, + callback: rdsCb, + }) + // TODO: Once a registered RDS watch is cancelled, we should send an RDS + // request with no resources. This will let the server know that we are no + // longer interested in this resource. } // watchCDS registers an CDS watcher for the provided clusterName. Updates @@ -327,18 +390,11 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func( // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { - wi := &watchInfo{wType: cdsResource, target: []string{clusterName}, callback: cdsCb} - v2c.watchCh.Put(wi) - return func() { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if wi.state == watchEnqueued { - wi.state = watchCancelled - return - } - v2c.watchMap[cdsResource].cancel() - delete(v2c.watchMap, cdsResource) - } + return v2c.watch(&watchInfo{ + typeURL: cdsURL, + target: []string{clusterName}, + callback: cdsCb, + }) } // watchEDS registers an EDS watcher for the provided clusterName. Updates @@ -348,8 +404,18 @@ func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel fun // The provided callback should not block or perform any expensive operations // or call other methods of the v2Client object. func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) { - wi := &watchInfo{wType: edsResource, target: []string{clusterName}, callback: edsCb} - v2c.watchCh.Put(wi) + return v2c.watch(&watchInfo{ + typeURL: edsURL, + target: []string{clusterName}, + callback: edsCb, + }) + // TODO: Once a registered EDS watch is cancelled, we should send an EDS + // request with no resources. This will let the server know that we are no + // longer interested in this resource. +} + +func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { + v2c.sendCh.Put(wi) return func() { v2c.mu.Lock() defer v2c.mu.Unlock() @@ -357,11 +423,9 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun wi.state = watchCancelled return } - v2c.watchMap[edsResource].cancel() - delete(v2c.watchMap, edsResource) - // TODO: Once a registered EDS watch is cancelled, we should send an - // EDS request with no resources. This will let the server know that we - // are no longer interested in this resource. + v2c.watchMap[wi.typeURL].cancel() + delete(v2c.watchMap, wi.typeURL) + // TODO: should we reset ack version string when cancelling the watch? } } @@ -372,25 +436,25 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun // // Caller should hold v2c.mu func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { - if existing := v2c.watchMap[wi.wType]; existing != nil { + if existing := v2c.watchMap[wi.typeURL]; existing != nil { existing.cancel() } - v2c.watchMap[wi.wType] = wi - switch wi.wType { + v2c.watchMap[wi.typeURL] = wi + switch wi.typeURL { // We need to grab the lock inside of the expiryTimer's afterFunc because // we need to access the watchInfo, which is stored in the watchMap. - case ldsResource: + case ldsURL: wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found", wi.target)) v2c.mu.Unlock() }) - case rdsResource: + case rdsURL: routeName := wi.target[0] if cluster := v2c.rdsCache[routeName]; cluster != "" { var err error - if v2c.watchMap[ldsResource] == nil { + if v2c.watchMap[ldsURL] == nil { cluster = "" err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName) } @@ -404,11 +468,11 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found", wi.target)) v2c.mu.Unlock() }) - case cdsResource: + case cdsURL: clusterName := wi.target[0] if update, ok := v2c.cdsCache[clusterName]; ok { var err error - if v2c.watchMap[cdsResource] == nil { + if v2c.watchMap[cdsURL] == nil { err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) } wi.callback.(cdsCallback)(update, err) @@ -419,7 +483,7 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) v2c.mu.Unlock() }) - case edsResource: + case edsURL: wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target)) diff --git a/xds/internal/client/v2client_ack_test.go b/xds/internal/client/v2client_ack_test.go new file mode 100644 index 000000000000..1e73dd529e22 --- /dev/null +++ b/xds/internal/client/v2client_ack_test.go @@ -0,0 +1,294 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "fmt" + "strconv" + "testing" + "time" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/golang/protobuf/proto" + anypb "github.com/golang/protobuf/ptypes/any" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/xds/internal/client/fakexds" +) + +func emptyChanRecvWithTimeout(ch <-chan struct{}, d time.Duration) error { + timer := time.NewTimer(d) + select { + case <-timer.C: + return fmt.Errorf("timeout") + case <-ch: + timer.Stop() + return nil + } +} + +func requestChanRecvWithTimeout(ch <-chan *fakexds.Request, d time.Duration) (*fakexds.Request, error) { + timer := time.NewTimer(d) + select { + case <-timer.C: + return nil, fmt.Errorf("timeout waiting for request") + case r := <-ch: + timer.Stop() + return r, nil + } +} + +// compareXDSRequest reads requests from channel, compare it with want. +func compareXDSRequest(ch <-chan *fakexds.Request, d time.Duration, want *xdspb.DiscoveryRequest, version, nonce string) error { + r, err := requestChanRecvWithTimeout(ch, d) + if err != nil { + return err + } + if r.Err != nil { + return fmt.Errorf("unexpected error from request: %v", r.Err) + } + wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest) + wantClone.VersionInfo = version + wantClone.ResponseNonce = nonce + if !cmp.Equal(r.Req, wantClone, cmp.Comparer(proto.Equal)) { + return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(r.Req, wantClone)) + } + return nil +} + +func sendXDSRespWithVersion(ch chan<- *fakexds.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) { + respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse) + respToSend.VersionInfo = strconv.Itoa(version) + nonce = strconv.Itoa(int(time.Now().UnixNano())) + respToSend.Nonce = nonce + ch <- &fakexds.Response{Resp: respToSend} + return +} + +// TestV2ClientAck verifies that valid responses are acked, and invalid ones are +// nacked. +// +// This test also verifies the version for different types are independent. +func TestV2ClientAck(t *testing.T) { + var ( + versionLDS = 1000 + versionRDS = 2000 + versionCDS = 3000 + versionEDS = 4000 + ) + + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() + v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start the watch, send a good response, and check for ack. + cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ + cbRDS := startXDS(t, "RDS", v2c, fakeServer, goodRDSRequest) + sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) + versionRDS++ + cbCDS := startXDS(t, "CDS", v2c, fakeServer, goodCDSRequest) + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) + versionCDS++ + cbEDS := startXDS(t, "EDS", v2c, fakeServer, goodEDSRequest) + sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) + versionEDS++ + + // Send a bad response, and check for nack. + sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest) + versionLDS++ + sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest) + versionRDS++ + sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest) + versionCDS++ + sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest) + versionEDS++ + + // send another good response, and check for ack, with the new version. + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ + sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) + versionRDS++ + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) + versionCDS++ + sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) + versionEDS++ +} + +// startXDS calls watch to send the first request. It then sends a good response +// and checks for ack. +func startXDS(t *testing.T, xdsname string, v2c *v2Client, fakeServer *fakexds.Server, goodReq *xdspb.DiscoveryRequest) <-chan struct{} { + callbackCh := make(chan struct{}, 1) + switch xdsname { + case "LDS": + v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh <- struct{}{} + }) + case "RDS": + v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh <- struct{}{} + }) + case "CDS": + v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh <- struct{}{} + }) + case "EDS": + v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) + callbackCh <- struct{}{} + }) + } + + if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodReq, "", ""); err != nil { + t.Fatalf("Failed to receive %s request: %v", xdsname, err) + } + t.Logf("FakeServer received %s request...", xdsname) + return callbackCh +} + +// sendGoodResp sends the good response, with the given version, and a random +// nonce. +// +// It also waits and checks that the ack request contains the given version, and +// the generated nonce. +func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh <-chan struct{}) { + nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, goodResp, version) + t.Logf("Good %s response pushed to fakeServer...", xdsname) + + if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version), nonce); err != nil { + t.Errorf("Failed to receive %s request: %v", xdsname, err) + } + t.Logf("Good %s response acked", xdsname) + if err := emptyChanRecvWithTimeout(callbackCh, defaultTestTimeout); err != nil { + t.Errorf("Timeout when expecting %s update", xdsname) + } + t.Logf("Good %s response callback executed", xdsname) +} + +// sendBadResp sends a bad response with the given version. This response will +// be nacked, so we expect a request with the previous version (version-1). +// +// But the nonce in request should be the new nonce. +func sendBadResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, wantReq *xdspb.DiscoveryRequest) { + var typeURL string + switch xdsname { + case "LDS": + typeURL = ldsURL + case "RDS": + typeURL = rdsURL + case "CDS": + typeURL = cdsURL + case "EDS": + typeURL = edsURL + } + nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{{}}, + TypeUrl: typeURL, + }, version) + t.Logf("Bad %s response pushed to fakeServer...", xdsname) + if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version-1), nonce); err != nil { + t.Errorf("Failed to receive %s request: %v", xdsname, err) + } + t.Logf("Bad %s response nacked", xdsname) +} + +// Test when the first response is invalid, and is nacked, the nack requests +// should have an empty version string. +func TestV2ClientAckFirstIsNack(t *testing.T) { + var versionLDS = 1000 + + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() + v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start the watch, send a good response, and check for ack. + cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + + nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{{}}, + TypeUrl: ldsURL, + }, versionLDS) + t.Logf("Bad response pushed to fakeServer...") + + // The expected version string is an empty string, because this is the first + // response, and it's nacked (so there's no previous ack version). + if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, "", nonce); err != nil { + t.Errorf("Failed to receive request: %v", err) + } + t.Logf("Bad response nacked") + versionLDS++ + + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ +} + +// Test when a nack is sent after a new watch, we nack with the previous acked +// version (instead of resetting to empty string). +func TestV2ClientAckNackAfterNewWatch(t *testing.T) { + var versionLDS = 1000 + + fakeServer, sCleanup := fakexds.StartServer(t) + client, cCleanup := fakeServer.GetClientConn(t) + defer func() { + cCleanup() + sCleanup() + }() + v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start the watch, send a good response, and check for ack. + cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ + + // Start a new watch. + cbLDS = startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest) + + // This is an invalid response after the new watch. + nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{{}}, + TypeUrl: ldsURL, + }, versionLDS) + t.Logf("Bad response pushed to fakeServer...") + + // The expected version string is the previous acked version. + if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { + t.Errorf("Failed to receive request: %v", err) + } + t.Logf("Bad response nacked") + versionLDS++ + + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ +} diff --git a/xds/internal/client/v2client_test.go b/xds/internal/client/v2client_test.go index b52a4a80511b..a844f6888a2a 100644 --- a/xds/internal/client/v2client_test.go +++ b/xds/internal/client/v2client_test.go @@ -64,9 +64,24 @@ var ( } goodLDSRequest = &xdspb.DiscoveryRequest{ Node: goodNodeProto, - TypeUrl: listenerURL, + TypeUrl: ldsURL, ResourceNames: []string{goodLDSTarget1}, } + goodRDSRequest = &xdspb.DiscoveryRequest{ + Node: goodNodeProto, + TypeUrl: rdsURL, + ResourceNames: []string{goodRouteName1}, + } + goodCDSRequest = &xdspb.DiscoveryRequest{ + Node: goodNodeProto, + TypeUrl: cdsURL, + ResourceNames: []string{goodClusterName1}, + } + goodEDSRequest = &xdspb.DiscoveryRequest{ + Node: goodNodeProto, + TypeUrl: edsURL, + ResourceNames: []string{goodEDSName}, + } goodHTTPConnManager1 = &httppb.HttpConnectionManager{ RouteSpecifier: &httppb.HttpConnectionManager_Rds{ Rds: &httppb.Rds{ @@ -130,7 +145,7 @@ var ( Name: goodLDSTarget1, ApiListener: &listenerpb.ApiListener{ ApiListener: &anypb.Any{ - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener1, }, }, @@ -156,30 +171,30 @@ var ( goodLDSResponse1 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener1, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } goodLDSResponse2 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener2, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } - emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: listenerURL} + emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: ldsURL} badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: []byte{1, 2, 3, 4}, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ @@ -188,55 +203,55 @@ var ( Value: marshaledConnMgr1, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener2, }, { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener1, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledNoAPIListener, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener2, }, { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: marshaledListener1, }, { - TypeUrl: listenerURL, + TypeUrl: ldsURL, Value: badlyMarshaledAPIListener2, }, }, - TypeUrl: listenerURL, + TypeUrl: ldsURL, } badlyMarshaledRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: routeURL, + TypeUrl: rdsURL, Value: []byte{1, 2, 3, 4}, }, }, - TypeUrl: routeURL, + TypeUrl: rdsURL, } badResourceTypeInRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ @@ -245,7 +260,7 @@ var ( Value: marshaledConnMgr1, }, }, - TypeUrl: routeURL, + TypeUrl: rdsURL, } emptyRouteConfig = &xdspb.RouteConfiguration{} marshaledEmptyRouteConfig, _ = proto.Marshal(emptyRouteConfig) @@ -255,11 +270,11 @@ var ( noVirtualHostsInRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: routeURL, + TypeUrl: rdsURL, Value: marshaledEmptyRouteConfig, }, }, - TypeUrl: routeURL, + TypeUrl: rdsURL, } goodRouteConfig1 = &xdspb.RouteConfiguration{ Name: goodRouteName1, @@ -346,29 +361,29 @@ var ( goodRDSResponse1 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: routeURL, + TypeUrl: rdsURL, Value: marshaledGoodRouteConfig1, }, }, - TypeUrl: routeURL, + TypeUrl: rdsURL, } goodRDSResponse2 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: routeURL, + TypeUrl: rdsURL, Value: marshaledGoodRouteConfig2, }, }, - TypeUrl: routeURL, + TypeUrl: rdsURL, } uninterestingRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { - TypeUrl: routeURL, + TypeUrl: rdsURL, Value: marshaledUninterestingRouteConfig, }, }, - TypeUrl: routeURL, + TypeUrl: rdsURL, } ) @@ -448,6 +463,8 @@ func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { case <-callbackCh: timer.Stop() } + // Read the ack, so the next request is sent after stream re-creation. + <-fakeServer.RequestChan fakeServer.ResponseChan <- &fakexds.Response{Err: errors.New("RPC error")} t.Log("Bad LDS response pushed to fakeServer...")