diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index c9014247a767..e52a34a7d29a 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -35,27 +35,27 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/xdsclient" ) const ( cdsName = "cds_experimental" - edsName = "eds_experimental" ) var ( errBalancerClosed = errors.New("cdsBalancer is closed") - // newEDSBalancer is a helper function to build a new edsBalancer and will be - // overridden in unittests. - newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { - builder := balancer.Get(edsName) + // newChildBalancer is a helper function to build a new cluster_resolver + // balancer and will be overridden in unittests. + newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { + builder := balancer.Get(clusterresolver.Name) if builder == nil { - return nil, fmt.Errorf("xds: no balancer builder with name %v", edsName) + return nil, fmt.Errorf("xds: no balancer builder with name %v", clusterresolver.Name) } - // We directly pass the parent clientConn to the - // underlying edsBalancer because the cdsBalancer does - // not deal with subConns. + // We directly pass the parent clientConn to the underlying + // cluster_resolver balancer because the cdsBalancer does not deal with + // subConns. return builder.Build(cc, opts), nil } buildProvider = buildProviderFunc @@ -126,31 +126,32 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err // ccUpdate wraps a clientConn update received from gRPC (pushed from the // xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS // watcher with the xdsClient, while a non-nil error causes it to cancel the -// existing watch and propagate the error to the underlying edsBalancer. +// existing watch and propagate the error to the underlying cluster_resolver +// balancer. type ccUpdate struct { clusterName string err error } // scUpdate wraps a subConn update received from gRPC. This is directly passed -// on to the edsBalancer. +// on to the cluster_resolver balancer. type scUpdate struct { subConn balancer.SubConn state balancer.SubConnState } -// cdsBalancer implements a CDS based LB policy. It instantiates an EDS based -// LB policy to further resolve the serviceName received from CDS, into -// localities and endpoints. Implements the balancer.Balancer interface which -// is exposed to gRPC and implements the balancer.ClientConn interface which is -// exposed to the edsBalancer. +// cdsBalancer implements a CDS based LB policy. It instantiates a +// cluster_resolver balancer to further resolve the serviceName received from +// CDS into localities and endpoints. Implements the balancer.Balancer +// interface which is exposed to gRPC and implements the balancer.ClientConn +// interface which is exposed to the cluster_resolver balancer. type cdsBalancer struct { ccw *ccWrapper // ClientConn interface passed to child LB. bOpts balancer.BuildOptions // BuildOptions passed to child LB. updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates. xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource.
clusterHandler *clusterHandler // To watch the clusters. - edsLB balancer.Balancer // EDS child policy. + childLB balancer.Balancer // Child policy, a cluster_resolver balancer. logger *grpclog.PrefixLogger closed *grpcsync.Event done *grpcsync.Event @@ -166,7 +167,7 @@ type cdsBalancer struct { // handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good // updates lead to registration of a CDS watch. Updates with error lead to // cancellation of existing watch and propagation of the same error to the -// edsBalancer. +// cluster_resolver balancer. func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) { // We first handle errors, if any, and then proceed with handling the // update, only if the status quo has changed. @@ -266,7 +267,7 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc } // handleWatchUpdate handles a watch update from the xDS Client. Good updates -// lead to clientConn updates being invoked on the underlying edsBalancer. +// lead to clientConn updates being invoked on the underlying cluster_resolver balancer. func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { if err := update.err; err != nil { b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err) @@ -274,7 +275,7 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { return } - b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.chu), pretty.ToJSON(update.securityCfg)) + b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg)) // Process the security config from the received update before building the // child policy or forwarding the update to it. We do this because the child @@ -291,47 +292,54 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { } // The first good update from the watch API leads to the instantiation of an - // edsBalancer. Further updates/errors are propagated to the existing - // edsBalancer. - if b.edsLB == nil { - edsLB, err := newEDSBalancer(b.ccw, b.bOpts) + // cluster_resolver balancer. Further updates/errors are propagated to the existing + // cluster_resolver balancer. + if b.childLB == nil { + childLB, err := newChildBalancer(b.ccw, b.bOpts) if err != nil { - b.logger.Errorf("Failed to create child policy of type %s, %v", edsName, err) + b.logger.Errorf("Failed to create child policy of type %s, %v", clusterresolver.Name, err) return } - b.edsLB = edsLB - b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName) - } + b.childLB = childLB + b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name) + } + + dms := make([]balancerconfig.DiscoveryMechanism, len(update.updates)) + for i, cu := range update.updates { + switch cu.ClusterType { + case xdsclient.ClusterTypeEDS: + dms[i] = balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: cu.ClusterName, + EDSServiceName: cu.EDSServiceName, + MaxConcurrentRequests: cu.MaxRequests, + } + if cu.EnableLRS { + // An empty string here indicates that the cluster_resolver balancer should use the + // same xDS server for load reporting as it does for EDS + // requests/responses. + dms[i].LoadReportingServerName = new(string) - if len(update.chu) == 0 { - b.logger.Infof("got update with 0 cluster updates, should never happen.
There should be at least one cluster") + } + case xdsclient.ClusterTypeLogicalDNS: + dms[i] = balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: cu.DNSHostName, + } + default: + b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType) + } } - // TODO: this function is currently only handling the cluster with higher - // priority. This should work in most cases (e.g. if the cluster is not a - // aggregated cluster, or if the higher priority cluster works fine so - // there's no need to fallback). This quick fix is to unblock the testing - // work before the full fallback support is complete. Once the EDS balancer - // is updated to cluster_resolver, which has the fallback functionality, we - // will fix this to handle all the clusters in list. - cds := update.chu[0] - lbCfg := &clusterresolver.EDSConfig{ - ClusterName: cds.ClusterName, - EDSServiceName: cds.EDSServiceName, - MaxConcurrentRequests: cds.MaxRequests, + lbCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: dms, } - if cds.EnableLRS { - // An empty string here indicates that the edsBalancer should use the - // same xDS server for load reporting as it does for EDS - // requests/responses. - lbCfg.LrsLoadReportingServerName = new(string) - } ccState := balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient), BalancerConfig: lbCfg, } - if err := b.edsLB.UpdateClientConnState(ccState); err != nil { - b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err) + if err := b.childLB.UpdateClientConnState(ccState); err != nil { + b.logger.Errorf("xds: cluster_resolver balancer.UpdateClientConnState(%+v) returned error: %v", ccState, err) } } @@ -348,20 +356,20 @@ func (b *cdsBalancer) run() { b.handleClientConnUpdate(update) case *scUpdate: // SubConn updates are passthrough and are simply handed over to - // the underlying edsBalancer. - if b.edsLB == nil { - b.logger.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update) + // the underlying cluster_resolver balancer. + if b.childLB == nil { + b.logger.Errorf("xds: received scUpdate {%+v} with no cluster_resolver balancer", update) break } - b.edsLB.UpdateSubConnState(update.subConn, update.state) + b.childLB.UpdateSubConnState(update.subConn, update.state) } case u := <-b.clusterHandler.updateChannel: b.handleWatchUpdate(u) case <-b.closed.Done(): b.clusterHandler.close() - if b.edsLB != nil { - b.edsLB.Close() - b.edsLB = nil + if b.childLB != nil { + b.childLB.Close() + b.childLB = nil } if b.cachedRoot != nil { b.cachedRoot.Close() @@ -389,22 +397,22 @@ func (b *cdsBalancer) run() { // - If it's from xds client, it means CDS resource were removed. The CDS // watcher should keep watching. // -// In both cases, the error will be forwarded to EDS balancer. And if error is -// resource-not-found, the child EDS balancer will stop watching EDS. +// In both cases, the error will be forwarded to the child balancer. And if +// error is resource-not-found, the child balancer will stop watching EDS. func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) { // This is not necessary today, because xds client never sends connection // errors. 
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound { b.clusterHandler.close() } - if b.edsLB != nil { + if b.childLB != nil { if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection { // Connection errors will be sent to the child balancers directly. // There's no need to forward them. - b.edsLB.ResolverError(err) + b.childLB.ResolverError(err) } } else { - // If eds balancer was never created, fail the RPCs with + // If child balancer was never created, fail the RPCs with // errors. b.ccw.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index 067bc2b05369..7eb1d0889395 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -153,8 +153,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS // Override the creation of the EDS balancer to return a fake EDS balancer // implementation. edsB := newTestEDSBalancer() - oldEDSBalancerBuilder := newEDSBalancer - newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { + oldEDSBalancerBuilder := newChildBalancer + newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { edsB.parentCC = cc return edsB, nil } @@ -177,7 +177,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS } return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { - newEDSBalancer = oldEDSBalancerBuilder + newChildBalancer = oldEDSBalancerBuilder xdsC.Close() } } @@ -251,7 +251,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} wantCCS := edsCCS(serviceName, nil, false) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -306,7 +306,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. No security config is + // newChildBalancer function as part of test setup. No security config is // passed to the CDS balancer as part of this update. cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} wantCCS := edsCCS(serviceName, nil, false) @@ -464,7 +464,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. 
wantCCS := edsCCS(serviceName, nil, false) if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { t.Fatal(err) @@ -498,7 +498,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. wantCCS := edsCCS(serviceName, nil, false) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() @@ -551,7 +551,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. wantCCS := edsCCS(serviceName, nil, false) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() @@ -601,7 +601,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. wantCCS := edsCCS(serviceName, nil, false) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() @@ -672,7 +672,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. cdsUpdate := xdsclient.ClusterUpdate{ ClusterName: serviceName, SecurityCfg: &xdsclient.SecurityConfig{ diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 8b103143ff76..a4c6d40f7824 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" @@ -197,20 +198,26 @@ func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState { // edsCCS is a helper function to construct a good update passed from the // cdsBalancer to the edsBalancer. 
func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientConnState { - lbCfg := &clusterresolver.EDSConfig{ - ClusterName: service, + discoveryMechanism := balancerconfig.DiscoveryMechanism{ + Type: balancerconfig.DiscoveryMechanismTypeEDS, + Cluster: service, MaxConcurrentRequests: countMax, } if enableLRS { - lbCfg.LrsLoadReportingServerName = new(string) + discoveryMechanism.LoadReportingServerName = new(string) + + } + lbCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{discoveryMechanism}, } + return balancer.ClientConnState{ BalancerConfig: lbCfg, } } // setup creates a cdsBalancer and an edsBalancer (and overrides the -// newEDSBalancer function to return it), and also returns a cleanup function. +// newChildBalancer function to return it), and also returns a cleanup function. func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) { t.Helper() xdsC := fakeclient.NewClient() @@ -222,14 +229,14 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x cdsB := builder.Build(tcc, balancer.BuildOptions{}) edsB := newTestEDSBalancer() - oldEDSBalancerBuilder := newEDSBalancer - newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { + oldEDSBalancerBuilder := newChildBalancer + newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) { edsB.parentCC = cc return edsB, nil } return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { - newEDSBalancer = oldEDSBalancerBuilder + newChildBalancer = oldEDSBalancerBuilder xdsC.Close() } } @@ -426,7 +433,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} wantCCS := edsCCS(serviceName, nil, false) if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -511,7 +518,7 @@ func (s) TestResolverError(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} wantCCS := edsCCS(serviceName, nil, false) if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { @@ -560,7 +567,7 @@ func (s) TestUpdateSubConnState(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. 
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} wantCCS := edsCCS(serviceName, nil, false) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -628,7 +635,7 @@ func (s) TestClose(t *testing.T) { // will trigger the watch handler on the CDS balancer, which will attempt to // create a new EDS balancer. The fake EDS balancer created above will be // returned to the CDS balancer, because we have overridden the - // newEDSBalancer function as part of test setup. + // newChildBalancer function as part of test setup. cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} wantCCS := edsCCS(serviceName, nil, false) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go index b0760c7630ab..1f5acafe110b 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -32,9 +32,9 @@ var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a type clusterHandlerUpdate struct { // securityCfg is the Security Config from the top (root) cluster. securityCfg *xdsclient.SecurityConfig - // chu is a list of ClusterUpdates from all the leaf clusters. - chu []xdsclient.ClusterUpdate - err error + // updates is a list of ClusterUpdates from all the leaf clusters. + updates []xdsclient.ClusterUpdate + err error } // clusterHandler will be given a name representing a cluster. It will then @@ -101,7 +101,7 @@ func (ch *clusterHandler) constructClusterUpdate() { } ch.updateChannel <- clusterHandlerUpdate{ securityCfg: ch.root.clusterUpdate.SecurityCfg, - chu: clusterUpdate, + updates: clusterUpdate, } } diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index 216592f9200e..dc69dd34e2af 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -95,7 +95,7 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) { fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) select { case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" { + if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" { t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -189,7 +189,7 @@ func (s) TestSuccessCaseLeafNodeThenNewUpdate(t *testing.T) { fakeClient.InvokeWatchClusterCallback(test.newClusterUpdate, nil) select { case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" { + if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" { t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -305,7 +305,7 @@ func (s) TestUpdateRootClusterAggregateSuccess(t *testing.T) { // ordered as per the cluster update. 
select { case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{ + if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{ ClusterType: xdsclient.ClusterTypeEDS, ClusterName: edsService, }, { @@ -412,7 +412,7 @@ func (s) TestUpdateRootClusterAggregateThenChangeChild(t *testing.T) { select { case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{ + if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{ ClusterType: xdsclient.ClusterTypeEDS, ClusterName: edsService, }, { @@ -658,7 +658,7 @@ func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) { // Then an update should successfully be written to the update buffer. select { case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{ + if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{ ClusterType: xdsclient.ClusterTypeEDS, ClusterName: edsService2, }}); diff != "" { diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index f61b56b9a2cf..cb8176d16448 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -34,12 +34,13 @@ import ( "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/xdsclient" ) // Name is the name of the cluster_resolver balancer. -const Name = "eds_experimental" +const Name = "cluster_resolver_experimental" var ( errBalancerClosed = errors.New("cdsBalancer is closed") @@ -68,7 +69,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal } b := &clusterResolverBalancer{ - cc: cc, bOpts: opts, updateCh: buffer.NewUnbounded(), closed: grpcsync.NewEvent(), @@ -79,9 +79,11 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal } b.logger = prefixLogger(b) b.logger.Infof("Created") - b.edsWatcher = &edsWatcher{ - parent: b, - updateChannel: make(chan *watchUpdate, 1), + + b.resourceWatcher = newResourceResolver(b) + b.cc = &ccWrapper{ + ClientConn: cc, + resourceWatcher: b.resourceWatcher, } go b.run() @@ -93,9 +95,9 @@ func (bb) Name() string { } func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - var cfg EDSConfig + var cfg LBConfig if err := json.Unmarshal(c, &cfg); err != nil { - return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err) + return nil, fmt.Errorf("unable to unmarshal balancer config %s into cluster-resolver config, error: %v", string(c), err) } return &cfg, nil } @@ -114,33 +116,35 @@ type scUpdate struct { state balancer.SubConnState } -// clusterResolverBalancer manages xdsClient and the actual EDS balancer -// implementation that does load balancing. +// clusterResolverBalancer manages xdsClient and the actual child balancer +// implementation that does load balancing. +// +// It currently has only one child balancer. Later, we may add fallback. type clusterResolverBalancer struct { - cc balancer.ClientConn - bOpts balancer.BuildOptions - updateCh *buffer.Unbounded // Channel for updates from gRPC.
- edsWatcher *edsWatcher - logger *grpclog.PrefixLogger - closed *grpcsync.Event - done *grpcsync.Event + cc balancer.ClientConn + bOpts balancer.BuildOptions + updateCh *buffer.Unbounded // Channel for updates from gRPC. + resourceWatcher *resourceResolver + logger *grpclog.PrefixLogger + closed *grpcsync.Event + done *grpcsync.Event priorityBuilder balancer.Builder priorityConfigParser balancer.ConfigParser - config *EDSConfig + config *LBConfig configRaw *serviceconfig.ParseResult xdsClient xdsclient.XDSClient // xDS client to watch EDS resource. attrsWithClient *attributes.Attributes // Attributes with xdsClient attached to be passed to the child policies. child balancer.Balancer - edsResp xdsclient.EndpointsUpdate + priorities []balancerconfig.PriorityConfig watchUpdateReceived bool } // handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good -// updates lead to registration of an EDS watch. Updates with error lead to -// cancellation of existing watch and propagation of the same error to the +// updates lead to registration of EDS and DNS watches. Updates with error lead +// to cancellation of existing watch and propagation of the same error to the // child balancer. func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) { // We first handle errors, if any, and then proceed with handling the @@ -151,7 +155,7 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) { } b.logger.Infof("Receive update from resolver, balancer config: %v", pretty.ToJSON(update.state.BalancerConfig)) - cfg, _ := update.state.BalancerConfig.(*EDSConfig) + cfg, _ := update.state.BalancerConfig.(*LBConfig) if cfg == nil { b.logger.Warningf("xds: unexpected LoadBalancingConfig type: %T", update.state.BalancerConfig) return @@ -159,7 +163,7 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) { b.config = cfg b.configRaw = update.state.ResolverState.ServiceConfig - b.edsWatcher.updateConfig(cfg) + b.resourceWatcher.updateMechanisms(cfg.DiscoveryMechanisms) if !b.watchUpdateReceived { // If update was not received, wait for it. @@ -175,16 +179,16 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) { // handleWatchUpdate handles a watch update from the xDS Client. Good updates // lead to clientConn updates being invoked on the underlying child balancer. -func (b *clusterResolverBalancer) handleWatchUpdate(update *watchUpdate) { +func (b *clusterResolverBalancer) handleWatchUpdate(update *resourceUpdate) { if err := update.err; err != nil { b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err) b.handleErrorFromUpdate(err, false) return } - b.logger.Infof("resource update: %+v", pretty.ToJSON(update.eds)) + b.logger.Infof("resource update: %+v", pretty.ToJSON(update.priorities)) b.watchUpdateReceived = true - b.edsResp = update.eds + b.priorities = update.priorities // A new EDS update triggers new child configs (e.g. 
different priorities // for the priority balancer), and new addresses (the endpoints come from @@ -206,7 +210,7 @@ func (b *clusterResolverBalancer) updateChildConfig() error { b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts) } - childCfgBytes, addrs, err := buildPriorityConfigJSON(b.edsResp, b.config) + childCfgBytes, addrs, err := balancerconfig.BuildPriorityConfigJSON(b.priorities, b.config.EndpointPickingPolicy) if err != nil { return fmt.Errorf("failed to build priority balancer config: %v", err) } @@ -243,7 +247,7 @@ func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bo // This is an error from the parent ClientConn (can be the parent CDS // balancer), and is a resource-not-found error. This means the resource // (can be either LDS or CDS) was removed. Stop the EDS watch. - b.edsWatcher.stopWatch() + b.resourceWatcher.stop() } if b.child != nil { b.child.ResolverError(err) @@ -277,13 +281,13 @@ func (b *clusterResolverBalancer) run() { } b.child.UpdateSubConnState(update.subConn, update.state) } - case u := <-b.edsWatcher.updateChannel: + case u := <-b.resourceWatcher.updateChannel: b.handleWatchUpdate(u) // Close results in cancellation of the EDS watch and closing of the // underlying child policy and is the only way to exit this goroutine. case <-b.closed.Done(): - b.edsWatcher.stopWatch() + b.resourceWatcher.stop() if b.child != nil { b.child.Close() @@ -344,3 +348,14 @@ func (b *clusterResolverBalancer) Close() { b.closed.Fire() <-b.done.Done() } + +// ccWrapper overrides ResolveNow(), so that re-resolution from the child +// policies will trigger the DNS resolver in cluster_resolver balancer. +type ccWrapper struct { + balancer.ClientConn + resourceWatcher *resourceResolver +} + +func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) { + c.resourceWatcher.resolveNow() +} diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index 8f3644d08bed..7e2df25e0535 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -21,24 +21,19 @@ package clusterresolver import ( - "bytes" "context" - "encoding/json" "fmt" "testing" "time" - "github.com/golang/protobuf/jsonpb" - wrapperspb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" - scpb "google.golang.org/grpc/internal/proto/grpc_service_config" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" - "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" @@ -48,8 +43,8 @@ import ( const ( defaultTestTimeout = 1 * time.Second defaultTestShortTimeout = 10 * time.Millisecond - testServiceName = "test/foo" - testClusterName = "test/cluster" + testEDSServcie = "test-eds-service-name" + testClusterName = "test-cluster-name" ) var ( @@ -105,7 +100,7 @@ func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnO return nil, nil } -func (noopTestClientConn) Target() string { return testServiceName } +func (noopTestClientConn) Target() string { return testEDSServcie } type scStateChange struct { sc balancer.SubConn @@ -213,7 +208,7 @@ func (s) 
TestSubConnStateChange(t *testing.T) { defer cleanup() builder := balancer.Get(Name) - edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) + edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) if edsB == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } @@ -221,7 +216,7 @@ func (s) TestSubConnStateChange(t *testing.T) { if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, + BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), }); err != nil { t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) } @@ -259,7 +254,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { defer cleanup() builder := balancer.Get(Name) - edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) + edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) if edsB == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } @@ -269,7 +264,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { defer cancel() if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, + BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), }); err != nil { t.Fatal(err) } @@ -323,7 +318,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { // An update with the same service name should not trigger a new watch. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, + BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), }); err != nil { t.Fatal(err) } @@ -347,7 +342,7 @@ func (s) TestErrorFromResolver(t *testing.T) { defer cleanup() builder := balancer.Get(Name) - edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) + edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) if edsB == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } @@ -357,7 +352,7 @@ func (s) TestErrorFromResolver(t *testing.T) { defer cancel() if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, + BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), }); err != nil { t.Fatal(err) } @@ -408,7 +403,7 @@ func (s) TestErrorFromResolver(t *testing.T) { // the previous watch was canceled. 
if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, + BalancerConfig: newLBConfigWithOneEDS(testEDSServcie), }); err != nil { t.Fatal(err) } @@ -449,7 +444,7 @@ func (s) TestClientWatchEDS(t *testing.T) { defer cleanup() builder := balancer.Get(Name) - edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) + edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) if edsB == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } @@ -460,7 +455,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // If eds service name is not set, should watch for cluster name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{ClusterName: "cluster-1"}, + BalancerConfig: newLBConfigWithOneEDS("cluster-1"), }); err != nil { t.Fatal(err) } @@ -472,7 +467,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // the same. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"}, + BalancerConfig: newLBConfigWithOneEDS("foobar-1"), }); err != nil { t.Fatal(err) } @@ -486,7 +481,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // with no resource names being sent to the server. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"}, + BalancerConfig: newLBConfigWithOneEDS("foobar-2"), }); err != nil { t.Fatal(err) } @@ -495,184 +490,12 @@ func (s) TestClientWatchEDS(t *testing.T) { } } -const ( - fakeBalancerA = "fake_balancer_A" - fakeBalancerB = "fake_balancer_B" -) - -// Install two fake balancers for service config update tests. -// -// ParseConfig only accepts the json if the balancer specified is registered. 
-func init() { - balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA}) - balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB}) -} - -type fakeBalancerBuilder struct { - name string -} - -func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return &fakeBalancer{cc: cc} -} - -func (b *fakeBalancerBuilder) Name() string { - return b.name -} - -type fakeBalancer struct { - cc balancer.ClientConn -} - -func (b *fakeBalancer) ResolverError(error) { - panic("implement me") -} - -func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error { - panic("implement me") -} - -func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { - panic("implement me") -} - -func (b *fakeBalancer) Close() {} - -func (s) TestBalancerConfigParsing(t *testing.T) { - const testEDSName = "eds.service" - var testLRSName = "lrs.server" - b := bytes.NewBuffer(nil) - if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{ - ChildPolicy: []*scpb.LoadBalancingConfig{ - {Policy: &scpb.LoadBalancingConfig_Xds{}}, - {Policy: &scpb.LoadBalancingConfig_RoundRobin{ - RoundRobin: &scpb.RoundRobinConfig{}, - }}, - }, - FallbackPolicy: []*scpb.LoadBalancingConfig{ - {Policy: &scpb.LoadBalancingConfig_Xds{}}, - {Policy: &scpb.LoadBalancingConfig_PickFirst{ - PickFirst: &scpb.PickFirstConfig{}, - }}, - }, - EdsServiceName: testEDSName, - LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName}, - }); err != nil { - t.Fatalf("%v", err) - } - - var testMaxConcurrentRequests uint32 = 123 - tests := []struct { - name string - js json.RawMessage - want serviceconfig.LoadBalancingConfig - wantErr bool - }{ - { - name: "bad json", - js: json.RawMessage(`i am not JSON`), - wantErr: true, - }, - { - name: "empty", - js: json.RawMessage(`{}`), - want: &EDSConfig{}, - }, - { - name: "jsonpb-generated", - js: b.Bytes(), - want: &EDSConfig{ - ChildPolicy: &loadBalancingConfig{ - Name: "round_robin", - Config: json.RawMessage("{}"), - }, - FallBackPolicy: &loadBalancingConfig{ - Name: "pick_first", - Config: json.RawMessage("{}"), - }, - EDSServiceName: testEDSName, - LrsLoadReportingServerName: &testLRSName, - }, - }, - { - // json with random balancers, and the first is not registered. - name: "manually-generated", - js: json.RawMessage(` -{ - "childPolicy": [ - {"fake_balancer_C": {}}, - {"fake_balancer_A": {}}, - {"fake_balancer_B": {}} - ], - "fallbackPolicy": [ - {"fake_balancer_C": {}}, - {"fake_balancer_B": {}}, - {"fake_balancer_A": {}} - ], - "edsServiceName": "eds.service", - "maxConcurrentRequests": 123, - "lrsLoadReportingServerName": "lrs.server" -}`), - want: &EDSConfig{ - ChildPolicy: &loadBalancingConfig{ - Name: "fake_balancer_A", - Config: json.RawMessage("{}"), - }, - FallBackPolicy: &loadBalancingConfig{ - Name: "fake_balancer_B", - Config: json.RawMessage("{}"), - }, - EDSServiceName: testEDSName, - MaxConcurrentRequests: &testMaxConcurrentRequests, - LrsLoadReportingServerName: &testLRSName, - }, - }, - { - // json with no lrs server name, LoadReportingServerName should - // be nil (not an empty string). 
- name: "no-lrs-server-name", - js: json.RawMessage(` -{ - "edsServiceName": "eds.service" -}`), - want: &EDSConfig{ - EDSServiceName: testEDSName, - LrsLoadReportingServerName: nil, - }, - }, - { - name: "good child policy", - js: json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`), - want: &EDSConfig{ - ChildPolicy: &loadBalancingConfig{ - Name: "pick_first", - Config: json.RawMessage(`{}`), - }, - }, - }, - { - name: "multiple good child policies", - js: json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`), - want: &EDSConfig{ - ChildPolicy: &loadBalancingConfig{ - Name: "round_robin", - Config: json.RawMessage(`{}`), - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := bb{}.ParseConfig(tt.js) - if (err != nil) != tt.wantErr { - t.Fatalf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr) - } - if tt.wantErr { - return - } - if !cmp.Equal(got, tt.want) { - t.Errorf(cmp.Diff(got, tt.want)) - } - }) +func newLBConfigWithOneEDS(edsServiceName string) *LBConfig { + return &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + EDSServiceName: edsServiceName, + }}, } } diff --git a/xds/internal/balancer/clusterresolver/config.go b/xds/internal/balancer/clusterresolver/config.go index 0741d6586ae3..043c834399e6 100644 --- a/xds/internal/balancer/clusterresolver/config.go +++ b/xds/internal/balancer/clusterresolver/config.go @@ -1,6 +1,6 @@ /* * - * Copyright 2019 gRPC authors. + * Copyright 2021 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,110 +19,44 @@ package clusterresolver import ( "encoding/json" - "fmt" - "google.golang.org/grpc/balancer" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" ) -// EDSConfig represents the loadBalancingConfig section of the service config -// for EDS balancers. -type EDSConfig struct { - serviceconfig.LoadBalancingConfig - // ChildPolicy represents the load balancing config for the child - // policy. - ChildPolicy *loadBalancingConfig - // FallBackPolicy represents the load balancing config for the - // fallback. - FallBackPolicy *loadBalancingConfig - // ClusterName is the cluster name. - ClusterName string - // EDSServiceName is the name to use in EDS query. If not set, use - // ClusterName. - EDSServiceName string - // MaxConcurrentRequests is the max number of concurrent request allowed for - // this service. If unset, default value 1024 is used. +// LBConfig is the config for cluster resolver balancer. +type LBConfig struct { + serviceconfig.LoadBalancingConfig `json:"-"` + // DiscoveryMechanisms is an ordered list of discovery mechanisms. // - // Note that this is not defined in the service config proto. And the reason - // is, we are dropping EDS and moving the features into cluster_impl. But in - // the mean time, to keep things working, we need to add this field. And it - // should be fine to add this extra field here, because EDS is only used in - // CDS today, so we have full control. - MaxConcurrentRequests *uint32 - // LRS server to send load reports to. If not present, load reporting - // will be disabled. 
If set to the empty string, load reporting will - // be sent to the same server that we obtained CDS data from. - LrsLoadReportingServerName *string -} - -// edsConfigJSON is the intermediate unmarshal result of EDSConfig. ChildPolicy -// and Fallbackspolicy are post-processed, and for each, the first installed -// policy is kept. -type edsConfigJSON struct { - ChildPolicy []*loadBalancingConfig - FallbackPolicy []*loadBalancingConfig - ClusterName string - EDSServiceName string - MaxConcurrentRequests *uint32 - LRSLoadReportingServerName *string -} - -// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. -// When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists -// and select the first LB policy which has been registered. -func (l *EDSConfig) UnmarshalJSON(data []byte) error { - var configJSON edsConfigJSON - if err := json.Unmarshal(data, &configJSON); err != nil { - return err - } + // Must have at least one element. Results from each discovery mechanism are + // concatenated together in successive priorities. + DiscoveryMechanisms []balancerconfig.DiscoveryMechanism `json:"discoveryMechanisms,omitempty"` - l.ClusterName = configJSON.ClusterName - l.EDSServiceName = configJSON.EDSServiceName - l.MaxConcurrentRequests = configJSON.MaxConcurrentRequests - l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName - - for _, lbcfg := range configJSON.ChildPolicy { - if balancer.Get(lbcfg.Name) != nil { - l.ChildPolicy = lbcfg - break - } - } - - for _, lbcfg := range configJSON.FallbackPolicy { - if balancer.Get(lbcfg.Name) != nil { - l.FallBackPolicy = lbcfg - break - } - } - return nil -} - -// MarshalJSON returns a JSON encoding of l. -func (l *EDSConfig) MarshalJSON() ([]byte, error) { - return nil, fmt.Errorf("EDSConfig.MarshalJSON() is unimplemented") -} - -// loadBalancingConfig represents a single load balancing config, -// stored in JSON format. -type loadBalancingConfig struct { - Name string - Config json.RawMessage -} + // LocalityPickingPolicy is policy for locality picking. + // + // This policy's config is expected to be in the format used by the + // weighted_target policy. Note that the config should include an empty + // value for the "targets" field; that empty value will be replaced by one + // that is dynamically generated based on the EDS data. Optional; defaults + // to "weighted_target". + LocalityPickingPolicy *internalserviceconfig.BalancerConfig `json:"localityPickingPolicy,omitempty"` + + // EndpointPickingPolicy is policy for endpoint picking. + // + // This will be configured as the policy for each child in the + // locality-policy's config. Optional; defaults to "round_robin". + EndpointPickingPolicy *internalserviceconfig.BalancerConfig `json:"endpointPickingPolicy,omitempty"` -// MarshalJSON returns a JSON encoding of l. -func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) { - return nil, fmt.Errorf("loadBalancingConfig.MarshalJSON() is unimplemented") + // TODO: read and warn if endpoint is not roundrobin or locality is not + // weightedtarget. } -// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. 
-func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error { - var cfg map[string]json.RawMessage - if err := json.Unmarshal(data, &cfg); err != nil { - return err - } - for name, config := range cfg { - l.Name = name - l.Config = config +func parseConfig(c json.RawMessage) (*LBConfig, error) { + var cfg LBConfig + if err := json.Unmarshal(c, &cfg); err != nil { + return nil, err } - return nil + return &cfg, nil } diff --git a/xds/internal/balancer/clusterresolver/config_test.go b/xds/internal/balancer/clusterresolver/config_test.go new file mode 100644 index 000000000000..1333692b7fca --- /dev/null +++ b/xds/internal/balancer/clusterresolver/config_test.go @@ -0,0 +1,165 @@ +// +build go1.12 + +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package clusterresolver + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" +) + +const ( + testJSONConfig1 = `{ + "discoveryMechanisms": [{ + "cluster": "test-cluster-name", + "lrsLoadReportingServerName": "test-lrs-server", + "maxConcurrentRequests": 314, + "type": "EDS", + "edsServiceName": "test-eds-service-name" + }] +}` + testJSONConfig2 = `{ + "discoveryMechanisms": [{ + "cluster": "test-cluster-name", + "lrsLoadReportingServerName": "test-lrs-server", + "maxConcurrentRequests": 314, + "type": "EDS", + "edsServiceName": "test-eds-service-name" + },{ + "type": "LOGICAL_DNS" + }] +}` + testJSONConfig3 = `{ + "discoveryMechanisms": [{ + "cluster": "test-cluster-name", + "lrsLoadReportingServerName": "test-lrs-server", + "maxConcurrentRequests": 314, + "type": "EDS", + "edsServiceName": "test-eds-service-name" + }], + "localityPickingPolicy":[{"pick_first":{}}], + "endpointPickingPolicy":[{"pick_first":{}}] +}` + + testLRSServer = "test-lrs-server" + testMaxRequests = 314 +) + +func TestParseConfig(t *testing.T) { + tests := []struct { + name string + js string + want *LBConfig + wantErr bool + }{ + { + name: "empty json", + js: "", + want: nil, + wantErr: true, + }, + { + name: "OK with one discovery mechanism", + js: testJSONConfig1, + want: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + { + Cluster: testClusterName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: balancerconfig.DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, + }, + }, + LocalityPickingPolicy: nil, + EndpointPickingPolicy: nil, + }, + wantErr: false, + }, + { + name: "OK with multiple discovery mechanisms", + js: testJSONConfig2, + want: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + { + Cluster: testClusterName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: balancerconfig.DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, + }, + { + Type: 
balancerconfig.DiscoveryMechanismTypeLogicalDNS, + }, + }, + LocalityPickingPolicy: nil, + EndpointPickingPolicy: nil, + }, + wantErr: false, + }, + { + name: "OK with picking policy override", + js: testJSONConfig3, + want: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + { + Cluster: testClusterName, + LoadReportingServerName: newString(testLRSServer), + MaxConcurrentRequests: newUint32(testMaxRequests), + Type: balancerconfig.DiscoveryMechanismTypeEDS, + EDSServiceName: testEDSServcie, + }, + }, + LocalityPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: "pick_first", + Config: nil, + }, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: "pick_first", + Config: nil, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseConfig([]byte(tt.js)) + if (err != nil) != tt.wantErr { + t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Errorf("parseConfig() got unexpected output, diff (-got +want): %v", diff) + } + }) + } +} + +func newString(s string) *string { + return &s +} + +func newUint32(i uint32) *uint32 { + return &i +} diff --git a/xds/internal/balancer/clusterresolver/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go deleted file mode 100644 index 3dd3b5309248..000000000000 --- a/xds/internal/balancer/clusterresolver/configbuilder.go +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package clusterresolver - -import ( - internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" - "google.golang.org/grpc/xds/internal/xdsclient" -) - -const million = 1000000 - -func buildPriorityConfigJSON(edsResp xdsclient.EndpointsUpdate, c *EDSConfig) ([]byte, []resolver.Address, error) { - var childConfig *internalserviceconfig.BalancerConfig - if c.ChildPolicy != nil { - childConfig = &internalserviceconfig.BalancerConfig{Name: c.ChildPolicy.Name} - } - return balancerconfig.BuildPriorityConfigJSON( - []balancerconfig.PriorityConfig{ - { - Mechanism: balancerconfig.DiscoveryMechanism{ - Cluster: c.ClusterName, - LoadReportingServerName: c.LrsLoadReportingServerName, - MaxConcurrentRequests: c.MaxConcurrentRequests, - Type: balancerconfig.DiscoveryMechanismTypeEDS, - EDSServiceName: c.EDSServiceName, - }, - EDSResp: edsResp, - }, - }, childConfig, - ) -} diff --git a/xds/internal/balancer/clusterresolver/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go deleted file mode 100644 index 31f17fde7a74..000000000000 --- a/xds/internal/balancer/clusterresolver/configbuilder_test.go +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Copyright 2021 gRPC authors. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package clusterresolver - -import ( - "fmt" - "testing" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/xds/internal" - "google.golang.org/grpc/xds/internal/balancer/priority" - "google.golang.org/grpc/xds/internal/xdsclient" -) - -const ( - localityCount = 4 - addressPerLocality = 2 -) - -var ( - testLocalityIDs []internal.LocalityID - testEndpoints [][]xdsclient.Endpoint -) - -func init() { - for i := 0; i < localityCount; i++ { - testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)}) - var ends []xdsclient.Endpoint - for j := 0; j < addressPerLocality; j++ { - addr := fmt.Sprintf("addr-%d-%d", i, j) - ends = append(ends, xdsclient.Endpoint{ - Address: addr, - HealthStatus: xdsclient.EndpointHealthStatusHealthy, - }) - } - testEndpoints = append(testEndpoints, ends) - } -} - -// TestBuildPriorityConfigJSON is a sanity check that the generated config bytes -// are valid (can be parsed back to a config struct). -// -// The correctness is covered by the unmarshalled version -// TestBuildPriorityConfig. -func TestBuildPriorityConfigJSON(t *testing.T) { - const ( - testClusterName = "cluster-name-for-watch" - testEDSServiceName = "service-name-from-parent" - testLRSServer = "lrs-addr-from-config" - testMaxReq = 314 - testDropCategory = "test-drops" - testDropOverMillion = 1 - ) - for _, lrsServer := range []*string{newString(testLRSServer), newString(""), nil} { - got, _, err := buildPriorityConfigJSON(xdsclient.EndpointsUpdate{ - Drops: []xdsclient.OverloadDropConfig{{ - Category: testDropCategory, - Numerator: testDropOverMillion, - Denominator: million, - }}, - Localities: []xdsclient.Locality{{ - Endpoints: testEndpoints[3], - ID: testLocalityIDs[3], - Weight: 80, - Priority: 1, - }, { - Endpoints: testEndpoints[1], - ID: testLocalityIDs[1], - Weight: 80, - Priority: 0, - }, { - Endpoints: testEndpoints[2], - ID: testLocalityIDs[2], - Weight: 20, - Priority: 1, - }, { - Endpoints: testEndpoints[0], - ID: testLocalityIDs[0], - Weight: 20, - Priority: 0, - }}}, - &EDSConfig{ - ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}, - ClusterName: testClusterName, - EDSServiceName: testEDSServiceName, - MaxConcurrentRequests: newUint32(testMaxReq), - LrsLoadReportingServerName: lrsServer, - }, - ) - if err != nil { - t.Fatalf("buildPriorityConfigJSON(...) 
failed: %v", err) - } - priorityB := balancer.Get(priority.Name) - if _, err = priorityB.(balancer.ConfigParser).ParseConfig(got); err != nil { - t.Fatalf("ParseConfig(%+v) failed: %v", got, err) - } - } -} - -func newString(s string) *string { - return &s -} - -func newUint32(i uint32) *uint32 { - return &i -} diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index 9a41fa9e2b33..bf7e7f6c421c 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -31,9 +31,11 @@ import ( "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/stub" + internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/balancer/balancergroup" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/balancer/weightedtarget" "google.golang.org/grpc/xds/internal/testutils" @@ -59,11 +61,11 @@ func init() { balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100 } -func setupTestEDS(t *testing.T, initChild *loadBalancingConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) { +func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) { xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) cc := testutils.NewTestClientConn(t) builder := balancer.Get(Name) - edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) + edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) if edsb == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } @@ -71,9 +73,12 @@ func setupTestEDS(t *testing.T, initChild *loadBalancingConfig) (balancer.Balanc defer cancel() if err := edsb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &EDSConfig{ - ClusterName: testClusterName, - ChildPolicy: initChild, + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: initChild, }, }); err != nil { edsb.Close() @@ -462,10 +467,17 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { stub.Register(balancerName, stub.BalancerFuncs{ UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error { - if len(s.ResolverState.Addresses) == 0 { - return nil + m, _ := bd.Data.(map[string]bool) + if m == nil { + m = make(map[string]bool) + bd.Data = m + } + for _, addr := range s.ResolverState.Addresses { + if !m[addr.Addr] { + m[addr.Addr] = true + bd.ClientConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) + } } - bd.ClientConn.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{}) return nil }, UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { @@ -477,9 +489,24 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { }) t.Logf("initialize with sub-balancer: stub-balancer") - edsb, cc, xdsC, cleanup := setupTestEDS(t, 
&loadBalancingConfig{Name: balancerName}) + edsb, cc, xdsC, cleanup := setupTestEDS(t, &internalserviceconfig.BalancerConfig{Name: balancerName}) defer cleanup() + t.Logf("update sub-balancer to stub-balancer") + if err := edsb.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: balancerName, + }, + }, + }); err != nil { + t.Fatal(err) + } + // Two localities, each with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) @@ -497,10 +524,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to round-robin") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ - BalancerConfig: &EDSConfig{ClusterName: testClusterName, ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}}, + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, }); err != nil { t.Fatal(err) } + for i := 0; i < 2; i++ { <-cc.RemoveSubConnCh } @@ -518,10 +554,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to stub-balancer") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ - BalancerConfig: &EDSConfig{ClusterName: testClusterName, ChildPolicy: &loadBalancingConfig{Name: balancerName}}, + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: balancerName, + }, + }, }); err != nil { t.Fatal(err) } + for i := 0; i < 2; i++ { scToRemove := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) && @@ -542,10 +587,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to round-robin") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ - BalancerConfig: &EDSConfig{ClusterName: testClusterName, ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}}, + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, }); err != nil { t.Fatal(err) } + for i := 0; i < 2; i++ { <-cc.RemoveSubConnCh } @@ -568,14 +622,20 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { var maxRequests uint32 = 50 if err := edsb.UpdateClientConnState(balancer.ClientConnState{ - BalancerConfig: &EDSConfig{ - ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}, - ClusterName: testClusterName, - MaxConcurrentRequests: &maxRequests, + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + MaxConcurrentRequests: &maxRequests, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, }, }); err != nil { t.Fatal(err) } + // One locality with one backend. 
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) @@ -628,14 +688,20 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { // update afterwards). Make sure the new picker uses the new configs. var maxRequests2 uint32 = 10 if err := edsb.UpdateClientConnState(balancer.ClientConnState{ - BalancerConfig: &EDSConfig{ - ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}, - ClusterName: testClusterName, - MaxConcurrentRequests: &maxRequests2, + BalancerConfig: &LBConfig{ + DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + Cluster: testClusterName, + MaxConcurrentRequests: &maxRequests2, + Type: balancerconfig.DiscoveryMechanismTypeEDS, + }}, + EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, }, }); err != nil { t.Fatal(err) } + // Picks with drops. dones = []func(){} p2 := <-cc.NewPickerCh diff --git a/xds/internal/balancer/clusterresolver/eds_watcher.go b/xds/internal/balancer/clusterresolver/eds_watcher.go deleted file mode 100644 index 02186702c0ec..000000000000 --- a/xds/internal/balancer/clusterresolver/eds_watcher.go +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package clusterresolver - -import ( - "google.golang.org/grpc/xds/internal/xdsclient" -) - -// watchUpdate wraps the information received from a registered EDS watcher. A -// non-nil error is propagated to the underlying child balancer. A valid update -// results in creating a new child balancer (priority balancer, if one doesn't -// already exist) and pushing the updated balancer config to it. -type watchUpdate struct { - eds xdsclient.EndpointsUpdate - err error -} - -// edsWatcher takes an EDS balancer config, and use the xds_client to watch EDS -// updates. The EDS updates are passed back to the balancer via a channel. -type edsWatcher struct { - parent *clusterResolverBalancer - - updateChannel chan *watchUpdate - - edsToWatch string - edsCancel func() -} - -func (ew *edsWatcher) updateConfig(config *EDSConfig) { - // If EDSServiceName is set, use it to watch EDS. Otherwise, use the cluster - // name. - newEDSToWatch := config.EDSServiceName - if newEDSToWatch == "" { - newEDSToWatch = config.ClusterName - } - - if ew.edsToWatch == newEDSToWatch { - return - } - - // Restart EDS watch when the eds name to watch has changed. 
- *
-	ew.edsToWatch = newEDSToWatch
-
-	if ew.edsCancel != nil {
-		ew.edsCancel()
-	}
-	cancelEDSWatch := ew.parent.xdsClient.WatchEndpoints(newEDSToWatch, func(update xdsclient.EndpointsUpdate, err error) {
-		select {
-		case <-ew.updateChannel:
-		default:
-		}
-		ew.updateChannel <- &watchUpdate{eds: update, err: err}
-	})
-	ew.parent.logger.Infof("Watch started on resource name %v with xds-client %p", newEDSToWatch, ew.parent.xdsClient)
-	ew.edsCancel = func() {
-		cancelEDSWatch()
-		ew.parent.logger.Infof("Watch cancelled on resource name %v with xds-client %p", newEDSToWatch, ew.parent.xdsClient)
-	}
-
-}
-
-// stopWatch stops the EDS watch.
-//
-// Call to updateConfig will restart the watch with the new name.
-func (ew *edsWatcher) stopWatch() {
-	if ew.edsCancel != nil {
-		ew.edsCancel()
-		ew.edsCancel = nil
-	}
-	ew.edsToWatch = ""
-}
diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go
index a4c6d5b1c658..b2935be0c362 100644
--- a/xds/internal/balancer/clusterresolver/priority_test.go
+++ b/xds/internal/balancer/clusterresolver/priority_test.go
@@ -28,6 +28,8 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
 	"google.golang.org/grpc/xds/internal/balancer/priority"
 	"google.golang.org/grpc/xds/internal/testutils"
 )
@@ -712,3 +714,106 @@ func (s) TestEDSPriority_FirstPriorityRemoved(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Watch resources from EDS and DNS, with EDS as the higher priority. The
+// lower priority is used when the higher priority is not ready.
+func (s) TestFallbackToDNS(t *testing.T) {
+	const testDNSEndpointAddr = "3.1.4.1:5"
+	dnsTargetCh, _, resolveNowCh, dnsR, cleanupDNS := setupDNS()
+	defer cleanupDNS()
+	edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
+	defer cleanup()
+
+	if err := edsb.UpdateClientConnState(balancer.ClientConnState{
+		BalancerConfig: &LBConfig{
+			DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
+				{
+					Type:    balancerconfig.DiscoveryMechanismTypeEDS,
+					Cluster: testClusterName,
+				},
+				{
+					Type:        balancerconfig.DiscoveryMechanismTypeLogicalDNS,
+					DNSHostname: testDNSTarget,
+				},
+			},
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer ctxCancel()
+	select {
+	case target := <-dnsTargetCh:
+		if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}); diff != "" {
+			t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff)
+		}
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for the DNS resolver to be built")
+	}
+
+	// One locality with one backend.
+	clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
+	clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
+	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
+
+	// Also send a DNS update, because the balancer needs updates from all
+	// resources before it can proceed.
+	dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: testDNSEndpointAddr}}})
+
+	addrs0 := <-cc.NewSubConnAddrsCh
+	if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
+		t.Fatalf("sc is created with addr %v, want %v", got, want)
+	}
+	sc0 := <-cc.NewSubConnCh
+
+	// p0 is ready.
+	edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// Test roundrobin with only p0 subconns.
+	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Turn down p0; p1 (DNS) will be used.
+	edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+	// The transient failure above should not trigger a re-resolve to the DNS
+	// resolver. Need to read to clear the channel, to avoid a potential
+	// deadlock when writing to the channel later.
+	shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer shortCancel()
+	select {
+	case <-resolveNowCh:
+		t.Fatal("unexpected re-resolve triggered by transient failure of the EDS endpoint")
+	case <-shortCtx.Done():
+	}
+
+	// The addresses used to create the new SubConn should be the DNS endpoint.
+	addrs1 := <-cc.NewSubConnAddrsCh
+	if got, want := addrs1[0].Addr, testDNSEndpointAddr; got != want {
+		t.Fatalf("sc is created with addr %v, want %v", got, want)
+	}
+	sc1 := <-cc.NewSubConnCh
+	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// Test roundrobin with only the p1 (DNS) subconn.
+	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Turn down the DNS endpoint; this should trigger a re-resolve in the DNS
+	// resolver.
+	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+	// The transient failure above should trigger a re-resolve to the DNS
+	// resolver. Reading also clears the channel, avoiding a potential
+	// deadlock when writing to the channel later.
+	select {
+	case <-resolveNowCh:
+	case <-ctx.Done():
+		t.Fatal("Timed out waiting for re-resolve")
+	}
+}
diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go
index 29aed0e72f4a..e68d77d3efe9 100644
--- a/xds/internal/balancer/clusterresolver/resource_resolver.go
+++ b/xds/internal/balancer/clusterresolver/resource_resolver.go
@@ -28,8 +28,8 @@ import (
 // resourceUpdate is a combined update from all the resources, in the order of
 // priority. For example, it can be {EDS, EDS, DNS}.
type resourceUpdate struct { - p []balancerconfig.PriorityConfig - err error + priorities []balancerconfig.PriorityConfig + err error } type discoveryMechanism interface { @@ -197,7 +197,7 @@ func (rr *resourceResolver) generate() { case <-rr.updateChannel: default: } - rr.updateChannel <- &resourceUpdate{p: ret} + rr.updateChannel <- &resourceUpdate{priorities: ret} } type edsDiscoveryMechanism struct { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_test.go b/xds/internal/balancer/clusterresolver/resource_resolver_test.go index 9a9438155098..621ca2a127c8 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_test.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_test.go @@ -62,14 +62,14 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) { }{ {name: "watch EDS", clusterName: testClusterName, - edsName: testServiceName, - wantName: testServiceName, + edsName: testEDSServcie, + wantName: testEDSServcie, edsUpdate: testEDSUpdates[0], want: []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: testClusterName, - EDSServiceName: testServiceName, + EDSServiceName: testEDSServcie, }, EDSResp: testEDSUpdates[0], }}, @@ -110,7 +110,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) { fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, test.want); diff != "" { + if diff := cmp.Diff(u.priorities, test.want); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -123,7 +123,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) { t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) } if edsNameCanceled != test.wantName { - t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, testServiceName) + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, testEDSServcie) } }) } @@ -192,7 +192,7 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: test.addrs}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, test.want); diff != "" { + if diff := cmp.Diff(u.priorities, test.want); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -224,7 +224,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: testClusterName, - EDSServiceName: testServiceName, + EDSServiceName: testEDSServcie, }}) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() @@ -232,19 +232,19 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { if err != nil { t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) } - if gotEDSName1 != testServiceName { - t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testServiceName) + if gotEDSName1 != testEDSServcie { + t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testEDSServcie) } // Invoke callback, should get an update. 
fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: testClusterName, - EDSServiceName: testServiceName, + EDSServiceName: testEDSServcie, }, EDSResp: testEDSUpdates[0], }}); diff != "" { @@ -264,7 +264,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) } if edsNameCanceled1 != gotEDSName1 { - t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, testServiceName) + t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, testEDSServcie) } gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx) if err != nil { @@ -287,7 +287,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: testClusterName, @@ -314,7 +314,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { } select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: testClusterName, @@ -385,7 +385,7 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ { Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, @@ -497,7 +497,7 @@ func (s) TestResourceResolverChangePriority(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ { Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, @@ -538,7 +538,7 @@ func (s) TestResourceResolverChangePriority(t *testing.T) { } select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ { Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, @@ -625,7 +625,7 @@ func (s) TestResourceResolverEDSAndDNS(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ { Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, @@ -687,7 +687,7 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) select { case u := <-rr.updateChannel: - if diff := 
cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeEDS, Cluster: testClusterName, @@ -724,7 +724,7 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, @@ -845,7 +845,7 @@ func (s) TestResourceResolverDNSResolveNow(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{ + if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ Mechanism: balancerconfig.DiscoveryMechanism{ Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, diff --git a/xds/internal/balancer/priority/config.go b/xds/internal/balancer/priority/config.go index c9cb16e323f0..37f1c9a829a8 100644 --- a/xds/internal/balancer/priority/config.go +++ b/xds/internal/balancer/priority/config.go @@ -29,7 +29,7 @@ import ( // Child is a child of priority balancer. type Child struct { Config *internalserviceconfig.BalancerConfig `json:"config,omitempty"` - IgnoreReresolutionRequests bool + IgnoreReresolutionRequests bool `json:"ignoreReresolutionRequests,omitempty"` } // LBConfig represents priority balancer's config.
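
Reviewer note: the JSON fixtures driven through the parseConfig tests above (testJSONConfig3 and friends) are defined earlier in the test file and are elided from this diff. For orientation, a config in the new format would look roughly like the sketch below. This is illustrative only: the camelCase field names, the "EDS"/"LOGICAL_DNS" type strings, and the list-of-single-key-maps encoding of the picking policies are inferred from the Go struct fields in this diff, not confirmed by it.

// Hypothetical sketch of the new cluster_resolver LBConfig in JSON form,
// loosely mirroring the "OK with picking policy override" test case above.
// All field names, enum strings, and values here are assumptions.
const exampleLBConfigJSON = `{
  "discoveryMechanisms": [
    {
      "type": "EDS",
      "cluster": "cluster-name-for-watch",
      "edsServiceName": "service-name-from-parent",
      "lrsLoadReportingServerName": "lrs-addr-from-config",
      "maxConcurrentRequests": 314
    },
    {
      "type": "LOGICAL_DNS",
      "dnsHostname": "dns.example.com:443"
    }
  ],
  "localityPickingPolicy": [{"pick_first": {}}],
  "endpointPickingPolicy": [{"pick_first": {}}]
}`

Inside the package, one would then expect parseConfig([]byte(exampleLBConfigJSON)) to yield an *LBConfig with two DiscoveryMechanisms, the EDS one first, analogous to the table-driven cases above.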
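
Reviewer note: for readers coming from the old EDS-only balancer, the sketch below shows how a parent policy might hand the new balancer an EDS mechanism with a LOGICAL_DNS fallback, the same shape TestFallbackToDNS drives above. It is not part of this change: the package name, the cluster/service names, the fallback hostname, and the 1024 request ceiling are made up, and since the clusterresolver packages are internal, this only compiles inside the google.golang.org/grpc module.

package example // hypothetical; clusterresolver is internal to the grpc module

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/resolver"

	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
	"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
	"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
)

// startClusterResolver builds the cluster_resolver balancer and pushes a
// config with an EDS mechanism at priority 0 and a DNS fallback at priority 1.
func startClusterResolver(cc balancer.ClientConn, opts balancer.BuildOptions, state resolver.State) (balancer.Balancer, error) {
	builder := balancer.Get(clusterresolver.Name)
	if builder == nil {
		return nil, fmt.Errorf("balancer %q not registered", clusterresolver.Name)
	}
	b := builder.Build(cc, opts)

	maxReq := uint32(1024) // hypothetical circuit-breaking ceiling
	if err := b.UpdateClientConnState(balancer.ClientConnState{
		// state must carry the shared xDS client, as the tests do via
		// xdsclient.SetClient.
		ResolverState: state,
		BalancerConfig: &clusterresolver.LBConfig{
			DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
				{
					// Priority 0: endpoints come from EDS for this cluster.
					Type:                  balancerconfig.DiscoveryMechanismTypeEDS,
					Cluster:               "cluster-a",
					EDSServiceName:        "service-a",
					MaxConcurrentRequests: &maxReq,
				},
				{
					// Priority 1: plain DNS, used only while the EDS
					// priorities are not ready.
					Type:        balancerconfig.DiscoveryMechanismTypeLogicalDNS,
					DNSHostname: "fallback.example.com:443",
				},
			},
			EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
		},
	}); err != nil {
		b.Close()
		return nil, err
	}
	return b, nil
}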