From 8e27194fb1bc2c045beb15f14c3be3c3915aa90b Mon Sep 17 00:00:00 2001 From: Steven L Date: Tue, 16 Jul 2024 20:33:00 -0500 Subject: [PATCH 1/3] aggregator metrics --- common/metrics/defs.go | 52 +++++++++---- common/metrics/tags.go | 56 +++++++------- .../global/algorithm/requestweighted.go | 73 ++++++++++++++++--- .../global/algorithm/requestweighted_test.go | 16 ++-- common/quotas/global/collection/collection.go | 2 +- common/quotas/global/rpc/client.go | 2 +- service/history/handler/handler_test.go | 15 ++-- service/history/resource/resource.go | 15 ++-- 8 files changed, 162 insertions(+), 69 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 4af92cac865..9d23e9dc75b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -840,8 +840,10 @@ const ( // TaskValidatorScope is the metric for the taskvalidator's workflow check operation. TaskValidatorScope - // FrontendGlobalRatelimiter is the metrics scope for frontend.GlobalRatelimiter - FrontendGlobalRatelimiter + // GlobalRatelimiter is the metrics scope for limiting-side common/quotas/global behavior + GlobalRatelimiter + // GlobalRatelimiterAggregator is the metrics scope for aggregator-side common/quotas/global behavior + GlobalRatelimiterAggregator NumCommonScopes ) @@ -1729,7 +1731,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HashringScope: {operation: "Hashring"}, // currently used by both frontend and history, but may grow to other limiting-host-services. 
- FrontendGlobalRatelimiter: {operation: "GlobalRatelimiter"}, + GlobalRatelimiter: {operation: "GlobalRatelimiter"}, + GlobalRatelimiterAggregator: {operation: "GlobalRatelimiterAggregator"}, }, // Frontend Scope Names Frontend: { @@ -2204,14 +2207,24 @@ const ( AsyncRequestPayloadSize + // limiter-side metrics GlobalRatelimiterStartupUsageHistogram GlobalRatelimiterFailingUsageHistogram GlobalRatelimiterGlobalUsageHistogram - GlobalRatelimiterUpdateLatency // time spent performing all Update requests, per batch attempt. ideally well below update interval. - + GlobalRatelimiterUpdateLatency // time spent performing all Update requests, per batch attempt. ideally well below update interval. GlobalRatelimiterAllowedRequestsCount // per key/type usage GlobalRatelimiterRejectedRequestsCount // per key/type usage - GlobalRatelimiterQuota // per-global-key quota information, emitted when a key is in use + GlobalRatelimiterQuota // per-global-key quota information, emitted when a key is in use + + // aggregator-side metrics + GlobalRatelimiterInitialized + GlobalRatelimiterReinitialized + GlobalRatelimiterUpdated + GlobalRatelimiterDecayed + GlobalRatelimiterLimitsQueried + GlobalRatelimiterHostLimitsQueried + GlobalRatelimiterRemovedLimits + GlobalRatelimiterRemovedHostLimits NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -2864,10 +2877,18 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ GlobalRatelimiterFailingUsageHistogram: {metricName: "global_ratelimiter_failing_usage_histogram", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, GlobalRatelimiterGlobalUsageHistogram: {metricName: "global_ratelimiter_global_usage_histogram", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, GlobalRatelimiterUpdateLatency: {metricName: "global_ratelimiter_update_latency", metricType: Timer}, - GlobalRatelimiterAllowedRequestsCount: {metricName: "global_ratelimiter_allowed_requests", metricType: Counter}, 
GlobalRatelimiterRejectedRequestsCount: {metricName: "global_ratelimiter_rejected_requests", metricType: Counter}, GlobalRatelimiterQuota: {metricName: "global_ratelimiter_quota", metricType: Gauge}, + + GlobalRatelimiterInitialized: {metricName: "global_ratelimiter_initialized", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterReinitialized: {metricName: "global_ratelimiter_reinitialized", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterUpdated: {metricName: "global_ratelimiter_updated", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterDecayed: {metricName: "global_ratelimiter_decayed", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterLimitsQueried: {metricName: "global_ratelimiter_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterHostLimitsQueried: {metricName: "global_ratelimiter_host_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterRemovedLimits: {metricName: "global_ratelimiter_removed_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + GlobalRatelimiterRemovedHostLimits: {metricName: "global_ratelimiter_removed_host_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, }, History: { TaskRequests: {metricName: "task_requests", metricType: Counter}, @@ -3327,14 +3348,17 @@ var PersistenceLatencyBuckets = tally.DurationBuckets([]time.Duration{ }) // GlobalRatelimiterUsageHistogram contains buckets for tracking how many ratelimiters are -// in which state (startup, healthy, failing). +// in which various states (startup, healthy, failing, as well as aggregator-side quantities, deleted, etc). +// +// this is intended for coarse scale checking, not alerting, so the buckets +// should be considered unstable and can be changed whenever desired. 
var GlobalRatelimiterUsageHistogram = tally.ValueBuckets{ - 0, // need an explicit 0 to record zeros - 1, 2, 5, 10, - 25, 50, 100, - 250, 500, 1000, - 1250, 1500, 2000, - // TODO: almost certainly want more, but how many? + 0, // need an explicit 0 or zero is reported as 1 + 1, 2, 5, + 10, 25, 50, + 100, 250, 500, + 1000, 2500, 5000, + 10000, 25000, 50000, } // ErrorClass is an enum to help with classifying SLA vs. non-SLA errors (SLA = "service level agreement") diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 71130944a39..368554c414d 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -34,36 +34,38 @@ const ( goVersionTag = "go_version" cadenceVersionTag = "cadence_version" - instance = "instance" - domain = "domain" - domainType = "domain_type" - clusterGroup = "cluster_group" - sourceCluster = "source_cluster" - targetCluster = "target_cluster" - activeCluster = "active_cluster" - taskList = "tasklist" - taskListType = "tasklistType" - workflowType = "workflowType" - activityType = "activityType" - decisionType = "decisionType" - invariantType = "invariantType" - shardScannerScanResult = "shardscanner_scan_result" - shardScannerFixResult = "shardscanner_fix_result" - kafkaPartition = "kafkaPartition" - transport = "transport" - caller = "caller" - service = "service" - signalName = "signalName" - workflowVersion = "workflow_version" - shardID = "shard_id" - matchingHost = "matching_host" - host = "host" - pollerIsolationGroup = "poller_isolation_group" - asyncWFRequestType = "async_wf_request_type" + instance = "instance" + domain = "domain" + domainType = "domain_type" + clusterGroup = "cluster_group" + sourceCluster = "source_cluster" + targetCluster = "target_cluster" + activeCluster = "active_cluster" + taskList = "tasklist" + taskListType = "tasklistType" + workflowType = "workflowType" + activityType = "activityType" + decisionType = "decisionType" + invariantType = "invariantType" + shardScannerScanResult = 
"shardscanner_scan_result" + shardScannerFixResult = "shardscanner_fix_result" + kafkaPartition = "kafkaPartition" + transport = "transport" + caller = "caller" + service = "service" + signalName = "signalName" + workflowVersion = "workflow_version" + shardID = "shard_id" + matchingHost = "matching_host" + host = "host" + pollerIsolationGroup = "poller_isolation_group" + asyncWFRequestType = "async_wf_request_type" + workflowTerminationReason = "workflow_termination_reason" + + // limiter-side tags globalRatelimitKey = "global_ratelimit_key" globalRatelimitType = "global_ratelimit_type" globalRatelimitCollectionName = "global_ratelimit_collection" - workflowTerminationReason = "workflow_termination_reason" allValue = "all" unknownValue = "_unknown_" diff --git a/common/quotas/global/algorithm/requestweighted.go b/common/quotas/global/algorithm/requestweighted.go index 5700a531272..bc06a0b8d48 100644 --- a/common/quotas/global/algorithm/requestweighted.go +++ b/common/quotas/global/algorithm/requestweighted.go @@ -130,6 +130,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/metrics" ) type ( @@ -154,7 +155,8 @@ type ( impl struct { // intentionally value-typed so caller cannot mutate the fields. // manually copy data if this changes. - cfg Config + cfg Config + scope metrics.Scope // mut protects usage, as it is the only mutable data mut sync.Mutex @@ -349,9 +351,10 @@ func (c configSnapshot) missedUpdateScalar(dataAge time.Duration) PerSecond { // // This instance is effectively single-threaded, but a small sharding wrapper should allow better concurrent // throughput if needed (bound by CPU cores, as it's moderately CPU-costly). 
-func New(cfg Config) (RequestWeighted, error) { +func New(met metrics.Client, cfg Config) (RequestWeighted, error) { i := &impl{ cfg: cfg, + scope: met.Scope(metrics.GlobalRatelimiterAggregator), usage: make(map[Limit]map[Identity]requests, guessNumKeys), // start out relatively large clock: clock.NewRealTimeSource(), @@ -369,7 +372,10 @@ func (a *impl) Update(p UpdateParams) error { return fmt.Errorf("bad args to update: %w", err) } a.mut.Lock() - defer a.mut.Unlock() + once := newOnce() + defer once.Do(a.mut.Unlock) + + var initialized, reinitialized, updated, decayed int64 snap, err := a.snapshot() if err != nil { @@ -386,6 +392,7 @@ func (a *impl) Update(p UpdateParams) error { aps := PerSecond(float64(req.Accepted) / float64(p.Elapsed/time.Second)) rps := PerSecond(float64(req.Rejected) / float64(p.Elapsed/time.Second)) if prev.lastUpdate.IsZero() { + initialized++ next = requests{ lastUpdate: snap.now, accepted: aps, // no requests == 100% weight @@ -394,6 +401,7 @@ func (a *impl) Update(p UpdateParams) error { } else { age := snap.now.Sub(prev.lastUpdate) if snap.shouldGC(age) { + reinitialized++ // would have GC'd if we had seen it earlier, so it's the same as the zero state next = requests{ lastUpdate: snap.now, @@ -401,8 +409,12 @@ func (a *impl) Update(p UpdateParams) error { rejected: rps, // no requests == 100% weight } } else { + updated++ // compute the next rolling average step (`*reduce` simulates skipped updates) reduce := snap.missedUpdateScalar(age) + if reduce < 1 { + decayed++ + } next = requests{ lastUpdate: snap.now, // TODO: max(1, actual) so this does not lead to <1 rps allowances? or maybe just 1+actual and then reduce in used-responses? 
@@ -418,14 +430,21 @@ func (a *impl) Update(p UpdateParams) error { a.usage[key] = ih } + once.Do(a.mut.Unlock) // don't hold the lock while emitting metrics + + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterInitialized, float64(initialized)) + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterReinitialized, float64(reinitialized)) + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterUpdated, float64(updated)) + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterDecayed, float64(decayed)) + return nil } // getWeightsLocked returns the weights of observed hosts (based on ALL requests), and the total number of requests accepted per second. -func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Identity]HostWeight, usedRPS PerSecond) { +func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Identity]HostWeight, usedRPS PerSecond, met Metrics) { ir := a.usage[key] if len(ir) == 0 { - return nil, 0 + return nil, 0, met } weights = make(map[Identity]HostWeight, len(ir)) @@ -436,6 +455,7 @@ func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Ide if snap.shouldGC(age) { // old, clean up delete(ir, id) + met.RemovedHostLimits++ continue } @@ -445,12 +465,14 @@ func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Ide weights[id] = actual // populate with the reduced values so it doesn't have to be calculated again total += actual // keep a running total to scale all values when done usedRPS += reqs.accepted * reduce + met.HostLimits++ } if len(ir) == 0 { // completely empty Limit, gc it as well delete(a.usage, key) - return nil, 0 + met.RemovedLimits++ + return nil, 0, met } for id := range ir { @@ -458,12 +480,16 @@ func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Ide // this also ensures all values are between 0 and 1 (inclusive) weights[id] = weights[id] / total } - return weights, usedRPS + met.Limits = 1 + return weights, usedRPS, 
met } func (a *impl) HostWeights(host Identity, limits []Limit) (weights map[Limit]HostWeight, usedRPS map[Limit]PerSecond, err error) { a.mut.Lock() - defer a.mut.Unlock() + once := newOnce() + defer once.Do(a.mut.Unlock) + + var cumulative Metrics weights = make(map[Limit]HostWeight, len(limits)) usedRPS = make(map[Limit]PerSecond, len(limits)) @@ -472,7 +498,13 @@ func (a *impl) HostWeights(host Identity, limits []Limit) (weights map[Limit]Hos return nil, nil, err } for _, lim := range limits { - hosts, used := a.getWeightsLocked(lim, snap) + hosts, used, met := a.getWeightsLocked(lim, snap) + + cumulative.Limits += met.Limits // always 1 or 0 + cumulative.HostLimits += met.HostLimits + cumulative.RemovedLimits += met.RemovedLimits // always 0 or 1 (opposite Limits) + cumulative.RemovedHostLimits += met.RemovedHostLimits + if len(hosts) > 0 { usedRPS[lim] = used // limit is known, has some usage if weight, ok := hosts[host]; ok { @@ -480,6 +512,14 @@ func (a *impl) HostWeights(host Identity, limits []Limit) (weights map[Limit]Hos } } } + + once.Do(a.mut.Unlock) // don't hold the lock while emitting metrics + + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterLimitsQueried, float64(cumulative.Limits)) + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterHostLimitsQueried, float64(cumulative.HostLimits)) + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterRemovedLimits, float64(cumulative.RemovedLimits)) + a.scope.RecordHistogramValue(metrics.GlobalRatelimiterRemovedHostLimits, float64(cumulative.RemovedHostLimits)) + return weights, usedRPS, nil } @@ -538,3 +578,18 @@ func weighted[T numeric](newer, older T, weight float64) T { type numeric interface { constraints.Integer | constraints.Float } + +// non-sync version of sync.Once, for easier unlocking +type doOnce bool + +func newOnce() *doOnce { + value := doOnce(false) + return &value +} + +func (o *doOnce) Do(cb func()) { + if *o == false { + *o = true + cb() + } +} diff --git 
a/common/quotas/global/algorithm/requestweighted_test.go b/common/quotas/global/algorithm/requestweighted_test.go index 4c39909f235..26ce3ac2ea2 100644 --- a/common/quotas/global/algorithm/requestweighted_test.go +++ b/common/quotas/global/algorithm/requestweighted_test.go @@ -38,6 +38,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/metrics" ) // just simplifies newForTest usage as most tests only care about rate @@ -79,13 +80,14 @@ func newForTest(t require.TestingT, snap configSnapshot, validate bool) (*impl, var agg *impl if validate { - i, err := New(cfg) + i, err := New(metrics.NewNoopMetricsClient(), cfg) require.NoError(t, err) agg = i.(*impl) } else { // need to build by hand, New returns nil on err agg = &impl{ cfg: cfg, + scope: metrics.NewNoopMetricsClient().Scope(metrics.GlobalRatelimiterAggregator), usage: make(map[Limit]map[Identity]requests), clock: nil, } @@ -365,9 +367,10 @@ func TestRapidlyCoalesces(t *testing.T) { key := Limit("start workflow") h1, h2, h3 := Identity("one"), Identity("two"), Identity("three") - weights, used := agg.getWeightsLocked(key, snapshot()) + weights, used, met := agg.getWeightsLocked(key, snapshot()) assert.Zero(t, weights, "should have no weights") assert.Zero(t, used, "should have no used RPS") + assert.Zero(t, met, "should have processed no data while calculating") push := func(host Identity, accept, reject int) { err := agg.Update(UpdateParams{ @@ -394,16 +397,18 @@ func TestRapidlyCoalesces(t *testing.T) { // which feels pretty reasonable: after ~10 seconds (3s updates), the oldest data only has ~10% weight. 
const target = 10 + 200 + 999 for i := 0; i < 4; i++ { - weights, used = agg.getWeightsLocked(key, snapshot()) + weights, used, met = agg.getWeightsLocked(key, snapshot()) t.Log("used:", used, "of actual:", target) t.Log("weights so far:", weights) + t.Log("calculation metrics:", met) push(h1, 10, 10) push(h2, 200, 200) push(h3, 999, 999) } - weights, used = agg.getWeightsLocked(key, snapshot()) + weights, used, met = agg.getWeightsLocked(key, snapshot()) t.Log("used:", used, "of actual:", target) t.Log("weights so far:", weights) + t.Log("calculation metrics:", met) // aggregated allowed-request values should be less than 10% off assert.InDeltaf(t, target, float64(used), target*0.1, "should have allowed >90%% of target rps by the 5th round") // actually ~94% @@ -550,9 +555,10 @@ func TestSimulate(t *testing.T) { snap, err := agg.snapshot() require.NoError(t, err) - weights, used := agg.getWeightsLocked(start, snap) + weights, used, met := agg.getWeightsLocked(start, snap) assert.Zero(t, weights, "should have no weights") assert.Zero(t, used, "should have no used RPS") + assert.Zero(t, met, "should have processed no data while calculating") // just simplifies arg-construction push := func(host Identity, key Limit, accept, reject int) { diff --git a/common/quotas/global/collection/collection.go b/common/quotas/global/collection/collection.go index 7b8be1d7a0b..c413d5192d4 100644 --- a/common/quotas/global/collection/collection.go +++ b/common/quotas/global/collection/collection.go @@ -189,7 +189,7 @@ func New( }, logger: logger.WithTags(tag.ComponentGlobalRatelimiter, tag.GlobalRatelimiterCollectionName(name)), - scope: met.Scope(metrics.FrontendGlobalRatelimiter).Tagged(metrics.GlobalRatelimiterCollectionName(name)), + scope: met.Scope(metrics.GlobalRatelimiter).Tagged(metrics.GlobalRatelimiterCollectionName(name)), keyModes: func(gkey shared.GlobalKey) string { // all collections share a single dynamic config key, // so they must use the global key to uniquely 
identify all keys. diff --git a/common/quotas/global/rpc/client.go b/common/quotas/global/rpc/client.go index d3f5f2c2307..ce1ee032a99 100644 --- a/common/quotas/global/rpc/client.go +++ b/common/quotas/global/rpc/client.go @@ -104,7 +104,7 @@ func New( resolver: resolver, thisHost: uuid.NewString(), // TODO: would descriptive be better? but it works, unique ensures correctness. logger: logger, - scope: met.Scope(metrics.FrontendGlobalRatelimiter), + scope: met.Scope(metrics.GlobalRatelimiter), } } diff --git a/service/history/handler/handler_test.go b/service/history/handler/handler_test.go index 37673078e81..617015d4a64 100644 --- a/service/history/handler/handler_test.go +++ b/service/history/handler/handler_test.go @@ -3619,12 +3619,15 @@ func TestRatelimitUpdate(t *testing.T) { }, }) require.NoError(t, err) - alg, err := algorithm.New(algorithm.Config{ - NewDataWeight: func(opts ...dynamicconfig.FilterOption) float64 { return 0.5 }, - UpdateInterval: func(opts ...dynamicconfig.FilterOption) time.Duration { return 3 * time.Second }, - DecayAfter: func(opts ...dynamicconfig.FilterOption) time.Duration { return 6 * time.Second }, - GcAfter: func(opts ...dynamicconfig.FilterOption) time.Duration { return time.Minute }, - }) + alg, err := algorithm.New( + metrics.NewNoopMetricsClient(), + algorithm.Config{ + NewDataWeight: func(opts ...dynamicconfig.FilterOption) float64 { return 0.5 }, + UpdateInterval: func(opts ...dynamicconfig.FilterOption) time.Duration { return 3 * time.Second }, + DecayAfter: func(opts ...dynamicconfig.FilterOption) time.Duration { return 6 * time.Second }, + GcAfter: func(opts ...dynamicconfig.FilterOption) time.Duration { return time.Minute }, + }, + ) require.NoError(t, err) h := &handlerImpl{ Resource: res, diff --git a/service/history/resource/resource.go b/service/history/resource/resource.go index 5f798a1db69..52171248065 100644 --- a/service/history/resource/resource.go +++ b/service/history/resource/resource.go @@ -133,12 +133,15 @@ 
func New( uint64(config.EventsCacheMaxSize()), serviceResource.GetDomainCache(), ) - ratelimitAlgorithm, err := algorithm.New(algorithm.Config{ - NewDataWeight: config.GlobalRatelimiterNewDataWeight, - UpdateInterval: config.GlobalRatelimiterUpdateInterval, - DecayAfter: config.GlobalRatelimiterDecayAfter, - GcAfter: config.GlobalRatelimiterGCAfter, - }) + ratelimitAlgorithm, err := algorithm.New( + params.MetricsClient, + algorithm.Config{ + NewDataWeight: config.GlobalRatelimiterNewDataWeight, + UpdateInterval: config.GlobalRatelimiterUpdateInterval, + DecayAfter: config.GlobalRatelimiterDecayAfter, + GcAfter: config.GlobalRatelimiterGCAfter, + }, + ) if err != nil { return nil, fmt.Errorf("invalid ratelimit algorithm config: %w", err) } From e65703bb53348271d0391877aa720cd6c1bb068e Mon Sep 17 00:00:00 2001 From: Steven L Date: Wed, 17 Jul 2024 18:36:42 -0500 Subject: [PATCH 2/3] switch to exponential histogram buckets --- common/metrics/defs.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 9d23e9dc75b..2f8c6219cda 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -3352,14 +3352,10 @@ var PersistenceLatencyBuckets = tally.DurationBuckets([]time.Duration{ // // this is intended for coarse scale checking, not alerting, so the buckets // should be considered unstable and can be changed whenever desired. -var GlobalRatelimiterUsageHistogram = tally.ValueBuckets{ - 0, // need an explicit 0 or zero is reported as 1 - 1, 2, 5, - 10, 25, 50, - 100, 250, 500, - 1000, 2500, 5000, - 10000, 25000, 50000, -} +var GlobalRatelimiterUsageHistogram = append( + tally.ValueBuckets{0}, // need an explicit 0 or zero is reported as 1 + tally.MustMakeExponentialValueBuckets(1, 2, 17)..., // 1..65536 +) // ErrorClass is an enum to help with classifying SLA vs. 
non-SLA errors (SLA = "service level agreement") type ErrorClass uint8 From 39c4a127a0c82b788bcfef43b0c3685d1b634afa Mon Sep 17 00:00:00 2001 From: Steven L Date: Wed, 17 Jul 2024 20:42:12 -0500 Subject: [PATCH 3/3] add a histogram test --- .../global/algorithm/requestweighted_test.go | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/common/quotas/global/algorithm/requestweighted_test.go b/common/quotas/global/algorithm/requestweighted_test.go index 26ce3ac2ea2..ea4ce944709 100644 --- a/common/quotas/global/algorithm/requestweighted_test.go +++ b/common/quotas/global/algorithm/requestweighted_test.go @@ -26,12 +26,14 @@ import ( "fmt" "math" "math/rand" + "strings" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" "go.uber.org/multierr" "golang.org/x/exp/maps" @@ -104,6 +106,97 @@ func newForTest(t require.TestingT, snap configSnapshot, validate bool) (*impl, return underlying, tick } +func TestEmitsMetrics(t *testing.T) { + t.Cleanup(func() { + if t.Failed() { + t.Log("This test is sensitive about bucket sizes, but they aren't actually important.") + t.Log("If bucket sizes have changed, just update the test to create enough data / use the new values, so each step is unique") + } + }) + + assertHistogramContents := func(name string, snap tally.HistogramSnapshot, expected map[float64]int64) { + t.Helper() + for bucket, val := range snap.Values() { // {bucket_boundary: value} + assert.Equal(t, expected[bucket], val, "bucket %v has unexpected value for histogram name %v", bucket, name) + } + } + assertAllHistogramContents := func(snap tally.Snapshot, contents map[string]map[float64]int64) { + t.Helper() + for _, data := range snap.Histograms() { // {"full.name+tags":{tags, values, etc}} + name := strings.TrimPrefix(data.Name(), "test.global_ratelimiter_") // common prefix for this test + exp, ok := contents[name] + if !ok { + // keys remain between snapshots, so they're 
a bit of a pain. + // values are zeroed by each snapshot though. + for bucket, val := range data.Values() { + assert.Zerof(t, val, "ignored key %v (trimmed: %v) in bucket %v has non-zero value, cannot be ignored", data.Name(), name, bucket) + } + } else { + assertHistogramContents(name, data, exp) + } + } + } + + agg, _ := newValid(t, defaultConfig(time.Second)) + ts := tally.NewTestScope("test", nil) + agg.scope = metrics.NewClient(ts, metrics.History).Scope(metrics.GlobalRatelimiterAggregator) + + h1, h2 := Identity("host 1"), Identity("host 2") + key := Limit("key") + + err := agg.Update(UpdateParams{ + ID: h1, + Load: map[Limit]Requests{key: {1, 1}}, + Elapsed: time.Second, + }) + require.NoError(t, err) + snap := ts.Snapshot() + assertAllHistogramContents(snap, map[string]map[float64]int64{ + "initialized": {1: 1}, // one key was created + "reinitialized": {0: 1}, + "updated": {0: 1}, + "decayed": {0: 1}, + }) + + err = agg.Update(UpdateParams{ + ID: h2, + Load: map[Limit]Requests{key: {1, 1}}, + Elapsed: time.Second, + }) + require.NoError(t, err) + snap = ts.Snapshot() + assertAllHistogramContents(snap, map[string]map[float64]int64{ + "initialized": {1: 1}, // keys are disjoint, so another key was created + "reinitialized": {0: 1}, + "updated": {0: 1}, + "decayed": {0: 1}, + }) + + err = agg.Update(UpdateParams{ + ID: h1, + Load: map[Limit]Requests{key: {1, 1}}, + Elapsed: time.Second, + }) + require.NoError(t, err) + snap = ts.Snapshot() + assertAllHistogramContents(snap, map[string]map[float64]int64{ + "initialized": {0: 1}, + "reinitialized": {0: 1}, + "updated": {1: 1}, // h1 was updated + "decayed": {0: 1}, + }) + + _, _, err = agg.HostWeights(h1, []Limit{key}) + require.NoError(t, err) + snap = ts.Snapshot() + assertAllHistogramContents(snap, map[string]map[float64]int64{ + "limits_queried": {1: 1}, // one limit exists + "host_limits_queried": {2: 1}, // two hosts have data for that limit + "removed_limits": {0: 1}, // none removed + 
"removed_host_limits": {0: 1}, // none removed + }) +} + func TestMissedUpdateHandling(t *testing.T) { agg, tick := newValid(t, configSnapshot{ weight: 0.1,