Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

idle: move idleness manager to separate package and ~13s of tests into it #6566

Merged
merged 8 commits into from
Aug 23, 2023
6 changes: 1 addition & 5 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,11 +1030,7 @@ func (s) TestUpdateStatePauses(t *testing.T) {
// the test would fail. Waiting for the channel to become READY here
// ensures that the test does not flake because of this rare sequence of
// events.
for s := cc.GetState(); s != connectivity.Ready; s = cc.GetState() {
if !cc.WaitForStateChange(ctx, s) {
t.Fatal("Timeout when waiting for connectivity state to reach READY")
}
}
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Cache the state changes seen up to this point.
states0 := ccWrapper.getStates()
Expand Down
4 changes: 2 additions & 2 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return err
}
defer cc.idlenessMgr.onCallEnd()
defer cc.idlenessMgr.OnCallEnd()

// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
Expand Down
2 changes: 0 additions & 2 deletions channelz/service/service_sktopt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ func protoToSocketOption(skopts []*channelzpb.SocketOption) *channelz.SocketOpti
}

func (s) TestGetSocketOptions(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
socketOptions: &channelz.SocketOptionData{
Expand Down
24 changes: 3 additions & 21 deletions channelz/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,6 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func cleanupWrapper(cleanup func() error, t *testing.T) {
if err := cleanup(); err != nil {
t.Error(err)
}
}

type protoToSocketOptFunc func([]*channelzpb.SocketOption) *channelz.SocketOptionData

// protoToSocketOpt is used in function socketProtoToStruct to extract socket option
Expand Down Expand Up @@ -311,8 +305,7 @@ func (s) TestGetTopChannels(t *testing.T) {
},
{},
}
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

for _, c := range tcs {
id := channelz.RegisterChannel(c, nil, "")
defer channelz.RemoveEntry(id)
Expand Down Expand Up @@ -364,8 +357,7 @@ func (s) TestGetServers(t *testing.T) {
lastCallStartedTimestamp: time.Now().UTC(),
},
}
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

for _, s := range ss {
id := channelz.RegisterServer(s, "")
defer channelz.RemoveEntry(id)
Expand Down Expand Up @@ -397,8 +389,6 @@ func (s) TestGetServers(t *testing.T) {
}

func (s) TestGetServerSockets(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
Expand Down Expand Up @@ -438,8 +428,6 @@ func (s) TestGetServerSockets(t *testing.T) {
// This test makes a GetServerSockets with a non-zero start ID, and expect only
// sockets with ID >= the given start ID.
func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
Expand Down Expand Up @@ -470,9 +458,6 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
}

func (s) TestGetChannel(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
Expand Down Expand Up @@ -584,8 +569,7 @@ func (s) TestGetSubChannel(t *testing.T) {
subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
)
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)

refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
Expand Down Expand Up @@ -662,8 +646,6 @@ func (s) TestGetSubChannel(t *testing.T) {
}

func (s) TestGetSocket(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
ss := []*dummySocket{
{
streamsStarted: 10,
Expand Down
17 changes: 14 additions & 3 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -266,7 +267,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Configure idleness support with configured idle timeout or default idle
// timeout duration. Idleness can be explicitly disabled by the user, by
// setting the dial option to 0.
cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why you find this single line literal struct initialization more readable than the more common multiline one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK personally I find the parameters passed in here are really obvious and uninteresting, and I'd rather have one operation per line more (for denser code, which is easier to scroll through) than I'd like to be able to more spaciously see all the details of the values. If I needed to do tricky math or call a function to set them, I'd probably make it multi-line.

Same reason I dislike most var blocks, particularly in functions, e.g.

var (
	i int
	p peer.Peer
)

Those variables aren't even related; why group them when it takes even more lines than declaring them separately?


// Return early for non-blocking dials.
if !cc.dopts.block {
Expand Down Expand Up @@ -317,6 +318,16 @@ func (cc *ClientConn) addTraceEvent(msg string) {
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
}

type idler ClientConn

func (i *idler) EnterIdleMode() error {
return (*ClientConn)(i).enterIdleMode()
}

func (i *idler) ExitIdleMode() error {
return (*ClientConn)(i).exitIdleMode()
}

// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer.
func (cc *ClientConn) exitIdleMode() error {
Expand Down Expand Up @@ -639,7 +650,7 @@ type ClientConn struct {
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
idlenessMgr idlenessManager
idlenessMgr idle.Manager

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand Down Expand Up @@ -1268,7 +1279,7 @@ func (cc *ClientConn) Close() error {
rWrapper.close()
}
if idlenessMgr != nil {
idlenessMgr.close()
idlenessMgr.Close()
}

for ac := range conns {
Expand Down
69 changes: 18 additions & 51 deletions internal/channelz/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
package channelz

import (
"context"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
Expand All @@ -40,8 +38,11 @@ const (
)

var (
db dbWrapper
idGen idGenerator
// IDGen is the global channelz entity ID generator. It should not be used
// outside this package except by tests.
IDGen IDGenerator

db dbWrapper
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = int64(50)
curState int32
Expand All @@ -52,14 +53,14 @@ var (
func TurnOn() {
if !IsOn() {
db.set(newChannelMap())
idGen.reset()
IDGen.Reset()
atomic.StoreInt32(&curState, 1)
}
}

// IsOn returns whether channelz data collection is on.
func IsOn() bool {
return atomic.CompareAndSwapInt32(&curState, 1, 1)
return atomic.LoadInt32(&curState) == 1
}

// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
Expand Down Expand Up @@ -97,43 +98,6 @@ func (d *dbWrapper) get() *channelMap {
return d.DB
}

// NewChannelzStorageForTesting initializes channelz data storage and id
// generator for testing purposes.
//
// Returns a cleanup function to be invoked by the test, which waits for up to
// 10s for all channelz state to be reset by the grpc goroutines when those
// entities get closed. This cleanup function helps with ensuring that tests
// don't mess up each other.
func NewChannelzStorageForTesting() (cleanup func() error) {
db.set(newChannelMap())
idGen.reset()

return func() error {
cm := db.get()
if cm == nil {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
cm.mu.RLock()
topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
cm.mu.RUnlock()

if err := ctx.Err(); err != nil {
return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
}
if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
return nil
}
<-ticker.C
}
}
}

// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
// boolean indicating whether there's more top channels to be queried for.
//
Expand Down Expand Up @@ -193,7 +157,7 @@ func GetServer(id int64) *ServerMetric {
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
id := idGen.genID()
id := IDGen.genID()
var parent int64
isTopChannel := true
if pid != nil {
Expand Down Expand Up @@ -229,7 +193,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefSubChannel, id, pid), nil
}
Expand All @@ -251,7 +215,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterServer(s Server, ref string) *Identifier {
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefServer, id, nil)
}
Expand All @@ -277,7 +241,7 @@ func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
if pid == nil {
return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefListenSocket, id, pid), nil
}
Expand All @@ -297,7 +261,7 @@ func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, e
if pid == nil {
return nil, errors.New("a NormalSocket's parent id cannot be 0")
}
id := idGen.genID()
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefNormalSocket, id, pid), nil
}
Expand Down Expand Up @@ -776,14 +740,17 @@ func (c *channelMap) GetServer(id int64) *ServerMetric {
return sm
}

type idGenerator struct {
// IDGenerator is an incrementing atomic that tracks IDs for channelz entities.
type IDGenerator struct {
id int64
}

func (i *idGenerator) reset() {
// Reset resets the generated ID back to zero. Should only be used at
// initialization or by tests sensitive to the ID number.
func (i *IDGenerator) Reset() {
atomic.StoreInt64(&i.id, 0)
}

func (i *idGenerator) genID() int64 {
func (i *IDGenerator) genID() int64 {
return atomic.AddInt64(&i.id, 1)
}
5 changes: 5 additions & 0 deletions internal/channelz/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ type tracedChannel interface {

type channelTrace struct {
cm *channelMap
clearCalled bool
createdTime time.Time
eventCount int64
mu sync.Mutex
Expand Down Expand Up @@ -656,6 +657,10 @@ func (c *channelTrace) append(e *TraceEvent) {
}

func (c *channelTrace) clear() {
if c.clearCalled {
return
}
c.clearCalled = true
c.mu.Lock()
for _, e := range c.events {
if e.RefID != 0 {
Expand Down
Loading