Skip to content

Commit

Permalink
xds: support cluster fallback in cluster_resolver (#4594)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl authored Jul 21, 2021
1 parent 65cabd7 commit 0300770
Show file tree
Hide file tree
Showing 17 changed files with 587 additions and 722 deletions.
132 changes: 70 additions & 62 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,27 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/xdsclient"
)

const (
cdsName = "cds_experimental"
edsName = "eds_experimental"
)

var (
errBalancerClosed = errors.New("cdsBalancer is closed")

// newEDSBalancer is a helper function to build a new edsBalancer and will be
// overridden in unittests.
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
builder := balancer.Get(edsName)
// newChildBalancer is a helper function to build a new cluster_resolver
// balancer and will be overridden in unittests.
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
builder := balancer.Get(clusterresolver.Name)
if builder == nil {
return nil, fmt.Errorf("xds: no balancer builder with name %v", edsName)
return nil, fmt.Errorf("xds: no balancer builder with name %v", clusterresolver.Name)
}
// We directly pass the parent clientConn to the
// underlying edsBalancer because the cdsBalancer does
// not deal with subConns.
// We directly pass the parent clientConn to the underlying
// cluster_resolver balancer because the cdsBalancer does not deal with
// subConns.
return builder.Build(cc, opts), nil
}
buildProvider = buildProviderFunc
Expand Down Expand Up @@ -126,31 +126,32 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
// watcher with the xdsClient, while a non-nil error causes it to cancel the
// existing watch and propagate the error to the underlying edsBalancer.
// existing watch and propagate the error to the underlying cluster_resolver
// balancer.
type ccUpdate struct {
// clusterName is the cluster to watch. A valid name causes the cdsBalancer
// to register a CDS watcher with the xdsClient.
clusterName string
// err, when non-nil, causes the existing watch to be canceled and the error
// to be propagated to the child balancer.
err error
}

// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the edsBalancer.
// on to the cluster_resolver balancer.
type scUpdate struct {
// subConn and state are handed over unchanged, as a pair, to the child
// balancer's UpdateSubConnState by the run() loop.
subConn balancer.SubConn
state balancer.SubConnState
}

// cdsBalancer implements a CDS based LB policy. It instantiates an EDS based
// LB policy to further resolve the serviceName received from CDS, into
// localities and endpoints. Implements the balancer.Balancer interface which
// is exposed to gRPC and implements the balancer.ClientConn interface which is
// exposed to the edsBalancer.
// cdsBalancer implements a CDS based LB policy. It instantiates a
// cluster_resolver balancer to further resolve the serviceName received from
// CDS, into localities and endpoints. Implements the balancer.Balancer
// interface which is exposed to gRPC and implements the balancer.ClientConn
// interface which is exposed to the cluster_resolver balancer.
type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource.
clusterHandler *clusterHandler // To watch the clusters.
edsLB balancer.Balancer // EDS child policy.
childLB balancer.Balancer
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
Expand All @@ -166,7 +167,7 @@ type cdsBalancer struct {
// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
// updates lead to registration of a CDS watch. Updates with error lead to
// cancellation of existing watch and propagation of the same error to the
// edsBalancer.
// cluster_resolver balancer.
func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) {
// We first handle errors, if any, and then proceed with handling the
// update, only if the status quo has changed.
Expand Down Expand Up @@ -266,15 +267,15 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
}

// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying edsBalancer.
// lead to clientConn updates being invoked on the underlying cluster_resolver balancer.
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}

b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.chu), pretty.ToJSON(update.securityCfg))
b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg))

// Process the security config from the received update before building the
// child policy or forwarding the update to it. We do this because the child
Expand All @@ -291,47 +292,54 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
}

// The first good update from the watch API leads to the instantiation of an
// edsBalancer. Further updates/errors are propagated to the existing
// edsBalancer.
if b.edsLB == nil {
edsLB, err := newEDSBalancer(b.ccw, b.bOpts)
// cluster_resolver balancer. Further updates/errors are propagated to the existing
// cluster_resolver balancer.
if b.childLB == nil {
childLB, err := newChildBalancer(b.ccw, b.bOpts)
if err != nil {
b.logger.Errorf("Failed to create child policy of type %s, %v", edsName, err)
b.logger.Errorf("Failed to create child policy of type %s, %v", clusterresolver.Name, err)
return
}
b.edsLB = edsLB
b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName)
}
b.childLB = childLB
b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
}

dms := make([]balancerconfig.DiscoveryMechanism, len(update.updates))
for i, cu := range update.updates {
switch cu.ClusterType {
case xdsclient.ClusterTypeEDS:
dms[i] = balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: cu.ClusterName,
EDSServiceName: cu.EDSServiceName,
MaxConcurrentRequests: cu.MaxRequests,
}
if cu.EnableLRS {
// An empty string here indicates that the cluster_resolver balancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
dms[i].LoadReportingServerName = new(string)

if len(update.chu) == 0 {
b.logger.Infof("got update with 0 cluster updates, should never happen. There should be at least one cluster")
}
case xdsclient.ClusterTypeLogicalDNS:
dms[i] = balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: cu.DNSHostName,
}
default:
b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
}
// TODO: this function is currently only handling the cluster with higher
// priority. This should work in most cases (e.g. if the cluster is not an
// aggregated cluster, or if the higher priority cluster works fine so
// there's no need to fallback). This quick fix is to unblock the testing
// work before the full fallback support is complete. Once the EDS balancer
// is updated to cluster_resolver, which has the fallback functionality, we
// will fix this to handle all the clusters in list.
cds := update.chu[0]
lbCfg := &clusterresolver.EDSConfig{
ClusterName: cds.ClusterName,
EDSServiceName: cds.EDSServiceName,
MaxConcurrentRequests: cds.MaxRequests,
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
}
if cds.EnableLRS {
// An empty string here indicates that the edsBalancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
lbCfg.LrsLoadReportingServerName = new(string)

}
ccState := balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
if err := b.childLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: cluster_resolver balancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
}
}

Expand All @@ -348,20 +356,20 @@ func (b *cdsBalancer) run() {
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are passthrough and are simply handed over to
// the underlying edsBalancer.
if b.edsLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
// the underlying cluster_resolver balancer.
if b.childLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no cluster_resolver balancer", update)
break
}
b.edsLB.UpdateSubConnState(update.subConn, update.state)
b.childLB.UpdateSubConnState(update.subConn, update.state)
}
case u := <-b.clusterHandler.updateChannel:
b.handleWatchUpdate(u)
case <-b.closed.Done():
b.clusterHandler.close()
if b.edsLB != nil {
b.edsLB.Close()
b.edsLB = nil
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
Expand Down Expand Up @@ -389,22 +397,22 @@ func (b *cdsBalancer) run() {
// - If it's from xds client, it means the CDS resource was removed. The CDS
// watcher should keep watching.
//
// In both cases, the error will be forwarded to EDS balancer. And if error is
// resource-not-found, the child EDS balancer will stop watching EDS.
// In both cases, the error will be forwarded to the child balancer. And if
// error is resource-not-found, the child balancer will stop watching EDS.
func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// This is not necessary today, because xds client never sends connection
// errors.
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
b.clusterHandler.close()
}
if b.edsLB != nil {
if b.childLB != nil {
if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.edsLB.ResolverError(err)
b.childLB.ResolverError(err)
}
} else {
// If eds balancer was never created, fail the RPCs with
// If child balancer was never created, fail the RPCs with
// errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Expand Down
20 changes: 10 additions & 10 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
// Override the creation of the EDS balancer to return a fake EDS balancer
// implementation.
edsB := newTestEDSBalancer()
oldEDSBalancerBuilder := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
oldEDSBalancerBuilder := newChildBalancer
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
edsB.parentCC = cc
return edsB, nil
}
Expand All @@ -177,7 +177,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newChildBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup. No security config is
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
Expand Down Expand Up @@ -464,7 +464,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -498,7 +498,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -551,7 +551,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -601,7 +601,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
Expand Down Expand Up @@ -672,7 +672,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{
Expand Down
Loading

0 comments on commit 0300770

Please sign in to comment.