Add global-ratelimiter aggregator-side metrics #6171

Merged Jul 18, 2024 (3 commits)
52 changes: 36 additions & 16 deletions common/metrics/defs.go
@@ -840,8 +840,10 @@ const (
// TaskValidatorScope is the metric for the taskvalidator's workflow check operation.
TaskValidatorScope

// FrontendGlobalRatelimiter is the metrics scope for frontend.GlobalRatelimiter
FrontendGlobalRatelimiter
// GlobalRatelimiter is the metrics scope for limiting-side common/quotas/global behavior
GlobalRatelimiter
// GlobalRatelimiterAggregator is the metrics scope for aggregator-side common/quotas/global behavior
GlobalRatelimiterAggregator

NumCommonScopes
)
@@ -1729,7 +1731,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HashringScope: {operation: "Hashring"},

// currently used by both frontend and history, but may grow to other limiting-host-services.
FrontendGlobalRatelimiter: {operation: "GlobalRatelimiter"},
GlobalRatelimiter: {operation: "GlobalRatelimiter"},

Contributor:
I suggest using lower-case. m3 lowercases everything, and the use of CamelCase makes it harder to find metrics.

@Groxx (Member, Author), Jul 17, 2024:
I mostly agree, lower-case is nicer for lots of things, but all of our "operation" tags are title-case at the moment. Not sure there's much benefit to breaking that long-held pattern.

is it just for code-grepping difficulty, or does it make metric-querying harder somehow?

@Groxx (Member, Author):
(this is pretty easy to change if we do want to change things later, nothing we have now depends on this for monitoring/log search/etc)

GlobalRatelimiterAggregator: {operation: "GlobalRatelimiterAggregator"},
},
// Frontend Scope Names
Frontend: {
@@ -2204,14 +2207,24 @@ const (

AsyncRequestPayloadSize

// limiter-side metrics
GlobalRatelimiterStartupUsageHistogram
GlobalRatelimiterFailingUsageHistogram
GlobalRatelimiterGlobalUsageHistogram
GlobalRatelimiterUpdateLatency // time spent performing all Update requests, per batch attempt. ideally well below update interval.
GlobalRatelimiterAllowedRequestsCount // per key/type usage
GlobalRatelimiterRejectedRequestsCount // per key/type usage
GlobalRatelimiterQuota // per-global-key quota information, emitted when a key is in use

// aggregator-side metrics
GlobalRatelimiterInitialized
GlobalRatelimiterReinitialized
GlobalRatelimiterUpdated
GlobalRatelimiterDecayed
GlobalRatelimiterLimitsQueried
GlobalRatelimiterHostLimitsQueried
GlobalRatelimiterRemovedLimits
GlobalRatelimiterRemovedHostLimits

NumCommonMetrics // Needs to be last on this list for iota numbering
)
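
The "needs to be last" note on NumCommonMetrics is a consequence of iota numbering: every identifier in the const block takes the next integer, so a trailing sentinel equals the count of entries declared above it. A minimal sketch with invented names (not from this PR):

```go
// Hypothetical illustration: a trailing iota sentinel counts the constants
// above it, so new entries must be inserted before it, never after.
const (
	MetricA    = iota // 0
	MetricB           // 1
	MetricC           // 2
	NumMetrics        // 3 == number of metrics above
)
```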
@@ -2864,10 +2877,18 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
GlobalRatelimiterFailingUsageHistogram: {metricName: "global_ratelimiter_failing_usage_histogram", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterGlobalUsageHistogram: {metricName: "global_ratelimiter_global_usage_histogram", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterUpdateLatency: {metricName: "global_ratelimiter_update_latency", metricType: Timer},

GlobalRatelimiterAllowedRequestsCount: {metricName: "global_ratelimiter_allowed_requests", metricType: Counter},
GlobalRatelimiterRejectedRequestsCount: {metricName: "global_ratelimiter_rejected_requests", metricType: Counter},
GlobalRatelimiterQuota: {metricName: "global_ratelimiter_quota", metricType: Gauge},

GlobalRatelimiterInitialized: {metricName: "global_ratelimiter_initialized", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterReinitialized: {metricName: "global_ratelimiter_reinitialized", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterUpdated: {metricName: "global_ratelimiter_updated", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterDecayed: {metricName: "global_ratelimiter_decayed", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterLimitsQueried: {metricName: "global_ratelimiter_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterHostLimitsQueried: {metricName: "global_ratelimiter_host_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedLimits: {metricName: "global_ratelimiter_removed_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedHostLimits: {metricName: "global_ratelimiter_removed_host_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
@@ -3327,15 +3348,14 @@ var PersistenceLatencyBuckets = tally.DurationBuckets([]time.Duration{
})

// GlobalRatelimiterUsageHistogram contains buckets for tracking how many ratelimiters are
// in which state (startup, healthy, failing).
var GlobalRatelimiterUsageHistogram = tally.ValueBuckets{
0, // need an explicit 0 to record zeros
1, 2, 5, 10,
25, 50, 100,
250, 500, 1000,
1250, 1500, 2000,
// TODO: almost certainly want more, but how many?
}
// in which various states (startup, healthy, failing, as well as aggregator-side quantities, deleted, etc).
//
// this is intended for coarse scale checking, not alerting, so the buckets
// should be considered unstable and can be changed whenever desired.
var GlobalRatelimiterUsageHistogram = append(
tally.ValueBuckets{0}, // need an explicit 0 or zero is reported as 1
tally.MustMakeExponentialValueBuckets(1, 2, 17)..., // 1..65536
)
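
For a sense of scale, the new declaration above expands to an explicit 0 bucket plus 17 doubling buckets from 1 through 65536. A small standalone sketch, assuming the same github.com/uber-go/tally package the surrounding code imports:

```go
package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

func main() {
	// Mirrors the declaration above: an explicit 0 bucket (so zero observations
	// are recorded as zero), then exponential buckets 1, 2, 4, ..., 65536.
	buckets := append(
		tally.ValueBuckets{0},
		tally.MustMakeExponentialValueBuckets(1, 2, 17)...,
	)
	fmt.Println(len(buckets), buckets) // 18 boundaries: 0, 1, 2, 4, ..., 65536
}
```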

// ErrorClass is an enum to help with classifying SLA vs. non-SLA errors (SLA = "service level agreement")
type ErrorClass uint8
56 changes: 29 additions & 27 deletions common/metrics/tags.go
@@ -34,36 +34,38 @@ const (
goVersionTag = "go_version"
cadenceVersionTag = "cadence_version"

instance = "instance"
domain = "domain"
domainType = "domain_type"
clusterGroup = "cluster_group"
sourceCluster = "source_cluster"
targetCluster = "target_cluster"
activeCluster = "active_cluster"
taskList = "tasklist"
taskListType = "tasklistType"
workflowType = "workflowType"
activityType = "activityType"
decisionType = "decisionType"
invariantType = "invariantType"
shardScannerScanResult = "shardscanner_scan_result"
shardScannerFixResult = "shardscanner_fix_result"
kafkaPartition = "kafkaPartition"
transport = "transport"
caller = "caller"
service = "service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"
host = "host"
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
instance = "instance"
domain = "domain"
domainType = "domain_type"
clusterGroup = "cluster_group"
sourceCluster = "source_cluster"
targetCluster = "target_cluster"
activeCluster = "active_cluster"
taskList = "tasklist"
taskListType = "tasklistType"
workflowType = "workflowType"
activityType = "activityType"
decisionType = "decisionType"
invariantType = "invariantType"
shardScannerScanResult = "shardscanner_scan_result"
shardScannerFixResult = "shardscanner_fix_result"
kafkaPartition = "kafkaPartition"
transport = "transport"
caller = "caller"
service = "service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"
host = "host"
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"
workflowTerminationReason = "workflow_termination_reason"

// limiter-side tags
globalRatelimitKey = "global_ratelimit_key"
globalRatelimitType = "global_ratelimit_type"
globalRatelimitCollectionName = "global_ratelimit_collection"
workflowTerminationReason = "workflow_termination_reason"

allValue = "all"
unknownValue = "_unknown_"
73 changes: 64 additions & 9 deletions common/quotas/global/algorithm/requestweighted.go
@@ -130,6 +130,7 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/metrics"
)

type (
@@ -154,7 +155,8 @@ type (
impl struct {
// intentionally value-typed so caller cannot mutate the fields.
// manually copy data if this changes.
cfg Config
cfg Config
scope metrics.Scope

// mut protects usage, as it is the only mutable data
mut sync.Mutex
@@ -349,9 +351,10 @@ func (c configSnapshot) missedUpdateScalar(dataAge time.Duration) PerSecond {
//
// This instance is effectively single-threaded, but a small sharding wrapper should allow better concurrent
// throughput if needed (bound by CPU cores, as it's moderately CPU-costly).
func New(cfg Config) (RequestWeighted, error) {
func New(met metrics.Client, cfg Config) (RequestWeighted, error) {
i := &impl{
cfg: cfg,
scope: met.Scope(metrics.GlobalRatelimiterAggregator),
usage: make(map[Limit]map[Identity]requests, guessNumKeys), // start out relatively large

clock: clock.NewRealTimeSource(),
@@ -369,7 +372,10 @@ func (a *impl) Update(p UpdateParams) error {
return fmt.Errorf("bad args to update: %w", err)
}
a.mut.Lock()
defer a.mut.Unlock()
once := newOnce()
defer once.Do(a.mut.Unlock)

var initialized, reinitialized, updated, decayed int64

snap, err := a.snapshot()
if err != nil {
@@ -386,6 +392,7 @@ func (a *impl) Update(p UpdateParams) error {
aps := PerSecond(float64(req.Accepted) / float64(p.Elapsed/time.Second))
rps := PerSecond(float64(req.Rejected) / float64(p.Elapsed/time.Second))
if prev.lastUpdate.IsZero() {
initialized++
next = requests{
lastUpdate: snap.now,
accepted: aps, // no requests == 100% weight
@@ -394,15 +401,20 @@ func (a *impl) Update(p UpdateParams) error {
} else {
age := snap.now.Sub(prev.lastUpdate)
if snap.shouldGC(age) {
reinitialized++
// would have GC'd if we had seen it earlier, so it's the same as the zero state
next = requests{
lastUpdate: snap.now,
accepted: aps, // no requests == 100% weight
rejected: rps, // no requests == 100% weight
}
} else {
updated++
// compute the next rolling average step (`*reduce` simulates skipped updates)
reduce := snap.missedUpdateScalar(age)
if reduce < 1 {
decayed++
}
next = requests{
lastUpdate: snap.now,
// TODO: max(1, actual) so this does not lead to <1 rps allowances? or maybe just 1+actual and then reduce in used-responses?
@@ -418,14 +430,21 @@ func (a *impl) Update(p UpdateParams) error {
a.usage[key] = ih
}

once.Do(a.mut.Unlock) // don't hold the lock while emitting metrics

a.scope.RecordHistogramValue(metrics.GlobalRatelimiterInitialized, float64(initialized))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterReinitialized, float64(reinitialized))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterUpdated, float64(updated))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterDecayed, float64(decayed))

return nil
}

// getWeightsLocked returns the weights of observed hosts (based on ALL requests), and the total number of requests accepted per second.
func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Identity]HostWeight, usedRPS PerSecond) {
func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Identity]HostWeight, usedRPS PerSecond, met Metrics) {
ir := a.usage[key]
if len(ir) == 0 {
return nil, 0
return nil, 0, met
}

weights = make(map[Identity]HostWeight, len(ir))
@@ -436,6 +455,7 @@ func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Ide
if snap.shouldGC(age) {
// old, clean up
delete(ir, id)
met.RemovedHostLimits++
continue
}

@@ -445,25 +465,31 @@ func (a *impl) getWeightsLocked(key Limit, snap configSnapshot) (weights map[Ide
weights[id] = actual // populate with the reduced values so it doesn't have to be calculated again
total += actual // keep a running total to scale all values when done
usedRPS += reqs.accepted * reduce
met.HostLimits++
}

if len(ir) == 0 {
// completely empty Limit, gc it as well
delete(a.usage, key)
return nil, 0
met.RemovedLimits++
return nil, 0, met
}

for id := range ir {
// scale by the total.
// this also ensures all values are between 0 and 1 (inclusive)
weights[id] = weights[id] / total
}
return weights, usedRPS
met.Limits = 1
return weights, usedRPS, met
}

func (a *impl) HostWeights(host Identity, limits []Limit) (weights map[Limit]HostWeight, usedRPS map[Limit]PerSecond, err error) {
a.mut.Lock()
defer a.mut.Unlock()
once := newOnce()
defer once.Do(a.mut.Unlock)

var cumulative Metrics

weights = make(map[Limit]HostWeight, len(limits))
usedRPS = make(map[Limit]PerSecond, len(limits))
@@ -472,14 +498,28 @@ func (a *impl) HostWeights(host Identity, limits []Limit) (weights map[Limit]Hos
return nil, nil, err
}
for _, lim := range limits {
hosts, used := a.getWeightsLocked(lim, snap)
hosts, used, met := a.getWeightsLocked(lim, snap)

cumulative.Limits += met.Limits // always 1 or 0
cumulative.HostLimits += met.HostLimits
cumulative.RemovedLimits += met.RemovedLimits // always 0 or 1 (opposite Limits)
cumulative.RemovedHostLimits += met.RemovedHostLimits

if len(hosts) > 0 {
usedRPS[lim] = used // limit is known, has some usage
if weight, ok := hosts[host]; ok {
weights[lim] = weight // host has a known weight
}
}
}

once.Do(a.mut.Unlock) // don't hold the lock while emitting metrics

a.scope.RecordHistogramValue(metrics.GlobalRatelimiterLimitsQueried, float64(cumulative.Limits))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterHostLimitsQueried, float64(cumulative.HostLimits))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterRemovedLimits, float64(cumulative.RemovedLimits))
a.scope.RecordHistogramValue(metrics.GlobalRatelimiterRemovedHostLimits, float64(cumulative.RemovedHostLimits))

return weights, usedRPS, nil
}

@@ -538,3 +578,18 @@ func weighted[T numeric](newer, older T, weight float64) T {
type numeric interface {
constraints.Integer | constraints.Float
}

// non-sync version of sync.Once, for easier unlocking

Member:
I'd make it explicit in the comments there "Do NOT use this for concurrent cases"

@Groxx (Member, Author):
that is part of why I didn't make it a public type, yea. not intended for reuse.

type doOnce bool

func newOnce() *doOnce {
value := doOnce(false)
return &value
}

func (o *doOnce) Do(cb func()) {
if *o == false {
*o = true
cb()
}
}
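
As context for the review thread above, here is a minimal sketch (the method name is invented; the fields match the impl struct in this diff) of the single-goroutine pattern Update and HostWeights follow, which is why doOnce does not need to be concurrency-safe:

```go
// Sketch only: unlock exactly once, either explicitly before the comparatively
// slow metric emission, or via the deferred call on an early error return.
// doOnce is safe here only because a single goroutine creates and calls it.
func (a *impl) exampleLockedWork() error {
	a.mut.Lock()
	once := newOnce()
	defer once.Do(a.mut.Unlock) // safety net for early returns

	// ... read and update a.usage while holding the lock ...

	once.Do(a.mut.Unlock) // drop the lock before emitting metrics
	a.scope.RecordHistogramValue(metrics.GlobalRatelimiterUpdated, 1)
	return nil
}
```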