From 8dcbaa38d34f4f5b61908200ec5b8b434477b482 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 1 Jul 2019 16:22:28 -0400 Subject: [PATCH] util/quotapool: add optional OnAcquire function This change is used in upcoming work on admission control but felt sufficiently isolated to warrant its own PR. This function is especially useful for recording metrics after an acquisition has occurred. Release note: None --- pkg/util/quotapool/config.go | 15 +++++++++++++++ pkg/util/quotapool/intpool_test.go | 16 ++++++++++++++++ pkg/util/quotapool/quotapool.go | 12 ++++++++++-- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/pkg/util/quotapool/config.go b/pkg/util/quotapool/config.go index 0e6227d4f49a..c4a2998f00ed 100644 --- a/pkg/util/quotapool/config.go +++ b/pkg/util/quotapool/config.go @@ -24,6 +24,20 @@ type Option interface { apply(*config) } +// AcquisitionFunc is used to configure a quotapool to call a function after +// an acquisition has occurred. +type AcquisitionFunc func( + ctx context.Context, poolName string, r Request, start time.Time, +) + +// OnAcquisition creates an Option to configure a callback upon acquisition. +// It is often useful for recording metrics. +func OnAcquisition(f AcquisitionFunc) Option { + return optionFunc(func(cfg *config) { + cfg.onAcquisition = f + }) +} + // OnSlowAcquisition creates an Option to configure a callback upon slow // acquisitions. Only one OnSlowAcquisition may be used. If multiple are // specified only the last will be used. @@ -58,6 +72,7 @@ type optionFunc func(cfg *config) func (f optionFunc) apply(cfg *config) { f(cfg) } type config struct { + onAcquisition AcquisitionFunc onSlowAcquisition SlowAcquisitionFunc slowAcquisitionThreshold time.Duration } diff --git a/pkg/util/quotapool/intpool_test.go b/pkg/util/quotapool/intpool_test.go index ba0053157e66..73a668b87452 100644 --- a/pkg/util/quotapool/intpool_test.go +++ b/pkg/util/quotapool/intpool_test.go @@ -298,6 +298,22 @@ func TestQuotaPoolCappedAcquisition(t *testing.T) { } } +func TestOnAcquisition(t *testing.T) { + const quota = 100 + var called bool + qp := quotapool.NewIntPool("test", quota, + quotapool.OnAcquisition(func(ctx context.Context, poolName string, _ quotapool.Request, start time.Time, + ) { + assert.Equal(t, poolName, "test") + called = true + })) + ctx := context.Background() + alloc, err := qp.Acquire(ctx, 1) + assert.Nil(t, err) + assert.True(t, called) + alloc.Release() +} + // TestSlowAcquisition ensures that the SlowAcquisition callback is called // when an Acquire call takes longer than the configured timeout. func TestSlowAcquisition(t *testing.T) { diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go index e966d1a8d2c5..ac821eb856ad 100644 --- a/pkg/util/quotapool/quotapool.go +++ b/pkg/util/quotapool/quotapool.go @@ -196,12 +196,20 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) { if closeErr != nil { return closeErr } + start := timeutil.Now() + // Set up onAcquisition if we have one. + if qp.config.onAcquisition != nil { + defer func() { + if err == nil { + qp.config.onAcquisition(ctx, qp.name, r, start) + } + }() + } + // Set up the infrastructure to report slow requests. var slowTimer *timeutil.Timer var slowTimerC <-chan time.Time - var start time.Time if qp.onSlowAcquisition != nil { - start = timeutil.Now() slowTimer = timeutil.NewTimer() defer slowTimer.Stop() // Intentionally reset only once, for we care more about the select duration in