
balancergroup: add a ParseConfig API and remove the UpdateBuilder API #7232

Merged

merged 5 commits on May 22, 2024
Changes from 1 commit
16 changes: 11 additions & 5 deletions internal/balancergroup/balancergroup.go
@@ -380,20 +380,26 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
//
// TODO: update this API to take the name of the new builder instead.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
// Returns a non-nil error if the provided builder is not registered.
func (bg *BalancerGroup) UpdateBuilder(id, builder string) error {

Member:
You modified this to return an error, but don't check it at the call sites. Even though those used to panic, the danger is that a "normal" error will be returned from here in the future, and so not checking it is problematic.

We should either panic if the builder isn't registered (as the previous behavior) or check the error at the call sites.

Contributor Author:
Removed the UpdateBuilder call and replaced it with a ParseConfig API, similar to the gracefulswitch balancer, as discussed offline.

bldr := balancer.Get(builder)
if bldr == nil {
return fmt.Errorf("balancer builder %q not found", builder)
}
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
// This does not deal with the balancer cache because this call should come
// after an Add call for a given child balancer. If the child is removed,
// the caller will call Add if the child balancer comes back which would
// then deal with the balancer cache.
sbc := bg.idToBalancerConfig[id]
if sbc == nil {
// simply ignore it if not present, don't error
return
return nil
}
sbc.gracefulSwitch(builder)
bg.outgoingMu.Unlock()
sbc.gracefulSwitch(bldr)

Member:
This used to be called without the lock -- are you sure this is OK to call with the lock held?

Contributor Author:
This was called with the lock held earlier as well. The difference is that in the if sbc == nil block, we were returning without releasing the lock. With the defer now, the lock is released on all code paths.
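
To make the locking point concrete, here is a minimal standalone sketch, not code from this PR (the registry type and its fields are illustrative stand-ins for the balancer group): a manual Unlock misses the early-return path, while defer releases the mutex on every path.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is an illustrative stand-in for the balancer group's map of
// children plus its mutex.
type registry struct {
	mu       sync.Mutex
	children map[string]string
}

// buggyUpdate mirrors the old shape: the early return leaves mu locked, so a
// later call on the same registry would block forever.
func (r *registry) buggyUpdate(id, v string) {
	r.mu.Lock()
	if _, ok := r.children[id]; !ok {
		return // mu is never released on this path
	}
	r.children[id] = v
	r.mu.Unlock()
}

// safeUpdate mirrors the new shape: defer releases mu on every return path,
// including the early return for an unknown id.
func (r *registry) safeUpdate(id, v string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.children[id]; !ok {
		return
	}
	r.children[id] = v
}

func main() {
	r := &registry{children: map[string]string{"a": "pick_first"}}
	r.safeUpdate("missing", "round_robin") // early return, lock still released
	r.safeUpdate("a", "round_robin")
	fmt.Println(r.children["a"]) // round_robin
}
```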

return nil

}

// Remove removes the balancer with id from the group.
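
As a rough sketch of the direction described in the thread above, where child-policy validation moves to config-parse time as in the gracefulswitch balancer: parseChildPolicy is a hypothetical helper, not the API this PR adds; only balancer.Get, balancer.Builder, and the round_robin registration are real grpc-go pieces.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	_ "google.golang.org/grpc/balancer/roundrobin" // registers "round_robin"
)

// parseChildPolicy resolves a child policy name to its registered builder at
// config-parse time, so that a later graceful switch cannot fail at runtime.
func parseChildPolicy(name string) (balancer.Builder, error) {
	b := balancer.Get(name)
	if b == nil {
		return nil, fmt.Errorf("balancer builder %q not found", name)
	}
	return b, nil
}

func main() {
	if _, err := parseChildPolicy("round_robin"); err != nil {
		fmt.Println("unexpected:", err)
	}
	if _, err := parseChildPolicy("no_such_policy"); err != nil {
		fmt.Println(err) // balancer builder "no_such_policy" not found
	}
}
```
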
3 changes: 1 addition & 2 deletions internal/balancergroup/balancergroup_test.go
@@ -609,8 +609,7 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
return bal.UpdateClientConnState(ccs)
},
})
builder := balancer.Get(childPolicyName)
bg.UpdateBuilder(testBalancerIDs[0], builder)
bg.UpdateBuilder(testBalancerIDs[0], childPolicyName)
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
t.Fatalf("error updating ClientConn state: %v", err)
}
130 changes: 33 additions & 97 deletions xds/internal/balancer/clustermanager/balancerstateaggregator.go
@@ -43,19 +43,16 @@ func (s *subBalancerState) String() string {
}

type balancerStateAggregator struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
cc balancer.ClientConn
logger *grpclog.PrefixLogger
csEvltr *balancer.ConnectivityStateEvaluator

Member:
Optional, nit: how about csEval instead? I find it much easier to say in my head.

Contributor Author:
Done.


mu sync.Mutex
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
// that no updates will be forwarded to parent when the whole balancer group
// and states aggregator is closed.
started bool
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
// This field is used to ensure that no updates are forwarded to the parent
// CC once the aggregator is closed. A closed sub-balancer could still send
// pickers to this aggregator.
closed bool
// Map from child policy name to last reported state.
idToPickerState map[string]*subBalancerState
// Set when UpdateState call propagation is paused.
pauseUpdateState bool
@@ -68,34 +65,24 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo
return &balancerStateAggregator{
cc: cc,
logger: logger,
csEvltr: &balancer.ConnectivityStateEvaluator{},
idToPickerState: make(map[string]*subBalancerState),
}
}

// Start starts the aggregator. It can be called after Close to restart the
// aggretator.
func (bsa *balancerStateAggregator) start() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.started = true
}

// Close closes the aggregator. When the aggregator is closed, it won't call
// parent ClientConn to update balancer state.
func (bsa *balancerStateAggregator) close() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.started = false
bsa.clearStates()
bsa.closed = true
}

// add adds a sub-balancer state with weight. It adds a place holder, and waits
// for the real sub-balancer to update state.
// add adds a sub-balancer in CONNECTING state.
//
// This is called when there's a new child.
func (bsa *balancerStateAggregator) add(id string) {
bsa.mu.Lock()
defer bsa.mu.Unlock()

bsa.idToPickerState[id] = &subBalancerState{
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
@@ -106,6 +93,8 @@ func (bsa *balancerStateAggregator) add(id string) {
},
stateToAggregate: connectivity.Connecting,
}
bsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
bsa.buildAndUpdateLocked()
}

// remove removes the sub-balancer state. Future updates from this sub-balancer,
@@ -118,9 +107,15 @@ func (bsa *balancerStateAggregator) remove(id string) {
if _, ok := bsa.idToPickerState[id]; !ok {
return
}
// Setting the state of the deleted sub-balancer to Shutdown will get
// csEvltr to remove the previous state for any aggregated state
// evaluations. Transitions to and from connectivity.Shutdown are ignored
// by csEvltr.
bsa.csEvltr.RecordTransition(bsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(bsa.idToPickerState, id)
bsa.buildAndUpdateLocked()
}

// pauseStateUpdates causes UpdateState calls to not propagate to the parent
@@ -140,7 +135,7 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
defer bsa.mu.Unlock()
bsa.pauseUpdateState = false
if bsa.needUpdateStateOnResume {
bsa.cc.UpdateState(bsa.build())
bsa.cc.UpdateState(bsa.buildLocked())
}
}

@@ -149,6 +144,8 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
bsa.logger.Infof("State update from sub-balancer %q: %+v", id, state)

bsa.mu.Lock()
defer bsa.mu.Unlock()
pickerSt, ok := bsa.idToPickerState[id]
@@ -162,42 +159,17 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State)
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
bsa.csEvltr.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState)
pickerSt.stateToAggregate = state.ConnectivityState
}
pickerSt.state = state

if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}

// clearState Reset everything to init state (Connecting) but keep the entry in
// map (to keep the weight).
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) clearStates() {
for _, pState := range bsa.idToPickerState {
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
bsa.buildAndUpdateLocked()
}

// buildAndUpdate combines the sub-state from each sub-balancer into one state,
// and update it to parent ClientConn.
func (bsa *balancerStateAggregator) buildAndUpdate() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
if !bsa.started {
// buildAndUpdateLocked combines the sub-state from each sub-balancer into one
// state, and sends a picker update to the parent ClientConn.
func (bsa *balancerStateAggregator) buildAndUpdateLocked() {
if bsa.closed {
return
}
if bsa.pauseUpdateState {
@@ -206,55 +178,19 @@ func (bsa *balancerStateAggregator) buildAndUpdate() {
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
bsa.cc.UpdateState(bsa.buildLocked())
}

// build combines sub-states into one. The picker will do a child pick.
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) build() balancer.State {
// TODO: the majority of this function (and UpdateState) is exactly the same
// as weighted_target's state aggregator. Try to make a general utility
// function/struct to handle the logic.
//
// One option: make a SubBalancerState that handles Update(State), including
// handling the special connecting after ready, as in UpdateState(). Then a
// function to calculate the aggregated connectivity state as in this
// function.
//
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
for _, ps := range bsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
case idleN > 0:
aggregatedState = connectivity.Idle
default:
aggregatedState = connectivity.TransientFailure
}

// buildLocked combines sub-states into one.
func (bsa *balancerStateAggregator) buildLocked() balancer.State {
// The picker's return error might not be consistent with the
// aggregatedState. Because for this LB policy, we want to always build
// picker with all sub-pickers (not only ready sub-pickers), so even if the
// overall state is Ready, pick for certain RPCs can behave like Connecting
// or TransientFailure.
bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
return balancer.State{
ConnectivityState: aggregatedState,
ConnectivityState: bsa.csEvltr.CurrentState(),
Picker: newPickerGroup(bsa.idToPickerState),
}
}
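
The rewritten aggregator delegates the Ready > Connecting > Idle > TransientFailure precedence to balancer.ConnectivityStateEvaluator. Below is a small sketch of the two methods this diff relies on, RecordTransition and CurrentState; the sequence of transitions is illustrative only.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

func main() {
	cse := &balancer.ConnectivityStateEvaluator{}

	// A new child starts in CONNECTING; Shutdown is not counted by the
	// evaluator, so it serves as the "no previous state" side of the
	// transition, mirroring add() in this PR.
	fmt.Println(cse.RecordTransition(connectivity.Shutdown, connectivity.Connecting)) // CONNECTING

	// The child reports READY: the aggregate is READY whenever at least one
	// child is READY.
	fmt.Println(cse.RecordTransition(connectivity.Connecting, connectivity.Ready)) // READY

	// Removing the child is recorded as a transition to Shutdown, which drops
	// its contribution; with no children left the aggregate falls through to
	// TRANSIENT_FAILURE.
	fmt.Println(cse.RecordTransition(connectivity.Ready, connectivity.Shutdown)) // TRANSIENT_FAILURE

	// CurrentState reports the same aggregate without recording a transition.
	fmt.Println(cse.CurrentState()) // TRANSIENT_FAILURE
}
```
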
20 changes: 12 additions & 8 deletions xds/internal/balancer/clustermanager/clustermanager.go
@@ -46,7 +46,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
b := &bal{}
b.logger = prefixLogger(b)
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
b.stateAggregator.start()
b.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
@@ -79,15 +78,13 @@ type bal struct {
}

func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
update := false
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)

// Remove sub-pickers and sub-balancers that are not in the new cluster list.
for name := range b.children {
if _, ok := newConfig.Children[name]; !ok {
b.stateAggregator.remove(name)
b.bg.Remove(name)
update = true
}
}

@@ -101,9 +98,15 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
// Then add to the balancer group.
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
} else {
// Already present, check for type change and if so send down a new builder.
// Already present, check for type change and if so send down a new
// builder.
if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name {
b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name))
// Safe to ignore the returned error value because ParseConfig
// ensures that the child policy name is a registered LB policy.
b.bg.UpdateBuilder(name, newT.ChildPolicy.Name)
// Picker update is sent to the parent ClientConn only after the
// new child policy returns a picker. So, there is no need to
// set needUpdateStateOnResume to true here.
}
}
// TODO: handle error? How to aggregate errors and return?
@@ -118,9 +121,10 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
}

b.children = newConfig.Children
if update {
b.stateAggregator.buildAndUpdate()
}

// Adding, removing or updating a sub-balancer sets the
// needUpdateStateOnResume bit to true, which results in a picker update once
// resumeStateUpdates() is called.
}

func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
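
The comment at the end of updateChildren leans on the aggregator's pause/resume coalescing (pauseStateUpdates, resumeStateUpdates, and the needUpdateStateOnResume bit shown earlier in this PR). Here is a generic, self-contained sketch of that pattern using only the standard library; pausableNotifier and its fields are illustrative names, not types from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// pausableNotifier coalesces updates while paused into a single pending flag
// and flushes one notification on resume.
type pausableNotifier struct {
	mu      sync.Mutex
	paused  bool
	pending bool
	notify  func()
}

func (p *pausableNotifier) pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = true
}

func (p *pausableNotifier) resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = false
	if p.pending {
		p.pending = false
		p.notify()
	}
}

func (p *pausableNotifier) update() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.paused {
		// Remember that an update is owed, but do not forward it yet.
		p.pending = true
		return
	}
	p.notify()
}

func main() {
	n := &pausableNotifier{notify: func() { fmt.Println("picker update sent") }}
	n.pause()
	n.update() // coalesced
	n.update() // coalesced
	n.resume() // prints exactly once
}
```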