Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sliding window and rt quantile metrics #2356

Merged
merged 3 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/vault/sdk v0.7.0
github.com/influxdata/tdigest v0.0.1
github.com/jinzhu/copier v0.3.5
github.com/knadh/koanf v1.5.0
github.com/kr/pretty v0.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
Expand Down Expand Up @@ -1627,8 +1629,10 @@ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNq
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM=
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
Expand Down
6 changes: 6 additions & 0 deletions metrics/prometheus/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ const (
ipKey = constant.IpKey
methodKey = constant.MethodKey
versionKey = constant.VersionKey
)

const (
providerField = "provider"
consumerField = "consumer"

Expand All @@ -47,3 +49,7 @@ const (
processingField = "processing"
succeedField = "succeed"
)

var (
quantiles = []float64{0.5, 0.9, 0.95, 0.99}
)
17 changes: 17 additions & 0 deletions metrics/prometheus/metric_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package prometheus

import (
"fmt"
"strconv"
"strings"
)

Expand Down Expand Up @@ -49,6 +51,7 @@ type rpcCommonMetrics struct {
rtMillisecondsSum *prometheus.CounterVec
rtMillisecondsAvg *GaugeVecWithSyncMap
rtMillisecondsLast *prometheus.GaugeVec
rtMillisecondsQuantiles *quantileGaugeVec
}

type providerMetrics struct {
Expand All @@ -64,6 +67,7 @@ func (pm *providerMetrics) init(reporterConfig *metrics.ReporterConfig) {
pm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(providerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(providerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(providerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
pm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(providerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
}

type consumerMetrics struct {
Expand All @@ -79,8 +83,10 @@ func (cm *consumerMetrics) init(reporterConfig *metrics.ReporterConfig) {
cm.rtMillisecondsSum = newAutoCounterVec(buildMetricsName(consumerField, rtField, milliSecondsField, sumField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsAvg = newAutoGaugeVecWithSyncMap(buildMetricsName(consumerField, rtField, milliSecondsField, avgField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsLast = newAutoGaugeVec(buildMetricsName(consumerField, rtField, milliSecondsField, lastField), reporterConfig.Namespace, labelNames)
cm.rtMillisecondsQuantiles = newQuantileGaugeVec(buildRTQuantilesMetricsNames(consumerField, quantiles), reporterConfig.Namespace, labelNames, quantiles)
}

// buildMetricsName builds metrics name split by "_".
func buildMetricsName(args ...string) string {
sb := strings.Builder{}
for _, arg := range args {
Expand All @@ -90,3 +96,14 @@ func buildMetricsName(args ...string) string {
res := strings.TrimPrefix(sb.String(), "_")
return res
}

// buildRTQuantilesMetricsNames is only used for building rt quantiles metric names.
func buildRTQuantilesMetricsNames(role string, quantiles []float64) []string {
res := make([]string, 0, len(quantiles))
for _, q := range quantiles {
quantileField := fmt.Sprintf("p%v", strconv.FormatFloat(q*100, 'f', -1, 64))
name := buildMetricsName(role, rtField, milliSecondsField, quantileField)
res = append(res, name)
}
return res
}
47 changes: 46 additions & 1 deletion metrics/prometheus/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

import (
"dubbo.apache.org/dubbo-go/v3/metrics/util/aggregate"
)

func newHistogramVec(name, namespace string, labels []string) *prometheus.HistogramVec {
return prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -151,7 +155,7 @@ func newAutoSummaryVec(name, namespace string, labels []string, maxAge int64) *p

type GaugeVecWithSyncMap struct {
GaugeVec *prometheus.GaugeVec
SyncMap *sync.Map
SyncMap *sync.Map // key: labels, value: *atomic.Value
}

func newAutoGaugeVecWithSyncMap(name, namespace string, labels []string) *GaugeVecWithSyncMap {
Expand Down Expand Up @@ -253,3 +257,44 @@ func (gv *GaugeVecWithSyncMap) updateAvg(labels *prometheus.Labels, curValue int
}
}
}

type quantileGaugeVec struct {
gaugeVecSlice []*prometheus.GaugeVec
quantiles []float64
syncMap *sync.Map // key: labels string, value: TimeWindowQuantile
}

// Notice: names and quantiles should be the same length and same order.
func newQuantileGaugeVec(names []string, namespace string, labels []string, quantiles []float64) *quantileGaugeVec {
gvs := make([]*prometheus.GaugeVec, len(names))
for i, name := range names {
gvs[i] = newAutoGaugeVec(name, namespace, labels)
}
gv := &quantileGaugeVec{
gaugeVecSlice: gvs,
quantiles: quantiles,
syncMap: &sync.Map{},
}
return gv
}

func (gv *quantileGaugeVec) updateQuantile(labels *prometheus.Labels, curValue int64) {
key := convertLabelsToMapKey(*labels)
cur := aggregate.NewTimeWindowQuantile(100, 10, 120)
cur.Add(float64(curValue))

updateFunc := func(td *aggregate.TimeWindowQuantile) {
qs := td.Quantiles(gv.quantiles)
for i, q := range qs {
gv.gaugeVecSlice[i].With(*labels).Set(q)
}
}

if actual, loaded := gv.syncMap.LoadOrStore(key, cur); loaded {
store := actual.(*aggregate.TimeWindowQuantile)
store.Add(float64(curValue))
updateFunc(store)
} else {
updateFunc(cur)
}
}
2 changes: 2 additions & 0 deletions metrics/prometheus/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,13 @@ func (reporter *PrometheusReporter) reportRTMilliseconds(role string, labels *pr
go reporter.provider.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.provider.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.provider.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.provider.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
case consumerField:
go reporter.consumer.rtMillisecondsLast.With(*labels).Set(float64(costMs))
go reporter.consumer.rtMillisecondsSum.With(*labels).Add(float64(costMs))
go reporter.consumer.rtMillisecondsMin.updateMin(labels, costMs)
go reporter.consumer.rtMillisecondsMax.updateMax(labels, costMs)
go reporter.consumer.rtMillisecondsAvg.updateAvg(labels, costMs)
go reporter.consumer.rtMillisecondsQuantiles.updateQuantile(labels, costMs)
}
}
42 changes: 42 additions & 0 deletions metrics/util/aggregate/pane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

// pane represents a window over a period of time.
// It uses interface{} to store any type of value.
type pane struct {
startInMs int64
endInMs int64
intervalInMs int64
value interface{}
}

func newPane(intervalInMs, startInMs int64, value interface{}) *pane {
return &pane{
startInMs: startInMs,
endInMs: startInMs + intervalInMs,
intervalInMs: intervalInMs,
value: value,
}
}

func (p *pane) resetTo(startInMs int64, value interface{}) {
p.startInMs = startInMs
p.endInMs = startInMs + p.intervalInMs
p.value = value
}
85 changes: 85 additions & 0 deletions metrics/util/aggregate/quantile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

import (
"sync"
"time"
)

import (
"github.com/influxdata/tdigest"
)

// TimeWindowQuantile wrappers sliding window around T-Digest.
//
// It uses T-Digest algorithm to calculate quantile.
// The window is divided into several panes, and each pane's value is a TDigest instance.
type TimeWindowQuantile struct {
compression float64
window *slidingWindow
mux sync.RWMutex
}

func NewTimeWindowQuantile(compression float64, paneCount int, timeWindowSeconds int64) *TimeWindowQuantile {
return &TimeWindowQuantile{
compression: compression,
window: newSlidingWindow(paneCount, timeWindowSeconds*1000),
}
}

// Quantile returns a quantile of the sliding window by merging all panes.
func (t *TimeWindowQuantile) Quantile(q float64) float64 {
return t.mergeTDigests().Quantile(q)
}

// Quantiles returns quantiles of the sliding window by merging all panes.
func (t *TimeWindowQuantile) Quantiles(qs []float64) []float64 {
td := t.mergeTDigests()

res := make([]float64, len(qs))
for i, q := range qs {
res[i] = td.Quantile(q)
}

return res
}

// mergeTDigests merges all panes' TDigests into one TDigest.
func (t *TimeWindowQuantile) mergeTDigests() *tdigest.TDigest {
t.mux.RLock()
defer t.mux.RUnlock()

td := tdigest.NewWithCompression(t.compression)
for _, v := range t.window.values(time.Now().UnixMilli()) {
td.AddCentroidList(v.(*tdigest.TDigest).Centroids())
}
return td
}

// Add adds a value to the sliding window's current pane.
func (t *TimeWindowQuantile) Add(value float64) {
t.mux.Lock()
defer t.mux.Unlock()

t.window.currentPane(time.Now().UnixMilli(), t.newEmptyValue).value.(*tdigest.TDigest).Add(value, 1)
}

func (t *TimeWindowQuantile) newEmptyValue() interface{} {
return tdigest.NewWithCompression(t.compression)
}
60 changes: 60 additions & 0 deletions metrics/util/aggregate/quantile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package aggregate

import "testing"

func TestAddAndQuantile(t1 *testing.T) {
timeWindowQuantile := NewTimeWindowQuantile(100, 10, 1)
for i := 1; i <= 100; i++ {
timeWindowQuantile.Add(float64(i))
}

type args struct {
q float64
}

tests := []struct {
name string
args args
want float64
}{
{
name: "Quantile: 0.01",
args: args{
q: 0.01,
},
want: 1.5,
},
{
name: "Quantile: 0.99",
args: args{
q: 0.99,
},
want: 99.5,
},
}
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := timeWindowQuantile
if got := t.Quantile(tt.args.q); got != tt.want {
t1.Errorf("Quantile() = %v, want %v", got, tt.want)
}
})
}
}
Loading