Skip to content

Commit

Permalink
service config: default service config
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuxuan committed Mar 13, 2019
1 parent d3f95b2 commit 24cc161
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 20 deletions.
39 changes: 29 additions & 10 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ type ClientConn struct {
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc ServiceConfig
scRaw string
scRaw *string
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
Expand Down Expand Up @@ -438,7 +438,8 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
cc.sc = sc
cc.scRaw = ""
s := ""
cc.scRaw = &s
cc.mu.Unlock()
case <-cc.ctx.Done():
return
Expand Down Expand Up @@ -762,11 +763,33 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
func (cc *ClientConn) handleServiceConfig(js string) error {
if cc.dopts.disableServiceConfig {
return nil
if cc.dopts.defaultServiceConfig == nil {
return nil
}
// apply default service config when there's resolver service config is disabled.
return cc.applyServiceConfig(cc.dopts.defaultServiceConfig, cc.dopts.defaultServiceConfigRaw)
}
if cc.scRaw == js {

return cc.applyServiceConfig(nil, js)
}

// Parse and apply the service config. If sc is passed as a non-nil pointer, which indicates we have
// a parsed service config, we will skip the parsing. It will also skip the whole processing if
// the new service config is the same as the old one.
func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig, js string) error {
if cc.scRaw != nil && *cc.scRaw == js {
return nil
}

// the config has not been parsed, so we parse it now.
if sc == nil {
scfg, err := parseServiceConfig(js)
if err != nil {
return err
}
sc = &scfg
}

if channelz.IsOn() {
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
Expand All @@ -775,10 +798,6 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
Severity: channelz.CtINFO,
})
}
sc, err := parseServiceConfig(js)
if err != nil {
return err
}
cc.mu.Lock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
Expand All @@ -787,8 +806,8 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
cc.mu.Unlock()
return nil
}
cc.scRaw = js
cc.sc = sc
cc.scRaw = &js
cc.sc = *sc

if sc.retryThrottling != nil {
newThrottler := &retryThrottler{
Expand Down
37 changes: 37 additions & 0 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,3 +1244,40 @@ func setMinConnectTimeout(newMin time.Duration) (cleanup func()) {
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
}
}

func (s) TestDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
addr := r.Scheme() + ":///non.existent"
r.InitialAddrs([]resolver.Address{{Addr: addr}})
js := `{
"methodConfig": [
{
"name": [
{
"service": "foo",
"method": "bar"
}
],
"waitForReady": true
}
]
}`
cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithDefaultServiceConfig(js))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
r.NewServiceConfig("")
var i int
for i = 0; i < 10; i++ {
mc := cc.GetMethodConfig("/foo/bar")
if mc.WaitForReady != nil && *mc.WaitForReady == true {
break
}
time.Sleep(100 * time.Millisecond)
}
if i == 10 {
t.Fatal("default service config failed to be applied after 1s")
}
}
32 changes: 25 additions & 7 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ type dialOptions struct {
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
reqHandshake envconfig.RequireHandshakeSetting
channelzParentID int64
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
healthCheckFunc internal.HealthChecker
resolverBuilder resolver.Builder
reqHandshake envconfig.RequireHandshakeSetting
channelzParentID int64
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
healthCheckFunc internal.HealthChecker
defaultServiceConfig *ServiceConfig
defaultServiceConfigRaw string
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -446,6 +448,22 @@ func WithDisableServiceConfig() DialOption {
})
}

// WithDefaultServiceConfig returns a DialOption that configures the default service config, which
// will be used in cases where:
// 1. WithDisableServiceConfig is called.
// 2. Resolver does not return service config or if the resolver gets and invalid config.
func WithDefaultServiceConfig(s string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
sc, err := parseServiceConfig(s)
if err != nil {
grpclog.Warningf("the provided service config is invalid, err: %v", err)
return
}
o.defaultServiceConfig = &sc
o.defaultServiceConfigRaw = s
})
}

// WithDisableRetry returns a DialOption that disables retries, even if the
// service config enables them. This does not impact transparent retries, which
// will happen automatically if no data is written to the wire or if the RPC is
Expand Down
4 changes: 2 additions & 2 deletions resolver_conn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (ccr *ccResolverWrapper) close() {
close(ccr.done)
}

// NewAddress is called by the resolver implemenetion to send addresses to gRPC.
// NewAddress is called by the resolver implementation to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
select {
case <-ccr.done:
Expand All @@ -116,7 +116,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.cc.handleResolvedAddrs(addrs, nil)
}

// NewServiceConfig is called by the resolver implemenetion to send service
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
select {
Expand Down
2 changes: 1 addition & 1 deletion test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
{
"service": "grpc.testing.TestService",
"method": "EmptyCall"
},
}
],
"waitForReady": false,
"timeout": ".001s"
Expand Down

0 comments on commit 24cc161

Please sign in to comment.