Skip to content

Commit

Permalink
xds: support ack/nack (#3227)
Browse files Browse the repository at this point in the history
The client will send a request with version/nonce after receiving a
response, to ack/nack.

Ack versions for different xds types are independent.

Some other changes
- merge sendRequests to one shared function, with fields for version/nonce
- deleted enum for xds types, and always use const URL string
  • Loading branch information
menghanl authored Dec 12, 2019
1 parent 032a379 commit df18b54
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 279 deletions.
23 changes: 1 addition & 22 deletions xds/internal/client/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,15 @@ import (

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"
)

// newCDSRequest generates an CDS request proto for the provided clusterName,
// to be sent out on the wire.
func (v2c *v2Client) newCDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: clusterURL,
ResourceNames: clusterName,
}
}

// sendCDS sends an CDS request for provided clusterName on the provided
// stream.
func (v2c *v2Client) sendCDS(stream adsStream, clusterName []string) bool {
if err := stream.Send(v2c.newCDSRequest(clusterName)); err != nil {
grpclog.Warningf("xds: CDS request for resource %v failed: %v", clusterName, err)
return false
}
return true
}

// handleCDSResponse processes an CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()

wi := v2c.watchMap[cdsResource]
wi := v2c.watchMap[cdsURL]
if wi == nil {
return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
}
Expand Down
20 changes: 10 additions & 10 deletions xds/internal/client/cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func TestCDSCaching(t *testing.T) {
},
// Push an empty CDS response. This should clear the cache.
{
responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}},
responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}},
wantOpErr: false,
wantCDSCache: map[string]CDSUpdate{},
wantWatchCallback: true,
Expand Down Expand Up @@ -466,11 +466,11 @@ var (
badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
goodCluster1 = &xdspb.Cluster{
Name: clusterName1,
Expand Down Expand Up @@ -508,32 +508,32 @@ var (
goodCDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster1,
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
goodCDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster2,
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster1,
},
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster2,
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
)
7 changes: 6 additions & 1 deletion xds/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ func TestNew(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if _, err := New(test.opts); (err != nil) != test.wantErr {
c, err := New(test.opts)
if err == nil {
defer c.Close()
}
if (err != nil) != test.wantErr {
t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr)
}
})
Expand Down Expand Up @@ -260,6 +264,7 @@ func TestWatchServiceWithClientClose(t *testing.T) {
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")

callbackCh := make(chan error, 1)
Expand Down
21 changes: 1 addition & 20 deletions xds/internal/client/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,30 +164,11 @@ func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
return u
}

// newEDSRequest generates an EDS request proto for the provided clusterName, to
// be sent out on the wire.
func (v2c *v2Client) newEDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: endpointURL,
ResourceNames: clusterName,
}
}

// sendEDS sends an EDS request for provided clusterName on the provided stream.
func (v2c *v2Client) sendEDS(stream adsStream, clusterName []string) bool {
if err := stream.Send(v2c.newEDSRequest(clusterName)); err != nil {
grpclog.Warningf("xds: EDS request for resource %v failed: %v", clusterName, err)
return false
}
return true
}

func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()

wi := v2c.watchMap[edsResource]
wi := v2c.watchMap[edsURL]
if wi == nil {
return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
}
Expand Down
11 changes: 6 additions & 5 deletions xds/internal/client/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ var (
badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: endpointURL,
TypeUrl: edsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
Expand All @@ -133,7 +133,7 @@ var (
Value: marshaledConnMgr1,
},
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
goodEDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
Expand All @@ -145,7 +145,7 @@ var (
return a
}(),
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
goodEDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
Expand All @@ -157,7 +157,7 @@ var (
return a
}(),
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
)

Expand Down Expand Up @@ -283,6 +283,7 @@ func TestEDSHandleResponseWithoutWatch(t *testing.T) {
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()

if v2c.handleEDSResponse(goodEDSResponse1) == nil {
t.Fatal("v2c.handleEDSResponse() succeeded, should have failed")
Expand Down
25 changes: 2 additions & 23 deletions xds/internal/client/lds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,18 @@ package client
import (
"fmt"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
"github.com/golang/protobuf/ptypes"
)

// newLDSRequest generates an LDS request proto for the provided target, to be
// sent out on the wire.
func (v2c *v2Client) newLDSRequest(target []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: listenerURL,
ResourceNames: target,
}
}

// sendLDS sends an LDS request for provided target on the provided stream.
func (v2c *v2Client) sendLDS(stream adsStream, target []string) bool {
if err := stream.Send(v2c.newLDSRequest(target)); err != nil {
grpclog.Warningf("xds: LDS request for resource %v failed: %v", target, err)
return false
}
return true
}

// handleLDSResponse processes an LDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()

wi := v2c.watchMap[ldsResource]
wi := v2c.watchMap[ldsURL]
if wi == nil {
return fmt.Errorf("xds: no LDS watcher found when handling LDS response: %+v", resp)
}
Expand Down
29 changes: 4 additions & 25 deletions xds/internal/client/rds.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,23 @@ import (
"net"
"strings"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"

xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
)

// newRDSRequest generates an RDS request proto for the provided routeName, to
// be sent out on the wire.
func (v2c *v2Client) newRDSRequest(routeName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: routeURL,
ResourceNames: routeName,
}
}

// sendRDS sends an RDS request for provided routeName on the provided stream.
func (v2c *v2Client) sendRDS(stream adsStream, routeName []string) bool {
if err := stream.Send(v2c.newRDSRequest(routeName)); err != nil {
grpclog.Warningf("xds: RDS request for resource %v failed: %v", routeName, err)
return false
}
return true
}

// handleRDSResponse processes an RDS response received from the xDS server. On
// receipt of a good response, it caches validated resources and also invokes
// the registered watcher callback.
func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()

if v2c.watchMap[ldsResource] == nil {
if v2c.watchMap[ldsURL] == nil {
return fmt.Errorf("xds: unexpected RDS response when no LDS watcher is registered: %+v", resp)
}
target := v2c.watchMap[ldsResource].target[0]
target := v2c.watchMap[ldsURL].target[0]

wi := v2c.watchMap[rdsResource]
wi := v2c.watchMap[rdsURL]
if wi == nil {
return fmt.Errorf("xds: no RDS watcher found when handling RDS response: %+v", resp)
}
Expand Down
31 changes: 14 additions & 17 deletions xds/internal/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,10 @@ import (
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient

const (
listenerURL = "type.googleapis.com/envoy.api.v2.Listener"
routeURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
clusterURL = "type.googleapis.com/envoy.api.v2.Cluster"
endpointURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)

// resourceType is an enum to represent the different xDS resources.
type resourceType int

const (
ldsResource resourceType = iota
rdsResource
cdsResource
edsResource
ldsURL = "type.googleapis.com/envoy.api.v2.Listener"
rdsURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)

// watchState is an enum to represent the state of a watch call.
Expand All @@ -54,9 +44,10 @@ const (

// watchInfo holds all the information about a watch call.
type watchInfo struct {
wType resourceType
target []string
state watchState
typeURL string
target []string
state watchState

callback interface{}
expiryTimer *time.Timer
}
Expand All @@ -76,6 +67,12 @@ func (wi *watchInfo) stopTimer() {
}
}

type ackInfo struct {
typeURL string
version string // Nack if version is an empty string.
nonce string
}

type ldsUpdate struct {
routeName string
}
Expand Down
Loading

0 comments on commit df18b54

Please sign in to comment.