Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idle: move idleness manager to separate package and ~13s of tests into it #6566

Merged
merged 8 commits into from
Aug 23, 2023
6 changes: 1 addition & 5 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,11 +1030,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
// the test would fail. Waiting for the channel to become READY here
// ensures that the test does not flake because of this rare sequence of
// events.
for s := cc.GetState(); s != connectivity.Ready; s = cc.GetState() {
if !cc.WaitForStateChange(ctx, s) {
t.Fatal("Timeout when waiting for connectivity state to reach READY")
}
}
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Cache the state changes seen up to this point.
states0 := ccWrapper.getStates()
Expand Down
4 changes: 2 additions & 2 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return err
}
defer cc.idlenessMgr.onCallEnd()
defer cc.idlenessMgr.OnCallEnd()

// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
Expand Down
17 changes: 14 additions & 3 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -266,7 +267,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Configure idleness support with configured idle timeout or default idle
// timeout duration. Idleness can be explicitly disabled by the user, by
// setting the dial option to 0.
cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
cc.idlenessMgr = idle.NewIdlenessManager(idle.IdlenessManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})

// Return early for non-blocking dials.
if !cc.dopts.block {
Expand Down Expand Up @@ -317,6 +318,16 @@ func (cc *ClientConn) addTraceEvent(msg string) {
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
}

// idler is a view of ClientConn that is handed to the idleness manager as
// its idle.IdlenessEnforcer (see DialContext). Calls are forwarded to the
// channel's unexported idle-mode transition methods.
type idler ClientConn

func (i *idler) ExitIdleMode() error {
	cc := (*ClientConn)(i)
	return cc.exitIdleMode()
}

func (i *idler) EnterIdleMode() error {
	cc := (*ClientConn)(i)
	return cc.enterIdleMode()
}

// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer.
func (cc *ClientConn) exitIdleMode() error {
Expand Down Expand Up @@ -639,7 +650,7 @@ type ClientConn struct {
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
idlenessMgr idlenessManager
idlenessMgr idle.IdlenessManager

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand Down Expand Up @@ -1268,7 +1279,7 @@ func (cc *ClientConn) Close() error {
rWrapper.close()
}
if idlenessMgr != nil {
idlenessMgr.close()
idlenessMgr.Close()
}

for ac := range conns {
Expand Down
86 changes: 50 additions & 36 deletions idle.go → internal/idle/idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,45 @@
*
*/

package grpc
// Package idle contains a component for managing idleness (entering and exiting)
// based on RPC activity.
package idle

import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/grpclog"
)

// timeAfterFunc schedules f to run once d has elapsed and returns the timer.
// It is a package-level variable so unit tests can substitute a fake timer
// implementation.
var timeAfterFunc = time.AfterFunc

// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter
// IdlenessEnforcer is the functionality provided by grpc.ClientConn to enter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should all of these names not include the Idleness prefix?

I'm ok with names being idle.Enforcer, idle.Manager, and noopManager, managerImpl, idle.ManagerOptions and idle.NewManager().

If you feel that is not descriptive enough, we can rename the package idleness and then these names would be idleness.Enforcer and so on.

Wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'm fine with all these naming changes. I'll stick with idle as the package name.

// and exit from idle mode.
type idlenessEnforcer interface {
exitIdleMode() error
enterIdleMode() error
// IdlenessEnforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
type IdlenessEnforcer interface {
	// EnterIdleMode puts the channel into idle mode.
	EnterIdleMode() error
	// ExitIdleMode moves the channel out of idle mode.
	ExitIdleMode() error
}

// idlenessManager defines the functionality required to track RPC activity on a
// IdlenessManager defines the functionality required to track RPC activity on a
// channel.
type idlenessManager interface {
onCallBegin() error
onCallEnd()
close()
type IdlenessManager interface {
	// OnCallBegin is invoked at the start of every RPC.
	OnCallBegin() error
	// OnCallEnd is invoked at the end of every RPC.
	OnCallEnd()
	// Close shuts down the idleness manager.
	Close()
}

// noopIdlenessManager is a no-op implementation of IdlenessManager, returned
// by NewIdlenessManager when idleness is disabled (idle timeout of 0).
type noopIdlenessManager struct{}

func (noopIdlenessManager) OnCallBegin() error { return nil }
func (noopIdlenessManager) OnCallEnd()         {}
func (noopIdlenessManager) Close()             {}

// idlenessManagerImpl implements the idlenessManager interface. It uses atomic
// operations to synchronize access to shared state and a mutex to guarantee
Expand All @@ -64,14 +68,15 @@ type idlenessManagerImpl struct {

// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
enforcer IdlenessEnforcer // Functionality provided by grpc.ClientConn.
timeout int64 // Idle timeout duration nanos stored as an int64.
logger grpclog.LoggerV2

// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
// - a: Idle timeout has fired and handleIdleTimeout() is trying to put
// the channel in idle mode because the channel has been inactive.
// - b: At the same time an RPC is made on the channel, and onCallBegin()
// - b: At the same time an RPC is made on the channel, and OnCallBegin()
// is trying to prevent the channel from going idle.
// - Competing intentions:
// - The channel is in idle mode and there are multiple RPCs starting at
Expand All @@ -83,18 +88,27 @@ type idlenessManagerImpl struct {
timer *time.Timer
}

// newIdlenessManager creates a new idleness manager implementation for the
// IdlenessManagerOptions is a collection of options used by
// NewIdlenessManager.
type IdlenessManagerOptions struct {
	// Enforcer is the component asked to actually enter and exit idle mode
	// when the manager decides to (backed by grpc.ClientConn).
	Enforcer IdlenessEnforcer
	// Timeout is the idle timeout duration; a value of 0 disables idleness
	// tracking and yields a no-op manager.
	Timeout time.Duration
	// Logger is used to log failures to enter or exit idle mode.
	Logger grpclog.LoggerV2
}

// NewIdlenessManager creates a new idleness manager implementation for the
// given idle timeout.
func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
if idleTimeout == 0 {
func NewIdlenessManager(opts IdlenessManagerOptions) IdlenessManager {
if opts.Timeout == 0 {
return noopIdlenessManager{}
}

i := &idlenessManagerImpl{
enforcer: enforcer,
timeout: int64(idleTimeout),
enforcer: opts.Enforcer,
timeout: int64(opts.Timeout),
logger: opts.Logger,
}
i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout)
i.timer = timeAfterFunc(opts.Timeout, i.handleIdleTimeout)
return i
}

Expand Down Expand Up @@ -140,7 +154,7 @@ func (i *idlenessManagerImpl) handleIdleTimeout() {

// This CAS operation is extremely likely to succeed given that there has
// been no activity since the last time we were here. Setting the
// activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the
// activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the
// channel is either in idle mode or is trying to get there.
if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) {
// This CAS operation can fail if an RPC started after we checked for
Expand Down Expand Up @@ -170,7 +184,7 @@ func (i *idlenessManagerImpl) handleIdleTimeout() {
//
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
// Holds idleMu which ensures mutual exclusion with ExitIdleMode.
func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
i.idleMu.Lock()
defer i.idleMu.Unlock()
Expand All @@ -189,8 +203,8 @@ func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
// No new RPCs have come in since we last set the active calls count value
// -math.MaxInt32 in the timer callback. And since we have the lock, it is
// safe to enter idle mode now.
if err := i.enforcer.enterIdleMode(); err != nil {
logger.Errorf("Failed to enter idle mode: %v", err)
if err := i.enforcer.EnterIdleMode(); err != nil {
i.logger.Errorf("Failed to enter idle mode: %v", err)
return false
}

Expand All @@ -199,8 +213,8 @@ func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
return true
}

// onCallBegin is invoked at the start of every RPC.
func (i *idlenessManagerImpl) onCallBegin() error {
// OnCallBegin is invoked at the start of every RPC.
func (i *idlenessManagerImpl) OnCallBegin() error {
if i.isClosed() {
return nil
}
Expand All @@ -213,7 +227,7 @@ func (i *idlenessManagerImpl) onCallBegin() error {

// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
if err := i.exitIdleMode(); err != nil {
if err := i.ExitIdleMode(); err != nil {
// Undo the increment to calls count, and return an error causing the
// RPC to fail.
atomic.AddInt32(&i.activeCallsCount, -1)
Expand All @@ -224,27 +238,27 @@ func (i *idlenessManagerImpl) onCallBegin() error {
return nil
}

// exitIdleMode instructs the channel to exit idle mode.
// ExitIdleMode instructs the channel to exit idle mode.
//
// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
func (i *idlenessManagerImpl) exitIdleMode() error {
func (i *idlenessManagerImpl) ExitIdleMode() error {
i.idleMu.Lock()
defer i.idleMu.Unlock()

if !i.actuallyIdle {
// This can happen in two scenarios:
// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
// tryEnterIdleMode(). But before the latter could grab the lock, an RPC
// came in and onCallBegin() noticed that the calls count is negative.
// came in and OnCallBegin() noticed that the calls count is negative.
// - Channel is in idle mode, and multiple new RPCs come in at the same
// time, all of them notice a negative calls count in onCallBegin and get
// time, all of them notice a negative calls count in OnCallBegin and get
// here. The first one to get the lock would get the channel to exit idle.
//
// Either way, nothing to do here.
return nil
}

if err := i.enforcer.exitIdleMode(); err != nil {
if err := i.enforcer.ExitIdleMode(); err != nil {
return fmt.Errorf("channel failed to exit idle mode: %v", err)
}

Expand All @@ -257,8 +271,8 @@ func (i *idlenessManagerImpl) exitIdleMode() error {
return nil
}

// onCallEnd is invoked at the end of every RPC.
func (i *idlenessManagerImpl) onCallEnd() {
// OnCallEnd is invoked at the end of every RPC.
func (i *idlenessManagerImpl) OnCallEnd() {
if i.isClosed() {
return
}
Expand All @@ -277,7 +291,7 @@ func (i *idlenessManagerImpl) isClosed() bool {
return atomic.LoadInt32(&i.closed) == 1
}

func (i *idlenessManagerImpl) close() {
func (i *idlenessManagerImpl) Close() {
atomic.StoreInt32(&i.closed, 1)

i.idleMu.Lock()
Expand Down
Loading