From 4e8bdb7af888f2ab7082e071e82f0964e0d74f19 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 10:47:19 -0800 Subject: [PATCH] client: simplify initialization and cleanup a bit (#6798) --- balancer_conn_wrappers.go | 23 +++++---- clientconn.go | 104 +++++++++++++++++--------------------- 2 files changed, 57 insertions(+), 70 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index a4411c22bfc8..4b82ff8168e1 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -76,17 +76,14 @@ type ccBalancerWrapper struct { mode ccbMode // Tracks the current mode of the wrapper. } -// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer -// is not created until the switchTo() method is invoked. +// newCCBalancerWrapper creates a new balancer wrapper in idle state. The +// underlying balancer is not created until the switchTo() method is invoked. func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { - ctx, cancel := context.WithCancel(context.Background()) ccb := &ccBalancerWrapper{ - cc: cc, - opts: bopts, - serializer: grpcsync.NewCallbackSerializer(ctx), - serializerCancel: cancel, + cc: cc, + opts: bopts, + mode: ccbModeIdle, } - ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) return ccb } @@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() { // exitIdleMode(), and since we just created a new serializer, we can be // sure that the below function will be scheduled. done := make(chan struct{}) - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(context.Context) { defer close(done) ccb.mu.Lock() @@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() { // Gracefulswitch balancer does not support a switchTo operation after // being closed. Hence we need to create a new one here. - ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) + opts := ccb.opts + if c := opts.DialCreds; c != nil { + opts.DialCreds = c.Clone() + } + ccb.balancer = gracefulswitch.NewBalancer(ccb, opts) ccb.mode = ccbModeActive channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") @@ -337,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { // case where we wait for ready and then perform an RPC. If the picker is // updated later, we could call the "connecting" picker when the state is // updated, and then call the "ready" picker after the picker gets updated. - ccb.cc.blockingpicker.updatePicker(s.Picker) + ccb.cc.pickerWrapper.updatePicker(s.Picker) ccb.cc.csMgr.updateState(s.ConnectivityState) } diff --git a/clientconn.go b/clientconn.go index c7bf6849f07e..1e4e74d6cfec 100644 --- a/clientconn.go +++ b/clientconn.go @@ -160,6 +160,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.exitIdleCond = sync.NewCond(&cc.mu) + // Apply dial options. disableGlobalOpts := false for _, opt := range opts { if _, ok := opt.(*disableGlobalDialOptions); ok { @@ -177,21 +178,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * for _, opt := range opts { opt.apply(&cc.dopts) } - chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) - defer func() { - if err != nil { - cc.Close() - } - }() - - // Register ClientConn with channelz. - cc.channelzRegistration(target) - - cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) - if err := cc.validateTransportCredentials(); err != nil { return nil, err } @@ -211,6 +200,37 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.dopts.copts.UserAgent = grpcUA } + // Register ClientConn with channelz. + cc.channelzRegistration(target) + + // Determine the resolver to use. + if err := cc.parseTargetAndFindResolver(); err != nil { + channelz.RemoveEntry(cc.channelzID) + return nil, err + } + if err = cc.determineAuthority(); err != nil { + channelz.RemoveEntry(cc.channelzID) + return nil, err + } + + cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) + cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) + cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ + DialCreds: cc.dopts.copts.TransportCredentials, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + Authority: cc.authority, + CustomUserAgent: cc.dopts.copts.UserAgent, + ChannelzParentID: cc.channelzID, + Target: cc.parsedTarget, + }) + + defer func() { + if err != nil { + cc.Close() + } + }() + if cc.dopts.timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) @@ -235,14 +255,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.dopts.bs = backoff.DefaultExponential } - // Determine the resolver to use. - if err := cc.parseTargetAndFindResolver(); err != nil { - return nil, err - } - if err = cc.determineAuthority(); err != nil { - return nil, err - } - if cc.dopts.scChan != nil { // Blocking wait for the initial service config. select { @@ -359,31 +371,13 @@ func (cc *ClientConn) exitIdleMode() error { }() cc.idlenessState = ccIdlenessStateExitingIdle - exitedIdle := false - if cc.blockingpicker == nil { - cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) - } else { - cc.blockingpicker.exitIdleMode() - exitedIdle = true - } + cc.pickerWrapper.exitIdleMode() var credsClone credentials.TransportCredentials if creds := cc.dopts.copts.TransportCredentials; creds != nil { credsClone = creds.Clone() } - if cc.balancerWrapper == nil { - cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ - DialCreds: credsClone, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - Authority: cc.authority, - CustomUserAgent: cc.dopts.copts.UserAgent, - ChannelzParentID: cc.channelzID, - Target: cc.parsedTarget, - }) - } else { - cc.balancerWrapper.exitIdleMode() - } + cc.balancerWrapper.exitIdleMode() cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() @@ -394,9 +388,7 @@ func (cc *ClientConn) exitIdleMode() error { return err } - if exitedIdle { - cc.addTraceEvent("exiting idle mode") - } + cc.addTraceEvent("exiting idle mode") return nil } @@ -427,7 +419,7 @@ func (cc *ClientConn) enterIdleMode() error { // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should // try to do the same for the balancer and picker wrappers too. cc.resolverWrapper.close() - cc.blockingpicker.enterIdleMode() + cc.pickerWrapper.enterIdleMode() cc.balancerWrapper.enterIdleMode() cc.csMgr.updateState(connectivity.Idle) cc.idlenessState = ccIdlenessStateIdle @@ -655,7 +647,7 @@ type ClientConn struct { // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. csMgr *connectivityStateManager - blockingpicker *pickerWrapper + pickerWrapper *pickerWrapper safeConfigSelector iresolver.SafeConfigSelector czData *channelzData retryThrottler atomic.Value // Updated from service config. @@ -910,7 +902,7 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config) } cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) - cc.blockingpicker.updatePicker(base.NewErrPicker(err)) + cc.pickerWrapper.updatePicker(base.NewErrPicker(err)) cc.csMgr.updateState(connectivity.TransientFailure) } @@ -1174,7 +1166,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { } func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { - return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ + return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{ Ctx: ctx, FullMethodName: method, }) @@ -1267,24 +1259,18 @@ func (cc *ClientConn) Close() error { cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) - pWrapper := cc.blockingpicker - rWrapper := cc.resolverWrapper - bWrapper := cc.balancerWrapper - idlenessMgr := cc.idlenessMgr + // We can safely unlock and continue to access all fields now as + // cc.conns==nil, preventing any further operations on cc. cc.mu.Unlock() // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. - if pWrapper != nil { - pWrapper.close() - } - if bWrapper != nil { - bWrapper.close() - } - if rWrapper != nil { + cc.pickerWrapper.close() + cc.balancerWrapper.close() + if rWrapper := cc.resolverWrapper; rWrapper != nil { rWrapper.close() } - if idlenessMgr != nil { + if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil { idlenessMgr.Close() }