From 5390859af97e747b5151619b63591b35985bf5d2 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 22 Sep 2021 16:01:18 -0700 Subject: [PATCH] [v1.41.x_backport] xds: fix parent balancers to handle Idle children (#4801) --- .../clustermanager/balancerstateaggregator.go | 9 ++- .../clustermanager/clustermanager_test.go | 65 +++++++++++++++++++ .../weightedaggregator/aggregator.go | 8 ++- .../weightedtarget/weightedtarget_test.go | 61 +++++++++++++++++ 4 files changed, 141 insertions(+), 2 deletions(-) diff --git a/xds/internal/balancer/clustermanager/balancerstateaggregator.go b/xds/internal/balancer/clustermanager/balancerstateaggregator.go index 35eb86c3590c..6e0e03299f95 100644 --- a/xds/internal/balancer/clustermanager/balancerstateaggregator.go +++ b/xds/internal/balancer/clustermanager/balancerstateaggregator.go @@ -183,13 +183,18 @@ func (bsa *balancerStateAggregator) build() balancer.State { // handling the special connecting after ready, as in UpdateState(). Then a // function to calculate the aggregated connectivity state as in this // function. - var readyN, connectingN int + // + // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated + // state. + var readyN, connectingN, idleN int for _, ps := range bsa.idToPickerState { switch ps.stateToAggregate { case connectivity.Ready: readyN++ case connectivity.Connecting: connectingN++ + case connectivity.Idle: + idleN++ } } var aggregatedState connectivity.State @@ -198,6 +203,8 @@ func (bsa *balancerStateAggregator) build() balancer.State { aggregatedState = connectivity.Ready case connectingN > 0: aggregatedState = connectivity.Connecting + case idleN > 0: + aggregatedState = connectivity.Idle default: aggregatedState = connectivity.TransientFailure } diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index a40d954ad64f..d3475ea3f5d8 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -565,3 +565,68 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { t.Fatal(err2) } } + +const initIdleBalancerName = "test-init-Idle-balancer" + +var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") + +func init() { + stub.Register(initIdleBalancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error { + bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{}) + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + err := fmt.Errorf("wrong picker error") + if state.ConnectivityState == connectivity.Idle { + err = errTestInitIdle + } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &testutils.TestConstPicker{Err: err}, + }) + }, + }) +} + +// TestInitialIdle covers the case that if the child reports Idle, the overall +// state will be Idle. +func TestInitialIdle(t *testing.T) { + cc := testutils.NewTestClientConn(t) + rtb := rtBuilder.Build(cc, balancer.BuildOptions{}) + + configJSON1 := `{ +"children": { + "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] } +} +}` + + config1, err := rtParser.ParseConfig([]byte(configJSON1)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + + // Send the config, and an address with hierarchy path ["cluster_1"]. + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0], Attributes: nil}, + } + if err := rtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), + }}, + BalancerConfig: config1, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + // Verify that a subconn is created with the address, and the hierarchy path + // in the address is cleared. + for range wantAddrs { + sc := <-cc.NewSubConnCh + rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + } + + if state1 := <-cc.NewStateCh; state1 != connectivity.Idle { + t.Fatalf("Received aggregated state: %v, want Idle", state1) + } +} diff --git a/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go b/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go index 6c36e2a69cd9..7e1d106e9ff9 100644 --- a/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go +++ b/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go @@ -200,7 +200,9 @@ func (wbsa *Aggregator) BuildAndUpdate() { func (wbsa *Aggregator) build() balancer.State { wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState) m := wbsa.idToPickerState - var readyN, connectingN int + // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated + // state. + var readyN, connectingN, idleN int readyPickerWithWeights := make([]weightedPickerState, 0, len(m)) for _, ps := range m { switch ps.stateToAggregate { @@ -209,6 +211,8 @@ func (wbsa *Aggregator) build() balancer.State { readyPickerWithWeights = append(readyPickerWithWeights, *ps) case connectivity.Connecting: connectingN++ + case connectivity.Idle: + idleN++ } } var aggregatedState connectivity.State @@ -217,6 +221,8 @@ func (wbsa *Aggregator) build() balancer.State { aggregatedState = connectivity.Ready case connectingN > 0: aggregatedState = connectivity.Connecting + case idleN > 0: + aggregatedState = connectivity.Idle default: aggregatedState = connectivity.TransientFailure } diff --git a/xds/internal/balancer/weightedtarget/weightedtarget_test.go b/xds/internal/balancer/weightedtarget/weightedtarget_test.go index eeebab733d61..b0e4df895885 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget_test.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/hierarchy" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -263,3 +264,63 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { return scst.SubConn } } + +const initIdleBalancerName = "test-init-Idle-balancer" + +var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") + +func init() { + stub.Register(initIdleBalancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error { + bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{}) + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + err := fmt.Errorf("wrong picker error") + if state.ConnectivityState == connectivity.Idle { + err = errTestInitIdle + } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &testutils.TestConstPicker{Err: err}, + }) + }, + }) +} + +// TestInitialIdle covers the case that if the child reports Idle, the overall +// state will be Idle. +func TestInitialIdle(t *testing.T) { + cc := testutils.NewTestClientConn(t) + wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) + + // Start with "cluster_1: round_robin". + config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + + // Send the config, and an address with hierarchy path ["cluster_1"]. + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0], Attributes: nil}, + } + if err := wtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), + }}, + BalancerConfig: config1, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + // Verify that a subconn is created with the address, and the hierarchy path + // in the address is cleared. + for range wantAddrs { + sc := <-cc.NewSubConnCh + wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + } + + if state1 := <-cc.NewStateCh; state1 != connectivity.Idle { + t.Fatalf("Received aggregated state: %v, want Idle", state1) + } +}