balancergroup: add a ParseConfig API and remove the UpdateBuilder API #7232

Merged 5 commits on May 22, 2024

Changes from 4 commits
49 changes: 15 additions & 34 deletions internal/balancergroup/balancergroup.go
@@ -19,6 +19,7 @@
package balancergroup

import (
"encoding/json"
"fmt"
"sync"
"time"
@@ -29,6 +30,7 @@ import (
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// subBalancerWrapper is used to keep the configurations that will be used to start
@@ -148,20 +150,6 @@ func (sbc *subBalancerWrapper) resolverError(err error) {
b.ResolverError(err)
}

func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
sbc.builder = builder
b := sbc.balancer
// An Add call may persist the builder without starting the balancer, leaving
// the graceful switch balancer nil; in that case we still record the most
// recent builder here for later use. The graceful switch balancer's presence
// tracks whether the balancer group is started (present) or closed (nil).
if sbc.balancer != nil {
sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
b.SwitchTo(sbc.builder)
}
}

func (sbc *subBalancerWrapper) stopBalancer() {
if sbc.balancer == nil {
return
@@ -170,7 +158,8 @@ func (sbc *subBalancerWrapper) stopBalancer() {
sbc.balancer = nil
}

// BalancerGroup takes a list of balancers, and make them into one balancer.
// BalancerGroup takes a list of balancers, each behind a gracefulswitch
// balancer, and makes them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
// intended to be used directly as a balancer. It's expected to be used as a
@@ -377,25 +366,6 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
bg.AddWithClientConn(id, builder.Name(), bg.cc)
}

// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
//
// TODO: update this API to take the name of the new builder instead.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
bg.outgoingMu.Lock()
// This does not deal with the balancer cache because this call should come
// after an Add call for a given child balancer. If the child is removed,
// the caller will call Add if the child balancer comes back which would
// then deal with the balancer cache.
sbc := bg.idToBalancerConfig[id]
if sbc == nil {
// simply ignore it if not present, don't error
return
}
sbc.gracefulSwitch(builder)
bg.outgoingMu.Unlock()
}

// Remove removes the balancer with id from the group.
//
// But doesn't close the balancer. The balancer is kept in a cache, and will be
@@ -636,3 +606,14 @@ func (bg *BalancerGroup) ExitIdleOne(id string) {
}
bg.outgoingMu.Unlock()
}

// ParseConfig parses a child config list and returns a LB config for the
// gracefulswitch Balancer.
//
// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
// names + configs, in the same format as the "loadBalancingConfig" field in
// ServiceConfig. It returns a type that should be passed to
// UpdateClientConnState in the BalancerConfig field.
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return gracefulswitch.ParseConfig(cfg)
}
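
For callers migrating off UpdateBuilder, the replacement flow is: build the child's config list as JSON, parse it with the new ParseConfig, and deliver the result through the BalancerConfig field of UpdateClientConnState; a change in the child's policy type then triggers a graceful switch inside the balancer group. Below is a minimal sketch, not part of this diff, of such a caller inside the balancergroup package; the updateChild helper, the "child-1" id, and the "round_robin" policy are illustrative assumptions. The updated TestBalancerGracefulSwitch below follows the same pattern.

// Illustrative helper, not part of this PR; assumes the "encoding/json",
// "fmt", balancer, and resolver imports already present in this package.
func updateChild(bg *BalancerGroup, addrs []resolver.Address) error {
	// A JSON list in the "loadBalancingConfig" format; the first supported
	// policy in the list is used.
	cfgJSON := json.RawMessage(`[{"round_robin": {}}]`)
	lbCfg, err := ParseConfig(cfgJSON)
	if err != nil {
		return fmt.Errorf("ParseConfig(%s) failed: %v", cfgJSON, err)
	}
	// The parsed config rides in BalancerConfig; if the child's policy type
	// changed, the gracefulswitch balancer starts a graceful switch.
	return bg.UpdateClientConnState("child-1", balancer.ClientConnState{
		ResolverState:  resolver.State{Addresses: addrs},
		BalancerConfig: lbCfg,
	})
}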
13 changes: 10 additions & 3 deletions internal/balancergroup/balancergroup_test.go
@@ -18,6 +18,7 @@ package balancergroup

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
@@ -609,9 +610,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
return bal.UpdateClientConnState(ccs)
},
})
builder := balancer.Get(childPolicyName)
bg.UpdateBuilder(testBalancerIDs[0], builder)
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
cfgJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, t.Name()))
lbCfg, err := ParseConfig(cfgJSON)
if err != nil {
t.Fatalf("ParseConfig(%s) failed: %v", string(cfgJSON), err)
}
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{
ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]},
BalancerConfig: lbCfg,
}); err != nil {
t.Fatalf("error updating ClientConn state: %v", err)
}

126 changes: 31 additions & 95 deletions xds/internal/balancer/clustermanager/balancerstateaggregator.go
@@ -45,17 +45,14 @@ func (s *subBalancerState) String() string {
type balancerStateAggregator struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
csEval *balancer.ConnectivityStateEvaluator

mu sync.Mutex
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
// that no updates will be forwarded to parent when the whole balancer group
// and states aggregator is closed.
started bool
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
// This field is used to ensure that no updates are forwarded to the parent
// CC once the aggregator is closed. A closed sub-balancer could still send
// pickers to this aggregator.
closed bool
// Map from child policy name to last reported state.
idToPickerState map[string]*subBalancerState
// Set when UpdateState call propagation is paused.
pauseUpdateState bool
@@ -68,34 +65,24 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo
return &balancerStateAggregator{
cc: cc,
logger: logger,
csEval: &balancer.ConnectivityStateEvaluator{},
idToPickerState: make(map[string]*subBalancerState),
}
}

// Start starts the aggregator. It can be called after Close to restart the
// aggregator.
func (bsa *balancerStateAggregator) start() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.started = true
}

// Close closes the aggregator. When the aggregator is closed, it won't call
// parent ClientConn to update balancer state.
func (bsa *balancerStateAggregator) close() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.started = false
bsa.clearStates()
bsa.closed = true
}

// add adds a sub-balancer state with weight. It adds a placeholder, and waits
// for the real sub-balancer to update state.
// add adds a sub-balancer in CONNECTING state.
//
// This is called when there's a new child.
func (bsa *balancerStateAggregator) add(id string) {
bsa.mu.Lock()
defer bsa.mu.Unlock()

bsa.idToPickerState[id] = &subBalancerState{
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
@@ -106,6 +93,8 @@ func (bsa *balancerStateAggregator) add(id string) {
},
stateToAggregate: connectivity.Connecting,
}
bsa.csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
bsa.buildAndUpdateLocked()
}

// remove removes the sub-balancer state. Future updates from this sub-balancer,
@@ -118,9 +107,15 @@ func (bsa *balancerStateAggregator) remove(id string) {
if _, ok := bsa.idToPickerState[id]; !ok {
return
}
// Setting the state of the deleted sub-balancer to Shutdown will get
// csEvltr to remove the previous state for any aggregated state
// evaluations. Transitions to and from connectivity.Shutdown are ignored
// by csEvltr.
bsa.csEval.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(bsa.idToPickerState, id)
bsa.buildAndUpdateLocked()
}

// pauseStateUpdates causes UpdateState calls to not propagate to the parent
@@ -140,7 +135,7 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
defer bsa.mu.Unlock()
bsa.pauseUpdateState = false
if bsa.needUpdateStateOnResume {
bsa.cc.UpdateState(bsa.build())
bsa.cc.UpdateState(bsa.buildLocked())
}
}

@@ -149,6 +144,8 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
bsa.logger.Infof("State update from sub-balancer %q: %+v", id, state)

bsa.mu.Lock()
defer bsa.mu.Unlock()
pickerSt, ok := bsa.idToPickerState[id]
@@ -162,42 +159,17 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State)
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
bsa.csEval.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState)
pickerSt.stateToAggregate = state.ConnectivityState
}
pickerSt.state = state

if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}

// clearStates resets everything to the initial state (Connecting) but keeps
// the entries in the map (to keep the weights).
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) clearStates() {
for _, pState := range bsa.idToPickerState {
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
bsa.buildAndUpdateLocked()
}

// buildAndUpdate combines the sub-state from each sub-balancer into one state,
// and updates the parent ClientConn with it.
func (bsa *balancerStateAggregator) buildAndUpdate() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
if !bsa.started {
// buildAndUpdateLocked combines the sub-state from each sub-balancer into one
// state, and sends a picker update to the parent ClientConn.
func (bsa *balancerStateAggregator) buildAndUpdateLocked() {
if bsa.closed {
return
}
if bsa.pauseUpdateState {
@@ -206,55 +178,19 @@ func (bsa *balancerStateAggregator) buildAndUpdate() {
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
bsa.cc.UpdateState(bsa.buildLocked())
}

// build combines sub-states into one. The picker will do a child pick.
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) build() balancer.State {
// TODO: the majority of this function (and UpdateState) is exactly the same
// as weighted_target's state aggregator. Try to make a general utility
// function/struct to handle the logic.
//
// One option: make a SubBalancerState that handles Update(State), including
// handling the special connecting after ready, as in UpdateState(). Then a
// function to calculate the aggregated connectivity state as in this
// function.
//
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
for _, ps := range bsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
case idleN > 0:
aggregatedState = connectivity.Idle
default:
aggregatedState = connectivity.TransientFailure
}

// buildLocked combines sub-states into one.
func (bsa *balancerStateAggregator) buildLocked() balancer.State {
// The picker's return error might not be consistent with the
// aggregatedState. Because for this LB policy, we want to always build
// picker with all sub-pickers (not only ready sub-pickers), so even if the
// overall state is Ready, pick for certain RPCs can behave like Connecting
// or TransientFailure.
bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
return balancer.State{
ConnectivityState: aggregatedState,
ConnectivityState: bsa.csEval.CurrentState(),
Picker: newPickerGroup(bsa.idToPickerState),
}
}
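
With the manual readyN/connectingN/idleN counting removed, the aggregator's add, remove, and UpdateState now record each child's transitions with balancer.ConnectivityStateEvaluator, and buildLocked reads the aggregate via CurrentState. Below is a standalone sketch, not part of this diff, of the precedence that evaluator applies (READY, then CONNECTING, then IDLE, then TRANSIENT_FAILURE, with transitions to or from SHUTDOWN ignored):

package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

func main() {
	csEval := &balancer.ConnectivityStateEvaluator{}

	// Two children added: each starts aggregation in CONNECTING, mirroring
	// the add() method above.
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	fmt.Println(csEval.CurrentState()) // CONNECTING

	// One child reports READY: the aggregate is READY even though the other
	// child is still CONNECTING.
	csEval.RecordTransition(connectivity.Connecting, connectivity.Ready)
	fmt.Println(csEval.CurrentState()) // READY

	// Removing the READY child records a transition to SHUTDOWN, which drops
	// its contribution; the aggregate falls back to CONNECTING.
	csEval.RecordTransition(connectivity.Ready, connectivity.Shutdown)
	fmt.Println(csEval.CurrentState()) // CONNECTING
}

This is the same precedence that the removed build() implemented by hand.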