Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/quotapool: various features and fixes #38587

Merged
merged 3 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion pkg/util/quotapool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ type Option interface {
apply(*config)
}

// AcquisitionFunc is used to configure a quotapool to call a function after
// an acquisition has occurred.
type AcquisitionFunc func(
ctx context.Context, poolName string, r Request, start time.Time,
)

// OnAcquisition creates an Option to configure a callback upon acquisition.
// It is often useful for recording metrics.
func OnAcquisition(f AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onAcquisition = f
})
}

// OnSlowAcquisition creates an Option to configure a callback upon slow
// acquisitions. Only one OnSlowAcquisition may be used. If multiple are
// specified only the last will be used.
Expand Down Expand Up @@ -58,7 +72,7 @@ type optionFunc func(cfg *config)
func (f optionFunc) apply(cfg *config) { f(cfg) }

type config struct {
name string
onAcquisition AcquisitionFunc
onSlowAcquisition SlowAcquisitionFunc
slowAcquisitionThreshold time.Duration
}
5 changes: 5 additions & 0 deletions pkg/util/quotapool/intpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc,
return p.newIntAlloc(r.took), nil
}

// Len returns the current length of the queue for this IntPool.
func (p *IntPool) Len() int {
return p.qp.Len()
}

// ApproximateQuota will report approximately the amount of quota available in
// the pool. It is precise if there are no ongoing acquisitions. If there are,
// the return value can be up to 'v' less than actual available quota where 'v'
Expand Down
79 changes: 77 additions & 2 deletions pkg/util/quotapool/intpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -296,6 +298,22 @@ func TestQuotaPoolCappedAcquisition(t *testing.T) {
}
}

func TestOnAcquisition(t *testing.T) {
const quota = 100
var called bool
qp := quotapool.NewIntPool("test", quota,
quotapool.OnAcquisition(func(ctx context.Context, poolName string, _ quotapool.Request, start time.Time,
) {
assert.Equal(t, poolName, "test")
called = true
}))
ctx := context.Background()
alloc, err := qp.Acquire(ctx, 1)
assert.Nil(t, err)
assert.True(t, called)
alloc.Release()
}

// TestSlowAcquisition ensures that the SlowAcquisition callback is called
// when an Acquire call takes longer than the configured timeout.
func TestSlowAcquisition(t *testing.T) {
Expand All @@ -311,7 +329,9 @@ func TestSlowAcquisition(t *testing.T) {
firstKey := ctxKey(1)
firstCtx := context.WithValue(ctx, firstKey, "foo")
slowCalled, acquiredCalled := make(chan struct{}), make(chan struct{})
f := func(ctx context.Context, _ string, _ quotapool.Request, _ time.Time) func() {
const poolName = "test"
f := func(ctx context.Context, name string, _ quotapool.Request, _ time.Time) func() {
assert.Equal(t, poolName, name)
if ctx.Value(firstKey) != nil {
return func() {}
}
Expand All @@ -320,7 +340,7 @@ func TestSlowAcquisition(t *testing.T) {
close(acquiredCalled)
}
}
qp := quotapool.NewIntPool("test", 1, quotapool.OnSlowAcquisition(time.Microsecond, f))
qp := quotapool.NewIntPool(poolName, 1, quotapool.OnSlowAcquisition(time.Microsecond, f))
alloc, err := qp.Acquire(firstCtx, 1)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -447,6 +467,61 @@ func BenchmarkConcurrentIntQuotaPool(b *testing.B) {
}
}

// TestLen verifies that the Len() method of the IntPool works as expected.
func TestLen(t *testing.T) {
qp := quotapool.NewIntPool("test", 1, quotapool.LogSlowAcquisition)
ctx := context.Background()
allocCh := make(chan *quotapool.IntAlloc)
doAcquire := func(ctx context.Context) {
alloc, err := qp.Acquire(ctx, 1)
if ctx.Err() == nil && assert.Nil(t, err) {
allocCh <- alloc
}
}
assertLenSoon := func(exp int) {
testutils.SucceedsSoon(t, func() error {
if got := qp.Len(); got != exp {
return errors.Errorf("expected queue len to be %d, got %d", got, exp)
}
return nil
})
}
// Initially qp should have a length of 0.
assert.Equal(t, 0, qp.Len())
// Acquire all of the quota from the pool.
alloc, err := qp.Acquire(ctx, 1)
assert.Nil(t, err)
// The length should still be 0.
assert.Equal(t, 0, qp.Len())
// Launch a goroutine to acquire quota, ensure that the length increases.
go doAcquire(ctx)
assertLenSoon(1)
// Create more goroutines which will block to be canceled later in order to
// ensure that cancelations deduct from the length.
const numToCancel = 12 // an arbitrary number
ctxToCancel, cancel := context.WithCancel(ctx)
for i := 0; i < numToCancel; i++ {
go doAcquire(ctxToCancel)
}
// Ensure that all of the new goroutines are reflected in the length.
assertLenSoon(numToCancel + 1)
// Launch another goroutine with the default context.
go doAcquire(ctx)
assertLenSoon(numToCancel + 2)
// Cancel some of the goroutines.
cancel()
// Ensure that they are soon not reflected in the length.
assertLenSoon(2)
// Unblock the first goroutine.
alloc.Release()
alloc = <-allocCh
assert.Equal(t, 1, qp.Len())
// Unblock the second goroutine.
alloc.Release()
<-allocCh
assert.Equal(t, 0, qp.Len())
}

// BenchmarkIntQuotaPoolFunc benchmarks the common case where we have sufficient
// quota available in the pool and we repeatedly acquire and release quota.
func BenchmarkIntQuotaPoolFunc(b *testing.B) {
Expand Down
32 changes: 29 additions & 3 deletions pkg/util/quotapool/quotapool.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (ec *ErrClosed) Error() string {
type QuotaPool struct {
config

// name is used for logging purposes and is passed to functions used to report
// acquistions or slow acqusitions.
name string

// chanSyncPool is used to pool allocations of the channels used to notify
// goroutines waiting in Acquire.
chanSyncPool sync.Pool
Expand Down Expand Up @@ -118,6 +122,11 @@ type QuotaPool struct {
// channel buffer.
q notifyQueue

// numCanceled is the number of members of q which have been canceled.
// It is used to determine the current number of active waiters in the queue
// which is q.len() less this value.
numCanceled int

// closed is set to true when the quota pool is closed (see
// QuotaPool.Close).
closed bool
Expand All @@ -129,6 +138,7 @@ type QuotaPool struct {
// acquired without ever making more than the quota capacity available.
func New(name string, initialResource Resource, options ...Option) *QuotaPool {
qp := &QuotaPool{
name: name,
quota: make(chan Resource, 1),
done: make(chan struct{}),
chanSyncPool: sync.Pool{
Expand Down Expand Up @@ -186,12 +196,20 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) {
if closeErr != nil {
return closeErr
}
start := timeutil.Now()
// Set up onAcquisition if we have one.
if qp.config.onAcquisition != nil {
defer func() {
if err == nil {
qp.config.onAcquisition(ctx, qp.name, r, start)
}
}()
}

// Set up the infrastructure to report slow requests.
var slowTimer *timeutil.Timer
var slowTimerC <-chan time.Time
var start time.Time
if qp.onSlowAcquisition != nil {
start = timeutil.Now()
slowTimer = timeutil.NewTimer()
defer slowTimer.Stop()
// Intentionally reset only once, for we care more about the select duration in
Expand Down Expand Up @@ -230,8 +248,8 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) {
// Goroutines are not a risk of getting notified and finding
// out they're not first in line.
notifyCh <- struct{}{}
qp.mu.numCanceled++
}

qp.mu.Unlock()
return ctx.Err()
case <-qp.done:
Expand Down Expand Up @@ -312,6 +330,7 @@ func (qp *QuotaPool) notifyNextLocked() {
// shifting the queue.
<-ch
qp.chanSyncPool.Put(qp.mu.q.dequeue())
qp.mu.numCanceled--
continue
}
break
Expand All @@ -336,6 +355,13 @@ func (qp *QuotaPool) ApproximateQuota(f func(Resource)) {
}
}

// Len returns the current length of the queue for this QuotaPool.
func (qp *QuotaPool) Len() int {
qp.mu.Lock()
defer qp.mu.Unlock()
return int(qp.mu.q.len) - qp.mu.numCanceled
}

// Close signals to all ongoing and subsequent acquisitions that they are
// free to return to their callers. They will receive an *ErrClosed which
// contains this reason.
Expand Down