diff --git a/balancer_conn_wrappers.go b/balancer_wrapper.go similarity index 57% rename from balancer_conn_wrappers.go rename to balancer_wrapper.go index a4411c22bfc8..b5e30cff0215 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_wrapper.go @@ -32,21 +32,13 @@ import ( "google.golang.org/grpc/resolver" ) -type ccbMode int - -const ( - ccbModeActive = iota - ccbModeIdle - ccbModeClosed - ccbModeExitingIdle -) - // ccBalancerWrapper sits between the ClientConn and the Balancer. // // ccBalancerWrapper implements methods corresponding to the ones on the // balancer.Balancer interface. The ClientConn is free to call these methods // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn -// to the Balancer happen synchronously and in order. +// to the Balancer happen in order by performing them in the serializer, without +// any mutexes held. // // ccBalancerWrapper also implements the balancer.ClientConn interface and is // passed to the Balancer implementations. It invokes unexported methods on the @@ -57,87 +49,75 @@ const ( type ccBalancerWrapper struct { // The following fields are initialized when the wrapper is created and are // read-only afterwards, and therefore can be accessed without a mutex. - cc *ClientConn - opts balancer.BuildOptions + cc *ClientConn + opts balancer.BuildOptions + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc - // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled in the serializer. Fields - // accessed *only* in these serializer callbacks, can therefore be accessed - // without a mutex. - balancer *gracefulswitch.Balancer + // The following fields are only accessed within the serializer or during + // initialization. curBalancerName string + balancer *gracefulswitch.Balancer - // mu guards access to the below fields. Access to the serializer and its - // cancel function needs to be mutex protected because they are overwritten - // when the wrapper exits idle mode. - mu sync.Mutex - serializer *grpcsync.CallbackSerializer // To serialize all outoing calls. - serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time. - mode ccbMode // Tracks the current mode of the wrapper. + // The following field is protected by mu. Caller must take cc.mu before + // taking mu. + mu sync.Mutex + closed bool } -// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer -// is not created until the switchTo() method is invoked. -func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { - ctx, cancel := context.WithCancel(context.Background()) +// newCCBalancerWrapper creates a new balancer wrapper in idle state. The +// underlying balancer is not created until the switchTo() method is invoked. 
+func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { + ctx, cancel := context.WithCancel(cc.ctx) ccb := &ccBalancerWrapper{ - cc: cc, - opts: bopts, + cc: cc, + opts: balancer.BuildOptions{ + DialCreds: cc.dopts.copts.TransportCredentials, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + Authority: cc.authority, + CustomUserAgent: cc.dopts.copts.UserAgent, + ChannelzParentID: cc.channelzID, + Target: cc.parsedTarget, + }, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, } - ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) + ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) return ccb } // updateClientConnState is invoked by grpc to push a ClientConnState update to -// the underlying balancer. +// the underlying balancer. This is always executed from the serializer, so +// it is safe to call into the balancer here. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { - ccb.mu.Lock() - errCh := make(chan error, 1) - // Here and everywhere else where Schedule() is called, it is done with the - // lock held. But the lock guards only the scheduling part. The actual - // callback is called asynchronously without the lock being held. - ok := ccb.serializer.Schedule(func(_ context.Context) { - errCh <- ccb.balancer.UpdateClientConnState(*ccs) + errCh := make(chan error) + ok := ccb.serializer.Schedule(func(ctx context.Context) { + defer close(errCh) + if ctx.Err() != nil || ccb.balancer == nil { + return + } + err := ccb.balancer.UpdateClientConnState(*ccs) + if logger.V(2) && err != nil { + logger.Infof("error from balancer.UpdateClientConnState: %v", err) + } + errCh <- err }) if !ok { - // If we are unable to schedule a function with the serializer, it - // indicates that it has been closed. A serializer is only closed when - // the wrapper is closed or is in idle. - ccb.mu.Unlock() - return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") - } - ccb.mu.Unlock() - - // We get here only if the above call to Schedule succeeds, in which case it - // is guaranteed that the scheduled function will run. Therefore it is safe - // to block on this channel. - err := <-errCh - if logger.V(2) && err != nil { - logger.Infof("error from balancer.UpdateClientConnState: %v", err) + return nil } - return err -} - -// updateSubConnState is invoked by grpc to push a subConn state update to the -// underlying balancer. -func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { - // Even though it is optional for balancers, gracefulswitch ensures - // opts.StateListener is set, so this cannot ever be nil. - sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) - }) - ccb.mu.Unlock() + return <-errCh } +// resolverError is invoked by grpc to push a resolver error to the underlying +// balancer. The call to the balancer is executed from the serializer. 
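The errCh handoff in the new updateClientConnState is worth a note: the scheduled closure owns the channel, defer close(errCh) guarantees the blocked caller is always released, and a closure that bails out early (cancelled context or nil balancer) leaves the caller with the channel's zero value, that is, a nil error. A standalone sketch of the same idiom, with a plain goroutine standing in for the serializer and all names illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runSerialized hands work to another goroutine and blocks for its result.
// defer close(errCh) ensures the caller unblocks even when the callback
// returns early without sending anything.
func runSerialized(ctx context.Context, update func() error) error {
	errCh := make(chan error)
	go func() { // stand-in for serializer.Schedule
		defer close(errCh)
		if ctx.Err() != nil {
			return // caller reads the zero value (nil) from the closed channel
		}
		errCh <- update()
	}()
	return <-errCh
}

func main() {
	err := runSerialized(context.Background(), func() error {
		return errors.New("balancer rejected the update")
	})
	fmt.Println("err:", err)
}
```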
func (ccb *ccBalancerWrapper) resolverError(err error) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || ccb.balancer == nil { + return + } ccb.balancer.ResolverError(err) }) - ccb.mu.Unlock() } // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -151,8 +131,10 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { // the ccBalancerWrapper keeps track of the current LB policy name, and skips // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { - ccb.mu.Lock() - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || ccb.balancer == nil { + return + } // TODO: Other languages use case-sensitive balancer registries. We should // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. if strings.EqualFold(ccb.curBalancerName, name) { @@ -160,7 +142,6 @@ func (ccb *ccBalancerWrapper) switchTo(name string) { } ccb.buildLoadBalancingPolicy(name) }) - ccb.mu.Unlock() } // buildLoadBalancingPolicy performs the following: @@ -187,115 +168,49 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { ccb.curBalancerName = builder.Name() } +// close initiates async shutdown of the wrapper. cc.mu must be held when +// calling this function. To determine the wrapper has finished shutting down, +// the channel should block on ccb.serializer.Done() without cc.mu held. func (ccb *ccBalancerWrapper) close() { - channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") - ccb.closeBalancer(ccbModeClosed) -} - -// enterIdleMode is invoked by grpc when the channel enters idle mode upon -// expiry of idle_timeout. This call blocks until the balancer is closed. -func (ccb *ccBalancerWrapper) enterIdleMode() { - channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") - ccb.closeBalancer(ccbModeIdle) -} - -// closeBalancer is invoked when the channel is being closed or when it enters -// idle mode upon expiry of idle_timeout. -func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { ccb.mu.Lock() - if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { - ccb.mu.Unlock() - return - } - - ccb.mode = m - done := ccb.serializer.Done() - b := ccb.balancer - ok := ccb.serializer.Schedule(func(_ context.Context) { - // Close the serializer to ensure that no more calls from gRPC are sent - // to the balancer. - ccb.serializerCancel() - // Empty the current balancer name because we don't have a balancer - // anymore and also so that we act on the next call to switchTo by - // creating a new balancer specified by the new resolver. - ccb.curBalancerName = "" - }) - if !ok { - ccb.mu.Unlock() - return - } + ccb.closed = true ccb.mu.Unlock() - - // Give enqueued callbacks a chance to finish before closing the balancer. - <-done - b.Close() -} - -// exitIdleMode is invoked by grpc when the channel exits idle mode either -// because of an RPC or because of an invocation of the Connect() API. This -// recreates the balancer that was closed previously when entering idle mode. -// -// If the channel is not in idle mode, we know for a fact that we are here as a -// result of the user calling the Connect() method on the ClientConn. In this -// case, we can simply forward the call to the underlying balancer, instructing -// it to reconnect to the backends. 
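Every grpc-to-balancer entry point above now has the same shape: schedule a closure, and let the closure itself re-check ctx.Err() and ccb.balancer == nil, because cancellation or shutdown can race with work that is already queued. A simplified, self-contained sketch of that shape follows; miniSerializer is a hypothetical stand-in for grpcsync.CallbackSerializer, which lives in an internal package and uses an unbounded queue rather than a fixed-size channel:

```go
package main

import (
	"context"
	"fmt"
)

// miniSerializer runs callbacks one at a time, in order, on a single
// goroutine, so any state touched only from callbacks needs no mutex.
type miniSerializer struct {
	callbacks chan func(context.Context)
	done      chan struct{}
}

func newMiniSerializer(ctx context.Context) *miniSerializer {
	s := &miniSerializer{
		callbacks: make(chan func(context.Context), 16), // real one is unbounded
		done:      make(chan struct{}),
	}
	go func() {
		defer close(s.done)
		for {
			select {
			case <-ctx.Done():
				return
			case cb := <-s.callbacks:
				cb(ctx)
			}
		}
	}()
	return s
}

func (s *miniSerializer) Schedule(f func(context.Context)) bool {
	select {
	case s.callbacks <- f:
		return true
	default: // queue full; the real serializer instead fails only once closed
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := newMiniSerializer(ctx)

	ran := make(chan struct{})
	s.Schedule(func(ctx context.Context) {
		defer close(ran)
		if ctx.Err() != nil {
			return // mirrors the guard at the top of resolverError/switchTo
		}
		fmt.Println("callback ran in order, with no locks held")
	})
	<-ran
}
```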
-func (ccb *ccBalancerWrapper) exitIdleMode() { - ccb.mu.Lock() - if ccb.mode == ccbModeClosed { - // Request to exit idle is a no-op when wrapper is already closed. - ccb.mu.Unlock() - return - } - - if ccb.mode == ccbModeIdle { - // Recreate the serializer which was closed when we entered idle. - ctx, cancel := context.WithCancel(context.Background()) - ccb.serializer = grpcsync.NewCallbackSerializer(ctx) - ccb.serializerCancel = cancel - } - - // The ClientConn guarantees that mutual exclusion between close() and - // exitIdleMode(), and since we just created a new serializer, we can be - // sure that the below function will be scheduled. - done := make(chan struct{}) - ccb.serializer.Schedule(func(_ context.Context) { - defer close(done) - - ccb.mu.Lock() - defer ccb.mu.Unlock() - - if ccb.mode != ccbModeIdle { - ccb.balancer.ExitIdle() + channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") + ccb.serializer.Schedule(func(context.Context) { + if ccb.balancer == nil { return } - - // Gracefulswitch balancer does not support a switchTo operation after - // being closed. Hence we need to create a new one here. - ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) - ccb.mode = ccbModeActive - channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") - + ccb.balancer.Close() + ccb.balancer = nil }) - ccb.mu.Unlock() - - <-done + ccb.serializerCancel() } -func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { - ccb.mu.Lock() - defer ccb.mu.Unlock() - return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed +// exitIdle invokes the balancer's exitIdle method in the serializer. +func (ccb *ccBalancerWrapper) exitIdle() { + ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || ccb.balancer == nil { + return + } + ccb.balancer.ExitIdle() + }) } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { - if ccb.isIdleOrClosed() { - return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") + ccb.cc.mu.Lock() + defer ccb.cc.mu.Unlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() + return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed") } + ccb.mu.Unlock() if len(addrs) == 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } - ac, err := ccb.cc.newAddrConn(addrs, opts) + ac, err := ccb.cc.newAddrConnLocked(addrs, opts) if err != nil { channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err @@ -316,10 +231,6 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { - if ccb.isIdleOrClosed() { - return - } - acbw, ok := sc.(*acBalancerWrapper) if !ok { return @@ -328,25 +239,39 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { - if ccb.isIdleOrClosed() { + ccb.cc.mu.Lock() + defer ccb.cc.mu.Unlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() return } - + ccb.mu.Unlock() // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. 
If the picker is // updated later, we could call the "connecting" picker when the state is // updated, and then call the "ready" picker after the picker gets updated. - ccb.cc.blockingpicker.updatePicker(s.Picker) + + // Note that there is no need to check if the balancer wrapper was closed, + // as we know the graceful switch LB policy will not call cc if it has been + // closed. + ccb.cc.pickerWrapper.updatePicker(s.Picker) ccb.cc.csMgr.updateState(s.ConnectivityState) } func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { - if ccb.isIdleOrClosed() { + ccb.cc.mu.RLock() + defer ccb.cc.mu.RUnlock() + + ccb.mu.Lock() + if ccb.closed { + ccb.mu.Unlock() return } - - ccb.cc.resolveNow(o) + ccb.mu.Unlock() + ccb.cc.resolveNowLocked(o) } func (ccb *ccBalancerWrapper) Target() string { @@ -364,6 +289,20 @@ type acBalancerWrapper struct { producers map[balancer.ProducerBuilder]*refCountedProducer } +// updateState is invoked by grpc to push a subConn state update to the +// underlying balancer. +func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) { + acbw.ccb.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || acbw.ccb.balancer == nil { + return + } + // Even though it is optional for balancers, gracefulswitch ensures + // opts.StateListener is set, so this cannot ever be nil. + // TODO: delete this comment when UpdateSubConnState is removed. + acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) + }) +} + func (acbw *acBalancerWrapper) String() string { return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int()) } @@ -377,20 +316,7 @@ func (acbw *acBalancerWrapper) Connect() { } func (acbw *acBalancerWrapper) Shutdown() { - ccb := acbw.ccb - if ccb.isIdleOrClosed() { - // It it safe to ignore this call when the balancer is closed or in idle - // because the ClientConn takes care of closing the connections. - // - // Not returning early from here when the balancer is closed or in idle - // leads to a deadlock though, because of the following sequence of - // calls when holding cc.mu: - // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> - // ccb.RemoveAddrConn --> cc.removeAddrConn - return - } - - ccb.cc.removeAddrConn(acbw.ac, errConnDrain) + acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain) } // NewStream begins a streaming RPC on the addrConn. 
If the addrConn is not diff --git a/balancer_conn_wrappers_test.go b/balancer_wrapper_test.go similarity index 95% rename from balancer_conn_wrappers_test.go rename to balancer_wrapper_test.go index 4fd09c145a9f..2892136384fd 100644 --- a/balancer_conn_wrappers_test.go +++ b/balancer_wrapper_test.go @@ -55,7 +55,7 @@ func (s) TestBalancer_StateListenerBeforeConnect(t *testing.T) { t.Error("Unexpected call to StateListener with:", scs) }, }) - if err != nil && !strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "is deleted") && !strings.Contains(err.Error(), "is closed or idle") { + if err != nil && !strings.Contains(err.Error(), "connection is closing") && !strings.Contains(err.Error(), "is deleted") && !strings.Contains(err.Error(), "is closed or idle") && !strings.Contains(err.Error(), "balancer is being closed") { t.Error("Unexpected error creating subconn:", err) } wg.Done() diff --git a/clientconn.go b/clientconn.go index c7bf6849f07e..e6f2625b6844 100644 --- a/clientconn.go +++ b/clientconn.go @@ -33,9 +33,7 @@ import ( "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/idle" @@ -119,23 +117,8 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires }, nil } -// DialContext creates a client connection to the given target. By default, it's -// a non-blocking dial (the function won't wait for connections to be -// established, and connecting happens in the background). To make it a blocking -// dial, use WithBlock() dial option. -// -// In the non-blocking case, the ctx does not act against the connection. It -// only controls the setup steps. -// -// In the blocking case, ctx can be used to cancel or expire the pending -// connection. Once this function returns, the cancellation and expiration of -// ctx will be noop. Users should call ClientConn.Close to terminate all the -// pending operations after this function returns. -// -// The target name syntax is defined in -// https://github.com/grpc/grpc/blob/master/doc/naming.md. -// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. -func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { +// newClient returns a new client in idle mode. +func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, conns: make(map[*addrConn]struct{}), @@ -143,23 +126,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * czData: new(channelzData), } - // We start the channel off in idle mode, but kick it out of idle at the end - // of this method, instead of waiting for the first RPC. Other gRPC - // implementations do wait for the first RPC to kick the channel out of - // idle. But doing so would be a major behavior change for our users who are - // used to seeing the channel active after Dial. - // - // Taking this approach of kicking it out of idle at the end of this method - // allows us to share the code between channel creation and exiting idle - // mode. This will also make it easy for us to switch to starting the - // channel off in idle, if at all we ever get to do that. 
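From an application's point of view, the newClient split changes nothing observable: the channel is created in idle mode, and DialContext immediately kicks it back out, so a dialed channel still looks active. A sketch of how the idle machinery is driven from user code (the address and timeout are illustrative):

```go
package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Dial is non-blocking; the returned channel has already been kicked out
	// of idle even though it was created idle internally.
	cc, err := grpc.Dial("localhost:50051",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// With no RPC activity for 5 minutes the channel re-enters idle mode
		// (the default is 30 minutes; 0 disables idleness entirely).
		grpc.WithIdleTimeout(5*time.Minute),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cc.Close()

	cc.Connect() // explicitly exits idle mode without issuing an RPC
}
```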
- cc.idlenessState = ccIdlenessStateIdle - cc.retryThrottler.Store((*retryThrottler)(nil)) cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) - cc.exitIdleCond = sync.NewCond(&cc.mu) + // Apply dial options. disableGlobalOpts := false for _, opt := range opts { if _, ok := opt.(*disableGlobalDialOptions); ok { @@ -177,21 +148,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * for _, opt := range opts { opt.apply(&cc.dopts) } - chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) - defer func() { - if err != nil { - cc.Close() - } - }() - - // Register ClientConn with channelz. - cc.channelzRegistration(target) - - cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) - if err := cc.validateTransportCredentials(); err != nil { return nil, err } @@ -205,10 +164,80 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } cc.mkp = cc.dopts.copts.KeepaliveParams - if cc.dopts.copts.UserAgent != "" { - cc.dopts.copts.UserAgent += " " + grpcUA - } else { - cc.dopts.copts.UserAgent = grpcUA + // Register ClientConn with channelz. + cc.channelzRegistration(target) + + // TODO: Ideally it should be impossible to error from this function after + // channelz registration. This will require removing some channelz logs + // from the following functions that can error. Errors can be returned to + // the user, and successful logs can be emitted here, after the checks have + // passed and channelz is subsequently registered. + + // Determine the resolver to use. + if err := cc.parseTargetAndFindResolver(); err != nil { + channelz.RemoveEntry(cc.channelzID) + return nil, err + } + if err = cc.determineAuthority(); err != nil { + channelz.RemoveEntry(cc.channelzID) + return nil, err + } + + cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) + cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) + + cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. + cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) + return cc, nil +} + +// DialContext creates a client connection to the given target. By default, it's +// a non-blocking dial (the function won't wait for connections to be +// established, and connecting happens in the background). To make it a blocking +// dial, use WithBlock() dial option. +// +// In the non-blocking case, the ctx does not act against the connection. It +// only controls the setup steps. +// +// In the blocking case, ctx can be used to cancel or expire the pending +// connection. Once this function returns, the cancellation and expiration of +// ctx will be noop. Users should call ClientConn.Close to terminate all the +// pending operations after this function returns. +// +// The target name syntax is defined in +// https://github.com/grpc/grpc/blob/master/doc/naming.md. +// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. +func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { + cc, err := newClient(target, opts...) + if err != nil { + return nil, err + } + + // We start the channel off in idle mode, but kick it out of idle now, + // instead of waiting for the first RPC. Other gRPC implementations do wait + // for the first RPC to kick the channel out of idle. 
But doing so would be + // a major behavior change for our users who are used to seeing the channel + // active after Dial. + // + // Taking this approach of kicking it out of idle at the end of this method + // allows us to share the code between channel creation and exiting idle + // mode. This will also make it easy for us to switch to starting the + // channel off in idle, i.e. by making newClient exported. + + defer func() { + if err != nil { + cc.Close() + } + }() + + // This creates the name resolver, load balancer, etc. + if err := cc.idlenessMgr.ExitIdleMode(); err != nil { + return nil, err + } + + // Return now for non-blocking dials. + if !cc.dopts.block { + return cc, nil } if cc.dopts.timeout > 0 { @@ -231,49 +260,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - if cc.dopts.bs == nil { - cc.dopts.bs = backoff.DefaultExponential - } - - // Determine the resolver to use. - if err := cc.parseTargetAndFindResolver(); err != nil { - return nil, err - } - if err = cc.determineAuthority(); err != nil { - return nil, err - } - - if cc.dopts.scChan != nil { - // Blocking wait for the initial service config. - select { - case sc, ok := <-cc.dopts.scChan: - if ok { - cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) - } - case <-ctx.Done(): - return nil, ctx.Err() - } - } - if cc.dopts.scChan != nil { - go cc.scWatcher() - } - - // This creates the name resolver, load balancer, blocking picker etc. - if err := cc.exitIdleMode(); err != nil { - return nil, err - } - - // Configure idleness support with configured idle timeout or default idle - // timeout duration. Idleness can be explicitly disabled by the user, by - // setting the dial option to 0. - cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger}) - - // Return early for non-blocking dials. - if !cc.dopts.block { - return cc, nil - } - // A blocking dial blocks until the clientConn is ready. for { s := cc.GetState() @@ -320,8 +306,8 @@ func (cc *ClientConn) addTraceEvent(msg string) { type idler ClientConn -func (i *idler) EnterIdleMode() error { - return (*ClientConn)(i).enterIdleMode() +func (i *idler) EnterIdleMode() { + (*ClientConn)(i).enterIdleMode() } func (i *idler) ExitIdleMode() error { @@ -329,117 +315,71 @@ func (i *idler) ExitIdleMode() error { } // exitIdleMode moves the channel out of idle mode by recreating the name -// resolver and load balancer. -func (cc *ClientConn) exitIdleMode() error { +// resolver and load balancer. This should never be called directly; use +// cc.idlenessMgr.ExitIdleMode instead. +func (cc *ClientConn) exitIdleMode() (err error) { cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return errConnClosing } - if cc.idlenessState != ccIdlenessStateIdle { - channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState) - cc.mu.Unlock() - return nil - } - - defer func() { - // When Close() and exitIdleMode() race against each other, one of the - // following two can happen: - // - Close() wins the race and runs first. exitIdleMode() runs after, and - // sees that the ClientConn is already closed and hence returns early. - // - exitIdleMode() wins the race and runs first and recreates the balancer - // and releases the lock before recreating the resolver. If Close() runs - // in this window, it will wait for exitIdleMode to complete. 
- // - // We achieve this synchronization using the below condition variable. - cc.mu.Lock() - cc.idlenessState = ccIdlenessStateActive - cc.exitIdleCond.Signal() - cc.mu.Unlock() - }() - - cc.idlenessState = ccIdlenessStateExitingIdle - exitedIdle := false - if cc.blockingpicker == nil { - cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) - } else { - cc.blockingpicker.exitIdleMode() - exitedIdle = true - } - - var credsClone credentials.TransportCredentials - if creds := cc.dopts.copts.TransportCredentials; creds != nil { - credsClone = creds.Clone() - } - if cc.balancerWrapper == nil { - cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ - DialCreds: credsClone, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - Authority: cc.authority, - CustomUserAgent: cc.dopts.copts.UserAgent, - ChannelzParentID: cc.channelzID, - Target: cc.parsedTarget, - }) - } else { - cc.balancerWrapper.exitIdleMode() - } - cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() // This needs to be called without cc.mu because this builds a new resolver - // which might update state or report error inline which needs to be handled - // by cc.updateResolverState() which also grabs cc.mu. - if err := cc.initResolverWrapper(credsClone); err != nil { + // which might update state or report error inline, which would then need to + // acquire cc.mu. + if err := cc.resolverWrapper.start(); err != nil { return err } - if exitedIdle { - cc.addTraceEvent("exiting idle mode") - } + cc.addTraceEvent("exiting idle mode") return nil } +// initIdleStateLocked initializes common state to how it should be while idle. +func (cc *ClientConn) initIdleStateLocked() { + cc.resolverWrapper = newCCResolverWrapper(cc) + cc.balancerWrapper = newCCBalancerWrapper(cc) + cc.firstResolveEvent = grpcsync.NewEvent() + // cc.conns == nil is a proxy for the ClientConn being closed. So, instead + // of setting it to nil here, we recreate the map. This also means that we + // don't have to do this when exiting idle mode. + cc.conns = make(map[*addrConn]struct{}) +} + // enterIdleMode puts the channel in idle mode, and as part of it shuts down the -// name resolver, load balancer and any subchannels. -func (cc *ClientConn) enterIdleMode() error { +// name resolver, load balancer, and any subchannels. This should never be +// called directly; use cc.idlenessMgr.EnterIdleMode instead. +func (cc *ClientConn) enterIdleMode() { cc.mu.Lock() - defer cc.mu.Unlock() if cc.conns == nil { - return ErrClientConnClosing - } - if cc.idlenessState != ccIdlenessStateActive { - channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) - return nil + cc.mu.Unlock() + return } - // cc.conns == nil is a proxy for the ClientConn being closed. So, instead - // of setting it to nil here, we recreate the map. This also means that we - // don't have to do this when exiting idle mode. conns := cc.conns - cc.conns = make(map[*addrConn]struct{}) - // TODO: Currently, we close the resolver wrapper upon entering idle mode - // and create a new one upon exiting idle mode. This means that the - // `cc.resolverWrapper` field would be overwritten everytime we exit idle - // mode. While this means that we need to hold `cc.mu` when accessing - // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should - // try to do the same for the balancer and picker wrappers too. 
- cc.resolverWrapper.close() - cc.blockingpicker.enterIdleMode() - cc.balancerWrapper.enterIdleMode() + rWrapper := cc.resolverWrapper + rWrapper.close() + cc.pickerWrapper.reset() + bWrapper := cc.balancerWrapper + bWrapper.close() cc.csMgr.updateState(connectivity.Idle) - cc.idlenessState = ccIdlenessStateIdle cc.addTraceEvent("entering idle mode") - go func() { - for ac := range conns { - ac.tearDown(errConnIdling) - } - }() + cc.initIdleStateLocked() - return nil + cc.mu.Unlock() + + // Block until the name resolver and LB policy are closed. + <-rWrapper.serializer.Done() + <-bWrapper.serializer.Done() + + // Close all subchannels after the LB policy is closed. + for ac := range conns { + ac.tearDown(errConnIdling) + } } // validateTransportCredentials performs a series of checks on the configured @@ -649,66 +589,35 @@ type ClientConn struct { dopts dialOptions // Default and user specified dial options. channelzID *channelz.Identifier // Channelz identifier for the channel. resolverBuilder resolver.Builder // See parseTargetAndFindResolver(). - balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. - idlenessMgr idle.Manager + idlenessMgr *idle.Manager // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. csMgr *connectivityStateManager - blockingpicker *pickerWrapper + pickerWrapper *pickerWrapper safeConfigSelector iresolver.SafeConfigSelector czData *channelzData retryThrottler atomic.Value // Updated from service config. - // firstResolveEvent is used to track whether the name resolver sent us at - // least one update. RPCs block on this event. - firstResolveEvent *grpcsync.Event - // mu protects the following fields. // TODO: split mu so the same mutex isn't used for everything. mu sync.RWMutex - resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close. + resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close. + balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close. sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. - idlenessState ccIdlenessState // Tracks idleness state of the channel. - exitIdleCond *sync.Cond // Signalled when channel exits idle. + // firstResolveEvent is used to track whether the name resolver sent us at + // least one update. RPCs block on this event. May be accessed without mu + // if we know we cannot be asked to enter idle mode while accessing it (e.g. + // when the idle manager has already been closed, or if we are already + // entering idle mode). + firstResolveEvent *grpcsync.Event lceMu sync.Mutex // protects lastConnectionError lastConnectionError error } -// ccIdlenessState tracks the idleness state of the channel. -// -// Channels start off in `active` and move to `idle` after a period of -// inactivity. When moving back to `active` upon an incoming RPC, they -// transition through `exiting_idle`. This state is useful for synchronization -// with Close(). -// -// This state tracking is mostly for self-protection. The idlenessManager is -// expected to keep track of the state as well, and is expected not to call into -// the ClientConn unnecessarily. 
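The rewritten enterIdleMode above encodes a deadlock-avoidance pattern that Close() reuses: initiate shutdown of the wrappers while holding cc.mu, release the lock, and only then block on the serializers' Done() channels, since the callbacks still draining may themselves need cc.mu. A self-contained sketch of the pattern, with component as a hypothetical stand-in for the resolver and balancer wrappers:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// component models a wrapper whose close() only *initiates* shutdown; done is
// closed once all queued callbacks have drained.
type component struct {
	cancel context.CancelFunc
	done   chan struct{}
}

func newComponent() *component {
	ctx, cancel := context.WithCancel(context.Background())
	c := &component{cancel: cancel, done: make(chan struct{})}
	go func() {
		<-ctx.Done() // stand-in for draining pending callbacks
		close(c.done)
	}()
	return c
}

func (c *component) close() { c.cancel() }

func main() {
	var mu sync.Mutex
	resolver, balancer := newComponent(), newComponent()

	mu.Lock()
	resolver.close() // initiate shutdown with the lock held...
	balancer.close()
	mu.Unlock()

	<-resolver.done // ...but block for completion only after releasing it,
	<-balancer.done // because pending callbacks may need the lock themselves.
	fmt.Println("idle: resolver and balancer fully drained")
}
```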
-type ccIdlenessState int8 - -const ( - ccIdlenessStateActive ccIdlenessState = iota - ccIdlenessStateIdle - ccIdlenessStateExitingIdle -) - -func (s ccIdlenessState) String() string { - switch s { - case ccIdlenessStateActive: - return "active" - case ccIdlenessStateIdle: - return "idle" - case ccIdlenessStateExitingIdle: - return "exitingIdle" - default: - return "unknown" - } -} - // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or // ctx expires. A true value is returned in former case and false in latter. // @@ -748,29 +657,15 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. func (cc *ClientConn) Connect() { - cc.exitIdleMode() + if err := cc.idlenessMgr.ExitIdleMode(); err != nil { + cc.addTraceEvent(err.Error()) + return + } // If the ClientConn was not in idle mode, we need to call ExitIdle on the // LB policy so that connections can be created. - cc.balancerWrapper.exitIdleMode() -} - -func (cc *ClientConn) scWatcher() { - for { - select { - case sc, ok := <-cc.dopts.scChan: - if !ok { - return - } - cc.mu.Lock() - // TODO: load balance policy runtime change is ignored. - // We may revisit this decision in the future. - cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) - cc.mu.Unlock() - case <-cc.ctx.Done(): - return - } - } + cc.mu.Lock() + cc.balancerWrapper.exitIdle() + cc.mu.Unlock() } // waitForResolvedAddrs blocks until the resolver has provided addresses or the @@ -804,11 +699,11 @@ func init() { internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() { return cc.csMgr.pubSub.Subscribe(s) } - internal.EnterIdleModeForTesting = func(cc *ClientConn) error { - return cc.enterIdleMode() + internal.EnterIdleModeForTesting = func(cc *ClientConn) { + cc.idlenessMgr.EnterIdleModeForTesting() } internal.ExitIdleModeForTesting = func(cc *ClientConn) error { - return cc.exitIdleMode() + return cc.idlenessMgr.ExitIdleMode() } } @@ -824,9 +719,8 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { } } -func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { +func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error { defer cc.firstResolveEvent.Fire() - cc.mu.Lock() // Check if the ClientConn is already closed. Some fields (e.g. // balancerWrapper) are set to nil when closing the ClientConn, and could // cause nil pointer panic if we don't have this check. @@ -872,7 +766,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { if cc.sc == nil { // Apply the failing LB only if we haven't received valid service config // from the name resolver in the past. - cc.applyFailingLB(s.ServiceConfig) + cc.applyFailingLBLocked(s.ServiceConfig) cc.mu.Unlock() return ret } @@ -894,15 +788,13 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { return ret } -// applyFailingLB is akin to configuring an LB policy on the channel which +// applyFailingLBLocked is akin to configuring an LB policy on the channel which // always fails RPCs. Here, an actual LB policy is not configured, but an always // erroring picker is configured, which returns errors with information about // what was invalid in the received service config. 
A config selector with no // service config is configured, and the connectivity state of the channel is // set to TransientFailure. -// -// Caller must hold cc.mu. -func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { +func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) { var err error if sc.Err != nil { err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err) @@ -910,14 +802,10 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config) } cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) - cc.blockingpicker.updatePicker(base.NewErrPicker(err)) + cc.pickerWrapper.updatePicker(base.NewErrPicker(err)) cc.csMgr.updateState(connectivity.TransientFailure) } -func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { - cc.balancerWrapper.updateSubConnState(sc, s, err) -} - // Makes a copy of the input addresses slice and clears out the balancer // attributes field. Addresses are passed during subconn creation and address // update operations. In both cases, we will clear the balancer attributes by @@ -932,10 +820,14 @@ func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Ad return out } -// newAddrConn creates an addrConn for addrs and adds it to cc.conns. +// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns. // // Caller needs to make sure len(addrs) > 0. -func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { +func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { + if cc.conns == nil { + return nil, ErrClientConnClosing + } + ac := &addrConn{ state: connectivity.Idle, cc: cc, @@ -947,12 +839,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub stateChan: make(chan struct{}), } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) - // Track ac in cc. This needs to be done before any getTransport(...) is called. - cc.mu.Lock() - defer cc.mu.Unlock() - if cc.conns == nil { - return nil, ErrClientConnClosing - } var err error ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") @@ -968,6 +854,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub }, }) + // Track ac in cc. This needs to be done before any getTransport(...) is called. 
cc.conns[ac] = struct{}{} return ac, nil } @@ -1174,7 +1061,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { } func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { - return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ + return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{ Ctx: ctx, FullMethodName: method, }) @@ -1216,12 +1103,12 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { cc.mu.RLock() - r := cc.resolverWrapper + cc.resolverWrapper.resolveNow(o) cc.mu.RUnlock() - if r == nil { - return - } - go r.resolveNow(o) +} + +func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) { + cc.resolverWrapper.resolveNow(o) } // ResetConnectBackoff wakes up all subchannels in transient failure and causes @@ -1253,40 +1140,32 @@ func (cc *ClientConn) Close() error { <-cc.csMgr.pubSub.Done() }() + // Prevent calls to enter/exit idle immediately, and ensure we are not + // currently entering/exiting idle mode. + cc.idlenessMgr.Close() + cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return ErrClientConnClosing } - for cc.idlenessState == ccIdlenessStateExitingIdle { - cc.exitIdleCond.Wait() - } - conns := cc.conns cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) - pWrapper := cc.blockingpicker - rWrapper := cc.resolverWrapper - bWrapper := cc.balancerWrapper - idlenessMgr := cc.idlenessMgr + // We can safely unlock and continue to access all fields now as + // cc.conns==nil, preventing any further operations on cc. cc.mu.Unlock() + cc.resolverWrapper.close() // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. 
- if pWrapper != nil { - pWrapper.close() - } - if bWrapper != nil { - bWrapper.close() - } - if rWrapper != nil { - rWrapper.close() - } - if idlenessMgr != nil { - idlenessMgr.Close() - } + cc.pickerWrapper.close() + cc.balancerWrapper.close() + + <-cc.resolverWrapper.serializer.Done() + <-cc.balancerWrapper.serializer.Done() for ac := range conns { ac.tearDown(ErrClientConnClosing) @@ -1307,7 +1186,7 @@ type addrConn struct { cc *ClientConn dopts dialOptions - acbw balancer.SubConn + acbw *acBalancerWrapper scopts balancer.NewSubConnOptions // transport is set when there's a viable transport (note: ac state may not be READY as LB channel @@ -1345,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) } else { channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) } - ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) + ac.acbw.updateState(s, lastErr) } // adjustParams updates parameters used to create transports upon @@ -1849,7 +1728,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() error { if err != nil { channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) } else { - channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) + channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget) rb = cc.getResolver(parsedTarget.URL.Scheme) if rb != nil { cc.parsedTarget = parsedTarget @@ -2007,32 +1886,3 @@ func (cc *ClientConn) determineAuthority() error { channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) return nil } - -// initResolverWrapper creates a ccResolverWrapper, which builds the name -// resolver. This method grabs the lock to assign the newly built resolver -// wrapper to the cc.resolverWrapper field. -func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error { - rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ - target: cc.parsedTarget, - builder: cc.resolverBuilder, - bOpts: resolver.BuildOptions{ - DisableServiceConfig: cc.dopts.disableServiceConfig, - DialCreds: creds, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - }, - channelzID: cc.channelzID, - }) - if err != nil { - return fmt.Errorf("failed to build resolver: %v", err) - } - // Resolver implementations may report state update or error inline when - // built (or right after), and this is handled in cc.updateResolverState. - // Also, an error from the resolver might lead to a re-resolution request - // from the balancer, which is handled in resolveNow() where - // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. - cc.mu.Lock() - cc.resolverWrapper = rw - cc.mu.Unlock() - return nil -} diff --git a/dialoptions.go b/dialoptions.go index 4d47e2cb8da5..5b4c37b0424e 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -64,7 +64,6 @@ type dialOptions struct { block bool returnLastError bool timeout time.Duration - scChan <-chan ServiceConfig authority string binaryLogger binarylog.Logger copts transport.ConnectOptions @@ -251,19 +250,6 @@ func WithDecompressor(dc Decompressor) DialOption { }) } -// WithServiceConfig returns a DialOption which has a channel to read the -// service configuration. -// -// Deprecated: service config should be received through name resolver or via -// WithDefaultServiceConfig, as specified at -// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be -// removed in a future 1.x release. 
-func WithServiceConfig(c <-chan ServiceConfig) DialOption { - return newFuncDialOption(func(o *dialOptions) { - o.scChan = c - }) -} - // WithConnectParams configures the ClientConn to use the provided ConnectParams // for creating and maintaining connections to servers. // @@ -497,7 +483,7 @@ func FailOnNonTempDialError(f bool) DialOption { // the RPCs. func WithUserAgent(s string) DialOption { return newFuncDialOption(func(o *dialOptions) { - o.copts.UserAgent = s + o.copts.UserAgent = s + " " + grpcUA }) } @@ -647,14 +633,16 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption { func defaultDialOptions() dialOptions { return dialOptions{ - healthCheckFunc: internal.HealthCheckFunc, copts: transport.ConnectOptions{ - WriteBufferSize: defaultWriteBufSize, ReadBufferSize: defaultReadBufSize, + WriteBufferSize: defaultWriteBufSize, UseProxy: true, + UserAgent: grpcUA, }, - recvBufferPool: nopBufferPool{}, - idleTimeout: 30 * time.Minute, + bs: internalbackoff.DefaultExponential, + healthCheckFunc: internal.HealthCheckFunc, + idleTimeout: 30 * time.Minute, + recvBufferPool: nopBufferPool{}, } } diff --git a/internal/buffer/unbounded.go b/internal/buffer/unbounded.go index 4399c3df4959..11f91668ac9b 100644 --- a/internal/buffer/unbounded.go +++ b/internal/buffer/unbounded.go @@ -18,7 +18,10 @@ // Package buffer provides an implementation of an unbounded buffer. package buffer -import "sync" +import ( + "errors" + "sync" +) // Unbounded is an implementation of an unbounded buffer which does not use // extra goroutines. This is typically used for passing updates from one entity @@ -36,6 +39,7 @@ import "sync" type Unbounded struct { c chan any closed bool + closing bool mu sync.Mutex backlog []any } @@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded { return &Unbounded{c: make(chan any, 1)} } +var errBufferClosed = errors.New("Put called on closed buffer.Unbounded") + // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t any) { +func (b *Unbounded) Put(t any) error { b.mu.Lock() defer b.mu.Unlock() - if b.closed { - return + if b.closing { + return errBufferClosed } if len(b.backlog) == 0 { select { case b.c <- t: - return + return nil default: } } b.backlog = append(b.backlog, t) + return nil } -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a +// Load sends the earliest buffered data, if any, onto the read channel returned +// by Get(). Users are expected to call this every time they successfully read a // value from the read channel. func (b *Unbounded) Load() { b.mu.Lock() defer b.mu.Unlock() - if b.closed { - return - } if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: @@ -78,6 +82,8 @@ func (b *Unbounded) Load() { b.backlog = b.backlog[1:] default: } + } else if b.closing && !b.closed { + close(b.c) } } @@ -88,18 +94,23 @@ func (b *Unbounded) Load() { // send the next buffered value onto the channel if there is any. // // If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// is closed after all data is drained. func (b *Unbounded) Get() <-chan any { return b.c } -// Close closes the unbounded buffer. +// Close closes the unbounded buffer. No subsequent data may be Put(), and the +// channel returned from Get() will be closed after all the data is read and +// Load() is called for the final time. 
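Before the Close implementation below, a short usage sketch of the reworked buffer contract: Put fails once Close has been called, but values buffered beforehand stay readable until drained. (Runnable only inside the grpc module, since the package is internal.)

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/internal/buffer" // internal: importable only within grpc
)

func main() {
	ub := buffer.NewUnbounded()
	ub.Put(1)
	ub.Put(2)
	ub.Close() // no further Puts accepted; 1 and 2 remain readable

	if err := ub.Put(3); err != nil {
		fmt.Println("rejected:", err)
	}
	// The read channel is closed only after the backlog is fully drained.
	for v := range ub.Get() {
		ub.Load() // per the contract: Load after every successful read
		fmt.Println("drained:", v)
	}
}
```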
 func (b *Unbounded) Close() {
 	b.mu.Lock()
 	defer b.mu.Unlock()
-	if b.closed {
+	if b.closing {
 		return
 	}
-	b.closed = true
-	close(b.c)
+	b.closing = true
+	if len(b.backlog) == 0 {
+		b.closed = true
+		close(b.c)
+	}
 }
diff --git a/internal/buffer/unbounded_test.go b/internal/buffer/unbounded_test.go
index 1708391e7f27..ef24d0fb7a8c 100644
--- a/internal/buffer/unbounded_test.go
+++ b/internal/buffer/unbounded_test.go
@@ -52,7 +52,7 @@ func init() {
 }
 
 // TestSingleWriter starts one reader and one writer goroutine and makes sure
-// that the reader gets all the value added to the buffer by the writer.
+// that the reader gets all the values added to the buffer by the writer.
 func (s) TestSingleWriter(t *testing.T) {
 	ub := NewUnbounded()
 	reads := []int{}
@@ -124,14 +124,25 @@ func (s) TestMultipleWriters(t *testing.T) {
 // buffer is closed.
 func (s) TestClose(t *testing.T) {
 	ub := NewUnbounded()
+	if err := ub.Put(1); err != nil {
+		t.Fatalf("Unbounded.Put() = %v; want nil", err)
+	}
 	ub.Close()
-	if v, ok := <-ub.Get(); ok {
-		t.Errorf("Unbounded.Get() = %v, want closed channel", v)
+	if err := ub.Put(1); err == nil {
+		t.Fatalf("Unbounded.Put() = <nil>; want non-nil error")
+	}
+	if v, ok := <-ub.Get(); !ok {
+		t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
+	}
+	if err := ub.Put(1); err == nil {
+		t.Fatalf("Unbounded.Put() = <nil>; want non-nil error")
 	}
-	ub.Put(1)
 	ub.Load()
 	if v, ok := <-ub.Get(); ok {
 		t.Errorf("Unbounded.Get() = %v, want closed channel", v)
 	}
-	ub.Close()
+	if err := ub.Put(1); err == nil {
+		t.Fatalf("Unbounded.Put() = <nil>; want non-nil error")
+	}
+	ub.Close() // ignored
 }
diff --git a/internal/channelz/funcs.go b/internal/channelz/funcs.go
index 5395e77529cd..fc094f3441b8 100644
--- a/internal/channelz/funcs.go
+++ b/internal/channelz/funcs.go
@@ -31,6 +31,7 @@ import (
 	"time"
 
 	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal"
 )
 
 const (
@@ -58,6 +59,12 @@ func TurnOn() {
 	}
 }
 
+func init() {
+	internal.ChannelzTurnOffForTesting = func() {
+		atomic.StoreInt32(&curState, 0)
+	}
+}
+
 // IsOn returns whether channelz data collection is on.
 func IsOn() bool {
 	return atomic.LoadInt32(&curState) == 1
diff --git a/internal/grpcsync/callback_serializer.go b/internal/grpcsync/callback_serializer.go
index 900917dbe6c1..f7f40a16acee 100644
--- a/internal/grpcsync/callback_serializer.go
+++ b/internal/grpcsync/callback_serializer.go
@@ -20,7 +20,6 @@ package grpcsync
 
 import (
 	"context"
-	"sync"
 
 	"google.golang.org/grpc/internal/buffer"
 )
@@ -38,8 +37,6 @@ type CallbackSerializer struct {
 	done chan struct{}
 
 	callbacks *buffer.Unbounded
-	closedMu  sync.Mutex
-	closed    bool
 }
 
 // NewCallbackSerializer returns a new CallbackSerializer instance. The provided
@@ -65,56 +62,34 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
 // callbacks to be executed by the serializer. It is not possible to add
 // callbacks once the context passed to NewCallbackSerializer is cancelled.
 func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
-	cs.closedMu.Lock()
-	defer cs.closedMu.Unlock()
-
-	if cs.closed {
-		return false
-	}
-	cs.callbacks.Put(f)
-	return true
+	return cs.callbacks.Put(f) == nil
}
 
 func (cs *CallbackSerializer) run(ctx context.Context) {
-	var backlog []func(context.Context)
-
 	defer close(cs.done)
+
+	// TODO: when Go 1.21 is the oldest supported version, this loop and Close
+	// can be replaced with:
+	//
+	//   context.AfterFunc(ctx, cs.callbacks.Close)
 	for ctx.Err() == nil {
 		select {
 		case <-ctx.Done():
 			// Do nothing here.
Next iteration of the for loop will not happen, // since ctx.Err() would be non-nil. - case callback, ok := <-cs.callbacks.Get(): - if !ok { - return - } + case cb := <-cs.callbacks.Get(): cs.callbacks.Load() - callback.(func(ctx context.Context))(ctx) + cb.(func(context.Context))(ctx) } } - // Fetch pending callbacks if any, and execute them before returning from - // this method and closing cs.done. - cs.closedMu.Lock() - cs.closed = true - backlog = cs.fetchPendingCallbacks() + // Close the buffer to prevent new callbacks from being added. cs.callbacks.Close() - cs.closedMu.Unlock() - for _, b := range backlog { - b(ctx) - } -} -func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { - var backlog []func(context.Context) - for { - select { - case b := <-cs.callbacks.Get(): - backlog = append(backlog, b.(func(context.Context))) - cs.callbacks.Load() - default: - return backlog - } + // Run all pending callbacks. + for cb := range cs.callbacks.Get() { + cs.callbacks.Load() + cb.(func(context.Context))(ctx) } } diff --git a/internal/idle/idle.go b/internal/idle/idle.go index 6c272476e5ef..fe49cb74c55a 100644 --- a/internal/idle/idle.go +++ b/internal/idle/idle.go @@ -26,8 +26,6 @@ import ( "sync" "sync/atomic" "time" - - "google.golang.org/grpc/grpclog" ) // For overriding in unit tests. @@ -39,27 +37,12 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer { // and exit from idle mode. type Enforcer interface { ExitIdleMode() error - EnterIdleMode() error -} - -// Manager defines the functionality required to track RPC activity on a -// channel. -type Manager interface { - OnCallBegin() error - OnCallEnd() - Close() + EnterIdleMode() } -type noopManager struct{} - -func (noopManager) OnCallBegin() error { return nil } -func (noopManager) OnCallEnd() {} -func (noopManager) Close() {} - -// manager implements the Manager interface. It uses atomic operations to -// synchronize access to shared state and a mutex to guarantee mutual exclusion -// in a critical section. -type manager struct { +// Manager implements idleness detection and calls the configured Enforcer to +// enter/exit idle mode when appropriate. Must be created by NewManager. +type Manager struct { // State accessed atomically. lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed. activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. @@ -69,8 +52,7 @@ type manager struct { // Can be accessed without atomics or mutex since these are set at creation // time and read-only after that. enforcer Enforcer // Functionality provided by grpc.ClientConn. - timeout int64 // Idle timeout duration nanos stored as an int64. - logger grpclog.LoggerV2 + timeout time.Duration // idleMu is used to guarantee mutual exclusion in two scenarios: // - Opposing intentions: @@ -88,57 +70,48 @@ type manager struct { timer *time.Timer } -// ManagerOptions is a collection of options used by -// NewManager. -type ManagerOptions struct { - Enforcer Enforcer - Timeout time.Duration - Logger grpclog.LoggerV2 +// NewManager creates a new idleness manager implementation for the +// given idle timeout. It begins in idle mode. +func NewManager(enforcer Enforcer, timeout time.Duration) *Manager { + return &Manager{ + enforcer: enforcer, + timeout: timeout, + actuallyIdle: true, + activeCallsCount: -math.MaxInt32, + } } -// NewManager creates a new idleness manager implementation for the -// given idle timeout. 
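With Manager now a concrete exported type, and with the Enforcer interface slimmed down so that EnterIdleMode can no longer fail, wiring the two together looks roughly like this. A sketch usable only inside the grpc module, with a fake enforcer standing in for the ClientConn:

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/internal/idle" // internal: importable only within grpc
)

// fakeEnforcer stands in for the ClientConn. Note the new signatures:
// EnterIdleMode no longer returns an error.
type fakeEnforcer struct{}

func (fakeEnforcer) ExitIdleMode() error { fmt.Println("exit idle"); return nil }
func (fakeEnforcer) EnterIdleMode()      { fmt.Println("enter idle") }

func main() {
	m := idle.NewManager(fakeEnforcer{}, 30*time.Minute)
	// The manager now begins in idle mode, so the first RPC must exit idle
	// before it can proceed.
	if err := m.OnCallBegin(); err != nil { // prints "exit idle"
		panic(err)
	}
	m.OnCallEnd()
	m.Close()
}
```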
-func NewManager(opts ManagerOptions) Manager { - if opts.Timeout == 0 { - return noopManager{} +// resetIdleTimerLocked resets the idle timer to the given duration. Called +// when exiting idle mode or when the timer fires and we need to reset it. +func (m *Manager) resetIdleTimerLocked(d time.Duration) { + if m.isClosed() || m.timeout == 0 || m.actuallyIdle { + return } - m := &manager{ - enforcer: opts.Enforcer, - timeout: int64(opts.Timeout), - logger: opts.Logger, + // It is safe to ignore the return value from Reset() because this method is + // only ever called from the timer callback or when exiting idle mode. + if m.timer != nil { + m.timer.Stop() } - m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout) - return m + m.timer = timeAfterFunc(d, m.handleIdleTimeout) } -// resetIdleTimer resets the idle timer to the given duration. This method -// should only be called from the timer callback. -func (m *manager) resetIdleTimer(d time.Duration) { +func (m *Manager) resetIdleTimer(d time.Duration) { m.idleMu.Lock() defer m.idleMu.Unlock() - - if m.timer == nil { - // Only close sets timer to nil. We are done. - return - } - - // It is safe to ignore the return value from Reset() because this method is - // only ever called from the timer callback, which means the timer has - // already fired. - m.timer.Reset(d) + m.resetIdleTimerLocked(d) } // handleIdleTimeout is the timer callback that is invoked upon expiry of the // configured idle timeout. The channel is considered inactive if there are no // ongoing calls and no RPC activity since the last time the timer fired. -func (m *manager) handleIdleTimeout() { +func (m *Manager) handleIdleTimeout() { if m.isClosed() { return } if atomic.LoadInt32(&m.activeCallsCount) > 0 { - m.resetIdleTimer(time.Duration(m.timeout)) + m.resetIdleTimer(m.timeout) return } @@ -148,24 +121,12 @@ func (m *manager) handleIdleTimeout() { // Set the timer to fire after a duration of idle timeout, calculated // from the time the most recent RPC completed. atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) - m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano())) + m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout) return } - // This CAS operation is extremely likely to succeed given that there has - // been no activity since the last time we were here. Setting the - // activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the - // channel is either in idle mode or is trying to get there. - if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { - // This CAS operation can fail if an RPC started after we checked for - // activity at the top of this method, or one was ongoing from before - // the last time we were here. In both case, reset the timer and return. - m.resetIdleTimer(time.Duration(m.timeout)) - return - } - - // Now that we've set the active calls count to -math.MaxInt32, it's time to - // actually move to idle mode. + // Now that we've checked that there has been no activity, attempt to enter + // idle mode, which is very likely to succeed. if m.tryEnterIdleMode() { // Successfully entered idle mode. No timer needed until we exit idle. return @@ -174,8 +135,7 @@ func (m *manager) handleIdleTimeout() { // Failed to enter idle mode due to a concurrent RPC that kept the channel // active, or because of an error from the channel. Undo the attempt to // enter idle, and reset the timer to try again later. 
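The rescheduling expression above rewards a worked example: when the channel saw activity but currently has no calls in flight, the timer is re-armed to fire exactly one idle timeout after the most recent RPC completed, not one timeout from now. With illustrative numbers:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	timeout := 30 * time.Minute
	// Suppose the most recent RPC completed 10 minutes ago.
	lastCallEnd := time.Now().Add(-10 * time.Minute).UnixNano()

	// (lastCallEndTime - now) is roughly -10m; adding the timeout leaves ~20m,
	// so the timer fires 30 minutes after the last RPC ended.
	d := time.Duration(lastCallEnd-time.Now().UnixNano()) + timeout
	fmt.Printf("rearm idle timer in ~%v\n", d.Round(time.Minute))
}
```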
- atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
- m.resetIdleTimer(time.Duration(m.timeout))
+ m.resetIdleTimer(m.timeout)
 }

 // tryEnterIdleMode instructs the channel to enter idle mode. But before
@@ -185,36 +145,49 @@ func (m *manager) handleIdleTimeout() {
 // Return value indicates whether or not the channel moved to idle mode.
 //
 // Holds idleMu which ensures mutual exclusion with exitIdleMode.
-func (m *manager) tryEnterIdleMode() bool {
+func (m *Manager) tryEnterIdleMode() bool {
+ // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
+ // that the channel is either in idle mode or is trying to get there.
+ if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
+ // This CAS operation can fail if an RPC started after we checked for
+ // activity in the timer handler, or one was ongoing from before the
+ // last time the timer fired, or if a test is attempting to enter idle
+ // mode without checking. In all cases, abort going into idle mode.
+ return false
+ }
+ // N.B. if we fail to enter idle mode after this, we must re-add
+ // math.MaxInt32 to m.activeCallsCount.
+
 m.idleMu.Lock()
 defer m.idleMu.Unlock()

 if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
 // We raced and lost to a new RPC. Very rare, but stop entering idle.
+ atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 return false
 }
 if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
- // An very short RPC could have come in (and also finished) after we
+ // A very short RPC could have come in (and also finished) after we
 // checked for calls count and activity in handleIdleTimeout(), but
 // before the CAS operation. So, we need to check for activity again.
+ atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
 return false
 }

- // No new RPCs have come in since we last set the active calls count value
- // -math.MaxInt32 in the timer callback. And since we have the lock, it is
- // safe to enter idle mode now.
- if err := m.enforcer.EnterIdleMode(); err != nil {
- m.logger.Errorf("Failed to enter idle mode: %v", err)
- return false
- }
-
- // Successfully entered idle mode.
+ // No new RPCs have come in since we set the active calls count value to
+ // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
+ // unconditionally now.
+ m.enforcer.EnterIdleMode()
 m.actuallyIdle = true
 return true
 }

+// EnterIdleModeForTesting instructs the manager to attempt to enter idle mode.
+func (m *Manager) EnterIdleModeForTesting() {
+ m.tryEnterIdleMode()
+}
+
 // OnCallBegin is invoked at the start of every RPC.
-func (m *manager) OnCallBegin() error {
+func (m *Manager) OnCallBegin() error {
 if m.isClosed() {
 return nil
 }
@@ -227,7 +200,7 @@ func (m *manager) OnCallBegin() error {
 // Channel is either in idle mode or is in the process of moving to idle
 // mode. Attempt to exit idle mode to allow this RPC.
- if err := m.exitIdleMode(); err != nil {
+ if err := m.ExitIdleMode(); err != nil {
 // Undo the increment to calls count, and return an error causing the
 // RPC to fail.
 atomic.AddInt32(&m.activeCallsCount, -1)
@@ -238,28 +211,30 @@ func (m *manager) OnCallBegin() error {
 return nil
 }

-// exitIdleMode instructs the channel to exit idle mode.
-//
-// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
-func (m *manager) exitIdleMode() error {
+// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
+// internal state.
+func (m *Manager) ExitIdleMode() error {
+ // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
m.idleMu.Lock()
 defer m.idleMu.Unlock()

- if !m.actuallyIdle {
- // This can happen in two scenarios:
+ if m.isClosed() || !m.actuallyIdle {
+ // This can happen in three scenarios:
 // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
 //   tryEnterIdleMode(). But before the latter could grab the lock, an RPC
 //   came in and OnCallBegin() noticed that the calls count is negative.
 // - Channel is in idle mode, and multiple new RPCs come in at the same
 //   time, all of them notice a negative calls count in OnCallBegin and get
 //   here. The first one to get the lock would get the channel to exit idle.
+ // - Channel is not in idle mode, and the user calls Connect which calls
+ //   m.ExitIdleMode.
 //
- // Either way, nothing to do here.
+ // In any case, there is nothing to do here.
 return nil
 }

 if err := m.enforcer.ExitIdleMode(); err != nil {
- return fmt.Errorf("channel failed to exit idle mode: %v", err)
+ return fmt.Errorf("failed to exit idle mode: %w", err)
 }

 // Undo the idle entry process. This also respects any new RPC attempts.
@@ -267,12 +242,12 @@ func (m *manager) exitIdleMode() error {
 m.actuallyIdle = false

 // Start a new timer to fire after the configured idle timeout.
- m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout)
+ m.resetIdleTimerLocked(m.timeout)
 return nil
 }

 // OnCallEnd is invoked at the end of every RPC.
-func (m *manager) OnCallEnd() {
+func (m *Manager) OnCallEnd() {
 if m.isClosed() {
 return
 }
@@ -287,15 +262,17 @@ func (m *manager) OnCallEnd() {
 atomic.AddInt32(&m.activeCallsCount, -1)
 }

-func (m *manager) isClosed() bool {
+func (m *Manager) isClosed() bool {
 return atomic.LoadInt32(&m.closed) == 1
 }

-func (m *manager) Close() {
+func (m *Manager) Close() {
 atomic.StoreInt32(&m.closed, 1)

 m.idleMu.Lock()
- m.timer.Stop()
- m.timer = nil
+ if m.timer != nil {
+ m.timer.Stop()
+ m.timer = nil
+ }
 m.idleMu.Unlock()
 }
diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go
index d2cd9d3e3752..da98c09420dd 100644
--- a/internal/idle/idle_e2e_test.go
+++ b/internal/idle/idle_e2e_test.go
@@ -586,12 +586,8 @@ func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
 }
 defer cc.Close()

- enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn) error)
- enterIdleFunc := func() {
- if err := enterIdle(cc); err != nil {
- t.Errorf("Failed to enter idle mode: %v", err)
- }
- }
+ enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+ enterIdleFunc := func() { enterIdle(cc) }
 exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
 exitIdleFunc := func() {
 if err := exitIdle(cc); err != nil {
diff --git a/internal/idle/idle_test.go b/internal/idle/idle_test.go
index 22bde3ba1422..d0fc685d3908 100644
--- a/internal/idle/idle_test.go
+++ b/internal/idle/idle_test.go
@@ -26,7 +26,6 @@ import (
 "testing"
 "time"

- "google.golang.org/grpc/grpclog"
 "google.golang.org/grpc/internal/grpctest"
 )

@@ -55,10 +54,8 @@ func (ti *testEnforcer) ExitIdleMode() error {
 }

-func (ti *testEnforcer) EnterIdleMode() error {
+func (ti *testEnforcer) EnterIdleMode() {
 ti.enterIdleCh <- struct{}{}
- return nil
-
 }

 func newTestEnforcer() *testEnforcer {
@@ -91,7 +88,7 @@ func overrideNewTimer(t *testing.T) <-chan struct{} {

 // TestManager_Disabled tests the case where the idleness manager is
 // disabled by passing an idle_timeout of 0. Verifies the following things:
 // - timer callback does not fire
-// - an RPC does not trigger a call to ExitIdleMode on the ClientConn
+// - an RPC triggers a call to ExitIdleMode on the ClientConn
 // - more calls to RPC termination (as compared to RPC initiation) do not
 //   result in an error log
 func (s) TestManager_Disabled(t *testing.T) {
@@ -100,7 +97,7 @@ func (s) TestManager_Disabled(t *testing.T) {
 // Create an idleness manager that is disabled because of idleTimeout being
 // set to `0`.
 enforcer := newTestEnforcer()
- mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(0), Logger: grpclog.Component("test")})
+ mgr := NewManager(enforcer, time.Duration(0))

 // Ensure that the timer callback does not fire within a short deadline.
 select {
@@ -109,13 +106,13 @@ func (s) TestManager_Disabled(t *testing.T) {
 case <-time.After(defaultTestShortTimeout):
 }

- // The first invocation of OnCallBegin() would lead to a call to
- // ExitIdleMode() on the enforcer, unless the idleness manager is disabled.
- mgr.OnCallBegin()
+ // The first invocation of OnCallBegin() should lead to a call to
+ // ExitIdleMode() on the enforcer.
+ go mgr.OnCallBegin()
 select {
 case <-enforcer.exitIdleCh:
- t.Fatalf("ExitIdleMode() called on enforcer when manager is disabled")
 case <-time.After(defaultTestShortTimeout):
+ t.Fatal("Timeout waiting for channel to move out of idle mode")
 }

 // If the number of calls to OnCallEnd() exceeds the number of calls to
@@ -137,8 +134,9 @@ func (s) TestManager_Enabled_TimerFires(t *testing.T) {
 callbackCh := overrideNewTimer(t)
 enforcer := newTestEnforcer()
- mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")})
+ mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
 defer mgr.Close()
+ mgr.ExitIdleMode()

 // Ensure that the timer callback fires within an appropriate amount of time.
 select {
@@ -162,8 +160,9 @@ func (s) TestManager_Enabled_OngoingCall(t *testing.T) {
 callbackCh := overrideNewTimer(t)
 enforcer := newTestEnforcer()
- mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")})
+ mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
 defer mgr.Close()
+ mgr.ExitIdleMode()

 // Fire up a goroutine that simulates an ongoing RPC that is terminated
 // after the timer callback fires for the first time.
@@ -207,8 +206,9 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
 callbackCh := overrideNewTimer(t)
 enforcer := newTestEnforcer()
- mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")})
+ mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
 defer mgr.Close()
+ mgr.ExitIdleMode()

 // Fire up a goroutine that simulates unary RPCs until the timer callback
 // fires.
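Reviewer aside: the tests above all race against the sentinel-counter scheme shared by tryEnterIdleMode and OnCallBegin, where a single atomic counter is both the active-call count and, once it holds -math.MaxInt32, the idle flag. Below is a minimal, self-contained sketch of that scheme; the names are illustrative only, not the gRPC implementation.

package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// sentinelCounter mimics Manager.activeCallsCount: non-negative while the
// channel is active; -math.MaxInt32 once the channel is idle or entering idle.
type sentinelCounter struct{ calls int32 }

// onCallBegin is the hot path: a single atomic add. A non-positive result
// reveals the idle sentinel, so the add is undone and the caller must exit
// idle mode before retrying.
func (s *sentinelCounter) onCallBegin() bool {
	if atomic.AddInt32(&s.calls, 1) > 0 {
		return true
	}
	atomic.AddInt32(&s.calls, -1)
	return false
}

// onCallEnd simply decrements the active-call count.
func (s *sentinelCounter) onCallEnd() {
	atomic.AddInt32(&s.calls, -1)
}

// tryEnterIdle atomically swaps a zero call count for the sentinel; any
// in-flight RPC makes the CAS fail, aborting the transition to idle.
func (s *sentinelCounter) tryEnterIdle() bool {
	return atomic.CompareAndSwapInt32(&s.calls, 0, -math.MaxInt32)
}

func main() {
	s := &sentinelCounter{}
	fmt.Println(s.onCallBegin()) // true: channel active, fast path
	s.onCallEnd()
	fmt.Println(s.tryEnterIdle()) // true: zero active calls, sentinel installed
	fmt.Println(s.onCallBegin()) // false: sentinel seen, must exit idle first
}

The property the tests exercise is exactly this: RPC begin/end stays lock-free, while every transition into or out of idle funnels through the slow path guarded by idleMu.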
@@ -233,6 +233,7 @@ func (s) TestManager_Enabled_ActiveSinceLastCheck(t *testing.T) {
 case <-callbackCh:
 close(timerFired)
 case <-time.After(2 * defaultTestIdleTimeout):
+ close(timerFired)
 t.Fatal("Timeout waiting for idle timer callback to fire")
 }
 select {
@@ -257,9 +258,11 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) {
 overrideNewTimer(t)
 enforcer := newTestEnforcer()
- mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(defaultTestIdleTimeout), Logger: grpclog.Component("test")})
+ mgr := NewManager(enforcer, time.Duration(defaultTestIdleTimeout))
 defer mgr.Close()

+ mgr.ExitIdleMode()
+ <-enforcer.exitIdleCh
 // Ensure that the channel moves to idle since there are no RPCs.
 select {
 case <-enforcer.enterIdleCh:
@@ -297,7 +300,7 @@ func (s) TestManager_Enabled_ExitIdleOnRPC(t *testing.T) {
 type racyState int32

 const (
- stateInital racyState = iota
+ stateInitial racyState = iota
 stateEnteredIdle
 stateExitedIdle
 stateActiveRPCs
@@ -306,12 +309,22 @@ const (
 // racyEnforcer is a test idleness enforcer used specifically to test the
 // race between idle timeout and incoming RPCs.
 type racyEnforcer struct {
- state *racyState // Accessed atomically.
+ t       *testing.T
+ state   *racyState // Accessed atomically.
+ started bool
 }

 // ExitIdleMode sets the internal state to stateExitedIdle. We should only ever
 // exit idle when we are currently in idle.
 func (ri *racyEnforcer) ExitIdleMode() error {
+ // started is set only on the initial call to ExitIdleMode.
+ if !ri.started {
+ if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateInitial)) {
+ return fmt.Errorf("idleness enforcer's first ExitIdleMode called after EnterIdleMode")
+ }
+ ri.started = true
+ return nil
+ }
 if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateEnteredIdle), int32(stateExitedIdle)) {
 return fmt.Errorf("idleness enforcer asked to exit idle when it did not enter idle earlier")
 }
@@ -319,46 +332,44 @@ func (ri *racyEnforcer) ExitIdleMode() error {
 }

 // EnterIdleMode attempts to set the internal state to stateEnteredIdle. We should only ever enter idle before RPCs start.
-func (ri *racyEnforcer) EnterIdleMode() error {
- if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInital), int32(stateEnteredIdle)) {
- return fmt.Errorf("idleness enforcer asked to enter idle after rpcs started")
+func (ri *racyEnforcer) EnterIdleMode() {
+ if !atomic.CompareAndSwapInt32((*int32)(ri.state), int32(stateInitial), int32(stateEnteredIdle)) {
+ ri.t.Errorf("idleness enforcer asked to enter idle after rpcs started")
 }
- return nil
 }

-// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where
-// firing of the idle timeout races with an incoming RPC. The test verifies that
-// if the timer callback win the race and puts the channel in idle, the RPCs can
-// kick it out of idle. And if the RPCs win the race and keep the channel
-// active, then the timer callback should not attempt to put the channel in idle
-// mode.
+// TestManager_IdleTimeoutRacesWithOnCallBegin tests the case where firing of
+// the idle timeout races with an incoming RPC. The test verifies that if the
+// timer callback wins the race and puts the channel in idle, the RPCs can kick
+// it out of idle. And if the RPCs win the race and keep the channel active,
+// then the timer callback should not attempt to put the channel in idle mode.
 func (s) TestManager_IdleTimeoutRacesWithOnCallBegin(t *testing.T) {
 // Run multiple iterations to simulate different possibilities.
for i := 0; i < 20; i++ { t.Run(fmt.Sprintf("iteration=%d", i), func(t *testing.T) { var idlenessState racyState - enforcer := &racyEnforcer{state: &idlenessState} + enforcer := &racyEnforcer{t: t, state: &idlenessState} // Configure a large idle timeout so that we can control the // race between the timer callback and RPCs. - mgr := NewManager(ManagerOptions{Enforcer: enforcer, Timeout: time.Duration(10 * time.Minute), Logger: grpclog.Component("test")}) + mgr := NewManager(enforcer, time.Duration(10*time.Minute)) defer mgr.Close() + mgr.ExitIdleMode() var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - m := mgr.(interface{ handleIdleTimeout() }) - <-time.After(defaultTestIdleTimeout / 10) - m.handleIdleTimeout() + <-time.After(defaultTestIdleTimeout / 50) + mgr.handleIdleTimeout() }() - for j := 0; j < 100; j++ { + for j := 0; j < 5; j++ { wg.Add(1) go func() { defer wg.Done() // Wait for the configured idle timeout and simulate an RPC to // race with the idle timeout timer callback. - <-time.After(defaultTestIdleTimeout / 10) + <-time.After(defaultTestIdleTimeout / 50) if err := mgr.OnCallBegin(); err != nil { t.Errorf("OnCallBegin() failed: %v", err) } diff --git a/internal/internal.go b/internal/internal.go index f28791b89b0c..2549fe8e3b88 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -182,10 +182,12 @@ var ( GRPCResolverSchemeExtraMetadata string = "xds" // EnterIdleModeForTesting gets the ClientConn to enter IDLE mode. - EnterIdleModeForTesting any // func(*grpc.ClientConn) error + EnterIdleModeForTesting any // func(*grpc.ClientConn) // ExitIdleModeForTesting gets the ClientConn to exit IDLE mode. ExitIdleModeForTesting any // func(*grpc.ClientConn) error + + ChannelzTurnOffForTesting func() ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/picker_wrapper.go b/picker_wrapper.go index 236837f4157c..bf56faa76d3d 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -37,7 +37,6 @@ import ( type pickerWrapper struct { mu sync.Mutex done bool - idle bool blockingCh chan struct{} picker balancer.Picker statsHandlers []stats.Handler // to record blocking picker calls @@ -53,11 +52,7 @@ func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper { // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. func (pw *pickerWrapper) updatePicker(p balancer.Picker) { pw.mu.Lock() - if pw.done || pw.idle { - // There is a small window where a picker update from the LB policy can - // race with the channel going to idle mode. If the picker is idle here, - // it is because the channel asked it to do so, and therefore it is sage - // to ignore the update from the LB policy. + if pw.done { pw.mu.Unlock() return } @@ -210,23 +205,15 @@ func (pw *pickerWrapper) close() { close(pw.blockingCh) } -func (pw *pickerWrapper) enterIdleMode() { - pw.mu.Lock() - defer pw.mu.Unlock() - if pw.done { - return - } - pw.idle = true -} - -func (pw *pickerWrapper) exitIdleMode() { +// reset clears the pickerWrapper and prepares it for being used again when idle +// mode is exited. 
+func (pw *pickerWrapper) reset() {
 pw.mu.Lock()
 defer pw.mu.Unlock()
 if pw.done {
 return
 }
 pw.blockingCh = make(chan struct{})
- pw.idle = false
 }

 // dropError is a wrapper error that indicates the LB policy wishes to drop the
diff --git a/resolver_balancer_ext_test.go b/resolver_balancer_ext_test.go
new file mode 100644
index 000000000000..aebc6652fa4b
--- /dev/null
+++ b/resolver_balancer_ext_test.go
@@ -0,0 +1,265 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc_test
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "runtime"
+ "strings"
+ "testing"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/balancer/stub"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/resolver/manual"
+)
+
+// TestResolverBalancerInteraction tests:
+// 1. resolver.Builder.Build() ->
+// 2. resolver.ClientConn.UpdateState() ->
+// 3. balancer.Balancer.UpdateClientConnState() ->
+// 4. balancer.ClientConn.ResolveNow() ->
+// 5. resolver.Resolver.ResolveNow()
+func (s) TestResolverBalancerInteraction(t *testing.T) {
+ name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+ bf := stub.BalancerFuncs{
+ UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+ bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
+ return nil
+ },
+ }
+ stub.Register(name, bf)
+
+ rb := manual.NewBuilderWithScheme(name)
+ rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+ sc := cc.ParseServiceConfig(`{"loadBalancingConfig": [{"` + name + `":{}}]}`)
+ cc.UpdateState(resolver.State{
+ Addresses: []resolver.Address{{Addr: "test"}},
+ ServiceConfig: sc,
+ })
+ }
+ rnCh := make(chan struct{})
+ rb.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(rnCh) }
+ resolver.Register(rb)
+
+ cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("grpc.Dial error: %v", err)
+ }
+ defer cc.Close()
+ select {
+ case <-rnCh:
+ case <-time.After(defaultTestTimeout):
+ t.Fatalf("timed out waiting for resolver.ResolveNow")
+ }
+}
+
+type resolverBuilderWithErr struct {
+ resolver.Resolver
+ errCh <-chan error
+ scheme string
+}
+
+func (b *resolverBuilderWithErr) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
+ if err := <-b.errCh; err != nil {
+ return nil, err
+ }
+ return b, nil
+}
+
+func (b *resolverBuilderWithErr) Scheme() string {
+ return b.scheme
+}
+
+func (b *resolverBuilderWithErr) Close() {}
+
+// TestResolverBuildFailure tests:
+// 1. resolver.Builder.Build() passes.
+// 2. Channel enters idle mode.
+// 3. An RPC happens.
+// 4. resolver.Builder.Build() fails.
+func (s) TestResolverBuildFailure(t *testing.T) {
+ enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+ name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+ resErrCh := make(chan error, 1)
+ resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name})
+
+ resErrCh <- nil
+ cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("grpc.Dial error: %v", err)
+ }
+ defer cc.Close()
+ enterIdle(cc)
+ const errStr = "test error from resolver builder"
+ resErrCh <- errors.New(errStr)
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ if err := cc.Invoke(ctx, "/a/b", nil, nil); err == nil || !strings.Contains(err.Error(), errStr) {
+ t.Fatalf("Invoke = %v; want %v", err, errStr)
+ }
+}
+
+// TestEnterIdleDuringResolverUpdateState tests a scenario that used to
+// deadlock: the resolver calls UpdateState at the same time as it is being
+// closed because the channel is entering idle mode.
+func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) {
+ enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+ name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+
+ // Create a manual resolver that spams UpdateState calls until it is closed.
+ rb := manual.NewBuilderWithScheme(name)
+ var cancel context.CancelFunc
+ rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
+ var ctx context.Context
+ ctx, cancel = context.WithCancel(context.Background())
+ go func() {
+ for ctx.Err() == nil {
+ cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
+ }
+ }()
+ }
+ rb.CloseCallback = func() {
+ cancel()
+ }
+ resolver.Register(rb)
+
+ cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("grpc.Dial error: %v", err)
+ }
+ defer cc.Close()
+
+ // Enter/exit idle mode repeatedly.
+ for i := 0; i < 2000; i++ {
+ // Start a timer so we panic out of the deadlock and can see all the
+ // stack traces to debug the problem.
+ p := time.AfterFunc(time.Second, func() {
+ buf := make([]byte, 8192)
+ buf = buf[0:runtime.Stack(buf, true)]
+ t.Error("Timed out waiting for enterIdle")
+ panic(fmt.Sprint("Stack trace:\n", string(buf)))
+ })
+ enterIdle(cc)
+ p.Stop()
+ cc.Connect()
+ }
+}
+
+// TestEnterIdleDuringBalancerUpdateState tests calling UpdateState at the same
+// time as the balancer is being closed while the channel enters idle mode.
+func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) {
+ enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
+ name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
+
+ // Create a balancer that calls UpdateState once asynchronously, attempting
+ // to make the channel appear ready even after entering idle.
+ bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + go func() { + bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready}) + }() + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + resolver.Register(rb) + + cc, err := grpc.Dial( + name+":///", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + enterIdle(cc) + if got, want := cc.GetState(), connectivity.Idle; got != want { + t.Fatalf("cc state = %v; want %v", got, want) + } + cc.Connect() + } +} + +// TestEnterIdleDuringBalancerNewSubConn tests calling NewSubConn at the same +// time as the balancer being closed while the channel enters idle mode. +func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) { + channelz.TurnOn() + defer internal.ChannelzTurnOffForTesting() + enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn)) + name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1) + + // Create a balancer that calls NewSubConn once asynchronously, attempting + // to create a subchannel after going idle. + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + go func() { + bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "test"}}, balancer.NewSubConnOptions{}) + }() + return nil + }, + } + stub.Register(name, bf) + + rb := manual.NewBuilderWithScheme(name) + rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) { + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}}) + } + resolver.Register(rb) + + cc, err := grpc.Dial( + name+":///", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`)) + if err != nil { + t.Fatalf("grpc.Dial error: %v", err) + } + defer cc.Close() + + // Enter/exit idle mode repeatedly. + for i := 0; i < 2000; i++ { + enterIdle(cc) + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + t.Fatalf("Found channels: %v; expected 1 entry", tcs) + } + if len(tcs[0].SubChans) != 0 { + t.Fatalf("Found subchannels: %v; expected 0 entries", tcs[0].SubChans) + } + cc.Connect() + } +} diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go deleted file mode 100644 index ce03600090b0..000000000000 --- a/resolver_conn_wrapper.go +++ /dev/null @@ -1,227 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package grpc - -import ( - "context" - "strings" - "sync" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/internal/grpcsync" - "google.golang.org/grpc/internal/pretty" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/serviceconfig" -) - -// resolverStateUpdater wraps the single method used by ccResolverWrapper to -// report a state update from the actual resolver implementation. -type resolverStateUpdater interface { - updateResolverState(s resolver.State, err error) error -} - -// ccResolverWrapper is a wrapper on top of cc for resolvers. -// It implements resolver.ClientConn interface. -type ccResolverWrapper struct { - // The following fields are initialized when the wrapper is created and are - // read-only afterwards, and therefore can be accessed without a mutex. - cc resolverStateUpdater - channelzID *channelz.Identifier - ignoreServiceConfig bool - opts ccResolverWrapperOpts - serializer *grpcsync.CallbackSerializer // To serialize all incoming calls. - serializerCancel context.CancelFunc // To close the serializer, accessed only from close(). - - // All incoming (resolver --> gRPC) calls are guaranteed to execute in a - // mutually exclusive manner as they are scheduled on the serializer. - // Fields accessed *only* in these serializer callbacks, can therefore be - // accessed without a mutex. - curState resolver.State - - // mu guards access to the below fields. - mu sync.Mutex - closed bool - resolver resolver.Resolver // Accessed only from outgoing calls. -} - -// ccResolverWrapperOpts wraps the arguments to be passed when creating a new -// ccResolverWrapper. -type ccResolverWrapperOpts struct { - target resolver.Target // User specified dial target to resolve. - builder resolver.Builder // Resolver builder to use. - bOpts resolver.BuildOptions // Resolver build options to use. - channelzID *channelz.Identifier // Channelz identifier for the channel. -} - -// newCCResolverWrapper uses the resolver.Builder to build a Resolver and -// returns a ccResolverWrapper object which wraps the newly built resolver. -func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) { - ctx, cancel := context.WithCancel(context.Background()) - ccr := &ccResolverWrapper{ - cc: cc, - channelzID: opts.channelzID, - ignoreServiceConfig: opts.bOpts.DisableServiceConfig, - opts: opts, - serializer: grpcsync.NewCallbackSerializer(ctx), - serializerCancel: cancel, - } - - // Cannot hold the lock at build time because the resolver can send an - // update or error inline and these incoming calls grab the lock to schedule - // a callback in the serializer. - r, err := opts.builder.Build(opts.target, ccr, opts.bOpts) - if err != nil { - cancel() - return nil, err - } - - // Any error reported by the resolver at build time that leads to a - // re-resolution request from the balancer is dropped by grpc until we - // return from this function. So, we don't have to handle pending resolveNow - // requests here. - ccr.mu.Lock() - ccr.resolver = r - ccr.mu.Unlock() - - return ccr, nil -} - -func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { - ccr.mu.Lock() - defer ccr.mu.Unlock() - - // ccr.resolver field is set only after the call to Build() returns. But in - // the process of building, the resolver may send an error update which when - // propagated to the balancer may result in a re-resolution request. 
- if ccr.closed || ccr.resolver == nil { - return - } - ccr.resolver.ResolveNow(o) -} - -func (ccr *ccResolverWrapper) close() { - ccr.mu.Lock() - if ccr.closed { - ccr.mu.Unlock() - return - } - - channelz.Info(logger, ccr.channelzID, "Closing the name resolver") - - // Close the serializer to ensure that no more calls from the resolver are - // handled, before actually closing the resolver. - ccr.serializerCancel() - ccr.closed = true - r := ccr.resolver - ccr.mu.Unlock() - - // Give enqueued callbacks a chance to finish. - <-ccr.serializer.Done() - - // Spawn a goroutine to close the resolver (since it may block trying to - // cleanup all allocated resources) and return early. - go r.Close() -} - -// serializerScheduleLocked is a convenience method to schedule a function to be -// run on the serializer while holding ccr.mu. -func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) { - ccr.mu.Lock() - ccr.serializer.Schedule(f) - ccr.mu.Unlock() -} - -// UpdateState is called by resolver implementations to report new state to gRPC -// which includes addresses and service config. -func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { - errCh := make(chan error, 1) - if s.Endpoints == nil { - s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) - for _, a := range s.Addresses { - ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes} - ep.Addresses[0].BalancerAttributes = nil - s.Endpoints = append(s.Endpoints, ep) - } - } - ok := ccr.serializer.Schedule(func(context.Context) { - ccr.addChannelzTraceEvent(s) - ccr.curState = s - if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { - errCh <- balancer.ErrBadResolverState - return - } - errCh <- nil - }) - if !ok { - // The only time when Schedule() fail to add the callback to the - // serializer is when the serializer is closed, and this happens only - // when the resolver wrapper is closed. - return nil - } - return <-errCh -} - -// ReportError is called by resolver implementations to report errors -// encountered during name resolution to gRPC. -func (ccr *ccResolverWrapper) ReportError(err error) { - ccr.serializerScheduleLocked(func(_ context.Context) { - channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) - ccr.cc.updateResolverState(resolver.State{}, err) - }) -} - -// NewAddress is called by the resolver implementation to send addresses to -// gRPC. -func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { - ccr.serializerScheduleLocked(func(_ context.Context) { - ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) - ccr.curState.Addresses = addrs - ccr.cc.updateResolverState(ccr.curState, nil) - }) -} - -// ParseServiceConfig is called by resolver implementations to parse a JSON -// representation of the service config. -func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { - return parseServiceConfig(scJSON) -} - -// addChannelzTraceEvent adds a channelz trace event containing the new -// state received from resolver implementations. 
-func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { - var updates []string - var oldSC, newSC *ServiceConfig - var oldOK, newOK bool - if ccr.curState.ServiceConfig != nil { - oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig) - } - if s.ServiceConfig != nil { - newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig) - } - if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) { - updates = append(updates, "service config updated") - } - if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 { - updates = append(updates, "resolver returned an empty address list") - } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { - updates = append(updates, "resolver returned new addresses") - } - channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) -} diff --git a/resolver_wrapper.go b/resolver_wrapper.go new file mode 100644 index 000000000000..c79bab12149f --- /dev/null +++ b/resolver_wrapper.go @@ -0,0 +1,197 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "context" + "strings" + "sync" + + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" +) + +// ccResolverWrapper is a wrapper on top of cc for resolvers. +// It implements resolver.ClientConn interface. +type ccResolverWrapper struct { + // The following fields are initialized when the wrapper is created and are + // read-only afterwards, and therefore can be accessed without a mutex. + cc *ClientConn + ignoreServiceConfig bool + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + + resolver resolver.Resolver // only accessed within the serializer + + // The following fields are protected by mu. Caller must take cc.mu before + // taking mu. + mu sync.Mutex + curState resolver.State + closed bool +} + +// newCCResolverWrapper initializes the ccResolverWrapper. It can only be used +// after calling start, which builds the resolver. +func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper { + ctx, cancel := context.WithCancel(cc.ctx) + return &ccResolverWrapper{ + cc: cc, + ignoreServiceConfig: cc.dopts.disableServiceConfig, + serializer: grpcsync.NewCallbackSerializer(ctx), + serializerCancel: cancel, + } +} + +// start builds the name resolver using the resolver.Builder in cc and returns +// any error encountered. It must always be the first operation performed on +// any newly created ccResolverWrapper, except that close may be called instead. 
+func (ccr *ccResolverWrapper) start() error { + errCh := make(chan error) + ccr.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil { + return + } + opts := resolver.BuildOptions{ + DisableServiceConfig: ccr.cc.dopts.disableServiceConfig, + DialCreds: ccr.cc.dopts.copts.TransportCredentials, + CredsBundle: ccr.cc.dopts.copts.CredsBundle, + Dialer: ccr.cc.dopts.copts.Dialer, + } + var err error + ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts) + errCh <- err + }) + return <-errCh +} + +func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { + ccr.serializer.Schedule(func(ctx context.Context) { + if ctx.Err() != nil || ccr.resolver == nil { + return + } + ccr.resolver.ResolveNow(o) + }) +} + +// close initiates async shutdown of the wrapper. To determine the wrapper has +// finished shutting down, the channel should block on ccr.serializer.Done() +// without cc.mu held. +func (ccr *ccResolverWrapper) close() { + channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver") + ccr.mu.Lock() + ccr.closed = true + ccr.mu.Unlock() + + ccr.serializer.Schedule(func(context.Context) { + if ccr.resolver == nil { + return + } + ccr.resolver.Close() + ccr.resolver = nil + }) + ccr.serializerCancel() +} + +// UpdateState is called by resolver implementations to report new state to gRPC +// which includes addresses and service config. +func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { + ccr.cc.mu.Lock() + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + ccr.cc.mu.Unlock() + return nil + } + if s.Endpoints == nil { + s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) + for _, a := range s.Addresses { + ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes} + ep.Addresses[0].BalancerAttributes = nil + s.Endpoints = append(s.Endpoints, ep) + } + } + ccr.addChannelzTraceEvent(s) + ccr.curState = s + ccr.mu.Unlock() + return ccr.cc.updateResolverStateAndUnlock(s, nil) +} + +// ReportError is called by resolver implementations to report errors +// encountered during name resolution to gRPC. +func (ccr *ccResolverWrapper) ReportError(err error) { + ccr.cc.mu.Lock() + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + ccr.cc.mu.Unlock() + return + } + ccr.mu.Unlock() + channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) + ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err) +} + +// NewAddress is called by the resolver implementation to send addresses to +// gRPC. +func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { + ccr.cc.mu.Lock() + ccr.mu.Lock() + if ccr.closed { + ccr.mu.Unlock() + ccr.cc.mu.Unlock() + return + } + s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig} + ccr.addChannelzTraceEvent(s) + ccr.curState = s + ccr.mu.Unlock() + ccr.cc.updateResolverStateAndUnlock(s, nil) +} + +// ParseServiceConfig is called by resolver implementations to parse a JSON +// representation of the service config. +func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { + return parseServiceConfig(scJSON) +} + +// addChannelzTraceEvent adds a channelz trace event containing the new +// state received from resolver implementations. 
+func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { + var updates []string + var oldSC, newSC *ServiceConfig + var oldOK, newOK bool + if ccr.curState.ServiceConfig != nil { + oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig) + } + if s.ServiceConfig != nil { + newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig) + } + if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) { + updates = append(updates, "service config updated") + } + if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 { + updates = append(updates, "resolver returned an empty address list") + } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { + updates = append(updates, "resolver returned new addresses") + } + channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) +} diff --git a/test/end2end_test.go b/test/end2end_test.go index 184940ec209c..97a7f1812553 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -505,7 +505,6 @@ type test struct { clientNopCompression bool unaryClientInt grpc.UnaryClientInterceptor streamClientInt grpc.StreamClientInterceptor - sc <-chan grpc.ServiceConfig clientInitialWindowSize int32 clientInitialConnWindowSize int32 perRPCCreds credentials.PerRPCCredentials @@ -760,10 +759,6 @@ func (d *nopDecompressor) Type() string { func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) { opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent)) - if te.sc != nil { - opts = append(opts, grpc.WithServiceConfig(te.sc)) - } - if te.clientCompression { opts = append(opts, grpc.WithCompressor(grpc.NewGZIPCompressor()), @@ -1103,20 +1098,10 @@ func testServiceConfigSetup(t *testing.T, e env) *test { return te } -func newBool(b bool) (a *bool) { - return &b -} - func newInt(b int) (a *int) { return &b } -func newDuration(b time.Duration) (a *time.Duration) { - a = new(time.Duration) - *a = b - return -} - func (s) TestGetMethodConfig(t *testing.T) { te := testServiceConfigSetup(t, tcpClearRREnv) defer te.tearDown() diff --git a/test/service_config_deprecated_test.go b/test/service_config_deprecated_test.go deleted file mode 100644 index a1fd44d853d9..000000000000 --- a/test/service_config_deprecated_test.go +++ /dev/null @@ -1,463 +0,0 @@ -/* - * - * Copyright 2023 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package test - -import ( - "context" - "testing" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - testgrpc "google.golang.org/grpc/interop/grpc_testing" - testpb "google.golang.org/grpc/interop/grpc_testing" -) - -// The following functions with function name ending with TD indicates that they -// should be deleted after old service config API is deprecated and deleted. 
-func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { - te := newTest(t, e) - // We write before read. - ch := make(chan grpc.ServiceConfig, 1) - te.sc = ch - te.userAgent = testAppUA - te.declareLogNoise( - "transport: http2Client.notifyError got notified that the client transport was broken EOF", - "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", - "grpc: addrConn.resetTransport failed to create client transport: connection error", - "Failed to dial : context canceled; please retry.", - ) - return te, ch -} - -func (s) TestServiceConfigGetMethodConfigTD(t *testing.T) { - for _, e := range listTestEnv() { - testGetMethodConfigTD(t, e) - } -} - -func testGetMethodConfigTD(t *testing.T, e env) { - te, ch := testServiceConfigSetupTD(t, e) - defer te.tearDown() - - mc1 := grpc.MethodConfig{ - WaitForReady: newBool(true), - Timeout: newDuration(time.Millisecond), - } - mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc1 - m["/grpc.testing.TestService/"] = mc2 - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - cc := te.clientConn() - tc := testgrpc.NewTestServiceClient(cc) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/UnaryCall"] = mc1 - m["/grpc.testing.TestService/"] = mc2 - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - // Wait for the new service config to propagate. - for { - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - break - } - } - // The following RPCs are expected to become fail-fast. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) - } -} - -func (s) TestServiceConfigWaitForReadyTD(t *testing.T) { - for _, e := range listTestEnv() { - testServiceConfigWaitForReadyTD(t, e) - } -} - -func testServiceConfigWaitForReadyTD(t *testing.T, e env) { - te, ch := testServiceConfigSetupTD(t, e) - defer te.tearDown() - - // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. - mc := grpc.MethodConfig{ - WaitForReady: newBool(false), - Timeout: newDuration(time.Millisecond), - } - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - cc := te.clientConn() - tc := testgrpc.NewTestServiceClient(cc) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 
- if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) - } - - // Generate a service config update. - // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. - mc.WaitForReady = newBool(true) - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - // Wait for the new service config to take effect. - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { - mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") - if *mc.WaitForReady { - break - } - } - if ctx.Err() != nil { - t.Fatalf("Timeout when waiting for service config to take effect") - } - - // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - if _, err := tc.FullDuplexCall(ctx); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) - } -} - -func (s) TestServiceConfigTimeoutTD(t *testing.T) { - for _, e := range listTestEnv() { - testServiceConfigTimeoutTD(t, e) - } -} - -func testServiceConfigTimeoutTD(t *testing.T, e env) { - te, ch := testServiceConfigSetupTD(t, e) - defer te.tearDown() - - // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout - // to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will - // wait until deadline exceeds. - mc := grpc.MethodConfig{ - Timeout: newDuration(time.Hour), - } - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - cc := te.clientConn() - tc := testgrpc.NewTestServiceClient(cc) - // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. - ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) - } - cancel() - ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) - } - cancel() - - // Generate a service config update. - // Case2: Client API sets timeout to be the default and ServiceConfig sets - // timeout to be 1ns. Timeout should be 1ns (min of 1ns and the default) - // and the rpc will wait until deadline exceeds. 
- mc.Timeout = newDuration(time.Nanosecond) - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - - // Wait for the new service config to take effect. - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for ; ctx.Err() == nil; <-time.After(time.Millisecond) { - mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") - if *mc.Timeout == time.Nanosecond { - break - } - } - if ctx.Err() != nil { - t.Fatalf("Timeout when waiting for service config to take effect") - } - - ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded || ctx.Err() != nil { - t.Fatalf("TestService/EmptyCall(_, _) = _, %v and ctx.Err() = %v; want _, %s and ctx.Err() = nil", err, ctx.Err(), codes.DeadlineExceeded) - } - - if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded || ctx.Err() != nil { - t.Fatalf("TestService/FullDuplexCall(_) = _, %v and ctx.Err() = %v; want _, %s and ctx.Err() = nil", err, ctx.Err(), codes.DeadlineExceeded) - } -} - -func (s) TestServiceConfigMaxMsgSizeTD(t *testing.T) { - for _, e := range listTestEnv() { - testServiceConfigMaxMsgSizeTD(t, e) - } -} - -func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { - // Setting up values and objects shared across all test cases. - const smallSize = 1 - const largeSize = 1024 - const extraLargeSize = 2048 - - smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) - if err != nil { - t.Fatal(err) - } - largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) - if err != nil { - t.Fatal(err) - } - extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) - if err != nil { - t.Fatal(err) - } - - mc := grpc.MethodConfig{ - MaxReqSize: newInt(extraLargeSize), - MaxRespSize: newInt(extraLargeSize), - } - - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/UnaryCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te1, ch1 := testServiceConfigSetupTD(t, e) - te1.startServer(&testServer{security: e.security}) - defer te1.tearDown() - - ch1 <- sc - tc := testgrpc.NewTestServiceClient(te1.clientConn()) - - req := &testpb.SimpleRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseSize: int32(extraLargeSize), - Payload: smallPayload, - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Test for unary RPC recv. - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for unary RPC send. - req.Payload = extraLargePayload - req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for streaming RPC recv. 
- respParam := []*testpb.ResponseParameters{ - { - Size: int32(extraLargeSize), - }, - } - sreq := &testpb.StreamingOutputCallRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseParameters: respParam, - Payload: smallPayload, - } - stream, err := tc.FullDuplexCall(te1.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) - } - - // Test for streaming RPC send. - respParam[0].Size = int32(smallSize) - sreq.Payload = extraLargePayload - stream, err = tc.FullDuplexCall(te1.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) - } - - // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te2, ch2 := testServiceConfigSetupTD(t, e) - te2.maxClientReceiveMsgSize = newInt(1024) - te2.maxClientSendMsgSize = newInt(1024) - te2.startServer(&testServer{security: e.security}) - defer te2.tearDown() - ch2 <- sc - tc = testgrpc.NewTestServiceClient(te2.clientConn()) - - // Test for unary RPC recv. - req.Payload = smallPayload - req.ResponseSize = int32(largeSize) - - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for unary RPC send. - req.Payload = largePayload - req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for streaming RPC recv. - stream, err = tc.FullDuplexCall(te2.ctx) - respParam[0].Size = int32(largeSize) - sreq.Payload = smallPayload - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) - } - - // Test for streaming RPC send. - respParam[0].Size = int32(smallSize) - sreq.Payload = largePayload - stream, err = tc.FullDuplexCall(te2.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) - } - - // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te3, ch3 := testServiceConfigSetupTD(t, e) - te3.maxClientReceiveMsgSize = newInt(4096) - te3.maxClientSendMsgSize = newInt(4096) - te3.startServer(&testServer{security: e.security}) - defer te3.tearDown() - ch3 <- sc - tc = testgrpc.NewTestServiceClient(te3.clientConn()) - - // Test for unary RPC recv. 
- req.Payload = smallPayload - req.ResponseSize = int32(largeSize) - - if _, err := tc.UnaryCall(ctx, req); err != nil { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) - } - - req.ResponseSize = int32(extraLargeSize) - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for unary RPC send. - req.Payload = largePayload - req.ResponseSize = int32(smallSize) - if _, err := tc.UnaryCall(ctx, req); err != nil { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) - } - - req.Payload = extraLargePayload - if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) - } - - // Test for streaming RPC recv. - stream, err = tc.FullDuplexCall(te3.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - respParam[0].Size = int32(largeSize) - sreq.Payload = smallPayload - - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err != nil { - t.Fatalf("%v.Recv() = _, %v, want ", stream, err) - } - - respParam[0].Size = int32(extraLargeSize) - - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) - } - - // Test for streaming RPC send. - respParam[0].Size = int32(smallSize) - sreq.Payload = largePayload - stream, err = tc.FullDuplexCall(te3.ctx) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - if err := stream.Send(sreq); err != nil { - t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) - } - sreq.Payload = extraLargePayload - if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted { - t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) - } -}
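Closing reviewer note: balancer_wrapper.go and resolver_wrapper.go in this change both lean on the callback-serializer pattern, where work aimed at the balancer or resolver is queued and run by a single goroutine, so calls happen in order and no mutex is held while user code executes. The sketch below illustrates the pattern with simplified, illustrative types; it is not the grpcsync.CallbackSerializer implementation.

package main

import (
	"context"
	"fmt"
)

// miniSerializer runs queued callbacks in order on one goroutine, so no two
// callbacks ever execute concurrently and no lock is held while they run.
type miniSerializer struct {
	callbacks chan func(context.Context)
	done      chan struct{}
}

func newMiniSerializer(ctx context.Context) *miniSerializer {
	s := &miniSerializer{
		callbacks: make(chan func(context.Context), 64),
		done:      make(chan struct{}),
	}
	go s.run(ctx)
	return s
}

// schedule queues f for execution; it reports false once ctx is canceled,
// mirroring Schedule() returning false after the serializer is closed.
func (s *miniSerializer) schedule(ctx context.Context, f func(context.Context)) bool {
	select {
	case s.callbacks <- f:
		return true
	case <-ctx.Done():
		return false
	}
}

func (s *miniSerializer) run(ctx context.Context) {
	defer close(s.done)
	for {
		select {
		case <-ctx.Done():
			// Drain callbacks queued before cancellation, like the
			// "Run all pending callbacks" loop in the serializer above.
			for {
				select {
				case f := <-s.callbacks:
					f(ctx)
				default:
					return
				}
			}
		case f := <-s.callbacks:
			f(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newMiniSerializer(ctx)
	s.schedule(ctx, func(context.Context) { fmt.Println("first") })
	s.schedule(ctx, func(context.Context) { fmt.Println("second") })
	cancel()
	<-s.done // like blocking on serializer.Done() during close
}

Shutdown here mirrors close() in the wrappers above: cancel the serializer's context, then block on the done channel so that already-queued callbacks are guaranteed to finish before teardown proceeds.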