From 4b1fd3f6e4e06147419a9a1aad6c92b69b2215f7 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 11 Mar 2024 08:53:10 -0700 Subject: [PATCH] . --- balancer_wrapper.go | 54 +---------- clientconn.go | 12 +-- .../balancer/gracefulswitch/gracefulswitch.go | 94 ++++++++++++++++-- internal/serviceconfig/serviceconfig.go | 97 ------------------- pickfirst.go | 12 +-- service_config.go | 27 ++++-- service_config_test.go | 33 ++++--- 7 files changed, 132 insertions(+), 197 deletions(-) diff --git a/balancer_wrapper.go b/balancer_wrapper.go index b5e30cff0215..79318d08dfc6 100644 --- a/balancer_wrapper.go +++ b/balancer_wrapper.go @@ -21,7 +21,6 @@ package grpc import ( "context" "fmt" - "strings" "sync" "google.golang.org/grpc/balancer" @@ -54,10 +53,9 @@ type ccBalancerWrapper struct { serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - // The following fields are only accessed within the serializer or during + // The following field is only accessed within the serializer or during // initialization. - curBalancerName string - balancer *gracefulswitch.Balancer + balancer *gracefulswitch.Balancer // The following field is protected by mu. Caller must take cc.mu before // taking mu. @@ -120,54 +118,6 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { }) } -// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the -// LB policy identified by name. -// -// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the -// first good update from the name resolver, it determines the LB policy to use -// and invokes the switchTo() method. Upon receipt of every subsequent update -// from the name resolver, it invokes this method. -// -// the ccBalancerWrapper keeps track of the current LB policy name, and skips -// the graceful balancer switching process if the name does not change. 
-func (ccb *ccBalancerWrapper) switchTo(name string) { - ccb.serializer.Schedule(func(ctx context.Context) { - if ctx.Err() != nil || ccb.balancer == nil { - return - } - // TODO: Other languages use case-sensitive balancer registries. We should - // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. - if strings.EqualFold(ccb.curBalancerName, name) { - return - } - ccb.buildLoadBalancingPolicy(name) - }) -} - -// buildLoadBalancingPolicy performs the following: -// - retrieve a balancer builder for the given name. Use the default LB -// policy, pick_first, if no LB policy with name is found in the registry. -// - instruct the gracefulswitch balancer to switch to the above builder. This -// will actually build the new balancer. -// - update the `curBalancerName` field -// -// Must be called from a serializer callback. -func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { - builder := balancer.Get(name) - if builder == nil { - channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) - builder = newPickfirstBuilder() - } else { - channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) - } - - if err := ccb.balancer.SwitchTo(builder); err != nil { - channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) - return - } - ccb.curBalancerName = builder.Name() -} - // close initiates async shutdown of the wrapper. cc.mu must be held when // calling this function. To determine the wrapper has finished shutting down, // the channel should block on ccb.serializer.Done() without cc.mu held. 
diff --git a/clientconn.go b/clientconn.go index 85ec9c119209..5963be3b1189 100644 --- a/clientconn.go +++ b/clientconn.go @@ -692,6 +692,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { var emptyServiceConfig *ServiceConfig func init() { + balancer.Register(pickfirstBuilder{}) cfg := parseServiceConfig("{}") if cfg.Err != nil { panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) @@ -1090,17 +1091,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel } else { cc.retryThrottler.Store((*retryThrottler)(nil)) } - - var newBalancerName string - if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) { - // No service config or no LB policy specified in config. - newBalancerName = PickFirstBalancerName - } else if cc.sc.lbConfig != nil { - newBalancerName = cc.sc.lbConfig.name - } else { // cc.sc.LB != nil - newBalancerName = *cc.sc.LB - } - cc.balancerWrapper.switchTo(newBalancerName) } func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index 3c594e6e4e55..fe28c7426f05 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -20,6 +20,7 @@ package gracefulswitch import ( + "encoding/json" "errors" "fmt" "sync" @@ -28,6 +29,7 @@ import ( "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" ) var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed") @@ -69,6 +71,58 @@ type Balancer struct { currentMu sync.Mutex } +type lbConfig struct { + serviceconfig.LoadBalancingConfig + + childBuilder balancer.Builder + childConfig serviceconfig.LoadBalancingConfig +} + +// cfg is expected to be a json.RawMessage containing a JSON array of LB policy +// names + configs as the format of 
the "loadBalancingConfig" field in +// ServiceConfig. It returns a type that should be passed to +// UpdateClientConnState in the BalancerConfig field. +func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + var lbCfg []map[string]json.RawMessage + if err := json.Unmarshal(cfg, &lbCfg); err != nil { + return nil, err + } + for _, e := range lbCfg { + if len(e) != 1 { + return nil, fmt.Errorf("expected a JSON struct with one entry; received: %v", e) + } + + var name string + var jsonCfg json.RawMessage + for name, jsonCfg = range e { + } + + builder := balancer.Get(name) + if builder == nil { + // Skip unregistered balancer names. + continue + } + + parser, ok := builder.(balancer.ConfigParser) + if !ok { + if string(jsonCfg) != "{}" { + return nil, fmt.Errorf("non-empty balancer configuration %q, but balancer %q does not implement ParseConfig", string(jsonCfg), name) + } + // This is a valid child with no config. + return &lbConfig{childBuilder: builder}, nil + } + + cfg, err := parser.ParseConfig(jsonCfg) + if err != nil { + return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err) + } + + return &lbConfig{childBuilder: builder, childConfig: cfg}, nil + } + + return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg)) +} + // swap swaps out the current lb with the pending lb and updates the ClientConn. // The caller must hold gsb.mu. func (gsb *Balancer) swap() { @@ -89,19 +143,34 @@ func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool { return bw == gsb.balancerCurrent || bw == gsb.balancerPending } -// SwitchTo initializes the graceful switch process, which completes based on +// Returns true if the pending balancer was built with builder or if there is no +// pending balancer and the current balancer was built with it. +// Caller must hold gsb.mu. 
+func (gsb *Balancer) isNewest(builder balancer.Builder) bool { + if gsb.balancerPending != nil { + return gsb.balancerPending.builder == builder + } + return gsb.balancerCurrent != nil && gsb.balancerCurrent.builder == builder +} + +// switchTo initializes the graceful switch process, which completes based on // connectivity state changes on the current/pending balancer. Thus, the switch // process is not complete when this method returns. This method must be called // synchronously alongside the rest of the balancer.Balancer methods this // Graceful Switch Balancer implements. -func (gsb *Balancer) SwitchTo(builder balancer.Builder) error { +func (gsb *Balancer) switchTo(builder balancer.Builder) error { gsb.mu.Lock() if gsb.closed { gsb.mu.Unlock() return errBalancerClosed } + if gsb.isNewest(builder) { + gsb.mu.Unlock() // Already using this builder: nothing to switch, but gsb.mu taken above must be released. + return nil + } bw := &balancerWrapper{ - gsb: gsb, + gsb: gsb, + builder: builder, lastState: balancer.State{ ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), @@ -151,16 +220,28 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper { return gsb.balancerCurrent } +func (gsb *Balancer) isClosed() bool { + gsb.mu.Lock() + defer gsb.mu.Unlock() + return gsb.closed +} + // UpdateClientConnState forwards the update to the latest balancer created. func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error { - // The resolver data is only relevant to the most recent LB Policy. - balToUpdate := gsb.latestBalancer() - if balToUpdate == nil { + if gsb.isClosed() { return errBalancerClosed } + // The resolver data is only relevant to the most recent LB Policy.
+ bc, ok := state.BalancerConfig.(*lbConfig) + if !ok { + return fmt.Errorf("received unexpected config type: %T", state.BalancerConfig) + } + gsb.switchTo(bc.childBuilder) + state.BalancerConfig = bc.childConfig // Perform this call without gsb.mu to prevent deadlocks if the child calls // back into the channel. The latest balancer can never be closed during a // call from the channel, even without gsb.mu held. + balToUpdate := gsb.latestBalancer() return balToUpdate.UpdateClientConnState(state) } @@ -263,6 +344,7 @@ type balancerWrapper struct { balancer.Balancer gsb *Balancer + builder balancer.Builder lastState balancer.State subconns map[balancer.SubConn]bool // subconns created by this balancer } diff --git a/internal/serviceconfig/serviceconfig.go b/internal/serviceconfig/serviceconfig.go index 51e733e495a3..4c3e1bb72005 100644 --- a/internal/serviceconfig/serviceconfig.go +++ b/internal/serviceconfig/serviceconfig.go @@ -20,111 +20,14 @@ package serviceconfig import ( - "encoding/json" - "fmt" "time" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" - externalserviceconfig "google.golang.org/grpc/serviceconfig" ) var logger = grpclog.Component("core") -// BalancerConfig wraps the name and config associated with one load balancing -// policy. It corresponds to a single entry of the loadBalancingConfig field -// from ServiceConfig. -// -// It implements the json.Unmarshaler interface. -// -// https://github.com/grpc/grpc-proto/blob/54713b1e8bc6ed2d4f25fb4dff527842150b91b2/grpc/service_config/service_config.proto#L247 -type BalancerConfig struct { - Name string - Config externalserviceconfig.LoadBalancingConfig -} - -type intermediateBalancerConfig []map[string]json.RawMessage - -// MarshalJSON implements the json.Marshaler interface. -// -// It marshals the balancer and config into a length-1 slice -// ([]map[string]config). 
-func (bc *BalancerConfig) MarshalJSON() ([]byte, error) { - if bc.Config == nil { - // If config is nil, return empty config `{}`. - return []byte(fmt.Sprintf(`[{%q: %v}]`, bc.Name, "{}")), nil - } - c, err := json.Marshal(bc.Config) - if err != nil { - return nil, err - } - return []byte(fmt.Sprintf(`[{%q: %s}]`, bc.Name, c)), nil -} - -// UnmarshalJSON implements the json.Unmarshaler interface. -// -// ServiceConfig contains a list of loadBalancingConfigs, each with a name and -// config. This method iterates through that list in order, and stops at the -// first policy that is supported. -// - If the config for the first supported policy is invalid, the whole service -// config is invalid. -// - If the list doesn't contain any supported policy, the whole service config -// is invalid. -func (bc *BalancerConfig) UnmarshalJSON(b []byte) error { - var ir intermediateBalancerConfig - err := json.Unmarshal(b, &ir) - if err != nil { - return err - } - - var names []string - for i, lbcfg := range ir { - if len(lbcfg) != 1 { - return fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg) - } - - var ( - name string - jsonCfg json.RawMessage - ) - // Get the key:value pair from the map. We have already made sure that - // the map contains a single entry. - for name, jsonCfg = range lbcfg { - } - - names = append(names, name) - builder := balancer.Get(name) - if builder == nil { - // If the balancer is not registered, move on to the next config. - // This is not an error. - continue - } - bc.Name = name - - parser, ok := builder.(balancer.ConfigParser) - if !ok { - if string(jsonCfg) != "{}" { - logger.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg)) - } - // Stop at this, though the builder doesn't support parsing config. 
- return nil - } - - cfg, err := parser.ParseConfig(jsonCfg) - if err != nil { - return fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err) - } - bc.Config = cfg - return nil - } - // This is reached when the for loop iterates over all entries, but didn't - // return. This means we had a loadBalancingConfig slice but did not - // encounter a registered policy. The config is considered invalid in this - // case. - return fmt.Errorf("invalid loadBalancingConfig: no supported policies found in %v", names) -} - // MethodConfig defines the configuration recommended by the service providers for a // particular method. type MethodConfig struct { diff --git a/pickfirst.go b/pickfirst.go index 5128f9364dd1..ac1ff92e7618 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -38,19 +38,15 @@ const ( logPrefix = "[pick-first-lb %p] " ) -func newPickfirstBuilder() balancer.Builder { - return &pickfirstBuilder{} -} - type pickfirstBuilder struct{} -func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { +func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { b := &pickfirstBalancer{cc: cc} b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) return b } -func (*pickfirstBuilder) Name() string { +func (pickfirstBuilder) Name() string { return PickFirstBalancerName } @@ -243,7 +239,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { i.subConn.Connect() return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } - -func init() { - balancer.Register(newPickfirstBuilder()) -} diff --git a/service_config.go b/service_config.go index 0df11fc09882..1aa9a99e21ee 100644 --- a/service_config.go +++ b/service_config.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/gracefulswitch" internalserviceconfig 
"google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" ) @@ -42,8 +43,7 @@ const maxInt = int(^uint(0) >> 1) type MethodConfig = internalserviceconfig.MethodConfig type lbConfig struct { - name string - cfg serviceconfig.LoadBalancingConfig + cfg serviceconfig.LoadBalancingConfig // always a gracefulswitch lbConfig } // ServiceConfig is provided by the service provider and contains parameters for how @@ -164,7 +164,7 @@ type jsonMC struct { // TODO(lyuxuan): delete this struct after cleaning up old service config implementation. type jsonSC struct { LoadBalancingPolicy *string - LoadBalancingConfig *internalserviceconfig.BalancerConfig + LoadBalancingConfig *json.RawMessage MethodConfig *[]jsonMC RetryThrottling *retryThrottlingPolicy HealthCheckConfig *healthCheckConfig @@ -190,12 +190,25 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult { healthCheckConfig: rsc.HealthCheckConfig, rawJSONString: js, } - if c := rsc.LoadBalancingConfig; c != nil { - sc.lbConfig = &lbConfig{ - name: c.Name, - cfg: c.Config, + c := rsc.LoadBalancingConfig + if c == nil { + name := PickFirstBalancerName + if sc.LB != nil { + name = *sc.LB } + cfg := []map[string]any{{name: struct{}{}}} + strCfg, err := json.Marshal(cfg) + if err != nil { + return &serviceconfig.ParseResult{Err: fmt.Errorf("unexpected error marshaling simple LB config: %w", err)} + } + r := json.RawMessage(strCfg) + c = &r + } + cfg, err := gracefulswitch.ParseConfig(*c) + if err != nil { + return &serviceconfig.ParseResult{Err: err} } + sc.lbConfig = &lbConfig{cfg: cfg} if rsc.MethodConfig == nil { return &serviceconfig.ParseResult{Config: &sc} diff --git a/service_config_test.go b/service_config_test.go index 90ed40a68021..8dcac366db6a 100644 --- a/service_config_test.go +++ b/service_config_test.go @@ -43,6 +43,9 @@ func runParseTests(t *testing.T, testCases []parseTestCase) { if !c.wantErr { c.wantSC.rawJSONString = c.scjs } + if sc != nil { + sc.lbConfig = nil + } if 
c.wantErr != (scpr.Err != nil) || !reflect.DeepEqual(sc, c.wantSC) { t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, scpr.Err, c.wantSC, c.wantErr) } @@ -77,22 +80,24 @@ func init() { balancer.Register(parseBalancerBuilder{}) } -func (s) TestParseLBConfig(t *testing.T) { - testcases := []parseTestCase{ - { - `{ - "loadBalancingConfig": [{"pbb": { "foo": "hi" } }] -}`, - &ServiceConfig{ - Methods: make(map[string]MethodConfig), - lbConfig: &lbConfig{name: "pbb", cfg: pbbData{Foo: "hi"}}, +/* + func (s) TestParseLBConfig(t *testing.T) { + testcases := []parseTestCase{ + { + `{ + "loadBalancingConfig": [{"pbb": { "foo": "hi" } }] + }`, + + &ServiceConfig{ + Methods: make(map[string]MethodConfig), + lbConfig: &lbConfig{cfg: pbbData{Foo: "hi"}}, + }, + false, }, - false, - }, + } + runParseTests(t, testcases) } - runParseTests(t, testcases) -} - +*/ func (s) TestParseNoLBConfigSupported(t *testing.T) { // We have a loadBalancingConfig field but will not encounter a supported // policy. The config will be considered invalid in this case.