Skip to content

Commit

Permalink
util/quotapool: add optional OnAcquire function
Browse files Browse the repository at this point in the history
This change is used in upcoming work on admission control but felt sufficiently
isolated to warrant its own PR. This function is especially useful for
recording metrics after an acquisition has occurred.

Release note: None
  • Loading branch information
ajwerner committed Jul 2, 2019
1 parent 3f0ccce commit 8dcbaa3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
15 changes: 15 additions & 0 deletions pkg/util/quotapool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ type Option interface {
apply(*config)
}

// AcquisitionFunc is used to configure a quotapool to call a function after
// an acquisition has occurred.
type AcquisitionFunc func(
ctx context.Context, poolName string, r Request, start time.Time,
)

// OnAcquisition creates an Option to configure a callback upon acquisition.
// It is often useful for recording metrics.
func OnAcquisition(f AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onAcquisition = f
})
}

// OnSlowAcquisition creates an Option to configure a callback upon slow
// acquisitions. Only one OnSlowAcquisition may be used. If multiple are
// specified only the last will be used.
Expand Down Expand Up @@ -58,6 +72,7 @@ type optionFunc func(cfg *config)
func (f optionFunc) apply(cfg *config) { f(cfg) }

type config struct {
onAcquisition AcquisitionFunc
onSlowAcquisition SlowAcquisitionFunc
slowAcquisitionThreshold time.Duration
}
16 changes: 16 additions & 0 deletions pkg/util/quotapool/intpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,22 @@ func TestQuotaPoolCappedAcquisition(t *testing.T) {
}
}

func TestOnAcquisition(t *testing.T) {
const quota = 100
var called bool
qp := quotapool.NewIntPool("test", quota,
quotapool.OnAcquisition(func(ctx context.Context, poolName string, _ quotapool.Request, start time.Time,
) {
assert.Equal(t, poolName, "test")
called = true
}))
ctx := context.Background()
alloc, err := qp.Acquire(ctx, 1)
assert.Nil(t, err)
assert.True(t, called)
alloc.Release()
}

// TestSlowAcquisition ensures that the SlowAcquisition callback is called
// when an Acquire call takes longer than the configured timeout.
func TestSlowAcquisition(t *testing.T) {
Expand Down
12 changes: 10 additions & 2 deletions pkg/util/quotapool/quotapool.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,20 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) {
if closeErr != nil {
return closeErr
}
start := timeutil.Now()
// Set up onAcquisition if we have one.
if qp.config.onAcquisition != nil {
defer func() {
if err == nil {
qp.config.onAcquisition(ctx, qp.name, r, start)
}
}()
}

// Set up the infrastructure to report slow requests.
var slowTimer *timeutil.Timer
var slowTimerC <-chan time.Time
var start time.Time
if qp.onSlowAcquisition != nil {
start = timeutil.Now()
slowTimer = timeutil.NewTimer()
defer slowTimer.Stop()
// Intentionally reset only once, for we care more about the select duration in
Expand Down

0 comments on commit 8dcbaa3

Please sign in to comment.