Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Mar 11, 2024
1 parent 7c37770 commit 4b1fd3f
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 197 deletions.
54 changes: 2 additions & 52 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package grpc
import (
"context"
"fmt"
"strings"
"sync"

"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -54,10 +53,9 @@ type ccBalancerWrapper struct {
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc

// The following fields are only accessed within the serializer or during
// The following field is only accessed within the serializer or during
// initialization.
curBalancerName string
balancer *gracefulswitch.Balancer
balancer *gracefulswitch.Balancer

// The following field is protected by mu. Caller must take cc.mu before
// taking mu.
Expand Down Expand Up @@ -120,54 +118,6 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
})
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
// LB policy identified by name.
//
// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
// first good update from the name resolver, it determines the LB policy to use
// and invokes the switchTo() method. Upon receipt of every subsequent update
// from the name resolver, it invokes this method.
//
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.buildLoadBalancingPolicy(name)
})
}

// buildLoadBalancingPolicy performs the following:
// - retrieve a balancer builder for the given name. Use the default LB
// policy, pick_first, if no LB policy with name is found in the registry.
// - instruct the gracefulswitch balancer to switch to the above builder. This
// will actually build the new balancer.
// - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name)
}

if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
}

// close initiates async shutdown of the wrapper. cc.mu must be held when
// calling this function. To determine the wrapper has finished shutting down,
// the channel should block on ccb.serializer.Done() without cc.mu held.
Expand Down
12 changes: 1 addition & 11 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
var emptyServiceConfig *ServiceConfig

func init() {
balancer.Register(pickfirstBuilder{})
cfg := parseServiceConfig("{}")
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
Expand Down Expand Up @@ -1090,17 +1091,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}

var newBalancerName string
if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
// No service config or no LB policy specified in config.
newBalancerName = PickFirstBalancerName
} else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else { // cc.sc.LB != nil
newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down
94 changes: 88 additions & 6 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package gracefulswitch

import (
"encoding/json"
"errors"
"fmt"
"sync"
Expand All @@ -28,6 +29,7 @@ import (
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
Expand Down Expand Up @@ -69,6 +71,58 @@ type Balancer struct {
currentMu sync.Mutex
}

type lbConfig struct {
serviceconfig.LoadBalancingConfig

childBuilder balancer.Builder
childConfig serviceconfig.LoadBalancingConfig
}

// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
// names + configs as the format of the "loadBalancingConfig" field in
// ServiceConfig. It returns a type that should be passed to
// UpdateClientConnState in the BalancerConfig field.
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var lbCfg []map[string]json.RawMessage
if err := json.Unmarshal(cfg, &lbCfg); err != nil {
return nil, err
}
for _, e := range lbCfg {
if len(e) != 1 {
return nil, fmt.Errorf("expected a JSON struct with one entry; received: %v", e)
}

var name string
var jsonCfg json.RawMessage
for name, jsonCfg = range e {
}

builder := balancer.Get(name)
if builder == nil {
// Skip unregistered balancer names.
continue
}

parser, ok := builder.(balancer.ConfigParser)
if !ok {
if string(jsonCfg) != "{}" {
return nil, fmt.Errorf("non-empty balancer configuration %q, but balancer %q does not implement ParseConfig", string(jsonCfg), name)
}
// This is a valid child with no config.
return &lbConfig{childBuilder: builder}, nil
}

cfg, err := parser.ParseConfig(jsonCfg)
if err != nil {
return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err)
}

return &lbConfig{childBuilder: builder, childConfig: cfg}, nil
}

return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg))
}

// swap swaps out the current lb with the pending lb and updates the ClientConn.
// The caller must hold gsb.mu.
func (gsb *Balancer) swap() {
Expand All @@ -89,19 +143,34 @@ func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
return bw == gsb.balancerCurrent || bw == gsb.balancerPending
}

// SwitchTo initializes the graceful switch process, which completes based on
// Returns true if the pending balancer was built with builder or if there is no
// pending balancer and the current balancer was built with it.
// Caller must hold gsb.mu.
func (gsb *Balancer) isNewest(builder balancer.Builder) bool {
if gsb.balancerPending != nil {
return gsb.balancerPending.builder == builder
}
return gsb.balancerCurrent != nil && gsb.balancerCurrent.builder == builder
}

// switchTo initializes the graceful switch process, which completes based on
// connectivity state changes on the current/pending balancer. Thus, the switch
// process is not complete when this method returns. This method must be called
// synchronously alongside the rest of the balancer.Balancer methods this
// Graceful Switch Balancer implements.
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
func (gsb *Balancer) switchTo(builder balancer.Builder) error {
gsb.mu.Lock()
if gsb.closed {
gsb.mu.Unlock()
return errBalancerClosed
}
if gsb.isNewest(builder) {
// Do nothing; we are already using the balancer we are switching to.
return nil
}
bw := &balancerWrapper{
gsb: gsb,
gsb: gsb,
builder: builder,
lastState: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
Expand Down Expand Up @@ -151,16 +220,28 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper {
return gsb.balancerCurrent
}

func (gsb *Balancer) isClosed() bool {
gsb.mu.Lock()
defer gsb.mu.Unlock()
return gsb.closed
}

// UpdateClientConnState forwards the update to the latest balancer created.
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
if gsb.isClosed() {
return errBalancerClosed
}
// The resolver data is only relevant to the most recent LB Policy.
bc, ok := state.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("received unexpected config type: %T", state.BalancerConfig)
}
gsb.switchTo(bc.childBuilder)
state.BalancerConfig = bc.childConfig
// Perform this call without gsb.mu to prevent deadlocks if the child calls
// back into the channel. The latest balancer can never be closed during a
// call from the channel, even without gsb.mu held.
balToUpdate := gsb.latestBalancer()
return balToUpdate.UpdateClientConnState(state)
}

Expand Down Expand Up @@ -263,6 +344,7 @@ type balancerWrapper struct {
balancer.Balancer
gsb *Balancer

builder balancer.Builder
lastState balancer.State
subconns map[balancer.SubConn]bool // subconns created by this balancer
}
Expand Down
97 changes: 0 additions & 97 deletions internal/serviceconfig/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,111 +20,14 @@
package serviceconfig

import (
"encoding/json"
"fmt"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
externalserviceconfig "google.golang.org/grpc/serviceconfig"
)

var logger = grpclog.Component("core")

// BalancerConfig wraps the name and config associated with one load balancing
// policy. It corresponds to a single entry of the loadBalancingConfig field
// from ServiceConfig.
//
// It implements the json.Unmarshaler interface.
//
// https://github.com/grpc/grpc-proto/blob/54713b1e8bc6ed2d4f25fb4dff527842150b91b2/grpc/service_config/service_config.proto#L247
type BalancerConfig struct {
Name string
Config externalserviceconfig.LoadBalancingConfig
}

type intermediateBalancerConfig []map[string]json.RawMessage

// MarshalJSON implements the json.Marshaler interface.
//
// It marshals the balancer and config into a length-1 slice
// ([]map[string]config).
func (bc *BalancerConfig) MarshalJSON() ([]byte, error) {
if bc.Config == nil {
// If config is nil, return empty config `{}`.
return []byte(fmt.Sprintf(`[{%q: %v}]`, bc.Name, "{}")), nil
}
c, err := json.Marshal(bc.Config)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf(`[{%q: %s}]`, bc.Name, c)), nil
}

// UnmarshalJSON implements the json.Unmarshaler interface.
//
// ServiceConfig contains a list of loadBalancingConfigs, each with a name and
// config. This method iterates through that list in order, and stops at the
// first policy that is supported.
// - If the config for the first supported policy is invalid, the whole service
// config is invalid.
// - If the list doesn't contain any supported policy, the whole service config
// is invalid.
func (bc *BalancerConfig) UnmarshalJSON(b []byte) error {
var ir intermediateBalancerConfig
err := json.Unmarshal(b, &ir)
if err != nil {
return err
}

var names []string
for i, lbcfg := range ir {
if len(lbcfg) != 1 {
return fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
}

var (
name string
jsonCfg json.RawMessage
)
// Get the key:value pair from the map. We have already made sure that
// the map contains a single entry.
for name, jsonCfg = range lbcfg {
}

names = append(names, name)
builder := balancer.Get(name)
if builder == nil {
// If the balancer is not registered, move on to the next config.
// This is not an error.
continue
}
bc.Name = name

parser, ok := builder.(balancer.ConfigParser)
if !ok {
if string(jsonCfg) != "{}" {
logger.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg))
}
// Stop at this, though the builder doesn't support parsing config.
return nil
}

cfg, err := parser.ParseConfig(jsonCfg)
if err != nil {
return fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)
}
bc.Config = cfg
return nil
}
// This is reached when the for loop iterates over all entries, but didn't
// return. This means we had a loadBalancingConfig slice but did not
// encounter a registered policy. The config is considered invalid in this
// case.
return fmt.Errorf("invalid loadBalancingConfig: no supported policies found in %v", names)
}

// MethodConfig defines the configuration recommended by the service providers for a
// particular method.
type MethodConfig struct {
Expand Down
12 changes: 2 additions & 10 deletions pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@ const (
logPrefix = "[pick-first-lb %p] "
)

func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
}

type pickfirstBuilder struct{}

func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{cc: cc}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
}

func (*pickfirstBuilder) Name() string {
func (pickfirstBuilder) Name() string {
return PickFirstBalancerName
}

Expand Down Expand Up @@ -243,7 +239,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

func init() {
balancer.Register(newPickfirstBuilder())
}
Loading

0 comments on commit 4b1fd3f

Please sign in to comment.