Skip to content

Commit

Permalink
Merge pull request #5727 from lcabancla/lcabancla.resource_pool.waitt…
Browse files Browse the repository at this point in the history
…ime.percentiles_3

Added resource pool wait time histogram metrics
  • Loading branch information
sougou authored Mar 24, 2020
2 parents 75971db + 6b8b6b4 commit 8732595
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 21 deletions.
5 changes: 4 additions & 1 deletion go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type ResourcePool struct {
resources chan resourceWrapper
factory Factory
idleTimer *timer.Timer
logWait func(time.Time)
}

type resourceWrapper struct {
Expand All @@ -89,7 +90,7 @@ type resourceWrapper struct {
// An idleTimeout of 0 means that there is no timeout.
// A non-zero value of prefillParallelism causes the pool to be pre-filled.
// The value specifies how many resources can be opened in parallel.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int, logWait func(time.Time)) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(errors.New("invalid/out of range capacity"))
}
Expand All @@ -99,6 +100,7 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
available: sync2.NewAtomicInt64(int64(capacity)),
capacity: sync2.NewAtomicInt64(int64(capacity)),
idleTimeout: sync2.NewAtomicDuration(idleTimeout),
logWait: logWait,
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}
Expand Down Expand Up @@ -325,6 +327,7 @@ func (rp *ResourcePool) SetCapacity(capacity int) error {
func (rp *ResourcePool) recordWait(start time.Time) {
rp.waitCount.Add(1)
rp.waitTime.Add(time.Since(start))
rp.logWait(start)
}

// SetIdleTimeout sets the idle timeout. It can only be used if there was an
Expand Down
50 changes: 37 additions & 13 deletions go/pools/resource_pool_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

var lastID, count sync2.AtomicInt64
var waitStarts []time.Time

type TestResource struct {
num int64
Expand All @@ -39,6 +40,10 @@ func (tr *TestResource) Close() {
}
}

func logWait(start time.Time) {
waitStarts = append(waitStarts, start)
}

func PoolFactory() (Resource, error) {
count.Add(1)
return &TestResource{lastID.Add(1), false}, nil
Expand All @@ -57,7 +62,9 @@ func TestOpen(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0)
waitStarts = waitStarts[:0]

p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0, logWait)
p.SetCapacity(5)
var resources [10]Resource

Expand All @@ -74,6 +81,9 @@ func TestOpen(t *testing.T) {
if p.WaitCount() != 0 {
t.Errorf("expecting 0, received %d", p.WaitCount())
}
if len(waitStarts) != 0 {
t.Errorf("expecting 0, received %d", len(waitStarts))
}
if p.WaitTime() != 0 {
t.Errorf("expecting 0, received %d", p.WaitTime())
}
Expand Down Expand Up @@ -109,6 +119,15 @@ func TestOpen(t *testing.T) {
if p.WaitCount() != 5 {
t.Errorf("Expecting 5, received %d", p.WaitCount())
}
if int64(len(waitStarts)) != p.WaitCount() {
t.Errorf("expecting %d, received %d", p.WaitCount(), len(waitStarts))
}
// verify start times are monotonic increasing
for i := 1; i < len(waitStarts); i++ {
if waitStarts[i].Before(waitStarts[i-1]) {
t.Errorf("Expecting monotonic increasing start times")
}
}
if p.WaitTime() == 0 {
t.Errorf("Expecting non-zero")
}
Expand Down Expand Up @@ -198,12 +217,12 @@ func TestOpen(t *testing.T) {
func TestPrefill(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if p.Active() != 5 {
t.Errorf("p.Active(): %d, want 5", p.Active())
}
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1)
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
Expand All @@ -218,7 +237,7 @@ func TestPrefillTimeout(t *testing.T) {
defer func() { prefillTimeout = saveTimeout }()

start := time.Now()
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1)
p := NewResourcePool(SlowFailFactory, 5, 5, time.Second, 1, logWait)
defer p.Close()
if elapsed := time.Since(start); elapsed > 20*time.Millisecond {
t.Errorf("elapsed: %v, should be around 10ms", elapsed)
Expand All @@ -232,7 +251,9 @@ func TestShrinking(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
waitStarts = waitStarts[:0]

p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
var resources [10]Resource
// Leave one empty slot in the pool
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -315,6 +336,9 @@ func TestShrinking(t *testing.T) {
if p.WaitCount() != 1 {
t.Errorf("Expecting 1, received %d", p.WaitCount())
}
if int64(len(waitStarts)) != p.WaitCount() {
t.Errorf("Expecting %d, received %d", p.WaitCount(), len(waitStarts))
}
if count.Get() != 2 {
t.Errorf("Expecting 2, received %d", count.Get())
}
Expand Down Expand Up @@ -371,7 +395,7 @@ func TestClosing(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
var resources [10]Resource
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
Expand Down Expand Up @@ -425,7 +449,7 @@ func TestIdleTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait)
defer p.Close()

r, err := p.Get(ctx)
Expand Down Expand Up @@ -536,7 +560,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0, logWait)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -557,7 +581,7 @@ func TestCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0, logWait)
defer p.Close()
if _, err := p.Get(ctx); err.Error() != "Failed" {
t.Errorf("Expecting Failed, received %v", err)
Expand All @@ -573,7 +597,7 @@ func TestCreateFailOnPut(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0, logWait)
defer p.Close()
_, err := p.Get(ctx)
if err != nil {
Expand All @@ -590,7 +614,7 @@ func TestSlowCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0, logWait)
defer p.Close()
ch := make(chan bool)
// The third Get should not wait indefinitely
Expand All @@ -612,7 +636,7 @@ func TestTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -631,7 +655,7 @@ func TestTimeout(t *testing.T) {
func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0, logWait)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
Expand Down
14 changes: 12 additions & 2 deletions go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ type ConnectionPool struct {
hostIsNotIP bool

mysqlStats *stats.Timings
name string
}

// NewConnectionPool creates a new ConnectionPool. The name is used
// to publish stats only.
func NewConnectionPool(name string, capacity int, idleTimeout time.Duration, dnsResolutionFrequency time.Duration) *ConnectionPool {
cp := &ConnectionPool{capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency}
cp := &ConnectionPool{name: name, capacity: capacity, idleTimeout: idleTimeout, resolutionFrequency: dnsResolutionFrequency}
if name == "" || usedNames[name] {
return cp
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (cp *ConnectionPool) Open(info dbconfigs.Connector, mysqlStats *stats.Timin
defer cp.mu.Unlock()
cp.info = info
cp.mysqlStats = mysqlStats
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0)
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0, cp.getLogWaitCallback())
// Check if we need to resolve a hostname (The Host is not just an IP address).
if cp.resolutionFrequency > 0 && net.ParseIP(info.Host()) == nil {
cp.hostIsNotIP = true
Expand All @@ -168,6 +169,15 @@ func (cp *ConnectionPool) Open(info dbconfigs.Connector, mysqlStats *stats.Timin
}
}

func (cp *ConnectionPool) getLogWaitCallback() func(time.Time) {
if cp.name == "" {
return func(start time.Time) {} // no op
}
return func(start time.Time) {
cp.mysqlStats.Record(cp.name+"ResourceWaitTime", start)
}
}

// connect is used by the resource pool to create a new Resource.
func (cp *ConnectionPool) connect() (pools.Resource, error) {
c, err := NewDBConnection(cp.info, cp.mysqlStats)
Expand Down
4 changes: 0 additions & 4 deletions go/vt/vttablet/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,6 @@ func TestConsolidation(t *testing.T) {
wg.Wait()

vend := framework.DebugVars()
if err := compareIntDiff(vend, "Waits/TotalCount", vstart, 1); err != nil {
t.Logf("DebugVars Waits/TotalCount not incremented with sleep=%v", sleep)
continue
}
if err := compareIntDiff(vend, "Waits/Histograms/Consolidations/Count", vstart, 1); err != nil {
t.Logf("DebugVars Waits/Histograms/Consolidations/Count not incremented with sleep=%v", sleep)
continue
Expand Down
42 changes: 42 additions & 0 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -812,3 +813,44 @@ func TestManualTwopcz(t *testing.T) {
fmt.Print("Sleeping for 30 seconds\n")
time.Sleep(30 * time.Second)
}

func TestTransactionPoolResourceWaitTime(t *testing.T) {
defer framework.Server.SetPoolSize(framework.Server.TxPoolSize())
defer framework.Server.SetTxPoolTimeout(framework.Server.TxPoolTimeout())
framework.Server.SetTxPoolSize(1)
framework.Server.SetTxPoolTimeout(10 * time.Second)
debugVarPath := "Waits/Histograms/TransactionPoolResourceWaitTime/Count"

for sleep := 0.1; sleep < 10.0; sleep *= 2 {
vstart := framework.DebugVars()
var wg sync.WaitGroup
wg.Add(2)

transactionFunc := func() {
client := framework.NewClient()

bv := map[string]*querypb.BindVariable{}
query := fmt.Sprintf("select sleep(%v) from dual", sleep)
if _, err := client.BeginExecute(query, bv); err != nil {
t.Error(err)
return
}
if err := client.Rollback(); err != nil {
t.Error(err)
return
}
wg.Done()
}
go transactionFunc()
go transactionFunc()
wg.Wait()
vend := framework.DebugVars()
if err := compareIntDiff(vend, debugVarPath, vstart, 1); err != nil {
t.Logf("DebugVars %v not incremented with sleep=%v", debugVarPath, sleep)
continue
}
t.Logf("DebugVars %v properly incremented with sleep=%v", debugVarPath, sleep)
return
}
t.Errorf("DebugVars %v not incremented", debugVarPath)
}
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,21 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams dbconfigs.Connector) {
f := func() (pools.Resource, error) {
return NewDBConn(cp, appParams)
}
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism)
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism, cp.getLogWaitCallback())
cp.appDebugParams = appDebugParams

cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats)
}

func (cp *Pool) getLogWaitCallback() func(time.Time) {
if cp.name == "" {
return func(start time.Time) {} // no op
}
return func(start time.Time) {
tabletenv.WaitStats.Record(cp.name+"ResourceWaitTime", start)
}
}

// Close will close the pool and wait for connections to be returned before
// exiting.
func (cp *Pool) Close() {
Expand Down

0 comments on commit 8732595

Please sign in to comment.