Skip to content

Commit

Permalink
xds: support LRS server config
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed May 6, 2024
1 parent 4879d51 commit 308bf78
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 166 deletions.
16 changes: 1 addition & 15 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,21 +609,7 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
Cluster: cluster.ClusterName,
EDSServiceName: cluster.EDSServiceName,
MaxConcurrentRequests: cluster.MaxRequests,
}
if cluster.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
bootstrapConfig := b.xdsClient.BootstrapConfig()
parsedName := xdsresource.ParseName(cluster.ClusterName)
if parsedName.Scheme == xdsresource.FederationScheme {
// Is a federation resource name, find the corresponding
// authority server config.
if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
dm.LoadReportingServer = cfg.XDSServer
}
} else {
// Not a federation resource name, use the default
// authority.
dm.LoadReportingServer = bootstrapConfig.XDSServer
}
LoadReportingServer: cluster.LRSServerConfig,
}
case xdsresource.ClusterTypeLogicalDNS:
dm = clusterresolver.DiscoveryMechanism{
Expand Down
11 changes: 4 additions & 7 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ type authorityArgs struct {
// (although the former is part of the latter) is because authorities in the
// bootstrap config might contain an empty server config, and in this case,
// the top-level server config is to be used.
//
// There are two code paths from where a new authority struct might be
// created. One is when a watch is registered for a resource, and one is
// when load reporting needs to be started. We have the authority name in
// the first case, but not in the second. We only have the server config in
// the second case.
serverCfg *bootstrap.ServerConfig
bootstrapCfg *bootstrap.Config
serializer *grpcsync.CallbackSerializer
Expand Down Expand Up @@ -156,7 +150,10 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate
return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
}

opts := &xdsresource.DecodeOptions{BootstrapConfig: a.bootstrapCfg}
opts := &xdsresource.DecodeOptions{
BootstrapConfig: a.bootstrapCfg,
ServerConfig: a.serverCfg,
}
updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
return err
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/clientimpl_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// authority, without holding c.authorityMu.
//
// Caller must not hold c.authorityMu.
func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref func(), _ error) {
func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) {
scheme, authority := n.Scheme, n.Authority

c.authorityMu.Lock()
Expand Down
9 changes: 0 additions & 9 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,6 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
return func() {}
}

// TODO: replace this with code that does the following when we have
// implemented generic watch API on the authority:
// - Parse the resource name and extract the authority.
// - Locate the corresponding authority object and acquire a reference to
// it. If the authority is not found, error out.
// - Call the watchResource() method on the authority.
// - Return a cancel function to cancel the watch on the authority and to
// release the reference.

// TODO: Make ParseName return an error if parsing fails, and
// schedule the OnError callback in that case.
n := xdsresource.ParseName(resourceName)
Expand Down
20 changes: 14 additions & 6 deletions xds/internal/xdsclient/tests/resource_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
},
wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
"resource-name-1": {
Expand All @@ -689,9 +688,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
},
wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
"resource-name-1": {
Expand Down Expand Up @@ -763,6 +761,16 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}

// For tests expected to succeed, we expect an LRS server config in
// the update from the xDS client, because the LRS bit is turned on
// in the cluster resource. We *cannot* set the LRS server config in
// the test table because we do not have the address of the xDS
// server at that point, hence we do it here before verifying the
// received update.
if test.wantErr == "" {
test.wantUpdate.LRSServerConfig = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address)
}
cmpOpts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type clusterResourceType struct {
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
name, cluster, err := unmarshalClusterResource(resource)
name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
Expand Down
8 changes: 6 additions & 2 deletions xds/internal/xdsclient/xdsresource/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,13 @@ type ResourceData interface {
// DecodeOptions wraps the options required by ResourceType implementation for
// decoding configuration received from the xDS management server.
type DecodeOptions struct {
// BootstrapConfig contains the bootstrap configuration passed to the
// top-level xdsClient. This contains useful data for resource validation.
// BootstrapConfig contains the complete bootstrap configuration passed to
// the xDS client. This contains useful data for resource validation.
BootstrapConfig *bootstrap.Config
// ServerConfig contains the server config (from the above bootstrap
// configuration) of the xDS server from which the current resource, for
// which Decode() is being invoked, was received.
ServerConfig *bootstrap.ServerConfig
}

// DecodeResult is the result of a decode operation.
Expand Down
36 changes: 21 additions & 15 deletions xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer/leastrequest"
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/serviceconfig"
_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
Expand All @@ -48,9 +50,9 @@ import (
v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
)

type s struct {
Expand All @@ -66,8 +68,6 @@ const (
serviceName = "service"
)

var emptyUpdate = xdsresource.ClusterUpdate{ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff}

func wrrLocality(t *testing.T, m proto.Message) *v3wrrlocalitypb.WrrLocality {
return &v3wrrlocalitypb.WrrLocality{
EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{
Expand Down Expand Up @@ -105,6 +105,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
tests := []struct {
name string
cluster *v3clusterpb.Cluster
serverCfg *bootstrap.ServerConfig
wantUpdate xdsresource.ClusterUpdate
wantLBConfig *iserviceconfig.BalancerConfig
}{
Expand Down Expand Up @@ -164,7 +165,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff, ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: clusterName,
ClusterType: xdsresource.ClusterTypeAggregate,
PrioritizedClusterNames: []string{"a", "b", "c"},
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Expand All @@ -179,7 +181,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
{
name: "happy-case-no-service-name-no-lrs",
cluster: e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone),
wantUpdate: emptyUpdate,
wantUpdate: xdsresource.ClusterUpdate{ClusterName: clusterName},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{
Expand All @@ -206,16 +208,17 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
{
name: "happiest-case",
name: "happiest-case-with-lrs",
cluster: e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: serviceName,
EnableLRS: true,
}),
serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Expand Down Expand Up @@ -248,10 +251,11 @@ func (s) TestValidateCluster_Success(t *testing.T) {
}
return c
}(),
serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
MaxRequests: func() *uint32 { i := uint32(512); return &i }(),
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Expand Down Expand Up @@ -298,7 +302,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Expand Down Expand Up @@ -353,7 +358,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Expand Down Expand Up @@ -527,7 +533,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster)
update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster, test.serverCfg)
if err != nil {
t.Errorf("validateClusterAndConstructClusterUpdate(%+v) failed: %v", test.cluster, err)
}
Expand Down
21 changes: 5 additions & 16 deletions xds/internal/xdsclient/xdsresource/type_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package xdsresource
import (
"encoding/json"

"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/anypb"
)

Expand All @@ -39,18 +40,6 @@ const (
ClusterTypeAggregate
)

// ClusterLRSServerConfigType is the type of LRS server config.
type ClusterLRSServerConfigType int

const (
// ClusterLRSOff indicates LRS is off (loads are not reported for this
// cluster).
ClusterLRSOff ClusterLRSServerConfigType = iota
// ClusterLRSServerSelf indicates loads should be reported to the same
// server (the authority) where the CDS resp is received from.
ClusterLRSServerSelf
)

// ClusterUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher.
type ClusterUpdate struct {
Expand All @@ -60,10 +49,10 @@ type ClusterUpdate struct {
// EDSServiceName is an optional name for EDS. If it's not set, the balancer
// should watch ClusterName for the EDS resources.
EDSServiceName string
// LRSServerConfig contains the server where the load reports should be sent
// to. This can be changed to an interface, to support other types, e.g. a
// ServerConfig with ServerURI, creds.
LRSServerConfig ClusterLRSServerConfigType
// LRSServerConfig contains configuration about the xDS server that sent
// this cluster resource. This is also the server where load reports are to
// be sent, for this cluster.
LRSServerConfig *bootstrap.ServerConfig
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxRequests for circuit breaking, if any (otherwise nil).
Expand Down
19 changes: 5 additions & 14 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/grpc/internal/pretty"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/proto"
Expand All @@ -50,7 +51,7 @@ var ValidateClusterAndConstructClusterUpdateForTesting = validateClusterAndConst
// to this value by the management server.
const transportSocketName = "envoy.transport_sockets.tls"

func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
func unmarshalClusterResource(r *anypb.Any, serverCfg *bootstrap.ServerConfig) (string, ClusterUpdate, error) {
r, err := UnwrapResource(r)
if err != nil {
return "", ClusterUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
Expand All @@ -64,7 +65,7 @@ func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
if err := proto.Unmarshal(r.GetValue(), cluster); err != nil {
return "", ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}
cu, err := validateClusterAndConstructClusterUpdate(cluster)
cu, err := validateClusterAndConstructClusterUpdate(cluster, serverCfg)
if err != nil {
return cluster.GetName(), ClusterUpdate{}, err
}
Expand All @@ -81,7 +82,7 @@ const (
defaultLeastRequestChoiceCount = 2
)

func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serverCfg *bootstrap.ServerConfig) (ClusterUpdate, error) {
telemetryLabels := make(map[string]string)
if fmd := cluster.GetMetadata().GetFilterMetadata(); fmd != nil {
if val, ok := fmd["com.google.csm.telemetry_labels"]; ok {
Expand Down Expand Up @@ -182,21 +183,11 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
TelemetryLabels: telemetryLabels,
}

// Note that this is different from the gRFC (gRFC A47 says to include the
// full ServerConfig{URL,creds,server feature} here). This information is
// not available here, because this function doesn't have access to the
// xdsclient bootstrap information now (can be added if necessary). The
// ServerConfig will be read and populated by the CDS balancer when
// processing this field.
// According to A27:
// If the `lrs_server` field is set, it must have its `self` field set, in
// which case the client should use LRS for load reporting. Otherwise
// (the `lrs_server` field is not set), LRS load reporting will be disabled.
if lrs := cluster.GetLrsServer(); lrs != nil {
if lrs.GetSelf() == nil {
return ClusterUpdate{}, fmt.Errorf("unsupported config_source_specifier %T in lrs_server field", lrs.ConfigSourceSpecifier)
}
ret.LRSServerConfig = ClusterLRSServerSelf
ret.LRSServerConfig = serverCfg
}

// Validate and set cluster type from the response.
Expand Down
Loading

0 comments on commit 308bf78

Please sign in to comment.