Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: store server config for LRS server in xdsresource.ClusterUpdate #7191

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,21 +609,7 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste
Cluster: cluster.ClusterName,
EDSServiceName: cluster.EDSServiceName,
MaxConcurrentRequests: cluster.MaxRequests,
}
if cluster.LRSServerConfig == xdsresource.ClusterLRSServerSelf {
bootstrapConfig := b.xdsClient.BootstrapConfig()
parsedName := xdsresource.ParseName(cluster.ClusterName)
if parsedName.Scheme == xdsresource.FederationScheme {
// Is a federation resource name, find the corresponding
// authority server config.
if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok {
dm.LoadReportingServer = cfg.XDSServer
}
} else {
// Not a federation resource name, use the default
// authority.
dm.LoadReportingServer = bootstrapConfig.XDSServer
}
LoadReportingServer: cluster.LRSServerConfig,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC we're still missing something because we aren't supporting LRS for aggregate clusters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in #7192, we currently don't support LRS for non-EDS clusters, and according to Mark, that is a bug. And with regards to aggregate clusters, Mark mentioned that the expected behavior is specified in A75, but Go hasn't implemented A74 and A75 yet.

So, in this PR, I didn't want to add LRS support for non-EDS clusters. I will do that as part of #7192.

}
case xdsresource.ClusterTypeLogicalDNS:
dm = clusterresolver.DiscoveryMechanism{
Expand Down
11 changes: 4 additions & 7 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ type authorityArgs struct {
// (although the former is part of the latter) is because authorities in the
// bootstrap config might contain an empty server config, and in this case,
// the top-level server config is to be used.
//
// There are two code paths from where a new authority struct might be
// created. One is when a watch is registered for a resource, and one is
// when load reporting needs to be started. We have the authority name in
// the first case, but do in the second. We only have the server config in
// the second case.
serverCfg *bootstrap.ServerConfig
bootstrapCfg *bootstrap.Config
serializer *grpcsync.CallbackSerializer
Expand Down Expand Up @@ -156,7 +150,10 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate
return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
}

opts := &xdsresource.DecodeOptions{BootstrapConfig: a.bootstrapCfg}
opts := &xdsresource.DecodeOptions{
BootstrapConfig: a.bootstrapCfg,
ServerConfig: a.serverCfg,
}
updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
return err
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/clientimpl_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// authority, without holding c.authorityMu.
//
// Caller must not hold c.authorityMu.
func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref func(), _ error) {
func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) {
scheme, authority := n.Scheme, n.Authority

c.authorityMu.Lock()
Expand Down
9 changes: 0 additions & 9 deletions xds/internal/xdsclient/clientimpl_watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,6 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string,
return func() {}
}

// TODO: replace this with the code does the following when we have
// implemented generic watch API on the authority:
// - Parse the resource name and extract the authority.
// - Locate the corresponding authority object and acquire a reference to
// it. If the authority is not found, error out.
// - Call the watchResource() method on the authority.
// - Return a cancel function to cancel the watch on the authority and to
// release the reference.

// TODO: Make ParseName return an error if parsing fails, and
// schedule the OnError callback in that case.
n := xdsresource.ParseName(resourceName)
Expand Down
20 changes: 14 additions & 6 deletions xds/internal/xdsclient/tests/resource_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
},
wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
"resource-name-1": {
Expand All @@ -689,9 +688,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
ClusterName: "resource-name-1",
EDSServiceName: "eds-service-name",
},
wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{
"resource-name-1": {
Expand Down Expand Up @@ -763,6 +761,16 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr)
}

// For tests expected to succeed, we expect an LRS server config in
// the update from the xDS client, because the LRS bit is turned on
// in the cluster resource. We *cannot* set the LRS server config in
// the test table because we do not have the address of the xDS
// server at that point, hence we do it here before verifying the
// received update.
if test.wantErr == "" {
test.wantUpdate.LRSServerConfig = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address)
}
cmpOpts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type clusterResourceType struct {
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
name, cluster, err := unmarshalClusterResource(resource)
name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
Expand Down
8 changes: 6 additions & 2 deletions xds/internal/xdsclient/xdsresource/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,13 @@ type ResourceData interface {
// DecodeOptions wraps the options required by ResourceType implementation for
// decoding configuration received from the xDS management server.
type DecodeOptions struct {
// BootstrapConfig contains the bootstrap configuration passed to the
// top-level xdsClient. This contains useful data for resource validation.
// BootstrapConfig contains the complete bootstrap configuration passed to
// the xDS client. This contains useful data for resource validation.
BootstrapConfig *bootstrap.Config
// ServerConfig contains the server config (from the above bootstrap
// configuration) of the xDS server from which the current resource, for
// which Decode() is being invoked, was received.
ServerConfig *bootstrap.ServerConfig
}

// DecodeResult is the result of a decode operation.
Expand Down
36 changes: 21 additions & 15 deletions xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer/leastrequest"
_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/serviceconfig"
_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
Expand All @@ -48,9 +50,9 @@ import (
v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3"
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

_ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer.
_ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters.
)

type s struct {
Expand All @@ -66,8 +68,6 @@ const (
serviceName = "service"
)

var emptyUpdate = xdsresource.ClusterUpdate{ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff}

func wrrLocality(t *testing.T, m proto.Message) *v3wrrlocalitypb.WrrLocality {
return &v3wrrlocalitypb.WrrLocality{
EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{
Expand Down Expand Up @@ -105,6 +105,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
tests := []struct {
name string
cluster *v3clusterpb.Cluster
serverCfg *bootstrap.ServerConfig
wantUpdate xdsresource.ClusterUpdate
wantLBConfig *iserviceconfig.BalancerConfig
}{
Expand Down Expand Up @@ -164,7 +165,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff, ClusterType: xdsresource.ClusterTypeAggregate,
ClusterName: clusterName,
ClusterType: xdsresource.ClusterTypeAggregate,
PrioritizedClusterNames: []string{"a", "b", "c"},
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Expand All @@ -179,7 +181,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {
{
name: "happy-case-no-service-name-no-lrs",
cluster: e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone),
wantUpdate: emptyUpdate,
wantUpdate: xdsresource.ClusterUpdate{ClusterName: clusterName},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{
Expand All @@ -206,16 +208,17 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
{
name: "happiest-case",
name: "happiest-case-with-lrs",
cluster: e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: serviceName,
EnableLRS: true,
}),
serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Expand Down Expand Up @@ -248,10 +251,11 @@ func (s) TestValidateCluster_Success(t *testing.T) {
}
return c
}(),
serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName,
EDSServiceName: serviceName,
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"},
MaxRequests: func() *uint32 { i := uint32(512); return &i }(),
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Expand Down Expand Up @@ -298,7 +302,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST,
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Expand Down Expand Up @@ -353,7 +358,8 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName,
ClusterName: clusterName,
EDSServiceName: serviceName,
},
wantLBConfig: &iserviceconfig.BalancerConfig{
Name: "least_request_experimental",
Expand Down Expand Up @@ -527,7 +533,7 @@ func (s) TestValidateCluster_Success(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster)
update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster, test.serverCfg)
if err != nil {
t.Errorf("validateClusterAndConstructClusterUpdate(%+v) failed: %v", test.cluster, err)
}
Expand Down
21 changes: 5 additions & 16 deletions xds/internal/xdsclient/xdsresource/type_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package xdsresource
import (
"encoding/json"

"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/protobuf/types/known/anypb"
)

Expand All @@ -39,18 +40,6 @@ const (
ClusterTypeAggregate
)

// ClusterLRSServerConfigType is the type of LRS server config.
type ClusterLRSServerConfigType int

const (
// ClusterLRSOff indicates LRS is off (loads are not reported for this
// cluster).
ClusterLRSOff ClusterLRSServerConfigType = iota
// ClusterLRSServerSelf indicates loads should be reported to the same
// server (the authority) where the CDS resp is received from.
ClusterLRSServerSelf
)

// ClusterUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher.
type ClusterUpdate struct {
Expand All @@ -60,10 +49,10 @@ type ClusterUpdate struct {
// EDSServiceName is an optional name for EDS. If it's not set, the balancer
// should watch ClusterName for the EDS resources.
EDSServiceName string
// LRSServerConfig contains the server where the load reports should be sent
// to. This can be change to an interface, to support other types, e.g. a
// ServerConfig with ServerURI, creds.
LRSServerConfig ClusterLRSServerConfigType
// LRSServerConfig contains configuration about the xDS server that sent
// this cluster resource. This is also the server where load reports are to
// be sent, for this cluster.
LRSServerConfig *bootstrap.ServerConfig
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxRequests for circuit breaking, if any (otherwise nil).
Expand Down
19 changes: 5 additions & 14 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/pretty"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
Expand All @@ -50,7 +51,7 @@ var ValidateClusterAndConstructClusterUpdateForTesting = validateClusterAndConst
// to this value by the management server.
const transportSocketName = "envoy.transport_sockets.tls"

func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
func unmarshalClusterResource(r *anypb.Any, serverCfg *bootstrap.ServerConfig) (string, ClusterUpdate, error) {
r, err := UnwrapResource(r)
if err != nil {
return "", ClusterUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err)
Expand All @@ -64,7 +65,7 @@ func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) {
if err := proto.Unmarshal(r.GetValue(), cluster); err != nil {
return "", ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}
cu, err := validateClusterAndConstructClusterUpdate(cluster)
cu, err := validateClusterAndConstructClusterUpdate(cluster, serverCfg)
if err != nil {
return cluster.GetName(), ClusterUpdate{}, err
}
Expand All @@ -81,7 +82,7 @@ const (
defaultLeastRequestChoiceCount = 2
)

func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serverCfg *bootstrap.ServerConfig) (ClusterUpdate, error) {
telemetryLabels := make(map[string]string)
if fmd := cluster.GetMetadata().GetFilterMetadata(); fmd != nil {
if val, ok := fmd["com.google.csm.telemetry_labels"]; ok {
Expand Down Expand Up @@ -182,21 +183,11 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
TelemetryLabels: telemetryLabels,
}

// Note that this is different from the gRFC (gRFC A47 says to include the
// full ServerConfig{URL,creds,server feature} here). This information is
// not available here, because this function doesn't have access to the
// xdsclient bootstrap information now (can be added if necessary). The
// ServerConfig will be read and populated by the CDS balancer when
// processing this field.
// According to A27:
// If the `lrs_server` field is set, it must have its `self` field set, in
// which case the client should use LRS for load reporting. Otherwise
// (the `lrs_server` field is not set), LRS load reporting will be disabled.
if lrs := cluster.GetLrsServer(); lrs != nil {
if lrs.GetSelf() == nil {
return ClusterUpdate{}, fmt.Errorf("unsupported config_source_specifier %T in lrs_server field", lrs.ConfigSourceSpecifier)
}
ret.LRSServerConfig = ClusterLRSServerSelf
ret.LRSServerConfig = serverCfg
}

// Validate and set cluster type from the response.
Expand Down
Loading
Loading