Skip to content

Commit

Permalink
client: simplify initialization and cleanup a bit (grpc#6798)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Dec 7, 2023
1 parent 2afc15a commit 4e8bdb7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 70 deletions.
23 changes: 12 additions & 11 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,14 @@ type ccBalancerWrapper struct {
mode ccbMode // Tracks the current mode of the wrapper.
}

// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
// is not created until the switchTo() method is invoked.
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(context.Background())
ccb := &ccBalancerWrapper{
cc: cc,
opts: bopts,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
cc: cc,
opts: bopts,
mode: ccbModeIdle,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
return ccb
}

Expand Down Expand Up @@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {
// exitIdleMode(), and since we just created a new serializer, we can be
// sure that the below function will be scheduled.
done := make(chan struct{})
ccb.serializer.Schedule(func(_ context.Context) {
ccb.serializer.Schedule(func(context.Context) {
defer close(done)

ccb.mu.Lock()
Expand All @@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() {

// Gracefulswitch balancer does not support a switchTo operation after
// being closed. Hence we need to create a new one here.
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
opts := ccb.opts
if c := opts.DialCreds; c != nil {
opts.DialCreds = c.Clone()
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, opts)
ccb.mode = ccbModeActive
channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")

Expand Down Expand Up @@ -337,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}

Expand Down
104 changes: 45 additions & 59 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.exitIdleCond = sync.NewCond(&cc.mu)

// Apply dial options.
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
Expand All @@ -177,21 +178,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
for _, opt := range opts {
opt.apply(&cc.dopts)
}

chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)

defer func() {
if err != nil {
cc.Close()
}
}()

// Register ClientConn with channelz.
cc.channelzRegistration(target)

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)

if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
Expand All @@ -211,6 +200,37 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.dopts.copts.UserAgent = grpcUA
}

// Register ClientConn with channelz.
cc.channelzRegistration(target)

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}
if err = cc.determineAuthority(); err != nil {
channelz.RemoveEntry(cc.channelzID)
return nil, err
}

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})

defer func() {
if err != nil {
cc.Close()
}
}()

if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
Expand All @@ -235,14 +255,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.dopts.bs = backoff.DefaultExponential
}

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
return nil, err
}
if err = cc.determineAuthority(); err != nil {
return nil, err
}

if cc.dopts.scChan != nil {
// Blocking wait for the initial service config.
select {
Expand Down Expand Up @@ -359,31 +371,13 @@ func (cc *ClientConn) exitIdleMode() error {
}()

cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
}
cc.pickerWrapper.exitIdleMode()

var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
if cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
} else {
cc.balancerWrapper.exitIdleMode()
}
cc.balancerWrapper.exitIdleMode()
cc.firstResolveEvent = grpcsync.NewEvent()
cc.mu.Unlock()

Expand All @@ -394,9 +388,7 @@ func (cc *ClientConn) exitIdleMode() error {
return err
}

if exitedIdle {
cc.addTraceEvent("exiting idle mode")
}
cc.addTraceEvent("exiting idle mode")
return nil
}

Expand Down Expand Up @@ -427,7 +419,7 @@ func (cc *ClientConn) enterIdleMode() error {
// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
// try to do the same for the balancer and picker wrappers too.
cc.resolverWrapper.close()
cc.blockingpicker.enterIdleMode()
cc.pickerWrapper.enterIdleMode()
cc.balancerWrapper.enterIdleMode()
cc.csMgr.updateState(connectivity.Idle)
cc.idlenessState = ccIdlenessStateIdle
Expand Down Expand Up @@ -655,7 +647,7 @@ type ClientConn struct {
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
blockingpicker *pickerWrapper
pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
czData *channelzData
retryThrottler atomic.Value // Updated from service config.
Expand Down Expand Up @@ -910,7 +902,7 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}

Expand Down Expand Up @@ -1174,7 +1166,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
}

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
Expand Down Expand Up @@ -1267,24 +1259,18 @@ func (cc *ClientConn) Close() error {
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)

pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
bWrapper := cc.balancerWrapper
idlenessMgr := cc.idlenessMgr
// We can safely unlock and continue to access all fields now as
// cc.conns==nil, preventing any further operations on cc.
cc.mu.Unlock()

// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
if pWrapper != nil {
pWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
cc.pickerWrapper.close()
cc.balancerWrapper.close()
if rWrapper := cc.resolverWrapper; rWrapper != nil {
rWrapper.close()
}
if idlenessMgr != nil {
if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil {
idlenessMgr.Close()
}

Expand Down

0 comments on commit 4e8bdb7

Please sign in to comment.