From d32ffc604586401ef9578ba03d29b2c695e5fdaf Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 13 May 2024 16:07:22 +0000 Subject: [PATCH 1/5] xds: remove duplicate code in clustermanager by using ConnectivityStateEvaluator --- internal/balancergroup/balancergroup.go | 16 ++- internal/balancergroup/balancergroup_test.go | 3 +- .../clustermanager/balancerstateaggregator.go | 130 +++++------------- .../balancer/clustermanager/clustermanager.go | 20 +-- .../clustermanager/clustermanager_test.go | 26 ++-- 5 files changed, 70 insertions(+), 125 deletions(-) diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index 4cee66aeb6e6..9187285f7f1a 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -380,9 +380,14 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { // UpdateBuilder updates the builder for a current child, starting the Graceful // Switch process for that child. // -// TODO: update this API to take the name of the new builder instead. -func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) { +// Returns a non-nil error if the provided builder is not registered. +func (bg *BalancerGroup) UpdateBuilder(id, builder string) error { + bldr := balancer.Get(builder) + if bldr == nil { + return fmt.Errorf("balancer builder %q not found", builder) + } bg.outgoingMu.Lock() + defer bg.outgoingMu.Unlock() // This does not deal with the balancer cache because this call should come // after an Add call for a given child balancer. If the child is removed, // the caller will call Add if the child balancer comes back which would @@ -390,10 +395,11 @@ func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) { sbc := bg.idToBalancerConfig[id] if sbc == nil { // simply ignore it if not present, don't error - return + return nil } - sbc.gracefulSwitch(builder) - bg.outgoingMu.Unlock() + sbc.gracefulSwitch(bldr) + return nil + } // Remove removes the balancer with id from the group. diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index f1153e48ef7a..4503b05d2e9d 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -609,8 +609,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { return bal.UpdateClientConnState(ccs) }, }) - builder := balancer.Get(childPolicyName) - bg.UpdateBuilder(testBalancerIDs[0], builder) + bg.UpdateBuilder(testBalancerIDs[0], childPolicyName) if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil { t.Fatalf("error updating ClientConn state: %v", err) } diff --git a/xds/internal/balancer/clustermanager/balancerstateaggregator.go b/xds/internal/balancer/clustermanager/balancerstateaggregator.go index 4b971a3e241b..9b124c25d306 100644 --- a/xds/internal/balancer/clustermanager/balancerstateaggregator.go +++ b/xds/internal/balancer/clustermanager/balancerstateaggregator.go @@ -43,19 +43,16 @@ func (s *subBalancerState) String() string { } type balancerStateAggregator struct { - cc balancer.ClientConn - logger *grpclog.PrefixLogger + cc balancer.ClientConn + logger *grpclog.PrefixLogger + csEvltr *balancer.ConnectivityStateEvaluator mu sync.Mutex - // If started is false, no updates should be sent to the parent cc. A closed - // sub-balancer could still send pickers to this aggregator. 
This makes sure - // that no updates will be forwarded to parent when the whole balancer group - // and states aggregator is closed. - started bool - // All balancer IDs exist as keys in this map, even if balancer group is not - // started. - // - // If an ID is not in map, it's either removed or never added. + // This field is used to ensure that no updates are forwarded to the parent + // CC once the aggregator is closed. A closed sub-balancer could still send + // pickers to this aggregator. + closed bool + // Map from child policy name to last reported state. idToPickerState map[string]*subBalancerState // Set when UpdateState call propagation is paused. pauseUpdateState bool @@ -68,34 +65,24 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo return &balancerStateAggregator{ cc: cc, logger: logger, + csEvltr: &balancer.ConnectivityStateEvaluator{}, idToPickerState: make(map[string]*subBalancerState), } } -// Start starts the aggregator. It can be called after Close to restart the -// aggretator. -func (bsa *balancerStateAggregator) start() { - bsa.mu.Lock() - defer bsa.mu.Unlock() - bsa.started = true -} - -// Close closes the aggregator. When the aggregator is closed, it won't call -// parent ClientConn to update balancer state. func (bsa *balancerStateAggregator) close() { bsa.mu.Lock() defer bsa.mu.Unlock() - bsa.started = false - bsa.clearStates() + bsa.closed = true } -// add adds a sub-balancer state with weight. It adds a place holder, and waits -// for the real sub-balancer to update state. +// add adds a sub-balancer in CONNECTING state. // // This is called when there's a new child. func (bsa *balancerStateAggregator) add(id string) { bsa.mu.Lock() defer bsa.mu.Unlock() + bsa.idToPickerState[id] = &subBalancerState{ // Start everything in CONNECTING, so if one of the sub-balancers // reports TransientFailure, the RPCs will still wait for the other @@ -106,6 +93,8 @@ func (bsa *balancerStateAggregator) add(id string) { }, stateToAggregate: connectivity.Connecting, } + bsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting) + bsa.buildAndUpdateLocked() } // remove removes the sub-balancer state. Future updates from this sub-balancer, @@ -118,9 +107,15 @@ func (bsa *balancerStateAggregator) remove(id string) { if _, ok := bsa.idToPickerState[id]; !ok { return } + // Setting the state of the deleted sub-balancer to Shutdown will get + // csEvltr to remove the previous state for any aggregated state + // evaluations. Transitions to and from connectivity.Shutdown are ignored + // by csEvltr. + bsa.csEvltr.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown) // Remove id and picker from picker map. This also results in future updates // for this ID to be ignored. delete(bsa.idToPickerState, id) + bsa.buildAndUpdateLocked() } // pauseStateUpdates causes UpdateState calls to not propagate to the parent @@ -140,7 +135,7 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() { defer bsa.mu.Unlock() bsa.pauseUpdateState = false if bsa.needUpdateStateOnResume { - bsa.cc.UpdateState(bsa.build()) + bsa.cc.UpdateState(bsa.buildLocked()) } } @@ -149,6 +144,8 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() { // // It calls parent ClientConn's UpdateState with the new aggregated state. 
func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) { + bsa.logger.Infof("State update from sub-balancer %q: %+v", id, state) + bsa.mu.Lock() defer bsa.mu.Unlock() pickerSt, ok := bsa.idToPickerState[id] @@ -162,42 +159,17 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) // update the state, to prevent the aggregated state from being always // CONNECTING. Otherwise, stateToAggregate is the same as // state.ConnectivityState. + bsa.csEvltr.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState) pickerSt.stateToAggregate = state.ConnectivityState } pickerSt.state = state - - if !bsa.started { - return - } - if bsa.pauseUpdateState { - // If updates are paused, do not call UpdateState, but remember that we - // need to call it when they are resumed. - bsa.needUpdateStateOnResume = true - return - } - bsa.cc.UpdateState(bsa.build()) -} - -// clearState Reset everything to init state (Connecting) but keep the entry in -// map (to keep the weight). -// -// Caller must hold bsa.mu. -func (bsa *balancerStateAggregator) clearStates() { - for _, pState := range bsa.idToPickerState { - pState.state = balancer.State{ - ConnectivityState: connectivity.Connecting, - Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), - } - pState.stateToAggregate = connectivity.Connecting - } + bsa.buildAndUpdateLocked() } -// buildAndUpdate combines the sub-state from each sub-balancer into one state, -// and update it to parent ClientConn. -func (bsa *balancerStateAggregator) buildAndUpdate() { - bsa.mu.Lock() - defer bsa.mu.Unlock() - if !bsa.started { +// buildAndUpdateLocked combines the sub-state from each sub-balancer into one +// state, and sends a picker update to the parent ClientConn. +func (bsa *balancerStateAggregator) buildAndUpdateLocked() { + if bsa.closed { return } if bsa.pauseUpdateState { @@ -206,47 +178,11 @@ func (bsa *balancerStateAggregator) buildAndUpdate() { bsa.needUpdateStateOnResume = true return } - bsa.cc.UpdateState(bsa.build()) + bsa.cc.UpdateState(bsa.buildLocked()) } -// build combines sub-states into one. The picker will do a child pick. -// -// Caller must hold bsa.mu. -func (bsa *balancerStateAggregator) build() balancer.State { - // TODO: the majority of this function (and UpdateState) is exactly the same - // as weighted_target's state aggregator. Try to make a general utility - // function/struct to handle the logic. - // - // One option: make a SubBalancerState that handles Update(State), including - // handling the special connecting after ready, as in UpdateState(). Then a - // function to calculate the aggregated connectivity state as in this - // function. - // - // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated - // state. - var readyN, connectingN, idleN int - for _, ps := range bsa.idToPickerState { - switch ps.stateToAggregate { - case connectivity.Ready: - readyN++ - case connectivity.Connecting: - connectingN++ - case connectivity.Idle: - idleN++ - } - } - var aggregatedState connectivity.State - switch { - case readyN > 0: - aggregatedState = connectivity.Ready - case connectingN > 0: - aggregatedState = connectivity.Connecting - case idleN > 0: - aggregatedState = connectivity.Idle - default: - aggregatedState = connectivity.TransientFailure - } - +// buildLocked combines sub-states into one. +func (bsa *balancerStateAggregator) buildLocked() balancer.State { // The picker's return error might not be consistent with the // aggregatedState. 
Because for this LB policy, we want to always build // picker with all sub-pickers (not only ready sub-pickers), so even if the @@ -254,7 +190,7 @@ func (bsa *balancerStateAggregator) build() balancer.State { // or TransientFailure. bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState) return balancer.State{ - ConnectivityState: aggregatedState, + ConnectivityState: bsa.csEvltr.CurrentState(), Picker: newPickerGroup(bsa.idToPickerState), } } diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index db8332b90eac..7e152e1b798e 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -46,7 +46,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal b := &bal{} b.logger = prefixLogger(b) b.stateAggregator = newBalancerStateAggregator(cc, b.logger) - b.stateAggregator.start() b.bg = balancergroup.New(balancergroup.Options{ CC: cc, BuildOpts: opts, @@ -79,7 +78,6 @@ type bal struct { } func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { - update := false addressesSplit := hierarchy.Group(s.ResolverState.Addresses) // Remove sub-pickers and sub-balancers that are not in the new cluster list. @@ -87,7 +85,6 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { if _, ok := newConfig.Children[name]; !ok { b.stateAggregator.remove(name) b.bg.Remove(name) - update = true } } @@ -101,9 +98,15 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { // Then add to the balancer group. b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name)) } else { - // Already present, check for type change and if so send down a new builder. + // Already present, check for type change and if so send down a new + // builder. if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name { - b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name)) + // Safe to ignore the returned error value because ParseConfig + // ensures that the child policy name is a registered LB policy. + b.bg.UpdateBuilder(name, newT.ChildPolicy.Name) + // Picker update is sent to the parent ClientConn only after the + // new child policy returns a picker. So, there is no need to + // set needUpdateStateOnResume to true here. } } // TODO: handle error? How to aggregate errors and return? @@ -118,9 +121,10 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { } b.children = newConfig.Children - if update { - b.stateAggregator.buildAndUpdate() - } + + // Adding, removing or updating a sub-balancer will result in the + // needUpdateStateOnResume bit to true which results in a picker update once + // resumeStateUpdates() is called. 
} func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error { diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index b998c1b35f29..48359f0931d2 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -73,7 +73,7 @@ func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC ba } } -func TestClusterPicks(t *testing.T) { +func (s) TestClusterPicks(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) @@ -152,7 +152,7 @@ func TestClusterPicks(t *testing.T) { // TestConfigUpdateAddCluster covers the cases the balancer receives config // update with extra clusters. -func TestConfigUpdateAddCluster(t *testing.T) { +func (s) TestConfigUpdateAddCluster(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) @@ -310,7 +310,7 @@ func TestConfigUpdateAddCluster(t *testing.T) { // TestRoutingConfigUpdateDeleteAll covers the cases the balancer receives // config update with no clusters. Pick should fail with details in error. -func TestRoutingConfigUpdateDeleteAll(t *testing.T) { +func (s) TestRoutingConfigUpdateDeleteAll(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) @@ -471,7 +471,7 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { } } -func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { +func (s) TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { const ( userAgent = "ua" defaultTestTimeout = 1 * time.Second @@ -525,7 +525,7 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { } } -const initIdleBalancerName = "test-init-Idle-balancer" +const initIdleBalancerName = "test-init-idle-balancer" var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") @@ -555,18 +555,18 @@ func init() { // TestInitialIdle covers the case that if the child reports Idle, the overall // state will be Idle. 
-func TestInitialIdle(t *testing.T) { +func (s) TestInitialIdle(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) bal := builder.Build(cc, balancer.BuildOptions{}) - configJSON1 := `{ + configJSON := `{ "children": { - "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] } + "cds:cluster_1":{ "childPolicy": [{"test-init-idle-balancer":""}] } } }` - config1, err := parser.ParseConfig([]byte(configJSON1)) + config, err := parser.ParseConfig([]byte(configJSON)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } @@ -579,7 +579,7 @@ func TestInitialIdle(t *testing.T) { ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), }}, - BalancerConfig: config1, + BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } @@ -591,8 +591,8 @@ func TestInitialIdle(t *testing.T) { sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) } - if state1 := <-cc.NewStateCh; state1 != connectivity.Idle { - t.Fatalf("Received aggregated state: %v, want Idle", state1) + if state := <-cc.NewStateCh; state != connectivity.Idle { + t.Fatalf("Received aggregated state: %v, want Idle", state) } } @@ -602,7 +602,7 @@ func TestInitialIdle(t *testing.T) { // switches this child to a pick first load balancer. Once that balancer updates // it's state and completes the graceful switch process the new picker should // reflect this change. -func TestClusterGracefulSwitch(t *testing.T) { +func (s) TestClusterGracefulSwitch(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) From 6b8a66c9e7f63a1eb9b8e3eb57d0be91cb13e008 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 13 May 2024 19:49:46 +0000 Subject: [PATCH 2/5] s/csEvltr/csEval/ --- .../clustermanager/balancerstateaggregator.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/xds/internal/balancer/clustermanager/balancerstateaggregator.go b/xds/internal/balancer/clustermanager/balancerstateaggregator.go index 9b124c25d306..92c69f5e1fc2 100644 --- a/xds/internal/balancer/clustermanager/balancerstateaggregator.go +++ b/xds/internal/balancer/clustermanager/balancerstateaggregator.go @@ -43,9 +43,9 @@ func (s *subBalancerState) String() string { } type balancerStateAggregator struct { - cc balancer.ClientConn - logger *grpclog.PrefixLogger - csEvltr *balancer.ConnectivityStateEvaluator + cc balancer.ClientConn + logger *grpclog.PrefixLogger + csEval *balancer.ConnectivityStateEvaluator mu sync.Mutex // This field is used to ensure that no updates are forwarded to the parent @@ -65,7 +65,7 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo return &balancerStateAggregator{ cc: cc, logger: logger, - csEvltr: &balancer.ConnectivityStateEvaluator{}, + csEval: &balancer.ConnectivityStateEvaluator{}, idToPickerState: make(map[string]*subBalancerState), } } @@ -93,7 +93,7 @@ func (bsa *balancerStateAggregator) add(id string) { }, stateToAggregate: connectivity.Connecting, } - bsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting) + bsa.csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting) bsa.buildAndUpdateLocked() } @@ -111,7 +111,7 @@ func (bsa *balancerStateAggregator) remove(id string) { // csEvltr to remove the previous state for any aggregated 
state // evaluations. Transitions to and from connectivity.Shutdown are ignored // by csEvltr. - bsa.csEvltr.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown) + bsa.csEval.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown) // Remove id and picker from picker map. This also results in future updates // for this ID to be ignored. delete(bsa.idToPickerState, id) @@ -159,7 +159,7 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) // update the state, to prevent the aggregated state from being always // CONNECTING. Otherwise, stateToAggregate is the same as // state.ConnectivityState. - bsa.csEvltr.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState) + bsa.csEval.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState) pickerSt.stateToAggregate = state.ConnectivityState } pickerSt.state = state @@ -190,7 +190,7 @@ func (bsa *balancerStateAggregator) buildLocked() balancer.State { // or TransientFailure. bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState) return balancer.State{ - ConnectivityState: bsa.csEvltr.CurrentState(), + ConnectivityState: bsa.csEval.CurrentState(), Picker: newPickerGroup(bsa.idToPickerState), } } From 2151bd432d88ef78091f6837412328c5307be191 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 13 May 2024 20:12:31 +0000 Subject: [PATCH 3/5] api changes to balancergroup --- internal/balancergroup/balancergroup.go | 55 +-- internal/balancergroup/balancergroup_test.go | 12 +- .../balancer/clustermanager/clustermanager.go | 106 ++++-- .../clustermanager/clustermanager_test.go | 106 +++--- .../e2e_test/clustermanager_test.go | 332 ++++++++++++++++++ 5 files changed, 486 insertions(+), 125 deletions(-) create mode 100644 xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index 9187285f7f1a..5496b99dd5c4 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -19,6 +19,7 @@ package balancergroup import ( + "encoding/json" "fmt" "sync" "time" @@ -29,6 +30,7 @@ import ( "google.golang.org/grpc/internal/cache" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" ) // subBalancerWrapper is used to keep the configurations that will be used to start @@ -148,20 +150,6 @@ func (sbc *subBalancerWrapper) resolverError(err error) { b.ResolverError(err) } -func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) { - sbc.builder = builder - b := sbc.balancer - // Even if you get an add and it persists builder but doesn't start - // balancer, this would leave graceful switch being nil, in which we are - // correctly overwriting with the recent builder here as well to use later. - // The graceful switch balancer's presence is an invariant of whether the - // balancer group is closed or not (if closed, nil, if started, present). - if sbc.balancer != nil { - sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name()) - b.SwitchTo(sbc.builder) - } -} - func (sbc *subBalancerWrapper) stopBalancer() { if sbc.balancer == nil { return @@ -170,7 +158,8 @@ func (sbc *subBalancerWrapper) stopBalancer() { sbc.balancer = nil } -// BalancerGroup takes a list of balancers, and make them into one balancer. 
+// BalancerGroup takes a list of balancers, each behind a gracefulswitch +// balancer, and make them into one balancer. // // Note that this struct doesn't implement balancer.Balancer, because it's not // intended to be used directly as a balancer. It's expected to be used as a @@ -377,31 +366,6 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { bg.AddWithClientConn(id, builder.Name(), bg.cc) } -// UpdateBuilder updates the builder for a current child, starting the Graceful -// Switch process for that child. -// -// Returns a non-nil error if the provided builder is not registered. -func (bg *BalancerGroup) UpdateBuilder(id, builder string) error { - bldr := balancer.Get(builder) - if bldr == nil { - return fmt.Errorf("balancer builder %q not found", builder) - } - bg.outgoingMu.Lock() - defer bg.outgoingMu.Unlock() - // This does not deal with the balancer cache because this call should come - // after an Add call for a given child balancer. If the child is removed, - // the caller will call Add if the child balancer comes back which would - // then deal with the balancer cache. - sbc := bg.idToBalancerConfig[id] - if sbc == nil { - // simply ignore it if not present, don't error - return nil - } - sbc.gracefulSwitch(bldr) - return nil - -} - // Remove removes the balancer with id from the group. // // But doesn't close the balancer. The balancer is kept in a cache, and will be @@ -642,3 +606,14 @@ func (bg *BalancerGroup) ExitIdleOne(id string) { } bg.outgoingMu.Unlock() } + +// ParseConfig parses a child config list and returns a LB config for the +// gracefulswitch Balancer. +// +// cfg is expected to be a json.RawMessage containing a JSON array of LB policy +// names + configs as the format of the "loadBalancingConfig" field in +// ServiceConfig. It returns a type that should be passed to +// UpdateClientConnState in the BalancerConfig field. 
+func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return gracefulswitch.ParseConfig(cfg) +} diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index 4503b05d2e9d..b182ec8489e2 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -18,6 +18,7 @@ package balancergroup import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -609,8 +610,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) { return bal.UpdateClientConnState(ccs) }, }) - bg.UpdateBuilder(testBalancerIDs[0], childPolicyName) - if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil { + cfgJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, t.Name())) + lbCfg, err := ParseConfig(cfgJSON) + if err != nil { + t.Fatalf("ParseConfig(%s) failed: %v", string(cfgJSON), err) + } + if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}, + BalancerConfig: lbCfg, + }); err != nil { t.Fatalf("error updating ClientConn state: %v", err) } diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index 7e152e1b798e..1ab473b90f8c 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -25,6 +25,8 @@ import ( "time" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/balancergroup" internalgrpclog "google.golang.org/grpc/internal/grpclog" @@ -67,20 +69,28 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err } type bal struct { - logger *internalgrpclog.PrefixLogger - - // TODO: make this package not dependent on xds specific code. Same as for - // weighted target balancer. + logger *internalgrpclog.PrefixLogger bg *balancergroup.BalancerGroup stateAggregator *balancerStateAggregator children map[string]childConfig } -func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { +func (b *bal) setErrorPickerForChild(childName string, err error) { + b.logger.Warningf("%v", err) + b.stateAggregator.UpdateState(childName, balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: base.NewErrPicker(err), + }) +} + +func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) error { + // TODO: Get rid of handling hierarchy in addresses. This LB policy never + // gets addresses from the resolver. addressesSplit := hierarchy.Group(s.ResolverState.Addresses) - // Remove sub-pickers and sub-balancers that are not in the new cluster list. + // Remove sub-balancers that are not in the new list from the aggregator and + // balancergroup. for name := range b.children { if _, ok := newConfig.Children[name]; !ok { b.stateAggregator.remove(name) @@ -88,41 +98,78 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { } } - // For sub-balancers in the new cluster list, - // - add to balancer group if it's new, - // - forward the address/balancer config update. - for name, newT := range newConfig.Children { - if _, ok := b.children[name]; !ok { - // If this is a new sub-balancer, add it to the picker map. 
- b.stateAggregator.add(name) - // Then add to the balancer group. - b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name)) + var retErr error + for childName, childCfg := range newConfig.Children { + if _, ok := b.children[childName]; !ok { + // Add new sub-balancers to the aggregator and balancergroup. + b.stateAggregator.add(childName) + b.bg.Add(childName, balancer.Get(childCfg.ChildPolicy.Name)) } else { - // Already present, check for type change and if so send down a new - // builder. - if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name { - // Safe to ignore the returned error value because ParseConfig - // ensures that the child policy name is a registered LB policy. - b.bg.UpdateBuilder(name, newT.ChildPolicy.Name) + // If the child policy type has changed for existing sub-balancers, + // parse the new config and send down the config update to the + // balancergroup, which will take care of gracefully switching the + // child over to the new policy. + // + // If we run into errors here, we need to ensure that RPCs to this + // child fail, while RPCs to other children with good configs + // continue to succeed. + newPolicyName, oldPolicyName := childCfg.ChildPolicy.Name, b.children[childName].ChildPolicy.Name + if newPolicyName != oldPolicyName { + newCfg, err := childCfg.ChildPolicy.MarshalJSON() + if err != nil { + retErr = fmt.Errorf("failed to JSON marshal load balancing policy for child %q: %v", childName, err) + b.setErrorPickerForChild(childName, retErr) + continue + } + lbCfg, err := balancergroup.ParseConfig(newCfg) + if err != nil { + retErr = fmt.Errorf("failed to parse load balancing policy for child %q: %v", childName, err) + b.setErrorPickerForChild(childName, retErr) + continue + } + if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: addressesSplit[childName], + ServiceConfig: s.ResolverState.ServiceConfig, + Attributes: s.ResolverState.Attributes, + }, + BalancerConfig: lbCfg, + }); err != nil { + retErr = fmt.Errorf("failed to update load balancing policy for child %q from %q to %q: %v", childName, oldPolicyName, newPolicyName, err) + b.setErrorPickerForChild(childName, retErr) + continue + } // Picker update is sent to the parent ClientConn only after the // new child policy returns a picker. So, there is no need to // set needUpdateStateOnResume to true here. + continue } } - // TODO: handle error? How to aggregate errors and return? - _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{ + + // We get here for new sub-balancers and existing ones whose child + // policy type did not change and push the new configuration to them. 
+ if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{ ResolverState: resolver.State{ - Addresses: addressesSplit[name], + Addresses: addressesSplit[childName], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, }, - BalancerConfig: newT.ChildPolicy.Config, - }) + BalancerConfig: childCfg.ChildPolicy.Config, + }); err != nil { + retErr = fmt.Errorf("failed to push new configuration %v to child %q", childCfg.ChildPolicy.Config, childName) + b.setErrorPickerForChild(childName, retErr) + } } b.children = newConfig.Children - // Adding, removing or updating a sub-balancer will result in the + // If multiple sub-balancers run into errors, we will return only the last + // one, which is still good enough, since the grpc channel will anyways + // return this error as balancer.ErrBadResolver to the name resolver, + // resulting in re-resolution attempts. + return retErr + + // Adding or removing a sub-balancer will result in the // needUpdateStateOnResume bit to true which results in a picker update once // resumeStateUpdates() is called. } @@ -132,12 +179,11 @@ func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error { if !ok { return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } - b.logger.Infof("update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState) + b.logger.Infof("Update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState) b.stateAggregator.pauseStateUpdates() defer b.stateAggregator.resumeStateUpdates() - b.updateChildren(s, newConfig) - return nil + return b.updateChildren(s, newConfig) } func (b *bal) ResolverError(err error) { diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index 48359f0931d2..f3ea58c2496d 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -73,18 +73,18 @@ func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC ba } } -func (s) TestClusterPicks(t *testing.T) { +func TestClusterPicks(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } -} -}` + "children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } + } + }` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -152,18 +152,18 @@ func (s) TestClusterPicks(t *testing.T) { // TestConfigUpdateAddCluster covers the cases the balancer receives config // update with extra clusters. 
-func (s) TestConfigUpdateAddCluster(t *testing.T) { +func TestConfigUpdateAddCluster(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } -} -}` + "children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } + } + }` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -231,12 +231,12 @@ func (s) TestConfigUpdateAddCluster(t *testing.T) { // A config update with different routes, and different actions. Expect a // new subconn and a picker update. configJSON2 := `{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_3":{ "childPolicy": [{"round_robin":""}] } -} -}` + "children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_3":{ "childPolicy": [{"round_robin":""}] } + } + }` config2, err := parser.ParseConfig([]byte(configJSON2)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -310,18 +310,18 @@ func (s) TestConfigUpdateAddCluster(t *testing.T) { // TestRoutingConfigUpdateDeleteAll covers the cases the balancer receives // config update with no clusters. Pick should fail with details in error. -func (s) TestRoutingConfigUpdateDeleteAll(t *testing.T) { +func TestRoutingConfigUpdateDeleteAll(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } -} -}` + "children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } + } + }` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -471,7 +471,7 @@ func (s) TestRoutingConfigUpdateDeleteAll(t *testing.T) { } } -func (s) TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { +func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { const ( userAgent = "ua" defaultTestTimeout = 1 * time.Second @@ -502,10 +502,10 @@ func (s) TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { bal := builder.Build(cc, bOpts) configJSON1 := fmt.Sprintf(`{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"%s":""}] } -} -}`, t.Name()) + "children": { + "cds:cluster_1":{ "childPolicy": [{"%s":""}] } + } + }`, t.Name()) config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -525,7 +525,7 @@ func (s) TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { } } -const initIdleBalancerName = "test-init-idle-balancer" +const initIdleBalancerName = "test-init-Idle-balancer" var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") @@ -555,18 +555,18 @@ func init() { // TestInitialIdle covers the case that if the child reports Idle, the overall // state will be Idle. 
-func (s) TestInitialIdle(t *testing.T) { +func TestInitialIdle(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) bal := builder.Build(cc, balancer.BuildOptions{}) - configJSON := `{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"test-init-idle-balancer":""}] } -} -}` - config, err := parser.ParseConfig([]byte(configJSON)) + configJSON1 := `{ + "children": { + "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] } + } + }` + config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } @@ -579,7 +579,7 @@ func (s) TestInitialIdle(t *testing.T) { ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), }}, - BalancerConfig: config, + BalancerConfig: config1, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } @@ -591,8 +591,8 @@ func (s) TestInitialIdle(t *testing.T) { sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) } - if state := <-cc.NewStateCh; state != connectivity.Idle { - t.Fatalf("Received aggregated state: %v, want Idle", state) + if state1 := <-cc.NewStateCh; state1 != connectivity.Idle { + t.Fatalf("Received aggregated state: %v, want Idle", state1) } } @@ -602,17 +602,17 @@ func (s) TestInitialIdle(t *testing.T) { // switches this child to a pick first load balancer. Once that balancer updates // it's state and completes the graceful switch process the new picker should // reflect this change. -func (s) TestClusterGracefulSwitch(t *testing.T) { +func TestClusterGracefulSwitch(t *testing.T) { cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(balancerName) parser := builder.(balancer.ConfigParser) bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ -"children": { - "csp:cluster":{ "childPolicy": [{"round_robin":""}] } -} -}` + "children": { + "csp:cluster":{ "childPolicy": [{"round_robin":""}] } + } + }` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -651,10 +651,10 @@ func (s) TestClusterGracefulSwitch(t *testing.T) { }) // Same cluster, different balancer type. configJSON2 := fmt.Sprintf(`{ -"children": { - "csp:cluster":{ "childPolicy": [{"%s":""}] } -} -}`, childPolicyName) + "children": { + "csp:cluster":{ "childPolicy": [{"%s":""}] } + } + }`, childPolicyName) config2, err := parser.ParseConfig([]byte(configJSON2)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -732,10 +732,10 @@ func (s) TestUpdateStatePauses(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ -"children": { - "cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] } -} -}` + "children": { + "cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] } + } + }` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) diff --git a/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go b/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go new file mode 100644 index 000000000000..4109502fa2b8 --- /dev/null +++ b/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go @@ -0,0 +1,332 @@ +/* + * + * Copyright 2024 gRPC authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e_test + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/structpb" + + v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3pickfirstpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/pick_first/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/xds" // Register the xDS name resolver and related LB policies. +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 10 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + +func makeEmptyCallRPCAndVerifyPeer(ctx context.Context, client testgrpc.TestServiceClient, wantPeer string) error { + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil { + return fmt.Errorf("EmptyCall() failed: %v", err) + } + if gotPeer := peer.Addr.String(); gotPeer != wantPeer { + return fmt.Errorf("EmptyCall() routed to %q, want to be routed to: %q", gotPeer, wantPeer) + } + return nil +} + +func makeUnaryCallRPCAndVerifyPeer(ctx context.Context, client testgrpc.TestServiceClient, wantPeer string) error { + peer := &peer.Peer{} + if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Peer(peer)); err != nil { + return fmt.Errorf("UnaryCall() failed: %v", err) + } + if gotPeer := peer.Addr.String(); gotPeer != wantPeer { + return fmt.Errorf("EmptyCall() routed to %q, want to be routed to: %q", gotPeer, wantPeer) + } + return nil +} + +func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) { + // Spin up an xDS management server. + mgmtServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + + // Configure client side xDS resources on the management server. 
+ const ( + serviceName = "my-service-client-side-xds" + routeConfigName = "route-" + serviceName + clusterName1 = "cluster1-" + serviceName + clusterName2 = "cluster2-" + serviceName + endpointsName1 = "endpoints1-" + serviceName + endpointsName2 = "endpoints2-" + serviceName + endpointsName3 = "endpoints3-" + serviceName + ) + // A single Listener resource pointing to the following Route + // configuration: + // - "/grpc.testing.TestService/EmptyCall" --> cluster1 + // - "/grpc.testing.TestService/UnaryCall" --> cluster2 + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)} + routes := []*v3routepb.RouteConfiguration{{ + Name: routeConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{serviceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName1}, + }}, + }, + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName2}, + }}, + }, + }, + }}, + }} + // Two cluster resources corresponding to the ones mentioned in the above + // route configuration resource. These are configured with round_robin as + // their endpoint picking policy. + clusters := []*v3clusterpb.Cluster{ + e2e.DefaultCluster(clusterName1, endpointsName1, e2e.SecurityLevelNone), + e2e.DefaultCluster(clusterName2, endpointsName2, e2e.SecurityLevelNone), + } + // Spin up two test backends, one for each cluster below. + server1 := stubserver.StartTestService(t, nil) + defer server1.Stop() + server2 := stubserver.StartTestService(t, nil) + defer server2.Stop() + // Two endpoints resources, each with one backend from above. + endpoints := []*v3endpointpb.ClusterLoadAssignment{ + e2e.DefaultEndpoint(endpointsName1, "localhost", []uint32{testutils.ParsePort(t, server1.Address)}), + e2e.DefaultEndpoint(endpointsName2, "localhost", []uint32{testutils.ParsePort(t, server2.Address)}), + } + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: listeners, + Routes: routes, + Clusters: clusters, + Endpoints: endpoints, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Make an EmptyCall RPC and verify that it is routed to cluster1. + client := testgrpc.NewTestServiceClient(cc) + if err := makeEmptyCallRPCAndVerifyPeer(ctx, client, server1.Address); err != nil { + t.Fatal(err) + } + + // Make a UnaryCall RPC and verify that it is routed to cluster2. + if err := makeUnaryCallRPCAndVerifyPeer(ctx, client, server2.Address); err != nil { + t.Fatal(err) + } + + // Create a wrapped pickfirst LB policy. When the endpoint picking policy on + // the cluster resource is changed to pickfirst, this will allow us to + // verify that load balancing configuration is pushed to it. 
+ pfBuilder := balancer.Get(grpc.PickFirstBalancerName) + internal.BalancerUnregister(pfBuilder.Name()) + + lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1) + stub.Register(pfBuilder.Name(), stub.BalancerFuncs{ + ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return pfBuilder.(balancer.ConfigParser).ParseConfig(lbCfg) + }, + Init: func(bd *stub.BalancerData) { + bd.Data = pfBuilder.Build(bd.ClientConn, bd.BuildOptions) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + select { + case lbCfgCh <- ccs.BalancerConfig: + default: + } + bal := bd.Data.(balancer.Balancer) + return bal.UpdateClientConnState(ccs) + }, + Close: func(bd *stub.BalancerData) { + bal := bd.Data.(balancer.Balancer) + bal.Close() + }, + }) + + // Send a config update that changes the child policy configuration for one + // of the clusters to pickfirst. The endpoints resource is also changed here + // to ensure that we can verify that the new child policy + cluster2 := &v3clusterpb.Cluster{ + Name: clusterName2, + ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, + EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + EdsConfig: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ + Ads: &v3corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: endpointsName3, + }, + LoadBalancingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: testutils.MarshalAny(t, &v3pickfirstpb.PickFirst{ + ShuffleAddressList: true, + }), + }, + }, + }, + }, + } + server3 := stubserver.StartTestService(t, nil) + defer server3.Stop() + endpoints3 := e2e.DefaultEndpoint(endpointsName3, "localhost", []uint32{testutils.ParsePort(t, server3.Address)}) + resources.Clusters[1] = cluster2 + resources.Endpoints = append(resources.Endpoints, endpoints3) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + select { + case <-ctx.Done(): + t.Fatalf("Timeout when waiting for configuration to be pushed to the new pickfirst child policy") + case <-lbCfgCh: + } + + // Ensure RPCs are still succeeding. + + // Make an EmptyCall RPC and verify that it is routed to cluster1. + if err := makeEmptyCallRPCAndVerifyPeer(ctx, client, server1.Address); err != nil { + t.Fatal(err) + } + + // Make a UnaryCall RPC and verify that it is routed to cluster2, and the + // new endpoints resource. + for ; ctx.Err() != nil; <-time.After(defaultTestShortTimeout) { + if err := makeUnaryCallRPCAndVerifyPeer(ctx, client, server3.Address); err != nil { + t.Log(err) + } + // If we get here, it means that the RPC was routed to endpoint3, and + // that means that the new pickfirst policy is active. + break + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to cluster2 to be routed to the new endpoints resource") + } + + // Send a config update that changes the child policy configuration for one + // of the clusters to an unsupported LB policy. This should result in + // failure of RPCs to that cluster. 
+ cluster2 = &v3clusterpb.Cluster{ + Name: clusterName2, + ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, + EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + EdsConfig: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ + Ads: &v3corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: endpointsName3, + }, + LoadBalancingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + // The type not registered in gRPC Policy registry. + TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: "type.googleapis.com/myorg.ThisTypeDoesNotExist", + Value: &structpb.Struct{}, + }), + }, + }, + }, + }, + } + resources.Clusters[1] = cluster2 + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // At this point, RPCs to cluster1 should continue to succeed, while RPCs to + // cluster2 should start to fail. + + // Make an EmptyCall RPC and verify that it is routed to cluster1. + if err := makeEmptyCallRPCAndVerifyPeer(ctx, client, server1.Address); err != nil { + t.Fatal(err) + } + + // Make a UnaryCall RPC and verify that it starts to fail. + for ; ctx.Err() != nil; <-time.After(defaultTestShortTimeout) { + _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}) + if got, want := status.Code(err), codes.Unavailable; got != want { + t.Logf("UnaryCall() returned code: %v, want %v", got, want) + continue + } + // If we get here, it means that the RPC failed with UNAVAILABLE, and + // that should be due to the fact that the new cluster resource has an + // unsupported LB policy. + break + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to cluster2 to start failing") + } + + // Channel should still be READY. + if got, want := cc.GetState(), connectivity.Ready; got != want { + t.Fatalf("grpc.ClientConn in state %v, want %v", got, want) + } +} From 2bae25838aada32b0a21de5d3c0c033771d05a6f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 15 May 2024 15:20:55 +0000 Subject: [PATCH 4/5] make vet happy --- .../e2e_test/clustermanager_test.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go b/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go index 4109502fa2b8..e9fd3c389fef 100644 --- a/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/e2e_test/clustermanager_test.go @@ -257,12 +257,10 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) { // Make a UnaryCall RPC and verify that it is routed to cluster2, and the // new endpoints resource. for ; ctx.Err() != nil; <-time.After(defaultTestShortTimeout) { - if err := makeUnaryCallRPCAndVerifyPeer(ctx, client, server3.Address); err != nil { - t.Log(err) + if err := makeUnaryCallRPCAndVerifyPeer(ctx, client, server3.Address); err == nil { + break } - // If we get here, it means that the RPC was routed to endpoint3, and - // that means that the new pickfirst policy is active. - break + t.Log(err) } if ctx.Err() != nil { t.Fatal("Timeout when waiting for RPCs to cluster2 to be routed to the new endpoints resource") @@ -312,14 +310,11 @@ func (s) TestConfigUpdate_ChildPolicyChange(t *testing.T) { // Make a UnaryCall RPC and verify that it starts to fail. 
for ; ctx.Err() != nil; <-time.After(defaultTestShortTimeout) { _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}) - if got, want := status.Code(err), codes.Unavailable; got != want { - t.Logf("UnaryCall() returned code: %v, want %v", got, want) - continue + got := status.Code(err) + if got == codes.Unavailable { + break } - // If we get here, it means that the RPC failed with UNAVAILABLE, and - // that should be due to the fact that the new cluster resource has an - // unsupported LB policy. - break + t.Logf("UnaryCall() returned code: %v, want %v", got, codes.Unavailable) } if ctx.Err() != nil { t.Fatal("Timeout when waiting for RPCs to cluster2 to start failing") From e06d55cbfe86c879905f7cd7ce9f18000c01d940 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 15 May 2024 16:56:41 +0000 Subject: [PATCH 5/5] review comments --- .../balancer/clustermanager/clustermanager.go | 36 ++++---- .../clustermanager/clustermanager_test.go | 82 +++++++++---------- 2 files changed, 55 insertions(+), 63 deletions(-) diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index 1ab473b90f8c..e6d751ecbee4 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -77,7 +77,6 @@ type bal struct { } func (b *bal) setErrorPickerForChild(childName string, err error) { - b.logger.Warningf("%v", err) b.stateAggregator.UpdateState(childName, balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(err), @@ -100,6 +99,7 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er var retErr error for childName, childCfg := range newConfig.Children { + lbCfg := childCfg.ChildPolicy.Config if _, ok := b.children[childName]; !ok { // Add new sub-balancers to the aggregator and balancergroup. b.stateAggregator.add(childName) @@ -115,50 +115,42 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) er // continue to succeed. newPolicyName, oldPolicyName := childCfg.ChildPolicy.Name, b.children[childName].ChildPolicy.Name if newPolicyName != oldPolicyName { - newCfg, err := childCfg.ChildPolicy.MarshalJSON() + var err error + var cfgJSON []byte + cfgJSON, err = childCfg.ChildPolicy.MarshalJSON() if err != nil { retErr = fmt.Errorf("failed to JSON marshal load balancing policy for child %q: %v", childName, err) b.setErrorPickerForChild(childName, retErr) continue } - lbCfg, err := balancergroup.ParseConfig(newCfg) + // This overwrites lbCfg to be in the format expected by the + // gracefulswitch balancer. So, when this config is pushed to + // the child (below), it will result in a graceful switch to the + // new child policy. 
+ lbCfg, err = balancergroup.ParseConfig(cfgJSON) if err != nil { retErr = fmt.Errorf("failed to parse load balancing policy for child %q: %v", childName, err) b.setErrorPickerForChild(childName, retErr) continue } - if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{ - ResolverState: resolver.State{ - Addresses: addressesSplit[childName], - ServiceConfig: s.ResolverState.ServiceConfig, - Attributes: s.ResolverState.Attributes, - }, - BalancerConfig: lbCfg, - }); err != nil { - retErr = fmt.Errorf("failed to update load balancing policy for child %q from %q to %q: %v", childName, oldPolicyName, newPolicyName, err) - b.setErrorPickerForChild(childName, retErr) - continue - } - // Picker update is sent to the parent ClientConn only after the - // new child policy returns a picker. So, there is no need to - // set needUpdateStateOnResume to true here. - continue } } - // We get here for new sub-balancers and existing ones whose child - // policy type did not change and push the new configuration to them. if err := b.bg.UpdateClientConnState(childName, balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: addressesSplit[childName], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, }, - BalancerConfig: childCfg.ChildPolicy.Config, + BalancerConfig: lbCfg, }); err != nil { retErr = fmt.Errorf("failed to push new configuration %v to child %q", childCfg.ChildPolicy.Config, childName) b.setErrorPickerForChild(childName, retErr) } + + // Picker update is sent to the parent ClientConn only after the + // new child policy returns a picker. So, there is no need to + // set needUpdateStateOnResume to true here. } b.children = newConfig.Children diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index f3ea58c2496d..b998c1b35f29 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -80,11 +80,11 @@ func TestClusterPicks(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } - } - }` +"children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } +} +}` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -159,11 +159,11 @@ func TestConfigUpdateAddCluster(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } - } - }` +"children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } +} +}` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -231,12 +231,12 @@ func TestConfigUpdateAddCluster(t *testing.T) { // A config update with different routes, and different actions. Expect a // new subconn and a picker update. 
configJSON2 := `{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_3":{ "childPolicy": [{"round_robin":""}] } - } - }` +"children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_3":{ "childPolicy": [{"round_robin":""}] } +} +}` config2, err := parser.ParseConfig([]byte(configJSON2)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -317,11 +317,11 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, - "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } - } - }` +"children": { + "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] }, + "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] } +} +}` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -502,10 +502,10 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { bal := builder.Build(cc, bOpts) configJSON1 := fmt.Sprintf(`{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"%s":""}] } - } - }`, t.Name()) +"children": { + "cds:cluster_1":{ "childPolicy": [{"%s":""}] } +} +}`, t.Name()) config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -562,10 +562,10 @@ func TestInitialIdle(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] } - } - }` +"children": { + "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] } +} +}` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -609,10 +609,10 @@ func TestClusterGracefulSwitch(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ - "children": { - "csp:cluster":{ "childPolicy": [{"round_robin":""}] } - } - }` +"children": { + "csp:cluster":{ "childPolicy": [{"round_robin":""}] } +} +}` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -651,10 +651,10 @@ func TestClusterGracefulSwitch(t *testing.T) { }) // Same cluster, different balancer type. configJSON2 := fmt.Sprintf(`{ - "children": { - "csp:cluster":{ "childPolicy": [{"%s":""}] } - } - }`, childPolicyName) +"children": { + "csp:cluster":{ "childPolicy": [{"%s":""}] } +} +}`, childPolicyName) config2, err := parser.ParseConfig([]byte(configJSON2)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) @@ -732,10 +732,10 @@ func (s) TestUpdateStatePauses(t *testing.T) { bal := builder.Build(cc, balancer.BuildOptions{}) configJSON1 := `{ - "children": { - "cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] } - } - }` +"children": { + "cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] } +} +}` config1, err := parser.ParseConfig([]byte(configJSON1)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err)
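
The series above replaces the hand-rolled state counting in build() with balancer.ConnectivityStateEvaluator. The standalone sketch below is illustrative only (it is not part of any patch and assumes just the RecordTransition and CurrentState methods exercised in the diffs above); it shows how the aggregator drives the evaluator across add, UpdateState and remove, and why Shutdown works as the "no previous state" sentinel, since the evaluator ignores Shutdown when counting.

// aggregation_sketch.go
//
// Minimal, self-contained sketch of the aggregation scheme adopted in this
// series. Assumes the grpc-go balancer.ConnectivityStateEvaluator API used
// above (RecordTransition, CurrentState).
package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

func main() {
	csEval := &balancer.ConnectivityStateEvaluator{}

	// add(): a new child starts in CONNECTING. Shutdown is passed as the
	// previous state because transitions to/from Shutdown are not counted.
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	fmt.Println(csEval.CurrentState()) // CONNECTING

	// UpdateState(): the child reports READY, so the aggregate becomes READY.
	csEval.RecordTransition(connectivity.Connecting, connectivity.Ready)
	fmt.Println(csEval.CurrentState()) // READY

	// remove(): recording a transition to Shutdown drops the child's
	// contribution; with no children left the aggregate falls back to
	// TRANSIENT_FAILURE, matching the default case of the old build() code.
	csEval.RecordTransition(connectivity.Ready, connectivity.Shutdown)
	fmt.Println(csEval.CurrentState()) // TRANSIENT_FAILURE
}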